
Use Momento Topics to process data asynchronously

With Momento Topics, you can subscribe to messages on one topic and publish messages to another. Webhooks connect these topics to stateless consumers, which process the events asynchronously. Whether you are aggregating events by topic_id, saving each event to a database, or using the payload to trigger a Step Functions workflow, webhooks give you the flexibility to process events however you want.

The key to processing these events asynchronously is to use multiple topics: an inbound topic, which the webhook listens to, and one or more outbound topics, to which the processed data is published.

Architecture

In this diagram, Topic 1 is the inbound topic, and Topic 2 is the outbound topic.
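
To make the flow concrete, here is a minimal in-memory sketch of the two-topic pattern. `InMemoryTopics` and the topic names `topic-1` and `topic-2` are hypothetical stand-ins invented for this illustration; they are not part of the Momento SDK, and real delivery to a webhook happens over HTTP rather than through a direct function call.

```typescript
// Hypothetical in-memory stand-in for a topics service; NOT the Momento SDK.
type Listener = (value: string) => void;

class InMemoryTopics {
  private listeners: Record<string, Listener[]> = {};

  subscribe(topic: string, listener: Listener): void {
    if (!this.listeners[topic]) {
      this.listeners[topic] = [];
    }
    this.listeners[topic].push(listener);
  }

  publish(topic: string, value: string): void {
    for (const listener of this.listeners[topic] || []) {
      listener(value);
    }
  }
}

const topics = new InMemoryTopics();
const received: string[] = [];

// The "webhook" consumer: listens on the inbound topic (Topic 1), processes
// each message, and publishes the result to the outbound topic (Topic 2).
topics.subscribe('topic-1', value => topics.publish('topic-2', value.toUpperCase()));

// The client subscribes only to the outbound topic.
topics.subscribe('topic-2', value => received.push(value));

// Publishing to the inbound topic triggers the whole chain.
topics.publish('topic-1', 'hello');
console.log(received);
```

The publisher and the final subscriber never reference the consumer directly, which is what makes the consumer replaceable.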

Pre-requisites

  1. A public-facing endpoint to receive webhook events. This endpoint must accept POST requests and be reachable by inbound calls from Momento. The structure of this event is described in more detail here.

Getting Started

  1. Create a cache in the Momento console.
  2. Create a webhook for the cache. Set the webhook destination to your public-facing endpoint.
  3. Add code to the webhook to process the incoming messages. For example, if your webhook is implemented as an AWS Lambda function behind an API Gateway, the following simple code processes the incoming message, converts it to uppercase, and publishes it to the outbound topic:
<SdkExampleCodeBlock language={'javascript'} file={'examples/nodejs/webhooks/doc-example-files/webhook-lambda-handler.ts'} />
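
The referenced handler file is not reproduced on this page. As a rough sketch of the processing step only, the snippet below parses a webhook request body and builds the uppercased message to publish. The payload field names (`cache`, `topic`, `text`), the `OUTBOUND_TOPIC` name, and the `toOutboundMessage` helper are assumptions made for illustration; check the event structure documentation for the actual schema.

```typescript
// Assumed shape of the webhook POST body. These field names are assumptions
// for this sketch, not a confirmed schema.
interface InboundEvent {
  cache: string;
  topic: string;
  text: string;
}

interface OutboundMessage {
  cacheName: string;
  topicName: string;
  value: string;
}

// Hypothetical name for the outbound topic (Topic 2 in the architecture).
const OUTBOUND_TOPIC = 'topic-2';

// Parses the raw request body, uppercases the text payload, and returns the
// message to publish. Throws if the body is not valid JSON or lacks the
// assumed fields.
function toOutboundMessage(rawBody: string): OutboundMessage {
  const parsed = JSON.parse(rawBody) as Partial<InboundEvent>;
  if (typeof parsed.cache !== 'string' || typeof parsed.text !== 'string') {
    throw new Error('Unexpected webhook payload');
  }
  return {
    cacheName: parsed.cache,
    topicName: OUTBOUND_TOPIC,
    value: parsed.text.toUpperCase(),
  };
}

// Example request body, as an API Gateway proxy event would carry it in `body`.
const exampleBody = JSON.stringify({cache: 'my-cache', topic: 'topic-1', text: 'hello'});
const outbound = toOutboundMessage(exampleBody);
console.log(outbound);

// In a real handler, the result would then be published with the SDK, e.g.
// await topicClient.publish(outbound.cacheName, outbound.topicName, outbound.value);
```

Keeping the transformation in a pure function separate from the publish call makes the processing logic easy to test without a live cache.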
  4. On the client side, add a subscriber to the outbound topic (Topic 2):
const result = await topicClient.subscribe(cacheName, 'test-topic', {
  onError: () => {
    return;
  },
  onItem: (item: TopicItem) => {
    console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
    return;
  },
});
switch (result.type) {
  case TopicSubscribeResponse.Subscription:
    console.log("Successfully subscribed to topic 'test-topic'");

    console.log("Publishing a value to the topic 'test-topic'");
    // Publish a value
    await topicClient.publish(cacheName, 'test-topic', 'test-value');

    console.log('Waiting for the published value to be received.');
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Need to close the stream before the example ends or else the example will hang.
    result.unsubscribe();
    break;
  case TopicSubscribeResponse.Error:
    throw new Error(
      `An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
    );
}
Full example code and imports can be found here
  5. Begin publishing messages to the inbound topic (Topic 1) that the webhook listens to. You can use the Momento SDK to publish to the topic:
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
  case TopicPublishResponse.Success:
    console.log("Value published to topic 'test-topic'");
    break;
  case TopicPublishResponse.Error:
    throw new Error(
      `An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
    );
}
Full example code and imports can be found here
  6. The subscriber to Topic 2 should now receive the uppercased messages and log them to the console.

And that is it! This is an extremely basic example of string conversion, but the pattern holds for more advanced asynchronous processing, such as saving state to a database or publishing messages to a queue. This topics pattern also lets clients remain completely agnostic of the downstream consumer: if the load for the application outgrows an environment such as API Gateway + Lambda, you can easily replace it with containers or EC2 instances. For a more complete example of using webhooks for event processing, check out our EventBridge example and our blog post about using this pattern to create a multi-language chat application.