Momento CacheとTopicsを使ってインスタント・メッセージを構築する
インスタントメッセージングは、多くのアプリケーションで必須の機能です。1対1、1対多、多対多の会話をシームレスに行うことは、多くの場合、アプリケーションに必要なコンポーネントと考えられています。Momento CacheとTopicsを使用すると、バックエンドのインフラストラクチャなしで、アプリケーションにインスタント・メッセージングを組み込むことができます。Momento web SDKを使用すると、ブラウザで直接キャッシュデータにアクセスし、メッセージを公開および購読できます。
このパターンでは、Momento Cache を使用して過去のメッセージを保存し、Momento Topics を使用してインスタント・メッセージ参加者間のリアルタイム・コミュニケーションを促進します。
アーキテクチャ
Momentoでのインスタントメッセージの構築は、完全にクライアントサイド、つまりユーザーインターフェイスで行うこ とができます。ブラウザ、iPhoneアプリ、Androidアプリはすべて、Momentoのリソースに直接アクセスする機能を持っています。
上の図では、新しいユーザーがチャットに参加すると、list からメッセージが読み込まれます。ユーザーはチャットルーム固有のトピックを購読し、チャットに参加している間、リアルタイムでメッセージを受け取ります。メッセージが送信されると、送信者によって直接リストに追加されます。
コンポーネント
インスタントメッセージは2つの部分に分かれています:
- Message storage
- Delivery
これらのコアコンポーネントにより、ユーザーはいつでもチャットに参加し、メッセージ履歴を閲覧したり、新しいメッセージをリアルタイムで受信することができます。
Message storage
ほとんどのインスタントメッセージプラットフォームは、ユーザーが到着する前にメッセージが送信された場合、会話の履歴を表示できるようにメッセージを保存します。チャットルームに入るときのスタートアップタスクの一部は、こ の履歴をロードして画面に表示することです。これはMomentoのlistキャッシュアイテムで簡単にできます。
Lists
リスト は、要素の順序付き配列を格納するコレクション・データ型です。要素をリストの前面または背面にプッシュすることができ、一度に複数のエントリを追加できます。リストは、メッセージが送信された順に格納されるため、インスタントメッセージングに最適なデータ型です。ユーザがインスタントメッセージングセッションに参加すると、1回のlistFetch APIコールでチャット履歴全体をすばやく取得できます。
リスト内の要素は一意である必要はありません。同じメッセージを何度もリストにプッシュすれば、その都度追加されます。リストは、次のような用途のインスタントメッセージに適しています:
- 一度に N 個のメッセージを取得
- 一度に複数のメッセージを追加
- 一定の長さのチャット履歴、つまり直近の100メッセージの保持
リストを使用する場合、多くの配列操作関数があり、メッセージ履歴が長くなりすぎた場合に自動的に切り詰めたり、コンテンツモデレーションなどのために特定の要素を削除したりすることができます。
Delivery
技術的には、メッセージの保存はインスタントメッセージングの必須コンポーネントではありません。理論的には、ユーザーが接続している間に送信されたメッセージだけを表示するメッセージングシステムを持つことができます。メッセージはクライアントのメモリーに保存しておけばよい。対照的に、メッセージ配信はインスタントメッセージングの必須部分です。メッセージを送信者からすべての受信者に最小限の待ち時間で届けることが、このパターンの核心です。これを行うために、Momento Topicsを使います。
Momento Topicsは、クライアント同士、クライアントとサーバー、サーバーとクライアント、サーバーとサーバーを接続する低レイテンシーのPub/Subサービスです。WebSocketのようなものです。トピックにメッセージをパブリッシュすると、Momentoはトピックのすべてのサブスクライバにブロードキャストします。
インスタントメッセージングの場合、送信者はトピックにメッセージを 発行 します。メッセージの受信者は、ユースケースによって1人または多数になりますが、サブスクライバになります。Momento Topicsは、公開されたメッセージをリアルタイムで購読者に配信し、私たちが求めているインスタント・メッセージング体験を提供します!
Momento トピックは、インフラストラクチャのリソースとして定義されていない、柔軟で動的なトピックを提供します。インスタントメッセージングを構築する推奨の方法は、チャットルームまたはセッション識別子をトピック名として使用することです。これにより、チャットのユースケースに柔軟で専用のトピックが提供されます。
メッセージの発行
メッセージを公開するために必要なのは、たったひとつのコマンドだけです:
- Node.js
- Python
- Go
- Java
- .NET
await topics.publish('message-namespace', sessionId, 'Hello world!');
topics.publish("message-namespace", sessionId, "Hello world!")
_, err := topics.Publish(ctx, &momento.TopicPublishRequest{
CacheName: momento.String("message-namespace"),
TopicName: sessionId,
Value: momento.String("Hello world!")
})
final TopicPublishResponse response = topics.publish("message-namespace", sessionId, "Hello world!").join();
var response = await topics.PublishAsync("message-namespace", sessionId, "Hello world!");
トピックの購読
メッセージの購読は、特定のトピックに何かが公開されるたびに、Momentoにそれを配信し、特定のイベントハンドラ関数を実行するように指示します。個々のトピックに対して、1 から数千 (あるいはそれ以上!) のクライアントを購読することができます。
- Node.js
- Python
- Go
- Java
- .NET
const subscription = await topics.subscribe('message-namespace', sessionId, {
onItem: (data) => processMessage(data.value(), data.tokenId())
});
with TopicClient(TopicConfigurations.Default.v1(), _AUTH_PROVIDER) as client:
subscription = client.subscribe("cache", "my_topic")
match subscription:
case TopicSubscribe.Error():
raise Exception("Subscription error: ", subscription.message)
case TopicSubscribe.Subscription():
for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(item.value)
case TopicSubscriptionItem.Binary():
print(item.value!r)
subscription, err := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: cacheName,
TopicName: topicName,
})
if err != nil {
panic(err)
}
go func() { pollForMessages(ctx, subscription) }()
time.Sleep(time.Second)
func pollForMessages(ctx context.Context, subscription momento.TopicSubscription) {
for {
item, err := subscription.Item(ctx)
if err != nil {
panic(err)
}
switch message := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", message)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", message)
}
}
}
final TopicSubscribeResponse subscribeResponse =
topicClient
.subscribe(
TopicExample.CACHE_NAME,
TOPIC_NAME,
new ISubscriptionCallbacks() {
@Override
public void onItem(TopicMessage message) {
logger.info("Received message on topic {}: {}", TOPIC_NAME, message.toString());
};
@Override
public void onError(Throwable error) {
logger.error("Subscription to topic {} failed with error", TOPIC_NAME, error);
};
};).join();
var subscriptionTask = Task.Run(async () =>
{
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName);
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
try {
var cancellableSubscription = subscription.WithCancellation(cts.Token);
await foreach (var message in cancellableSubscription) {
switch (message)
{
case TopicMessage.Binary:
Logger.LogInformation("Received binary message from topic: {binaryData}", Convert.ToBase64String(binary.Value));
break;
case TopicMessage.Text text:
Logger.LogInformation("Received string message from topic: {message}", text.Value);
break;
case TopicMessage.Error error:
Logger.LogInformation("Received error message from topic: {error}", error.Message);
cts.Cancel();
break;
}
}
} finally {
subscription.Dispose();
}
break;
case TopicSubscribeResponse.Error error:
Logger.LogInformation("Error subscribing to a topic: {error}", error.Message);
cts.Cancel();
break;
}
});
セキュリティ
Momentoは堅牢な認証メカニズムを提供し、コンシューマが特定のリソースやアクションにアクセスできるように制限します。インスタントメッセージ機能を構築するとき、ユーザーがアクセスすべきではないメッセージを傍受しないように、スコープを考慮することは非常に重要です。
上記のパターンでは、キャッシュとトピックの両方のリソースを使用します。チャットルームをトピック名と一致させることで、上で推奨したベストプラクティスに従って、いくつかの異なる方法でポリシーを作成することができます。
クライアントサイドのみでの実装
インスタントメッセージングを完全にクライアントサイドで、つまりブラウザやモバイルアプリケーションで直接構築することが可能です。以下のポリシーがクライアントで使用されます。
- Node.js
- Java
- .NET
const scope = {
permissions: [
{
role: "readwrite",
cache: "instant-messaging",
item: {
key: chatId
}
},
{
role: "publishsubscribe",
cache: "instant-messaging",
topic: chatId
}
]
};
const token = await authClient.generateDisposableToken(scope, ExpiresIn.hours(1));
List<DisposableTokenPermission> permissions = new ArrayList<>();
permissions.add(
new DisposableToken.CacheItemPermission(
CacheRole.ReadWrite, CacheSelector.ByName("instant-messaging"), CacheSelector.ByName(chatId)));
permissions.add(
new DisposableToken.TopicPermission(
TopicRole.PublishSubscribe,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName(chatId)));
DisposableTokenScope scope = new DisposableTokenScope(permissions);
GenerateDisposableTokenResponse response = authClient.generateDisposableTokenAsync(scope, ExpiresIn.hours(1)).join();
var scope = new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.CacheItemPermission(
CacheRole.ReadWrite,
CacheSelector.ByName("instant-messaging"),
CacheSelector.ByName(chatid)
),
new DisposableToken.TopicPermission(
TopicRole.PublishSubscribe,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName(chatid)
)
});
var tokenResponse = await client.GenerateDisposableTokenAsync( scope, ExpiresIn.Hours(1));
このポリシーは、特定のキャッシュ・アイテムに読み取り権限と書き込み権限を付与します。
また、1つのトピックに対してパブリッシュとサブスクライブのパーミッションが付与されます。コンシューマは、1つのトピックに対してメッセージを発行し、1つのトピックからメッセージを受信することができます。他のトピックを購読しようとすると、認証エラーが発生します。
キャッシュ・アイテムとトピックは同じ値を共有することに注意してください。これは推奨されるベスト・プラクティスであり、キャッシュ・アイテムのデータがトピックにパブリッシュされるメッセージに干渉することはありません。
ポリシーが作成されたら、それをクライアントに渡し、設定された時間後に失効するトークンを作成します。
クライアントサイドのみのアプローチのトレードオフは、フロントエンドの信頼性です。これはクライアントに書き込み権限を与えることになるので、コードで公開する内容には注意しなければなりません。エンド・ユーザーに書き込み操作を許可することは、その使用方法に対する信頼を必要とします。これはまた、サーバーサイドのコードにストレージを集中させるのではなく、データを分散させて保存するアプローチを取ります。
もしそれがあなたのアプリケーションにとって許容範囲であれば、比類のない市場投入までの時間と、アプリケーションの複雑さを最小限に抑えることができます。
サーバーサイド と クライアントサイド の実装
書き込みをサーバー側のコードに集中させると、パターンは少し変わります。生のメッセージ入力は1つのトピックを通して送信され、サーバーコードによってピックアップされます。検証されたトピックに送り返す前に、変換やモデレーションが行われます。以下は、このパターンに従ってクライアント側で使用されるポリシーの例です。
- Node.js
- Java
- .NET
const scope = {
permissions: [
{
role: "read",
cache: "instant-messaging",
item: {
key: chatId
}
},
{
role: "publishonly",
cache: "instant-messaging",
topic: `${chatid}-input`
},
{
role: "subscribeonly",
cache: "instant-messaging",
topic: chatid
},
]
};
const token = await authClient.generateDisposableToken(scope, ExpiresIn.hours(1));
List<DisposableTokenPermission> permissions = new ArrayList<>();
permissions.add(
new DisposableToken.CacheItemPermission(
CacheRole.ReadOnly, CacheSelector.ByName("instant-messaging"), CacheSelector.ByName(chatId)));
permissions.add(
new DisposableToken.TopicPermission(
TopicRole.PublishOnly,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName(chatId + "-input")));
permissions.add(
new DisposableToken.TopicPermission(
TopicRole.SubscribeOnly,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName(chatId)));
DisposableTokenScope scope = new DisposableTokenScope(permissions);
GenerateDisposableTokenResponse response = authClient.generateDisposableTokenAsync(scope, ExpiresIn.hours(1)).join();
var scope = new DisposableTokenScope(Permissions: new List<DisposableTokenPermission>
{
new DisposableToken.CacheItemPermission(
CacheRole.ReadOnly,
CacheSelector.ByName("instant-messaging"),
CacheSelector.ByName(chatid)
),
new DisposableToken.TopicPermission(
TopicRole.PublishOnly,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName($"{chatid}-input")
),
new DisposableToken.TopicPermission(
TopicRole.SubscribeOnly,
CacheSelector.ByName("instant-messaging"),
TopicSelector.ByName(chatid)
)
});
var tokenResponse = await client.GenerateDisposableTokenAsync( scope, ExpiresIn.Hours(1));
上記のポリシーは、会話を格納するキャッシュアイテムへの読み取り専用アクセスを許可 します。書き込みは、モデレーションと変換が完了した後にサーバーサイドで行われます。
また、{chatid}-input
トピックへのパブリッシュアクセスも許可する。サーバコードはこのトピックを購読し、モデレーションを行い、最終的なメッセージを {chatid}
トピックに公開します。
Learn more
Momentoを使用すると、あらゆるアプリケーションでインスタント・メッセージを迅速かつ安全に開発できます。CacheとTopicsの詳細、またはプロジェクトのクイックスタートについては、以下のリソースをご覧ください。