02 December 2017

IoTやAIのユースケースがますます増えているのなか、ビッグデータをリアルタイムにSQLでクエリする需要が高まっている。こちらを実現するソリューションとして、Apache Hive と Druid の組合せが魅力です。

  • Apache Hive と Druid とも実績の高いオープンソースソフトウェア(OSS)。
  • 両方とも非常に大量のデータでも対応できるスケールアウト・アーキテクチャ。
  • Apache Hive と Druid 自体や依存コンポネント全ては、Apache Ambariを使ってGUIウィーザーでインストール可能。
  • 簡単な設定だけで連携可能。

今回はApache HiveとDruidを使ったリアルタイムSQL処理を、サンプルみながら説明する。

Apache Hive 2とは

Apache HiveはHadoop上のSQLデータ・ウェアハウスです。最新のHive 2.2は新しいLLAPエンジン追加や、SQLサポートの強化など、大きく進化したビッグデータのSQLソリューションの事実上の標準となっている。

  • 全て99個のTPC-DSクエリを含む包括的なANSI SQLサポート。
  • 簡単な更新を可能にするACID MERGEを対応対応した唯一のHadoop SQLソリューション。
  • メモリ内キャッシュにより、MPPデータ・ウェアハウスと同レベルのパフォーマンスをHadoopスケールで実現。
  • ユーザーごとの動的な行と列レベルのセキュリティ。
  • すべての主要なBIツールと連携。
  • 300PB以上のスケール実績がある。

Hive 2 with LLAP

Hive 2 TPC-DS Benchmark

Druidとは

Druidは、大量のデータをすばやく取り込み、索引付けし、リアルタイムなクエリを可能にするための分散型、リアルタイム、列指向のデータストアです。以下の特徴がある:

  • ストリーミング的にデータ投入
  • 秒以下のクエリ速度
  • 蓄積データとリアルタイムデータの融合
  • 高速な近似計算

Druidアーキテクチャ

仮想通貨データのリアルタイム分析

この例では、Satoriのcryptocurrency-market-dataチャネルからデータをストリーミングして索引付けします。 そこから、SQLを使用してデータを分析したり、Apache Superset可視化レイヤーを上に置くことができます。

この例はこちらのコードを参照している。

前提条件

Satoriアカウントが必要です。Satoriからデータを取得するための app key が要る。アカウントは https://www.satori.com/ よる取得する。

Python、Druid、KafkaやHiveも必要です。Hortonworks HDP 2.6+ がこれらすべて入っているためお勧めです。

実行手順

こちらのGithub repoよりサンプルコードをダウンロードし、順番に下記スクリプトを実行する。

  • 00setup.sh を実行し、Pythonの依存ライブラリをインストール。
  • 01satori_cryptocurrency_kafka.py を変更し、Satoriより取得した自分の app key を入れる。
  • 01satori_cryptocurrency_kafka.py を実行しSatoriのデータをKafkaに投入。必要に応じて、01satori_cryptocurrency_kafka.py -k kafka:port を実行し、Kakfaサーバーとポートを指定する。
  • 別ターミナルより、03start_supervisor.sh を実行し、Kafkaよりデータを取得しDruidに索引を付ける。
  • 04query_rest.sh でデータをクエリする。最初のクエリは少し時間がかかる場合がある。

ここまでで、Satoriより仮想通貨データをリアルタイムに取得し、Kafkaに入れたあと、Druidにデータを投入しつつ索引付けを行った。問題なく行えば、最後のクエリにこのような結果が返ってくる。

$ ./04query_rest.sh
[ {
  "timestamp" : "2017-11-29T09:45:00.000Z",
  "result" : [ {
    "count" : 1402,
    "cryptocurrency" : "LTC"
  }, {
    "count" : 1224,
    "cryptocurrency" : "BTC"
  }, {
    "count" : 1212,
    "cryptocurrency" : "ETH"
  }, {
    "count" : 887,
    "cryptocurrency" : "DASH"
  }, {
    "count" : 745,
    "cryptocurrency" : "BCH"
  } ]
},
...
]

可視化とダッシュボード

Apache Superset (incubating) を用いてDruidに投入された仮想通貨データのリアルタイム・ダッシュボードを簡単に作成する。

仮想通貨ダッシュボード

HiveからSQLでデータ分析

DruidデータをHiveからSQLでクエリ可能です。

DruidをストレージとするHive外部テーブルを作成。

#!/bin/sh

cat<<EOF>/tmp/create_table.sql
SET hive.druid.broker.address.default=<DRUID_BROKER_HOST>:<DRUID_BROKER_PORT>;
CREATE EXTERNAL TABLE if not exists cryptocurrency_market_data
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "cryptocurrency-market-data");
EOF

HS2=${1:-<HIVE_LLAP_HOST>:<HIVE_LLAP_PORT>}
BEELINE="beeline -u jdbc:hive2://$HS2/default"
$BEELINE -f /tmp/create_table.sql

SQLで問合せしてみる。

select cryptocurrency, count(*) c from cryptocurrency_market_data group by cryptocurrency order by c;

| XRP             | 3023  |
| BCH             | 3862  |
| ZEC             | 4144  |
| DASH            | 4646  |
| ETC             | 4662  |
| LTC             | 6572  |
| ETH             | 6660  |
| BTC             | 7212  |
+-----------------+-------+--+
69 rows selected (0.692 seconds)

SQLを何回か実行すると、結果がリアルタイムに変わっていることが分かる。Druidにリアルタイムに投入されたデータが、HiveからSQLでリアルタイムに問合せできている。

Tableauから、Hiveに接続しデータを可視化してみる。

Tableauによる可視化