この記事は Python Advent Calendar 2015 13 日目の記事です。
Python で手軽に並列 / Out-Of-Core 処理を行うためのパッケージである Dask について書きたい。Dask を使うと以下のようなメリットが得られる。
- 環境構築 / インストールが
pipで簡単にできる - 手軽に並列処理ができる
- Out-Of-Core (メモリに乗らないデータ) 処理ができる
補足 Dask は手持ちの PC の シングルコア / 物理メモリでは処理が少しきついかな、といった場合に利用するパッケージのため、より大規模 / 高速 / 安定した処理を行いたい場合には Hadoop や Spark を使ったほうがよい。
Dask は以下 3 つのサブパッケージを持つ。
| サブモジュール | ベースパッケージ |
|---|---|
dask.array |
NumPy |
dask.bag |
PyToolz (list, set, dict に対する処理) |
dask.dataframe |
pandas |
うち dask.dataframe については以前にエントリを書いた。
今回は NumPy API のサブセットをもつ dask.array について。dask.array でも基本的な考え方 / 挙動は dask.dataFrame と同じなので まず上のエントリを読んでください。
インストール
pip で。
pip install dask
基本的な操作
import numpy as np np.__version__ # '1.10.1' import dask dask.__version__ # '0.7.5' import dask.array as da
dask.DataFrame はいくつかの行を chunk としてまとめて並列処理を行っていたが、dask.Array は各 axis それぞれを chunk として分割することができる。以下では 4 行 4 列 の ndarray を 2 行 2 列 計 4 つの chunk に分割する。
arr = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]) arr # array([[ 1, 2, 3, 4], # [ 5, 6, 7, 8], # [ 9, 10, 11, 12], # [13, 14, 15, 16]]) darr = da.from_array(arr, chunks=2) darr # dask.array<from-ar..., shape=(4, 4), dtype=int64, chunksize=(2, 2)>
dask.Dataframe と同じく、dask.Array のメソッドを呼び出すことで 内部の Computational Graph を更新していく。評価 (計算の実行) を行うには .compute()。また、.visualize() で Computational Graph を描画することができる。
各行の合計を取る処理をみると、まずそれぞれの chunk ごとに 行の合計を計算 → その後 隣り合う chunk 同士の行の合計を取ることで最終的な出力を得ている。
darr.sum(axis=1) # dask.array<p_reduc..., shape=(4,), dtype=int64, chunksize=(2,)> darr.sum(axis=1).compute() # array([10, 26, 42, 58]) darr.sum(axis=1).visualize()
転置のように chunk の位置の入れ替えを伴う処理もできる。Computational Graph 中の四角形 (xxx, 0, 1) は (0, 1) 番目の chunk であることを表す。
darr.T.compute() # array([[ 1, 5, 9, 13], # [ 2, 6, 10, 14], # [ 3, 7, 11, 15], # [ 4, 8, 12, 16]]) darr.T.visualize()
dask.Array 同士の演算もできる。式中では darr.min(axis=1) が重複しているが、これらは Computational Graph 中でマージされ、評価時には 1 度だけ計算される。
((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).compute() # array([[ 0, -1, -2, -3], # [ 1, 0, -1, -2], # [ 2, 1, 0, -1], # [ 4, 3, 2, 1]]) ((darr - darr.min(axis=1)) / (darr.max(axis=1) - darr.min(axis=1))).visualize()
並列処理のメリット
各 chunk に対する処理は 自動的に並列化して実行されるため、ある程度データが大きい場合 / やりたい処理が並列で実行できる場合には速度メリットがある。以下では 長さ 1 億 の ndarray の合計を取る処理で比較した ( EC2 c4.2xlarge を利用)。
# NumPy arr = np.arange(100000000) %timeit arr.sum() # 10 loops, best of 3: 76.5 ms per loop # Dask darr = da.from_array(arr, chunks=10000000) %timeit darr.sum().compute() # 10 loops, best of 3: 25.3 ms per loop
補足 NumPy は主要な処理で GIL を解放しているため、Dask での並列処理は threading によって行われる。処理方法として multiprocessing を指定することもできる。
Out-Of-Core 処理のメリット
Dask では Computational Graph 中の各ノードを順に計算していくため、処理する全データが同時にメモリに乗っている必要がない。そのため、PC の物理メモリを超える大きさのデータも扱うことができる。以下のドキュメントでは複数の pkl ファイルを chunk として読み込む方法が記載されている。
関連パッケージ
以下のパッケージでは バックグラウンド処理を dask.array を利用して行うことができる。
Xray: 多次元ラベルデータをpandasのように処理できるパッケージ
まとめ
簡単にに並列 / Out-Of-Core 処理を行うためのパッケージである Dask のサブモジュールのうち NumPy 互換の dask.array の使い方を記載した。