Java以外の言語を使用して MapReduce を実行することのできる Hadoop Streaming の使用についてです。ディストリビューションに付属しているユーティリティであり、データを標準入出力を介して受け渡すため、標準入出力が扱える言語であれば MapReduce ジョブを記述して実行することが可能です。
Hadoop Streaming の構文
Hadoop Streaming の基本的な構文はhadoop command [genericOptions] [streamingOptions]になります。通常の Java でのジョブと同様に jar コマンドにhadoop-streaming.jarを指定してください。genericOptionsはstreamingOptionsの前に配置しないと失敗してしまいます。以下がコマンド実行例です。
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files /tmp/sample_mapper.rb, /tmp/sample_reducer.rb \
-mapper sample_mapper.rb \
-reducer sample_reducer.rb \
-input test/input/test.txt \
-output test/output
Generic Command Options
| オプション | 概要 |
|---|---|
| -conf | アプリケーション構成ファイルを指定 |
| -Dproperty=value | 指定されたプロパティの値を使用 |
| -fs | Namenodeを指定 |
| -files | コピーするファイルをカンマ区切りで指定 |
| -libjars | クラスパスに含めるjarファイルをカンマ区切りで指定 |
| -archives | アーカイブファイルをカンマ区切りで指定 |
Streaming Command Options
| オプション | 概要 |
|---|---|
| -input | [必須] 入力のディレクトリかファイルを指定 |
| -output | [必須] 出力のディレクトリを指定 |
| -mapper | [必須] Mapperの実行プログラム |
| -reducer | [必須] Reducerの実行プログラム |
| -file | ファイルをノードにコピー |
| -inputformat | InputFormatを指定(JavaClass)デフォルトTextInputFormat |
| -outputformat | OutputFormatを指定(JavaClass)デフォルトTextOutputFormat |
| -partitioner | Partitionerを指定(JavaClass) |
| -combiner | Combinerを指定 |
| -cmdenv | 環境変数を指定 |
| -inputreader | InputFormatの代わりのRecordReaderを指定 |
| -verbose | 詳細を表示 |
| -lazyOutput | 遅延した出力を作成? |
| -numReduceTasks | Reducerの数を指定 |
| -mapdebug | Mapperが失敗したときに呼び出すスクリプト |
| -reducedebug | Reducerが失敗したときに呼び出すスクリプト |
オプションの補足
-fileオプションで実行すると非推奨ですと警告がでます。-filesオプションの方を使いましょう。
WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
-reducerオプションは必須となっていますが、Mapper のみ実行する場合は必要ありません。genericOptions に以下を指定してください。
-D mapreduce.job.reduces=0
他にもオプションは-infoで確認できます。
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -info
RubyによるMapReduceプログラムのサンプル
例としてワードカウントのサンプルになりますが、Mapper は次のように記述します。標準出力に対してキーと値をタブ区切りで出力してください。また、Javaの Mapper クラスではキーと値が引数として渡されますが、Hadoop Streaming では値のみが渡されます。
#!/usr/bin/env ruby STDIN.each_line do |line| line.split.each do |word| puts "#{word}\t1" end end
Reducer は次のように記述します。データはタブ区切りのキーと値がキーでソートされた順に1行ずつ渡されます。そのため同じキーのデータは必ず同じ Reducer で処理されるため、キーの値が変わる(キーブレイクする)まで一連の処理を行うようなプログラムにする必要があります。
#!/usr/bin/env ruby currentKey = nil total = 0 STDIN.each_line do |line| key, value = line.split("\t") value = value.to_i if currentKey && currentKey != key puts "#{currentKey}\t#{total}" currentKey = key total = value else currentKey = key total += value end end puts "#{currentKey}\t#{total}"
今回は標準入力の読み込みにSTDIN.each_lineを使用しましたが、他にも次のような方法があります。
ARGF.each do |line| ... end while line = STDIN.gets ... end
PATHについて
Hadoop Streaming は-filesや-archivesで指定したファイルが各ノードにコピーされて実行されるわけですが、当然のことながら各ノードには実行するプログラムがインストールされていなければならず、PATHが通っていなければいけません。そうでなければコマンドが見つからず次のようなエラーがでます。
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
例えば Ruby のスクリプトで実行したい場合、1行目には shebang を記述しておきましょう。
#!/usr/bin/env ruby
それか、実は以下のように実行コマンドを指定してやることもできます。
-mapper "ruby sample_mapper.rb" -reducer "ruby sample_reducer.rb"
Rubyのbundlerを使用したい場合
単純なスクリプトの実行はわかったけど、bundler でインストールした Gem を使用しているスクリプトの場合どうすればいいのか?ここが一番嵌りましたが解決策はありました。
まず-archivesオプションを使用します。これはjarやtgzで圧縮したファイルを指定することでアプリを丸ごとコピーできます。そしてシェルをひとつはさんで実行してください。
まず実行コマンドは以下のようになります。-archivesにはbundle installしたアプリをtar zcf ../my-streaming.tar.gz .のようにパッケージングしたファイルを指定してください。#appとしているのは別名のシンボリックリンクを作成するためです。
Mapper の実行ファイルはシェルですが、引数に-archivesのファイル名(シンボリックリンク)と本当の実行プログラムを渡しています。
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-Dmapred.reduce.tasks=0 \
-files "/tmp/bundler_run.sh" \
-archives "/tmp/my-streaming.tar.gz#app" \
-mapper "bundler_run.sh app sample_mapper.rb" \
-input test/input \
-output test/output
実行するシェルは以下のとおりです。まず rbenv のパスを通します。あとは見たままですが、第1引数にcdしてbundle exec ruby 第2引数で実行していますね。これで bundler を使用したスクリプトも実行することが可能です。
#!/bin/sh export RBENV_ROOT="/opt/rbenv" export PATH="$RBENV_ROOT/bin:$PATH" eval "$(rbenv init -)" APP_NAME=$1 EXEC=$2 cd $APP_NAME bundle exec ruby $EXEC
参考リンク
Apache Hadoop MapReduce Streaming –
Use Ruby Gems With Hadoop Streaming
終了ステータス - Man page of BASH