どーも、シローです。
前回(https://shiro-secret-base.com/?p=1004)に続いて、チャットに接続したユーザ情報や、投稿したメッセージをDynamoDBに登録してみます。
前回の確認
ローカルでWebSocketを立ち上げて、動作確認ができるようになっている前提で進めます。
DynamoDBのテーブルをコンテナ起動時に作成するように修正
テーブル定義は後ほどsls deploy
コマンドでも利用できるようにyml形式で記述します。公式 => (https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html)
以下のようにファイルを作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
. ├── dynamodb-tables │ ├── ChatMessage.yml <= 作成 │ └── UserConnection.yml <= 作成 ├── functions │ └── socket │ ├── connect │ │ └── index.js │ ├── disconnect │ │ └── index.js │ └── sendMessage │ └── index.js └── serverless.yml |
./serverless/src/dynamodb-tables/UserConnection.yml
1 2 3 4 5 6 7 8 9 10 |
TableName: UserConnection AttributeDefinitions: - AttributeName: connectionId AttributeType: S KeySchema: - AttributeName: connectionId KeyType: HASH ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 |
./serverless/src/dynamodb-tables/ChatMessage.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
TableName: ChatMessage AttributeDefinitions: - AttributeName: roomSlug AttributeType: S - AttributeName: requestTime AttributeType: N KeySchema: - AttributeName: roomSlug KeyType: HASH - AttributeName: requestTime KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 |
次に、これらのymlファイルをDockerのコンテナ起動時に読み込んでローカルのDynamoDBのテーブルを作成するためにstartup.sh
を修正します。
./docker/serverless/startup.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/bin/bash cd /app/dynamodb-tables for table_file in $(ls) do table_name=$(echo ${table_file} | awk -F '.' '{print $1}') aws dynamodb describe-table --table-name ${table_name} --endpoint-url http://dynamodb:8000 if [ $? -gt 0 ]; then # テーブルがない場合は新規作成 aws dynamodb create-table --cli-input-json "$(yq . ${table_file})" --endpoint-url http://dynamodb:8000 fi done cd /app npm install -S aws-sdk sls offline start --host 0.0.0.0 --port 3000 |
再度ビルドし直して、コンテナを再起動します。
1 2 |
$ docker-compose build serverless $ docker-compose up |
NoSQLWorkbenchでローカルのDynamoDBに作成したテーブルを確認
コンテナを再起動すれば、ChatMessage
、UserConnection
テーブルが作成されているはずです。
実際に確認するにはNoSQLWorkbenchを使います。
インストールはこちら => (https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/workbench.settingup.html)
ローカルのDynamoDBに接続する際にはdocker-compose.ymlで指定したローカルのポート番号(8889)を指定するのに注意します。
DynamoDB、APIGatewayの利用スクリプトを作成
以下のファイルを./serverless/src/functions
配下に作成します。
- modules/dynamodbClient.js ... DynamoDBを利用するためのクライアントを作成
- modules/apiGw.js ... APIGatewayを利用するためのクライアントを作成
- models/UserConnection.js ... チャットに接続しているユーザのテーブルにアクセスする
- models/ChatMessage.js ... チャットメッセージのテーブルにアクセスする
とその前にnpmで以下のパッケージをインストールします。
- aws-sdk
- serverless-offline
- serverless-websockets-plugin
コンテナに入ってインストールしても良いですが、起動時にインストールできるようにdocker/serverless/startup.sh
を編集します。
models/dynamodbClient.js
1 2 3 4 5 6 7 8 9 |
const AWS = require('aws-sdk'); const getDynamodbClient = () => { return new AWS.DynamoDB.DocumentClient({ region: 'localhost', endpoint: 'http://dynamodb:8000', }); }; exports.dynamodbClient = getDynamodbClient(); |
modules/apiGw.js
1 2 3 4 5 6 7 8 9 10 11 |
const AWS = require('aws-sdk'); require('aws-sdk/clients/apigatewaymanagementapi'); const getApigw = (domainName = null, stage = null) => { return new AWS.ApiGatewayManagementApi({ apiVersion: "2018-11-29", endpoint: 'http://localhost:3000' }); } exports.apigwManagementApi = getApigw; |
models/UserConnection.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
const { dynamodbClient } = require('../modules/dynamodbClient'); const TableName = 'UserConnection'; exports.scanUserConnection = async ({ FilterExpression, ExpressionAttributeValues }) => { const { Items } = await dynamodbClient.scan({ TableName, FilterExpression, ExpressionAttributeValues, }).promise(); return Items; } exports.createUserConnection = async (items) => { const params = { TableName, Item: { ...items } }; dynamodbClient.put(params, (err, data) => { if (err) { console.error(err); } else { console.log(data); } }); } exports.deleteUserConnection = async ({ connectionId }) => { const params = { TableName, Key: { connectionId, } }; dynamodbClient.delete(params, (err, data) => { if (err) { console.error(err); } else { console.log(data); } }); } |
models/ChatMessage.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
const { dynamodbClient } = require('../module/dynamodbClient'); exports.createChatMessage = async (items) => { const params = { TableName: 'ChatMessage', Item: { ...items } }; dynamodbClient.put(params, (err, data) => { if (err) { console.error(err); } else { console.log(data); } }); } |
チャットに接続・非接続、メッセージを送信するときの処理を修正
前回作成した
- functions/connect/index.js
- functions/disconnect/index.js
- functions/sendMessage/index.js
にUserConnection、ChatMessageテーブルにアクセスする処理を加えていきます。
functions/connect/index.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
const { createUserConnection } = require('../../models/UserConnection'); exports.handler = async (event, context, callback) => { const { userId, roomSlug } = event.queryStringParameters; const { connectionId, requestTimeEpoch } = event.requestContext; console.log(userId, roomSlug, connectionId, requestTimeEpoch); const messageRecord = { connectionId, requestTime: requestTimeEpoch, userId, roomSlug } createUserConnection(messageRecord); callback(null, { statusCode: 200, body: 'connected' }) }; |
functions/disconnect/index.js
1 2 3 4 5 6 7 8 9 10 11 |
const { deleteUserConnection } = require('../../models/UserConnection'); exports.handler = async (event, context, callback) => { const { connectionId } = event.requestContext; deleteUserConnection({ connectionId }); callback(null, { statusCode: 200, body: "disconnected" }); }; |
functions/sendMessage/index.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
const { apigwManagementApi } = require('../../modules/apiGw'); const { getUserConnection, deleteUserConnection, scanUserConnection, } = require('../../models/UserConnection'); const { createChatMessage } = require('../../models/ChatMessage'); exports.handler = async (event, context, callback) => { const body = JSON.parse(event.body); const messageBody = JSON.stringify({ message: body.message, }); const { connectionId, requestTimeEpoch } = event.requestContext; const { userId, roomSlug } = await getUserConnection({ connectionId }); const messageRecord = { requestTime: requestTimeEpoch, userId, roomSlug, messageBody } createChatMessage(messageRecord); const result = await scanUserConnection({ FilterExpression: 'roomSlug = :roomSlug', ExpressionAttributeValues: { ':roomSlug' : roomSlug } }); messageRecord.messageType = 'postMessage' const postParams = { Data: JSON.stringify(messageRecord) }; const apigw = apigwManagementApi(); result.forEach((element) => { console.log(element.connectionId); const postData = { ...postParams, ConnectionId: element.connectionId } apigw.postToConnection(postData, (err) => { console.log(err); console.log(postData); if (err && err.statusCode === 410) { deleteUserConnection({ connectionId: element.connectionId }); } }); }); }; |
動作確認
wscat
コマンドでwscat -c 'ws://localhost:3334?userId=1&roomSlug=2'
でソケットに接続、
もう片方はuserIdを2に変えて接続
{"action": "sendMessage", "message": "hoge----"}
て打つと、もう片方にもメッセージが出力されると思います。
最後に
ここまで見てくれた人に、一応僕の環境で作った成果物をgitにあげましたので、
上手くいかなかったら、プルして見てもいいかもです。
(最初から、それでええやんとか言ってはいけない)
https://github.com/smithshiro/docker-natto-env
クラウドコンピューティングの技術の中でも、昨今注目を集めているのがサーバーレスアーキテクチャです。
FaaS(Function-As-A-Service)とも呼ばれるサーバーレスアーキテクチャは、ファンクション(関数)と呼ばれるマイクロサービスを実装し、組み合わせながら、サービスを構築していくアーキテクチャです。