[Python] multiprocessing 備忘録
普段あまり使うことのないmultiprocessingについて勉強したので備忘録としてまとめておきます。長いので3ページに分けました。
この記事ではPython3を使います。なお、基本的に動作確認はLinux/Macでしておりその他OS(特にWindows)での動作確認はできてないのであしからず。
さて、Python(POSIX系)でプロセスを作り出す最も原始的な方法は「os.fork()」を使うことです。
os.forkは実行プロセスのクローンを作ります。生成された側のプロセス(子プロセス)ではos.fork()から0が返却されるため、この値によって分岐することで子プロセスに任意の動作をさせることができるわけですね。
こんなC言語みたいなことやってると辛くなるので今回はmultiprocessingという標準ライブラリを使ってプロセスを操作していきましょう。
multiprocessingには大きく分けて「Pool」「Process」という2つのプロセス実行方法があります。
Process
Processを使うとプロセスが一つ作られます。targetに関数を与え、関数に与える引数を「args」「kwargs」で指定すると、別プロセスでその関数が実行されます。
第一仮引数はgroupなんですが、これはthreadingモジュールとの互換性のために存在するだけなので、省略するかNoneを指定する必要があります。
console | stdout | ||
---|---|---|---|
|
|
start()メソッドで処理が開始され、terminate()で中断します。
ここではほとんど関係ありませんが、join()メソッドを実行すると処理がブロックされます。ここで言うブロックとは処理を待たされるという意味で、以降も出てくる表現です。
一度実行したProcessインスタンスは再度実行できないことに注意してください。同じ処理をしたい場合はインスタンスを作りなおします。
今回説明しませんでしたが、daemon属性がTrueのプロセスは親プロセスの実行と同時にお亡くなりになります。start()前に指定されている必要があります。
通常は子プロセスの処理が全て終了しない限り親プロセスは死にません。
Pool
Poolは予め作成したプロセスに処理を割り当てるための仕組みです。プールされたプロセスを使い回しながら処理を進めていきます。
例えば以下のように使います。
from multiprocessing import Pool import os import time start = time.time() def f(x): time.sleep(1) value = x * x print('{}s passed...\t{}\t(pid:{})'.format(int(time.time() - start), value, os.getpid())) return value with Pool(processes=3) as p: print(p.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) # コンテキストマネージャを使わずに以下のように書いても良い # Pool(3).map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) |
1s passed... 1 (pid:7484) 1s passed... 4 (pid:7486) 1s passed... 9 (pid:7485) 2s passed... 16 (pid:7486) 2s passed... 25 (pid:7484) 2s passed... 36 (pid:7485) 3s passed... 49 (pid:7486) 3s passed... 64 (pid:7484) 3s passed... 81 (pid:7485) 4s passed... 100 (pid:7486) [1, 4, 9, 16, 25, 36, 49, 64, 81, 100] |
上記では複数の値に対して同じ処理を行うためのメソッドとしてmapを使っています。使い方はビルトインのmapと同じです。
mapはすべてのシーケンスを処理した結果をリストで返却するため、終了するまでブロックされます。シーケンスの要素にタプルやリストを指定するとそれを展開するstarmapというメソッドもあります。
また、イテレータを返却するimapがあります。イテレーションした時に処理が終了している結果だけ取り出すことができます。終了していない分はブロックされます。
from multiprocessing import Pool import time def f(x): time.sleep(10) return x * x p = Pool(processes=3) it = p.imap(f, range(10)) next(it) next(it) next(it) |
上記の例では10秒ごとに3個ずつ結果を取り出せます。
imapとmap_asyncはいずれも発行された時点でバックグラウンド処理を進めますが、map_asyncはすべての処理が終わっていないと結果が得られないのに対し、imapは終わっている分の結果だけイテレーションによってを取り出すことができます。
プールしたプロセスを一つずつ利用するメソッドにapply()とapply_async()があります。
from multiprocessing import Pool import time def f(x): time.sleep(10) return x * x p = Pool(processes=3) result1 = p.apply(f, args=(2,)) # 10秒後に結果が返却される result2 = p.apply_async(f, args=(3,)) # 結果はすぐに返却されるが、評価可能になるのは10秒後 print(result1, result2) print(result2.get()) # 値を評価する。評価できるまでブロックされる |
apply_asyncは AsyncResultオブジェクトを返却します。
内部的にはapplyメソッドはapply_asyncメソッドを呼んだ結果を返しているだけです。
次のページはプロセス間通信についてです