※ 夢のリアルタイム Web を完成させるため、超大作 (?) の Lambda を少しずつ作り上げていくお話です。 最初から読まないとワケが解らないかもです。
本日の導入
AWS SAM でつくる WebSocket アプリケーションもいよいよ大詰めを迎えました。
前回からだいぶ間が空いてしまい、かなりモチベーションが下がってしまっているので、ここで先に完成品のイメージを見てみましょう。
おぉ……
リアルタイムチャットだ……
今日という今日はコレを完成させます!
前回までのおさらい
これまで前回・前々回と、AWS SAM を使い、WebSocket の接続時と切断時の Lambda を実装しました。
ここまでは非常に汎用性が高いというか、リアルタイムチャットに限らず AWS SAM で WebSocket アプリを作ろうとするとほぼ避けては通れない手順です。
一方、クライアント ⇄ サービス間の双方向通信の具体的な実装についてはこれまで触れておりませんでした。
特にサービス側からクライアントに向けてプッシュ型で情報通知が行える醍醐味を味わうことなく立ち消えになるのは非常に勿体無い。 ということで、本日ついにリアルタイムチャットを完成させようと思います。
今回はバックエンドだけでなくフロントエンド実装も込みという盛り沢山な内容でお届けいたします。
上り方向の通信
初めに、上り(クライアント → サービス)方向の処理を実装します。
クライアント(フロントエンド)
クライアント側は HTML と JavaScript で作りましょう。 通常、HTML と JavaScript は別々のファイルに分けますが、ここでは話を簡単にするため1つの HTML ファイル(index.html)にまとめてしまいます。
ちなみにこの HTML ファイルは、SAM プロジェクトには含めません。 本番リリース時には別途 S3 など、HTML をホスティングできるサービスにアップロードする必要がありますが、動作確認目的であれば、さしあたり PC のローカル環境などに保存しておけばよいでしょう。
<!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> </head> <body> <!-- 入力ボックスと投稿ボタン --> <input id="message" autocomplete="false"> <button type="button" onclick=sendMessage()>投稿</button> <script> const ws_endpoint = `wss://████.execute-api.ap-northeast-1.amazonaws.com/Prod`; const ws = new WebSocket(ws_endpoint); // WebSocket 接続確立時のイベント処理 ws.onopen = event => { console.log('OPEN'); } // メッセージ受信時のイベント処理 ws.onmessage = event => { console.log(event); } // 投稿ボタンが押されたときのイベント処理 sendMessage = () => { document.getElementById('message').value = ''; } </script> </body> </html>
上記のコードで伏せられている URL は、お使いの環境で SAM プロジェクトをsam deploy
したときにコンソール上に表示されます(前々回の記事参照)。
WebSocket は非同期の双方向通信ですので、メッセージを送信するための処理と、受信したメッセージを処理するための処理をそれぞれ別々に用意する必要があります。
そのため、先ほどの JavaScript 部分にもいくつかのメソッドを空実装しています。 具体的な実装は追い追いコーディングしていきましょう。
送信機能の実装
WebSocket
オブジェクトのsend
メソッドで、メッセージを送信できます。
送信ボタンの押下イベント(sendMessage
)に送信処理を追加してみましょう。
sendMessage
抜粋)// 投稿ボタンが押されたときのイベント処理 sendMessage = () => { const messageText = document.getElementById('message').value; if(!messageText) return; // 入力文字列を送信 ws.send(JSON.stringify({ message: 'sendMessage', messageText: messageText })); // 入力ボックスの中身をクリア document.getElementById('message').value = ''; }
──ところで、ここで何気なく書いた送信電文の message: 'sendMessage'
が気になる方もいるかと思います。
これは非常に重要な意味を持つので、のちほど、フロントエンド・バックエンドの実装がひととおり揃ったところで改めて説明します。
Lambda(バックエンド)
ここからは Lambda の実装に移ります。
前回、WebSocket のプロジェクトでは、新規の Lambda を追加するごとに、下図の青色のセットが必要だという話をしました。
今回も新たな Lambda を追加しますので、Route, Integration, Permission, そして Function のリソース定義を template.yaml の Resources
に追加していきましょう。
既存のリソースをコピペする場合は修正漏れにご注意ください。
# メッセージ送信 - ルート SendRoute: Type: AWS::ApiGatewayV2::Route Properties: ApiId: !Ref SimpleChatWebSocket RouteKey: sendMessage Target: !Join - '/' - - 'integrations' - !Ref SendInteg # メッセージ送信 - 統合リソース SendInteg: Type: AWS::ApiGatewayV2::Integration DependsOn: - SendMessageFunction Properties: ApiId: !Ref SimpleChatWebSocket IntegrationType: AWS_PROXY IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${SendMessageFunction.Arn}/invocations # メッセージ送信 - パーミッション SendMessagePermission: Type: AWS::Lambda::Permission DependsOn: - SimpleChatWebSocket - SendMessageFunction Properties: Action: lambda:InvokeFunction FunctionName: !Ref SendMessageFunction Principal: apigateway.amazonaws.com # メッセージ送信 - Lambda SendMessageFunction: Type: AWS::Serverless::Function Properties: CodeUri: send_message # ★Lambda関数のフォルダ名★ Handler: app.lambda_handler Runtime: python3.9 Policies: - DynamoDBReadPolicy: TableName: !Ref ConnectionsTable Environment: Variables: CONNECTIONS_TABLE: !Ref ConnectionsTable
Lambda 関数の実装
続いて、Lambda 関数を追加します。 下図のようにon_connect
フォルダをコピペして、send_message
にリネームしましょう。
これは、先ほど template.yaml 内のリソースSendMessageFunction
にて、CodeUri: send_message
と記述したので、それに合わせる形でフォルダ名をsend_message
としています。
続いてsend_message
フォルダ内のapp.py
を開き、以下のように書き換えます。
def lambda_handler(event, context): print(event['body']) return {}
Lambda の実装が終わったら、sam build -u
、sam deploy
でリソース群をデプロイします。
デプロイに成功したら、いよいよ動作確認です。
動作確認
本日の初めに作成した index.html をブラウザで開き、入力ボックスに適当な文字を入れて「投稿」ボタンを押しましょう。
CloudWatch Logs に、クライアントから送信したメッセージがそのまま出力されています。 無事に、クライアントから WebSocket 通信を契機に Lambda を起動できたことが確認できました。
上り側の処理は、これで完成です。
ルート選択式の正体
クライアントから新たな Lambda 関数が無事に起動できたところで、今まで触れずにいたことについて話していきたいと思います。
なぜ、クライアント側のコードには、起動したい Lambda の関数名が書いてあるわけではないにも拘らず、特定の Lambda を起動できたのか──。
これまでに書いてきたコードを並べてみましょう。
JavaScript側で指定したmessage: 'sendMessage'
は、template.yaml の
RouteSelectionExpression: "$request.body.message"
RouteKey: sendMessage
── に、それぞれ対応づいていることが分かるかと思います。
次の段落で、もう少しだけ詳しくお話します。
RouteSelectionExpression
まずそもそも、一般的にリクエストの JSON には { "Hoge": "foo", "Fuga": "bar", "Piyo": "baz" ... }
など、さまざまな項目がおびただしく並んでいる可能性があります。
このままだと、リクエストを受け付ける側(サービス側)は、JSON のどこを見て振り分ければよいのか判断できないので、前もって「振り分けるときにはこのキー項目を使うよ」という宣言をしておくのです。
これこそがRouteSelectionExpression
と呼ばれるものです。
ここではRouteSelectionExpression: "$request.body.message"
と指定したので、(リクエスト本体のJSON電文の)message
というキー項目を用いてリクエストを振り分けることになります。
RouteKey
さらに、リクエスト JSON の message
の値が、Router の RouteKey
の設定値 ── ここでは sendMessage
にマッチした場合は、当該 Router に紐づく Lambda が(Integration経由で)呼ばれます。
下図は、クライアントが送信した JSON データに応じて、RouteKey
に基づき振り分けが行われるイメージです。 もしも新たに Lambda を追加したい場合は、RouteKey
の異なる別の Router を設けるのが簡単でしょう。
ここまでで、クライアント → サービス方向の通信は実装できました。
続いて、サービス → クライアント方向の通信を実装していきましょう。
下り方向の通信
まずは一番シンプルな題材として、クライアントから送られてきたデータを、そのまま送信元のクライアントだけに鸚鵡返しする Lambda を実装します。
Lambda(バックエンド)
クライアントに向けてデータを送る処理を Lambda で実装するには、ApiGatewayManagementApi というオブジェクトを使います。
実装のポイントは以下の通りです。
- ApiGatewayManagementApi オブジェクトを初期化する際、エンドポイント URL をパラメータとして渡す必要があるため、Lambda の
event
に含まれるリクエストコンテキストを使って URL を組み立てます。 - データを送信する際には、送信先のクライアントを識別するためにコネクションID が必要になります。
今回は、送信者本人にメッセージを送り返すだけですので、event
から直接コネクションIDを取得して使いましょう。
import boto3 import json import os def lambda_handler(event, context): # コネクションIDを取得 connection_id = event['requestContext']['connectionId'] # ApiGatewayManagementApi オブジェクトの作成 domain = event["requestContext"]["domainName"] stage = event["requestContext"]["stage"] apigw = boto3.client( 'apigatewaymanagementapi', endpoint_url=f'https://{domain}/{stage}' ) # コネクションIDを指定してクライアントにデータをPOST apigw.post_to_connection( ConnectionId=connection_id, Data=event['body'] ) return {}
さらに、Lambda 関数から ApiGatewayManagementApi を使ってクライアントにデータを送るには、Lambda に対して追加の権限設定が必要となります。
template.yaml の SendMessageFunction
リソースの Policies
を、以下のように修正しましょう。
SendMessageFunction: Type: AWS::Serverless::Function Properties: CodeUri: send_message Handler: app.lambda_handler Runtime: python3.9 Policies: - DynamoDBReadPolicy: TableName: !Ref ConnectionsTable # ★追加ここから★ - Statement: - Effect: Allow Action: - 'execute-api:ManageConnections' Resource: - !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${SimpleChatWebSocket}/*' # ★追加ここまで★ Environment: Variables: CONNECTIONS_TABLE: !Ref ConnectionsTable
クライアント(フロントエンド)
せっかくですので、Lambda から受信したデータをリアルタイムに画面に反映されるように HTML を改修してみましょう。
データを一覧表示できるよう<ul id="list"></ul>
タグを追加し、さらにデータ受信時のイベント処理で <li></li>
タグを随時挿入する DOM 操作を行なっています。
<!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8"> </head> <body> <!-- 入力ボックスと投稿ボタン --> <input id="message" autocomplete="false"> <button type="button" onclick=sendMessage()>投稿</button> <hr /> <!-- ★追加ここから★ --> <ul id="list"></ul> <!-- ★追加ここまで★ --> <script> const ws_endpoint = `wss://████.execute-api.ap-northeast-1.amazonaws.com/Prod`; const ws = new WebSocket(ws_endpoint); // WebSocket 接続確立時のイベント処理 ws.onopen = _ => { console.log('OPEN'); } // ★変更ここから★ // メッセージ受信時のイベント処理 ws.onmessage = event => { const data = JSON.parse(event.data); const messageText = data.messageText; // 受信したメッセージを HTML のリストに追加 const list = document.getElementById('list'); const item = document.createElement('li'); item.textContent = messageText; list.insertBefore(item, list.firstChild); console.log(`receive: ${messageText}`); } // ★変更ここまで★ // 投稿ボタンが押されたときのイベント処理 sendMessage = () => { const messageText = document.getElementById('message').value; if(!messageText) return; console.log(`send: ${messageText}`); // 入力文字列を送信 ws.send(JSON.stringify({ message: 'sendMessage', messageText: messageText })); // 入力ボックスの中身をクリア document.getElementById('message').value = ''; } </script> </body> </html>
動作確認
ふたたび HTML をブラウザで開き、動作確認してみましょう。
テキストを入力し「投稿」ボタンを押すと、投稿したメッセージが増えていることが確認できます。
まだまだ動きはしょぼいですが、これは一度、Lambda を通って戻ってきたメッセージなのです。
ただし、これではただの 1:1 通信であり、チャットアプリとは似て非なる代物です。 本日の最後の仕上げに、メッセージをブロードキャストできるようにしましょう。
チャットの完成
ここまでの復習となりますが、ApiGatewayManagementApi では、コネクションID を指定してクライアントにデータを送信できるのでした。
そして、前回と前々回に作った DynamoDB テーブルには、接続中のコネクションIDが管理されているのでした。
つまり、DynamoDB からコネクションID を取り出し、その一つひとつにデータを送ることができれば、Lambda から複数のクライアントに対してマルチキャストできることになります。
DynamoDB からデータを取り出すには、目的に応じてscan
やquery
、get_item
などさまざまな方式がありますが、ここでは全アイテムを取得したいのでscan
を使います。
本番環境など、同時接続数が多い場合は、キャパシティユニットの消費量に注意しましょう。
import boto3 import json import os dynamodb = boto3.resource('dynamodb') def lambda_handler(event, context): # ApiGatewayManagementApi オブジェクトの初期化 domain = event["requestContext"]["domainName"] stage = event["requestContext"]["stage"] apigw = boto3.client( 'apigatewaymanagementapi', endpoint_url=f'https://{domain}/{stage}' ) # テーブルを取得 table_name = os.environ['CONNECTIONS_TABLE'] connections_table = dynamodb.Table(table_name) # テーブルから接続中のコネクションIDを取得 connection_id_list = connections_table.scan() for item in connection_id_list['Items']: connection_id = item['id'] # コネクションIDを指定してクライアントにデータをPOST apigw.post_to_connection( ConnectionId=connection_id, Data=event['body'] ) return {}
これでついに完成です。
sam build -u
、sam deploy
で AWS にデプロイしましょう。
まとめ
なっっっが!!!
これ、Lambda なんて使わず、Node.jsで素直に実装した方がよいのでは……
まぁでも、IoT Core でセンサから吸い上げたデータを、Webのダッシュボードにリアルタイム送信したりするといった利用用途なら使い出があるんじゃないでしょうか。
Lambda と相性が良さそうですし。
なるほど……
あと、完全にサーバレスという仕組みはなんだかんだいって良いと思います。
そして、必要なインフラ一式もコードから生成できる IaC も、また良いものだと思います。
当時、自分でやってみていろいろハマったので、ゼロから作れるように順序立てて構築手順をまとめ直してみたいとかねがね思ったのですが、いかんせん template.yaml の分量が多く、やむなく記事を3分割させることになりました。
最後に、template.yaml の全容を置いておきます。 最後までお読みいただきありがとうございました。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: "" Globals: Function: Timeout: 3 Resources: SimpleChatWebSocket: Type: AWS::ApiGatewayV2::Api Properties: Name: SimpleChatWebSocket ProtocolType: WEBSOCKET RouteSelectionExpression: "$request.body.message" ConnectRoute: Type: AWS::ApiGatewayV2::Route Properties: ApiId: !Ref SimpleChatWebSocket RouteKey: $connect Target: !Join - '/' - - 'integrations' - !Ref ConnectInteg ConnectInteg: Type: AWS::ApiGatewayV2::Integration DependsOn: - OnConnectFunction Properties: ApiId: !Ref SimpleChatWebSocket IntegrationType: AWS_PROXY IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations OnConnectPermission: Type: AWS::Lambda::Permission DependsOn: - SimpleChatWebSocket - OnConnectFunction Properties: Action: lambda:InvokeFunction FunctionName: !Ref OnConnectFunction Principal: apigateway.amazonaws.com DisconnectRoute: Type: AWS::ApiGatewayV2::Route Properties: ApiId: !Ref SimpleChatWebSocket RouteKey: $disconnect Target: !Join - '/' - - 'integrations' - !Ref DisconnectInteg DisconnectInteg: Type: AWS::ApiGatewayV2::Integration DependsOn: - OnDisconnectFunction Properties: ApiId: !Ref SimpleChatWebSocket IntegrationType: AWS_PROXY IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnDisconnectFunction.Arn}/invocations OnDisconnectPermission: Type: AWS::Lambda::Permission DependsOn: - SimpleChatWebSocket - OnDisconnectFunction Properties: Action: lambda:InvokeFunction FunctionName: !Ref OnDisconnectFunction Principal: apigateway.amazonaws.com SendRoute: Type: AWS::ApiGatewayV2::Route Properties: ApiId: !Ref SimpleChatWebSocket RouteKey: sendMessage Target: !Join - '/' - - 'integrations' - !Ref SendInteg SendInteg: Type: AWS::ApiGatewayV2::Integration DependsOn: - SendMessageFunction Properties: ApiId: !Ref SimpleChatWebSocket IntegrationType: AWS_PROXY IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${SendMessageFunction.Arn}/invocations SendMessagePermission: Type: AWS::Lambda::Permission DependsOn: - SimpleChatWebSocket - SendMessageFunction Properties: Action: lambda:InvokeFunction FunctionName: !Ref SendMessageFunction Principal: apigateway.amazonaws.com OnConnectFunction: Type: AWS::Serverless::Function Properties: CodeUri: on_connect Handler: app.lambda_handler Runtime: python3.9 Policies: - DynamoDBWritePolicy: TableName: !Ref ConnectionsTable Environment: Variables: CONNECTIONS_TABLE: !Ref ConnectionsTable OnDisconnectFunction: Type: AWS::Serverless::Function Properties: CodeUri: on_disconnect Handler: app.lambda_handler Runtime: python3.9 Policies: - DynamoDBCrudPolicy: TableName: !Ref ConnectionsTable Environment: Variables: CONNECTIONS_TABLE: !Ref ConnectionsTable SendMessageFunction: Type: AWS::Serverless::Function Properties: CodeUri: send_message Handler: app.lambda_handler Runtime: python3.9 Policies: - DynamoDBReadPolicy: TableName: !Ref ConnectionsTable - Statement: - Effect: Allow Action: - 'execute-api:ManageConnections' Resource: - !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${SimpleChatWebSocket}/*' Environment: Variables: CONNECTIONS_TABLE: !Ref ConnectionsTable ConnectionsTable: Type: AWS::Serverless::SimpleTable Stage: Type: AWS::ApiGatewayV2::Stage Properties: StageName: Prod ApiId: !Ref SimpleChatWebSocket AutoDeploy: true Outputs: WebSocketURI: Description: "The WSS Protocol URI to connect to" Value: !Sub "wss://${SimpleChatWebSocket}.execute-api.${AWS::Region}.amazonaws.com/${Stage}"