Momento Topics(pub/sub)API を使用する
Momento Topics は、分散型アプリケーションの各部分間でリアルタイム通信を可能にするメッセージングパターンです。トピックの値をパブリッシュ(プロデュース)し、トピックからサブスクライブ(コンシューム)することを可能にします。このページでは、Momento Topics で操作するための Momento API メソッドについて詳しく説明します。
詳細はMomento Topicsをご覧ください。
Topics のメソッド
Subscribe
このメソッドでは、ステートフルな接続を用いて新しい値を受け取るためにトピックをサブスクライブします。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | サブスクライブするトピックの名前 |
これが サンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success - サブスクリプションオブジェクトを返します。
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
返されたサブスクリプションオブジェクトをforループに置くと、新しい値がトピックに公開される時にコードにイベントが送信されます。
- JavaScript
- Python
- Go
- C#
const result = await topicClient.subscribe('test-cache', 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Publishing values to the topic 'test-topic': ${item.value().toString()}`);
return;
},
});
if (result instanceof TopicSubscribe.Subscription) {
console.log("Successfully subscribed to topic 'test-topic'");
// Publish a value
await topicClient.publish('test-cache', 'test-topic', 'test-value');
// Wait for published values to be received.
setTimeout(() => {
console.log('Waiting for the published values');
}, 2000);
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
} else if (result instanceof TopicSubscribe.Error) {
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache 'test-cache': ${result.errorCode()}: ${result.toString()}`
);
}
subscription = await topic_client.subscribe("cache", "my_topic")
match subscription:
case TopicSubscribe.Error():
print("Error subscribing to topic: ", subscription.message)
case TopicSubscribe.SubscriptionAsync():
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value}")
return
case TopicSubscriptionItem.Error():
print("Error with received message:", item.inner_exception.message)
return
// Instantiate subscriber
sub, subErr := client.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: "test-cache",
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(2000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
var cancellableSubscription = subscription.WithCancellation(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var message in cancellableSubscription)
{
switch (message)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
Publish
メッセージをトピックにパブリッシュします。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | 値をパブリッシュするトピック名 |
value | String / bytes | トピックにパブリッシュする値 |
こちらがサンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
- JavaScript
- Python
- Go
- C#
const result = await topicClient.publish('test-cache', 'test-topic', 'test-topic-value');
if (result instanceof TopicPublish.Success) {
console.log("Value published to topic 'test-topic'");
} else if (result instanceof TopicPublish.Error) {
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache 'test-cache': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error():
print("Error publishing a message: ", response.message)
_, err := client.Publish(ctx, &momento.TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
ar publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
TopicClient
ほとんどの Momento Cache API コールでは CacheClient を使用していますが、Topics については TopicClient オブジェクトを使用します。
- JavaScript
- Python
- Go
- C#
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable({
environmentVariableName: 'MOMENTO_API_KEY',
}),
});
topic_client = TopicClientAsync(
TopicConfigurations.Default.latest(),
CredentialProvider.from_environment_variable("MOMENTO_AUTH_TOKEN")
)
credProvider, err := auth.NewEnvMomentoTokenProvider("MOMENTO_API_KEY")
if err != nil {
panic(err)
}
topicClient, err := momento.NewTopicClient(
config.TopicsDefault(),
credProvider,
)
if err != nil {
panic(err)
}
new TopicClient(
TopicConfigurations.Laptop.latest(),
new EnvMomentoTokenProvider("MOMENTO_AUTH_TOKEN")
);
Example apps using Momento Topics APIs
A growing list of example apps using the Momento Topics.
Momento Topicsを使用したアプリの例が続々と増えています。
- サーバーレスで作成されたアイテムをパブリッシュするマイクロサービス このマイクロサービスはTypeScriptで書かれ、API Gateway、Lambda関数、Momento Topicsを使ってAWS上で実行される。(API Gateway上で適切なセキュリティが設定されていれば)他のサービスでも利用することができ、様々なトピックにメッセージを発行して他のアプリケーションから購読させることができます。この API に
topicName
とtopicValue
を渡すと、このサービスはその値をトピックにパブリッシュします。