Using the Momento Topics API

Momento Topics is a publish/subscribe messaging service that enables real-time communication between parts of a distributed application. You publish (produce) values to a topic and subscribe to (consume from) a topic. This page details the Momento API methods for interacting with Momento Topics.

TopicClient

Momento Topics API calls are made using a TopicClient object.

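import {CredentialProvider, TopicClient, TopicConfigurations} from '@gomomento/sdk';

// Create the client that the method examples on this page refer to as `topicClient`.
const topicClient = new TopicClient({
  configuration: TopicConfigurations.Default.latest(),
  credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});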

Topics methods

Subscribe

Subscribes to a topic and receives subscription events over a stateful connection.

Depending on the language, you may use a callback function or an iterator to receive new subscription events, such as items, heartbeats, and discontinuities:

  • Items include a string or byte message, a topic sequence number, and a unique token identifier if one is present.
  • Heartbeats indicate that the connection is still active.
  • Discontinuities indicate that there was an interruption in the subscription and some messages may have been skipped.
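
The three event kinds above can be sketched as a small dispatcher. This is a minimal illustration that does not use the Momento SDK: the `SubscriptionEvent` union, the `SubscriptionState` record, and the `handleEvent` function are hypothetical names invented here, and the real SDK delivers these events through its own callbacks or iterators.

```typescript
// Hypothetical event shapes for illustration only; these are NOT the Momento SDK's types.
type SubscriptionEvent =
  | {kind: 'item'; value: string | Uint8Array; sequenceNumber: number; tokenId?: string}
  | {kind: 'heartbeat'}
  | {kind: 'discontinuity'};

// Hypothetical bookkeeping a subscriber might keep.
interface SubscriptionState {
  received: string[];
  lastSequenceNumber: number | undefined;
  heartbeats: number;
  discontinuities: number;
}

function newState(): SubscriptionState {
  return {received: [], lastSequenceNumber: undefined, heartbeats: 0, discontinuities: 0};
}

function handleEvent(state: SubscriptionState, event: SubscriptionEvent): void {
  switch (event.kind) {
    case 'item':
      // An item carries a string or byte message plus its topic sequence number.
      state.received.push(
        typeof event.value === 'string' ? event.value : `<${event.value.length} bytes>`
      );
      state.lastSequenceNumber = event.sequenceNumber;
      break;
    case 'heartbeat':
      // A heartbeat only signals that the connection is still active.
      state.heartbeats += 1;
      break;
    case 'discontinuity':
      // Some messages may have been skipped; record it so the caller can resynchronize.
      state.discontinuities += 1;
      break;
  }
}
```

Counting discontinuities instead of ignoring them gives the application a signal that it may need to reload state from another source, since skipped messages are not redelivered by this sketch.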
Name       Type    Description
cacheName  String  Name of the cache where the topic exists.
topicName  String  Name of the topic to subscribe to.
Method response object
  • Subscription
  • Error

See response objects for specific information.

const result = await topicClient.subscribe(cacheName, 'test-topic', {
  onError: () => {
    return;
  },
  onItem: (item: TopicItem) => {
    console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
    return;
  },
});
switch (result.type) {
  case TopicSubscribeResponse.Subscription:
    console.log("Successfully subscribed to topic 'test-topic'");

    console.log("Publishing a value to the topic 'test-topic'");
    // Publish a value
    await topicClient.publish(cacheName, 'test-topic', 'test-value');

    console.log('Waiting for the published value to be received.');
    await new Promise(resolve => setTimeout(resolve, 1000));

    // Need to close the stream before the example ends or else the example will hang.
    result.unsubscribe();
    break;
  case TopicSubscribeResponse.Error:
    throw new Error(
      `An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
    );
}

Publish

Publishes a message to a topic.

Name       Type            Description
cacheName  String          Name of the cache where the topic exists.
topicName  String          Name of the topic to publish the value to.
value      String / bytes  Value to publish to the topic.
Method response object
  • Success
  • Error

See response objects for specific information.

const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
  case TopicPublishResponse.Success:
    console.log("Value published to topic 'test-topic'");
    break;
  case TopicPublishResponse.Error:
    throw new Error(
      `An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
    );
}
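
Because value accepts only a string or bytes, structured data has to be serialized before it is published and parsed again on receipt. The following is a minimal sketch that assumes JSON as the wire format; the `OrderEvent` shape and the `encodeMessage` and `decodeMessage` helpers are hypothetical and are not part of the Momento SDK.

```typescript
// Hypothetical message shape used only for this illustration.
interface OrderEvent {
  orderId: string;
  quantity: number;
}

// Serialize a structured message into a string suitable for the `value` parameter.
function encodeMessage(event: OrderEvent): string {
  return JSON.stringify(event);
}

// Parse a received string value; returns undefined when it is not a valid OrderEvent.
function decodeMessage(value: string): OrderEvent | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(value);
  } catch {
    return undefined; // not JSON at all
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return undefined;
  }
  const {orderId, quantity} = parsed as {orderId?: unknown; quantity?: unknown};
  if (typeof orderId !== 'string' || typeof quantity !== 'number') {
    return undefined; // JSON, but not the expected shape
  }
  return {orderId, quantity};
}
```

In this arrangement the encoded string would be passed as the value argument to publish, and a subscriber would call decodeMessage on each received item, skipping values that fail validation.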

Example apps using Momento Topics APIs

A growing list of example apps that use Momento Topics.

  • A serverless item publishing microservice: Written in TypeScript, this microservice runs on AWS using API Gateway, a Lambda function, and Momento Topics. Any of your other services can call it (with the correct security on API Gateway) to publish messages to topics that other applications subscribe to. You pass a topicName and topicValue to the API, and the service publishes the value to that topic.
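
The request flow of such a publishing service can be sketched as follows. This is not the example app's code: the `PublishFn` type, the `parsePublishRequest` and `handlePublishRequest` functions, and the status codes are assumptions made for illustration. The publish step is injected as a function so the sketch stays independent of AWS and of the Momento SDK.

```typescript
// Hypothetical types for illustration; the example app's actual code is not shown on this page.
type PublishFn = (topicName: string, topicValue: string) => Promise<void>;

interface PublishRequest {
  topicName: string;
  topicValue: string;
}

interface HttpResult {
  statusCode: number;
  body: string;
}

// Validate the raw request body and extract topicName and topicValue.
function parsePublishRequest(rawBody: string | null): PublishRequest | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawBody ?? '');
  } catch {
    return undefined; // missing or malformed JSON
  }
  const {topicName, topicValue} = (parsed ?? {}) as {topicName?: unknown; topicValue?: unknown};
  if (typeof topicName !== 'string' || topicName.length === 0 || typeof topicValue !== 'string') {
    return undefined;
  }
  return {topicName, topicValue};
}

// Handle one request: validate, publish through the injected function, map the outcome to a status.
async function handlePublishRequest(rawBody: string | null, publish: PublishFn): Promise<HttpResult> {
  const request = parsePublishRequest(rawBody);
  if (request === undefined) {
    return {statusCode: 400, body: 'topicName and topicValue are required strings'};
  }
  try {
    await publish(request.topicName, request.topicValue);
    return {statusCode: 200, body: `Published to '${request.topicName}'`};
  } catch {
    return {statusCode: 500, body: 'Publish failed'};
  }
}
```

In a real deployment, the injected function would presumably wrap a call to topicClient.publish and throw when the response is an error, and the handler would be wired to the API Gateway event format.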