下記でSQSにエンキューする部分を作ったので、
"API Gateway → Lambda → SQS"でJSONデータをエンキューする今度はデキューする部分を作ってみます。
今まではEC2を起動して、その中で定期的にSQSをポーリングするプログラムを
実行していましたが、今ならLambdaのScheduled Eventを使うことで、
EC2無しに定期的にSQSをポーリングすることができます。
絵にすると、下記のような感じです。
(上段は前述のリンクの記事で作成したので今回は下段のLambdaを作成します)
Lambdaの設定
コードは以下のとおりです。
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
queueUrl = 'https://sqs.ap-northeast-1.amazonaws.com/000000000000/test'
maxNumberOfMessages = 10
def lambda_handler(event, context):
try:
logger.info(event)
client = boto3.client('sqs')
receiveResponse = client.receive_message(
QueueUrl = queueUrl,
MaxNumberOfMessages = maxNumberOfMessages
)
logger.info(receiveResponse)
response = receiveResponse
if receiveResponse.has_key('Messages'):
entries = []
for message in receiveResponse['Messages']:
entries.append({
"Id": message['MessageId'],
"ReceiptHandle": message['ReceiptHandle']
})
deleteResponse = client.delete_message_batch(
QueueUrl = queueUrl,
Entries = entries
)
logger.info(deleteResponse)
response = deleteResponse
return response
except Exception as e:
logger.error(e)
raise e
"receive_message"で最大10件メッセージを受け取って"delete_message_batch"でまとめて削除しています。
タイムアウトの時間も1分に変更しています。
Lambdaのテスト
Lambdaをテストします。
下記のように二つのメッセージの削除が成功したというレスポンスが返ってきて、
{
"Successful": [
{
"Id": "c820ef92-7d8d-4aa4-94a0-0843e683d158"
},
{
"Id": "52fa1240-bd8b-43bf-ac43-739118e613e1"
}
],
"ResponseMetadata": {
"HTTPStatusCode": 200,
"RequestId": "257f3292-d917-517b-8995-7c00dc7bbaa0"
}
}
実際にキューを確認すると、メッセージ数も0になっています。
Scheduled Eventの設定
上記の処理を5分ごとに実行するためにLambdaのScheduled Eventの設定を行います。
これで5分毎にSQSをポーリングし、メッセージがあれば、
最大10件まで削除するようになったはずです。
CloudWatchでの確認
最後にキューの中のメッセージの推移をCloudWatchで確認します。
60程度のメッセージが5分毎に10程度ずつ減少していることが確認できます。
0 コメント:
コメントを投稿