2015-10-28
Azkabanについて書く
ちまたではAirflow(https://github.com/airbnb/airflow)が話題のようですが、Azkaban(http://azkaban.github.io/)を使っている身としてはやはりAzkabanについて書かねばならないと思ったので書きます。別にAzkabanを使ってほしいという意味ではないです。むしろAirflowの運用エントリとか読みたいです。
AzkabanはJavaで実装されたジョブ管理ツールです。開発が若干停滞気味ではありますが、細々と進んでいます。
特徴としては、下記の通りです。
- インストールが簡単(バイナリは古いものしか無いのでソースビルドが必要だがgradlewなので簡単)
- ジョブの依存関係をグラフィカルに見る事ができる。
- APIがある
- ジョブが失敗した時でもボタン一つで失敗したジョブだけ再実行できる
- TTLがあるけどジョブの実行ログをブラウザから見えることができる
機能的に足りないと思われる部分は、下記の通りです。
- cron書式でスケジューリングできない
- ファイルが出来たら実行するというようなトリガーができない
AzkabanはWebの部分とジョブ実行のExecutor部分を別マシンにしてデータはMySQLを見るというのが標準構成のようです。
もっとも僕はsolo server modeというものをつかっており、これだとMySQLではなくH2を使う事やWebとExecutorが同じプロセスで動くのでインストールやアップグレードが簡単です。
どちらのモードを使うにしてもAzkabanはSPOFですが、Azkabanを1年以上運用していて落ちた事は一時もありません。
さてそんなAzkabanですが、うちの環境でどう使っているかを書きます。
前提としてHadoopを使ったログ分析環境のバッチで使っていて社外に公開するサービスではありません。
薄い内製Pythonフレームワークを使ってPythonでバッチを書いています。雰囲気はこんな感じ。
bar.py
class Bar(Job): def validate_before(self): hive.exists("access_log", "yyyymmdd='%s'" % (...)) def process(self): insert_query = """ INSERT OVERWRITE TABLE aggregate PARTITION(yyyymmdd='%s') SELECT ... FROM access_log WHERE ... GROUP BY ... """ % (...) hiveCli.query(insert_query) def validate_after(self): hive.exists("aggregate", "yyyymmdd='%s'" % (...))
validate_before -> process -> validate_afterと処理が進みます。
validate_beforeで入力データのチェック、processでメインの加工処理、validate_afterで集計結果のチェックを行います。
後続ジョブがある場合も似たような感じで作った後にこんな感じのジョブ定義ファイルを用意します。これはAzkaban標準のものではないです。
foo: command: echo "start" type: command bar: command: /path/to/exec.sh -t bar -d ${DATE} ${SKIP} dependencies: foo retries: 1 retry.backoff: 300000 type: command hoge: command: /path/to/exec.sh -t hoge -d ${DATE} ${SKIP} dependencies: foo retries: 1 retry.backoff: 300000 type: command piyo: command: echo "finish" dependencies: bar, hoge type: command
これを
https://github.com/wyukawa/ayd
に食わせるとAzkabanのジョブ定義ファイルが自動生成されるのでそれを後述のpropertiesファイルとセットにしてzip化してアップロードすると、こんな感じに表示されます。
aydを作った理由はAzkabanのジョブ定義ファイルはジョブ毎にファイルを作らないといけなくて管理が辛くなったから。
上の例だとfoo, bar, hoge, piyoの4ファイル必要です。
${DATE}と${SKIP}は別途propertiesファイルを下記のように用意して/path/to/exec.shで受け取って処理します。exec.shでyesterdayを受け取って具体的な日付に変換します。
SKIP= DATE=yesterday
これはブラウザから指定して実行することもできます。
DATEは処理対象日付(たいていは前日)、SKIPはdry run用のオプションとしてうちの環境では作っています。
AzkabanではAPIを提供しているので、そのAPIをたたくコマンドラインツールeboshi(https://github.com/wyukawa/eboshi)も自作しました。
多分本来ならAzkabanCLI(http://azkabancli.readthedocs.org/en/latest/)を使うんでしょうが、ちょっと試してすぐ使えない感じだったので自作しました。
AzkabanのCLIツールeboshiを書きました - wyukawa’s blogも参照
eboshiを使ってジョブのアップロードやスケジュール設定やジョブ実行を行っています。
そのためスケジュール設定はgit管理できています。
下記のようなシェルを用意してそれをJenkinsから実行しています。add_schedule関数は最終的にeboshi addScheduleを実行します。
add_schedule azkaban_project azkaban_flow "10/28/2015" '10,00,AM,JST' 1d ...
過去分を再集計したい場合とかもあるので、その場合はそれ専用のAzkabanジョブを用意しています。
デフォルト設定だと同一プロジェクトの並列実行は出来ないのでRun Concurrentlyを指定します。この辺もAPIがあるのでコマンドからたたけます。
- 69 https://www.google.co.jp/
- 18 https://www.google.co.jp
- 16 http://b.hatena.ne.jp/
- 8 http://www.google.co.jp/url?sa=t&rct=j&q=&esrc=s&source=web&cd=16&ved=0CDoQFjAFOApqFQoTCMOv14Wr5MgCFcWUlAodVhkIig&url=http://d.hatena.ne.jp/wyukawa/20100116/1263649373&usg=AFQjCNHXD3MbHHwYuhWGc0MT9Xy3nzW2Lw
- 6 https://t.co/IX1BzC51Bg
- 4 http://www.google.co.jp/url?sa=t&rct=j&q=&esrc=s&source=web&cd=2&ved=0CCQQFjABahUKEwj0yoPlt-TIAhXIIaYKHX2DB1I&url=http://d.hatena.ne.jp/wyukawa/20090628/1246183795&usg=AFQjCNEvaYwxziqTNV8iK543_wQg84bDPg&bvm=bv.106130839,d.dGY
- 4 http://www.google.co.jp/url?sa=t&rct=j&q=&esrc=s&source=web&cd=3&sqi=2&ved=0CCwQFjACahUKEwjC-MS9wuTIAhXDKpQKHfqoCbM&url=http://d.hatena.ne.jp/wyukawa/20090628/1246183795&usg=AFQjCNEvaYwxziqTNV8iK543_wQg84bDPg&bvm=bv.105841590,bs.1,d.dGo
- 2 http://www.facebook.com/Gertruda
- 2 http://www.google.co.jp/url?url=http://d.hatena.ne.jp/wyukawa/20090628/1246183795&rct=j&frm=1&q=&esrc=s&sa=U&ved=0CBQQFjAAahUKEwjnz7qiruTIAhUmUKYKHV_CAuM&usg=AFQjCNHfRQOLjosjoROx9DqueFZ9ZldqzQ
- 2 https://www.google.com/