Using the Momento Topics (pub/sub) API
Momento Topics is a messaging pattern enabling real-time communication between parts of a distributed application. It enables you to publish (produce) values to a topic and subscribe (consume) from a topic. This page details the Momento API methods for interacting with Momento Topics.

Topics methods
Subscribe
This method subscribes to a topic to receive new values with a stateful connection.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to subscribe to. |
Method response object
- Success - Returns a subscription object.
- Error
See response objects for specific information.
With the returned subscription object, once put in a for loop, your code will receive an event when a new value is published to the Topic.
- JavaScript
- Python
- Go
- C#
const result = await topicClient.subscribe('test-cache', 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Publishing values to the topic 'test-topic': ${item.value().toString()}`);
return;
},
});
if (result instanceof TopicSubscribe.Subscription) {
console.log("Successfully subscribed to topic 'test-topic'");
// Publish a value
await topicClient.publish('test-cache', 'test-topic', 'test-value');
// Wait for published values to be received.
setTimeout(() => {
console.log('Waiting for the published values');
}, 2000);
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
} else if (result instanceof TopicSubscribe.Error) {
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache 'test-cache': ${result.errorCode()}: ${result.toString()}`
);
}
subscription = await topic_client.subscribe("cache", "my_topic")
match subscription:
case TopicSubscribe.Error():
print("Error subscribing to topic: ", subscription.message)
case TopicSubscribe.SubscriptionAsync():
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value}")
return
case TopicSubscriptionItem.Error():
print("Error with received message:", item.inner_exception.message)
return
// Instantiate subscriber
sub, subErr := client.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: "test-cache",
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(2000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
var cancellableSubscription = subscription.WithCancellation(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var message in cancellableSubscription)
{
switch (message)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
Publish
Publishes a message to a topic.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to publish the value to. |
value | String / bytes | Value to publish to the topic. |
Method response object
- Success
- Error
See response objects for specific information.
- JavaScript
- Python
- Go
- C#
const result = await topicClient.publish('test-cache', 'test-topic', 'test-topic-value');
if (result instanceof TopicPublish.Success) {
console.log("Value published to topic 'test-topic'");
} else if (result instanceof TopicPublish.Error) {
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache 'test-cache': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error():
print("Error publishing a message: ", response.message)
_, err := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
ar publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
TopicClient
Instead of the CacheClient, as used in most Momento Cache API calls, for Topics you use a TopicClient object.
- JavaScript
- Python
- Go
- C#
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable({
environmentVariableName: 'MOMENTO_API_KEY',
}),
});
topic_client = TopicClientAsync(
TopicConfigurations.Default.latest(),
CredentialProvider.from_environment_variable("MOMENTO_AUTH_TOKEN")
)
credProvider, err := auth.NewEnvMomentoTokenProvider("MOMENTO_API_KEY")
if err != nil {
panic(err)
}
topicClient, err := momento.NewTopicClient(
config.TopicsDefault(),
credProvider,
)
if err != nil {
panic(err)
}
new TopicClient(
TopicConfigurations.Laptop.latest(),
new EnvMomentoTokenProvider("MOMENTO_AUTH_TOKEN")
);
Example apps using Momento Topics APIs
A growing list of example apps using the Momento Topics.
- A serverless item publishing microservice This microservice is written in TypeScript and runs on AWS using API Gateway, a Lambda function, and Momento Topics. It can be used by any of your other services (with the correct security on API Gateway) to publish messages to various topics that are then subscribed to by other applications. You pass into this API a
topicName
andtopicValue
and this service publishes the value to that topic.