Hadoop で CombineFileInputFormat を使用して小さい圧縮ファイルを処理する

この記事では、gzip (デフォルトのコーデック) ファイルの中身を実行時に読み取るために、CombineFileInputFormat を拡張して実装する方法を、詳細な例を用いて説明します。Mapper が使用するデータの量を HDFS 内のファイルのブロック・サイズとは無関係にするために、MapReduce フレームワーク内で CombineFileInputFormat を使用する方法を学んでください。

Sujay Som, Big Data Consultant and Application Architect, IBM India

Sujay Som は、ビッグ・データ・アナリティクスを対象とした Global Business Service (GBS) Software Group (SWG) Center of Competence に所属するビッグ・データ・コンサルタント兼アプリケーション・アーキテクトです。彼はアーキテクチャー、設計、開発での 12 年を超える IT の経験があります。



2014年 6月 12日

はじめに

Apache Hadoop ソフトウェア・ライブラリーは、フラット・テキスト・ファイルからデータベースに至るまで、さまざまなタイプのデータ・フォーマットを処理することができます。MapReduce フレームワークは、入力仕様を検証して、入力ファイルを論理 InputSplit に分割するために、ジョブの InputFormat を利用します。分割された各 InputSplit は個々の Mapper に割り当てられます。

すべてのファイル・ベースの InputFormat の基底クラスである FileInputFormat には、以下に挙げる直接的なサブクラスがあります。

  • TextInputFormat
  • SequenceFileInputFormat
  • NLineInputFormat
  • KeyValueTextInputFormat
  • CombineFileInputFormat

これらの InputFormat はいずれも getSplits(JobContext) の汎用的な実装を提供しており、入力ファイルが分割されずに 1 つのファイルとして Mapper で処理されるように isSplitable (JobContext, Path) メソッドをオーバーライドすることもできます。FileInputFormat は、org.apache.hadoop.mapreduce.InputFormat クラスから継承した createRecordReader メソッドを実装しており、入力レコードを論理 InputSplit から収集して Mapper で処理するために、このメソッドを使用してリーダーが作成されます。

InfoSphere BigInsights Quick Start Edition

InfoSphere BigInsights Quick Start Edition は、IBM の Hadoop ベースのオファリングである InfoSphere BigInsights の、無料でダウンロードできるバージョンです。Quick Start Edition を利用すると、オープンソースの Hadoop の価値をさらに高めるために IBM が構築した、Big SQL、テキスト・アナリティクス、BigSheets などの機能を試すことができます。Hadoop を導入する際に役立つステップバイステップの自習式チュートリアルやビデオを含め、この製品を使用する際のエクスペリエンスをできるだけ円滑にするためのガイド付き学習資料も用意されています。時間やデータに制限はなく、皆さんの都合が良いときに大量のデータの処理を試すことができます。ビデオを閲覧し、チュートリアル (PDF) に従って、今すぐ InfoSphere BigInsights Quick Start Edition をダウンロードしてください。

その一方、Hadoop は小さいファイルを大量に処理する場合よりも、少量の大きいファイルを処理する場合のほうがパフォーマンスに優れています (ここで言う「小さい」とは、HDFS (Hadoop Distributed File System) ブロックのサイズより遥かに小さいという意味です)。そこで、小さいファイルを効率的に処理する CombineFileInputFormat を設計し、FileInputFormat がファイルごとに 1 個のスプリットを作るようにすることで、この状況を緩和します。CombineFileInputFormat は多数のファイルを 1 つのスプリットに詰め込むことで、各 Mapper で処理されるデータ量が多くなるようにします。CombineFileInputFormat は、大きいファイルを処理する際にもメリットをもたらします。基本的に Mapper が使用するデータの量は、CombineFileInputFormat によって HDFS 内のファイルのブロック・サイズとは無関係になります。

現在、CombineFileInputFormat は Hadoop クラス・ライブラリー (org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K,V>) の抽象クラスとなっていて、具象実装はありません。CombineFileInputFormat のメリットを活かすには、CombineFileInputFormat の具象サブクラスを作成して createRecordReader() メソッドを実装する必要があります。そしてこのメソッドのカスタム・コンストラクターを使用して、RecordReader を継承する委譲 CustomFileRecordReader クラスをインスタンス化します。この手法では、CustomWritable が Hadoop Mapper クラスのキーとして作成されることも要件となります。各行の CustomWritable は、ファイル名とその行のオフセット長で構成されることになります。

Hadoop のサンプル・コード (クリックすると表示されます) を調べてください。このコードには、CombineFileInputForma を使用して、特定の入力ディレクトリー内のテキスト・ファイルに含まれる単語の出現回数をカウントする方法が示されています。

ここでは、実行時に gzip (デフォルトのコーデック) ファイルの中身を解凍された形で読み取るための機能を追加することで、CombineFileInputFormat を拡張し、実装する方法を説明します。そのために、キーと値の組み合わせを出力として生成します。キーは、ファイル名とその行のオフセットの組み合わせからなり、値はその行のテキスト表現です。この記事の例では、MapReduce フレームワーク内で CustomInputFormat を使用します。作成するのは、以下の主要なクラスです。

CompressedCombineFileInputFormat
このクラスは、ファイル結合ロジックを実行するレコード・リーダーを渡すために、CombineFileInputFormat を継承して createRecordReader を実装します。
CompressedCombineFileRecordReader
RecordReader を継承するこのクラスは、CombineFileRecordReader の委譲クラスです。
CompressedCombineFileWritable
このクラスは、WritableComparable を実装し、ファイル名とオフセットを格納して、compareTo メソッドをオーバーライドすることで、最初にファイル名、次にオフセットを比較します。

CompressedCombineFileInputFormat の例では、MapReduce プログラムを使用して、一般公開されている NOAA (National Oceanic and Atmospheric Administration: 米国海洋大気庁) の気象履歴データを利用します。NOAA は、3 つの気象要素 (気温、降雨量、降雪量) の月次要約データを使用して、米国各地の測候所の極値統計を蓄積しています。ここで使用する例では、圧縮 (gzip) ファイルで入手される NCDC (National Climatic Data Center: 米国国立気候データ・センター) の気象データから、最高気温を計算します。気象データの入手方法についての詳細は、「参考文献」を参照してください。


複数の圧縮ファイル入力フォーマット

このソリューションで使用する圧縮 CombineFileInputFormat は、次の 3 つの具象クラスを使用します。

  • CombineFileInputFormat (org.apache.hadoop.mapreduce.lib.input. CombineFileInputFormat<K,V>) の抽象実装のサブクラス
  • RecordReader (org.apache.hadoop.mapreduce.RecordReader<K,V>) の具象サブクラス
  • WritableComparable (org.apache.hadoop.io.WritableComparable) を実装するカスタム Writable クラス。このカスタム・クラスは、ファイル名とその行のオフセットからなる、ファイルの行のキーを生成します。

CompressedCombineFileInputFormat

CompressedCombineFileInputFormat.java は、CombineFileInputFormat のサブクラスです。このクラスは、CombineFileSplitRecordReader を作成するために、InputFormat.createRecordReader(InputSplit, TaskAttemptContext) を実装します。CombineFileSplit は、入力ファイルのサブコレクションです。FileSplit とは異なり、CombineFileSplit クラスは 1 つのファイルのスプリットを表すのではなく、複数の小さい入力ファイルからなるスプリットを表します。スプリットには異なるファイルからのブロックが含まれることもありますが、1 つのスプリットに含まれるすべてのブロックは、おそらく同じラックのローカルにあるブロックです。CombineFileSplit を使用して RecordReader を実装するには、ファイルごとに 1 つのレコードを読み取ることになります。ファイルを半分に分割する必要はないので、isSplitable() メソッドは false を返すようにオーバーライドされます (オーバーライドされなければ、デフォルトで true を返します)。リスト 1 に一例を示します。

リスト 1. CompressedCombineFileInputFormat.java
package com.ssom.combinefile.lib.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;


public class CompressedCombineFileInputFormat 
extends CombineFileInputFormat<CompressedCombineFileWritable, Text>  {
	
	public CompressedCombineFileInputFormat(){
		super();
	
	}

	public RecordReader<CompressedCombineFileWritable,Text> 
createRecordReader(InputSplit split,
	    TaskAttemptContext context) throws IOException {
	  return new 
CombineFileRecordReader<CompressedCombineFileWritable, 
Text>((CombineFileSplit)split, context, 
CompressedCombineFileRecordReader.class);
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path file){
	  return false;
	}

}

CompressedCombineFileRecordReader

CompressedCombineFileRecordReader.java は、CombineFileRecordReader の委譲クラスです。CombineFileRecordReader は、CombineFileSplit に含まれる各チャンクに対して異なる RecordReader を渡すことが可能な汎用の RecordReader です。CombineFileSplit は、複数のファイルからのデータ・チャンクを結合することができます。このクラスでは、異なるファイルからのデータ・チャンクを処理するために、異なる RecordReader を使用することができます。

Hadoop ジョブが起動されると、CombineFileRecordReader は、処理する必要がある HDFS 入力パスのすべてのファイル・サイズを読み取り、MaxSplitSize に基づいて必要なスプリットの数を決定します。スプリットごとに (isSplitable() がオーバーライドされて false を返すように設定されているため、各スプリットは 1 つのファイルです)、CombineFileRecordReaderCompressedCombineFileRecordReader インスタンスを作成して、CompressedCombineFileRecordReader が処理対象のファイルを見つけられるように CombineFileSplitcontextindex を渡します。

CompressedCombineFileRecordReader はインスタンス化された後、入力ファイルに圧縮コーデック (org.apache.hadoop.io.compress.GzipCodec) が含まれるかどうかを判別します。含まれる場合、その入力ファイルは実行時に解凍されて処理されます。圧縮コーデックが含まれていなければ、入力ファイルはテキスト・ファイルであると見なされます。CompressedCombineFileRecordReader はファイルの処理中に、呼び出された Mapper クラスのキーとして CompressedCombineFileWritable を作成します。CompressedCombineFileWritable は、ファイル名と読み取られた各行のオフセット長で構成されます。これについては、「MapReduce の例」で説明します。

リスト 2 に、CompressedCombineFileRecordReader.java の一例を示します。

リスト 2. CompressedCombineFileRecordReader.java
package com.ssom.combinefile.lib.input;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * RecordReader is responsible from extracting records from a chunk
 * of the CombineFileSplit. 
 */
public class CompressedCombineFileRecordReader 
  	extends RecordReader<CompressedCombineFileWritable, Text> {

	private long startOffset;
	private long end; 
	private long pos; 
	private FileSystem fs;
	private Path path;
	private Path dPath;
	private CompressedCombineFileWritable key = new CompressedCombineFileWritable();
	private Text value;
	private long rlength;
	private FSDataInputStream fileIn;
	private LineReader reader;
	  
 
	public CompressedCombineFileRecordReader(CombineFileSplit split,
	      TaskAttemptContext context, Integer index) throws IOException {
	    
			Configuration currentConf = context.getConfiguration();
		  	this.path = split.getPath(index);
		  	boolean isCompressed =  findCodec(currentConf ,path);
		  	if(isCompressed)
		  		codecWiseDecompress(context.getConfiguration());
	
		  	fs = this.path.getFileSystem(currentConf);
		  	
		  	this.startOffset = split.getOffset(index);
	
		  	if(isCompressed){
		  		this.end = startOffset + rlength;
		  	}else{
		  		this.end = startOffset + split.getLength(index);
		  		dPath =path;
		  	}
		  	
		  	boolean skipFirstLine = false;
	    
	        fileIn = fs.open(dPath);
	        
	        if(isCompressed)  fs.deleteOnExit(dPath);
	        
	        if (startOffset != 0) {
	        	skipFirstLine = true;
	        	--startOffset;
	        	fileIn.seek(startOffset);
	        }
	        reader = new LineReader(fileIn);
	        if (skipFirstLine) {  
	        	startOffset += reader.readLine(new Text(), 0,
	        	(int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
	        }
	        this.pos = startOffset;
	  }

	  public void initialize(InputSplit split, TaskAttemptContext context)
	      throws IOException, InterruptedException {
	  }

	  public void close() throws IOException { }

	  public float getProgress() throws IOException {
		    if (startOffset == end) {
		      return 0.0f;
		    } else {
		      return Math.min(1.0f, (pos - startOffset) / (float)
                  (end - startOffset));
		    }
	  }

	  public boolean nextKeyValue() throws IOException {
		    if (key.fileName== null) {
		      key = new CompressedCombineFileWritable();
		      key.fileName = dPath.getName();
		    }
		    key.offset = pos;
		    if (value == null) {
		      value = new Text();
		    }
		    int newSize = 0;
		    if (pos < end) {
		      newSize = reader.readLine(value);
		      pos += newSize;
		    }
		    if (newSize == 0) {
		      key = null;
		      value = null;
		      return false;
		    } else {
		      return true;
		    }
	  }

	  public CompressedCombineFileWritable getCurrentKey() 
	      throws IOException, InterruptedException {
		  return key;
	  }

	  public Text getCurrentValue() throws IOException, InterruptedException {
		  return value;
	  }
  
 
	private void codecWiseDecompress(Configuration conf) throws IOException{
		  
		 CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		 CompressionCodec codec = factory.getCodec(path);
		    
		    if (codec == null) {
		    	System.err.println("No Codec Found For " + path);
		    	System.exit(1);
		    }
		    
		    String outputUri = 
CompressionCodecFactory.removeSuffix(path.toString(), 
codec.getDefaultExtension());
		    dPath = new Path(outputUri);
		    
		    InputStream in = null;
		    OutputStream out = null;
		    fs = this.path.getFileSystem(conf);
		    
		    try {
		    	in = codec.createInputStream(fs.open(path));
		    	out = fs.create(dPath);
		    	IOUtils.copyBytes(in, out, conf);
		    	} finally {
		    		IOUtils.closeStream(in);
		    		IOUtils.closeStream(out);
					rlength = fs.getFileStatus(dPath).getLen();
		    	}
	  }
	
	private boolean findCodec(Configuration conf, Path p){
		
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);
	    CompressionCodec codec = factory.getCodec(path);
	    
	    if (codec == null) 
	    	return false; 
	    else 
	    	return true;

	}
  
}

CompressedCombineFileWritable

CompressedCombineFileWritable.java クラスは WritableComparable を実装し、org.apache.hadoop.io.Writablejava.lang.Comparable を継承しています (リスト 3 を参照)。

Writable は、DataInputDataOutput に基づく単純で効率的なシリアライゼーション・プロトコルを実装する、シリアライズ可能なオブジェクトです。MapReduce フレームワークに含まれるあらゆるキーまたは値のタイプは、このインターフェースを実装します。実装では一般に、静的 read(DataInput) メソッドが使用されます。このメソッドが、新しいインスタンスを作成して readFields(DataInput) を呼び出し、インスタンスを返します。

Comparable は、JCF (Java Collections Framework) のメンバーになっているインターフェースです。このインターフェースは、このインターフェースを実装する各クラスのオブジェクトに対し、全体順序付けを強制します。この順序付けは、クラスの自然順序付けと呼ばれ、このクラスの compareTo メソッドは自然比較メソッドと呼ばれます。このインターフェースを実装するオブジェクトのリスト (および配列) は、Collections.sort (および Arrays.sort) によって自動的にソートすることができます。このインターフェースを実装するオブジェクトは、コンパレーターを指定しなくても、ソートされたマップのキーまたはソートされたセットの要素として使用することができます。

これらの特性により、CompressedCombineFileWritable を比較するには、コンパレーターを使用することができます。Hadoop では、キーを区分するために hashCode() コマンドが頻繁に使用されますが、重要なのは、hashCode() の実装が JVM のさまざまなインスタンスで一貫して同じ結果を返すことです。Object のデフォルト hashCode() 実装では、この特性を満たすことができません。そのため、一貫性と効率性を確保するために、hashCode()equals()、および toString() メソッドがオーバーライドされます。

リスト 3. CompressedCombineFileWritable.java
package com.ssom.combinefile.lib.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;


/**
 * This record keeps filename,offset pairs.
 */

@SuppressWarnings("rawtypes")

public class CompressedCombineFileWritable implements WritableComparable {

    public long offset;
    public String fileName;
    
    
 
    public CompressedCombineFileWritable() {
		super();
	}

	public CompressedCombineFileWritable(long offset, String fileName) {
		super();
		this.offset = offset;
		this.fileName = fileName;
	}

	public void readFields(DataInput in) throws IOException {
      this.offset = in.readLong();
      this.fileName = Text.readString(in);
    }

    public void write(DataOutput out) throws IOException {
      out.writeLong(offset);
      Text.writeString(out, fileName);
    }

    
    public int compareTo(Object o) {
      CompressedCombineFileWritable that = (CompressedCombineFileWritable)o;

      int f = this.fileName.compareTo(that.fileName);
      if(f == 0) {
        return (int)Math.signum((double)(this.offset - that.offset));
      }
      return f;
    }
    @Override
    public boolean equals(Object obj) {
      if(obj instanceof CompressedCombineFileWritable)
        return this.compareTo(obj) == 0;
      return false;
    }
    @Override
    public int hashCode() {
  
    	final int hashPrime = 47;
        int hash = 13;
        hash =   hashPrime* hash + (this.fileName != null ? this.fileName.hashCode() : 0);
        hash =  hashPrime* hash + (int) (this.offset ^ (this.offset >>> 16));
    	
        return hash; 
    }
    @Override
    public String toString(){
    	return this.fileName+"-"+this.offset;
    }

  }

MapReduce の例

このセクションの例では、サンプル MapReduce プログラムで CompressedCombineFileInputFormat を使用する方法を説明します。この MapReduce プログラムは、米国各地の測候所での気温、降雨量、降雪量の月次要約データを使用して極値統計を蓄積した、NOAA の気象履歴データを利用します。この例で圧縮 (gzip) ファイル形式の気象データから計算するのは、最高気温です。

この例では、CompressedCombineFileInputFormat を使用する上でのさまざまな面を説明します。この例を実行するために使用したのは、InfoSphere BigInsights Quick Start Edition です。

前提条件

この例では、以下のことを前提とします。

  • Hadoop 環境がインストールされていて稼働状態にあること。
  • サンプル・プログラムに必要なデータが NOAA の NCDC からダウンロードされていること。
  • ダウンロードしたデータが HDFS に取り込まれていること。
  • 提供された運用統計は InfoSphere BigInsights Quick Start Edition (非本番環境) で実行されること。

サンプル・プログラムの実行

CompressedCombineFileInputFormat のクラスはすべて JAR ファイル (CompressedCombine-FileInput.jar) に含まれているので、他のプロジェクトでもこのファイルを参照することができます。デフォルトの入力フォーマット (org.apache.hadoop.io.Text) を使用した場合と、カスタム入力フォーマット (com.ssom.combinefile.lib.input.CompressedCombineFileInputFormat) を使用した場合のパフォーマンスの違いを明らかにするために、それぞれ別個の MapReduce プログラムを使用します。

入力データ

NCDC データは一般公開されています。表 1 に、この例で使用する NCDC データのフォーマットを記載します。

表 1. NCDC データ
読み取り値説明
0057# MASTER STATION カタログ ID
332130# USAF 測候所 ID
99999# WBAN 測候所 ID
99999# WBAN 測候所 ID
19470101# 観測日
0300# 観測時刻
4# データ・ソース・フラグ
+51317# 緯度 (度 x 1000)
+028783# 経度 (度 x 1000)
FM-12# レポート・タイプ・コード
+0171# 標高 (メートル)
99999# コール・サイン ID
V020# 品質管理プロセス名
320# 風向 (度)
1# 品質コード
N# タイプ・コード
0072# 速度率
1# 速度品質コード
00450# 雲底高度 (メートル)
1# 品質コード
CN# 雲底サイズ
010000# 明視距離 (メートル)
1# 品質コード
N9# 変動性コード
-0128# 気温 (摂氏温度 x 10)
1# 品質コード
-0139# 露点温度 (摂氏温度 x 10)
1# 品質コード
10268# 大気圧 (ヘクトパスカル x 10)
1# 品質コード

データのファイルは、日付および測候所ごとに編成されています。1901年以降の各年に 1 つのディレクトリーがあり、それぞれのディレクトリーにはその年の測候所での読み取り値が格納された圧縮ファイルが各測候所について 1 つずつ格納されています。図 1 に、1947年の最初のエントリーを示します。

図 1. ファイルのリストの例
ファイルのリストの例を示す図

私は 1901年、1902年、1947年のデータを選択しました。これらの年の圧縮ファイルの合計数は、1,000 を超えています。ファイルの合計サイズは約 56 MB です。

デフォルトの入力フォーマットを使用する場合

Mapper クラスは、4 つの仮型引数として、map 関数の入力キー、入力値、出力キー、出力値の型を指定する汎用タイプです。

public class MaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {

デフォルトの入力フォーマットは、以下のとおりです。

  • 入力キーは、長整数のオフセットです。
  • 入力値は、テキスト行です。
  • 出力キーは、年です。
  • 出力値は、気温 (整数) です。

map() メソッドにキーと値が渡されると、入力行を含むテキスト値が Java のストリングに変換されます。その後、substring() メソッドを使用して値が抽出されます。この例の場合、Mapper は年を Text オブジェクトとして書き込み (年だけをキーとして使用しているため)、気温が IntWritable にラップされます。出力レコードは気温が存在する場合にのみ書き込まれ、温度の読み取り値が有効であることを品質コードが示します。

Reducer クラスも同じく汎用タイプです。このクラスには、入力と出力の型を指定するための 4 つの仮型引数があります。reduce 関数の出力の型は TextIntWritable であり、そこに格納されるのは、年とその年の最高気温です。最高気温を見つけるには、気温を繰り返し処理して、各気温をそれまでに見つかった最高気温と比較します。Reducer クラスの定義は以下のとおりです。

public class MaxTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {

複数の年がある場合、出力は以下のような情報になります。

  • 1901 — 278
  • 1947 — 283

図 2 と図 3 に、InfoSphere BigInsights 管理コンソールでデフォルト (Text) 入力フォーマットを使用してプログラムを実行した結果を示します。

図 2. アプリケーション・ステータス — デフォルトの実行
アプリケーション・ステータス — デフォルトの実行を示す図

クリックして大きなイメージを見る

図 2. アプリケーション・ステータス — デフォルトの実行

アプリケーション・ステータス — デフォルトの実行を示す図
図 3. 完了
完了を示す図

クリックして大きなイメージを見る

図 3. 完了

完了を示す図

InfoSphere BigInsights 管理コンソールには、表 2 に記載する重要な統計情報が表示されます。

表 2. 結果として得られた統計情報
重要な属性結果
名前 Find_Max_Temperature_Default_Input_Format
ジョブ ID job_201401020703_0001
Map 完了率 100%
Reducer 完了率 100%
開始時刻 2014-01-02 07:09
終了時刻 2014-01-02 07:43
ユーザー名 biadmin
優先順位 NORMAL
結果を取得するまでの合計実行時間は 34 分です。

図 4 に、MapReduce プログラムの出力を示します。

図 4. 応答ファイルの内容 — デフォルトの実行
応答ファイルの内容 — デフォルトの実行を示す

カスタム入力フォーマットを使用する場合

この場合も以下に示すように、CompressedCombineFileWritable を入力キーとして、Mapper クラス (汎用タイプ) を使用します。

public class MaxTMapper extends
	Mapper<CompressedCombineFileWritable, Text, Text, IntWritable> {

処理ロジックは、前の例と同じです (「デフォルトの入力フォーマットを使用する場合」を参照)。

Reducer クラス (変更を加えていないため、前と同じです) の定義は以下のとおりです。

public class MaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable>  {

ジョブの入力フォーマットを設定するには、以下のコードを使用します。

job.setInputFormatClass(CompressedCombineFileInputFormat.class);

サンプルのキーと値は以下のとおりです。

Key: 227070-99999-1901-116370

Line:
0029227070999991901101520004+62167+030650FM
-12+010299999V0201401N003119999999N0000001N9+00331+99999102311
ADDGF100991999999999999999999

図 5 と図 6 に、InfoSphere BigInsights 管理コンソールでカスタム (CompressedCombineFileInputFormat) 入力フォーマットを使用してプログラムを実行した結果を示します。

図 5. アプリケーション・ステータス — カスタムの実行
アプリケーション・ステータス — カスタムの実行を示す図

クリックして大きなイメージを見る

図 5. アプリケーション・ステータス — カスタムの実行

アプリケーション・ステータス — カスタムの実行を示す図
図 6. 完了
完了を示す図

InfoSphere BigInsights 管理コンソールには、表 3 に記載する重要な統計情報が表示されます。

表 3. 複数の年の出力
重要な属性 結果
名前 Find_Max_Temperature_Custom_Input_Format
ジョブ ID job_201401020703_0002
Map 完了率 100%
Reducer 完了率 100%
開始時刻 2014-01-02 08:32
終了時刻 2014-01-02 08:37
ユーザー名 biadmin
優先順位 NORMAL
実行して結果を取得するまでの合計時間は 5 分です。

図 7 に、MapReduce プログラムの出力を示します。

図 7. 応答ファイルの内容 — カスタムの実行
応答ファイルの内容 — カスタムの実行を示す図

まとめ

MapReduce は、クラスター内でディスクの転送速度で動作できる場合に最高のパフォーマンスを発揮するため、可能な限り、多数の小さいファイルを使用しないようにするのが賢明です。多数の小さなファイルを処理するとなると、ジョブを実行するために必要なシーク数が増えてきます。その一方、ビジネスや戦略上の理由によって多数の小さいファイルを処理しなければならない場合、HDFS を使用できるとしたら、CombineFileInputFormat が役立ちます。CombineFileInputFormat は小さいファイルに適しているだけでなく、大きいファイルを処理する場合にもパフォーマンスのメリットをもたらします。

参考文献

学ぶために

  • Understanding InfoSphere BigInsights」を読んで InfoSphere BigInsights 製品のアーキテクチャーと基本テクノロジーについて詳しく学んでください。
  • 動画「Big Data: Frequently Asked Questions for IBM InfoSphere BigInsights」を視聴して、Cindy Saracco が IBM のビッグ・データ・プラットフォームおよび InfoSphere BigInsights についてのよくある質問のいくつかを説明しているのを聞いてください。
  • 気象データを圧縮ファイルで入手するには、National Climatic Data Center (NCDC) にアクセスし、Home > Data Access > Land-Based Station > Datasets > Integrated Surface Database (ISD) の順にアクセスしてください。
  • BigInsights Technical Enablement Wiki」で、技術資料、デモ、トレーニング・コース、ニュース項目などへのリンクを調べてください。
  • Big Data University で Hadoop およびビッグ・データに関する無料のコースを調べてください。
  • InfoSphere BigInsights 製品に関するドキュメントは、InfoSphere BigInsights Information Center を参照してください。
  • Twitter API について詳しく学んでください。
  • プラットフォームについての情報やソフトウェア・ダウンロードへのリンクについては Eclipse.org にアクセスしてください。
  • developerWorks Big data コンテンツ・エリアで、ビッグ・データに関する技術資料、ハウツー記事、教育、ダウンロード、製品情報などを調べてください。
  • InfoSphere Streams を使い始めるのに役立つリソースを見つけてください。この IBM のハイパフォーマンス・コンピューティング・プラットフォームでは、何千ものリアルタイム・ソースからの情報を到着と同時に迅速に取り込み、分析し、相関させるアプリケーションを開発することができます。
  • マイペースで学習できるチュートリアル (PDF) に従って、ビッグ・データ環境の管理方法、分析対象のデータをインポートする方法、BigSheets を使ってデータを分析する方法、最初のビッグ・データ・アプリケーションを開発する方法、ビッグ・データを分析する Big SQL クエリーを開発する方法、そして InfoSphere BigInsights を使用してテキスト文書から洞察を引き出すエクストラクターを作成する方法を学んでください。
  • developerWorks テクニカル・イベントで最新情報を入手してください。
  • Twitter で developerWorks をフォローしてください。

製品や技術を入手するために

議論するために

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Open source
ArticleID=973520
ArticleTitle=Hadoop で CombineFileInputFormat を使用して小さい圧縮ファイルを処理する
publish-date=06122014