AWS Python SDKからAthenaにクエリして結果をS3からローカルに保存
Amazon Athena を使うと、Amazon S3内のデータに爆速でSQLで問い合わせられます。
便利ではあるのですが、管理コンソールから操作すると
- SQL の記載
- SQL の実行
- 完了まで待つ
- ダウンロードリンクからSQLの実行結果を保存
と待ち時間が長く、まとめて実行した SQL が多くある場合は、少しばかり手間です。
このような問題を解決するため、API を使って SQL の実行から SQL の実行結果の保存までを API で行うスクリプトを紹介します。
実行イメージ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | $ cat foo.sql select count(*) from bar $ export AWS_DEFAULT_PROFILE=test $ python athena.py foo.sql $ ls -1 athena.log athena.py foo.sql # クエリーする SQL foo.sql.csv # クエリー結果 $ cat foo.sql.csv # クエリー結果を確認 "_col0" "1234" |
解説
主要なポイントだけかいつまんで説明します。
プログラム全体はブログ末尾を参照下さい
Amazon Athena にクエリーを投げる
Amazon Athena にクエリーを投げるには start_query_execution API を使います。
- QueryString : SQL 文 を指定します。
- QueryExecutionContext : クエリー先のデータベース名を指定します
- ResultConfiguration : クエリーの保存先を指定します。AMCから利用する場合、デフォルトは「aws-athena-query-results-
- 」です。
1 2 3 4 5 6 7 8 9 | result = athena.start_query_execution( QueryString = sql, QueryExecutionContext = { 'Database' : DATABASE_NAME }, ResultConfiguration = { } ) |
クエリーの実行ステータスを確認
クエリーの実行ステータスを確認するには get_query_execution API を使います。
- QueryExecutionId :
start_query_execution
実行時に発行される ID を指定します。
レスポンス例です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | { "QueryExecution" : { "Query" : "string" , "QueryExecutionContext" : { "Database" : "string" }, "QueryExecutionId" : "string" , "ResultConfiguration" : { "EncryptionConfiguration" : { "EncryptionOption" : "string" , "KmsKey" : "string" }, "OutputLocation" : "string" }, "Statistics" : { "DataScannedInBytes" : number, "EngineExecutionTimeInMillis" : number }, "Status" : { "CompletionDateTime" : number, "State" : "string" , "StateChangeReason" : "string" , "SubmissionDateTime" : number } } } |
QueryExecution -> Status -> State が開始直後の RUNNING
から SUCCEEDED
に遷移するまでポーリングします。
サンプルプログラムでは retrying モジュールを使い exponential backoff を使いながらポーリングしています。
retrying モジュールの使い方は、次の記事を参照下さい。
クエリー結果を S3 からローカル環境にコピー
Athena のクエリー結果は S3 に保存されます。
この保存先パスは s3://{OutputLocation}//{QueryExecutionId}.csv
となります。
S3 の API で S3 からローカルサーバーにコピーします。
プログラム全体
以上の処理をまとめたプログラムが以下です
#!/usr/bin/env python | |
# vim: set fileencoding=utf8 : | |
``` | |
$ pip install -U boto3 retrying | |
$ export AWS_DEFAULT_PROFILE=test | |
$ cat foo.sql | |
select count(*) | |
from bar | |
$ python athena.py foo.sql | |
$ ls -1 | |
athena.log # program log | |
athena.py # main program | |
foo.sql # query execution result | |
foo.sql.csv # sql output | |
$ cat foo.sql.csv # check query result | |
"_col0" | |
"1234" | |
''' | |
import logging | |
import pprint | |
import sys | |
import boto3 | |
from retrying import retry | |
logging.basicConfig(filename='athena.log',level=logging.INFO) | |
athena = boto3.client('athena') | |
s3 = boto3.resource('s3') | |
S3BUCKET_NAME = 'XXX' | |
DATABASE_NAME = 'YYY' | |
S3BUCKET_NAME = 'aws-athena-query-results-018221336085-ap-northeast-1' | |
DATABASE_NAME = 'trid_381' | |
@retry(stop_max_attempt_number = 10, | |
wait_exponential_multiplier = 30 * 1000, | |
wait_exponential_max = 10 * 60 * 1000) | |
def poll_status(_id): | |
''' | |
poll query status | |
''' | |
result = athena.get_query_execution( | |
QueryExecutionId = _id | |
) | |
logging.info(pprint.pformat(result['QueryExecution'])) | |
state = result['QueryExecution']['Status']['State'] | |
if state == 'SUCCEEDED': | |
return result | |
elif state == 'FAILED': | |
return result | |
else: | |
raise Exception | |
def query_to_athena(filename): | |
sql = open(filename, 'r').read() | |
result = athena.start_query_execution( | |
QueryString = sql, | |
QueryExecutionContext = { | |
'Database': DATABASE_NAME | |
}, | |
ResultConfiguration = { | |
'OutputLocation': 's3://' + S3BUCKET_NAME, | |
} | |
) | |
logging.info(pprint.pformat(result)) | |
QueryExecutionId = result['QueryExecutionId'] | |
result = poll_status(QueryExecutionId) | |
# save response | |
with open(filename + '.log', 'w') as f: | |
f.write(pprint.pformat(result, indent = 4)) | |
# save query result from S3 | |
if result['QueryExecution']['Status']['State'] == 'SUCCEEDED': | |
s3_key = QueryExecutionId + '.csv' | |
local_filename = filename + '.csv' | |
s3.Bucket(S3BUCKET_NAME).download_file(s3_key, local_filename) | |
def main(): | |
for filename in sys.argv[1:]: | |
try: | |
query_to_athena(filename) | |
except Exception, err: | |
print err | |
if __name__ == '__main__': | |
main() |
改善ポイント
今回したプログラムはクエリーを一つずつシーケンシャルに処理しています。
- SQL の並列上限(デフォルトは5)を超えない範囲で並列実行し、 複数の実行ステータスを BatchGetQueryExecution でまとめて確認
- S3バケット、データベース名 を引数で渡せるようにする
などすれば、もう少し使いやすくなると思います。
CloudWatch Events がAthena のクエリーステータスと連動すると、ポーリングが不要になり、イベントドリブンな処理を書きやすくなります。
まとめ
今回は REST API を使い、以下の処理を行うプログラムを作成しました。
- Athena にクエリーを投げる
- クエリー実行ステータスをポーリングで確認
- クエリー実行結果を S3 から保存
Amazon Athena に一度にまとめて SQL を投げるエンジニアのヒントになれば幸いです。
参照
AWS CLI および AWS SDK for Python からの API の使い方は以下の過去の記事も参照下さい。