AWS Lambdaを使ったAmazon SNSへのメッセージ送受信

AWS Lambdaの一般的な利用方法の中から1つを取り上げ、環境構築からLambda関数の作成および動作確認までの一連の作業手順を詳細に解説しています。今回はAmazon Simple Notification Service(SNS)へメッセージを送信するLambdaとAmazon SNSから配信されるメッセージを受信するLambdaの作成手順を解説します。

Amazon SNSはサーバーレスアーキテクチャのためのメッセージ通知サービスで、幅広い AWS イベントソース (Amazon EC2、Amazon S3、Amazon RDS など) と AWS イベント送信先 (Amazon SQS、Lambda など)を統合します。メッセージの受け渡しはPublish-Subscribe(Pub-Sub)方式で行われます。Pub-Subは通常、配信登録した全ての宛先への配信が可能ですがAmazon SNSでは更に細かく、メッセージフィルタ機能により特定の条件を満たした相手のみにメッセージを配信することができます。

今回は、このメッセージフィルタ機能を使ったメッセージ配信モデルの構築手順を解説します。

1.動作概要

動作の全体像を以下に示します。

Webで受け付けた商品の注文を分類し、注文データを商品別に保存するシステムを作成します。注文データの書式を以下に示します。

{
    "id":XXXX,	//注文ID(XXXXは数値)
    "date":"YYYY-MM-DD hh:mm:ss",	//注文日時
    "payment": {	//支払い金額
        "amount": XXXX,	//合計金額(XXXXは数値)
        "currency":"yen"	//通貨:円
    },
    "product_type":"string",	//商品種別(文字列)
    "items":[	//商品リスト
        {
            "product_name":" string ",	//商品名(文字列)
            "price":XXXX,	//単価(XXXXは数値)
            "quantity": X	//個数(Xは数値)
        }
    ]
}

Amazon API GatewayがユーザからHTTPリクエストによる商品の注文を受信すると、登録しているLambdaを起動します。
起動されたLambda関数は、属性情報を引数から取得した注文データより作成します。作成した属性情報は注文データに付加してAmazon SNSの注文受付用トピックに送信します。
トピックに保存された注文データは属性情報を元に商品別に注文処理保存用Lambdaに配信されます。
それぞれの注文処理保存用Lambdaは注文データをDynamoDBの商品別テーブルに保存します。

2.DynamoDBテーブル作成

最初にサンプル用のDBを作成します。今回は音楽CD、ビデオ、書籍の各注文データを管理するテーブルを以下のように作成します。

商品種別 テーブル名
音楽CD MusicOrderTables
ビデオ VideoOrderTables
書籍 BooksOrderTables

各テーブルに設定する属性とテーブル内の項目(レコード)を一意に識別するキーを以下に示します。

属性名 キー設定
id パーティションキー
date ソートキー
payment
items

(※)DynamoDBは、テーブル作成時に項目を一意に識別できる要素をプライマリーキーとして設定します。
DynamoDBでは以下の2種類のプライマリーキーをサポートしています。

パーティションキー:
  • DynamoDBの内部の物理ストレージの保存場所(パーティション)を示します。
パーティションキーとソートキー:
  • パーティション内に複数の項目を保存する場合、ソートキーを使って一意に項目を示します。

今回はid(注文ID)をパーティションキーにdate(注文日時)をソートキーとしたプライマリーキーを作成します。
プライマリーキー以外の要素はテーブルへのレコード追加・更新時に追加されるためテーブル作成時には指定不要です。

2.1.テーブル作成

注文データを保存するテーブルを商品別に作成します。
DynamoDBコンソール画面から「テーブル作成」ボタンを押下します。

テーブル作成画面で以下を設定し、画面右下の「作成」ボタンを押下します。

  • テーブル名に「MusicOrderTables」を設定
    (※ビデオの場合「VideoOrderTables」、書籍の場合「BooksOrderTables」を設定)
  • パーティションキーに「id」を設定
  • 「ソートキーの追加」にチェックを入れ、ソートキーに「date」を設定

3.注文処理保存用Lambda作成

Amazon SNSから商品別に注文データを配信される注文処理保存用Lambdaを作成します。
Lambdaの作成方法には以下の2通りがあります。
①AWS Lambdaコンソールで直接コードを作成
②外部ライブラリとLambda関数をパッケージ化してアップロード
今回はDynamoDBの操作のみで外部ライブラリを必要としないため、①での作成になります。

3.1.Lambda関数登録

注文処理保存用Lambda関数の登録手順は、各商品でほとんど同じです。
そこで音楽CD注文処理保存用Lambda関数登録の手順を代表例として解説します。

Lambdaコンソールから「関数の作成」ボタンを押下します。

「関数の作成」画面のオプションの中から「一から作成」を選択します。
「基本的な情報」画面が表示されるので以下を設定します

  • 「関数名」に「SaveMusicOrder」と入力
    (※ビデオの場合「SaveVideoOrder」、書籍の場合「SaveBooksOrder」を入力)
  • 「ランタイム」に「python3.8」を選択
  • 「実行ロールの選択または作成」をクリックし、折り畳まれている画面を表示

  • 「実行ロール」の選択肢の中から「AWSポリシーテンプレートから新しいロールを作成」を選択
  • 「ロール名」に「putDynamoDB」と設定

  • 「ポリシーテンプレート – オプション」のキーワード入力欄に「dynamodb」と入力。表示される選択肢より「テストハーネスのアクセス権限 DynamoDB Lambda」を選択

最後に画面右下の「関数の作成」ボタンを押下します。

3.2.Lambda関数の修正

デフォルトで提供されているLambda関数を編集してAmazon SNSからのメッセージを取り出しDynamoDBへ保存するコードを作成します。
Lambda関数「SaveMusicOrder」のコンソール画面を「関数コード」の項目までスクロールします。関数コードを編集します。コード内容を以下に示します。

import json
import boto3

dynamodb = boto3.resource('dynamodb')	//Dynamodbアクセスのためのオブジェクト取得
table = dynamodb.Table('MusicOrderTables')	//指定テーブルのアクセスオブジェクト取得(注1)

def lambda_handler(event, context):	//Lambdaから最初に呼びされるハンドラ関数
    print("Message received from SNS: " + json.dumps(event))	//Amazon SNSからのメッセージ内容を表示
    for record in event['Records']:	//引数;event内の'Records'を配列要素分Loop
        message = json.loads(record['Sns']['Message'])	//Amazon SNSのメッセージ本体を抽出(注2)
        try:
            date = message['date']	//注文日時を抽出
            payment = str(message['payment'])	//支払い情報を抽出
            items = str(message['items'])	//購入商品を抽出
            print('id: ' + str(message['id']) + ', date: ' + date)
            print('payment: ' + payment)
            print('items: ' + items)
            putResponse = table.put_item(	//put_item()メソッドでレコードを追加設定
                Item={	//追加対象レコードのカラムリストを設定
                    'id': message['id'],
                    ''date': date,
                    'payment': payment,
                    'items': items
                }
            )
            print('Response: ' + json.dumps(putResponse))
        except Exception as e:	//例外処理
            print("Error Exception.")
            print(e)

(注1)注文種別毎に指定テーブルは以下のように設定します。
ビデオ注文処理保存用の場合、table = dynamodb.Table('VodeoOrderTables')
書籍注文処理保存用の場合、table = dynamodb.Table('BooksOrderTables')

(注2)Amazon SNSからのメッセージ形式は開発者ガイドの「AWS Lambda を Amazon SNS に使用する」を参照してください。

関数コード編集後、画面右上の「保存」ボタンを押下します。

4.Amazon SNS作成

受け付けた注文を商品毎の注文処理保存用Lambdaへ振り分けて配信を行うAmazon SNSの設定を行います。

4.1.トピック作成

Amazon SNS マネージメントコンソールを開きます。
画面左側のメニュー欄から「トピック」を選択します。

「トピックの作成」ボタンを押下します。

「トピックの作成」画面が表示されるので以下を設定します

  • 「名前」に「inputOrder」と入力
  • 「表示名 – オプション」に「inputOrder-to-DynamoDB」と入力

画面右下の「トピックの作成」ボタンを押下してトピックを作成します。

4.2.サブスクリプション作成

トピック「inputOrder」にエンドポイントとして注文処理保存用Lambdaの紐づけを行うサブスクリプションを作成します。属性情報に設定された商品種別を参照して商品別に配信されるようにサブスクリプションにメッセージフィルタ処理を設定します。

代表例として音楽CDの場合の設定手順を以下に示します。
作成したトピック「inputOrder」の詳細画面で「サブスクリプションの作成」ボタンを押下します。

「プロトコル」のドロップダウンリストから「AWS Lambda」を選択します。

音楽CD注文処理保存用Lambda関数「SaveMusicOrder」のコンソール画面を開き、画面右上のARN(※)のコピーボタンを押下します。
※ARN(AWS Resource Name): AWSサービスを識別するID。

ARNがコピーされると以下のように表示されます。

「サブスクリプションの作成」画面に戻り、「エンドポイント Amazon SNS から通知を受信できる AWS Lambda 関数」の入力欄にコピーしたLambda関数「SaveMusicOrder」のARNをペーストします。ARNをペーストすると青字でARNの選択候補が表示されるので、これを選択します。

「サブスクリプションフィルターポリシー – オプション」を選択し、折り畳まれている画面を表示します。

「JSONエディタ」が開くので、以下のように入力します。

{
  "product_type": ["Music CD"]
}

この設定により、商品種別が音楽CDの場合、Lambda関数「SaveMusicOrder」のみに配信されるようになります。

最後に画面右下の「サブスクリプションの作成」ボタンを押下します。

(※)ビデオ注文の場合、「JSONエディタ」に以下のように入力します。

{
  "product_type": ["Video"]
}

書籍注文の場合、「JSONエディタ」に以下のように入力します。

{
  "product_type": ["Book"]
}

サブスクリプションが作成されると以下の画面が表示されます。

サブスクリプションが作成されると、自動的にLambda関数「SaveMusicOrder」にSNSのトリガーが設定されます。

5.Amazon SNS 送信用Lambda作成

AWS API Gatewayから受け取った注文メッセージをAmazon SNSのトピック「inputOrder」にpublish(送信)するLambdaを作成します。
この時、LambdaではAmazon SNSで商品種別毎にサブスクリプションが振り分けられるよう注文メッセージに商品種別を記したメッセージ属性を付加します。

5.1.Lambda関数登録

Lambdaコンソールから「関数の作成」ボタンを押下します。

「関数の作成」画面のオプションの中から「一から作成」を選択します。
「基本的な情報」画面が表示されるので以下を設定します

  • 「関数名」に「apiGW2sns」と入力
  • 「ランタイム」に「python3.8」を選択
  • 「実行ロールの選択または作成」をクリックし、折り畳まれている画面を表示

  • 「実行ロール」の選択肢の中から「AWSポリシーテンプレートから新しいロールを作成」を選択
  • 「ロール名」に「publishSNS」と設定

  • 「ポリシーテンプレート – オプション」のキーワード入力欄に「sns」と入力。表示される選択肢より「Amazon SNS 発行ポリシー」を選択

最後に画面右下の「関数の作成」ボタンを押下します。

5.2.Lambda関数の修正

デフォルトで提供されているLambda関数を編集してAWS API Gatewayからのメッセージを取り出しAmazon SNSのトピック「inputOrder」へpublishするコードを作成します。
Lambda関数「apiGW2sns」のコンソール画面を「関数コード」の項目までスクロールします。
関数コードを編集します。コード内容を以下に示します。

import json
import boto3

TOPIC_ARN = 'arn:aws:sns:ap-northeast-1:291356481870:dynamodb'	//トピック:inputOrderのARNを定義(注1)
subject = 'TestUpLoadToDynamoDB'	//メッセージの件名を定義
client = boto3.client('sns')	//SNSアクセスのためのオブジェクト取得

def lambda_handler(event, context):	//Lambdaから最初に呼びされるハンドラ関数
    message = json.dumps(event)	//引数:eventからメッセージ本体を取得
    print("Received event: " + message)
    product_type = event['product_type']	//引数:eventから商品タイプを取得
    message_attributes = {	//メッセージ属性を定義
        'product_type': {	//'product_type'の定義
            'DataType': 'String',	//書式:文字列
            'StringValue': product_type	//値:product_type(商品タイプ)
        }
    }
    response = client.publish(	//client.publish ()メソッドでトピックにメッセージをpublishする(注2)
        TopicArn = TOPIC_ARN,	//トピックのARNを設定
        Message = message,	//メッセージ本文を設定
        Subject = subject,	//件名を設定
        MessageAttributes = message_attributes	//メッセージ属性を設定(注3)
    )
    print(json.dumps(response))
    return response

(※)LambdaからAmazon SNSへのアクセスにはAWS SDK for Python(boto3)を使用します。
boto3全体のドキュメントは「Boto 3 Documentation」を参照してください
boto3でのAmazon SNSについての記載は「SNS」を参照してください

(注1) トピック「inputOrder」のARNは以下の手順で取得します。
Amazon SNSのマネージメントコンソールからトピック「inputOrder」を選択します。
詳細情報のARNをコピーします。

(注2)Amazon SNSのトピックにメッセージを送信(publish)する場合は、client.publish()を使用します。

client.publish(
    TopicArn='string' or TargetArn='string' or PhoneNumber='string',
    Message='string',
    Subject='string',
    MessageStructure='string',
    MessageAttributes={
        'string': {
            'DataType': 'string',
            'StringValue': 'string' or 'BinaryValue': b'bytes'
        }
    }	
)
引数:
・TopicArn - string型
  • Publish先トピックのARN。指定しない場合はTargetArnまたはPhoneNumberを指定する必要があります。
・TargetArn - string型
  • エンドポイントのARN
・PhoneNumber - string型
  • メッセージ配信先の電話番号
・Message - string型
  • メッセージ本文
・Subject - string型
  • メッセージタイトル。省略可。
・MessageStructure - string型(json形式)
  • 送信メッセージの形式。省略可。
・MessageAttributes - dict型
  • メッセージ属性。省略可。サブスクリプションのメッセージフィルタに使用されます。以下の書式で定義します。
    属性名={
    DataType : 'string',
    StringValue : 'string' or BinaryValue : 'byte'
    }
・DataType - string型
  • 必須。'String'/'String.Array'/'Number'/'Binary'の何れかを指定します。
・StringValue - string型(json形式)
  • DataTypeが、String、String.Array、Numberの時に設定します。
    フィルタの選択肢の候補をjson形式で定義します。設定方法については開発者ガイドの「メッセージ例と属性」を参照してください。
・BinaryValue
  • DataTypeがBinaryの時に設定します。
    圧縮データ、暗号化データ、画像などのバイナリデータを格納できます。

(注3)ここで設定した「メッセージ属性」が「4.2. サブスクリプション作成」の「サブスクリプションフィルターポリシー」の配信可否判定に使用されます。
詳細は開発者ガイドの「Amazon SNS メッセージ属性」を参照してください。

6.API Gateway作成

Amazon API Gatewayのコンソール画面から「APIを作成」ボタンを押下します。

「APIタイプの選択」画面で「REST API」を選択し「構築」ボタンを押下します。

以下を設定し、画面右下の「APIの作成」ボタンを押下します。

  • 「プロトコルを選択する」で「REST」を選択
  • 「新しいAPIの作成」で「新しいAPI」を選択
  • 「名前と説明」でAPI名に「TestAcceptOrders」を設定

APIが作成されると「API: TestAcceptOrders」のトップページが開きます。
「アクション」ボタンを押下し、ドロップダウンリストから「リソースの作成」を選択します。

※リソースとはWeb APIのパスになります。
デフォルトは「/」で、これにパスを設定することにより、パス毎に別々のLambda関数を割り振ることが出来ます。接続先を割り振る必要がない場合、リソースの作成は不要です。
今回は複数の接続先は用意していませんが、設定手順の紹介のため作成します。

「リソース名」に「order」と設定し、画面右下の「リソースの作成」ボタンを押下します。

「アクション」ボタンを押下し、ドロップダウンリストから「メソッドの作成」を選択します。

「/order」の下にメソッド設定用の空欄が作成されますのでクリックします。
ドロップダウンリストから「POST」を選択します。

「POST」の右横にある「✔」を押下します。

「POSTのセットアップ」画面が開くので以下を設定し画面右下の「保存」ボタンを押下します。

  • 「結合タイプ」に「Lambda関数」を選択
  • 「Lambda関数」の入力欄に「apiGW2sns」を設定

「保存」ボタンを押下すると「Lambda関数に権限を追加する」というポップアップメッセージが表示されるので「OK」ボタンを押下してください。

これによりPOSTメソッドリクエストがLambda関数「apiGW2sns」に渡されるルートが確立されます。

この設定によりLambda関数「apiGW2sns」に自動的にAPI Gatewayのトリガーが設定されます。

7.APIテスト

API公開前にダミーの注文データを用いて、想定した動作が行えていることを確認します。
注文データが正しく振り分けられていることを確認するため、音楽CD、ビデオ、書籍毎に注文データを送信し、DynamoDBの音楽注文用テーブル、ビデオ注文用テーブル、書籍注文用テーブルそれぞれの保存内容を確認します。
また、ログをチェックし、合わせて正しく処理されていることを確認します。

代表例として音楽CDを注文した場合のテストの実施手順を以下に示します。

「POST – メソッドの実行」画面で「テスト」ボタンを押下します。

「POST –メソッドテスト」の入力画面が開きます。

「リクエスト本文」にLambda関数に渡すメッセージをJSON形式で入力し、「テスト」ボタンを押下します。

「リクエスト本文」に設定するメッセージは1.「動作概要」で示した書式で作成します。
今回は以下のように入力します。

{
    "id":2509,	//注文ID:2509
    "date":"2020-04-04 18:23:00",	//注文日時:"2020-04-04 18:23:00"
    "payment": {	//支払い金額
        "amount": 7180,	//合計金額:7180
        "currency":"yen"	//通貨:円
    },
    "product_type":"Music CD",	//商品種別:音楽CD
    "items":[	//商品リスト(2点)
        {
            "product_name":"Classic Complete vol.1",	//商品名:"Classic Complete vol.1"
            "price":3000,	//単価:3000
            "quantity": 1	//個数:1
        },
        {
            "product_name":"World Jazz vol.8",	//商品名::"World Jazz vol.8"
            "price":4180,	//単価:4180
            "quantity": 1	//個数:1
        }
    ]
}

テストを実行すると、メソッドテスト画面の右側に応答情報が表示されます。
ステータスが200となっていることを確認してください。

DyanamoDBのコンソールでテーブル「MusicOrderTables」のレコード一覧にid: 2509, date:"2020-04-04 18:23:00"のレコードが追加されているかを確認します。

レコードを選択すると、以下のような詳細情報が確認できます。

テーブル「VideoOrderTables」のレコード一覧にid: 2509, date:"2020-04-04 18:23:00"のレコードが追加されていないことを確認します。

テーブル「BooksOrderTables」のレコード一覧にid: 2509, date:"2020-04-04 18:23:00"のレコードが追加されていないことを確認します。

音楽注文処理保存用Lambdaのログから動作内容を確認します。
Lambda関数「SaveMusicOrder」のメニュー画面から「モニタリング」タブを選択します。
「モニタリングタブ」画面から「CloudWatchのログを表示」を選択します。

Lambda関数「SaveMusicOrder」のロググループにジャンプします。

ログストリームのリストに表示されているファイルをクリックすると、Lambda関数のログ詳細を見ることが出来ます。

同様の手順でビデオの注文の場合と書籍の注文の場合のテストを実施します。
「POST –メソッドテスト」でリクエスト本文に設定する注文データの設定例を以下に示します。

ビデオ注文の場合
{
    "id":1309,	//注文ID:1309
    "date":"2020-04-06 15:07:00",	//注文日時:"2020-04-06 15:07:00"
    "payment": {	//支払い金額
        "amount": 8000,	//合計金額:8000
        "currency":"yen"	//通貨:円
    },
    "product_type":"Video",	//商品種別:ビデオ
    "items":[	//商品リスト(1点)
        {
            "product_name":"Railway Complete Works vol.2",	//商品名:"Railway Complete Works vol.2"
            "price":8000,	//単価:8000
            "quantity": 1	//個数:1
        }
    ]
}
書籍注文の場合
{
    "id":8723,	//注文ID:8723
    "date":"2020-04-10 13:09:00",	//注文日時:"2020-04-10 13:09:00"
    "payment": {	//支払い金額
        "amount": 4080,	//合計金額:4080
        "currency":"yen"	//通貨:円
    },
    "product_type":"Book",	//商品種別:書籍
    "items":[	//商品リスト(2点)
        {
            "product_name":"Encyclopedia",	//商品名::"Encyclopedia"
            "price":3500,	//単価:3500
            "quantity": 1	//個数:1
        },
        {
            "product_name":"Ramen Guide Book",	//商品名:"Ramen Guide Book"
            "price"580,	//単価:580
            "quantity": 1	//個数:1
        }
    ]
}

8.APIの公開

作成したAPIを外部に公開して外部ネットワークからアクセスできるようにします。
APIをデプロイすることによりAPIが外部に公開されます。

8.1.APIデプロイ

「POST – メソッドの実行」画面で「アクション」ボタンを押下します。
ドロップダウンリストから「APIのデプロイ」を選択します。

「APIのデプロイ」画面が開くので以下を設定し「デプロイ」ボタンを押下します。

  • 「デプロイされるステージ」に「新しいステージ」を選択
  • 「ステージ名」に「orderEntry」と設定(※)

APIのデプロイに成功すると、「ステージエディタ」が表示され、APIにアクセスするためのURLが表示されます。

8.2.接続確認

公開されたAPIに対し、外部からアクセス可能かcurlコマンドで確認します。
Amazon API Gatewayへは、以下の書式でcurlコマンドを実行してください。

curl –X POST https://URL -d PARAM | jq

(※)POSTメソッドでAPIを作成したので、POSTでリクエストします。
(※)URLは上記「ステージエディタ」で表示されているURLを使用します。
末尾にパスとして「リソースの作成」で作成した「/order」を付加します。
(※)PARAMには「7. APIテスト」で入力したメッセージを設定します。
(※)jqコマンドを使用して、応答メッセージを整形して見易くします。

入力例(音楽CD注文の場合)

$ curl -X POST 'https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/orderEntery/order ' -d '{"id":1327, "date":"2020-04-11 10:27:00","payment": {"amount": 4180, "currency":"yen"},"product_type":" Music CD ","items":[ {"product_name":" World Jazz vol.8 vol.2","price": 4180, "quantity": 1 } ]' | jq

9.あとがき

今回は受け付けた注文をAmazon SNSへ送信するLambdaとAmazon SNSから商品別に注文メッセージを受信し処理するLambdaの作成方法とシステムの構築手順について解説しました。
LambdaでのAmazon SNSへのメッセージの送信方法とLambdaで付加したメッセージ属性によるAmazon SNSのメッセージフィルタリング機能を使ったメッセージの振り分け配信方法が理解できたと思います。
今回はAmazon SNSへのLambdaの送受信方法をテーマに解説していたためAmazon SNSから直接Lambdaメッセージを受信する構成にしていましたが、実際にはAmazon SQSが中継してLambdaに渡す構成が一般的です。Amazon SQSが中継することでLambdaがより非同期に実行できるようになり、システム全体の負荷分散につながります。

やはり難しい??事例からAWS Lambda活用のヒントを得ましょう!

Amazon Web Services(AWS)、Microsoft Azureの
導入支援サービスのご相談、お問い合わせをお待ちしております。

ページ上部へ戻る