AWS Python SDKからAthenaにクエリして結果をS3からローカルに保存
Amazon Athena を使うと、Amazon S3内のデータに爆速でSQLで問い合わせられます。
便利ではあるのですが、管理コンソールから操作すると
- SQL の記載
- SQL の実行
- 完了まで待つ
- ダウンロードリンクからSQLの実行結果を保存
と待ち時間が長く、まとめて実行した SQL が多くある場合は、少しばかり手間です。
このような問題を解決するため、API を使って SQL の実行から SQL の実行結果の保存までを API で行うスクリプトを紹介します。
実行イメージ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | $ cat foo.sqlselect count(*)from bar$ export AWS_DEFAULT_PROFILE=test$ python athena.py foo.sql$ ls -1athena.logathena.pyfoo.sql # クエリーする SQLfoo.sql.csv # クエリー結果$ cat foo.sql.csv # クエリー結果を確認"_col0""1234" |
解説
主要なポイントだけかいつまんで説明します。
プログラム全体はブログ末尾を参照下さい
Amazon Athena にクエリーを投げる
Amazon Athena にクエリーを投げるには start_query_execution API を使います。
- QueryString : SQL 文 を指定します。
- QueryExecutionContext : クエリー先のデータベース名を指定します
- ResultConfiguration : クエリーの保存先を指定します。AMCから利用する場合、デフォルトは「aws-athena-query-results-
- 」です。
1 2 3 4 5 6 7 8 9 | result = athena.start_query_execution( QueryString = sql, QueryExecutionContext = { 'Database': DATABASE_NAME }, ResultConfiguration = { }) |
クエリーの実行ステータスを確認
クエリーの実行ステータスを確認するには get_query_execution API を使います。
- QueryExecutionId :
start_query_execution実行時に発行される ID を指定します。
レスポンス例です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | { "QueryExecution": { "Query": "string", "QueryExecutionContext": { "Database": "string" }, "QueryExecutionId": "string", "ResultConfiguration": { "EncryptionConfiguration": { "EncryptionOption": "string", "KmsKey": "string" }, "OutputLocation": "string" }, "Statistics": { "DataScannedInBytes": number, "EngineExecutionTimeInMillis": number }, "Status": { "CompletionDateTime": number, "State": "string", "StateChangeReason": "string", "SubmissionDateTime": number } }} |
QueryExecution -> Status -> State が開始直後の RUNNING から SUCCEEDED に遷移するまでポーリングします。
サンプルプログラムでは retrying モジュールを使い exponential backoff を使いながらポーリングしています。
retrying モジュールの使い方は、次の記事を参照下さい。
クエリー結果を S3 からローカル環境にコピー
Athena のクエリー結果は S3 に保存されます。
この保存先パスは s3://{OutputLocation}//{QueryExecutionId}.csv となります。
S3 の API で S3 からローカルサーバーにコピーします。
プログラム全体
以上の処理をまとめたプログラムが以下です
| #!/usr/bin/env python | |
| # vim: set fileencoding=utf8 : | |
| ``` | |
| $ pip install -U boto3 retrying | |
| $ export AWS_DEFAULT_PROFILE=test | |
| $ cat foo.sql | |
| select count(*) | |
| from bar | |
| $ python athena.py foo.sql | |
| $ ls -1 | |
| athena.log # program log | |
| athena.py # main program | |
| foo.sql # query execution result | |
| foo.sql.csv # sql output | |
| $ cat foo.sql.csv # check query result | |
| "_col0" | |
| "1234" | |
| ''' | |
| import logging | |
| import pprint | |
| import sys | |
| import boto3 | |
| from retrying import retry | |
| logging.basicConfig(filename='athena.log',level=logging.INFO) | |
| athena = boto3.client('athena') | |
| s3 = boto3.resource('s3') | |
| S3BUCKET_NAME = 'XXX' | |
| DATABASE_NAME = 'YYY' | |
| S3BUCKET_NAME = 'aws-athena-query-results-018221336085-ap-northeast-1' | |
| DATABASE_NAME = 'trid_381' | |
| @retry(stop_max_attempt_number = 10, | |
| wait_exponential_multiplier = 30 * 1000, | |
| wait_exponential_max = 10 * 60 * 1000) | |
| def poll_status(_id): | |
| ''' | |
| poll query status | |
| ''' | |
| result = athena.get_query_execution( | |
| QueryExecutionId = _id | |
| ) | |
| logging.info(pprint.pformat(result['QueryExecution'])) | |
| state = result['QueryExecution']['Status']['State'] | |
| if state == 'SUCCEEDED': | |
| return result | |
| elif state == 'FAILED': | |
| return result | |
| else: | |
| raise Exception | |
| def query_to_athena(filename): | |
| sql = open(filename, 'r').read() | |
| result = athena.start_query_execution( | |
| QueryString = sql, | |
| QueryExecutionContext = { | |
| 'Database': DATABASE_NAME | |
| }, | |
| ResultConfiguration = { | |
| 'OutputLocation': 's3://' + S3BUCKET_NAME, | |
| } | |
| ) | |
| logging.info(pprint.pformat(result)) | |
| QueryExecutionId = result['QueryExecutionId'] | |
| result = poll_status(QueryExecutionId) | |
| # save response | |
| with open(filename + '.log', 'w') as f: | |
| f.write(pprint.pformat(result, indent = 4)) | |
| # save query result from S3 | |
| if result['QueryExecution']['Status']['State'] == 'SUCCEEDED': | |
| s3_key = QueryExecutionId + '.csv' | |
| local_filename = filename + '.csv' | |
| s3.Bucket(S3BUCKET_NAME).download_file(s3_key, local_filename) | |
| def main(): | |
| for filename in sys.argv[1:]: | |
| try: | |
| query_to_athena(filename) | |
| except Exception, err: | |
| print err | |
| if __name__ == '__main__': | |
| main() |
改善ポイント
今回したプログラムはクエリーを一つずつシーケンシャルに処理しています。
- SQL の並列上限(デフォルトは5)を超えない範囲で並列実行し、 複数の実行ステータスを BatchGetQueryExecution でまとめて確認
- S3バケット、データベース名 を引数で渡せるようにする
などすれば、もう少し使いやすくなると思います。
CloudWatch Events がAthena のクエリーステータスと連動すると、ポーリングが不要になり、イベントドリブンな処理を書きやすくなります。
まとめ
今回は REST API を使い、以下の処理を行うプログラムを作成しました。
- Athena にクエリーを投げる
- クエリー実行ステータスをポーリングで確認
- クエリー実行結果を S3 から保存
Amazon Athena に一度にまとめて SQL を投げるエンジニアのヒントになれば幸いです。
参照
AWS CLI および AWS SDK for Python からの API の使い方は以下の過去の記事も参照下さい。