メインコンテンツまでスキップ

Momento Topics(pub/sub)API を使用する

Momento Topics は、分散型アプリケーションの各部分間でリアルタイム通信を可能にするメッセージングパターンです。トピックの値をパブリッシュ(プロデュース)し、トピックからサブスクライブ(コンシューム)することを可能にします。このページでは、Momento Topics で操作するための Momento API メソッドについて詳しく説明します。

詳細はMomento Topicsをご覧ください。

Topics のメソッド

Subscribe

このメソッドはステートフル接続でサブスクリプションイベントを受け取るためにトピックをサブスクライブします。

言語によっては、アイテム、ハートビート、不連続などの新しいサブスクリプションイベントを受け取るためにコールバック関数やイテレータを使用することができます。:

  • アイテムは、文字列またはバイトメッセージ、トピックシーケンス番号、一意なトークン識別子があればそれを含みます(learn more)。
  • ハートビートは接続がまだアクティブであることを示します。
  • 不連続はサブスクリプションに中断があったことを示し、いくつかのメッセージはスキップされたかもしれません。
名前説明
cacheNameStringトピックが存在するキャッシュの名前
topicNameStringサブスクライブするトピックの名前

これが サンプルコードです。

メソッドのレスポンスオブジェクト
  • Success - subscriptionオブジェクトを返します。
  • Error

具体的な情報についてはレスポンスオブジェクトをご覧ください。

const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
switch (result.type) {
case TopicSubscribeResponse.Subscription:
console.log("Successfully subscribed to topic 'test-topic'");

console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');

console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));

// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
break;
case TopicSubscribeResponse.Error:
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
備考
Full example code and imports can be found here

Publish

メッセージをトピックにパブリッシュします。

名前説明
cacheNameStringトピックが存在するキャッシュの名前
topicNameString値をパブリッシュするトピック名
valueString / bytes トピックにパブリッシュする値

こちらがサンプルコードです。

メソッドのレスポンスオブジェクト
  • Success
  • Error

具体的な情報についてはレスポンスオブジェクトをご覧ください。

const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
case TopicPublishResponse.Success:
console.log("Value published to topic 'test-topic'");
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
備考
Full example code and imports can be found here

TopicClient

ほとんどの Momento Cache API コールでは CacheClient を使用していますが、Topics については TopicClient オブジェクトを使用します。

new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});
備考
Full example code and imports can be found here

Example apps using Momento Topics APIs

A growing list of example apps using the Momento Topics.

Momento Topicsを使用したアプリの例が続々と増えています。

  • サーバーレスで作成されたアイテムをパブリッシュするマイクロサービス このマイクロサービスはTypeScriptで書かれ、API Gateway、Lambda関数、Momento Topicsを使ってAWS上で実行される。(API Gateway上で適切なセキュリティが設定されていれば)他のサービスでも利用することができ、様々なトピックにメッセージを発行して他のアプリケーションから購読させることができます。この API に topicNametopicValue を渡すと、このサービスはその値をトピックにパブリッシュします。