Using the Momento Topics API
Momento Topics is a messaging pattern enabling real-time communication between parts of a distributed application. It enables you to publish (produce) values to a topic and subscribe (consume) from a topic. This page details the Momento API methods for interacting with Momento Topics.
TopicClient
Momento Topics API calls are made using a TopicClient
object.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});
TopicClientAsync(
TopicConfigurations.Default.latest(), CredentialProvider.from_environment_variable("MOMENTO_API_KEY")
)
TopicClient(
CredentialProvider.fromEnvVar("MOMENTO_API_KEY"), TopicConfigurations.Laptop.latest
).use { topicClient ->
//...
}
credProvider, err := auth.NewEnvMomentoTokenProvider("MOMENTO_API_KEY")
if err != nil {
panic(err)
}
topicClient, err = momento.NewTopicClient(
config.TopicsDefault(),
credProvider,
)
if err != nil {
panic(err)
}
new TopicClient(
TopicConfigurations.Laptop.latest(),
new EnvMomentoTokenProvider("MOMENTO_API_KEY")
);
let _topic_client = TopicClient::builder()
.configuration(momento::topics::configurations::Laptop::latest())
.credential_provider(CredentialProvider::from_env_var("MOMENTO_API_KEY")?)
.build()?;
do {
let credentialProvider = try CredentialProvider.fromEnvironmentVariable(envVariableName: "MOMENTO_API_KEY")
let topicClient = TopicClient(
configuration: TopicClientConfigurations.iOS.latest(),
credentialProvider: credentialProvider
)
} catch {
print("Unable to create topic client: \(error)")
exit(1)
}
try {
final topicClient = TopicClient(
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
TopicClientConfigurations.latest());
} catch (e) {
print("Unable to create topic client: $e");
exit(1);
}
Topics methods
Subscribe
This method subscribes to a topic to receive subscription events with a stateful connection.
Depending on the language, you may use a callback function or an iterator to receive new subscription events, such as items, heartbeats, and discontinuities:
- Items include a string or byte message, topic sequence number, and a unique token identifier if one is present (learn more).
- Heartbeats indicate that the connection is still active.
- Discontinuities indicate that there was an interruption in the subscription and some messages may have been skipped.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to subscribe to. |
Method response object
- Success - Returns a subscription object.
- Error
See response objects for specific information.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
switch (result.type) {
case TopicSubscribeResponse.Subscription:
console.log("Successfully subscribed to topic 'test-topic'");
console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');
console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
break;
case TopicSubscribeResponse.Error:
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.subscribe("cache", "my_topic")
match response:
case TopicSubscribe.Error() as error:
print(f"Error subscribing to topic: {error.message}")
case TopicSubscribe.SubscriptionAsync() as subscription:
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value!r}")
return
case TopicSubscriptionItem.Error():
print(f"Error with received message: {item.inner_exception.message}")
return
when (val response = topicClient.subscribe("test-cache", "test-topic")) {
is TopicSubscribeResponse.Subscription -> coroutineScope {
launch {
withTimeoutOrNull(2000) {
response.collect { item ->
when (item) {
is TopicMessage.Text -> println("Received text message: ${item.value}")
is TopicMessage.Binary -> println("Received binary message: ${item.value}")
is TopicMessage.Error -> throw RuntimeException(
"An error occurred reading messages from topic 'test-topic': ${item.errorCode}", item
)
}
}
}
}
}
is TopicSubscribeResponse.Error -> throw RuntimeException(
"An error occurred while attempting to subscribe to topic 'test-topic': ${response.errorCode}", response
)
}
// Instantiate subscriber
sub, subErr := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: cacheName,
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
// Receive only subscription items with messages
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
// Receive all subscription events (messages, discontinuities, heartbeats)
event, err := sub.Event(ctx)
if err != nil {
panic(err)
}
switch e := event.(type) {
case momento.TopicHeartbeat:
fmt.Printf("received heartbeat\n")
case momento.TopicDiscontinuity:
fmt.Printf("received discontinuity\n")
case momento.TopicItem:
fmt.Printf(
"received message with sequence number %d and publisher id %s: %v \n",
e.GetTopicSequenceNumber(),
e.GetPublisherId(),
e.GetValue(),
)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(5_000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
// Note: use `WithCancellation` to filter only the `TopicMessage` types
var cancellableSubscription = subscription.WithCancellationForAllEvents(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var topicEvent in cancellableSubscription)
{
switch (topicEvent)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicSystemEvent.Heartbeat:
Console.WriteLine("Received heartbeat from topic.");
break;
case TopicSystemEvent.Discontinuity discontinuity:
Console.WriteLine($"Received discontinuity from topic: {discontinuity}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
// Make a subscription
let mut subscription = topic_client
.subscribe(cache_name, topic_name)
.await
.expect("subscribe rpc failed");
// Consume the subscription
while let Some(item) = subscription.next().await {
println!("Received subscription item: {item:?}")
}
let subscribeResponse = await topicClient.subscribe(cacheName: cacheName, topicName: "topic")
#if swift(>=5.9)
let subscription = switch subscribeResponse {
case .error(let err): fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub): sub
}
#else
let subscription: TopicSubscription
switch subscribeResponse {
case .error(let err):
fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub):
subscription = sub
}
#endif
// unsubscribe in 5 seconds
Task {
try await Task.sleep(nanoseconds: 5_000_000_000)
subscription.unsubscribe()
}
// loop over messages as they are received
for try await item in subscription.stream {
var value: String = ""
switch item {
case .itemText(let textItem):
value = textItem.value
print("Subscriber recieved text message: \(value)")
case .itemBinary(let binaryItem):
value = String(decoding: binaryItem.value, as: UTF8.self)
print("Subscriber recieved binary message: \(value)")
case .error(let err):
print("Subscriber received error: \(err)")
}
}
final subscription = await topicClient.subscribe("test-cache", "test-topic");
final messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
"Subscribe error: ${subscription.errorCode} ${subscription.message}"),
};
// cancel subscription 5 seconds from now
Timer(const Duration(seconds: 5), () {
print("Cancelling subscription!");
subscription.unsubscribe();
});
try {
await for (final msg in messageStream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
Publish
Publishes a message to a topic.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to publish the value to. |
value | String / bytes | Value to publish to the topic. |
Method response object
- Success
- Error
See response objects for specific information.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
case TopicPublishResponse.Success:
console.log("Value published to topic 'test-topic'");
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error() as error:
print(f"Error publishing a message: {error.message}")
when (val response = topicClient.publish("test-cache", "test-topic", "test-message")) {
is TopicPublishResponse.Success -> println("Message published successfully")
is TopicPublishResponse.Error -> throw RuntimeException(
"An error occurred while attempting to publish message to topic 'test-topic': ${response.errorCode}",
response
)
}
_, err := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
var publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
topic_client
.publish(cache_name, topic_name, "Hello, Momento!")
.await?;
println!("Published message");
let result = await topicClient.publish(
cacheName: cacheName,
topicName: "topic",
value: "value"
)
switch result {
case .success(_):
print("Successfully published message!")
case .error(let err):
print("Unable to publish message: \(err)")
exit(1)
}
final result = await topicClient.publish("cache", "topic", "hello message!");
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
Example apps using Momento Topics APIs
A growing list of example apps using the Momento Topics.
- A serverless item publishing microservice This microservice is written in TypeScript and runs on AWS using API Gateway, a Lambda function, and Momento Topics. It can be used by any of your other services (with the correct security on API Gateway) to publish messages to various topics that are then subscribed to by other applications. You pass into this API a
topicName
andtopicValue
and this service publishes the value to that topic.