Using the Momento Topics API

Momento Topics is a publish/subscribe messaging service that enables real-time communication between parts of a distributed application. You publish (produce) values to a topic and subscribe to (consume from) a topic. This page details the Momento API methods for interacting with Momento Topics.

TopicClient

Momento Topics API calls are made using a TopicClient object.

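import {CredentialProvider, TopicClient, TopicConfigurations} from '@gomomento/sdk';

// The examples below refer to this client as topicClient.
const topicClient = new TopicClient({
  configuration: TopicConfigurations.Default.latest(),
  credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});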

Topics methods

Subscribe

Subscribes to a topic over a stateful connection to receive new values as they are published.

Name        Type      Description
cacheName   String    Name of the cache where the topic exists.
topicName   String    Name of the topic to subscribe to.

Method response object
  • Success - Returns a subscription object.
  • Error

See response objects for specific information.

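Once subscribed, your code receives an event each time a new value is published to the topic. In some SDKs you iterate over the returned subscription object in a for loop; in the TypeScript example below, each value is delivered to the onItem callback instead.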

const result = await topicClient.subscribe(cacheName, 'test-topic', {
  onError: () => {
    return;
  },
  onItem: (item: TopicItem) => {
    console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
    return;
  },
});
if (result instanceof TopicSubscribe.Subscription) {
  console.log("Successfully subscribed to topic 'test-topic'");

  console.log("Publishing a value to the topic 'test-topic'");
  // Publish a value
  await topicClient.publish(cacheName, 'test-topic', 'test-value');

  console.log('Waiting for the published value to be received.');
  await new Promise(resolve => setTimeout(resolve, 1000));

  // Need to close the stream before the example ends or else the example will hang.
  result.unsubscribe();
} else if (result instanceof TopicSubscribe.Error) {
  throw new Error(
    `An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
  );
}
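
The example above pauses for a fixed second so the published value has time to arrive. As an alternative, here is a minimal sketch of a helper that settles as soon as the first item is delivered, or fails after a timeout. The names waitForFirstItem and FirstItemWaiter are hypothetical and not part of the Momento SDK, and the callback shapes are simplified assumptions rather than the SDK's exact signatures.

```typescript
// Hypothetical helper, not part of the Momento SDK: bundles two callbacks with
// a promise that settles on the first delivered item, an error, or a timeout.
interface FirstItemWaiter<T> {
  onItem: (item: T) => void;
  onError: (error: unknown) => void;
  promise: Promise<T>;
}

function waitForFirstItem<T>(timeoutMs: number): FirstItemWaiter<T> {
  let resolveItem!: (item: T) => void;
  let rejectItem!: (error: Error) => void;
  const promise = new Promise<T>((resolve, reject) => {
    resolveItem = resolve;
    rejectItem = reject;
  });

  // Reject if nothing arrives in time; the timer starts when the waiter is created.
  const timer = setTimeout(
    () => rejectItem(new Error(`No item received within ${timeoutMs} ms`)),
    timeoutMs
  );

  return {
    onItem: (item: T) => {
      clearTimeout(timer);
      resolveItem(item);
    },
    onError: (error: unknown) => {
      clearTimeout(timer);
      rejectItem(new Error(String(error)));
    },
    promise,
  };
}
```

Under these assumptions, the waiter's onItem and onError would be passed in the callbacks object given to subscribe, and awaiting waiter.promise would take the place of the fixed delay before calling unsubscribe.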

Publish

Publishes a message to a topic.

Name        Type             Description
cacheName   String           Name of the cache where the topic exists.
topicName   String           Name of the topic to publish the value to.
value       String / bytes   Value to publish to the topic.

Method response object
  • Success
  • Error

See response objects for specific information.

const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
if (result instanceof TopicPublish.Success) {
  console.log("Value published to topic 'test-topic'");
} else if (result instanceof TopicPublish.Error) {
  throw new Error(
    `An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
  );
}
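
The value parameter accepts bytes as well as strings. The following is a minimal sketch of one way to turn a structured message into bytes before publishing and to recover it on the subscriber side. The ChatMessage shape and the encodeMessage and decodeMessage helpers are hypothetical names chosen for illustration, and JSON over UTF-8 is an assumed encoding, not something Momento requires.

```typescript
// Hypothetical message shape used only for this illustration.
interface ChatMessage {
  user: string;
  text: string;
}

// Serialize a message to UTF-8 bytes, suitable for a bytes-typed topic value.
function encodeMessage(message: ChatMessage): Uint8Array {
  return new TextEncoder().encode(JSON.stringify(message));
}

// Reverse of encodeMessage: decode UTF-8 bytes, parse the JSON, and check the fields.
function decodeMessage(bytes: Uint8Array): ChatMessage {
  const parsed = JSON.parse(new TextDecoder().decode(bytes)) as Partial<ChatMessage> | null;
  if (parsed === null || typeof parsed.user !== 'string' || typeof parsed.text !== 'string') {
    throw new Error('Payload is not a valid ChatMessage');
  }
  return { user: parsed.user, text: parsed.text };
}
```

The bytes returned by encodeMessage could be passed as the value argument to topicClient.publish, and a subscriber would apply decodeMessage to the bytes it receives.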

Example apps using Momento Topics APIs

A growing list of example apps that use Momento Topics.

  • A serverless item publishing microservice: Written in TypeScript, this microservice runs on AWS using API Gateway, a Lambda function, and Momento Topics. Any of your other services can use it (with the correct security on API Gateway) to publish messages to topics that other applications subscribe to. You pass a topicName and topicValue to the API, and the service publishes the value to that topic.
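
The request handling described for this microservice can be sketched as follows. This is not the example app's actual code: the Publish type, the HandlerResponse shape, the handlePublishRequest function, and the chosen status codes are all assumptions made for illustration. Only the topicName and topicValue field names come from the description above.

```typescript
// Minimal publisher signature assumed for this sketch; with the Momento SDK it
// would wrap a call such as topicClient.publish(cacheName, topicName, value).
type Publish = (topicName: string, value: string) => Promise<void>;

// Hypothetical response shape, loosely modelled on an HTTP API response.
interface HandlerResponse {
  statusCode: number;
  body: string;
}

// Validate the JSON request body and forward the value to the named topic.
async function handlePublishRequest(rawBody: string, publish: Publish): Promise<HandlerResponse> {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawBody);
  } catch {
    return { statusCode: 400, body: 'Request body must be valid JSON' };
  }

  const { topicName, topicValue } = (parsed ?? {}) as Record<string, unknown>;
  if (typeof topicName !== 'string' || topicName.length === 0 || typeof topicValue !== 'string') {
    return { statusCode: 400, body: 'topicName and topicValue must be strings' };
  }

  try {
    await publish(topicName, topicValue);
    return { statusCode: 200, body: `Published to topic '${topicName}'` };
  } catch (error) {
    return { statusCode: 500, body: `Publish failed: ${String(error)}` };
  }
}
```

Passing the publisher in as a function keeps the validation logic independent of any particular client, so it can be exercised with an in-memory stub before being wired to a real TopicClient.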