この記事は Chainer Advent Calendar 2015 17 日目の記事です。
はじめに
サイズが大きいデータを Deep Learning すると学習に時間がかかってつらい。時間がかかってつらいので並列処理して高速化したい。
並列化するのに良さそうなパッケージないかな? と探してみると、Dask
という並列 / Out-Of-Core 計算パッケージを見つけた。これと Chainer
を組み合わせると並列処理が簡単に書けそうな気がする。
最初は MNIST を並列化してみたが、データが小さすぎるせいか むしろ遅くなってしまった。もう少し大きいデータである CIFAR-10 を使い、より深いネットワーク構造でその効果を確かめたい。
最終的には以下二つの処理を並列化することを目指す。
- Data Augmentation
- DNN の学習
1. Data Augmentation
層の深い DNN をうまく学習させるため、学習データに対して クロッピング等の画像処理を行い学習データを増やす。予測も Augmentation した画像群に対して行いその平均をとる。具体的な処理は id:ultraist さんのエントリに詳しい。
これは Dask
を使って CPU 側で並列処理したい。Dask
についてはこちらを。
- Python Dask で 並列 DataFrame 処理 - StatsFragments
- Python Dask.Array で 並列 / Out-Of-Core 処理 - StatsFragments
補足 もっとも、学習データ/検証データはそれぞれ前もって 1 度だけ Augmentation しておけばよいため、この並列化の重要度は低い。Dask
は むしろ 2. DNN の学習の並列化をシンプルに書くために使う (予定)。
2. DNN の学習
Chainer
を使って GPU 側で並列処理したい。Chainer
のドキュメントにも記載されているが、学習を並列化する方法は大きく 2 種類ある。
2-1 | Model Parallel | ある DNN の処理ごとに異なる GPU を割り当てて並列化する。 |
2-2 | Data Parallel | ある DNN を複数のデータ/GPUで同時に学習させ、学習したパラメータをなんらかの方法で統合する。 |
2-1. Model Parallel は モデルごとに実装する必要がある。2-2. Data Parallel はある程度 汎用的に書けそうなので、今回は Data Parallel をやりたい。
2-2. Data Parallel での DNN の学習
Data Parallel にもいくつか方法がある。詳細は "確率的最適化 (機械学習プロフェッショナルシリーズ)" や PFI 岡野原さんの資料に記載されている。
- 作者: 鈴木大慈
- 出版社/メーカー: 講談社
- 発売日: 2015/08/08
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
Data Parallel について自分の理解を簡単に整理すると、
手法 | 概要 |
---|---|
Parameter Mixture (単純平均) | パラメータを各ノードで並列に最適化し、最後にパラメータの平均をとる。期待誤差が強凸 / 損失関数が十分滑らかなら上手くいくらしい。 |
Distributed Gradient (同期型・ミニバッチ法) | 勾配の計算を各ノードで並列に行い、パラメータの更新は 1台のノードで行う。勾配計算のたびにパラメータの同期が発生する。Chainer のドキュメントに記載されているのはこの方式。 |
Asynchronous Update (非同期分散) | 勾配の計算、パラメータ更新を各ノードで並列に行う。パラメータ更新時にロックしないため、古いパラメータを元に勾配計算 / 更新がされることがある。実装の一種である Hogwild! は スパースな問題を対象とし、パラメータ更新時に勾配が 0 でない部分だけを更新する。 |
Iterative Parameter Mixture | パラメータを各ノードで並列に最適化する。特定の期間ごと (epochごと等) にパラメータの平均をとり、更新されたパラメータを元に各ノードで最適化を繰り返す。 |
目指す姿
トラブルや金銭的な課題 (EC2) がなければ、以下 5 ステップに分けてやってみるつもり。
1 | データの準備 & Distributed Gradient での DNN 学習並列化 (←今回) |
2 | Iterative Parameter Mixture での DNN 学習並列化 |
3 | Dask による Data Augmentation の並列化 |
4 | ステップ 2 + ステップ 3 の処理を統合 |
5 | blaze/distributed ( 旧 dask.distributed )によるノード間分散処理 |
利用環境
- EC2 g2.8xlarge (GPU 4, vCPU 32, メモリ 60 GiB)
- Amazon Linux AMI with NVIDIA GRID GPU Driver
- cuDNN v2 (March 17,2015)
データのダウンロード
CIFAR-10 は以下サイトから入手できる。スクリプトでダウンロードする。
import numpy as np np.__version__ # '1.10.2' import os import requests fname = 'cifar-10-python.tar.gz' datadir = 'data' os.mkdir(datadir) reader = requests.get('http://www.cs.toronto.edu/~kriz/{0}'.format(fname), stream=True) with open(os.path.join(datadir, fname), 'wb') as f: for chunk in reader.iter_content(chunk_size=1024): if chunk: f.write(chunk) f.flush()
ダウンロードした tar
ファイルを解凍する。以下のスクリプトを実行すると、 data/cifar-10-batches-py
ディレクトリの下に 訓練用、テスト用のデータを含む pickle
ファイルができる (拡張子はない)。
import tarfile tf = tarfile.open(os.path.join(datadir, fname), 'r') tf.extractall(datadir) tarpath = tf.getnames()[0] tarpath # 'cifar-10-batches-py' files = os.listdir(os.path.join(datadir, tarpath)) datafiles = [os.path.join(datadir, tarpath, f) for f in files if f.startswith('data')] datafiles # ['data/cifar-10-batches-py/data_batch_5', # 'data/cifar-10-batches-py/data_batch_4', # 'data/cifar-10-batches-py/data_batch_1', # 'data/cifar-10-batches-py/data_batch_3', # 'data/cifar-10-batches-py/data_batch_2']
データの準備
データは Dask
で読み込む。もっとも、今回は学習時には Dask
ではなく np.ndarray
を直接使うので普通に読み込んでもいい ( 次回以降は Dask
が活躍する予定...)。
まずは pickle
ファイルから 画像データ / ラベルデータを取得する関数を定義する。
import dask import dask.array as da dask.__version__ # '0.7.5' from six.moves import cPickle as pickle def load(fn): # https://www.cs.toronto.edu/~kriz/cifar.html with open(os.path.join(datadir, tarpath, fn), 'rb') as f: result = pickle.load(f) return result def load_data(fn): return load(fn)['data'] def load_labels(fn): # list から np.ndarray に変換 return np.array(load(fn)['labels'])
これらを使って、訓練用データを読み込む Computational Graph を定義する。詳細はこちらのドキュメントを。
dsk_data = {('data', i, 0): (load_data, 'data_batch_{0}'.format(i + 1)) for i in range(5)} dsk_data # {('data', 0, 0): (<function __main__.load_data>, 'data_batch_1'), # ('data', 1, 0): (<function __main__.load_data>, 'data_batch_2'), # ('data', 2, 0): (<function __main__.load_data>, 'data_batch_3'), # ('data', 3, 0): (<function __main__.load_data>, 'data_batch_4'), # ('data', 4, 0): (<function __main__.load_data>, 'data_batch_5')} data = da.Array(dsk_data, 'data', chunks=(10000, 3 * 32 * 32), dtype=np.float32, shape=(50000, 3 * 32 * 32)) data # dask.array<data, shape=(50000, 3072), dtype=float32, chunksize=(10000, 3072)> data.compute() # array([[ 59, 43, 50, ..., 140, 84, 72], # [154, 126, 105, ..., 139, 142, 144], # [255, 253, 253, ..., 83, 83, 84], # ..., # [ 35, 40, 42, ..., 77, 66, 50], # [189, 186, 185, ..., 169, 171, 171], # [229, 236, 234, ..., 173, 162, 161]], dtype=uint8) data.compute().shape # (50000, 3072)
ラベルについても同様。
dsk_labels = {('labels', i): (load_labels, 'data_batch_{0}'.format(i + 1)) for i in range(5)} labels = da.Array(dsk_labels, 'labels', chunks=(10000, ), shape=(50000, )) labels # dask.array<labels, shape=(50000,), dtype=None, chunksize=(10000,)> labels.compute() # array([6, 9, 9, ..., 9, 1, 1]) labels.compute().shape # (50000,)
テスト用データは np.ndarray
として読み込んだ。
test_data = load_data('test_batch') test_labels = np.array(load_labels('test_batch'))
データの確認
データの中身を確かめるため各画像を描画してみたい。各レコード中には (チャネル, 縦, 横) に対応する画素が順に 1 次元に並んでいる。これを matplotlib
で描画するため (縦, 横, チャネル) の 3 次元となるように reshape
→ transpose
する。
nrows = 3 ncols = 10 images = data[0:nrows*ncols, :].compute() images = images.reshape(nrows*ncols, 3, 32, 32).transpose(0, 2, 3, 1) images[0].shape # (32, 32, 3) import matplotlib.pyplot as plt fig, axes = plt.subplots(nrows, ncols, figsize=(10, 4)) for im, ax in zip(images, axes.flatten()): ax.imshow(im) ax.axis('off')
DNN の学習
Single GPU での学習
比較のため、まずは並列化せずに一つの GPU だけで学習 / テストがしたい。Chainer
をインポートし、 CUDA, cuDNN が利用できることを確かめる。
import chainer chainer.__version__ # '1.5.1' import chainer.cuda as cuda cuda.available # True cuda.cudnn_enabled # True
学習するモデルとしては、@mitmul さんが GitHub 上で公開されている CIFAR-10 用のモデルを使わせていただく。
今回は Data Augmentation をしておらずデータ数が少ないため、比較的単純なモデルである models/Cifar10.py
を利用した。API は後の処理にあわせて少し変更している。
from chainer import optimizers from chainer import Variable import chainer.functions as F import chainer.links as L class Cifar10(chainer.Chain): def __init__(self): super(Cifar10, self).__init__( conv1=L.Convolution2D(3, 32, 5, stride=1, pad=2), conv2=L.Convolution2D(32, 32, 5, stride=1, pad=2), conv3=L.Convolution2D(32, 64, 5, stride=1, pad=2), fc4=F.Linear(1344, 4096), fc5=F.Linear(4096, 10), ) def _internal_call(self, x, t, train=True): h = F.max_pooling_2d(F.relu(self.conv1(x)), 3, stride=2) h = F.max_pooling_2d(F.relu(self.conv2(h)), 3, stride=2) h = F.relu(self.conv3(h)) h = F.spatial_pyramid_pooling_2d(h, 3, F.MaxPooling2D) h = F.dropout(F.relu(self.fc4(h)), ratio=0.5, train=train) h = self.fc5(h) return h def __call__(self, x, t): h = self._internal_call(x, t, train=True) self.loss = F.softmax_cross_entropy(h, t) self.accuracy = F.accuracy(h, t) return self.loss def predict(self, x, t): """ データに対する予測値 (ラベル) を返す """ h = self._internal_call(x, t, train=False) self.pred = F.softmax(h) return self.pred.data.argmax(axis=1) optimizer = optimizers.Adam(alpha=0.001) model = Cifar10() xp = cuda.cupy model.to_gpu() optimizer.setup(model) optimizer.target # <__main__.Cifar10 at 0x7f7e9c586150>
ここで 訓練データ/テストデータを np.ndarray
に変換する。
def data_transformer(x): # 各レコード 1 次元のデータを 3次元 (3, 32, 32) に変換する return x.astype(np.float32).reshape(len(x), 3, 32, 32) def labels_transformer(y): return y.astype(np.int32) tr_data = data_transformer(data.compute()) tr_labels = labels_transformer(labels.compute()) te_data = data_transformer(test_data) te_labels = labels_transformer(test_labels)
学習 / テストを行う。ログの出力など実行に不要な箇所は省略した。
def run_epoch(optimizer, model, data, labels, train=True, batch_size=100): sum_accuracy = 0 sum_loss = 0 num = 0 total = len(labels) if train: indexer = np.random.permutation(len(labels)) data = data[indexer] labels = labels[indexer] for index in range(0, total, batch_size): x = data[index:index+batch_size] t = labels[index:index+batch_size] volatile = 'off' if train else 'on' x = Variable(xp.asarray(x), volatile=volatile) t = Variable(xp.asarray(t), volatile=volatile) if train: optimizer.update(model, x, t) sum_loss += float(model.loss.data) * len(t) sum_accuracy += float(model.accuracy.data) * len(t) else: predicted = model.predict(x, t) acc = float((predicted == t.data).sum()) sum_loss = np.nan sum_accuracy += acc return sum_accuracy, sum_loss for epoch in range(0, 20): print('******************** Epoch {:02d} (Train) ********************'.format(epoch + 1)) acc, loss = run_epoch(optimizer, model, tr_data, tr_labels, train=True) print('******************** Epoch {:02d} (Test) *********************'.format(epoch + 1)) acc, loss = run_epoch(optimizer, model, te_data, te_labels, train=False)
出力されたログを上に添付した。最左列は学習開始からの時間 (秒) である。20 Epoch 経過時点で 355 秒、テストデータに対して 60 % 程度の精度だった。
# ******************** Epoch 01 (Train) ******************** # 4.091: 10000/50000 loss=10.164 acc=0.123 # 7.329: 20000/50000 loss=6.162 acc=0.150 # 10.570: 30000/50000 loss=4.788 acc=0.177 # 13.812: 40000/50000 loss=4.064 acc=0.205 # 17.056: 50000/50000 loss=3.612 acc=0.231 # ******************** Epoch 01 (Test) ********************* # 18.140: 10000/10000 loss=nan acc=0.380 # # ... ... ... ... # # ******************** Epoch 20 (Train) ******************** # 340.570: 10000/50000 loss=0.993 acc=0.650 # 343.832: 20000/50000 loss=0.993 acc=0.650 # 347.095: 30000/50000 loss=1.001 acc=0.650 # 350.350: 40000/50000 loss=1.005 acc=0.650 # 353.611: 50000/50000 loss=1.014 acc=0.649 # ******************** Epoch 20 (Test) ********************* # 354.691: 10000/10000 loss=nan acc=0.621
Multi GPU での学習 (Distributed Gradient)
上に記載のとおり、今回は Distributed Gradient (同期型・ミニバッチ法) を利用した並列学習を行いたい。勾配計算は各 GPU で行い、パラメータの更新は 1 つの GPU で行う。
実装は Chainer
のドキュメントに記載されている内容をなぞればよい。利用するメソッドの概要を整理する。
メソッド | 概要 |
---|---|
Link.zerograds() |
Link に含まれる各 Variable が持つ勾配 ._grad を 0 で埋める。 |
Link.__call__() |
最適化する Variable を返す。Link を継承したクラスで定義する。 |
Variable.backward() |
Variable から誤差逆伝播を行う |
Link.addgrads(link) |
引数の Link に含まれる Variable の勾配を自身の対応する Variable に加算する。 |
Optimizer.update() |
(引数なしで呼ばれた場合) 計算済みの勾配をもとに、モデルの Variable を更新する。実装は GradientMethod.update() 中にある。 |
Link.copyparams() |
引数の Link に含まれる Variable の値を自身の対応する Variable にコピーする。 |
とはいえ、ドキュメントの書き方だと GPU 数が増えると少し手間だ。簡単な wrapper を作り、 Distributed Gradient を以下のように定型的に書けるようにしたい。
with model as m: # 各 GPU 上のモデルで .zerograds() を実行 m(x, t) # 各 GPU 上のモデルで .__call__() を実行 # 各 GPU 上のモデルで .backward() を実行し、 # 計算された勾配を .addgrads() でマスターに加算 optimizer.update() model.syncparams() # マスターのパラメータを .copyparams() で各 GPU 上のモデルにコピー
そのための wrapper として GPUDistributedGradient
クラスを定義する。
class GPUDistributedGradient(chainer.Chain): def __init__(self, chain, ngpus): if isinstance(ngpus, int): ngpus = list(range(ngpus)) self.ngpus = ngpus self._chains = [chain.copy().to_gpu(i) for i in ngpus] @property def master(self): """ 最初の GPU にあるモデルをマスターとする """ return self._chains[0] def __call__(self, *args): """ 引数毎に 'Variable のリスト' を渡す""" self._losses = [chain(*arg) for chain, arg in zip(self._chains, zip(*args))] return self._losses[0] def predict(self, x, t): """ 予測値はマスターで計算し、(まだ) 並列化しない """ return self.master.predict(x, t) def zerograds(self): for chain in self._chains: chain.zerograds() def backward(self): for loss in self._losses: loss.backward() def syncgrads(self): """ 各 GPU からの勾配をマスターに加算する """ for chain in self._chains[1:]: self.master.addgrads(chain) def __enter__(self): self.zerograds() return self def __exit__(self, exception_type, exception_value, traceback): self.backward() self.syncgrads() return True def syncparams(self): """ マスターのパラメータを他 GPU のモデルにコピーする """ for chain in self._chains[1:]: chain.copyparams(self.master)
上で作成したクラスを利用して、並列化したいモデルを wrap する。optimizer
へは マスターとなるモデル model.master
を渡す。
NGPU = 4 optimizer = optimizers.Adam(alpha=0.001) model = Cifar10() model = GPUDistributedGradient(model, NGPU) optimizer.setup(model.master) optimizer.target # <__main__.Cifar10 at 0x7f7e9c6ede10>
run_epoch
を書き換えて実行してみる。run_epoch
の呼び出しは先ほどと同じため省略する。
def run_epoch(optimizer, model, data, labels, train=True, batch_size=100): sum_accuracy = 0 sum_loss = 0 num = 0 total = len(labels) if train: indexer = np.random.permutation(len(labels)) data = data[indexer] labels = labels[indexer] batch_size = batch_size * NGPU for index in range(0, total, batch_size): x = data[index:index+batch_size] t = labels[index:index+batch_size] if train: # train 時には 利用する GPU と同じ長さの Variable のリストを渡す x = [Variable(cuda.to_gpu(v, i)) for i, v in enumerate(np.array_split(x, NGPU))] t = [Variable(cuda.to_gpu(v, i)) for i, v in enumerate(np.array_split(t, NGPU))] else: x = Variable(xp.asarray(x), volatile='on') t = Variable(xp.asarray(t), volatile='on') if train: with model as m: m(x, t) optimizer.update() model.syncparams() n = sum([len(_t) for _t in t]) # マスターで計算された loss / acc をレコード数倍して近似 sum_loss += float(model.master.loss.data) * n sum_accuracy += float(model.master.accuracy.data) * n else: predicted = model.predict(x, t) acc = float((predicted == t.data).sum()) sum_loss = np.nan sum_accuracy += acc return sum_accuracy, sum_loss
Single GPU の場合と同じくログを添付する。20 Epoch までの処理時間は、並列化なしでの 355 秒に対し 並列化した場合は 277 秒と 20% 程度 速くなっている。また、テストデータに対する精度も並列化なしの場合と同程度で問題はなさそうだ。
# ******************** Epoch 01 (Train) ******************** # 2.870: 10000/50000 loss=1.438 acc=0.479 # 5.439: 20000/50000 loss=1.431 acc=0.480 # 8.008: 30000/50000 loss=1.417 acc=0.488 # 10.595: 40000/50000 loss=1.416 acc=0.491 # 13.155: 50000/50000 loss=1.405 acc=0.492 # ******************** Epoch 01 (Test) ********************* # 14.243: 10000/10000 loss=nan acc=0.517 # # ... ... ... ... # # ******************** Epoch 20 (Train) ******************** # 265.769: 10000/50000 loss=0.498 acc=0.824 # 268.249: 20000/50000 loss=0.509 acc=0.821 # 270.738: 30000/50000 loss=0.518 acc=0.818 # 273.230: 40000/50000 loss=0.524 acc=0.814 # 275.717: 50000/50000 loss=0.528 acc=0.813 # ******************** Epoch 20 (Test) ********************* # 276.785: 10000/10000 loss=nan acc=0.634
処理中に watch nvidia-smi
すると 全 GPU が利用されていることが確かめられる。眺めていると GPU 1 〜 3 の利用率はしばしば 0% になっている (パラメータ更新のため)。
$ watch nvidia-smi
GPU の利用率が少ないのは モデルが単純だった / バッチサイズが小さかったせいだと思う。このあたりを適切に設定すればパフォーマンス向上の余地がありそうだ。
結果のプロット
縦軸をテストデータにおける精度、横軸を Epoch の経過 ( 100 Ticks で 1 Epoch ) としてグラフを描いた。
同じく、横軸に経過時間 (秒) として学習の過程をプロットした。訓練データの情報も追加した。形が異なるのは Multi-GPU で訓練データへの精度を正確に計算していないためかと思う。こうしてみると、テスト部分は並列化しなくてもあまり影響なさそうだ。
まとめ
Chainer
のドキュメントの手順に従い、DNN の学習を 4 GPU / Distributed Gradient で並列化した。次は Dask
の処理も使って Iterative Parameter Mixture をやりたい。
1 | データの準備 & Distributed Gradient での DNN 学習並列化 (←済み) |
2 | Iterative Parameter Mixture での DNN 学習並列化 (←次回) |
3 | Dask による Data Augmentation の並列化 |
4 | ステップ 2 + ステップ 3 の処理を統合 |
5 | blaze/distributed ( 旧 dask.distributed )によるノード間分散処理 |
利用したスクリプトは以下のリポジトリに置いた。Jupyter Notebook なので GitHub 上でレンダリングして確認することができる。
何かおかしいことやっていたらご指摘ください。
- 作者: 岡谷貴之
- 出版社/メーカー: 講談社
- 発売日: 2015/04/08
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (6件) を見る
- 作者: 鈴木大慈
- 出版社/メーカー: 講談社
- 発売日: 2015/08/08
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る