AWS Lambda で遊ぼう(第10話: WebSocket篇③ - リアルタイムチャット完成篇)

夢のリアルタイム Web を完成させるため、超大作 (?) の Lambda を少しずつ作り上げていくお話です。 最初から読まないとワケが解らないかもです。

tercel-tech.hatenablog.com

tercel-tech.hatenablog.com

本日の導入

AWS SAM でつくる WebSocket アプリケーションもいよいよ大詰めを迎えました。

前回からだいぶ間が空いてしまい、かなりモチベーションが下がってしまっているので、ここで先に完成品のイメージを見てみましょう。

youtu.be

おぉ…

リアルタイムチャットだ…

今日という今日はコレを完成させます!

前回までのおさらい

これまで前回前々回と、AWS SAM を使い、WebSocket の接続時と切断時の Lambda を実装しました。

ここまでは非常に汎用性が高いというか、リアルタイムチャットに限らず AWS SAM で WebSocket アプリを作ろうとするとほぼ避けては通れない手順です。

一方、クライアント ⇄ サービス間の双方向通信の具体的な実装についてはこれまで触れておりませんでした。

特にサービス側からクライアントに向けてプッシュ型で情報通知が行えるだいを味わうことなく立ち消えになるのは非常に勿体無い。 ということで、本日ついにリアルタイムチャットを完成させようと思います。

今回はバックエンドだけでなくフロントエンド実装も込みという盛り沢山な内容でお届けいたします。

上り方向の通信

初めに、上り(クライアント → サービス)方向の処理を実装します。

クライアント(フロントエンド)

クライアント側は HTML と JavaScript で作りましょう。 通常、HTML と JavaScript は別々のファイルに分けますが、ここでは話を簡単にするため1つの HTML ファイル(index.html)にまとめてしまいます。

ちなみにこの HTML ファイルは、SAM プロジェクトには含めません。 本番リリース時には別途 S3 など、HTML をホスティングできるサービスにアップロードする必要がありますが、動作確認目的であれば、さしあたり PC のローカル環境などに保存しておけばよいでしょう。

■ index.html

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8">
</head>
<body>
  
  <!-- 入力ボックスと投稿ボタン -->
  <input id="message" autocomplete="false">
  <button type="button" onclick=sendMessage()>投稿</button>

  <script>
    const ws_endpoint = `wss://████.execute-api.ap-northeast-1.amazonaws.com/Prod`;
    const ws = new WebSocket(ws_endpoint);

    // WebSocket 接続確立時のイベント処理
    ws.onopen = event => {
      console.log('OPEN');
    }

    // メッセージ受信時のイベント処理
    ws.onmessage = event => {
      console.log(event);
    }

    // 投稿ボタンが押されたときのイベント処理
    sendMessage = () => {
      document.getElementById('message').value = '';
    }
  </script>
</body>
</html>

上記のコードで伏せられている URL は、お使いの環境で SAM プロジェクトをsam deploy したときにコンソール上に表示されます(前々回の記事参照)。

WebSocket は非同期の双方向通信ですので、メッセージを送信するための処理と、受信したメッセージを処理するための処理をそれぞれ別々に用意する必要があります。

そのため、先ほどの JavaScript 部分にもいくつかのメソッドをから実装しています。 具体的な実装は追い追いコーディングしていきましょう。

送信機能の実装

WebSocketオブジェクトのsendメソッドで、メッセージを送信できます。

送信ボタンの押下イベント(sendMessage)に送信処理を追加してみましょう。

■ index.htmlsendMessage抜粋)

    // 投稿ボタンが押されたときのイベント処理
    sendMessage = () => {
      const messageText = document.getElementById('message').value;
      if(!messageText) return;

      // 入力文字列を送信
      ws.send(JSON.stringify({
        message: 'sendMessage',
        messageText: messageText
      }));

      // 入力ボックスの中身をクリア
      document.getElementById('message').value = '';
    }

──ところで、ここで何気なく書いた送信電文の message: 'sendMessage' が気になる方もいるかと思います。

これは非常に重要な意味を持つので、のちほど、フロントエンド・バックエンドの実装がひととおり揃ったところで改めて説明します。

Lambda(バックエンド)

ここからは Lambda の実装に移ります。

前回、WebSocket のプロジェクトでは、新規の Lambda を追加するごとに、下図の青色のセットが必要だという話をしました。

今回も新たな Lambda を追加しますので、Route, Integration, Permission, そして Function のリソース定義を template.yamlResourcesに追加していきましょう。

既存のリソースをコピペする場合は修正漏れにご注意ください。

■ template.yaml(追加分のみ)

  # メッセージ送信 - ルート    
  SendRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      RouteKey: sendMessage
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref SendInteg
  
  # メッセージ送信 - 統合リソース
  SendInteg:
    Type: AWS::ApiGatewayV2::Integration
    DependsOn:
      - SendMessageFunction
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${SendMessageFunction.Arn}/invocations

  # メッセージ送信 - パーミッション
  SendMessagePermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleChatWebSocket
      - SendMessageFunction
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref SendMessageFunction
      Principal: apigateway.amazonaws.com

  # メッセージ送信 - Lambda
  SendMessageFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: send_message  # ★Lambda関数のフォルダ名★
      Handler: app.lambda_handler
      Runtime: python3.9
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref ConnectionsTable
      Environment:
        Variables:
          CONNECTIONS_TABLE: !Ref ConnectionsTable
Lambda 関数の実装

続いて、Lambda 関数を追加します。 下図のようにon_connectフォルダをコピペして、send_messageにリネームしましょう。

これは、先ほど template.yaml 内のリソースSendMessageFunctionにて、CodeUri: send_message と記述したので、それに合わせる形でフォルダ名をsend_messageとしています。

続いてsend_messageフォルダ内のapp.pyを開き、以下のように書き換えます。

■ send_message/app.py

def lambda_handler(event, context):
    print(event['body'])
    return {}

Lambda の実装が終わったら、sam build -usam deploy でリソース群をデプロイします。

デプロイに成功したら、いよいよ動作確認です。

動作確認

本日の初めに作成した index.html をブラウザで開き、入力ボックスに適当な文字を入れて「投稿」ボタンを押しましょう。

CloudWatch Logs に、クライアントから送信したメッセージがそのまま出力されています。 無事に、クライアントから WebSocket 通信を契機に Lambda を起動できたことが確認できました。

上り側の処理は、これで完成です。

ルート選択式の正体

クライアントから新たな Lambda 関数が無事に起動できたところで、今まで触れずにいたことについて話していきたいと思います。

なぜ、クライアント側のコードには、起動したい Lambda の関数名が書いてあるわけではないにもかかわらず、特定の Lambda を起動できたのか──。

これまでに書いてきたコードを並べてみましょう。

JavaScript側で指定したmessage: 'sendMessage' は、template.yaml

  • RouteSelectionExpression: "$request.body.message"
  • RouteKey: sendMessage

── に、それぞれ対応づいていることが分かるかと思います。

次の段落で、もう少しだけ詳しくお話します。

RouteSelectionExpression

まずそもそも、一般的にリクエストの JSON には { "Hoge": "foo", "Fuga": "bar", "Piyo": "baz" ... }など、さまざまな項目がおびただしく並んでいる可能性があります。

このままだと、リクエストを受け付ける側(サービス側)は、JSON のどこを見て振り分ければよいのか判断できないので、前もって「振り分けるときにはこのキー項目を使うよ」という宣言をしておくのです。

これこそがRouteSelectionExpressionルート選択式と呼ばれるものです。

ここではRouteSelectionExpression: "$request.body.message"と指定したので、(リクエスト本体のJSON電文の)messageというキー項目を用いてリクエストを振り分けることになります。

RouteKey

さらに、リクエスJSONmessage の値が、Router の RouteKey の設定値 ── ここでは sendMessage にマッチした場合は、当該 Router に紐づく Lambda が(Integration経由で)呼ばれます。

下図は、クライアントが送信した JSON データに応じて、RouteKeyに基づき振り分けが行われるイメージです。 もしも新たに Lambda を追加したい場合は、RouteKey の異なる別の Router を設けるのが簡単でしょう。


ここまでで、クライアント → サービス方向の通信は実装できました。

続いて、サービス → クライアント方向の通信を実装していきましょう。

下り方向の通信

まずは一番シンプルな題材として、クライアントから送られてきたデータを、そのまま送信元のクライアントだけにおう返しする Lambda を実装します。

Lambda(バックエンド)

クライアントに向けてデータを送る処理を Lambda で実装するには、ApiGatewayManagementApi というオブジェクトを使います。

実装のポイントは以下の通りです。

  • ApiGatewayManagementApi オブジェクトを初期化する際、エンドポイント URL をパラメータとして渡す必要があるため、Lambda の event に含まれるリクエストコンテキストを使って URL を組み立てます。
  • データを送信する際には、送信先のクライアントを識別するためにコネクションID が必要になります。

今回は、送信者本人にメッセージを送り返すだけですので、eventから直接コネクションIDを取得して使いましょう。

■ send_message/app.py

import boto3
import json
import os

def lambda_handler(event, context):
    # コネクションIDを取得
    connection_id = event['requestContext']['connectionId']
    
    # ApiGatewayManagementApi オブジェクトの作成
    domain = event["requestContext"]["domainName"]
    stage = event["requestContext"]["stage"]
    
    apigw = boto3.client(
        'apigatewaymanagementapi',
        endpoint_url=f'https://{domain}/{stage}'
    )
    
    # コネクションIDを指定してクライアントにデータをPOST
    apigw.post_to_connection(
        ConnectionId=connection_id,
        Data=event['body']
    )

    return {}

さらに、Lambda 関数から ApiGatewayManagementApi を使ってクライアントにデータを送るには、Lambda に対して追加の権限設定が必要となります。

template.yamlSendMessageFunctionリソースの Policies を、以下のように修正しましょう。

■ template.yaml(抜粋)

  SendMessageFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: send_message
      Handler: app.lambda_handler
      Runtime: python3.9
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref ConnectionsTable
        # ★追加ここから★
        - Statement:
          - Effect: Allow
            Action:
            - 'execute-api:ManageConnections'
            Resource:
            - !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${SimpleChatWebSocket}/*'
        # ★追加ここまで★
      Environment:
        Variables:
          CONNECTIONS_TABLE: !Ref ConnectionsTable

クライアント(フロントエンド)

せっかくですので、Lambda から受信したデータをリアルタイムに画面に反映されるように HTML を改修してみましょう。

データを一覧表示できるよう<ul id="list"></ul>タグを追加し、さらにデータ受信時のイベント処理で <li></li>タグを随時挿入する DOM 操作を行なっています。

■ index.html

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8">
</head>
<body>
  
  <!-- 入力ボックスと投稿ボタン -->
  <input id="message" autocomplete="false">
  <button type="button" onclick=sendMessage()>投稿</button>

  <hr />

  <!-- ★追加ここから★ -->
  <ul id="list"></ul>
  <!-- ★追加ここまで★ -->

  <script>
    const ws_endpoint = `wss://████.execute-api.ap-northeast-1.amazonaws.com/Prod`;
    const ws = new WebSocket(ws_endpoint);

    // WebSocket 接続確立時のイベント処理
    ws.onopen = _ => {
      console.log('OPEN');
    }

    // ★変更ここから★
    // メッセージ受信時のイベント処理
    ws.onmessage = event => {
      const data = JSON.parse(event.data);
      const messageText = data.messageText;

      // 受信したメッセージを HTML のリストに追加
      const list = document.getElementById('list');
      const item = document.createElement('li');
      item.textContent = messageText;

      list.insertBefore(item, list.firstChild);

      console.log(`receive: ${messageText}`);
    }
    // ★変更ここまで★

    // 投稿ボタンが押されたときのイベント処理
    sendMessage = () => {
      const messageText = document.getElementById('message').value;
      if(!messageText) return;
      console.log(`send: ${messageText}`);

      // 入力文字列を送信
      ws.send(JSON.stringify({
        message: 'sendMessage',
        messageText: messageText
      }));

      // 入力ボックスの中身をクリア
      document.getElementById('message').value = '';
    }
  </script>
</body>
</html>

動作確認

ふたたび HTML をブラウザで開き、動作確認してみましょう。

テキストを入力し「投稿」ボタンを押すと、投稿したメッセージが増えていることが確認できます。

まだまだ動きはしょぼいですが、これは一度、Lambda を通って戻ってきたメッセージなのです。

ただし、これではただの 1:1 通信であり、チャットアプリとは似て非なる代物です。 本日の最後の仕上げに、メッセージをブロードキャストできるようにしましょう。

チャットの完成

ここまでの復習となりますが、ApiGatewayManagementApi では、コネクションID を指定してクライアントにデータを送信できるのでした。

そして、前回前々回に作った DynamoDB テーブルには、接続中のコネクションIDが管理されているのでした。

つまり、DynamoDB からコネクションID を取り出し、その一つひとつにデータを送ることができれば、Lambda から複数のクライアントに対してマルチキャストできることになります。

DynamoDB からデータを取り出すには、目的に応じてscanqueryget_itemなどさまざまな方式がありますが、ここでは全アイテムを取得したいのでscanを使います。

本番環境など、同時接続数が多い場合は、キャパシティユニットの消費量に注意しましょう。

■ send_message/app.py

import boto3
import json
import os

dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):
    # ApiGatewayManagementApi オブジェクトの初期化 
    domain = event["requestContext"]["domainName"]
    stage = event["requestContext"]["stage"]
    
    apigw = boto3.client(
        'apigatewaymanagementapi', 
        endpoint_url=f'https://{domain}/{stage}'
    )
    
    
    # テーブルを取得
    table_name = os.environ['CONNECTIONS_TABLE']
    connections_table = dynamodb.Table(table_name)
    
    # テーブルから接続中のコネクションIDを取得
    connection_id_list = connections_table.scan()
    for item in connection_id_list['Items']:
        connection_id = item['id']
    
        # コネクションIDを指定してクライアントにデータをPOST
        apigw.post_to_connection(
            ConnectionId=connection_id,
            Data=event['body']
        )

    return {}

これでついに完成です。

sam build -usam deployAWS にデプロイしましょう。

動作確認

最後の動作確認です。 ブラウザを2つ立ち上げて、送信したデータがリアルタイムで同期されることを確認してみましょう。

youtu.be

UI はめちゃくちゃしょぼいですが、バックエンドはなかなかすごいテクノロジーが使われているのです。

まとめ

なっっっが!!!

これ、Lambda なんて使わず、Node.jsで素直に実装した方がよいのでは…

まぁでも、IoT Core でセンサから吸い上げたデータを、Webのダッシュボードにリアルタイム送信したりするといった利用用途なら使い出があるんじゃないでしょうか。

Lambda と相性が良さそうですし。

なるほど…

あと、完全にサーバレスという仕組みはなんだかんだいって良いと思います。

そして、必要なインフラ一式もコードから生成できる IaC も、また良いものだと思います。

当時、自分でやってみていろいろハマったので、ゼロから作れるように順序立てて構築手順をまとめ直してみたいとかねがね思ったのですが、いかんせん template.yaml の分量が多く、やむなく記事を3分割させることになりました。

最後に、template.yaml の全容を置いておきます。 最後までお読みいただきありがとうございました。

■ template.yaml(最終版)

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: ""

Globals:
  Function:
    Timeout: 3

Resources:
  SimpleChatWebSocket:
    Type: AWS::ApiGatewayV2::Api
    Properties:
      Name: SimpleChatWebSocket
      ProtocolType: WEBSOCKET
      RouteSelectionExpression: "$request.body.message"

  ConnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      RouteKey: $connect
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref ConnectInteg
  
  ConnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    DependsOn:
      - OnConnectFunction
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnConnectFunction.Arn}/invocations
  
  OnConnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleChatWebSocket
      - OnConnectFunction
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref OnConnectFunction
      Principal: apigateway.amazonaws.com
  
  DisconnectRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      RouteKey: $disconnect
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref DisconnectInteg
  
  DisconnectInteg:
    Type: AWS::ApiGatewayV2::Integration
    DependsOn:
      - OnDisconnectFunction
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${OnDisconnectFunction.Arn}/invocations

  OnDisconnectPermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleChatWebSocket
      - OnDisconnectFunction
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref OnDisconnectFunction
      Principal: apigateway.amazonaws.com

  SendRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      RouteKey: sendMessage
      Target: !Join
        - '/'
        - - 'integrations'
          - !Ref SendInteg
  
  SendInteg:
    Type: AWS::ApiGatewayV2::Integration
    DependsOn:
      - SendMessageFunction
    Properties:
      ApiId: !Ref SimpleChatWebSocket
      IntegrationType: AWS_PROXY
      IntegrationUri: !Sub arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${SendMessageFunction.Arn}/invocations

  SendMessagePermission:
    Type: AWS::Lambda::Permission
    DependsOn:
      - SimpleChatWebSocket
      - SendMessageFunction
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref SendMessageFunction
      Principal: apigateway.amazonaws.com

  OnConnectFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: on_connect
      Handler: app.lambda_handler
      Runtime: python3.9
      Policies:
        - DynamoDBWritePolicy:
            TableName: !Ref ConnectionsTable
      Environment:
        Variables:
          CONNECTIONS_TABLE: !Ref ConnectionsTable

  OnDisconnectFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: on_disconnect
      Handler: app.lambda_handler
      Runtime: python3.9
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref ConnectionsTable
      Environment:
        Variables:
          CONNECTIONS_TABLE: !Ref ConnectionsTable

  SendMessageFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: send_message
      Handler: app.lambda_handler
      Runtime: python3.9
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref ConnectionsTable
        - Statement:
          - Effect: Allow
            Action:
            - 'execute-api:ManageConnections'
            Resource:
            - !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${SimpleChatWebSocket}/*'
      Environment:
        Variables:
          CONNECTIONS_TABLE: !Ref ConnectionsTable
          
  ConnectionsTable:
    Type: AWS::Serverless::SimpleTable

  Stage:
    Type: AWS::ApiGatewayV2::Stage
    Properties:
      StageName: Prod
      ApiId: !Ref SimpleChatWebSocket
      AutoDeploy: true

Outputs:
  WebSocketURI:
    Description: "The WSS Protocol URI to connect to"
    Value: !Sub "wss://${SimpleChatWebSocket}.execute-api.${AWS::Region}.amazonaws.com/${Stage}"

Copyright (c) 2012 @tercel_s, @iTercel, @pi_cro_s.