はじめに
Apache Oozie は、Hadoop プラットフォームのためのワークフロー・スケジューリング・エンジンです。このフレームワーク (図 1 を参照) では、互いに依存する反復ジョブを容易に調整できるように、事前にスケジューリングされた時刻またはデータの可用性のいずれかによってトリガーできる Oozie コーディネーターを使用します。Oozie バンドル・システムを使用することで、一連のコーディネーター・アプリケーションを送信したり、保守したりすることができます。この演習の一環として、Oozie で Apache Sqoop ジョブを実行して、MySQL データベース内のデータに対してインポート・アクションを行い、そのデータを HDFS (Hadoop Distributed File System) に転送します。続いて、インポートされたデータ・セットで古いデータ・セットを更新するために、Sqoop マージ・アクションを実行します。MySQL データベースから Sqoop ジョブを実行するために使用するメタデータを取得するには、UNIX シェル・アクションを実行します。同様に、Sqoop ジョブに必要な MySQL データベース内のメタデータを更新するために、Java アクションを実行します。
図 1. Oozie オーケストレーション・アーキテクチャー
インストールされていなければならないもの
この記事のサンプル・アプリケーションを最大限活用するために、以下のソフトウェアを利用できるようにしておくと役に立ちます。
- Hadoop 2.2.0
- MySQL データベース
- Oozie-4.0.0
- Hive-0.11.0
- e-メール・サービス
- この記事のダウンロード可能なソース・コード (簡単に参照できるようにするため)
サンプル・アプリケーションで使用するクラスターは、1 つのマスター・ネーム・ノード、2 つのコア・ノード、8 つのタスク・ノードで実行される分散クラスターです。
Oozie のワークフロー
Oozie のワークフローとは、制御依存関係の無閉路有向グラフ (DAG) 内に並べられた一連の Oozie アクションからなります。制御依存関係によって、先行するアクションが正常に完了するまでは、次のアクションが開始されないことが確実になります。この記事では、最初にワークフロー・コントロール・ノードの概要を簡単に説明し、その後、主に以下のワークフロー・アクション・ノードに目を向けます。
ワークフロー・コントロール・ノード
start (開始) コントロール・ノード (リスト 1 を参照) は、ワークフロー・ジョブのエントリー・ポイントです。ワークフローが開始されると、そのワークフローは start に指定されたノードに自動的に遷移します。
リスト 1. start (開始) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf"> <start to="timeCheck"/> </workflow-app>
end (終了) コントロール・ノード (リスト 2 を参照) は、ワークフロー・ジョブが終了するノードであり、ワークフローのアクションが正常に完了したことを意味します。ワークフロー定義には、end ノードが必要です。
リスト 2. end (終了) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf"> <end name="end"/> </workflow-app>
kill (キル) コントロール・ノード (リスト 3 を参照) では、ワークフロー・ジョブが自身を停止することができます。kill ノードに到達した時点で、ワークフロー・ジョブによって開始された 1 つ以上のアクションが実行されている場合、現在実行中のすべてのアクションが停止されます。ワークフロー定義には、kill ノードが設定されない場合もあれば、1 つ以上の kill ノードが設定される場合もあります。
リスト 3. kill (キル) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf"> <kill name="fail"> <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> </kill> </workflow-app>
decision (決定) コントロール・ノード (リスト 4 を参照) により、ワークフローはその後に従う実行パスを決定することができます。decision ノードは switch-case (スイッチ・ケース) ブロックと同じように機能し、このノードには述部と遷移のペア一式とデフォルト遷移があります。述部は順に評価され、いずれかの述部が true に評価されると、その述部に対応する遷移が行われます。true に評価される述部がない場合は、デフォルト遷移が行われます。
リスト 4. decision (決定) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf"> <decision name="master-decision"> <switch> <case to="sqoopMerge1"> ${wf:actionData('hiveSwitch')['paramNum'] eq 1} </case> <default to="sqoopMerge2"/> </switch> </decision> </workflow-app>
fork (分岐) ノードは、実行パスを複数の並行パスに分岐させます。join (結合) ノードは、先行する fork ノードのすべての並行実行パスが join ノードに到達するまで待機します。リスト 5 に示すように、fork ノードと join ノードはペアで使用する必要があります。
リスト 5. fork-join (分岐-結合) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf"> <fork name="forking"> <path start="sqoopMerge1"/> <path start="sqoopMerge2"/> </fork> <join name="joining" to="hiveSwitch"/> </workflow-app>
Oozie シェル・アクション
Oozie シェル・アクションをワークフローの一部として構成して、ファイル内の一連のシェル・スクリプトを実行することができます。Oozie シェル・アクションは、タスクを実行するために必要な引数を設定した job-tracker
要素、name-node
要素、および exec
要素から構成することができます (リスト 6 を参照)。シェル・アクションを構成することで、HDFS 上のファイルやディレクトリーを作成または削除してからシェル・ジョブを開始することができます。シェル・ジョブに構成パラメーターを格納した XML ファイルを渡すには、構成要素と一緒に job-xml
要素をインラインで使用します。他にもファイルやアーカイブを追加で構成して、シェル・ジョブで使用できるようにすることも可能です。シェル・ジョブが終了した後に、そのシェル・ジョブの出力を workflow
ジョブで使用することは可能ですが、それには以下の基準を満たさなければなりません。
- 出力の形式を有効な Java プロパティー・ファイルにすること。
- 出力のサイズを 2KB 未満にすること。
リスト 6. シェル・スクリプト
host="XXX.XX.XX.XXX" port="3306" username="root" password="" database="zzz" tableName="$1" #################################### echo "Host: $host" echo "Database: $database" echo "Table: $tableName" #################################### sqoopLstUpd=`mysql --host=$host --port=$port --user=$username --password=$password -N -e 'SELECT PARM_DATE_VAL from T_CONTROL_PARM where PARM_NM="SQOOP_INCR_LST_UPD" and PARM_GROUP_NM="'$tableName'"' $database` echo "sqoopLstUpd=$sqoopLstUpd" echo "tableName=$tableName"
リスト 7 に、workflow.xml ファイル内のシェル・アクションの構成を示します。
リスト 7. Oozie シェル・アクション
<action name="timeCheck"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <exec>${sqoopUpdTrack}</exec> <argument>${tableName}</argument> <file>${sqoopUpdTrackPath}#${sqoopUpdTrack}</file> <capture-output/> </shell> <ok to="sqoopIncrImport"/> <error to="fail"/> </action>
シェルの出力にアクセスするには、リスト 8 に示す Sqoop インクリメンタル・ジョブを使用することができます。
リスト 8. インクリメンタル・インポートを実行するための Oozie Sqoop アクション
<action name="sqoopIncrImport"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${s3BucketLoc}/${tableName}/incr"/> <mkdir path="${s3BucketLoc}/${tableName}"/> </prepare> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <arg>import</arg> <arg>--connect</arg> <arg>${dbURL}</arg> <arg>--driver</arg> <arg>${mySqlDriver}</arg> <arg>--username</arg> <arg>${user}</arg> <arg>--table</arg> <arg>${wf:actionData('timeCheck')['tableName']}</arg> <arg>--target-dir</arg> <arg>${s3BucketLoc}/${tableName}/incr</arg> <arg>--check-column</arg> <arg>LAST_UPD</arg> <arg>--incremental</arg> <arg>lastmodified</arg> <arg>--last-value</arg> <arg>${wf:actionData('timeCheck')['sqoopLstUpd']}</arg> <arg>--m</arg> <arg>1</arg> </sqoop> <ok to="sqoopMetaUpdate"/> <error to="fail"/> </action>
Oozie Java アクション
Java アクションは、指定されたメインの Java クラスの public static void main(String [] args
) メソッドを実行します。Java アプリケーションは Hadoop クラスター上で、単一のマッパー・タスクを有する MapReduce ジョブとして実行されます。ワークフロー・ジョブは Java アクションの実行が完了するまで待機してから、次のアクションを続行します。Java アクションを構成するのは、job-tracker
、name-node
、Java メイン・クラス、JVM オプション、および入力引数です (リスト 9 を参照)。パラメーターをインライン・プロパティー値に割り当てるには、EL (Expression Language) 式を使用することができます。出力パラメーターはすべて、Java プロパティー・ファイルの形式で作成する必要があります。
Java アプリケーションを開始する前に、HDFS のファイルとディレクトリーをクリーンアップするため、または Apache HCatalog パーティションを設定するための Java アクションを構成することができます。このような Java アクションを構成すれば、一時的であってもなくても、障害が発生した場合には、Oozie はその Java アクションを再試行することができます。
リスト 9. Oozie Java アクション
<action name="sqoopMetaUpdate"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <main-class>SqoopMetaUtil</main-class> <java-opts></java-opts> <arg>${tableName}</arg> <archive>${mySqlDriverPath}</archive> </java> <ok to="hiveSwitch"/> <error to="fail"/> </action>
Java アクションを構成するには、値を次のアクションに伝播する capture-output
を使用することができます。値にアクセスするには、Hadoop EL 関数を使用することができます。これらの値は、Java クラス内に Java プロパティー・ファイルの形式で作成することができます (リスト 10 を参照)。
リスト 10. 値を伝搬するための Java コード・スニペット
String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties"; String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES); OutputStream os = null; if(oozieProp != null){ File propFile = new File(oozieProp); Properties p = new Properties(); p.setProperty("name", "Autodesk"); p.setProperty("address", "Sun Rafael"); try { os = new FileOutputStream(propFile); p.store(os, ""); } catch (FileNotFoundException e) { System.err.println("<<< FileNotFoundException >>>"+e.getMessage()); } catch (IOException e) { System.err.println("<<< IOException >>>"+e.getMessage()); } finally{ if(os != null) try { os.close(); } catch (IOException e) { System.err.println("<<< IOException >>>"+e.getMessage()); } } } else{ throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined"); }
workflow.xml ファイル内にアクションを構成することで、プロパティー・ファイルに設定された対応する値にアクセスすることができます (リスト 11 を参照)。
リスト 11. 値を伝搬するための Oozie Java アクション
<action name="jProperties"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <main-class>PropertyExplorer</main-class> <java-opts></java-opts> <capture-output/> </java> <ok to="email"/> <error to="fail"/> </action> <action name="email"> <email xmlns="uri:oozie:email-action:0.1"> <to>surajit.paul@autodesk.com</to> <subject>Oozie workflow finished successfully!</subject> <body>${wf:actionData('jProperties')['name']} | ${wf:actionData('jProperties')['address']}</body> </email> <ok to="end"/> <error to="fail"/> </action>
Oozie Sqoop アクション
Oozie ワークフローは、Hadoop クラスター上で Sqoop ジョブを起動する Sqoop スクリプトをトリガーします。Sqoop ジョブはタスクを完了するために、Hadoop クラスター上で MapReduce ジョブを開始します。Sqoop スクリプトによって開始された MapReduce ジョブは、データを RDBMS から HDFSに転送します。リスト 12 に記載するような Sqoop アクションを構成すると、Sqoop ジョブを開始する前に HDFS 上のファイルとディレクトリーを削除することができます。他の Oozie アクションと同様に、Sqoop アクションを構成する場合も、job-xml
要素を使用して追加プロパティーを設定することができます。job-xml
要素に指定されたプロパティーは、configutation 要素に指定されたプロパティー値によって上書きされます。
Sqoop ジョブには、その他のファイルとアーカイブも追加で提供することができます。
リスト 12. マージをするための Oozie Sqoop アクション
<action name="sqoopMerge1"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <prepare> <delete path="${s3BucketLoc}/${tableName}/master1"/> <mkdir path="${s3BucketLoc}/${tableName}"/> </prepare> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <arg>merge</arg> <arg>--new-data</arg> <arg>${s3incr}</arg> <arg>--onto</arg> <arg>${s3BucketLoc}/${tableName}/master2</arg> <arg>--target-dir</arg> <arg>${s3BucketLoc}/${tableName}/master1</arg> <arg>--jar-file</arg> <arg>${tableJarLoc}/${tableName}.jar</arg> <arg>--class-name</arg> <arg>${tableName}</arg> <arg>--merge-key</arg> <arg>ROW_ID</arg> </sqoop> <ok to="hive-master1"/> <error to="fail"/> </action>
Oozie Hive アクション
HDFS 上のファイルおよびディレクトリーに対して Hive スクリプトを実行するための Hive アクションを構成することができます (リスト 13 を参照)。Hive アクションは、タスクを完了するために MapReduce ジョブを開始します。Oozie で Hive アクションを構成するには、Hive 構成ファイル hive-default.xml または hive-site.xml を job-xml 要素として使用する必要があります。そうしなければ、Oozie が Hive 環境にアクセスすることはできません。Hive アクションは、Hive ジョブを開始する前に HDFS のファイルとディレクトリーを作成または削除するように構成することができます。job-xml ファイルに指定されたプロパティー値は、configutation 要素に指定されたプロパティー値によって上書きされます。他のファイルやアーカイブを追加して、Hive ジョブで使用することもできます。Oozie は、スクリプト要素内のパスで指定された Hive スクリプトを実行します。Hive スクリプトには、Oozie ワークフローを介した入力パラメーターとしてのパラメーターを割り当てることができます。
リスト 13. Oozie Hive アクション
<action name="hiveSwitch"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <exec>${hiveSwitchScript}</exec> <argument>${tableName}</argument> <file>${hiveSwitchScriptPath}#${hiveSwitchScript}</file> <capture-output/> </shell> <ok to="master-decision"/> <error to="fail"/> </action>
Oozie e-メール・アクション
Oozie e-メール・アクション (リスト 14 を参照) によって、ワークフロー・アプリケーションから e-メールを送信することができます。e-メール・アクションには、宛先アドレスと CC (オプション) アドレスに加え、メッセージの件名と本文が必要です。e-メール・アドレスをコンマで区切ることによって、複数の受信者に e-メールを送信することができます。e-メール・アクションは同期的に実行され、ワークフロー・ジョブは e-メールが送信されるまで待ってから、次のアクションをトリガーします。パラメーターを e-メール・アクションに割り当てるには、Hadoop EL 式を使用することができます。
リスト 14. Oozie e-メール・アクション
<action name="email"> <email xmlns="uri:oozie:email-action:0.1"> <to>surajit.paul@autodesk.com</to> <subject>Oozie workflow finished successfully!</subject> <body>${wf:actionData('jProperties')['name']} | ${wf:actionData('jProperties')['address']}</body> </email> <ok to="end"/> <error to="fail"/> </action>
まとめ
複数の互いに依存するジョブをデータ・フローで関連付ける場合、Oozie ワークフローはデータ・パイプライン・アプリケーションという形になります。Apache Oozie ワークフローにより、データの論理フロー、エラー処理、フェイルオーバー・メカニズムなどの設計が容易になります。ワークフローを効率的に管理するために、Oozie コーディネーターまたはバンドル・アプリケーションを構成することができますが、これらのトピックについての説明は、この記事ではしません。同等の Hadoop ワークフロー・エンジンには、Amazon Data Pipeline、Simple Workflow Engine, Azkaban、Cascading、Hamake などもあります。Hamake と Oozie は XML ベースで構成されている一方、Azkaban の構成には、キーと値のペアが含まれるテキスト・ファイルが使用されます。また、Cascading は Java API を使用して構成されます。
ダウンロード
内容 | ファイル名 | サイズ |
---|---|---|
Source code for this tutorial | oozie-artifacts.zip | 8KB |
参考文献
学ぶために
- Hadoop 用の Oozie ワークフロー・エンジンについて詳しく学んでください。
- Oozie workflow scheduler for Hadoop について調べてください。
- Hadoop プロジェクトに関してより詳しく調べてください。
- Apache Pig のリソースのために Pig プロジェクトの Web サイトにアクセスしてください。
- Hive プロジェクトに関する情報を集めてください。
- Sqoop プロジェクトの Web サイトにアクセスしてください。
- Azkaban プロジェクトについて調べてください。
- Cascading プロジェクトについて詳しく学んでください。
- 記事「An introduction to InfoSphere Streams: A platform for analyzing big data in motion」(developerWorks, May 2013) で InfoSphere Streams について理解してください。
製品や技術を入手するために
- InfoSphere Streams の非製品版として無料でダウンロードできる InfoSphere Streams Quick Start Edition をダウンロードしてください。InfoSphere Streams は、数千ものリアルタイム・ソースから情報が届くと、ユーザーが開発したアプリケーションによって情報を迅速に取り込んで、分析し、相関させることができる、ハイパフォーマンス・コンピューティング・プラットフォームです。
議論するために
- developerWorks コミュニティーに参加してください。ここでは他の developerWorks ユーザーとのつながりを持てる他、開発者によるブログ、フォーラム、グループ、Wiki を調べることができます。
コメント
IBM PureSystems
IBM がどのように IT に革命をもたらしているのかをご自身でお確かめください
Knowledge path
developerWorks の Knowledge path シリーズでは、テーマ別の学習資料をご提供しています
ソフトウェア評価版: ダウンロード
developerWorksでIBM製品をお試しください!