はじめに
Apache Oozie は、Hadoop プラットフォームのためのワークフロー・スケジューリング・エンジンです。このフレームワーク (図 1 を参照) では、互いに依存する反復ジョブを容易に調整できるように、事前にスケジューリングされた時刻またはデータの可用性のいずれかによってトリガーできる Oozie コーディネーターを使用します。Oozie バンドル・システムを使用することで、一連のコーディネーター・アプリケーションを送信したり、保守したりすることができます。この演習の一環として、Oozie で Apache Sqoop ジョブを実行して、MySQL データベース内のデータに対してインポート・アクションを行い、そのデータを HDFS (Hadoop Distributed File System) に転送します。続いて、インポートされたデータ・セットで古いデータ・セットを更新するために、Sqoop マージ・アクションを実行します。MySQL データベースから Sqoop ジョブを実行するために使用するメタデータを取得するには、UNIX シェル・アクションを実行します。同様に、Sqoop ジョブに必要な MySQL データベース内のメタデータを更新するために、Java アクションを実行します。
図 1. Oozie オーケストレーション・アーキテクチャー
インストールされていなければならないもの
この記事のサンプル・アプリケーションを最大限活用するために、以下のソフトウェアを利用できるようにしておくと役に立ちます。
- Hadoop 2.2.0
- MySQL データベース
- Oozie-4.0.0
- Hive-0.11.0
- e-メール・サービス
- この記事のダウンロード可能なソース・コード (簡単に参照できるようにするため)
サンプル・アプリケーションで使用するクラスターは、1 つのマスター・ネーム・ノード、2 つのコア・ノード、8 つのタスク・ノードで実行される分散クラスターです。
Oozie のワークフロー
Oozie のワークフローとは、制御依存関係の無閉路有向グラフ (DAG) 内に並べられた一連の Oozie アクションからなります。制御依存関係によって、先行するアクションが正常に完了するまでは、次のアクションが開始されないことが確実になります。この記事では、最初にワークフロー・コントロール・ノードの概要を簡単に説明し、その後、主に以下のワークフロー・アクション・ノードに目を向けます。
ワークフロー・コントロール・ノード
start (開始) コントロール・ノード (リスト 1 を参照) は、ワークフロー・ジョブのエントリー・ポイントです。ワークフローが開始されると、そのワークフローは start に指定されたノードに自動的に遷移します。
リスト 1. start (開始) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
<start to="timeCheck"/>
</workflow-app>end (終了) コントロール・ノード (リスト 2 を参照) は、ワークフロー・ジョブが終了するノードであり、ワークフローのアクションが正常に完了したことを意味します。ワークフロー定義には、end ノードが必要です。
リスト 2. end (終了) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
<end name="end"/>
</workflow-app>kill (キル) コントロール・ノード (リスト 3 を参照) では、ワークフロー・ジョブが自身を停止することができます。kill ノードに到達した時点で、ワークフロー・ジョブによって開始された 1 つ以上のアクションが実行されている場合、現在実行中のすべてのアクションが停止されます。ワークフロー定義には、kill ノードが設定されない場合もあれば、1 つ以上の kill ノードが設定される場合もあります。
リスト 3. kill (キル) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</workflow-app>decision (決定) コントロール・ノード (リスト 4 を参照) により、ワークフローはその後に従う実行パスを決定することができます。decision ノードは switch-case (スイッチ・ケース) ブロックと同じように機能し、このノードには述部と遷移のペア一式とデフォルト遷移があります。述部は順に評価され、いずれかの述部が true に評価されると、その述部に対応する遷移が行われます。true に評価される述部がない場合は、デフォルト遷移が行われます。
リスト 4. decision (決定) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
<decision name="master-decision">
<switch>
<case to="sqoopMerge1">
${wf:actionData('hiveSwitch')['paramNum'] eq 1}
</case>
<default to="sqoopMerge2"/>
</switch>
</decision>
</workflow-app>fork (分岐) ノードは、実行パスを複数の並行パスに分岐させます。join (結合) ノードは、先行する fork ノードのすべての並行実行パスが join ノードに到達するまで待機します。リスト 5 に示すように、fork ノードと join ノードはペアで使用する必要があります。
リスト 5. fork-join (分岐-結合) コントロール・ノード
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
<fork name="forking">
<path start="sqoopMerge1"/>
<path start="sqoopMerge2"/>
</fork>
<join name="joining" to="hiveSwitch"/>
</workflow-app>Oozie シェル・アクション
Oozie シェル・アクションをワークフローの一部として構成して、ファイル内の一連のシェル・スクリプトを実行することができます。Oozie シェル・アクションは、タスクを実行するために必要な引数を設定した job-tracker 要素、name-node 要素、および exec 要素から構成することができます (リスト 6 を参照)。シェル・アクションを構成することで、HDFS 上のファイルやディレクトリーを作成または削除してからシェル・ジョブを開始することができます。シェル・ジョブに構成パラメーターを格納した XML ファイルを渡すには、構成要素と一緒に job-xml 要素をインラインで使用します。他にもファイルやアーカイブを追加で構成して、シェル・ジョブで使用できるようにすることも可能です。シェル・ジョブが終了した後に、そのシェル・ジョブの出力を workflow ジョブで使用することは可能ですが、それには以下の基準を満たさなければなりません。
- 出力の形式を有効な Java プロパティー・ファイルにすること。
- 出力のサイズを 2KB 未満にすること。
リスト 6. シェル・スクリプト
host="XXX.XX.XX.XXX" port="3306" username="root" password="" database="zzz" tableName="$1" #################################### echo "Host: $host" echo "Database: $database" echo "Table: $tableName" #################################### sqoopLstUpd=`mysql --host=$host --port=$port --user=$username --password=$password -N -e 'SELECT PARM_DATE_VAL from T_CONTROL_PARM where PARM_NM="SQOOP_INCR_LST_UPD" and PARM_GROUP_NM="'$tableName'"' $database` echo "sqoopLstUpd=$sqoopLstUpd" echo "tableName=$tableName"
リスト 7 に、workflow.xml ファイル内のシェル・アクションの構成を示します。
リスト 7. Oozie シェル・アクション
<action name="timeCheck">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${sqoopUpdTrack}</exec>
<argument>${tableName}</argument>
<file>${sqoopUpdTrackPath}#${sqoopUpdTrack}</file>
<capture-output/>
</shell>
<ok to="sqoopIncrImport"/>
<error to="fail"/>
</action>シェルの出力にアクセスするには、リスト 8 に示す Sqoop インクリメンタル・ジョブを使用することができます。
リスト 8. インクリメンタル・インポートを実行するための Oozie Sqoop アクション
<action name="sqoopIncrImport">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${s3BucketLoc}/${tableName}/incr"/>
<mkdir path="${s3BucketLoc}/${tableName}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<arg>import</arg>
<arg>--connect</arg>
<arg>${dbURL}</arg>
<arg>--driver</arg>
<arg>${mySqlDriver}</arg>
<arg>--username</arg>
<arg>${user}</arg>
<arg>--table</arg>
<arg>${wf:actionData('timeCheck')['tableName']}</arg>
<arg>--target-dir</arg>
<arg>${s3BucketLoc}/${tableName}/incr</arg>
<arg>--check-column</arg>
<arg>LAST_UPD</arg>
<arg>--incremental</arg>
<arg>lastmodified</arg>
<arg>--last-value</arg>
<arg>${wf:actionData('timeCheck')['sqoopLstUpd']}</arg>
<arg>--m</arg>
<arg>1</arg>
</sqoop>
<ok to="sqoopMetaUpdate"/>
<error to="fail"/>
</action>Oozie Java アクション
Java アクションは、指定されたメインの Java クラスの public static void main(String [] args) メソッドを実行します。Java アプリケーションは Hadoop クラスター上で、単一のマッパー・タスクを有する MapReduce ジョブとして実行されます。ワークフロー・ジョブは Java アクションの実行が完了するまで待機してから、次のアクションを続行します。Java アクションを構成するのは、job-tracker、name-node、Java メイン・クラス、JVM オプション、および入力引数です (リスト 9 を参照)。パラメーターをインライン・プロパティー値に割り当てるには、EL (Expression Language) 式を使用することができます。出力パラメーターはすべて、Java プロパティー・ファイルの形式で作成する必要があります。
Java アプリケーションを開始する前に、HDFS のファイルとディレクトリーをクリーンアップするため、または Apache HCatalog パーティションを設定するための Java アクションを構成することができます。このような Java アクションを構成すれば、一時的であってもなくても、障害が発生した場合には、Oozie はその Java アクションを再試行することができます。
リスト 9. Oozie Java アクション
<action name="sqoopMetaUpdate">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>SqoopMetaUtil</main-class>
<java-opts></java-opts>
<arg>${tableName}</arg>
<archive>${mySqlDriverPath}</archive>
</java>
<ok to="hiveSwitch"/>
<error to="fail"/>
</action>Java アクションを構成するには、値を次のアクションに伝播する capture-output を使用することができます。値にアクセスするには、Hadoop EL 関数を使用することができます。これらの値は、Java クラス内に Java プロパティー・ファイルの形式で作成することができます (リスト 10 を参照)。
リスト 10. 値を伝搬するための Java コード・スニペット
String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
OutputStream os = null;
if(oozieProp != null){
File propFile = new File(oozieProp);
Properties p = new Properties();
p.setProperty("name", "Autodesk");
p.setProperty("address", "Sun Rafael");
try {
os = new FileOutputStream(propFile);
p.store(os, "");
} catch (FileNotFoundException e) {
System.err.println("<<< FileNotFoundException >>>"+e.getMessage());
} catch (IOException e) {
System.err.println("<<< IOException >>>"+e.getMessage());
}
finally{
if(os != null)
try {
os.close();
} catch (IOException e) {
System.err.println("<<< IOException >>>"+e.getMessage());
}
}
}
else{
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}workflow.xml ファイル内にアクションを構成することで、プロパティー・ファイルに設定された対応する値にアクセスすることができます (リスト 11 を参照)。
リスト 11. 値を伝搬するための Oozie Java アクション
<action name="jProperties">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>PropertyExplorer</main-class>
<java-opts></java-opts>
<capture-output/>
</java>
<ok to="email"/>
<error to="fail"/>
</action>
<action name="email">
<email xmlns="uri:oozie:email-action:0.1">
<to>surajit.paul@autodesk.com</to>
<subject>Oozie workflow finished successfully!</subject>
<body>${wf:actionData('jProperties')['name']} |
${wf:actionData('jProperties')['address']}</body>
</email>
<ok to="end"/>
<error to="fail"/>
</action>Oozie Sqoop アクション
Oozie ワークフローは、Hadoop クラスター上で Sqoop ジョブを起動する Sqoop スクリプトをトリガーします。Sqoop ジョブはタスクを完了するために、Hadoop クラスター上で MapReduce ジョブを開始します。Sqoop スクリプトによって開始された MapReduce ジョブは、データを RDBMS から HDFSに転送します。リスト 12 に記載するような Sqoop アクションを構成すると、Sqoop ジョブを開始する前に HDFS 上のファイルとディレクトリーを削除することができます。他の Oozie アクションと同様に、Sqoop アクションを構成する場合も、job-xml 要素を使用して追加プロパティーを設定することができます。job-xml 要素に指定されたプロパティーは、configutation 要素に指定されたプロパティー値によって上書きされます。
Sqoop ジョブには、その他のファイルとアーカイブも追加で提供することができます。
リスト 12. マージをするための Oozie Sqoop アクション
<action name="sqoopMerge1">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${s3BucketLoc}/${tableName}/master1"/>
<mkdir path="${s3BucketLoc}/${tableName}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<arg>merge</arg>
<arg>--new-data</arg>
<arg>${s3incr}</arg>
<arg>--onto</arg>
<arg>${s3BucketLoc}/${tableName}/master2</arg>
<arg>--target-dir</arg>
<arg>${s3BucketLoc}/${tableName}/master1</arg>
<arg>--jar-file</arg>
<arg>${tableJarLoc}/${tableName}.jar</arg>
<arg>--class-name</arg>
<arg>${tableName}</arg>
<arg>--merge-key</arg>
<arg>ROW_ID</arg>
</sqoop>
<ok to="hive-master1"/>
<error to="fail"/>
</action>Oozie Hive アクション
HDFS 上のファイルおよびディレクトリーに対して Hive スクリプトを実行するための Hive アクションを構成することができます (リスト 13 を参照)。Hive アクションは、タスクを完了するために MapReduce ジョブを開始します。Oozie で Hive アクションを構成するには、Hive 構成ファイル hive-default.xml または hive-site.xml を job-xml 要素として使用する必要があります。そうしなければ、Oozie が Hive 環境にアクセスすることはできません。Hive アクションは、Hive ジョブを開始する前に HDFS のファイルとディレクトリーを作成または削除するように構成することができます。job-xml ファイルに指定されたプロパティー値は、configutation 要素に指定されたプロパティー値によって上書きされます。他のファイルやアーカイブを追加して、Hive ジョブで使用することもできます。Oozie は、スクリプト要素内のパスで指定された Hive スクリプトを実行します。Hive スクリプトには、Oozie ワークフローを介した入力パラメーターとしてのパラメーターを割り当てることができます。
リスト 13. Oozie Hive アクション
<action name="hiveSwitch">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${hiveSwitchScript}</exec>
<argument>${tableName}</argument>
<file>${hiveSwitchScriptPath}#${hiveSwitchScript}</file>
<capture-output/>
</shell>
<ok to="master-decision"/>
<error to="fail"/>
</action>Oozie e-メール・アクション
Oozie e-メール・アクション (リスト 14 を参照) によって、ワークフロー・アプリケーションから e-メールを送信することができます。e-メール・アクションには、宛先アドレスと CC (オプション) アドレスに加え、メッセージの件名と本文が必要です。e-メール・アドレスをコンマで区切ることによって、複数の受信者に e-メールを送信することができます。e-メール・アクションは同期的に実行され、ワークフロー・ジョブは e-メールが送信されるまで待ってから、次のアクションをトリガーします。パラメーターを e-メール・アクションに割り当てるには、Hadoop EL 式を使用することができます。
リスト 14. Oozie e-メール・アクション
<action name="email">
<email xmlns="uri:oozie:email-action:0.1">
<to>surajit.paul@autodesk.com</to>
<subject>Oozie workflow finished successfully!</subject>
<body>${wf:actionData('jProperties')['name']} |
${wf:actionData('jProperties')['address']}</body>
</email>
<ok to="end"/>
<error to="fail"/>
</action>まとめ
複数の互いに依存するジョブをデータ・フローで関連付ける場合、Oozie ワークフローはデータ・パイプライン・アプリケーションという形になります。Apache Oozie ワークフローにより、データの論理フロー、エラー処理、フェイルオーバー・メカニズムなどの設計が容易になります。ワークフローを効率的に管理するために、Oozie コーディネーターまたはバンドル・アプリケーションを構成することができますが、これらのトピックについての説明は、この記事ではしません。同等の Hadoop ワークフロー・エンジンには、Amazon Data Pipeline、Simple Workflow Engine, Azkaban、Cascading、Hamake などもあります。Hamake と Oozie は XML ベースで構成されている一方、Azkaban の構成には、キーと値のペアが含まれるテキスト・ファイルが使用されます。また、Cascading は Java API を使用して構成されます。
ダウンロード
| 内容 | ファイル名 | サイズ |
|---|---|---|
| Source code for this tutorial | oozie-artifacts.zip | 8KB |
参考文献
学ぶために
- Hadoop 用の Oozie ワークフロー・エンジンについて詳しく学んでください。
- Oozie workflow scheduler for Hadoop について調べてください。
- Hadoop プロジェクトに関してより詳しく調べてください。
- Apache Pig のリソースのために Pig プロジェクトの Web サイトにアクセスしてください。
- Hive プロジェクトに関する情報を集めてください。
- Sqoop プロジェクトの Web サイトにアクセスしてください。
- Azkaban プロジェクトについて調べてください。
- Cascading プロジェクトについて詳しく学んでください。
- 記事「An introduction to InfoSphere Streams: A platform for analyzing big data in motion」(developerWorks, May 2013) で InfoSphere Streams について理解してください。
製品や技術を入手するために
- InfoSphere Streams の非製品版として無料でダウンロードできる InfoSphere Streams Quick Start Edition をダウンロードしてください。InfoSphere Streams は、数千ものリアルタイム・ソースから情報が届くと、ユーザーが開発したアプリケーションによって情報を迅速に取り込んで、分析し、相関させることができる、ハイパフォーマンス・コンピューティング・プラットフォームです。
議論するために
- developerWorks コミュニティーに参加してください。ここでは他の developerWorks ユーザーとのつながりを持てる他、開発者によるブログ、フォーラム、グループ、Wiki を調べることができます。
コメント
IBM PureSystems
IBM がどのように IT に革命をもたらしているのかをご自身でお確かめください
Knowledge path
developerWorks の Knowledge path シリーズでは、テーマ別の学習資料をご提供しています
ソフトウェア評価版: ダウンロード
developerWorksでIBM製品をお試しください!