AWS Step Functionsでジョブ・ステータス・ポーリングを実装する
AWS Step Functionsを使ってAWS Batchのジョブをキューイング→ジョブステータスをポーリング→ジョブ終了までを管理するジョブ・ステータス・ポーラーを10分でプロビジョンするチュートリアルが追加されていたため、実際にやってみました。
ステートマシン
Lambda で AWS Batch ジョブをキューイングし、ジョブのステータスが完了系になるまでポーリングします。 ポーリング間隔は Wait ステートを利用します。
やってみた
以下の手順でサンプルプロジェクトを実行します。
- タスクタイマーをプロビジョン
- 後片付け
1. タスクタイマーをプロビジョン
Step Function のダッシュボードから ”Create a state machine”を選択します
Sample Projects -> Job Status Poller を選択し、"Create Sample Project" ボタンをクリックします。
CloudFormation でリソースが作成される旨のメッセージが表示されます。 "Create Resources" ボタンをクリックします。
リソースの作成が始まります。
リソースの作成が完了すると、ステートマシーンを実行する "New execution" のダイアログが表示されます。
Input の JSON において
jobName
は AWS Batch のジョブ名jobDefinition
は AWS Batch のコンテナ定義jobQueue
は AWS Batch のジョブキューwait_time
は待機する秒数
を表します。
1 2 3 4 5 6 | { "jobName" : "my-job" , "jobDefinition" : "arn:aws:batch:eu-central-1:123456789012:job-definition/SampleJobDefinition-XXX:1" , "jobQueue" : "arn:aws:batch:eu-central-1:123456789012:job-queue/SampleJobQueue-YYY" , "wait_time" : 60 } |
"Start execution" ボタンをクリックして、ステートマシーンを実行します。
ステートマシーンが実行されます。
"Submit Job" Task では Lambda 関数 SubmitJobFunction
を呼び出し、ジョブをキューイングします。
ジョブはコマンド「echo 'Hello world'」を実行します。
"Wait X Seconds" では Wait ステートを使って、wait_time
で指定した秒数だけ待機します。
"Get Job Status" Task では Lambda 関数 CheckJobFunction
を呼び出し、ジョブのステータスを取得します。
"Job Complete?" Choice では、ジョブステータスが "SUCCEEDED" または "FAILED" の場合次にステートに移動し、これら以外の場合は "Wait X Seconds" ステートに戻ってポーリングを繰り返します。
"Job Failed" では Fail ステートを使ってステートマシンの実行を停止し、失敗としてマークします。
"Get Final Job Status" Task では Lambda 関数 CheckJobFunction を呼び出し、ジョブの最後のステータスを取得し、End フィールドを使ってステートマシンの実行を停止します。 "SUCCEEDED" ステートで実行を停止していない理由ははっきりしません。 このサンプルプロジェクトでは "Get Job Status" ステップと同じ Lambda 関数を利用していますが、 実プロジェクトでは 要件に合わせた Lambda 関数(SNS 通知とか)などにかえることを想定しているものと思われます。
2. 後片付け
このステートマシーンに関するリソースは全て CloudFormation で作成されています。
CloudFormation のスタック一覧に"StepFunctionsSample-JobStatusPoller-XXX" のスタック名が存在するため、削除しましょう。
プロビジョン内容を確認
プロビジョニングには CloudFormation を利用しています。 作成されたリソースを確認してみましょう。
作成されたリソース
CloudFormation により以下のリソースが作成されます。
非 AWS Batch(ECS) 関連
- AWS::StepFunctions::StateMachine : JobStatusPollerStateMachine
- AWS::Lambda::Function : SubmitJobFunction
- AWS::Lambda::Function : CheckJobFunction
- AWS::IAM::Role : StatesExecutionRole
- AWS::IAM::Role : LambdaExecutionRole
AWS Batch(ECS) 関連
- VPC(SampleVPC) などネットワークリソース
- AWS::Batch::JobQueue : SampleJobQueue
- AWS::Batch::JobDefinition : SampleJobDefinition
- AWS::Batch::ComputeEnvironment : SampleComputeEnvironment
- AWS::IAM::Role : SampleEcsInstanceRole
- AWS::IAM::InstanceProfile : SampleIamInstanceProfile
※ 論理ID
Step Functions : JobStatusPollerStateMachine はメインとなるステートマシーンです。
作成されるステートマシーンの Amazon States Language は以下です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | { "Comment" : "A state machine that submits a Job to AWS Batch and monitors the Job until it completes." , "StartAt" : "Submit Job" , "States" : { "Submit Job" : { "Type" : "Task" , "Resource" : "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPol-SubmitJobFunction-1CBAEF68A1X8A" , "ResultPath" : "$.guid" , "Next" : "Wait X Seconds" }, "Wait X Seconds" : { "Type" : "Wait" , "SecondsPath" : "$.wait_time" , "Next" : "Get Job Status" }, "Get Job Status" : { "Type" : "Task" , "Resource" : "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPoll-CheckJobFunction-PXJ28RR8GQ4M" , "Next" : "Job Complete?" , "InputPath" : "$.guid" , "ResultPath" : "$.status" }, "Job Complete?" : { "Type" : "Choice" , "Choices" : [ { "Variable" : "$.status" , "StringEquals" : "FAILED" , "Next" : "Job Failed" }, { "Variable" : "$.status" , "StringEquals" : "SUCCEEDED" , "Next" : "Get Final Job Status" } ], "Default" : "Wait X Seconds" }, "Job Failed" : { "Type" : "Fail" , "Cause" : "AWS Batch Job Failed" , "Error" : "DescribeJob returned FAILED" }, "Get Final Job Status" : { "Type" : "Task" , "Resource" : "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPoll-CheckJobFunction-PXJ28RR8GQ4M" , "InputPath" : "$.guid" , "End" : true } } } |
Task ステートで Lambda を呼び出せるように、 StatesExecutionRole ロールをアタッチしています。
1 2 3 4 5 6 7 8 9 10 11 12 | { "Version" : "2012-10-17" , "Statement" : [ { "Action" : [ "lambda:InvokeFunction" ], "Resource" : "*" , "Effect" : "Allow" } ] } |
Lambda 関数 SubmitJobFunction/CheckJobFunction は AWS Batch へのジョブ Submit とステータスチェックだけをするシンプルなものです。
SubmitJobFunction では submit_job API でジョブを Sumit しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import json import boto3 print ( 'Loading function' ) batch = boto3.client( 'batch' ) def lambda_handler(event, context): # Log the received event print ( "Received event: " + json.dumps(event, indent = 2 )) # Get parameters for the SubmitJob call jobName = event[ 'jobName' ] jobQueue = event[ 'jobQueue' ] jobDefinition = event[ 'jobDefinition' ] # containerOverrides and parameters are optional if event.get( 'containerOverrides' ): containerOverrides = event[ 'containerOverrides' ] else : containerOverrides = {} if event.get( 'parameters' ): parameters = event[ 'parameters' ] else : parameters = {} try : # Submit a Batch Job response = batch.submit_job(jobQueue = jobQueue, jobName = jobName, jobDefinition = jobDefinition, containerOverrides = containerOverrides, parameters = parameters) # Log response from AWS Batch print ( "Response: " + json.dumps(response, indent = 2 )) # Return the jobId jobId = response[ 'jobId' ] return { 'jobId' : jobId } except Exception as e: print (e) message = 'Error submitting Batch Job' print (message) raise Exception(message) |
CheckJobFunction では describe_jobs API でジョブキューのステータスを取得しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | from __future__ import print_function import json import boto3 print ( 'Loading function' ) batch = boto3.client( 'batch' ) def lambda_handler(event, context): # Log the received event print ( "Received event: " + json.dumps(event, indent = 2 )) # Get jobId from the event jobId = event[ 'jobId' ] try : # Call DescribeJobs response = batch.describe_jobs(jobs = [jobId]) # Log response from AWS Batch print ( "Response: " + json.dumps(response, indent = 2 )) # Return the jobtatus jobStatus = response[ 'jobs' ][ 0 ][ 'status' ] return jobStatus except Exception as e: print (e) message = 'Error getting Batch Job status' print (message) raise Exception(message) |
これら Lambda 関数では AWS Batch を操作できるように、 LambdaExecutionRole ロールをアタッチしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 | { "Version" : "2012-10-17" , "Statement" : [ { "Action" : [ "batch:SubmitJob" , "batch:DescribeJobs" ], "Resource" : "*" , "Effect" : "Allow" } ] } |
最後に
Document History によると、このサンプルプロジェクトは既存のチュートリアルをより試しやすいように CloudFormation 化して 2018/01/18 に追加されたようです。
Step Functions はステートを定義するだけではほぼ何もやれず、タスクなどと連携して初めてまともに動作します。
Step Functions ではポーリングしながら状態遷移するケースが非常に多いかと思います。 今回のプロジェクトを参考に、AWS Batch 部分を AWS Lambda など類似のサービスに置き換えるとだけで、ミニマムなおれおれジョブ・ステータス・ポーラーの完成です。
一度お試しください。