Use Momento Topics to process data asynchronously
With Momento Topics, you can subscribe to messages on a topic, as well as publish messages to a different topic. Webhooks allow you to connect these topics to stateless consumers, which can then process these events asynchronously. Whether this is aggregating events by topic_id
, saving each event to a database, or using the payload to trigger a Step function workflow, webhooks give you the flexibility to process these events how you want to.
The key to asynchronously processing these events is to use multiple topics. An inbound topic, which the webhook listens to, and a single, or multiple, outbound topics which publish the processed data.
In this diagram, Topic 1
is the inbound
topic, and Topic 2
is the outbound
topic.
Pre-requisites
- A public-facing endpoint to receive webhook events. This endpoint must accept POST requests and be able to receive inbound calls from Momento. More detail about the structure of this event is described here.
Getting Started
- Create a cache in the Momento console
- Create a webhook for the cache. Assign the webhook destination to the public facing endpoint.
- Add code to the webhook to process the incoming messages. For example, if your webhook is implemented as an AWS Lambda function behind an API Gateway, then here is some very simple code that processes the incoming message, converts it to upper case, and then publishes it to the outbound topic:
import {APIGatewayProxyEvent, APIGatewayProxyResult} from 'aws-lambda';
import {TopicClient, CredentialProvider} from '@gomomento/sdk';
interface MomentoWebhookEvent {
cache: string;
topic: string;
event_timestamp: number;
publish_timestamp: number;
topic_sequence_number: number;
token_id: string;
text: string;
}
const momento = new TopicClient({
credentialProvider: CredentialProvider.fromString('<the api key>'),
});
export const lambdaHandler = async (event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> => {
const webhookEvent: MomentoWebhookEvent = JSON.parse(event.body!) as MomentoWebhookEvent;
// simply take the current message, uppercase it, and publish to a new topic
await momento.publish(webhookEvent.cache, 'topic 2', webhookEvent.text.toUpperCase());
return {
statusCode: 200,
body: JSON.stringify({status: 'success'}),
};
};
- On the client side, add a subscriber to this new
topic 2
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
switch (result.type) {
case TopicSubscribeResponse.Subscription:
console.log("Successfully subscribed to topic 'test-topic'");
console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');
console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
break;
case TopicSubscribeResponse.Error:
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.subscribe("cache", "my_topic")
match response:
case TopicSubscribe.Error() as error:
print(f"Error subscribing to topic: {error.message}")
case TopicSubscribe.SubscriptionAsync() as subscription:
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value!r}")
return
case TopicSubscriptionItem.Error():
print(f"Error with received message: {item.inner_exception.message}")
return
when (val response = topicClient.subscribe("test-cache", "test-topic")) {
is TopicSubscribeResponse.Subscription -> coroutineScope {
launch {
withTimeoutOrNull(2000) {
response.collect { item ->
when (item) {
is TopicMessage.Text -> println("Received text message: ${item.value}")
is TopicMessage.Binary -> println("Received binary message: ${item.value}")
is TopicMessage.Error -> throw RuntimeException(
"An error occurred reading messages from topic 'test-topic': ${item.errorCode}", item
)
}
}
}
}
}
is TopicSubscribeResponse.Error -> throw RuntimeException(
"An error occurred while attempting to subscribe to topic 'test-topic': ${response.errorCode}", response
)
}
// Instantiate subscriber
sub, subErr := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: cacheName,
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
// Receive only subscription items with messages
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
// Receive all subscription events (messages, discontinuities, heartbeats)
event, err := sub.Event(ctx)
if err != nil {
panic(err)
}
switch e := event.(type) {
case momento.TopicHeartbeat:
fmt.Printf("received heartbeat\n")
case momento.TopicDiscontinuity:
fmt.Printf("received discontinuity\n")
case momento.TopicItem:
fmt.Printf(
"received message with sequence number %d and publisher id %s: %v \n",
e.GetTopicSequenceNumber(),
e.GetPublisherId(),
e.GetValue(),
)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(5_000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
// Note: use `WithCancellation` to filter only the `TopicMessage` types
var cancellableSubscription = subscription.WithCancellationForAllEvents(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var topicEvent in cancellableSubscription)
{
switch (topicEvent)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicSystemEvent.Heartbeat:
Console.WriteLine("Received heartbeat from topic.");
break;
case TopicSystemEvent.Discontinuity discontinuity:
Console.WriteLine($"Received discontinuity from topic: {discontinuity}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
// Make a subscription
let mut subscription = topic_client
.subscribe(cache_name, topic_name)
.await
.expect("subscribe rpc failed");
// Consume the subscription
while let Some(item) = subscription.next().await {
println!("Received subscription item: {item:?}")
}
let subscribeResponse = await topicClient.subscribe(cacheName: cacheName, topicName: "topic")
#if swift(>=5.9)
let subscription = switch subscribeResponse {
case .error(let err): fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub): sub
}
#else
let subscription: TopicSubscription
switch subscribeResponse {
case .error(let err):
fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub):
subscription = sub
}
#endif
// unsubscribe in 5 seconds
Task {
try await Task.sleep(nanoseconds: 5_000_000_000)
subscription.unsubscribe()
}
// loop over messages as they are received
for try await item in subscription.stream {
var value: String = ""
switch item {
case .itemText(let textItem):
value = textItem.value
print("Subscriber recieved text message: \(value)")
case .itemBinary(let binaryItem):
value = String(decoding: binaryItem.value, as: UTF8.self)
print("Subscriber recieved binary message: \(value)")
case .error(let err):
print("Subscriber received error: \(err)")
}
}
final subscription = await topicClient.subscribe("test-cache", "test-topic");
final messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
"Subscribe error: ${subscription.errorCode} ${subscription.message}"),
};
// cancel subscription 5 seconds from now
Timer(const Duration(seconds: 5), () {
print("Cancelling subscription!");
subscription.unsubscribe();
});
try {
await for (final msg in messageStream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
- Begin publishing messages to the topic that the webhook is listening to. You can use the Momento SDK to publish to the topic.
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
case TopicPublishResponse.Success:
console.log("Value published to topic 'test-topic'");
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error() as error:
print(f"Error publishing a message: {error.message}")
when (val response = topicClient.publish("test-cache", "test-topic", "test-message")) {
is TopicPublishResponse.Success -> println("Message published successfully")
is TopicPublishResponse.Error -> throw RuntimeException(
"An error occurred while attempting to publish message to topic 'test-topic': ${response.errorCode}",
response
)
}
_, err := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
var publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
topic_client
.publish(cache_name, topic_name, "Hello, Momento!")
.await?;
println!("Published message");
let result = await topicClient.publish(
cacheName: cacheName,
topicName: "topic",
value: "value"
)
switch result {
case .success(_):
print("Successfully published message!")
case .error(let err):
print("Unable to publish message: \(err)")
exit(1)
}
final result = await topicClient.publish("cache", "topic", "hello message!");
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
- The subscriber to
topic 2
should now be receiving the uppercase messages, and logging the messages to the console!
And that is it! This is an extremely basic example of string conversion, but this pattern holds true for more advanced asynchronous processing, such as saving state to a db or publishing messages to a queue. This topics pattern allows for clients to be completely agnostic of the downstream consumer, if the load for the application outgrows an environment such as API Gateway + Lambda function, you can easily replace it with containers or EC2 instances. For a more complete example of using webhooks for event processing, checkout our EventBridge example, and our blog post about using this pattern to create a multi-language chat application.