Momento Topics(pub/sub)API を使用する
Momento Topics は、分散型アプリケーションの各部分間でリアルタイム通信を可能にするメッセージングパターンです。トピックの値をパブリッシュ(プロデュース)し、トピックからサブスクライブ(コンシューム)することを可能にします。このページでは、Momento Topics で操作するための Momento API メソッドについて詳しく説明します。
詳細はMomento Topicsをご覧ください。
Topics のメソッド
Subscribe
このメソッドはステートフル接続でサブスクリプションイベントを受け取るためにトピックをサブスクライブします。
言語によっては、アイテム、ハートビート、不連続などの新しいサブスクリプションイベントを受け取るためにコールバック関数やイテレータを使用することができます。:
- アイテムは、文字列またはバイトメッセージ、トピックシーケンス番号、一意なトークン識別子があればそれを含みます(learn more)。
- ハートビートは接続がまだアクティブであることを示します。
- 不連続はサブスクリプション に中断があったことを示し、いくつかのメッセージはスキップされたかもしれません。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | サブスクライブするトピックの名前 |
- Go
- Node.js
これが サンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success - subscriptionオブジェクトを返します。
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
switch (result.type) {
case TopicSubscribeResponse.Subscription:
console.log("Successfully subscribed to topic 'test-topic'");
console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');
console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
break;
case TopicSubscribeResponse.Error:
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
備考
Full example code and imports can be found here
response = await topic_client.subscribe("cache", "my_topic")
match response:
case TopicSubscribe.Error() as error:
print(f"Error subscribing to topic: {error.message}")
case TopicSubscribe.SubscriptionAsync() as subscription:
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value!r}")
return
case TopicSubscriptionItem.Error():
print(f"Error with received message: {item.inner_exception.message}")
return
備考
Full example code and imports can be found here
when (val response = topicClient.subscribe("test-cache", "test-topic")) {
is TopicSubscribeResponse.Subscription -> coroutineScope {
launch {
withTimeoutOrNull(2000) {
response.collect { item ->
when (item) {
is TopicMessage.Text -> println("Received text message: ${item.value}")
is TopicMessage.Binary -> println("Received binary message: ${item.value}")
is TopicMessage.Error -> throw RuntimeException(
"An error occurred reading messages from topic 'test-topic': ${item.errorCode}", item
)
}
}
}
}
}
is TopicSubscribeResponse.Error -> throw RuntimeException(
"An error occurred while attempting to subscribe to topic 'test-topic': ${response.errorCode}", response
)
}
備考
Full example code and imports can be found here
// Instantiate subscriber
sub, subErr := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: cacheName,
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
// Receive only subscription items with messages
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
// Receive all subscription events (messages, discontinuities, heartbeats)
event, err := sub.Event(ctx)
if err != nil {
panic(err)
}
switch e := event.(type) {
case momento.TopicHeartbeat:
fmt.Printf("received heartbeat\n")
case momento.TopicDiscontinuity:
fmt.Printf("received discontinuity\n")
case momento.TopicItem:
fmt.Printf(
"received message with sequence number %d and publisher id %s: %v \n",
e.GetTopicSequenceNumber(),
e.GetPublisherId(),
e.GetValue(),
)
}
備考
Full example code and imports can be found here
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(5_000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
// Note: use `WithCancellation` to filter only the `TopicMessage` types
var cancellableSubscription = subscription.WithCancellationForAllEvents(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var topicEvent in cancellableSubscription)
{
switch (topicEvent)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicSystemEvent.Heartbeat:
Console.WriteLine("Received heartbeat from topic.");
break;
case TopicSystemEvent.Discontinuity discontinuity:
Console.WriteLine($"Received discontinuity from topic: {discontinuity}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
備考
Full example code and imports can be found here
// Make a subscription
let mut subscription = topic_client
.subscribe(cache_name, topic_name)
.await
.expect("subscribe rpc failed");
// Consume the subscription
while let Some(item) = subscription.next().await {
println!("Received subscription item: {item:?}")
}
備考
Full example code and imports can be found here
let subscribeResponse = await topicClient.subscribe(cacheName: cacheName, topicName: "topic")
#if swift(>=5.9)
let subscription = switch subscribeResponse {
case .error(let err): fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub): sub
}
#else
let subscription: TopicSubscription
switch subscribeResponse {
case .error(let err):
fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub):
subscription = sub
}
#endif
// unsubscribe in 5 seconds
Task {
try await Task.sleep(nanoseconds: 5_000_000_000)
subscription.unsubscribe()
}
// loop over messages as they are received
for try await item in subscription.stream {
var value: String = ""
switch item {
case .itemText(let textItem):
value = textItem.value
print("Subscriber recieved text message: \(value)")
case .itemBinary(let binaryItem):
value = String(decoding: binaryItem.value, as: UTF8.self)
print("Subscriber recieved binary message: \(value)")
case .error(let err):
print("Subscriber received error: \(err)")
}
}
備考
Full example code and imports can be found here
final subscription = await topicClient.subscribe("test-cache", "test-topic");
final messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
"Subscribe error: ${subscription.errorCode} ${subscription.message}"),
};
// cancel subscription 5 seconds from now
Timer(const Duration(seconds: 5), () {
print("Cancelling subscription!");
subscription.unsubscribe();
});
try {
await for (final msg in messageStream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
備考
Full example code and imports can be found here
Publish
メッセージをトピックにパブリッシュします。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | 値をパブリッシュするトピック名 |
value | String / bytes | トピックにパブリッシュする値 |
- Go
- Node.js
こちらがサンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
case TopicPublishResponse.Success:
console.log("Value published to topic 'test-topic'");
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
備考
Full example code and imports can be found here
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error() as error:
print(f"Error publishing a message: {error.message}")
備考
Full example code and imports can be found here
when (val response = topicClient.publish("test-cache", "test-topic", "test-message")) {
is TopicPublishResponse.Success -> println("Message published successfully")
is TopicPublishResponse.Error -> throw RuntimeException(
"An error occurred while attempting to publish message to topic 'test-topic': ${response.errorCode}",
response
)
}
備考
Full example code and imports can be found here
_, err := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
備考
Full example code and imports can be found here
var publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
備考
Full example code and imports can be found here
topic_client
.publish(cache_name, topic_name, "Hello, Momento!")
.await?;
println!("Published message");
備考
Full example code and imports can be found here
let result = await topicClient.publish(
cacheName: cacheName,
topicName: "topic",
value: "value"
)
switch result {
case .success(_):
print("Successfully published message!")
case .error(let err):
print("Unable to publish message: \(err)")
exit(1)
}
備考
Full example code and imports can be found here
final result = await topicClient.publish("cache", "topic", "hello message!");
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
備考
Full example code and imports can be found here
TopicClient
ほとんどの Momento Cache API コールでは CacheClient を使用していますが、Topics については TopicClient オブジェクトを使用します。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});
備考
Full example code and imports can be found here
TopicClientAsync(
TopicConfigurations.Default.latest(), CredentialProvider.from_environment_variable("MOMENTO_API_KEY")
)
備考
Full example code and imports can be found here
TopicClient(
CredentialProvider.fromEnvVar("MOMENTO_API_KEY"), TopicConfigurations.Laptop.latest
).use { topicClient ->
//...
}
備考
Full example code and imports can be found here
credProvider, err := auth.NewEnvMomentoTokenProvider("MOMENTO_API_KEY")
if err != nil {
panic(err)
}
topicClient, err = momento.NewTopicClient(
config.TopicsDefault(),
credProvider,
)
if err != nil {
panic(err)
}
備考
Full example code and imports can be found here
new TopicClient(
TopicConfigurations.Laptop.latest(),
new EnvMomentoTokenProvider("MOMENTO_API_KEY")
);
備考
Full example code and imports can be found here
let _topic_client = TopicClient::builder()
.configuration(momento::topics::configurations::Laptop::latest())
.credential_provider(CredentialProvider::from_env_var("MOMENTO_API_KEY")?)
.build()?;
備考
Full example code and imports can be found here
do {
let credentialProvider = try CredentialProvider.fromEnvironmentVariable(envVariableName: "MOMENTO_API_KEY")
let topicClient = TopicClient(
configuration: TopicClientConfigurations.iOS.latest(),
credentialProvider: credentialProvider
)
} catch {
print("Unable to create topic client: \(error)")
exit(1)
}
備考
Full example code and imports can be found here
try {
final topicClient = TopicClient(
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
TopicClientConfigurations.latest());
} catch (e) {
print("Unable to create topic client: $e");
exit(1);
}
備考
Full example code and imports can be found here
Example apps using Momento Topics APIs
A growing list of example apps using the Momento Topics.
Momento Topicsを使用したアプリの例が続々と増えています。
- サーバーレスで作成されたアイテムをパブリッシュするマイクロサービス このマイクロサービスはTypeScriptで書かれ、API Gateway、Lambda関数、Momento Topicsを使ってAWS上で実行される。(API Gateway上で適切なセキュリティが設定されていれば)他のサービスでも利用することができ、様々なトピックにメッセージを発行して他のアプリケーションから購読させることができます。この API に
topicName
とtopicValue
を渡すと、このサービスはその値をトピックにパブリッシュします。