AWS LambdaでDynamoDB操作情報取得

AWS Lambdaの一般的な利用方法の中から1つを取り上げ、環境構築からLambda関数の作成および動作確認までの一連の作業手順を詳細に解説しています。
今回はDynamoDBから呼び出してAmazon CloudWatchなどへ出力するログに記録するLambdaの作成手順を解説します。
ユースケースとしては以下のようなものがあります。

  • 或るリージョンでのDynamoDBのテーブル操作を別のリージョンのDynamoDB上でも実施するDynamoDBの同期化処理。(操作データの転送にLambdaを使用)
  • モバイルアプリやゲーム等の使用状況の集計、分析、解析する非同期集計処理(集計処理サービスへの転送にLambdaを使用)
  • 画像アップロードを更新するとグループ内のモバイルデバイスに自動的に通知するモバイルアプリケーションの作成(モバイルデバイスへの転送にLambdaを使用)

(注1) Lambda関数の作成にはpythonを使用しています。そのため、文中のLambdaの作成手順はPython向けの解説となります。

1.動作概要

動作の全体像を以下に示します。

動作概要

Amazon DynamoDBの所定のテーブルで追加/更新/削除の操作が実行されると、Lambdaが起動します。
起動されたLambda関数は、引数から操作内容(追加/更新/削除)と操作したレコード内容を取得し、ログに記録します。

2.DynamoDBテーブル作成

サンプル用のDBを作成します。今回はWebカメラの利用状況を管理するDBを以下のような内容で作成します。

キー設定 カラム名 日本語名 記事
パーティションキー cameraID カメラ識別子 cameraXXX ※XXX:001からの整数
ソートキー type カメラ種別 メーカー名+型名
status 動作状態 以下の何れかを選択
・stand-by(準備中)
・stop(停止中)
・running(動作中)

2.1.テーブル作成

DynamoDBコンソール画面から「テーブル作成」ボタンを押下します。

テーブル作成画面で以下を設定し、画面右下の「作成」ボタンを押下します。

  • テーブル名に「MonitoringCamera」を設定
  • パーティションキーに「cameraID」を設定
  • 「ソートキーの追加」にチェックを入れ、ソートキーに「type」を設定

2.2.ストリーム管理

Lambdaに渡すDynamoDBのデータ内容を以下の4種類から選択します。

表示タイプ Lambdaに渡すデータ内容
キーのみ - 変更された項目のキー属性のみ 操作種別(追加/変更/削除)とプライマリーキー
新しいイメージ – 変更後に表示される項目全体 操作種別(追加/変更/削除)と操作後のレコード内容
※削除時はプライマリーキーのみ
古いイメージ – 変更前に表示されていた項目全体 操作種別(追加/変更/削除)と操作前のレコード内容
※追加時はプライマリーキーのみ
新旧イメージ - 項目の新しいイメージと古いイメージの両方 操作種別(追加/変更/削除)と操作前と操作後のレコード内容

今回は、「新旧イメージ – 項目の新しいイメージと古いイメージの両方」を使用します。

作成したテーブル「MonitoringCamera」の設定画面より、「概要」タブを開き、「ストリームの詳細」項目内の「ストリームの管理」ボタンを押下します。

「ストリームの管理」設定画面が表示されるので、「新旧イメージ – 項目の新しいイメージと古いイメージの両方」を選択し、「有効化」ボタンを押下します。

3.Lambda作成

3.1.Lambda関数登録

DynamoDBのテーブル操作情報を受け取るLambda関数を作成します。

Lambda作成方法には以下の2通りがあります。
①AWS Lambdaコンソールで直接コードを作成
②外部ライブラリとLambda関数をパッケージ化してアップロード
今回は引数の情報をログに記録するのみで外部ライブラリを必要としないため、①での作成になります。
今回のLambda関数はLambda側で提供されているテンプレートを使用してDynamoDBの操作毎に条件分岐して処理するようにカスタマイズします。

Lambdaコンソールから「関数の作成」ボタンを押下します。「関数の作成」画面のオプションの中から「設計図の使用」を選択します。
「設計図」のキーワード入力欄に「dynamodb」と入力します。
表示された候補から「dynamodb-process-stream-python」を選択します。

「基本的な情報」の設定項目に以下を設定します

  • 「関数名」に「MonitoringCameraStatus」と入力
  • 「実行ロール」の選択肢から「AWSポリシーテンプレートから新しいロールを作成」を選択
  • 「ポリシーテンプレート・オプション」のキーワード入力欄に「dynamodb」と入力。表示された候補から「シンプルなマイクロサービスのアクセス権限」を選択

・「ロール名」に「dynamoDBReadAccess」と入力

「DynamoDBトリガー」の設定はスキップします。(注2)
「Lambda関数のコード」まで画面をスクロールします。
この時点ではコードの編集はできないので画面右下の「関数の作成」ボタンを押下してLambda関数を作成します。

(注2)この段階で「DynamoDBトリガー」の設定を行い「関数の作成」ボタンを押下すると、Lambda関数は作成されますが、トリガー作成時にエラーが発生し、以下のようなエラーメッセージが表示されてトリガーの作成に失敗します。

エラーメッセージ:

“ロールがストリームIAMでGetRecords、GetShardIterator、DescribeStream、およびListStreamsアクションを実行できることを確認してください”

3.2.実行ロール追加

(注2)のエラーメッセージに記載されているように、DynamoDBの実行ロールテンプレート「シンプルなマイクロサービスのアクセス権限」では以下のアクセス権が不足しています。

  • GetRecords
  • GetShardIterator
  • DescribeStream
  • ListStreams

以下の手順で不足分のアクセス権を追加します。

Lambda関数「MonitoringCameraStatus」のメニュー画面を「実行ロール」の表示位置までスクロールします。
「既存のロール」の「IAMコンソールでdynamoDBReadAccessロールを表示します」をクリックします。

IAMの「ロール > dynamoDBReadAccess」のメニュー画面が開きます。
「アクセス権限」タブの「Permissions Policies」のリストから「AWSLambdaMicroserviceExecutionRole-…」を選択します。

「AWSLambdaMicroserviceExecutionRole-…」の設定画面が表示されたら、「ポリシーの編集」ボタンを押下します。

「DynamoDB(1つのアクション)」をクリックし、折り畳まれている情報を表示します。

「アクション」をクリックし、折り畳まれている情報を表示します。

「アクセスレベル」の中の「読み込み」を選択し、折り畳まれている情報を表示します。

「GetRecords」、「GetShardIterator」、「DescribeStream」、「ListStreams」にチェックを入れます。
画面右下の「ポリシーの確認」ボタンを押下します。

「ポリシーの確認」画面が表示されたら、画面右下の「変更の保存」ボタンを押下します。

3.3.トリガーの設定

実行ロールの追加設定が完了したら、Lambdaのトリガーを設定します。
Lambda関数「MonitoringCameraStatus」のメニュー画面から「+トリガーを追加」ボタンを押下します。

「トリガーの選択」のドロップダウンリストから「DynamoDB」を選択します。

「トリガーの設定」の設定項目で以下を設定し、画面右下の「追加」ボタンを押下します。

  • 「DynamoDBテーブル」にドロップダウンリストから「MonitoringCamera」を選択
  • 「バッチサイズ」はデフォルトの「100」とする
  • 「バッチウィンドウ-オプション」はデフォルトの空欄とする(注3)
  • 「開始位置」はデフォルトの「最新」とする(注4)

(注3) 「バッチウィンドウ-オプション」はLambda関数呼び出しの待機時間を設定します。待機時間は秒単位の設定で最大5分まで設定可能です。
待機時間内に発生したDynamoDBのレコード操作情報を蓄積し、待機時間後にまとめて送信することでLambdaの処理回数を抑制することができます。
空欄の場合はLambda関数を即時に呼び出します。
今回は、Lambda関数を即時起動する空欄設定を選択します。

(注4)テーブル操作データの読み取りを開始する位置を設定します。
「最新」、「水平トリム」から選択します。それぞれの詳細は以下になります。
最新:最新のものから順に読み取る
水平トリム:過去24時間以内の未読の操作データを古い順から読み取る
今回は「最新」を選択します。

これでLambda関数にDynamoDBのトリガーが追加されます。

3.4.Lambda関数の修正

テンプレートで提供されているLambda関数「dynamodb-process-stream-python」はeventIDの値、eventName(DynamoDBの操作種別)と操作されたDynamoDBのレコード内容を全て表示する処理内容となっています。
DynamoDBの操作毎に条件分岐して処理するようコードを以下のように修正します。

import json	
	
print('Loading function')	
	
def lambda_handler(event, context):	//Lambdaから最初に呼びされるハンドラ関数
    #print("Received event: " + json.dumps(event, indent=2))	//受信メッセージを表示(コメントアウト中)
    for record in event['Records']:	//Record要素分Loop
        print(record['eventID'])	//eventIDを表示
        print(record['eventName'])	//DynamoDBの操作種別を表示
        if record['eventName'] == 'INSERT':	//操作種別が”追加”か、判定
            newimage = record['dynamodb']['NewImage']	//追加レコード情報を抽出
            print("newimage: " +  json.dumps(newimage))	//追加レコード情報を表示
        elif record['eventName'] == 'MODIFY':	//操作種別が”変更”か、判定
            olditem = record['dynamodb']['OldImage']	//変更前レコード情報を抽出
            newitem = record['dynamodb']['NewImage']	//変更後レコード情報を抽出
            print("olditem: " +  json.dumps(olditem))	//変更前レコード情報を表示
            print("newitem: " +  json.dumps(newitem))	//変更後レコード情報を表示
        elif record['eventName'] == 'REMOVE':	//操作種別が”削除”か、判定
            deletedItem = record['dynamodb']['OldImage']	//削除対象レコード情報を抽出
            print("deleteimage: " +  json.dumps(deletedItem))	//削除対象レコード情報を表示

Lambda関数「MonitoringCameraStatus」のメニュー画面を「関数コード」の設定欄まで画面をスクロールします。
関数コードを上記の内容に書き替えます。
画面右上の「保存」ボタンを押下して関数コード内容を保存します。

4.Lambdaテスト

4.1.テストイベント設定

Lambda関数「MonitoringCameraStatus」のメニュー画面から「テストイベントの設定」を選択します。

「テストイベントの設定」の画面で以下を設定し、最後に画面右下の「作成」ボタンを押下します。

  • 「新しいテストイベントの作成」を選択
  • 「イベントテンプレート」のドロップダウンリストから「DynamoDB Update」を選択
  • 「イベント名」に「testDynamoDBEvent」と設定
イベントテンプレートは編集せずに、そのまま使用します。

(※)イベントテンプレート解説
INSERT(追加)、MODIFY(変更)、REMOVE(削除)の3種類の操作情報が蓄積されたイベントで、イベントの表示イメージは「新旧イメージ - 項目の新しいイメージと古いイメージの両方」で設定されています。

4.2.テスト実行

「テスト」ボタンを押下してLambdaのテストを実施します。

「実行結果:成功」と表示されればLambdaの実行が正常終了したことになります。
この時「詳細」ボタンを押下すると、テスト実行時のログが表示されます。

また、「実行結果:成功(ログ)」のログボタンを押下すると、CloudWatchに保存されたLambda関数「MonitoringCameraStatus」のロググループにジャンプします。

「実行結果:失敗」と表示された場合は、「詳細」ボタンを押下して、表示されたログから原因を解析してください。

4.3.ログ確認

ログストリームのリストに表示されているファイルをクリックすると、Lambda関数のログ詳細を見ることが出来ます。

ログにはINSERT(追加)、MODIFY(変更)、REMOVE(削除)情報とそれぞれのレコード内容が記録されます。

5.動作試験

実際にDynamoDBのテーブル操作を実施した際のLambda関数の処理をCloudWatch に記録されたログ情報より確認します。

5.1.トリガーの有効化

Lambda関数「MonitoringCameraStatus」のメニュー画面から「DynamoDB」設定欄の更新ボタンを押下します。

表示されたトグルスイッチを「有効」に切り替えます。
画面右上の「保存」ボタンを押下してトリガー設定を保存します。

5.2.レコード追加

DynamoDBのコンソール画面から「テーブル」のメニュー画面を開きます。
「テーブル」一覧から「MonitoringCamera」を選択します。
「MonitoringCamera」の設定画面から「項目の追加」ボタンを押下します。

「項目の編集」画面が開くので、以下を設定します。
・caremaIDに「camera001」を設定
・Typeに「BUFFALO BSW500MBK」を設定

・Typeの「+」ボタンをクリック。ポップアップメニューから「Append」を選択
・「Append」のドロップダウンリストから「String」を選択

・左側の入力欄に「status」と設定
・右側の入力欄に「stand-by」と設定
最後に画面右下の「保存」ボタンを押下します。

DynamoDBのコンソール画面「MonitoringCamera」テーブルの「項目」タブを開くと、レコード一覧にcamera001を主キーとした、レコードが追加されます。

Lambda関数「MonitoringCameraStatus」のメニュー画面から「モニタリング」タブを選択、モニタリング画面右にある「CloudWatchのログを表示」ボタンを押下します。

CloudWatch のLambda関数「MonitoringCameraStatus」のロググループにジャンプします。ログストリーム一覧から最新のログファイル名を選択します。

レコード追加時のログ内容が表示されます。
3行目から5行目がLambda関数内で収集した情報になります。

ログ内容解説:

3行目:eventID
4行目:DynamoDBテーブル操作内容-INSERT
5行目:追加レコード内容(JSON形式で設定。“S”はStringの省略形)
type:BUFFALO BSW500MBK
cameraID:camera001
status:stand-by

レコードの表示内容はJSON形式のため設定順とは異なる場合があります

5.3.レコード変更

DynamoDBのコンソール画面「MonitoringCamera」テーブルのレコード一覧から「camrera001」を選択します。

「項目の編集」画面が開きます。
statusの設定値を「stand-by」⇒「running」に変更します。
画面右下の「保存」ボタンを押下します。

CloudWatch のLambda関数「MonitoringCameraStatus」のロググループ画面に移動します。
ログストリーム一覧から最新のログファイルを選択します。

レコード変更時のログ内容が表示されます。
3行目から6行目がLambda関数内で収集した情報になります。

ログ内容解説:

3行目:eventID
4行目:DynamoDBテーブル操作内容-MODIFY
5行目:変更前レコード内容(JSON形式で設定。“S”はStringの省略形)
type:BUFFALO BSW500MBK
cameraID:camera001
status:stand-by
6行目:変更後レコード内容(JSON形式で設定。“S”はStringの省略形)
type:BUFFALO BSW500MBK
cameraID:camera001
status:running

レコードの表示内容はJSON形式のため設定順とは異なる場合があります

5.4.レコード削除

DynamoDBのコンソール画面「MonitoringCamera」テーブルのレコード一覧から「camrera001」左端のチェックボックスにチェックを入れます。
「アクション」をクリックし、ドロップダウンメニューから「削除」を選択します。

「削除」動作に対する確認画面が開くので「削除」ボタンを押下し、レコードの削除を実行します。

CloudWatch のLambda関数「MonitoringCameraStatus」のロググループ画面に移動します。
ログストリーム一覧から最新のログファイルを選択します。

レコード削除時のログ内容が表示されます。
3行目から5行目がLambda関数内で収集した情報になります。

ログ内容解説:

3行目:eventID
4行目:DynamoDBテーブル操作内容-REMOVE
5行目:削除レコード内容(JSON形式で設定。“S”はStringの省略形)
type:BUFFALO BSW500MBK
cameraID:camera001
status:running

レコードの表示内容はJSON形式のため設定順とは異なる場合があります

あとがき

DynamoDBからの情報を受信して処理するLambdaの作成手順を環境構築から実行確認まで解説しました。DynamoDBから渡されるメッセージ内容とLambdaでのメッセージの取り出し方、Lambda作成時の実行ロール設定の注意点などが理解できたと思います。
今回はDynamoDBのデータを受信するLambdaについての解説でしたが、次回はDynamoDBを操作するLambdaの作成方法について解説致します。

ネットワークからクラウドまでトータルサポート!!
NTT東日本のクラウド導入・運用サービスを確認してください!!

Amazon Web Services(AWS)、Microsoft Azureの
導入支援サービスのご相談、お問い合わせをお待ちしております。

ページ上部へ戻る