[Python] multiprocessing 備忘録

普段あまり使うことのないmultiprocessingについて勉強したので備忘録としてまとめておきます。長いので3ページに分けました。
この記事ではPython3を使います。なお、基本的に動作確認はLinux/Macでしておりその他OS(特にWindows)での動作確認はできてないのであしからず。

さて、Python(POSIX系)でプロセスを作り出す最も原始的な方法は「os.fork()」を使うことです。
os.forkは実行プロセスのクローンを作ります。生成された側のプロセス(子プロセス)ではos.fork()から0が返却されるため、この値によって分岐することで子プロセスに任意の動作をさせることができるわけですね。

こんなC言語みたいなことやってると辛くなるので今回はmultiprocessingという標準ライブラリを使ってプロセスを操作していきましょう。

multiprocessingには大きく分けて「Pool」「Process」という2つのプロセス実行方法があります。

Process

Processを使うとプロセスが一つ作られます。targetに関数を与え、関数に与える引数を「args」「kwargs」で指定すると、別プロセスでその関数が実行されます。
第一仮引数はgroupなんですが、これはthreadingモジュールとの互換性のために存在するだけなので、省略するかNoneを指定する必要があります。

console stdout
import time
from multiprocessing import Process
 
p1 = Process(target=print, args=(1, 2, 3), kwargs={'sep': '-'})
p1.start()  # 実行開始
p1.join()  # p.start()が終わるまで待つ
p1.start()  # 再実行不可
 
p2 = Process(target=lambda: time.sleep(10))
p2.start()
p2.terminate()
 
print(p1, p2)
#
#
#
#
1-2-3
#
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/multiprocessing/process.py", line 99, in start
    assert self._popen is None, 'cannot start a process twice'
AssertionError: cannot start a process twice
#
<Process(Process-1, stopped)> <Process(Process-2, stopped[SIGTERM])>

start()メソッドで処理が開始され、terminate()で中断します。
ここではほとんど関係ありませんが、join()メソッドを実行すると処理がブロックされます。ここで言うブロックとは処理を待たされるという意味で、以降も出てくる表現です。

一度実行したProcessインスタンスは再度実行できないことに注意してください。同じ処理をしたい場合はインスタンスを作りなおします。

今回説明しませんでしたが、daemon属性がTrueのプロセスは親プロセスの実行と同時にお亡くなりになります。start()前に指定されている必要があります。
通常は子プロセスの処理が全て終了しない限り親プロセスは死にません。

Pool

Poolは予め作成したプロセスに処理を割り当てるための仕組みです。プールされたプロセスを使い回しながら処理を進めていきます。

例えば以下のように使います。

from multiprocessing import Pool
import os
import time
 
start = time.time()
 
def f(x):
    time.sleep(1)
    value = x * x
    print('{}s passed...\t{}\t(pid:{})'.format(int(time.time() - start), value, os.getpid()))
    return value
 
with Pool(processes=3) as p:
    print(p.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
 
 
# コンテキストマネージャを使わずに以下のように書いても良い
# Pool(3).map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
1s passed...    1       (pid:7484)
1s passed...    4       (pid:7486)
1s passed...    9       (pid:7485)
2s passed...    16      (pid:7486)
2s passed...    25      (pid:7484)
2s passed...    36      (pid:7485)
3s passed...    49      (pid:7486)
3s passed...    64      (pid:7484)
3s passed...    81      (pid:7485)
4s passed...    100     (pid:7486)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

上記では複数の値に対して同じ処理を行うためのメソッドとしてmapを使っています。使い方はビルトインのmapと同じです。
mapはすべてのシーケンスを処理した結果をリストで返却するため、終了するまでブロックされます。シーケンスの要素にタプルやリストを指定するとそれを展開するstarmapというメソッドもあります。

また、イテレータを返却するimapがあります。イテレーションした時に処理が終了している結果だけ取り出すことができます。終了していない分はブロックされます。

from multiprocessing import Pool
import time
 
def f(x):
    time.sleep(10)
    return x * x
 
p = Pool(processes=3)
it = p.imap(f, range(10))
next(it)
next(it)
next(it)

上記の例では10秒ごとに3個ずつ結果を取り出せます。
imapとmap_asyncはいずれも発行された時点でバックグラウンド処理を進めますが、map_asyncはすべての処理が終わっていないと結果が得られないのに対し、imapは終わっている分の結果だけイテレーションによってを取り出すことができます。

プールしたプロセスを一つずつ利用するメソッドにapply()とapply_async()があります。

from multiprocessing import Pool
import time
 
def f(x):
    time.sleep(10)
    return x * x
 
p = Pool(processes=3)
result1 = p.apply(f, args=(2,))  # 10秒後に結果が返却される
result2 = p.apply_async(f, args=(3,))  # 結果はすぐに返却されるが、評価可能になるのは10秒後
print(result1, result2)
print(result2.get())  # 値を評価する。評価できるまでブロックされる

apply_asyncは AsyncResultオブジェクトを返却します。
内部的にはapplyメソッドはapply_asyncメソッドを呼んだ結果を返しているだけです。

次のページはプロセス間通信についてです

1 2 3