Momento Topics(pub/sub)API を使用する
Momento Topics は、分散型アプリケーションの各部分間でリアルタイム通信を可能にするメッセージングパターンです。トピックの値をパブリッシュ(プロデュース)し、トピックからサブスクライブ(コンシューム)することを可能にします。このページでは、Momento Topics で操作するための Momento API メソッドについて詳しく説明します。
詳細はMomento Topicsをご覧ください。
Topics のメソッド
Subscribe
このメソッドはステートフル接続でサブスクリプションイベントを受け取るためにトピックをサブスクライブします。
言語によっては、アイテム、ハートビート、不連続などの新しいサブスクリプションイベントを受け取るためにコールバック関数やイテレータを使用することができます。:
- アイテムは、文字列またはバイトメッセージ、トピックシーケンス番号、一意なトークン識別子があればそれを含みます(learn more)。
- ハートビートは接続がまだアクティブであることを示します。
- 不連続はサブスクリプション に中断があったことを示し、いくつかのメッセージはスキップされたかもしれません。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | サブスクライブするトピックの名前 |
- Go
- Node.js
これが サンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success - subscriptionオブジェクトを返します。
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.subscribe(cacheName, 'test-topic', {
onError: () => {
return;
},
onItem: (item: TopicItem) => {
console.log(`Received an item on subscription for 'test-topic': ${item.value().toString()}`);
return;
},
});
switch (result.type) {
case TopicSubscribeResponse.Subscription:
console.log("Successfully subscribed to topic 'test-topic'");
console.log("Publishing a value to the topic 'test-topic'");
// Publish a value
await topicClient.publish(cacheName, 'test-topic', 'test-value');
console.log('Waiting for the published value to be received.');
await new Promise(resolve => setTimeout(resolve, 1000));
// Need to close the stream before the example ends or else the example will hang.
result.unsubscribe();
break;
case TopicSubscribeResponse.Error:
throw new Error(
`An error occurred while attempting to subscribe to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.subscribe("cache", "my_topic")
match response:
case TopicSubscribe.Error() as error:
print(f"Error subscribing to topic: {error.message}")
case TopicSubscribe.SubscriptionAsync() as subscription:
await topic_client.publish("cache", "my_topic", "my_value")
async for item in subscription:
match item:
case TopicSubscriptionItem.Text():
print(f"Received message as string: {item.value}")
return
case TopicSubscriptionItem.Binary():
print(f"Received message as bytes: {item.value!r}")
return
case TopicSubscriptionItem.Error():
print(f"Error with received message: {item.inner_exception.message}")
return
when (val response = topicClient.subscribe("test-cache", "test-topic")) {
is TopicSubscribeResponse.Subscription -> coroutineScope {
launch {
withTimeoutOrNull(2000) {
response.collect { item ->
when (item) {
is TopicMessage.Text -> println("Received text message: ${item.value}")
is TopicMessage.Binary -> println("Received binary message: ${item.value}")
is TopicMessage.Error -> throw RuntimeException(
"An error occurred reading messages from topic 'test-topic': ${item.errorCode}", item
)
}
}
}
}
}
is TopicSubscribeResponse.Error -> throw RuntimeException(
"An error occurred while attempting to subscribe to topic 'test-topic': ${response.errorCode}", response
)
}
// Instantiate subscriber
sub, subErr := topicClient.Subscribe(ctx, &momento.TopicSubscribeRequest{
CacheName: cacheName,
TopicName: "test-topic",
})
if subErr != nil {
panic(subErr)
}
time.Sleep(time.Second)
_, pubErr := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if pubErr != nil {
panic(pubErr)
}
time.Sleep(time.Second)
// Receive only subscription items with messages
item, err := sub.Item(ctx)
if err != nil {
panic(err)
}
switch msg := item.(type) {
case momento.String:
fmt.Printf("received message as string: '%v'\n", msg)
case momento.Bytes:
fmt.Printf("received message as bytes: '%v'\n", msg)
}
// Receive all subscription events (messages, discontinuities, heartbeats)
event, err := sub.Event(ctx)
if err != nil {
panic(err)
}
switch e := event.(type) {
case momento.TopicHeartbeat:
fmt.Printf("received heartbeat\n")
case momento.TopicDiscontinuity:
fmt.Printf("received discontinuity\n")
case momento.TopicItem:
fmt.Printf(
"received message with sequence number %d and publisher id %s: %v \n",
e.GetTopicSequenceNumber(),
e.GetPublisherId(),
e.GetValue(),
)
}
var produceCancellation = new CancellationTokenSource();
produceCancellation.CancelAfter(2000);
var subscribeResponse = await topicClient.SubscribeAsync("test-cache", "test-topic");
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
var cancellableSubscription = subscription.WithCancellation(produceCancellation.Token);
await Task.Delay(1_000);
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
await Task.Delay(1_000);
await foreach (var message in cancellableSubscription)
{
switch (message)
{
case TopicMessage.Binary:
Console.WriteLine("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Console.WriteLine($"Received string message from topic: {text.Value}");
break;
case TopicMessage.Error error:
throw new Exception($"An error occurred while receiving topic message: {error.ErrorCode}: {error}");
default:
throw new Exception("Bad message received");
}
}
subscription.Dispose();
break;
case TopicSubscribeResponse.Error error:
throw new Exception($"An error occurred subscribing to a topic: {error.ErrorCode}: {error}");
}
// Make a subscription
let mut subscription = topic_client
.subscribe(cache_name, topic_name)
.await
.expect("subscribe rpc failed");
// Consume the subscription
while let Some(item) = subscription.next().await {
println!("Received subscription item: {item:?}")
}
let subscribeResponse = await topicClient.subscribe(cacheName: cacheName, topicName: "topic")
#if swift(>=5.9)
let subscription = switch subscribeResponse {
case .error(let err): fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub): sub
}
#else
let subscription: TopicSubscription
switch subscribeResponse {
case .error(let err):
fatalError("Error subscribing to topic: \(err)")
case .subscription(let sub):
subscription = sub
}
#endif
// unsubscribe in 5 seconds
Task {
try await Task.sleep(nanoseconds: 5_000_000_000)
subscription.unsubscribe()
}
// loop over messages as they are received
for try await item in subscription.stream {
var value: String = ""
switch item {
case .itemText(let textItem):
value = textItem.value
print("Subscriber recieved text message: \(value)")
case .itemBinary(let binaryItem):
value = String(decoding: binaryItem.value, as: UTF8.self)
print("Subscriber recieved binary message: \(value)")
case .error(let err):
print("Subscriber received error: \(err)")
}
}
final subscription = await topicClient.subscribe("test-cache", "test-topic");
final messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
"Subscribe error: ${subscription.errorCode} ${subscription.message}"),
};
// cancel subscription 5 seconds from now
Timer(const Duration(seconds: 5), () {
print("Cancelling subscription!");
subscription.unsubscribe();
});
try {
await for (final msg in messageStream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
Publish
メッセージをトピックにパブリッシュします。
名前 | 型 | 説明 |
---|---|---|
cacheName | String | トピックが存在するキャッシュの名前 |
topicName | String | 値をパブリッシュするトピック名 |
value | String / bytes | トピックにパブリッシュする値 |
- Go
- Node.js
こちらがサンプルコードです。
Coming soon.
メソッドのレスポンスオブジェクト
- Success
- Error
具体的な情報についてはレスポンスオブジェクトをご覧ください。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
const result = await topicClient.publish(cacheName, 'test-topic', 'test-topic-value');
switch (result.type) {
case TopicPublishResponse.Success:
console.log("Value published to topic 'test-topic'");
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the topic 'test-topic' in cache '${cacheName}': ${result.errorCode()}: ${result.toString()}`
);
}
response = await topic_client.publish("cache", "my_topic", "my_value")
match response:
case TopicPublish.Success():
print("Successfully published a message")
case TopicPublish.Error() as error:
print(f"Error publishing a message: {error.message}")
when (val response = topicClient.publish("test-cache", "test-topic", "test-message")) {
is TopicPublishResponse.Success -> println("Message published successfully")
is TopicPublishResponse.Error -> throw RuntimeException(
"An error occurred while attempting to publish message to topic 'test-topic': ${response.errorCode}",
response
)
}
_, err := topicClient.Publish(ctx, &momento.TopicPublishRequest{
CacheName: cacheName,
TopicName: "test-topic",
Value: momento.String("test-message"),
})
if err != nil {
panic(err)
}
var publishResponse =
await topicClient.PublishAsync("test-cache", "test-topic", "test-topic-value");
switch (publishResponse)
{
case TopicPublishResponse.Success:
Console.WriteLine("Successfully published message to 'test-topic'");
break;
case TopicPublishResponse.Error error:
throw new Exception($"An error occurred while publishing topic message: {error.ErrorCode}: {error}");
}
topic_client
.publish(cache_name, topic_name, "Hello, Momento!")
.await?;
println!("Published message");
let result = await topicClient.publish(
cacheName: cacheName,
topicName: "topic",
value: "value"
)
switch result {
case .success(_):
print("Successfully published message!")
case .error(let err):
print("Unable to publish message: \(err)")
exit(1)
}
final result = await topicClient.publish("cache", "topic", "hello message!");
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
TopicClient
ほとんどの Momento Cache API コールでは CacheClient を使用していますが、Topics については TopicClient オブジェクトを使用します。
- JavaScript
- Python
- Kotlin
- Go
- C#
- Rust
- Swift
- Dart
new TopicClient({
configuration: TopicConfigurations.Default.latest(),
credentialProvider: CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
});
TopicClientAsync(
TopicConfigurations.Default.latest(), CredentialProvider.from_environment_variable("MOMENTO_API_KEY")
)
TopicClient(
CredentialProvider.fromEnvVar("MOMENTO_API_KEY"), TopicConfigurations.Laptop.latest
).use { topicClient ->
//...
}