Use Momento Cache and Topics to build instant messaging
Instant messaging is a must-have feature in many applications. Users often expect to communicate seamlessly in one-to-one, one-to-many, and many-to-many conversations. With Momento Cache and Topics, you can build instant messaging into your application without any backend infrastructure. Using the Momento web SDK, you can access your cache data and publish and subscribe to messages directly in your browser.
This pattern uses Momento Cache to store historical messages and Momento Topics to facilitate real-time communication between instant messaging participants.
Architecture
Building instant messaging on Momento can be done entirely client-side, meaning in your user interface. Browsers, iPhone apps, and Android apps all have the capability to access resources in Momento directly.
In the diagram above, messages are loaded from a list when a new user joins the chat. Users subscribe to a chatroom-specific topic to receive messages in real time while they are active in the chat. As messages are sent, the sender adds them directly to the list.
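The join-and-send flow described here can be sketched with a small in-memory model. This is only an illustration of the ordering of steps, not Momento code: `ChatRoomModel`, `join`, and `send` are hypothetical names, with a plain Python list standing in for the cache list and a list of callbacks standing in for the topic.

```python
from typing import Callable, List


class ChatRoomModel:
    """In-memory stand-in for one chat room: a history list plus live subscribers."""

    def __init__(self) -> None:
        self.history: List[str] = []  # plays the role of the cache list
        self.subscribers: List[Callable[[str], None]] = []  # plays the role of the topic

    def join(self, on_message: Callable[[str], None]) -> List[str]:
        """Load the existing history, then start receiving live messages."""
        past_messages = list(self.history)
        self.subscribers.append(on_message)
        return past_messages

    def send(self, message: str) -> None:
        """The sender publishes to the topic and appends to the list."""
        for deliver in list(self.subscribers):
            deliver(message)
        self.history.append(message)


room = ChatRoomModel()

alice_live: List[str] = []
room.join(alice_live.append)
room.send("Hello from Alice")

# Bob arrives late: he gets the backlog from the list, then live messages.
bob_live: List[str] = []
bob_history = room.join(bob_live.append)
room.send("Welcome, Bob")
```

In this model Bob sees "Hello from Alice" only through the stored history, while "Welcome, Bob" reaches him through the live subscription, which is the split between storage and delivery that the rest of this pattern relies on.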
Components
Instant messaging is separated into two parts:
- Message storage
- Delivery
Together, these components let users join your chat at any time, view the message history, and receive new messages in real time.
Message storage
Most instant messaging platforms store messages to allow users to view the conversation history if messages were sent prior to their arrival. Part of the startup tasks when entering a chat room is loading said history and displaying it on screen. This can easily be done with a Momento list cache item.
Lists
A list is a collection data type that stores an ordered array of elements. You can push elements to either the front or back of a list and you can add multiple entries at one time. A list is a great data type for instant messaging because it stores messages in the order they were sent. When users join an instant messaging session, you can quickly fetch the entire chat history with a single listFetch API call.
Elements in a list do not need to be unique. You can push the same message into a list over and over again and it will be added every time. Lists are a good choice for instant messaging for the following use cases:
- Fetching messages N at a time
- Adding multiple messages at once
- Maintaining a chat history of fixed length, e.g. the last 100 messages
When using a list, you have a number of array manipulation functions, allowing you to automatically truncate message history if it gets too long or remove specific elements for things like content moderation.
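To make the fixed-length history idea concrete, here is a minimal local model in plain Python. It does not call Momento; `CappedHistory`, `push_back`, `fetch_last`, and `remove_value` are hypothetical names that mimic the kind of list operations described above.

```python
from collections import deque
from typing import List


class CappedHistory:
    """Local stand-in for a list that keeps only the newest `max_len` messages."""

    def __init__(self, max_len: int) -> None:
        # deque(maxlen=...) discards the oldest entry when a new one overflows the cap.
        self._items: deque = deque(maxlen=max_len)

    def push_back(self, *messages: str) -> int:
        """Append one or more messages in send order; return the new length."""
        self._items.extend(messages)
        return len(self._items)

    def fetch_last(self, n: int) -> List[str]:
        """Return up to the last `n` messages, oldest first."""
        if n <= 0:
            return []
        return list(self._items)[-n:]

    def remove_value(self, message: str) -> None:
        """Drop every copy of `message`, for example after a moderation decision."""
        kept = [m for m in self._items if m != message]
        self._items = deque(kept, maxlen=self._items.maxlen)


history = CappedHistory(max_len=3)
history.push_back("hi", "hello")
history.push_back("how are you?", "fine, thanks")
latest_two = history.fetch_last(2)
```

With a cap of three, the first message ("hi") is dropped once the fourth arrives, so a late joiner never loads more than the configured window.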
Delivery
Technically, message storage is not a required component for instant messaging. In theory you could have a messaging system that only showed the messages that were sent while a user is connected. Messages could be stored in memory in the client and you'd be done. In contrast, message delivery is a mandatory part of instant messaging. Getting the message from the sender to all recipients with minimal latency is the core of this pattern. To do this, we use Momento Topics.
Momento Topics is a low-latency pub/sub service that connects clients to clients, clients to servers, servers to clients, and servers to servers. Think WebSockets but way better. When you publish a message to a topic, Momento broadcasts it to all subscribers of the topic, which supports both fan-out and one-to-one delivery, depending on your use case.
For instant messaging, the sender will publish a message to a topic. The recipients of the message, be it one or many depending on your use case, will be subscribers. Momento Topics will deliver the published message to the subscribers in real-time, providing for the instant messaging experience we're looking for!
Momento Topics provides you with flexible, dynamic topics that are not defined as resources in your infrastructure. The recommended way to build instant messaging is to use the chat room or session identifier as the topic name. This provides flexible and dedicated topics for chat use cases.
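The effect of naming topics after chat rooms can be illustrated with a toy broker. This is a sketch under the assumption that topics are created implicitly on first use; `InMemoryTopics` is a hypothetical stand-in and not the Momento client, and the room names are made up.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryTopics:
    """Toy pub/sub broker used only to illustrate fan-out; not the Momento client."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, topic_name: str, on_item: Callable[[str], None]) -> None:
        """Register a handler; the topic exists as soon as someone uses its name."""
        self._handlers[topic_name].append(on_item)

    def publish(self, topic_name: str, value: str) -> int:
        """Deliver `value` to every subscriber of `topic_name`; return the count."""
        handlers = list(self._handlers.get(topic_name, []))
        for handler in handlers:
            handler(value)
        return len(handlers)


topics = InMemoryTopics()
alice_inbox: List[str] = []
bob_inbox: List[str] = []
carol_inbox: List[str] = []

# The chat room identifier doubles as the topic name.
topics.subscribe("room-42", alice_inbox.append)
topics.subscribe("room-42", bob_inbox.append)
topics.subscribe("room-7", carol_inbox.append)

delivered = topics.publish("room-42", "Hello world!")
```

Both subscribers of `room-42` receive the message, while the subscriber of `room-7` receives nothing, so each chat room stays isolated without any topic being declared up front.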
Publishing a message
To publish a message, all you need is a single command:
- Node.js
- Python
- Go
- Java
- .NET
await topics.publish('message-namespace', sessionId, 'Hello world!');
topics.publish("message-namespace", sessionId, "Hello world!")
_, err := topics.Publish(ctx, &momento.TopicPublishRequest{
	CacheName: "message-namespace",
	TopicName: sessionId,
	Value:     momento.String("Hello world!"),
})
final TopicPublishResponse response = topics.publish("message-namespace", sessionId, "Hello world!").join();
var response = await topics.PublishAsync("message-namespace", sessionId, "Hello world!");
Subscribing to a topic
Subscribing tells Momento to deliver every message published to a specific topic to your client and to run an event handler function for each one. You can subscribe one client or thousands (or more!) to an individual topic.
- Node.js
- Python
- Go
- Java
- .NET
const subscription = await topics.subscribe('message-namespace', sessionId, {
  onItem: (data) => processMessage(data.value(), data.tokenId())
});
with TopicClient(TopicConfigurations.Default.v1(), _AUTH_PROVIDER) as client:
    subscription = client.subscribe("cache", "my_topic")
    match subscription:
        case TopicSubscribe.Error():
            raise Exception("Subscription error: ", subscription.message)
        case TopicSubscribe.Subscription():
            for item in subscription:
                match item:
                    case TopicSubscriptionItem.Text():
                        print(item.value)
                    case TopicSubscriptionItem.Binary():
                        print(f"{item.value!r}")
subscription, err := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
	CacheName: cacheName,
	TopicName: topicName,
})
if err != nil {
	panic(err)
}
go func() { pollForMessages(ctx, subscription) }()
time.Sleep(time.Second)

func pollForMessages(ctx context.Context, subscription momento.TopicSubscription) {
	for {
		item, err := subscription.Item(ctx)
		if err != nil {
			panic(err)
		}
		switch message := item.(type) {
		case momento.String:
			fmt.Printf("received message as string: '%v'\n", message)
		case momento.Bytes:
			fmt.Printf("received message as bytes: '%v'\n", message)
		}
	}
}
final TopicSubscribeResponse subscribeResponse =
    topicClient
        .subscribe(
            TopicExample.CACHE_NAME,
            TOPIC_NAME,
            new ISubscriptionCallbacks() {
              @Override
              public void onItem(TopicMessage message) {
                logger.info("Received message on topic {}: {}", TOPIC_NAME, message.toString());
              }

              @Override
              public void onError(Throwable error) {
                logger.error("Subscription to topic {} failed with error", TOPIC_NAME, error);
              }
            })
        .join();
var subscriptionTask = Task.Run(async () =>
{
    var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName);
    switch (subscribeResponse)
    {
        case TopicSubscribeResponse.Subscription subscription:
            try {
                var cancellableSubscription = subscription.WithCancellation(cts.Token);
                await foreach (var message in cancellableSubscription) {
                    switch (message)
                    {
                        case TopicMessage.Binary binary:
                            Logger.LogInformation("Received binary message from topic: {binaryData}", Convert.ToBase64String(binary.Value));
                            break;
                        case TopicMessage.Text text:
                            Logger.LogInformation("Received string message from topic: {message}", text.Value);
                            break;
                        case TopicMessage.Error error:
                            Logger.LogInformation("Received error message from topic: {error}", error.Message);
                            cts.Cancel();
                            break;
                    }
                }
            } finally {
                subscription.Dispose();
            }
            break;
        case TopicSubscribeResponse.Error error:
            Logger.LogInformation("Error subscribing to a topic: {error}", error.Message);
            cts.Cancel();
            break;
    }
});
Security
Momento provides a robust authorization mechanism that limits consumers to specific resources and actions. When building an instant messaging feature, it is incredibly important to take scope into consideration so users do not intercept messages they shouldn't have access to.
The pattern described above uses both cache and topic resources. Following the recommendation above to use the chat room identifier as the topic name, we can create policies in a couple of different ways.
Client-side only
It's possible to build instant messaging completely on the client side, meaning directly in the browser or mobile application. The policy below would be used in your clients.
- Node.js
- Java
- .NET
const scope = {
  permissions: [
    {
      role: "readwrite",
      cache: "instant-messaging",
      item: {
        key: chatId
      }
    },
    {
      role: "publishsubscribe",
      cache: "instant-messaging",
      topic: chatId
    }
  ]
};
const token = await authClient.generateDisposableToken(scope, ExpiresIn.hours(1));
List<DisposableTokenPermission> permissions = new ArrayList<>();
permissions.add(
    new DisposableToken.CacheItemPermission(
        CacheRole.ReadWrite, CacheSelector.ByName("instant-messaging"), CacheSelector.ByName(chatId)));
permissions.add(
    new DisposableToken.TopicPermission(
        TopicRole.PublishSubscribe,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName(chatId)));
DisposableTokenScope scope = new DisposableTokenScope(permissions);
GenerateDisposableTokenResponse response = authClient.generateDisposableTokenAsync(scope, ExpiresIn.hours(1)).join();
var scope = new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
    new DisposableToken.CacheItemPermission(
        CacheRole.ReadWrite,
        CacheSelector.ByName("instant-messaging"),
        CacheSelector.ByName(chatid)
    ),
    new DisposableToken.TopicPermission(
        TopicRole.PublishSubscribe,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName(chatid)
    )
});
var tokenResponse = await client.GenerateDisposableTokenAsync(scope, ExpiresIn.Hours(1));
This policy grants read and write permissions to a specific cache item.
It also grants publish and subscribe permissions to a single topic. Consumers will be able to publish messages to and receive messages from that one topic. Attempting to subscribe to any other topic will result in an authorization error.
Note that both the cache item and topic share the same value. This is the recommended best practice and the data from the cache item will not interfere with the messages being published to the topic.
Once the policy is created, you pass it to the client and create a token that expires after a set amount of time.
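As a rough sketch of how the cache item and topic are paired per chat, the helper below builds a plain-dictionary mirror of the client-side scope shown above. It is illustrative data only and is not passed to a Momento SDK here; `client_side_scope` and `allows_topic` are hypothetical helpers, and `allows_topic` is a local approximation of the authorization check, not how Momento evaluates permissions.

```python
from typing import Any, Dict

CACHE_NAME = "instant-messaging"


def client_side_scope(chat_id: str) -> Dict[str, Any]:
    """Build a dict mirroring the client-side scope for a single chat."""
    if not chat_id:
        raise ValueError("chat_id must be a non-empty string")
    return {
        "permissions": [
            # Read/write on the single cache item holding this chat's history.
            {"role": "readwrite", "cache": CACHE_NAME, "item": {"key": chat_id}},
            # Publish/subscribe on the single topic named after the chat.
            {"role": "publishsubscribe", "cache": CACHE_NAME, "topic": chat_id},
        ]
    }


def allows_topic(scope: Dict[str, Any], topic_name: str) -> bool:
    """Local check: does any permission in `scope` name this exact topic?"""
    return any(p.get("topic") == topic_name for p in scope["permissions"])


scope = client_side_scope("room-42")
```

Because the scope names exactly one item key and one topic, a token built from it for `room-42` carries no permission that mentions any other chat.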
The tradeoff for a client-side only approach is trust in your front-end. Since this opens write permissions to the client, you must be careful with what you expose in the code. Allowing end users to do write operations requires trust in their usage, since you cannot moderate messages before accepting them. This also takes a distributed approach to storing data instead of centralizing storage in server-side code.
If that is tolerable for your application, it results in unrivaled time-to-market and minimal complexity in your application.
Server-side and client-side
If you centralize writes in your server-side code, the pattern changes slightly. Raw message input would be submitted through one topic and be picked up by the server code. Any transformations or moderation would be performed before sending it back down to the verified topic. Below is an example of the policy that would be used on the client side following this pattern.
- Node.js
- Java
- .NET
const scope = {
  permissions: [
    {
      role: "readonly",
      cache: "instant-messaging",
      item: {
        key: chatId
      }
    },
    {
      role: "publishonly",
      cache: "instant-messaging",
      topic: `${chatId}-input`
    },
    {
      role: "subscribeonly",
      cache: "instant-messaging",
      topic: chatId
    },
  ]
};
const token = await authClient.generateDisposableToken(scope, ExpiresIn.hours(1));
List<DisposableTokenPermission> permissions = new ArrayList<>();
permissions.add(
    new DisposableToken.CacheItemPermission(
        CacheRole.ReadOnly, CacheSelector.ByName("instant-messaging"), CacheSelector.ByName(chatId)));
permissions.add(
    new DisposableToken.TopicPermission(
        TopicRole.PublishOnly,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName(chatId + "-input")));
permissions.add(
    new DisposableToken.TopicPermission(
        TopicRole.SubscribeOnly,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName(chatId)));
DisposableTokenScope scope = new DisposableTokenScope(permissions);
GenerateDisposableTokenResponse response = authClient.generateDisposableTokenAsync(scope, ExpiresIn.hours(1)).join();
var scope = new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
    new DisposableToken.CacheItemPermission(
        CacheRole.ReadOnly,
        CacheSelector.ByName("instant-messaging"),
        CacheSelector.ByName(chatid)
    ),
    new DisposableToken.TopicPermission(
        TopicRole.PublishOnly,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName($"{chatid}-input")
    ),
    new DisposableToken.TopicPermission(
        TopicRole.SubscribeOnly,
        CacheSelector.ByName("instant-messaging"),
        TopicSelector.ByName(chatid)
    )
});
var tokenResponse = await client.GenerateDisposableTokenAsync(scope, ExpiresIn.Hours(1));
The policy above grants read-only access to the cache item storing the conversation. The write is done server-side after moderation and transformation are complete.
It also grants publish access to the {chatid}-input topic. The server code subscribes to this topic, does the moderation, and publishes the final message to the {chatid} topic, which the client has subscribe access to.
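The server-side relay step can be sketched as follows. This is a minimal model under stated assumptions, not Momento code: `input_topic`, `moderate`, and `relay` are hypothetical names, the banned-word check is a placeholder for whatever moderation you actually apply, and `publish` is any callable that sends a value to a topic.

```python
from typing import Callable, List, Optional, Tuple


def input_topic(chat_id: str) -> str:
    """Topic that clients publish raw messages to, following the {chatid}-input convention."""
    return f"{chat_id}-input"


def moderate(raw_message: str, banned_words: List[str]) -> Optional[str]:
    """Placeholder moderation: trim whitespace, reject empty or banned text."""
    cleaned = raw_message.strip()
    if not cleaned:
        return None
    lowered = cleaned.lower()
    if any(word in lowered for word in banned_words):
        return None
    return cleaned


def relay(
    chat_id: str,
    raw_message: str,
    history: List[str],
    publish: Callable[[str, str], None],
    banned_words: List[str],
) -> bool:
    """Handle one message received on the input topic; return True if it was accepted."""
    approved = moderate(raw_message, banned_words)
    if approved is None:
        return False
    history.append(approved)  # server-side write to the stored conversation
    publish(chat_id, approved)  # verified topic that clients subscribe to
    return True


history: List[str] = []
published: List[Tuple[str, str]] = []


def record(topic_name: str, value: str) -> None:
    published.append((topic_name, value))


accepted = relay("room-42", "  hello there  ", history, record, ["spam"])
rejected = relay("room-42", "buy SPAM now", history, record, ["spam"])
```

Only the approved message reaches the history and the `room-42` topic; the rejected one is dropped before either write, which is the guarantee the read-only and subscribe-only client permissions depend on.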
Learn more
Momento enables fast, secure development of instant messaging in any application. To learn more about Cache and Topics or to get a quick-start on your project, check out some additional resources below.