AWS Lambda で遊ぼう(第5話: SQSといっしょ)

Lambda と Lambda を SQS で繋ぐ

はじめに

今日は、AWS SAM を使って Lambda と Lambda を SQS で繋ぐ構成を作ってみよう。

f:id:tercel_s:20210509110526p:plain

API Gateway をトリガとする最初の Lambda で UUID を 100件発行し、1件ずつ SQS に投入、後続の Lambda でそれを取り出してログに出力する」という超シンプルなマイクロサービスを構築していく。

今回も、第2話で述べたプロジェクト作成直後の状態をベースに話を進めよう。

つまり、初期状態で API Gateway とそれをトリガとする Lambda 関数までは構築が完了しているものとする。

f:id:tercel_s:20210509165056p:plain

また、このシリーズをまとめて読みたい人は、Lambda カテゴリのページを参照されたい。

tercel-tech.hatenablog.com
本日のお品書きはコチラ:

そもそも SQS とは

SQS とは、基本情報技術者試験にも出題されるキューqueueであり、サービス同士がデータを送受信する際のデータの通り道に過ぎない。

しかしながら、キューは非常に汎用性の高い道具であり、その使い所も多岐に亘る。

たとえば ──

  • 複数のデータをまとめて投入し、それらを後続処理で一つずつ取り出す
  • 1つずつバラバラに投入したデータを一定期間キューに溜めておき、後でまとめて取り出してバッチ的に捌く

── といった技巧を駆使することで、システムのボトルネックをある程度コントロールできる。

キューの使い所

大量データの処理を考えてみよう。 1件のデータを処理するのに1秒かかるものとする。

1つの Lambda の中で for文をぐるぐる回して全てのデータを逐次処理するよりも、とにかく「1件だけデータを取り出して処理する Lambda」を複数同時に並列実行させる方が、一般的により短時間で処理を終えることができる。

仮に100件のデータを処理する場合、前者は100秒かかるだろう。 後者の場合、同時実行数の上限を100以上に設定しておけば、理論上は1秒強*1でデータ処理が終わる計算になる*2

この他にも、Lambda 同士を直接呼び合うのではなく、間に SQS を挟む構成にすることで、よりフォールトトレラントなシステムを作ることができる。

たとえば宛先の DB にフェールオーバが発生しているタイミングでリクエストが飛んできたとしよう。 このとき、もしも SQS が無いと、即座に Internal Server Error が返却されてユーザビリティが低下してしまう。

一方、リクエストをキューに待たせておいて次のタイミングでリトライするように作っておけば、いたずらにユーザに不便を強いることが減るだろう。

SQS を知っておけば、友達ライバルのマイクロサービスアーキテクチャに差をつけることができそうだ。

【練習①】SQS リソースを作る

では早速、SQS(下図赤枠)を作ろう。

f:id:tercel_s:20210509165604p:plain

template.yaml の修正(2行)

Cloud9 のエディタで template.yaml を開き、以下の2行を追加するだけである。

たったこれだけで、SQS のリソースが設定できる(UuidQueueは、僕が勝手に名付けたキューの名前なので、好きに変えてよい)。

template.yaml (diff)

  Resources:
+   UuidQueue:
+     Type: AWS::SQS::Queue

    HelloWorldFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: hello_world/
        Handler: app.lambda_handler
        Runtime: python3.8
        Events:
          HelloWorld:
            Type: Api
            Properties:
              Path: /hello
              Method: get
デプロイと結果確認

ここで一度、デプロイをしてみよう。

初回は sam deploy --guided コマンドで対話的にデプロイを行うが(第2話参照)、2度目以降はsam deployのみでデプロイ可能である。

$ sam deploy

ターミナルで、新しいキューが作成される様子が(一瞬だけ)確認できる。

f:id:tercel_s:20210509131811p:plain

マネジメントコンソールにサインインして、SQS リソースができているか確認してみよう。

うたぐり屋のどんぐり……。

…。

f:id:tercel_s:20210509133106p:plain

無事に作成されている。

f:id:tercel_s:20210509133136p:plain

【練習②】SQS にデータを投入する Lambda を作る

続いて、SQSにデータを投入するよう、既存の Lambda 関数を修正(下図赤枠)する。

f:id:tercel_s:20210509165954p:plain

権限の追加

まずは、SQS リソースにデータを投入するため、Lambda に権限を追加しよう。

template.yaml (diff)

  Resources:
    UuidQueue:
      Type: AWS::SQS::Queue

    HelloWorldFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: hello_world/
        Handler: app.lambda_handler
        Runtime: python3.8
+       Policies:
+         - AmazonSQSFullAccess
        Events:
          HelloWorld:
            Type: Api
            Properties:
              Path: /hello
              Method: get
SQS リソースの参照の追加

次に、Lambda から SQS のリソースを環境変数経由でアクセスできるよう、template.yaml を修正する。

このあたりは、第3話を参照。

template.yaml (diff)

  Resources:
    UuidQueue:
      Type: AWS::SQS::Queue

    HelloWorldFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: hello_world/
        Handler: app.lambda_handler
        Runtime: python3.8
        Policies:
          - AmazonSQSFullAccess
        Events:
          HelloWorld:
            Type: Api
            Properties:
              Path: /hello
              Method: get
+       Environment:
+         Variables:
+           Queue: !GetAtt UuidQueue.QueueName
SQS にデータを投入する Lambda の作成

続いて、先ほど作成した SQS リソースにデータを投入する Lambda を作る。

hello_world/app.py

import os
import json
import uuid
import boto3

sqs = boto3.resource('sqs')

def lambda_handler(event, context):
    
    # 環境変数からキュー名を取得し、
    # キュー名からSQSリソースを取得する
    queue_name = os.environ['Queue']
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    
    # 適当な UUID を 100個キューに投入する
    for _ in range(100):
        id = str(uuid.uuid4()))
        queue.send_message(MessageBody = id)
    
    return {
        "statusCode": 200,
        "body": json.dumps({
            "result": "Hello, World"
        }),
    }

【練習③】SQS からデータを取り出す Lambda を作る

最後に、下図赤枠 の Lambda を作る。

これまでとの違いは、起動契機が API Gateway ではなく SQS であること・既存の Lambda を改修するのではなく新規に追加することの2点である。

f:id:tercel_s:20210509170337p:plain

新規追加とはいえ、既にある Lambda を徹底T的にTパクるPことは可能である。

ソースの複製

ここでもう1つ Lambda をでっち上げる必要があるため、かなり強引な手法を取る。

Cloud9 の左側のペインで hello_world ディレクトリを右クリックし、ポップアップメニューから「Copy」をクリックする。

f:id:tercel_s:20210509141453p:plain

続いて、1階層上の sam-app ディレクトリを右クリックし、ポップアップメニューから「Paste」をクリックする。

f:id:tercel_s:20210509141512p:plain

すると、hello_world と同階層に、hello_world.1 というフォルダが丸ごとコピーされるので(下図緑枠)、右クリックメニューから「Rename」をクリックする(下図赤枠)。

f:id:tercel_s:20210509141529p:plain

hello_world.1ディレクトリ名を、hello_world2 に変更する。

f:id:tercel_s:20210509141547p:plain

ディレクトリをコピーしてリネームしたんだよね?

これってさぁ、
$ cp -r hello_world/ hello_world2/
コマンドで良かったのでは……?

……。

リソースの追加

続いて、Cloud9 のエディタで template.yaml を開き、もともと存在する HelloWorldFunction のブロック(下図緑枠)を確認する。

f:id:tercel_s:20210509144328p:plain

ここから、赤枠箇所を下図のようにコピペする。

f:id:tercel_s:20210509144345p:plain

さて、ペーストした YAML のブロックを、以下のように修正しよう。

Events.TypeApi ではなく SQS になっている点に注意すること。これで実行契機が HTTP リクエストではなく SQS をイベントソースとする Lambda になる。

template.yaml (diff)

  Resources:

    # 略

-   HelloWorldFunction:
+   HelloWorld2Function:
      Type: AWS::Serverless::Function
      Properties:
-       CodeUri: hello_world/
+       CodeUri: hello_world2/  # コピー先のディレクトリ名に合わせる
        Handler: app.lambda_handler
        Runtime: python3.8
+       Events:
+         HelloWorld2:
+           Type: SQS
+           Properties:
+             Queue: !GetAtt UuidQueue.Arn

これをデプロイすると、新しい Lambda 関数 sam-app-HelloWorld2Function-██████ が作成される。

f:id:tercel_s:20210509150157p:plain

Lambda 関数の実装

では実際にキューからデータを取り出す側の Lambda 関数hello_world2/app.pyを実装しよう。

だいぶん簡略化しているが、キューに投入したデータiiiiii, jjjjjj, ……, nnnnnn は、以下のようなデータ構造で包まれてevent パラメータに引き渡される。

{
  "Records":
    [
      { "body": iiiiii },
      { "body": jjjjjj },
        :
      { "body": nnnnnn }
    ]
}

hello_world2/app.py

def lambda_handler(event, context):
    records = event.get('Records', [])

    for record in records:
        id = record.get('body', '')
        print(id)
    
    return

この Lambda 関数は、自分で能動的に何かをしているわけではなく、先行処理からキューを通して引き継いだデータをただアホみたいに何も考えずログに吐いているだけである。

なので、この Lambda 関数のログに何か意味深な内容が出力されていれば、先行処理からキューを経由してデータが引き渡されていると言える寸法である。

デプロイと動作確認

ここからは確認作業と洒落込もうではないか。

sam deploy でプロジェクトをデプロイし、curl でエンドポイント URL にリクエストを送ってみよう。

f:id:tercel_s:20210509155044p:plain

マネジメントコンソールから Lambda を検索し、sam-app-HelloWorld2Function-██████ を選択する。

f:id:tercel_s:20210509155632p:plain

続いて、ページの中程のタブから「モニタリング」を選び、「CloudWatch のログを表示 」ボタンをクリックする。

f:id:tercel_s:20210509155751p:plain

f:id:tercel_s:20210509155754p:plain

すると、何やかんやあって夥しいログが表示される。

下図の緑枠に注目しよう。 これが、SQS を通って引き渡されてきたデータである。

キューのデータ到着とともに Lambda 関数が起動し、引き渡されたデータが表示された。 おめでとう。

f:id:tercel_s:20210509155815p:plain

まとめとか

SAM における SQS の最も簡単な使い方は以上である。

が、しかし、本文中で語りきれなかったことがいくつかある。

まず、SQS には 標準キューFIFOキュー という2種類のキューが存在する。

FIFOキューはその名の通り、さきいれさきだしが保証されるキューである。 投入順にデータが宛先へ到達する要件がある場合はこちらを選ぶことになる。

一方、標準キューは FIFO が必ずしも保証されないため*3、順序性が厳密に重視される要件には適さない。

その上、稀にではあるが同じデータが2回以上到達してしまうこともある。

よって、後続の Lambda 関数は、アプリケーションレベルで二重処理を回避する実装が要求される。 ちなみに今回利用した SQS は標準キューである。

あとはまぁ、可視性タイムアウトとか、デッドレターキューとか、いろいろ周辺知識があるのだが、今日はここまで。

*1:キューからデータを取り出すオーバヘッドがあるため。

*2:ただし、並列処理の特性として、必ずしもデータの到着順に処理が完了するとは限らない点に注意。

*3:あくまでベストエフォート。

Copyright (c) 2012 @tercel_s, @iTercel, @pi_cro_s.