現場で運用する視点から見た Amazon Athena

  • 6
    いいね
  • 0
    コメント

個人的なプロジェクトも含め、いくつかの現場で Amazon Athena について触れる機会があったので、個人的に思うところについて記載します。
数時間で書いた雑な文章ですが、ご了承ください。

なお、先日リリースされた Amazon Athena API については以下の Qiita に first impression をまとめているので、 API 以外の話題について書きます。

Amazon Athena の API を使ってみた (2017/05)

データの設計について

Amazon Athena は managed な Presto 環境で、

  • 事前に Presto サーバーの立ち上げなどリソース確保を行わなくても良い
  • データについても、S3 にファイルをアップさえすれば、後付でいかようにでもデータの解析が出来る

という手軽さを売りにされています。
これは利点としていっさい間違いではないですが、事前のデータ設計なしにただデータを S3 にアップロードしている状況だと、いざ解析をしようとしてもうまくいかないケースが多いです。

そのため、S3 にファイルをアップロードする際は、Amazon Athena など解析ツールで使用されることを前提に、あらかじめ置き方の設計をしておくことが望ましいです。

S3 Bucket 内のディレクトリの設計

Amazon Athena は table に partition を設定しないと、基本的に発行するクエリーは該当の S3 Bucket のフルスキャンになってしまいます。

この事は以下の2点で大きなマイナスなため、Amazon Athena から読み込む S3 Bucket は、必ず後付にでも partition 設定が可能なディレクトリ構造になっている必要があります。

  • Amazon Athena <-> S3 間で異なる region にて解析処理を行う場合に S3 の転送量が発生するため、解析に必要不必要にかかわらずクエリー発行ごとに常に指定した Bucket 内の全ファイル分の転送量が発生する
  • クエリーのパフォーマンスが劣化する

ディレクトリの作り方

Partitioning Data

上記の公式ドキュメントでも書かれているとおり、 hive などでよく用いられている /dt=(yyyy-MM-dd)/ のような形でディレクトリを作成しておくと、MSCK REPAIR TABLE クエリーで自動的に partition が読み込まれるため、この方法が推奨されます。

$ aws s3 ls s3://athenatest/tables/test/

    PRE dt=2017-05-29/
    PRE dt=2017-05-30/
    PRE dt=2017-05-31/

S3 上に上記のようなディレクトリ構成でファイルが置かれていて、

CREATE EXTERNAL TABLE test (
  (snip.)
)
PARTITIONED BY (dt string)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://athenatest/tables/test/' ;

のような形でテーブルを作成した後、以下のように MSCK REPAIR TABLE クエリーを発行すると、自動的に Partition が読み込まれます。

MSCK REPAIR TABLE test;

Partitions not in metastore:    test:dt=2017-05-29  test:dt=2017-05-30  test:dt=2017-05-31
Repair: Added partition to metastore test:dt=2017-05-29
Repair: Added partition to metastore test:dt=2017-05-30
Repair: Added partition to metastore test:dt=2017-05-31

なお、Partition 情報は自動的に Amazon Athena に読み込まれないので、あらたなディレクトリが追加されたタイミングで何かしらの手段で MSCK REPAIR TABLE クエリーを発行する必要があります。

任意のディレクトリ文字列に対して partition を設定することもできますが、上記のように統一されたフォーマットでディレクトリを作り MSCK REPAIR TABLE を発行する運用にしておいた方が、データ規約的にも運用的にも楽なケースが多いように思えます。

ただし、partition の数が多すぎる環境下では MSCK REPAIR TABLE は時間がかかることがありますので、注意が必要です。

例:fluentd / td-agent から S3 に送る際の設定

S3 に送るユースケースとして fluentd / td-agent が用いられるケースも多いと思われるので、送信設定のサンプルを以下記載します。

td-agent.conf
@include conf.d/*.conf

# source
<source>
  @type forward
</source>

<match s3.**>
  @type relabel
  @label @s3
</match>
conf.d/s3.conf
<label @s3>
  <match s3.test>
    @type s3
    aws_key_id .....
    aws_sec_key .....
    s3_bucket athenatest
    s3_region ua-east-1
    path tables/test/dt=%Y-%m-%d/
    s3_object_key_format %{path}%{time_slice}_%{hostname}_%{index}.%{file_extension}
    store_as gzip
    buffer_type file
    buffer_path /var/log/td-agent/buffer/s3/${tag}/
    time_slice_format %H
    time_slice_wait 1m
    include_time_key true
    output_tag false
    output_time false
    utc
  </match>
</label>

この設定のポイントとしては以下になります。

  • path に、出力先の S3 Bucket のディレクトリを、上述のように partition しやすい形式で指定する
  • このケースでは json で出力したいため、fluentd / td-agent 固有の出力情報は以下のようにオフにする
    • output_tag false
    • output_time false
  • time 情報が削られると解析に不便なこともあるので、 include_time_key true で json 中に time の情報を含める
  • 転送量を考慮し、圧縮して保存する( store_as gzip )

データのフォーマット

結論として、Amazon Athena のクエリーパフォーマンスを最優先する場合は Parquet や ORC などを選択、そうでない場合は JSON などのテキスト形式、という形になると思います。

テキスト形式

Amazon Athena で取り扱うデータは、特別な事情がない場合、データ変更に対してロバストで人間も閲覧しやすい JSON のようなテキスト形式で保存するのが一番便利だと思います。
また、Amazon Athena でのデータ処理時は、基本的に S3 からのファイルのロードが処理時間としてもオーバーヘッドになり転送量としてもネックになるため、保存するデータは gzip などで圧縮をしておくことが推奨されます。

2017年5月時点では、以下の圧縮フォーマットがサポートされています。 http://docs.aws.amazon.com/athena/latest/ug/supported-formats.html

  • gzip
  • lzo
  • zlib
  • snappy

Parquest or ORC

JSON データや、CSV 、TSV ファイルなどテキストファイルを Amazon Athena で扱う場合、基本的に SerDe (JsonSerDe, LazySimpleSerDe 等) の Java 実装内で文字列を一行ずつ parsing 処理をしていくため処理のオーバーヘッドがそれなりにかかります。
データの規模や求める処理速度によっては、Parquet や ORC のような、集約関数の高速化の工夫がなされているデータフォーマットで保存するのが良さそうです。

ただ、fluentd などでストリームログとして S3 にファイルをアップロードするユースケースには Parquet や ORC はあまり合致しないため、

  1. S3 に JSON 形式でファイルをアップロード
  2. ある程度まとまったタイミングで EMR などで JSON ファイルを ORC or Parquet に変換

といった ETL フェーズを挟む必要が運用上出てきます。

個人的な感覚では、1 partition あたり数 GB 程度しか扱わないような小さいデータであれば、何も考えずに JSON + gzip 形式で問題ないと思っています。

データの分割単位、もしくは転送単位

Amazon Athena は、所定の S3 Bucket 内にファイルが置かれてさえいれば、データの事前ロードなどを行わなくても即座にクエリーを発行することが出来ます。
逆に、当たり前の話ですが S3 に置かれない以上そのデータの解析は行えないわけで、リアルタイム性の高いデータ解析の際にはアプリケーションサイドにあるデータをいかに迅速に S3 に伝達するかが鍵になります。

fluentd / td-agent のケース

fluentd / td-agent でアクセスログなどを S3 にアップロードする際には、 fluentd / td-agent のレイヤーで一定時間バッファリングし、複数行をまとめてフラッシュします。
フラッシュする時間の調整は、 time_slice_wait により行います。

conf.d/s3.conf
    @type s3
    time_slice_wait 1m

そのフラッシュのタイミングを短くすればするほど、S3 に反映するまでの時間を短くでき、Amazon Athena 上でもすぐにクエリー可能な状態になりますが、ファイルが細分化されます。
ファイルが細分化されすぎると、Amazon Athena のクエリー実行時にデータを読み込むオーバーヘッドが大きくなり、クエリーパフォーマンスが劣化しやすいというデメリットがあります。

Lambda Architecture

より早く解析可能な状態にし、かつパフォーマンスも担保したい、という場合、昔よりビッグデータ解析ではよく使われる Lambda Architectute 的なやり方でデータを扱うことになります。

  • 直近数時間~1日程度のデータは、1分に一度アップロードされる細分化されたファイルを解析対象にする
  • それ以上前のデータは、事前に ETL 処理でファイルを一定の単位(1時間単位等)にまとめ、そのデータを解析対象にする
  • アプリケーションからクエリーを発行する際は、その期間に応じてテーブルの読み先を切り替える

逆にいうと、Google BigQuery や Treasure Data などはそういう Lambda Architecture を意識しないでデータのロードと解析が行えるサービスになり、今はこのようなサービスが主流になっているようにも思えます。
そういう意味では、 Amazon Athena ではリアルタイム性を高めようとすると Lambda Architecture 的な工夫が未だに求められるサービスとも言えます。

データをいかに S3 にアップするか

Amazon Athena を使うには、まず S3 Bucket にデータをアップロードする必要があります。
多くの日本国内のユースケースでは fluentd / td-agent を用いたものになると思いますが、それ以外の例についてもいくつか記載します。

fluentd / td-agent のような agent 型 log collector を用いる

上述の例でも何度か述べてきているものなのでここでは割愛します。
fluentd / td-agent 以外にも、以下のようなツールも存在します。

AWS CloudWatch Logs を用いる

Amazon CloudWatch Logs とは?

AWS が提供している、主にアプリケーションログを収集するための仕組みです。
こちらで取り込んだデータに対して Filter を設定して任意の CloudWatch Metrics として使用したりアラートを通知するための仕組みとして用いられることが多いです。

そして CloudWatch Logs に保存されたデータは特定の S3 Bucket に Export することも可能なので、いったん CloudWatch Logs でデータを受け取り、S3 に定期的にデータをうつす、という運用も可能です。

Amazon S3 へのログデータのエクスポート

ECS から awslogs logging driver 経由で AWS CloudWatch Logs に送る

AWS ECS や、その他 docker container 環境では、awslogs という docker logging driver が標準で提供されているため、docker container の出力するログを手軽に AWS 環境に送ることができます。

Amazon CloudWatch Logs logging driver

ecs_sample.yml
    log_configuration:
      log_driver: awslogs
      options:
        awslogs-group: nginx
        awslogs-region: ap-northeast-1
        awslogs-stream-prefix: nginx

docker によるアプリケーション管理が普及する中、追加のアプリケーションの用意や整備が必要なく、標準の logging driver 経由でログが手軽に送れる手段として、AWS CloudWatch Logs はかなり有力に考えてよい選択肢になってきているように思えます。

fluentd から AWS CloudWatch Logs に送る

docker logging driver 経由や、AWS の SDK など経由で送る方法もありますが、fluentd の plugin と組み合わせて CloudWatch Logs に書き込むことも可能です。
現在は以下のような plugin が公開されています。

https://github.com/ryotarai/fluent-plugin-cloudwatch-logs

Kinesis Firehose を用いる

Amazon Kinesis Firehose

ストリーミングデータを手軽に S3 や Redshift, Elastic Search などにロードすることができる仕組みです。

こちらは Tokyo Region にまだ来ていないのでここでは概説のみですが、ストリームの中で、Lambda Script などを用いてデータの加工なども行うことができるなど利便性が高く、将来的に S3 にデータをアップロードする有力な手段の一つになると思います。

image.png

こちらも、Kinesis の SDK など経由で直接 stream にデータを書き込むこともできますし、以下のような fluentd の plugin と組み合わせるという方法も可能です。

https://github.com/awslabs/aws-fluent-plugin-kinesis

クエリー結果をどのように扱うか

一度発行したクエリーの結果をどこに保存するか

Amazon Athena は標準ではクエリーの実行結果のキャッシュは対応していません。
実行結果は S3 上に保存されるため、実行履歴から再ダウンロードすることは可能ですが、 BI ツールなどと連携した場合、BI ツール上のキャッシュが存在しない場合は都度クエリーが実行されることになります。

元データのサイズが大きくなればなるほど、何度も同じクエリーを発行することによる処理待ち時間やかかる料金のインパクトが大きくなるので、実行した結果を他のデータストレージに保存する必要があります。

別の言い方をすると、S3 上に置かれた大きなデータに対して SQL like な構文で集計・加工処理を行う ETL ツールとして、 Amazon Athena の活用用途があるようにも思えます。

Redshift に保存

Amazon AthenaでETLした結果をRedshiftにロードしてみる

こちらの例のように、S3 上に CSV 形式で保存された Amazon Athena の実行結果を Redshift に copy し、以後集計結果を参照したい際は Redshfit にクエリーを実行する、という事が可能です。

後述する Redshift Spectrum を用いることで、 Redshift から Amazon Athena のテーブルを呼び出し ETL 的なことを実現することも可能です。

MySQL に保存

上記の Redshift と同じような形で、実行結果の CSV ファイルを LOAD DATA LOCAL INFILE にて MySQL や Aurora に import することが可能です。
ただし、いったん S3 環境から CSV ファイルを EC2 環境などにコピーして実行する必要があります。

AWS Glue?

上述のような例を見ても分かる通り、Amazon Athena から他システムへの連携はまだまだ未整備で、Amazon Athena の実行結果を他環境に透過的に伝搬させることは現状では難しく何かしらの仕組みの開発が必要になります。
そもぞも、現時点では AWS のサービスでは ETL に関する仕組みが不足しているのが現状です。

現在一般には公開されていないで詳細が不明ですが、AWS Glue と呼ばれる ETL サービスが近々公開される模様なので、こちらに期待したいと思います。

AWS Glue

BI ツールとの連携

現在 Amazon Athena をサポートしている BI ツールとして代表的なものは以下です。

API 対応が2017年5月まで行われなかったこともあってか、現時点では Amazon Athena に対応する BI ツールは極めて少ないのが現状のようです。

Redash は、早くから Amazon Athena のサポートを行っており、手軽にクエリーの発行とグラフの作成が行えるため、現時点で選択可能な一番スタンダードなツールなように思えます。
また、Redash は API も提供しており、なかなか痒いところに手が届いていて機能が豊富なので、Amazon Athena を扱う際には Re:dash API 経由で操作する方が実は便利なのではないかと、個人的には思っています。

Redash API

Amazon Athena のチューニング

Amazon Athena のパフォーマンスチューニング Tips トップ 10
こちらのブログ記事に詳細にまとめらているので、ここでは引用にとどめます。
Hive 環境などを運用したことがある人には、これらのチューニング項目は日常的によく見聞きしてきたものと思います。

Amazon Athena と AWS 各種製品との関係

雑にまとめてみます。
どちらかというと、それぞれが競合する関係というより、いろいろな製品との組み合わせでシナジーが発揮できるような形でサービスの開発、機能の追加が行われているように思えます。
よって、単純な Pros / Cons での比較が妥当では無いと思い、そのような比較表は記述していません。

Amazon Athena と EMR

Amazon Athena は managed Presto 経由で S3 に対して SELECT 文だけが発行できる、ある意味 EMR が提供することができる機能のごくごく一部だけを切り出された機能です。

上述のように S3 に置かれているデータを Parquet or ORC などに変換するなどの ETL 処理についてはいまだに EMR の力を借りないといけないという点からしても、Amazon Athena と EMR は競合関係にはなりえません。

Amazon Athena と Redshift

SQL が発行できるクエリーエンジン、という文脈では、Amazon Athena と並び称すべきサービスは Redshfit に思えます。
ただし、こちらも Redshift Spectrum の登場により、並列にそれぞれ比較するサービスというより相互を組み合わせて最適なシステムを構築していくもの、というイメージになりつつあります。

機能面

Redshift はリリース後数年経つサービスということもあり、サポートする BI ツールも多いですし、S3 にデータを置きさえすれば copy コマンドでバルクロードできるという意味で非常に利便性高い DWH 製品であると思います。
fluentd の plugin も存在するため、アクセスログを準リアルタイム的に定期的に Redshift 環境にロードすることも容易に行えます。
https://github.com/flydata/fluent-plugin-redshift

Amazon Athena と異なることは、事前にリソースを確保する必要があること、テーブルのスキーマを事前にきっちり決めないといけないこと(一度決めた後変更することが難しい)こと、かなと思います。

性能面

性能面については、以下のブログに3つのケースでパフォーマンス比較がされていました
(Simple Select, Aggregation, Join)
An Amazonian Battle: Comparing Athena and Redshift

こちらの例では、Simple Select と Aggregation では Amazon Athena、Join では Redshift が優位な結果になっています。データ量が少ないことなどからどこまで汎用性のある結果かは微妙かと思いますが、クエリーやデータの質により性能差はさまざまということなのかなと思います。

予断を以て類推すると、Redshift は列志向データベースとして最適化されているので、SORTKEY / DISKEY や各種圧縮化オプションが適切に設定されているのであればデータ量が増えれば増えるほど Redshift の方が性能的に優位に立つのではないかと思います。

Redshift Spectrum

先日、 Redshift Spectrum という機能がリリースされました。
Amazon Athena の文脈で話すと、Redshift から Athena のテーブルを呼び出すことが可能になりました。

Amazon Redshift Spectrum – S3のデータを直接クエリし、エクサバイトまでスケール可能

Amazon Athena をそのまま使う場合にくらべ、Redshift のサーバーリソースを使用して Amazon Athena の機能を使うことになるため、潤沢な Redshift のサーバークラスターリソースがある場合は高速に Athena の機能を使える可能性があるようです。

個人的には、この機能は、Redshift へのデータの取り込みを Amazon Athena 経由で行う、 ETL 用途での使用が有用なのではないかと考えています。
(S3 に保存された JSON ファイルのデータのうち、条件に合致するデータのみ Redshift に取り込む。集約関数を使って集計済みの結果を Redshift に取り込む。等)

個人的なまとめ

Redshift の文脈から見た時、Amazon Athena は Redshit 内のデータと S3 のデータを結びつける便利なツール、という存在になります。特に Redshift Spectrum の機能を用いて S3 上のファイルの ETL 用途に Amazon Athena は有効に使えると思います。

Amazon Athena の文脈から見た時、Redshift は事前のリソースの確保やスキーマの決定など Athana にくらべると事前準備が必要な事項が多いですが、大量なデータの解析に最適化された DWH でありデータサイズやワークロードにより使い分けを考える対象になると思います。
また Redshift は Amazon Athena を高速に実行可能なクエリーエンジンとしての可能性も秘めています。

まとめにかえて

個人的に経験した事例

私が以前短期間だけお世話になっていた組織では対外的には Amazon Athena を使い尽くしているという触れ込みでしたが、実際に中を覗いてみると

  • partition 管理もされてないサンプルテーブルが一つ存在するのみ
  • 社員の誰も Amazon Athena を使用していない
  • "Managed Hive" だと発言する社員がいるなど、Amazon Athena にたいする理解が広まってない

という状況だったため、数日くらい時間をかけて以下のような環境を構築しました。

  • fluentd / td-agent を用いたログの収集基盤を、設定ファイルの規約やログ規約を整備しつつ構築
    • アプリケーションログの収集をまずは行う
  • Amazon Athena の partition 管理を自動化
  • Amazon Athena と Re:dash との連携
  • 各種メトリクスの Re:dash ダッシュボードを用いた可視化

これらの構築は私一人で行いましたが、結果として開発者がシステムのエラー状況を確認できたり、アドホックにデバッグ作業を行ったりできる環境を整備しました。

しかし、その環境が、十分に活用できていたかというと、それまで私が属していた組織に比べるとかなり心もとないものがあり、私以外の人間の手でこれらの環境がブラッシュアップされることも無かったように思えます。

その理由は、

  • ログを用いてさまざまな事象を可視化すること
  • それを元にさまざまな意思決定、判断をすること

に対する意識、文化が備わっていなかった事によると思っています。
意識が薄いが故の無理解から、過剰に Amazon Athena を礼賛する意見も散見されたようにも思えます。

単純な宣伝文句に踊らされずに

Amazon Athena は便利なサービスですが、このツールだけでデータ解析に関するすべての課題を解決できるほどの存在ではありませんし、どちらかというと他の AWS 製品と組み合わせて最適な環境を構築するためのひとつのパーツであると思います。
それらが有効に機能するためには、やはり「何を実現したいのか」という会社や組織の達成すべき課題や目標をはっきりとさせる、という基本的な部分が大事だと思います。

「S3 にファイルをアップさえすれば Amazon Athena で解析できてハッピー」 という単純なストーリー、宣伝文句に踊らされずに、 Amazon Athena がどのような機能を持っていて、自分たちの目的にどれだけ Amazon Athena が寄り添っているか、それを運用の観点も含めて冷静に判断する必要があります。

そういった判断に際し、当記事が多少なりとも役に立つことがあれば、幸いです。