Amazon SNSがメッセージフィルタに対応し柔軟なpub-subが可能に!
AWS のメッセージングサービス Amazon Simple Notification Service(Amazon SNS)は pub-sub に対応し、SNSトピックにメッセージを送信(publish)すると、トピックを購読(subscribe)しているsubscriberにメッセージが一斉配信されます。
ここで注意すべきことは subscriber は publisher が送信したすべてのメッセージに興味があるとは限らないことです。
そのため subscriber 側でメッセージを確認し、不要なメッセージは破棄するなどの処理を入れていました。 この処理を行うのは Lambda だったり、SQS のワーカーだったりします。
運用するにつれて振り分けルールが増える傾向にあり、振り分けロジックは各 subscriber に散らばっているため、運用コストは高くなりがちでした。
SNS にメッセージフィルタ機能が追加されたことにより、メッセージ送信時に付与した属性によって、対応する subscriber にだけメッセージが送信されるようになり、 subscriber にあったフィルタ処理は不要になりました。
RabbitMQ のような AMQP 系メッセージキューを使ったことがある方には、トピック・エクスチェンジが実装されたと言えば、イメージしやすいかもしれません。
Amazon SNS メッセージフィルタリング機能の関連ドキュメント
メッセージフィルタリング機能に関して、現時点で3つのドキュメントが存在します。
管理コンソールでの操作をメインとした pub-sub チュートリアル
Python SDK での操作をメインとした pub-sub チュートリアル
Amazon SNSのメッセージフィルタ機能に限定したAPIドキュメント
ウォークスルー
このブログでは2つ目のドキュメント「管理コンソールでの操作をメインとした pub-sub チュートリアル」を AWS CLI から操作し、別角度から Amazon SNS トピックに Amazon SQS キューをフィルタ機能付きで購読し、メッセージが適切な subscriber にのみ配信されることを確認します。
架空の保険見積もり(insurance quote)サイトで、保険の種類に応じて、配信先を振り分けます。
最終型
ウォークスルーの流れ
- Amazon SNS トピック の作成
- Amazon SQS キューの作成
- SQS キューを SNS トピックに購読させる
- SNS 購読にフィルタポリシーを設定
- トピックにメッセージ配信
- キューにメッセージ配信されていることを確認
1. Amazon SNS トピック の作成
保険の見積もり(quote)依頼があったときに、メッセージを publish する SNS トピックを作成します。
1 2 3 4 | $ aws sns create-topic --name Insurance-Quote-Requests { "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests" } |
2. Amazon SQS キューの作成
保険の種類ごとにシステムが別れており、各システムは対応する Amazon SQS のキューをポーリングします。
このキューを作成します。
作成するキューは3つあります
- All-Quotes : すべての見積もり
- Vehicle-Insurance-Quotes : 乗り物の見積もり
- Life-Insurance-Quotes : 生命保険の見積もり
順に作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | $ aws sqs create-queue --queue-name All-Quotes { } $ aws sqs get-queue-attributes --queue-url https://ap-northeast-1.queue.amazonaws.com/123456789012/All-Quotes --attribute-names All { "Attributes": { "ApproximateNumberOfMessagesNotVisible": "0", "MessageRetentionPeriod": "345600", "ApproximateNumberOfMessagesDelayed": "0", "MaximumMessageSize": "262144", "CreatedTimestamp": "1511575422", "ApproximateNumberOfMessages": "0", "ReceiveMessageWaitTimeSeconds": "0", "DelaySeconds": "0", "VisibilityTimeout": "30", "LastModifiedTimestamp": "1511575422", "QueueArn": "arn:aws:sqs:ap-northeast-1:123456789012:All-Quotes" } } $ aws sqs create-queue --queue-name Vehicle-Insurance-Quotes { } $ aws sqs create-queue --queue-name Life-Insurance-Quotes { } |
3. SQS キューを SNS トピックに購読させる
SNS トピックにメッセジが publish されたときに、SQS キューにメッセージがブロードキャストされるように、購読します。
--protocol
には sqs を指定し、 --notification-endpoint
には SQS の ARN を指定します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | $ aws sns subscribe --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --protocol sqs \ --notification-endpoint arn:aws:sqs:ap-northeast-1:123456789012:All-Quotes { "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:22564e62-b642-456e-8c1a-60726b11c574" } $ aws sns subscribe --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --protocol sqs \ --notification-endpoint arn:aws:sqs:ap-northeast-1:123456789012:Vehicle-Insurance-Quotes { "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:20ae8621-e33b-4a6e-952c-62fa92bac171" } $ aws sns subscribe --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --protocol sqs \ --notification-endpoint arn:aws:sqs:ap-northeast-1:123456789012:Life-Insurance-Quotes { "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:a43d756e-74ab-4a39-bbc8-7ef31d70d0d6" } |
subscribe 状況を確認
SNS トピック Insurance-Quote-Requests
に作成した3つのキューが購読状況を確認します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | $ aws sns list-subscriptions-by-topic --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests { "Subscriptions": [ { "Owner": "123456789012", "Endpoint": "arn:aws:sqs:ap-northeast-1:123456789012:Vehicle-Insurance-Quotes", "Protocol": "sqs", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests", "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:20ae8621-e33b-4a6e-952c-62fa92bac171" }, { "Owner": "123456789012", "Endpoint": "arn:aws:sqs:ap-northeast-1:123456789012:Life-Insurance-Quotes", "Protocol": "sqs", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests", "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:a43d756e-74ab-4a39-bbc8-7ef31d70d0d6" }, { "Owner": "123456789012", "Endpoint": "arn:aws:sqs:ap-northeast-1:123456789012:All-Quotes", "Protocol": "sqs", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests", "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:ad2b679a-0495-4a72-8faf-5ee0665e0bab" } ] } |
4. SNS 購読にフィルタポリシーを設定
次にフィルタ設定を行います。
- All-Quotes キュー : 全てのメッセージを配信するため、フィルタ設定は行わない
- Vehicle-Insurance-Quotes キュー :
insurance_type
がcar
またはboat
のときにを配信するフィルタ設定 - Life-Insurance-Quotes キュー :
insurance_type
がlife
のときにを配信するフィルタ設定
利用API
- 購読の属性設定 API : set-subscription-attributes - 購読の属性設定確認 API : set-subscription-attributes
--subscription-arn
には購読 ARN を指定し、--attribute-name
には FilterPolicy
を指定し、 --attribute-name
には フィルタルールを JSON で指定します。
例)
1 2 3 4 5 6 | { "insurance_type" : [ "car" , "boat" ] } |
All-Quotes キュー
なにもしない
Vehicle-Insurance-Quotes キュー
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | $ aws sns set-subscription-attributes \ --subscription-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:20ae8621-e33b-4a6e-952c-62fa92bac171 \ --attribute-name FilterPolicy \ --attribute-value '{"insurance_type":["car", "boat"]}' $ aws sns get-subscription-attributes \ --subscription-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:20ae8621-e33b-4a6e-952c-62fa92bac171 { "Attributes": { "Endpoint": "arn:aws:sqs:ap-northeast-1:123456789012:Vehicle-Insurance-Quotes", "Protocol": "sqs", "RawMessageDelivery": "false", "ConfirmationWasAuthenticated": "true", "FilterPolicy": "{\"insurance_type\":[\"car\", \"boat\"]}", "Owner": "123456789012", "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:20ae8621-e33b-4a6e-952c-62fa92bac171", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests" } } |
Life-Insurance-Quotes キュー
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | $ aws sns set-subscription-attributes \ --subscription-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:a43d756e-74ab-4a39-bbc8-7ef31d70d0d6 \ --attribute-name FilterPolicy \ --attribute-value '{"insurance_type":["life"]}' $ aws sns get-subscription-attributes \ --subscription-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:a43d756e-74ab-4a39-bbc8-7ef31d70d0d6 { "Attributes": { "Endpoint": "arn:aws:sqs:ap-northeast-1:123456789012:Life-Insurance-Quotes", "Protocol": "sqs", "RawMessageDelivery": "false", "ConfirmationWasAuthenticated": "true", "FilterPolicy": "{\"insurance_type\":[\"life\"]}", "Owner": "123456789012", "SubscriptionArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests:a43d756e-74ab-4a39-bbc8-7ef31d70d0d6", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests" } } |
5. トピックにメッセージ配信
保険の見積もりサイトから見積もり依頼があったと見立てて、SNS トピックにメッセージ配信を行います。
メッセージフィルタの動作確認のために、保険の種類を変えて複数のメッセージを送信します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | # insurance_type = car $ aws sns publish --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --subject "Insurance Quote Request #1" \ --message "2017 Volvo S60, Montreal" \ --message-attributes '{"insurance_type": { "DataType" : "String", "StringValue" : "car"}}' { "MessageId": "09d09c76-d890-5d8c-87f7-cee43c26e895" } # insurance_type = life $ aws sns publish --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --subject "Insurance Quote Request #2" \ --message "Male, 33 years old, Vancouver" \ --message-attributes '{"insurance_type": { "DataType" : "String", "StringValue" : "life"}}' # insurance_type = home $ aws sns publish --topic-arn arn:aws:sns:ap-northeast-1:123456789012:Insurance-Quote-Requests \ --subject "Insurance Quote Request #3" \ --message "Townhouse, 1500 sq ft, Toronto " \ --message-attributes '{"insurance_type": { "DataType" : "String", "StringValue" : "home"}}' |
6. キューにメッセージ配信されていることを確認
最後に、保険の種類によってメッセージフィルタが動作して、対応する SQS キューに見積もり依頼が届いている(ルーティングされている)ことを確認します。
以下のようになっていれば OKです。
- All-Quotes キュー : 全てのメッセージ
- Vehicle-Insurance-Quotes キュー :
insurance_type
がcar
またはboat
のメッセージ - Life-Insurance-Quotes キュー :
insurance_type
がlife
のメッセージ
All-Quotes キュー
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | $ aws sqs receive-message --max-number-of-messages 5 \ --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/All-Quotes | jq -r ".Messages[].Body" { "Type" : "Notification", ... "Subject" : "Insurance Quote Request #1", "Message" : "2017 Volvo S60, Montreal", ... "MessageAttributes" : { "insurance_type" : {"Type":"String","Value":"car"} } } { "Type" : "Notification", ... "Subject" : "Insurance Quote Request #3", "Message" : "Townhouse, 1500 sq ft, Toronto ", ... "MessageAttributes" : { "insurance_type" : {"Type":"String","Value":"home"} } } |
Life-Insurance-Quotes キュー
1 2 3 4 5 6 7 8 9 10 11 12 | $ aws sqs receive-message --max-number-of-messages 5 \ --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/Life-Insurance-Quotes | jq -r ".Messages[].Body" { "Type" : "Notification", ... "Subject" : "Insurance Quote Request #2", "Message" : "Male, 33 years old, Vancouver", ... "MessageAttributes" : { "insurance_type" : {"Type":"String","Value":"life"} } } |
Vehicle-Insurance-Quotes キュー
1 2 3 4 5 6 7 8 9 10 11 12 | $ aws sqs receive-message --max-number-of-messages 5 \ { "Type" : "Notification", ... "Subject" : "Insurance Quote Request #1", "Message" : "2017 Volvo S60, Montreal", ... "MessageAttributes" : { "insurance_type" : {"Type":"String","Value":"car"} } } |
注意事項
このメッセージフィルタにはいくつかの制限があります。
- メッセージフィルタポリシーは JSON で指定します。
- String 型 もしくは String.Array 型のみ利用可能です。Number 型や Binary 型の場合は無視されます
- 大文字・小文字を区別します。
- 属性名(ウォークスルーでは insurance_typeに相当)は10個まで可能です
- ポリシー設定の最大サイズは 256 KB です
まとめ
今回は Amazon SNS のメッセージフィルタ機能を AWS CLI から動作確認しました。
subscriber の責務だったメッセージの振り分けロジックを購読設定に寄せられるようになり、各コンポーネントは
- Amazon SNS : メッセージ配信
- 購読 : メッセージのルーティング
- Amazon SQS : メッセージの処理
ときれいに分離され、本業に専念できるようになりました。
AMQP のトピックエクスチェンジを Amazon SNS 上に頑張って実装していた人には朗報です。
Apache Kafka はトピックが実装されているので、Amazon Kinesis ファミリーもトピックに対応してくれると最高ですね。