Java以外の言語を使用して MapReduce を実行することのできる Hadoop Streaming の使用についてです。ディストリビューションに付属しているユーティリティであり、データを標準入出力を介して受け渡すため、標準入出力が扱える言語であれば MapReduce ジョブを記述して実行することが可能です。
Hadoop Streaming の構文
Hadoop Streaming の基本的な構文はhadoop command [genericOptions] [streamingOptions]
になります。通常の Java でのジョブと同様に jar コマンドにhadoop-streaming.jar
を指定してください。genericOptions
はstreamingOptions
の前に配置しないと失敗してしまいます。以下がコマンド実行例です。
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -files /tmp/sample_mapper.rb, /tmp/sample_reducer.rb \ -mapper sample_mapper.rb \ -reducer sample_reducer.rb \ -input test/input/test.txt \ -output test/output
Generic Command Options
オプション | 概要 |
---|---|
-conf | アプリケーション構成ファイルを指定 |
-Dproperty=value | 指定されたプロパティの値を使用 |
-fs | Namenodeを指定 |
-files | コピーするファイルをカンマ区切りで指定 |
-libjars | クラスパスに含めるjarファイルをカンマ区切りで指定 |
-archives | アーカイブファイルをカンマ区切りで指定 |
Streaming Command Options
オプション | 概要 |
---|---|
-input | [必須] 入力のディレクトリかファイルを指定 |
-output | [必須] 出力のディレクトリを指定 |
-mapper | [必須] Mapperの実行プログラム |
-reducer | [必須] Reducerの実行プログラム |
-file | ファイルをノードにコピー |
-inputformat | InputFormatを指定(JavaClass)デフォルトTextInputFormat |
-outputformat | OutputFormatを指定(JavaClass)デフォルトTextOutputFormat |
-partitioner | Partitionerを指定(JavaClass) |
-combiner | Combinerを指定 |
-cmdenv | 環境変数を指定 |
-inputreader | InputFormatの代わりのRecordReaderを指定 |
-verbose | 詳細を表示 |
-lazyOutput | 遅延した出力を作成? |
-numReduceTasks | Reducerの数を指定 |
-mapdebug | Mapperが失敗したときに呼び出すスクリプト |
-reducedebug | Reducerが失敗したときに呼び出すスクリプト |
オプションの補足
-file
オプションで実行すると非推奨ですと警告がでます。-files
オプションの方を使いましょう。
WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
-reducer
オプションは必須となっていますが、Mapper のみ実行する場合は必要ありません。genericOptions に以下を指定してください。
-D mapreduce.job.reduces=0
他にもオプションは-info
で確認できます。
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -info
RubyによるMapReduceプログラムのサンプル
例としてワードカウントのサンプルになりますが、Mapper は次のように記述します。標準出力に対してキーと値をタブ区切りで出力してください。また、Javaの Mapper クラスではキーと値が引数として渡されますが、Hadoop Streaming では値のみが渡されます。
#!/usr/bin/env ruby STDIN.each_line do |line| line.split.each do |word| puts "#{word}\t1" end end
Reducer は次のように記述します。データはタブ区切りのキーと値がキーでソートされた順に1行ずつ渡されます。そのため同じキーのデータは必ず同じ Reducer で処理されるため、キーの値が変わる(キーブレイクする)まで一連の処理を行うようなプログラムにする必要があります。
#!/usr/bin/env ruby currentKey = nil total = 0 STDIN.each_line do |line| key, value = line.split("\t") value = value.to_i if currentKey && currentKey != key puts "#{currentKey}\t#{total}" currentKey = key total = value else currentKey = key total += value end end puts "#{currentKey}\t#{total}"
今回は標準入力の読み込みにSTDIN.each_line
を使用しましたが、他にも次のような方法があります。
ARGF.each do |line| ... end while line = STDIN.gets ... end
PATHについて
Hadoop Streaming は-files
や-archives
で指定したファイルが各ノードにコピーされて実行されるわけですが、当然のことながら各ノードには実行するプログラムがインストールされていなければならず、PATHが通っていなければいけません。そうでなければコマンドが見つからず次のようなエラーがでます。
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
例えば Ruby のスクリプトで実行したい場合、1行目には shebang を記述しておきましょう。
#!/usr/bin/env ruby
それか、実は以下のように実行コマンドを指定してやることもできます。
-mapper "ruby sample_mapper.rb" -reducer "ruby sample_reducer.rb"
Rubyのbundlerを使用したい場合
単純なスクリプトの実行はわかったけど、bundler でインストールした Gem を使用しているスクリプトの場合どうすればいいのか?ここが一番嵌りましたが解決策はありました。
まず-archives
オプションを使用します。これはjar
やtgz
で圧縮したファイルを指定することでアプリを丸ごとコピーできます。そしてシェルをひとつはさんで実行してください。
まず実行コマンドは以下のようになります。-archives
にはbundle install
したアプリをtar zcf ../my-streaming.tar.gz .
のようにパッケージングしたファイルを指定してください。#app
としているのは別名のシンボリックリンクを作成するためです。
Mapper の実行ファイルはシェルですが、引数に-archives
のファイル名(シンボリックリンク)と本当の実行プログラムを渡しています。
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \ -Dmapred.reduce.tasks=0 \ -files "/tmp/bundler_run.sh" \ -archives "/tmp/my-streaming.tar.gz#app" \ -mapper "bundler_run.sh app sample_mapper.rb" \ -input test/input \ -output test/output
実行するシェルは以下のとおりです。まず rbenv のパスを通します。あとは見たままですが、第1引数にcd
してbundle exec ruby 第2引数
で実行していますね。これで bundler を使用したスクリプトも実行することが可能です。
#!/bin/sh export RBENV_ROOT="/opt/rbenv" export PATH="$RBENV_ROOT/bin:$PATH" eval "$(rbenv init -)" APP_NAME=$1 EXEC=$2 cd $APP_NAME bundle exec ruby $EXEC
参考リンク
Apache Hadoop MapReduce Streaming –
Use Ruby Gems With Hadoop Streaming
終了ステータス - Man page of BASH