イベント駆動型アーキテクチャでMomentoトピック、Webhook、トークンを活用する
Momento Topicsを使用すると、トピック上のメッセージを購読し、別のトピックにメッセージを公開することができます。Webhooksは、これらのトピックに発行されたメッセージに応答してトリガーされるHTTPコールバックとして機能し、ステートレスコンシューマーとして動作します。トークンは、システムとやりとりするユーザーやサービスに、短時間のセッション・トークンを提供するように設計されています。各トークンに一意の識別子 (token_id
) を埋め込むことで、リソースへの安全で追跡可能なアクセスを保証します。
ここで重要なのは、Topics を使って短命のトークンを付与することで、ユーザー間のリアルタイムなコミュニケーションを促進できることです。これらのトークンにはユーザー情報 (token_id
) を埋め込むことができ、メッセージが公開されると、Webhook コールバックを介して Momento キャッシュなどの共有リソースにアクセスするために活用することができます。例えば、token_id
を使ってユーザーを特定することで、Momento キャッシュに保存されたユーザー情報にアクセスし、ユーザー体験をパーソナライズすることができます。情報を埋め込むことで、2つの大きな利点が得られます:
- セキュリティを強化し、ユーザーのなりすましを防ぐ。
- トークン自体にユーザー情報が埋め込まれているため、 データ転送コストが削減される。
はじめに
- Momentoコンソールでキャッシュを作成する
- ウェブフックを作成する をキャッシュに追加します。Webhook の宛先を公開エンドポイントに割り当てます。このエンドポイントは POST リクエストを受け入れ、Momento からの着信コールを受け取ることができる必要があります。このイベントの構造の詳細については、こちら を参照してください。
- インフラを作成します。例えば、以下はウェブフックエンドポイントのラムダ関数URLを作成するサンプルコードです。また また、AWS secrets manager に Momento 認証トークンを保存します。
import * as path from 'path';
import * as cdk from 'aws-cdk-lib';
import {Construct} from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaNodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as secrets from 'aws-cdk-lib/aws-secretsmanager';
import {CfnOutput} from 'aws-cdk-lib';
export class MomentoWebhookStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const momentoApiKeyParam = new cdk.CfnParameter(this, 'MomentoApiKey', {
type: 'String',
description: 'The Momento API key that will be used to read from the cache.',
noEcho: true,
});
const momentoSecretStringParam = new cdk.CfnParameter(this, 'MomentoSecretString', {
type: 'String',
description: 'The Momento Webhook Secret String that will be used to validate the caller',
noEcho: true,
});
const apiKeySecret = new secrets.Secret(this, 'MomentoWebhookHandlerApiKey', {
secretName: 'MomentoWebhookHandlerApiKey',
secretStringValue: new cdk.SecretValue(momentoApiKeyParam.valueAsString),
});
const secretStringSecret = new secrets.Secret(this, 'MomentoWebhookHandlerSecretString', {
secretName: 'MomentoWebhookHandlerSecretString',
secretStringValue: new cdk.SecretValue(momentoSecretStringParam.valueAsString),
});
const webhookHandlerLambda = new lambdaNodejs.NodejsFunction(this, 'MomentoWebhookHandler', {
functionName: 'MomentoWebhookHandler',
runtime: lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, '../../lambda/webhook-handler/handler.ts'),
projectRoot: path.join(__dirname, '../../lambda/webhook-handler'),
depsLockFilePath: path.join(__dirname, '../../lambda/webhook-handler/package-lock.json'),
handler: 'handler',
timeout: cdk.Duration.seconds(30),
memorySize: 128,
environment: {
MOMENTO_API_KEY_SECRET_NAME: apiKeySecret.secretName,
THE_SIGNING_SECRET: secretStringSecret.secretName,
},
});
const serviceLambda = new lambdaNodejs.NodejsFunction(this, 'ServiceLambda', {
functionName: 'ServiceLambda',
runtime: lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, '../../lambda/service-topics/handler.ts'),
projectRoot: path.join(__dirname, '../../lambda/service-topics'),
depsLockFilePath: path.join(__dirname, '../../lambda/service-topics/package-lock.json'),
handler: 'handler',
timeout: cdk.Duration.seconds(30),
memorySize: 128,
environment: {
MOMENTO_API_KEY_SECRET_NAME: apiKeySecret.secretName,
},
});
// 👇 Setup lambda url
const lambdaUrl = webhookHandlerLambda.addFunctionUrl({
authType: lambda.FunctionUrlAuthType.NONE,
});
apiKeySecret.grantRead(webhookHandlerLambda);
apiKeySecret.grantRead(serviceLambda);
secretStringSecret.grantRead(webhookHandlerLambda);
new CfnOutput(this, 'FunctionUrl ', {value: lambdaUrl.url});
}
}
- Webhook にコードを追加して、受信メッセージを処理します。以下は、Webhook ペイロードからユーザの
token_id
を抽出し、Momento キャッシュに保存されたリソースにアクセスする Webhook ラムダハンドラのサンプルコードです。また、Webhook の呼び出し元が本当に Momento であることを、署名の秘密を通して確認します。
import {GetSecretValueCommand, SecretsManagerClient} from '@aws-sdk/client-secrets-manager';
import {
CacheClient,
CacheListPushFrontResponse,
Configurations,
CredentialProvider,
WebhookUtils,
} from '@gomomento/sdk';
const _secretsClient = new SecretsManagerClient({});
const _cachedSecrets = new Map<string, string>();
let _cacheClient: CacheClient | undefined = undefined;
const cacheName = 'course-comments';
interface LambdaEvent {
headers: Record<string, string>;
body: string;
}
interface Payload {
token_id: string;
text: string;
}
interface Message {
courseId: string;
comment: string;
}
export const handler = async (event: LambdaEvent) => {
try {
const secretStringSecretName = process.env.THE_SIGNING_SECRET;
if (secretStringSecretName === undefined) {
throw new Error("Missing required env var 'THE_SIGNING_SECRET");
}
const secretString = await getSecret(secretStringSecretName);
const authorized = WebhookUtils.validateWebhookRequest({
signature: event.headers['momento-signature'],
signingSecret: secretString,
body: event.body,
});
if (authorized !== WebhookUtils.RequestValidation.VALID) {
return {
statusCode: 403,
headers: {
'Content-Type': 'application/json',
},
body: '{"message": "Access Denied!"}',
};
}
const payload: Payload = JSON.parse(event.body) as Payload;
const userID = payload.token_id;
const message = JSON.parse(payload.text) as Message;
console.log('Storing user comment for userID ' + userID + ' and courseId ' + message.courseId);
console.log('Comment: ' + message.comment);
const cacheClient = await getCacheClient();
if (_cacheClient === undefined) {
throw new Error('Cache client is undefined');
}
const listResp = await cacheClient.listPushFront(
cacheName,
String(message.courseId),
JSON.stringify({userID: userID, comment: message.comment})
);
switch (listResp.type) {
case CacheListPushFrontResponse.Success:
console.log('Successfully persisted comment for course');
break;
case CacheListPushFrontResponse.Error:
console.log('Error while publishing comment for course ' + listResp.message());
break;
}
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
},
body: '{}',
};
} catch (err) {
console.log(err);
return {
statusCode: 500,
body: JSON.stringify({
message: 'An error occurred!' + String(err),
}),
};
}
};
async function getSecret(secretName: string): Promise<string> {
if (!_cachedSecrets.has(secretName)) {
const secretResponse = await _secretsClient.send(new GetSecretValueCommand({SecretId: secretName}));
if (secretResponse) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
_cachedSecrets.set(secretName, secretResponse.SecretString!);
} else {
throw new Error(`Unable to retrieve secret: ${secretName}`);
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return _cachedSecrets.get(secretName)!;
}
async function getCacheClient(): Promise<CacheClient> {
const apiKeySecretName = process.env.MOMENTO_API_KEY_SECRET_NAME;
if (apiKeySecretName === undefined) {
throw new Error("Missing required env var 'MOMENTO_API_KEY_SECRET_NAME");
}
if (_cacheClient === undefined) {
const momentoApiKey = await getSecret(apiKeySecretName);
console.log('Retrieved secret!');
// eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-assignment
_cacheClient = await CacheClient.create({
configuration: Configurations.Lambda.latest(),
credentialProvider: CredentialProvider.fromString({apiKey: momentoApiKey}),
defaultTtlSeconds: 60,
});
}
return _cacheClient;
}
- 最後に、トークンを生成してトピックにユーザー・メッセージを公開するサ ンプル・コードを以下に示す。現実の世界では これらは理想的には別々のマイクロサービスに住みわけることになります。
import {GetSecretValueCommand, SecretsManagerClient} from '@aws-sdk/client-secrets-manager';
import {
AuthClient,
Configurations,
CredentialProvider,
DisposableTokenScopes,
ExpiresIn,
GenerateDisposableTokenResponse,
TopicClient,
TopicPublishResponse,
} from '@gomomento/sdk';
const _secretsClient = new SecretsManagerClient({});
const _cachedSecrets = new Map<string, string>();
let _authClient: AuthClient | undefined = undefined;
let _topicClient: TopicClient | undefined = undefined;
const cacheName = 'course-comments';
const topicName = 'comment';
export const handler = async () => {
try {
const authClient = await getAuthClient();
const eventsPublishToken = await authClient.generateDisposableToken(
DisposableTokenScopes.topicPublishOnly(cacheName, topicName),
ExpiresIn.minutes(30),
{tokenId: 'taylor'}
);
switch (eventsPublishToken.type) {
case GenerateDisposableTokenResponse.Success: {
console.log('Generated a disposable API key with access to the "events" topic in the "cache" cache!');
// logging only a substring of the tokens, because logging security credentials is not advisable :)
//console.log(`API key starts with: ${eventsPublishToken.authToken.substring(0, 10)}`);
//console.log(`Expires At: ${eventsPublishToken.expiresAt.epoch()}`);
console.log('Publishing to the "events" topic in the "cache" cache! using the generated disposable token');
const topicClient = getTopicClient(eventsPublishToken.authToken);
const message = JSON.stringify({
comment: 'This course and video is awesome!',
courseId: 123,
});
console.log('Message: ' + message);
const publishResponse = await topicClient.publish(cacheName, topicName, message);
switch (publishResponse.type) {
case TopicPublishResponse.Success:
console.log('Published to the "events" topic in the "cache" cache!');
break;
case TopicPublishResponse.Error:
throw new Error(
`An error occurred while attempting to publish to the "events" topic in the "cache" cache: ${publishResponse.errorCode()}: ${publishResponse.toString()}`
);
}
break;
}
case GenerateDisposableTokenResponse.Error:
throw new Error(
`An error occurred while attempting to call generateApiKey with disposable token scope: ${eventsPublishToken.errorCode()}: ${eventsPublishToken.toString()}`
);
}
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
},
body: '{}',
};
} catch (err) {
console.log(err);
return {
statusCode: 500,
body: JSON.stringify({
message: 'An error occurred!',
}),
};
}
};
async function getAuthClient(): Promise<AuthClient> {
const apiKeySecretName = process.env.MOMENTO_API_KEY_SECRET_NAME;
if (apiKeySecretName === undefined) {
throw new Error("Missing required env var 'MOMENTO_API_KEY_SECRET_NAME");
}
if (_authClient === undefined) {
const momentoApiKey = await getSecret(apiKeySecretName);
console.log('Retrieved secret!');
// eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-assignment
_authClient = new AuthClient({
credentialProvider: CredentialProvider.fromString({apiKey: momentoApiKey}),
});
}
return _authClient;
}
function getTopicClient(disposableTokenKey: string): TopicClient {
if (_topicClient === undefined) {
console.log('Retrieved secret!');
_topicClient = new TopicClient({
configuration: Configurations.Lambda.latest(),
credentialProvider: CredentialProvider.fromString({apiKey: disposableTokenKey}),
});
}
return _topicClient;
}
async function getSecret(secretName: string): Promise<string> {
if (!_cachedSecrets.has(secretName)) {
const secretResponse = await _secretsClient.send(new GetSecretValueCommand({SecretId: secretName}));
if (secretResponse) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
_cachedSecrets.set(secretName, secretResponse.SecretString!);
} else {
throw new Error(`Unable to retrieve secret: ${secretName}`);
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return _cachedSecrets.get(secretName)!;
}
See More
- Momentoトピック、ウェブフック、トークンを統合することで、セキュアでステートレスな非同期システムを作成できます。この規約は、多言語チャットアプリ、オンラインポーリング、イベント駆動型システムなど、さまざまなユースケースに適用できます。