この記事は CrowdWorks Advent Calendar 2017 20日目の記事です。

はじめに

クラウドワークスでエンジニアをしている @yosu@ToruIwashita です。

普段は主にCrowdWorksのアプリケーション基盤に関わる開発を行っています。

そんな中で最近、短期間で大量なデータを対象に複雑なデータ分析を行う必要がありました。

その際、普段のデータ分析では行わないような試行錯誤を経験したため、この記事ではそのとき起きたことを書いていきたいと思います。

背景

CrowdWorksではデータベースとしてMySQLを利用し、そのテーブル数は400超、総レコード数は数億あります。

このうち一部のデータはRedshiftに同期されていて、普段カジュアルにデータ分析したい場合はRedashを通してクエリを実行し、グラフ化、ダッシュボードの作成を行っています。

クラウドワークスのデータ分析の基盤と見える化 - Qiita

今回はRedshift上にある数十テーブル、数千万レコードが分析の対象となりました。

Redshift + Redash

開始当初は担当者3人が分析箇所を分担していました。それぞれRedash上でクエリを書き、それを持ち寄る形で分析していました。

しかし、分析の対象となるテーブルが数十テーブル、LEFT OUTER JOINやINNER JOIN、CASE文やWITH句を駆使しなければ抽出できないようなデータの構造や状態だったため、ある程度正しそうなクエリができた状態で保存し、それをコピーしたクエリをさらに組み立てるということが多発しました。

また、担当者間でも共通に使いたい中間データがいくつも増え、これをあらかじめテーブルにしておきたくなりました。

中間データがあればそれを使うクエリが楽になるのですが、中間データを作るためのクエリと、それを使う側のクエリで依存関係が生まれます。どちらのクエリでも変更があるため、実行順序やデータ全体の整合性がとれていることを担保する必要がありました。

そこで、CrowdWorksのKPI集計やデータ分析で利用実績のあったBricolageを利用することにしました。

Bricolage

Bricolageを利用することで、最終的な集計結果を出すための中間テーブルを作るクエリを分割することができます。また、クエリの依存関係もジョブネットという仕組みで簡単に管理できるようになりました。

Bricolageの具体的な利用方法については以下の記事を参照して下さい。

rukawa と bricolage とデータ集計 -クラウドワークス エンジニアブログ-

中間テーブルを作ることでクエリの重複を減らすことができました。
しかし、最初は数個で済んでいた中間テーブルが、分析が進むにつれ瞬く間に数が膨れ上がり、依存関係の管理がむずかしくなっていきました。

ジョブネットの定義は以下のように書きますが、これが100行を軽く超えるようになりました。

# monthly_salesジョブのあとにreportジョブが実行されることを保証
monthly_sales
-> report

user_registrations
-> report

数十ある依存関係のうち全ての依存を正しく書けていないと、結果は出てもその値が間違っている可能性があります。おかしなデータができたとしても、その原因に気づくのは容易ではありませんでした。

また、中間テーブルが増えたことによりクエリの実行時間が長くなっていきました(最終的には40分〜1時間程度)。

Bricolage + 階層化

前述の問題もあり、単一のジョブネットファイルですべての依存関係を管理するのがつらくなったため、分析単位を複数のディレクトリに分けて管理することにしました。

▾ project_root/
  ▾ 01_snapshot/
    base.jobnet
    snapshot_sales.ct
    snapshot_sales.job
    snapshot_sales.sql
    ...
  ▾ 02_data_correct/
    base.jobnet
    data_correct_sales.ct
    data_correct_sales.job
    data_correct_sales.sql
    ...
  ▸ 03_calculate/
  ▸ 04_report/
  ▸ 05_check/
  Rakefile

この構成で、以下の流れでデータを作っていきます。

  1. スナップショット(フィルタリング)
  2. データ補正
  3. 中間計算処理
  4. レポート(最終的な出力)
  5. 検算

スナップショット(フィルタリング)

スナップショットでは、ある時点でのデータを再現するための処理のみを行います。

例えば、ordersテーブルの状態を2017年9月末日の時点に戻したい場合は以下のようなクエリで、snapshot_ordersテーブルを作ります。

ordersテーブルではキャンセル日時が更新されることもあるためそれも考慮します。

INSERT INTO snapshot_orders
SELECT
  id,
  -- 戻したい時点でキャンセルされていなかったことを表すため、CASE文を使い値を補正
  CASE WHEN cancelled_at < '2017-10-01'
    THEN cancelled_at
    ELSE NULL
  END AS cancelled_at,
  ...
FROM orders
WHERE
  -- レコードの作成日時で絞込み
  created_at < '2017-10-01'

場合によってはテーブルの情報だけでは戻せない場合もありますが、その場合はこのスナップショットでは扱いません。簡単な絞込みと補正にとどめ、複雑なロジックが入り込まないようにします。

扱うデータがすべて不変で、ほしい時点でのスナップショットがとれていればこういった処理は必要ありません。データ構造から変えたくなりますが、それはかなり骨の折れる作業になるため、ここで示したような処理で工夫しながら対処していきます。

データ補正

例えば実装バグなどで正しい値が入っていないレコードがあって、分析時点でそれを認識している場合はデータ補正で正しい値にします。

例えば、snapshot_ordersテーブルの特定のレコードが不正である場合、以下のようなクエリでデータを補正し、data_corrected_ordersテーブルを作ります。

INSERT INTO data_corrected_orders
SELECT
  id,
  -- 実装バグによりcancelled_atが記録されてしまったものを補正する
  CASE
    WHEN id IN (55199, 55201, 55993)
      THEN NULL
    ELSE
      cancelled_at
  END AS cancelled_at,
  ...
FROM
  snapshot_orders
;

上の例では補正するレコードが3件でカラムも1つでしたが、量が多くなるとクエリ上で補正するのは限界があります。

たくさんの補正データがある場合、その補正データを別テーブルとして作成し、そのテーブルを使って補正します。

キャンセルされていないことが分かっている注文IDが入っているテーブルを、not_cancelled_ordersとしたときは以下のようになります。

INSERT INTO data_corrected_orders
SELECT
  snapshot_orders.id AS id,
  NVL(not_cancelled_orders.cancelled_at, snapshot_orders.cancelled_at) AS cancelled_at,
  ...
FROM
  snapshot_orders,
LEFT JOIN not_cancelled_orders USING (id)

中間計算処理

最終的に出力するレポートに使用する中間の計算結果をテーブルに切り出していきます。

意味のある単位に分割することで、作成時の確認や修正、再利用がしやすくなります。
例えば同じテーブルのデータを意味の異なるデータごとに分割する場合は以下のような形になります。

-- 割引適用された売上
INSERT INTO discounted_sales
SELECT
  sales.*
FROM
  sales
JOIN
  discounts ON sales.id=discounts.sales_id

-- 割引適用されていない売上
INSERT INTO not_discounted_sales
SELECT
  sales.*
FROM
  sales
LEFT JOIN
  discounts ON sales.id=discounts.sales_id
WHERE
  discounts.id IS NULL

ここではデータの補正はせず、集計や複雑な計算などのロジックにフォーカスします。

レポート

中間計算処理で作られたレコードを元に、最終的に欲しいデータを作成します。例えば、ユーザー毎の集計や、月別の集計などです。

まずは中間テーブルをまとめて一つのテーブルを作成します。

INSERT INTO monthly_active_user_sales
SELECT
  monthly_active_user_ids.user_id,
  monthly_active_user_ids.month,

  -- LEFT JOINしているため値がNULLになるケースを考慮する
  NVL(monthly_sales.amount, 0) AS sales_amount,
  ...
FROM monthly_active_user_ids
  LEFT JOIN monthly_sales USING(month, user_id)
  LEFT JOIN monthly_fees USING(month, user_id)
  LEFT JOIN monthly_discounts USING(month, user_id)
  ...
;

このテーブルをもとに、集計の軸や計算方法を変えて最終的にほしい結果を求めます。

ユーザー別の集計と、月別の集計を出す場合は以下のようになります。

-- ユーザー別の集計
INSERT INTO user_sales_report
SELECT
  user_id,
  SUM(sales_amount) AS total_sales_amount,
  ...
FROM monthly_active_user_sales
GROUP BY user_id
;
-- 月別の集計
INSERT INTO monthly_sales_report
SELECT
  month,
  SUM(sales_amount) AS sales_amount,
  ...
FROM monthly_active_user_sales
GROUP BY month
;

検算

ここではレポートの集計結果が正しく集計できているかどうかを確認します。

例えば売上金額は以下の式で計算できるとします。

売上金額 = 決済金額 - 返金額

このとき売上テーブルの金額を合計したものと、決済テーブルの金額合計 - 返金テーブルの金額合計が一致するかどうかを確認します。

このように、異なる方法で計算した値を突合することで、計算ロジックが変わったりした場合に正しく計算できているかを逐次確認できるようにします。

Rakeによるデータ生成タスク管理

Bricolageは便利なのですが、変数などを利用するようになるとコマンドが長くなります。

例えばレポートのみ実行したいとき、Bricolageのコマンドを直接書くと以下のようになります。

$ bundle exec bricolage-jobnet --environment=$SALES_SUMMARY_ENV \
                               --variable=closed_on=$SALES_SUMMARY_CLOSED_ON \
                               --variable=schema=$SALES_SUMMARY_SCHEMA \
                               --queue=queue/05_report/jobnet.queue \
                               05_report/base.jobnet

Rakeタスクを用意しておくことで以下のように直感的に実行できるようになります。

$ bundle exec rake jobnet:report

Rakeタスクのコード

SALES_SUMMARY_ENV       = ENV['SALES_SUMMARY_ENV'] || 'development'
SALES_SUMMARY_CLOSED_ON = ENV['SALES_SUMMARY_CLOSED_ON']
SALES_SUMMARY_SCHEMA    = ENV['SALES_SUMMARY_SCHEMA']

def jobnet_args(dirname, jobnet_name)
  [
    "--environment=#{SALES_SUMMARY_ENV}",
    "--variable=closed_on=#{SALES_SUMMARY_CLOSED_ON}",
    "--variable=schema=#{SALES_SUMMARY_SCHEMA}",
    "--queue=queue/#{dirname}.#{jobnet_name}.queue",
    "#{dirname}/#{jobnet_name}"
  ]
end

namespace 'jobnet' do
  desc 'Execute all jobs'
  task :all => %i(snapshot data_correct calculate report check)

  desc 'Execute snapshot'
  task :snapshot do
    args = jobnet_args('01_snapshot', 'base.jobnet')
    sh "bundle exec bricolage-jobnet #{args.join(' ')} "
  end

  ...

  desc 'Execute report'
  task :report do
    args = jobnet_args('04_report', 'base.jobnet')
    sh "bundle exec bricolage-jobnet #{args.join(' ')} "
  end

  desc 'Execute check'
  task :check do
    args = jobnet_args('05_check', 'base.jobnet')
    sh "bundle exec bricolage-jobnet #{args.join(' ')} "
  end
end

desc %q(Execute a job file specified by ENV['SALES_SUMMARY_JOB'] )
task :job do
  SALES_SUMMARY_JOB = ENV['SALES_SUMMARY_JOB']

  args = [
    "--environment=#{SALES_SUMMARY_ENV}",
    "--variable=closed_on=#{SALES_SUMMARY_CLOSED_ON}",
    "--variable=schema=#{SALES_SUMMARY_SCHEMA}",
    "--job=#{SALES_SUMMARY_JOB}",
  ]

  sh "bundle exec bricolage #{args.join(' ')}"
end

その他のTips

日本語テーブル名、カラム名べんり

Redshiftではテーブル名、カラム名に日本語が使え、Bricolageから実行しても問題無く動きます。中間テーブルを何回も使っていたら名前付けがツラくなっていきましたが、直接日本語を使うようにしたらすごく作業が捗りました。

CREATE TABLE sales_reports (
  month  date  not null,

  "1. 売上額" decimal(19,2) not null,
  "2. 割引額" decimal(19,2) not null,
  ...
)
DISTKEY (month) SORTKEY (month)
;

複数人同時にRedshift利用

Bricolageにはクエリに変数を埋めこむ機能があり、これをRedshiftのスキーマ名(ネームスペース)に適用することで複数人が同一のRedshiftを使って同時に分析できるようにしていました。

-- ユーザー別の集計
INSERT INTO user_sales_report
SELECT
  user_id,
  SUM(sales_amount) AS total_sales_amount,
  ...
い
FROM $schema.monthly_active_user_sales -- $schemaを開発者ごとに変更することで同一テーブルを上書きしない
GROUP BY user_id
;

変数はBricolageのコマンド実行時に--variableオプションで差し込みます。

$ bundle exec bricolage-jobnet --variable=schema=xxxx_development ...

S3にカラム名つきでアップロードする

Redshiftのunloadを利用するとS3にデータを任意の形式でアップロードできて便利ですが、アップロードされるのはデータのみでカラム名の情報はアップロードされません。今回はカラム名の情報を残したかったので以下のようなクエリを用いてカラム名が一行目に出力されるようにしました。

SELECT
  user_id,
  '月別売上金額',
  ...
FROM (
  SELECT
    1 AS header,
    'user_id' AS user_id,
    '月別売上金額' AS '月別売上金額',
    ...
  UNION ALL
  SELECT
    0 AS header,
    user_id::text AS user_id,
    monthly_sale_reports.amount::text AS '月別売上金額,
    ...
  FROM monthly_sales_reports
)
ORDER BY header DESC
;

Redshiftの列圧縮

Redshiftではテーブルの列単位で圧縮をかけることができます(列圧縮タイプの選択 - Amazon Redshift)。

データ容量を減らす目的で、特にレコード数の多いテーブルで外部キー以外の列に対して圧縮をかけたところクエリのパフォーマンスも向上しました。

利用するにはCREATE TABLEの列定義にencode指定を追加するだけです。

例: Zstandard圧縮する場合

CREATE TABLE sales (
  ...
  amount decimal(19,2) not null encode ZSTD,
  ...
)

まとめ

分析対象のデータが多い場合、何も考えずにクエリを作ってしまうとあっという間にCASE文、WITH句といった要素がいつくも入った複雑なクエリが出来てしまいます。そういったクエリを後で見た時理解し直すのは難しく、特に複数人で作業をする場合は理解するためだけにかなりの時間を要するという事も起きてしまい、作業が何度も膠着状態に陥りました。

Bricolageによってクエリを分割できるようになってからも、依存関係が膨れ上がり、理解できなくなるという問題が起きましたが、最後は自分たちで理解しやすい構造を作り、最終的な結果を出すことができました。

結局のところ、自分たちの頭には限界があり、それを超えた問題を解く場合はツールを使ったり工夫することで問題を小さくし、統合していく仕組みを作ることが必要でした。

一言で言えばなんのことはない、分割統治が問題解決の鍵でした。