Hadoopはどのように動くのか ─並列・分散システム技術から読み解くHadoop処理系の設計と実装

第18回 Impalaの設計と実装[2]

この記事を読むのに必要な時間:およそ 4.5 分

はじめに

今回は,ImpalaのSQL処理の高速化において重要な役割を占めるクエリ処理について説明します。

Impalaのクエリ処理の特徴

Impalaは,MapReduceやSparkをはじめとする既存の手続き型のデータ処理エンジンを使用せず,アドホックなSQLクエリの処理の高効率化に焦点を置いた設計と実装が特徴です。たとえば,結合方法を見てみると,MapやReduceもしくはMapReduceジョブなどのブロッキングオペレータ第16回を組み合わせていく処理エンジンにおいては,Impalaにおけるパイプライン結合処理などを実現することは必ずしも容易ではありません第8回「Impala/Prestoにおける結合処理」)。

また,MapReduceやSparkでは中間データをDiskに書き込むことにより高い耐障害性を実現しますが,Impalaでは耐障害性を多少犠牲にしてメモリ上で処理を完結することにより高速化を実現します第13回「高い耐障害性を実現する設計方針」※1)。

前回の説明にあったように,Impalaではコーディネータがクラスタの各ノードに処理を割り振ります。そのとき、事前に取得された統計情報を元にメモリの使用量なども推測し、当該処理の並列化方法を決定します。今回は,この並列化方法についてお話します。

まずHash Joinの処理を例にImpalaでの実行計画について説明し,次にノード間処理の並列化およびノード内処理の並列化について説明します。

※1)
このような設計方針のため,Impalaのバージョン1の時点ではソートや結合の処理でメモリサイズを超えるような処理が必要となった場合にはエラー終了する動作となっていました。バージョン2になり,メモリに収まらないハッシュ表のDisk Spillなども行われるようになりましたが,処理の高速性を保つためにはメモリ上で処理を完結させることが重要であることは変わりません。

実行計画とplan fragment

今回は,下記の「3つの表を等結合し,共通する値がいくつあるかを集計する」というクエリを例に話を進めていきます。


select count(colA) from tabA join tabB on colA=colB join tabC on colB=colC;

Impalaでは,クエリを受け取ると,RDBMSなどの一般的なSQL処理系と同様に第4回),クエリの実行計画を作成します。その際,Impalaのクエリコンパイラは,compute statsによって取得した統計情報を元に,コーディネータがメモリの使用量などを推測し,クラスタ内の各ノードに処理を均等に分配するように実行計画を作成します。上記のクエリを実行してImpalaのprofileコマンド※2で出力した実行計画の例を以下に示します。

図1 実行計画の例

Operator          #Hosts   Avg Time   Max Time    #Rows  Est. #Rows   Peak Mem  Est. Peak Mem  Detail                
---------------------------------------------------------------------------------------------------------------------
09:AGGREGATE           1   181.94ms   181.94ms        1           1   20.00 KB        -1.00 B  FINALIZE              
08:EXCHANGE            1   73.592us   73.592us        3           1          0        -1.00 B  UNPARTITIONED         
05:AGGREGATE           3  270.619ms  292.402ms        3           1    8.64 MB       10.00 MB                        
04:HASH JOIN           3   35.811ms   77.643ms   36.63K      33.94M   50.05 MB        3.23 MB  INNER JOIN, BROADCAST 
|--07:EXCHANGE         3   33.631ms   36.165ms  769.23K     769.23K          0              0  BROADCAST             
|  02:SCAN HDFS        1  357.375ms  357.375ms  769.23K     769.23K    8.66 MB       32.00 MB  default.tabc          
03:HASH JOIN           3    2s072ms    3s076ms  476.19K      33.33M  203.02 MB        5.99 MB  INNER JOIN, BROADCAST 
|--06:EXCHANGE         3   69.922ms   74.444ms    1.43M       1.43M          0              0  BROADCAST             
|  01:SCAN HDFS        1  586.556ms  586.556ms    1.43M       1.43M   12.66 MB       48.00 MB  default.tabb          
00:SCAN HDFS           3  531.481ms  761.759ms   33.33M      33.33M   32.66 MB      176.00 MB  default.taba

図1のOperator列の各行をそれぞれ見てみましょう。

AGGREGATION
集約関数(上記のcountなど)の集約処理を表します。今回は列値によるgroup byを実施していないため結果が1行にまとまっていますが,集約を実施する場合Impalaはメモリ上にgroup byのキーでハッシュ表を作成することにより処理を実行します
EXCHANGE
あるノードにおいて,ほかのノードから再分配されたレコードを受け取る処理を表します。その際,データはほかのノードのメモリからネットワークなどを介して,当該ノードのメモリへと転送されます。
HASH JOIN
メモリ上にハッシュ表を作成して等結合を行う処理を表します。Impalaでは,ハッシュ表を内部表(今回のtabBとtabC)として,外部表(今回のtabA)の値をProbeし,結合処理を行います(第5回の「Impala/Presto」第8回の「Impala/Prestoにおける結合処理」)。
SCAN HDFS
データの読み出し処理を表します。データの読み出し元は,HDFSが一般的ですが,HBaseやAmazon S3などのほかのストレージを用いることも可能です。

これらをふまえて実行計画を見てみると,SCANにより読み出されたtabB,tabCのレコードからハッシュ表を作成し(ビルドし),それらに対してtabAのレコードを引き当てる(プローブする)という流れを読み取ることができるかと思います。図2では,tabB,tabCに共通する値(緑の行)等価なtabAの行が,JOINの結果,選択されていく状況を示しています。

図2 Hash Joinのイメージ図

図2 Hash Joinのイメージ図

Impalaの場合,コーディネータが作成した実行計画をさらにplan fragmentと呼ばれる単位に分割し,各impaladに分配して実行します。そのとき,統計情報やcatalogdから共有されたブロックの配置状況,およびstatestoreから得られたクラスタ上のimpaladの状況を参照することにより,HDFSに対する入出力のローカリティを考慮してplan fragmentの実行ノードを選択します※3)。

以下に,Impalaのprofileコマンドで出力したplan fragmentの例を示します。

図3 plan fragmentの例

F00:PLAN FRAGMENT [RANDOM]
  DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=08, UNPARTITIONED]
  05:AGGREGATE
  |  output: count(colA)
  |  hosts=3 per-host-mem=10.00MB
  |  tuple-ids=3 row-size=8B cardinality=1
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |  hash predicates: colB = colC
  |  hosts=3 per-host-mem=3.23MB
  |  tuple-ids=0,1,2 row-size=12B cardinality=33940820
  |
  |--07:EXCHANGE [BROADCAST]
  |     hosts=1 per-host-mem=0B
  |     tuple-ids=2 row-size=4B cardinality=769231
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |  hash predicates: colA = colB
  |  hosts=3 per-host-mem=5.99MB
  |  tuple-ids=0,1 row-size=8B cardinality=33333334
  |
  |--06:EXCHANGE [BROADCAST]
  |     hosts=1 per-host-mem=0B
  |     tuple-ids=1 row-size=4B cardinality=1428572
  |
  00:SCAN HDFS [default.taba, RANDOM]
     partitions=1/1 files=1 size=282.57MB
     table stats: 33333334 rows total
     column stats: all
     hosts=3 per-host-mem=176.00MB
     tuple-ids=0 row-size=4B cardinality=33333334

上記はHash JoinからAggregationまでを行うplan fragmentですが,例ではtabAの担当範囲を変えて,同じplan fragmentを3つのノードで実行しています。

図1の2つ目の列は#Hostsとなっていて,対応する処理がいくつのノードで実施されたかを示しています。

※2)
Impalaのprofileコマンドは,クエリの実行結果をパフォーマンス統計の観点からまとめたレポートです。profileコマンドで見るほかに,ImpaladのWeb UIや,クラスタ自体の管理ツールであるCloudera Managerから見ることもできます。当該ツールにおいては,各処理の実行時間や処理量(行数),メモリ使用量(単一ノードでの最大値)などを確認することができます。このとき,見積り(Est.の列)と実際の値が特にSCANの行で大きく異なる場合,統計情報がない,もしくは統計情報が古いことが考えられます。適切な実行計画を作成できておらず,実行時間やメモリ使用量に影響があることも考えられるので,統計情報を収集することを検討してくださいマニュアルも参照)。
※3)
対象の表が小さい場合などにおいては,コーディネータが単一ノードでクエリの処理を完結させるように指示する場合もあります。

著者プロフィール

矢野智聡(やのともあき)

日本オラクル株式会社を経て,2014年よりCloudera株式会社所属。Impala,HiveなどのSQLに関連するエコシステムを中心としてHadoopクラスタのサポートに従事。


山田浩之(やまだひろゆき)

日本アイ・ビー・エム株式会社を経て,ヤフー株式会社にて分散型全文検索エンジンの研究開発に従事。2008年上期未踏IT人材発掘・育成事業において高性能分散型検索エンジンの開発によりスーパークリエータに認定。現在は東京大学生産技術研究所にて高性能並列データ処理系の研究開発に従事。博士(情報理工学)。

著書に『検索エンジン自作入門』。

コメント

コメントの記入