Using the Momento Topics API
Momento Topics is a messaging pattern enabling real-time communication between parts of a distributed application. It enables you to publish (produce) values to a topic and subscribe (consume) from a topic. This page details the Momento API methods for interacting with Momento Topics.
TopicClient
Momento Topics API calls are made using a TopicClient object.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Swift
- Dart
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable({
environmentVariableName: 'MOMENTO_API_KEY',
}),
});
TopicClientAsync(
TopicConfigurations.Default.latest(), CredentialProvider.from_environment_variable("MOMENTO_API_KEY")
)
TopicClient(
CredentialProvider.fromEnvVar("MOMENTO_API_KEY"), TopicConfigurations.Laptop.latest
).use { topicClient ->
//...
}
credProvider, err := auth.NewEnvMomentoTokenProvider("MOMENTO_API_KEY")
if err != nil {
panic(err)
}
topicClient, err := momento.NewTopicClient(
config.TopicsDefault(),
credProvider,
)
if err != nil {
panic(err)
}
new TopicClient(
TopicConfigurations.Laptop.latest(),
new EnvMomentoTokenProvider("MOMENTO_API_KEY")
);
do {
let credentialProvider = try CredentialProvider.fromEnvironmentVariable(envVariableName: "MOMENTO_API_KEY")
let topicClient = TopicClient(
configuration: TopicClientConfigurations.iOS.latest(),
credentialProvider: credentialProvider
)
} catch {
print("Unable to create topic client: \(error)")
exit(1)
}
try {
final topicClient = TopicClient(
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
TopicClientConfigurations.latest());
} catch (e) {
print("Unable to create topic client: $e");
exit(1);
}
Topics methods
Subscribe
This method subscribes to a topic to receive new values with a stateful connection.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to subscribe to. |
Method response object
- Success - Returns a subscription object.
- Error
See response objects for specific information.
Once you iterate over the returned subscription object (for example, in a for loop), your code receives an event each time a new value is published to the topic.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Swift
- Dart
const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
if (result instanceof TopicSubscribe.Subscription) {
console.log("Successfully subscribed to topic 'test-topic'");
console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');
console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
} else if (result instanceof TopicSubscribe.Error) {
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.subscribe("cache", "my_topic")
match response:
case TopicSubscribe.Error() as error:
print(f"Error subscribing to topic: {error.message}")
case TopicSubscribe.SubscriptionAsync() as subscription:
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value!r}")
return
case TopicSubscriptionItem.Error():
print(f"Error with received message: {item.inner_exception.message}")
return
when (val response = topicClient.subscribe("test-cache", "test-topic")) {
is TopicSubscribeResponse.Subscription -> coroutineScope {
launch {
withTimeoutOrNull(2000) {
response.collect { item ->
when (item) {
is TopicMessage.Text -> println("Received text message: ${item.value}")
is TopicMessage.Binary -> println("Received binary message: ${item.value}")
is TopicMessage.Error -> throw RuntimeException(
"An error occurred reading messages from topic 'test-topic': ${item.errorCode}", item
)
}
}
}
}
}
is TopicSubscribeResponse.Error -> throw RuntimeException(
"An error occurred while attempting to subscribe to topic 'test-topic': ${response.errorCode}", response
)
}
// Instantiate subscriber
sub, subErr := client.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: "test-cache",
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(2000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
var cancellableSubscription = subscription.WithCancellation(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var message in cancellableSubscription)
{
switch (message)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
let subscribeResponse = await topicClient.subscribe(cacheName: cacheName, topicName: "topic")
#if swift(>=5.9)
let subscription = switch subscribeResponse {
case .error(let err): fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub): sub
}
#else
let subscription: TopicSubscription
switch subscribeResponse {
case .error(let err):
fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub):
subscription = sub
}
#endif
// unsubscribe in 5 seconds
Task {
try await Task.sleep(nanoseconds: 5_000_000_000)
subscription.unsubscribe()
}
// loop over messages as they are received
for try await item in subscription.stream {
var value: String = ""
switch item {
case .itemText(let textItem):
value = textItem.value
print("Subscriber recieved text message: \(value)")
case .itemBinary(let binaryItem):
value = String(decoding: binaryItem.value, as: UTF8.self)
print("Subscriber recieved binary message: \(value)")
case .error(let err):
print("Subscriber received error: \(err)")
}
}
final subscription = await topicClient.subscribe("test-cache", "test-topic");
final messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
"Subscribe error: ${subscription.errorCode} ${subscription.message}"),
};
// cancel subscription 5 seconds from now
Timer(const Duration(seconds: 5), () {
print("Cancelling subscription!");
subscription.unsubscribe();
});
try {
await for (final msg in messageStream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
Publish
Publishes a message to a topic.
Name | Type | Description |
---|---|---|
cacheName | String | Name of the cache where the topic exists. |
topicName | String | Name of the topic to publish the value to. |
value | String / bytes | Value to publish to the topic. |
Method response object
- Success
- Error
See response objects for specific information.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Swift
- Dart
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
if (result instanceof TopicPublish.Success) {
console.log("Value published to topic 'test-topic'");
} else if (result instanceof TopicPublish.Error) {
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error() as error:
print(f"Error publishing a message: {error.message}")
when (val response = topicClient.publish("test-cache", "test-topic", "test-message")) {
is TopicPublishResponse.Success -> println("Message published successfully")
is TopicPublishResponse.Error -> throw RuntimeException(
"An error occurred while attempting to publish message to topic 'test-topic': ${response.errorCode}",
response
)
}
_, err := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
var publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
let result = await topicClient.publish(
cacheName: cacheName,
topicName: "topic",
value: "value"
)
switch result {
case .success(_):
print("Successfully published message!")
case .error(let err):
print("Unable to publish message: \(err)")
exit(1)
}
final result = await topicClient.publish("cache", "topic", "hello message!");
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
Example apps using Momento Topics APIs
A growing list of example apps using Momento Topics.
- A serverless item publishing microservice: This microservice is written in TypeScript and runs on AWS using API Gateway, a Lambda function, and Momento Topics. It can be used by any of your other services (with the correct security on API Gateway) to publish messages to various topics that are then subscribed to by other applications. You pass a topicName and a topicValue into this API, and the service publishes the value to that topic.