AWS Python SDKからAthenaにクエリして結果をS3からローカルに保存

Amazon Athena を使うと、Amazon S3内のデータに爆速でSQLで問い合わせられます。

便利ではあるのですが、管理コンソールから操作すると

  1. SQL の記載
  2. SQL の実行
  3. 完了まで待つ
  4. ダウンロードリンクからSQLの実行結果を保存

と待ち時間が長く、まとめて実行した SQL が多くある場合は、少しばかり手間です。

このような問題を解決するため、API を使って SQL の実行から SQL の実行結果の保存までを API で行うスクリプトを紹介します。

実行イメージ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ cat foo.sql
select count(*)
from bar
 
$ export AWS_DEFAULT_PROFILE=test
 
$ python athena.py foo.sql
 
$ ls -1
athena.log
athena.py
foo.sql     # クエリーする SQL
foo.sql.csv # クエリー結果
 
$ cat foo.sql.csv # クエリー結果を確認
"_col0"
"1234"

解説

主要なポイントだけかいつまんで説明します。

プログラム全体はブログ末尾を参照下さい

Amazon Athena にクエリーを投げる

Amazon Athena にクエリーを投げるには start_query_execution API を使います。

  • QueryString : SQL 文 を指定します。
  • QueryExecutionContext : クエリー先のデータベース名を指定します
  • ResultConfiguration : クエリーの保存先を指定します。AMCから利用する場合、デフォルトは「aws-athena-query-results--」です。
1
2
3
4
5
6
7
8
9
result = athena.start_query_execution(
    QueryString = sql,
    QueryExecutionContext = {
        'Database': DATABASE_NAME
    },
    ResultConfiguration = {
        'OutputLocation': 's3://' + S3BUCKET_NAME,
    }
)

クエリーの実行ステータスを確認

クエリーの実行ステータスを確認するには get_query_execution API を使います。

  • QueryExecutionId : start_query_execution 実行時に発行される ID を指定します。

レスポンス例です。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
   "QueryExecution": {
      "Query": "string",
      "QueryExecutionContext": {
         "Database": "string"
      },
      "QueryExecutionId": "string",
      "ResultConfiguration": {
         "EncryptionConfiguration": {
            "EncryptionOption": "string",
            "KmsKey": "string"
         },
         "OutputLocation": "string"
      },
      "Statistics": {
         "DataScannedInBytes": number,
         "EngineExecutionTimeInMillis": number
      },
      "Status": {
         "CompletionDateTime": number,
         "State": "string",
         "StateChangeReason": "string",
         "SubmissionDateTime": number
      }
   }
}

QueryExecution -> Status -> State が開始直後の RUNNING から SUCCEEDED に遷移するまでポーリングします。

サンプルプログラムでは retrying モジュールを使い exponential backoff を使いながらポーリングしています。

retrying モジュールの使い方は、次の記事を参照下さい。

PythonでExponential Backoffをしたかったのでretryingモジュールを調べてみた

クエリー結果を S3 からローカル環境にコピー

Athena のクエリー結果は S3 に保存されます。

この保存先パスは s3://{OutputLocation}//{QueryExecutionId}.csv となります。

S3 の API で S3 からローカルサーバーにコピーします。

プログラム全体

以上の処理をまとめたプログラムが以下です

#!/usr/bin/env python
# vim: set fileencoding=utf8 :
```
$ pip install -U boto3 retrying
$ export AWS_DEFAULT_PROFILE=test
$ cat foo.sql
select count(*)
from bar
$ python athena.py foo.sql
$ ls -1
athena.log # program log
athena.py # main program
foo.sql # query execution result
foo.sql.csv # sql output
$ cat foo.sql.csv # check query result
"_col0"
"1234"
'''
import logging
import pprint
import sys
import boto3
from retrying import retry
logging.basicConfig(filename='athena.log',level=logging.INFO)
athena = boto3.client('athena')
s3 = boto3.resource('s3')
S3BUCKET_NAME = 'XXX'
DATABASE_NAME = 'YYY'
S3BUCKET_NAME = 'aws-athena-query-results-018221336085-ap-northeast-1'
DATABASE_NAME = 'trid_381'
@retry(stop_max_attempt_number = 10,
wait_exponential_multiplier = 30 * 1000,
wait_exponential_max = 10 * 60 * 1000)
def poll_status(_id):
'''
poll query status
'''
result = athena.get_query_execution(
QueryExecutionId = _id
)
logging.info(pprint.pformat(result['QueryExecution']))
state = result['QueryExecution']['Status']['State']
if state == 'SUCCEEDED':
return result
elif state == 'FAILED':
return result
else:
raise Exception
def query_to_athena(filename):
sql = open(filename, 'r').read()
result = athena.start_query_execution(
QueryString = sql,
QueryExecutionContext = {
'Database': DATABASE_NAME
},
ResultConfiguration = {
'OutputLocation': 's3://' + S3BUCKET_NAME,
}
)
logging.info(pprint.pformat(result))
QueryExecutionId = result['QueryExecutionId']
result = poll_status(QueryExecutionId)
# save response
with open(filename + '.log', 'w') as f:
f.write(pprint.pformat(result, indent = 4))
# save query result from S3
if result['QueryExecution']['Status']['State'] == 'SUCCEEDED':
s3_key = QueryExecutionId + '.csv'
local_filename = filename + '.csv'
s3.Bucket(S3BUCKET_NAME).download_file(s3_key, local_filename)
def main():
for filename in sys.argv[1:]:
try:
query_to_athena(filename)
except Exception, err:
print err
if __name__ == '__main__':
main()
view raw athena.py hosted with ❤ by GitHub

改善ポイント

今回したプログラムはクエリーを一つずつシーケンシャルに処理しています。

  • SQL の並列上限(デフォルトは5)を超えない範囲で並列実行し、 複数の実行ステータスを BatchGetQueryExecution でまとめて確認
  • S3バケット、データベース名 を引数で渡せるようにする

などすれば、もう少し使いやすくなると思います。

CloudWatch Events がAthena のクエリーステータスと連動すると、ポーリングが不要になり、イベントドリブンな処理を書きやすくなります。

まとめ

今回は REST API を使い、以下の処理を行うプログラムを作成しました。

  1. Athena にクエリーを投げる
  2. クエリー実行ステータスをポーリングで確認
  3. クエリー実行結果を S3 から保存

Amazon Athena に一度にまとめて SQL を投げるエンジニアのヒントになれば幸いです。

参照

AWS CLI および AWS SDK for Python からの API の使い方は以下の過去の記事も参照下さい。