Lambda と Lambda を SQS で繋ぐ
はじめに
今日は、AWS SAM を使って Lambda と Lambda を SQS で繋ぐ構成を作ってみよう。
「API Gateway をトリガとする最初の Lambda で UUID を 100件発行し、1件ずつ SQS に投入、後続の Lambda でそれを取り出してログに出力する」という超シンプルなマイクロサービスを構築していく。
今回も、第2話で述べたプロジェクト作成直後の状態をベースに話を進めよう。
つまり、初期状態で API Gateway とそれをトリガとする Lambda 関数までは構築が完了しているものとする。
また、このシリーズをまとめて読みたい人は、Lambda カテゴリのページを参照されたい。
tercel-tech.hatenablog.com
本日のお品書きはコチラ:
そもそも SQS とは
SQS とは、基本情報技術者試験にも出題されるキューであり、サービス同士がデータを送受信する際のデータの通り道に過ぎない。
しかしながら、キューは非常に汎用性の高い道具であり、その使い所も多岐に亘る。
たとえば ──
- 複数のデータをまとめて投入し、それらを後続処理で一つずつ取り出す
- 1つずつバラバラに投入したデータを一定期間キューに溜めておき、後でまとめて取り出してバッチ的に捌く
キューの使い所
大量データの処理を考えてみよう。 1件のデータを処理するのに1秒かかるものとする。
1つの Lambda の中で for
文をぐるぐる回して全てのデータを逐次処理するよりも、とにかく「1件だけデータを取り出して処理する Lambda」を複数同時に並列実行させる方が、一般的により短時間で処理を終えることができる。
仮に100件のデータを処理する場合、前者は100秒かかるだろう。 後者の場合、同時実行数の上限を100以上に設定しておけば、理論上は1秒強*1でデータ処理が終わる計算になる*2。
この他にも、Lambda 同士を直接呼び合うのではなく、間に SQS を挟む構成にすることで、よりフォールトトレラントなシステムを作ることができる。
たとえば宛先の DB にフェールオーバが発生しているタイミングでリクエストが飛んできたとしよう。 このとき、もしも SQS が無いと、即座に Internal Server Error が返却されてユーザビリティが低下してしまう。
一方、リクエストをキューに待たせておいて次のタイミングでリトライするように作っておけば、徒にユーザに不便を強いることが減るだろう。
SQS を知っておけば、友達のマイクロサービスアーキテクチャに差をつけることができそうだ。
【練習①】SQS リソースを作る
では早速、SQS(下図赤枠)を作ろう。
template.yaml の修正(2行)
Cloud9 のエディタで template.yaml
を開き、以下の2行を追加するだけである。
たったこれだけで、SQS のリソースが設定できる(UuidQueue
は、僕が勝手に名付けたキューの名前なので、好きに変えてよい)。
template.yaml
(diff)
Resources: + UuidQueue: + Type: AWS::SQS::Queue HelloWorldFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.8 Events: HelloWorld: Type: Api Properties: Path: /hello Method: get
デプロイと結果確認
ここで一度、デプロイをしてみよう。
初回は sam deploy --guided
コマンドで対話的にデプロイを行うが(第2話参照)、2度目以降はsam deploy
のみでデプロイ可能である。
$ sam deploy
ターミナルで、新しいキューが作成される様子が(一瞬だけ)確認できる。
マネジメントコンソールにサインインして、SQS リソースができているか確認してみよう。
疑り屋のどんぐり……。
……。
無事に作成されている。
【練習②】SQS にデータを投入する Lambda を作る
続いて、SQSにデータを投入するよう、既存の Lambda 関数を修正(下図赤枠)する。
権限の追加
まずは、SQS リソースにデータを投入するため、Lambda に権限を追加しよう。
template.yaml
(diff)
Resources: UuidQueue: Type: AWS::SQS::Queue HelloWorldFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.8 + Policies: + - AmazonSQSFullAccess Events: HelloWorld: Type: Api Properties: Path: /hello Method: get
SQS リソースの参照の追加
次に、Lambda から SQS のリソースを環境変数経由でアクセスできるよう、template.yaml
を修正する。
このあたりは、第3話を参照。
template.yaml
(diff)
Resources: UuidQueue: Type: AWS::SQS::Queue HelloWorldFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.8 Policies: - AmazonSQSFullAccess Events: HelloWorld: Type: Api Properties: Path: /hello Method: get + Environment: + Variables: + Queue: !GetAtt UuidQueue.QueueName
SQS にデータを投入する Lambda の作成
続いて、先ほど作成した SQS リソースにデータを投入する Lambda を作る。
hello_world/app.py
import os import json import uuid import boto3 sqs = boto3.resource('sqs') def lambda_handler(event, context): # 環境変数からキュー名を取得し、 # キュー名からSQSリソースを取得する queue_name = os.environ['Queue'] queue = sqs.get_queue_by_name(QueueName=queue_name) # 適当な UUID を 100個キューに投入する for _ in range(100): id = str(uuid.uuid4())) queue.send_message(MessageBody = id) return { "statusCode": 200, "body": json.dumps({ "result": "Hello, World" }), }
【練習③】SQS からデータを取り出す Lambda を作る
最後に、下図赤枠 の Lambda を作る。
これまでとの違いは、起動契機が API Gateway ではなく SQS であること・既存の Lambda を改修するのではなく新規に追加することの2点である。
新規追加とはいえ、既にある Lambda を徹底的にパクることは可能である。
ソースの複製
ここでもう1つ Lambda をでっち上げる必要があるため、かなり強引な手法を取る。
Cloud9 の左側のペインで hello_world
ディレクトリを右クリックし、ポップアップメニューから「Copy」をクリックする。
続いて、1階層上の sam-app
ディレクトリを右クリックし、ポップアップメニューから「Paste」をクリックする。
すると、hello_world
と同階層に、hello_world.1
というフォルダが丸ごとコピーされるので(下図緑枠)、右クリックメニューから「Rename」をクリックする(下図赤枠)。
hello_world.1
のディレクトリ名を、hello_world2
に変更する。
ディレクトリをコピーしてリネームしたんだよね?
これってさぁ、$ cp -r hello_world/ hello_world2/
コマンドで良かったのでは……?
……。
リソースの追加
続いて、Cloud9 のエディタで template.yaml
を開き、もともと存在する HelloWorldFunction
のブロック(下図緑枠)を確認する。
ここから、赤枠箇所を下図のようにコピペする。
さて、ペーストした YAML のブロックを、以下のように修正しよう。
Events.Type
が Api
ではなく SQS
になっている点に注意すること。これで実行契機が HTTP リクエストではなく SQS をイベントソースとする Lambda になる。
template.yaml
(diff)
Resources: # 略 - HelloWorldFunction: + HelloWorld2Function: Type: AWS::Serverless::Function Properties: - CodeUri: hello_world/ + CodeUri: hello_world2/ # コピー先のディレクトリ名に合わせる Handler: app.lambda_handler Runtime: python3.8 + Events: + HelloWorld2: + Type: SQS + Properties: + Queue: !GetAtt UuidQueue.Arn
これをデプロイすると、新しい Lambda 関数 sam-app-HelloWorld2Function-██████
が作成される。
Lambda 関数の実装
では実際にキューからデータを取り出す側の Lambda 関数hello_world2/app.py
を実装しよう。
だいぶん簡略化しているが、キューに投入したデータiiiiii
, jjjjjj
, ……, nnnnnn
は、以下のようなデータ構造で包まれてevent
パラメータに引き渡される。
{ "Records": [ { "body": iiiiii }, { "body": jjjjjj }, : { "body": nnnnnn } ] }
hello_world2/app.py
def lambda_handler(event, context): records = event.get('Records', []) for record in records: id = record.get('body', '') print(id) return
この Lambda 関数は、自分で能動的に何かをしているわけではなく、先行処理からキューを通して引き継いだデータをただアホみたいに何も考えずログに吐いているだけである。
なので、この Lambda 関数のログに何か意味深な内容が出力されていれば、先行処理からキューを経由してデータが引き渡されていると言える寸法である。
デプロイと動作確認
ここからは確認作業と洒落込もうではないか。
sam deploy
でプロジェクトをデプロイし、curl
でエンドポイント URL にリクエストを送ってみよう。
マネジメントコンソールから Lambda を検索し、sam-app-HelloWorld2Function-██████
を選択する。
続いて、ページの中程のタブから「モニタリング」を選び、「CloudWatch のログを表示 」ボタンをクリックする。
すると、何やかんやあって夥しいログが表示される。
下図の緑枠に注目しよう。 これが、SQS を通って引き渡されてきたデータである。
キューのデータ到着とともに Lambda 関数が起動し、引き渡されたデータが表示された。 おめでとう。
まとめとか
SAM における SQS の最も簡単な使い方は以上である。
が、しかし、本文中で語りきれなかったことがいくつかある。
まず、SQS には 標準キュー と FIFOキュー という2種類のキューが存在する。
FIFOキューはその名の通り、先入先出が保証されるキューである。 投入順にデータが宛先へ到達する要件がある場合はこちらを選ぶことになる。
一方、標準キューは FIFO が必ずしも保証されないため*3、順序性が厳密に重視される要件には適さない。
その上、稀にではあるが同じデータが2回以上到達してしまうこともある。
よって、後続の Lambda 関数は、アプリケーションレベルで二重処理を回避する実装が要求される。 ちなみに今回利用した SQS は標準キューである。
あとはまぁ、可視性タイムアウトとか、デッドレターキューとか、いろいろ周辺知識があるのだが、今日はここまで。