Use Momento Topics to process data asynchronously
With Momento Topics, you can subscribe to messages on a topic, as well as publish messages to a different topic. Webhooks allow you to connect these topics to stateless consumers, which can then process these events asynchronously. Whether this is aggregating events by topic_id
, saving each event to a database, or using the payload to trigger a Step function workflow, webhooks give you the flexibility to process these events how you want to.
The key to asynchronously processing these events is to use multiple topics. An inbound topic, which the webhook listens to, and a single, or multiple, outbound topics which publish the processed data.
In this diagram, Topic 1
is the inbound
topic, and Topic 2
is the outbound
topic.
Pre-requisites
- A public-facing endpoint to receive webhook events. This endpoint must accept POST requests and be able to receive inbound calls from Momento. More detail about the structure of this event is described here.
Getting Started
- Create a cache in the Momento console
- Create a webhook for the cache. Assign the webhook destination to the public facing endpoint.
- Add code to the webhook to process the incoming messages. For example, if your webhook is implemented as an AWS Lambda function behind an API Gateway, then here is some very simple code that processes the incoming message, converts it to upper case, and then publishes it to the outbound topic:
import {
APIGatewayProxyEvent,
APIGatewayProxyResult
} from "aws-lambda/trigger/api-gateway-proxy";
import {TopicClient, CredentialProvider} from '@gomomento/sdk';
interface MomentoWebhookEvent {
cache: string;
topic: string;
event_timestamp: number;
publish_timestamp: number;
topic_sequence_number: number;
token_id: string;
text: string;
}
const momento = new TopicClient({
credentialProvider: CredentialProvider.fromString('<the api key>'),
})
export const lambdaHandler = async (
event: APIGatewayProxyEvent
): Promise<APIGatewayProxyResult> => {
const webhookEvent: WebhookEvent = JSON.parse(event.body);
// simply take the current message, uppercase it, and publish to a new topic
await momento.publish(webhookEvent.cache, "topic 2", webhookEvent.text.toUpperCase());
return {
statusCode: 200,
body: JSON.stringify({status : "success"})
}
}
- On the client side, add a subscriber to this new
topic 2
const result = await topicClient.subscribe(cacheName, 'topic 2', {
onError: () => {
console.error('Received an error from the topic');
return;
},
onItem: (item: TopicItem) => {
console.log(`Received uppercase text on topic 'topic-b': ${item.value().toString()}`);
return;
},
});
- Begin publishing messages to the topic that the webhook is listening to. You can use the Momento SDK to publish to the topic.
import {TopicPublish} from '@gomomento/sdk';
const publishResponse = await topicClient.publish(cacheName, 'topic 1', 'a value');
if (publishResponse instanceof TopicPublish.Success) {
console.log('Value published successfully!');
} else {
console.log(`Error publishing value: ${publishResponse.toString()}`);
}
- The subscriber to
topic 2
should now be receiving the uppercase messages, and logging the messages to the console!
And that is it! This is an extremely basic example of string conversion, but this pattern holds true for more advanced asynchronous processing, such as saving state to a db or publishing messages to a queue. This topics pattern allows for clients to be completely agnostic of the downstream consumer, if the load for the application outgrows an environment such as API Gateway + Lambda function, you can easily replace it with containers or EC2 instances. For a more complete example of using webhooks for event processing, checkout our EventBridge example, and our blog post about using this pattern to create a multi-language chat application.