Hatena::ブログ(Diary)

技術の切れはし

2010-09-04

Pythonでクラス内の関数を並列演算させる

オブジェクト指向プログラミング+multiprocessing.Poolでの並列処理でハマりました。

クラスの中でPoolライクな文法で並列演算を行なうことのできるMyPoolを自作したのでその紹介です。

Pythonでの並列処理について

Pythonにはver2.6以降からmultiprocessingという便利なものがあります。

これのPoolという機能を使うと、

test.py

# -*- coding: utf-8 -*-
from multiprocessing import Pool

def fuga(x): # 並列実行したい関数
    return x*x

def hoge():
    p = Pool(8) # 8スレッドで実行
    print p.map(fuga, range(10)) # fugaに0,1,..のそれぞれを与えて並列演算

if __name__ == "__main__":
    hoge()

出力結果

$ python test.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

このようにたったこれだけで並列処理ができてしまいます。

とっても便利。

fugaという関数に、range(10)つまり0~9までのそれぞれの値を入れたときの結果を8スレッドの並列演算をして求めているわけです。

関数にいろいろなデータを処理させたいときにそのデータをリストにして、処理結果をまたリストでもらってくるという方法はとても直感的です。

ただしこのPool、実は以下のコードでは動いてくれません。

test.py

from multiprocessing import Pool

def hoge():
    def fuga(x):
        return x*x
    p = Pool(8)
    print p.map(fuga, range(10))

if __name__ == "__main__":
    hoge()

出力結果

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

どうやら並列演算を行なう関数がローカル関数であると動かないようです。

(公式なルートで確認したわけではありません)

ついでに以下も同様のエラーを出して動きません。

test.py

from multiprocessing import Pool

class Test:
    def fuga(self, x):
        return x*x

    def hoge(self):
        p = Pool(8)
        print p.map(self.fuga, range(10))

if __name__ == "__main__":
    test = Test()
    test.hoge()

同じように並列演算を行なう関数がクラスのメンバ関数だと動かないようです。

ローカル関数が動かないのはあまり困りませんが、オブジェクト指向プログラミングしているとクラスのメンバ関数が動いてくれないのは非常に困ります。

そこでクラスのメンバ関数でも並列演算できるようなPoolクラスを自作してみました。

mypool.pyのコード

mypool.py

# -*- coding: utf-8 -*-

from multiprocessing import Process, Pipe

"""
マルチスレッドで関数を実行するためのクラスです。
クラスの中から使えます。
"""
class MyPool:
    proc_num = 8

    def __init__(self, proc_num):
        self.proc_num = proc_num

    """
    指定した関数funcにargsの引数を一つ一つ与え実行します。
    これらはあらかじめ指定された数のプロセスで並列実行されます。
    """
    def map(self, func, args):
        def pipefunc(conn,arg):
            conn.send(func(arg))
            conn.close()
        ret = []
        k = 0
        while(k < len(args)):
            plist = []
            clist = []
            end = min(k + self.proc_num, len(args))
            for arg in args[k:end]:
                pconn, cconn = Pipe()
                plist.append(Process(target = pipefunc, args=(cconn,arg,)))
                clist.append(pconn)
            for p in plist:
                p.start()
            for conn in clist:
                ret.append(conn.recv())
            for p in plist:
                p.join()
            k += self.proc_num
        return ret

私が使いたい機能しか実装していないので、メソッドはmapしかありません。でも単に並列処理をするならこれで充分。

MyPoolの使い方

使い方はPoolのときと同様です。

test.py

from mypool import MyPool

class Test:
    def fuga(self, x):
        return x*x

    def hoge(self):
        p = MyPool(8)
        print p.map(self.fuga, range(10))

if __name__ == "__main__":
    test = Test()
    test.hoge()

出力結果

$ python test.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

import先を変更して、PoolをMyPoolにかえるだけです。

MyPoolの仕組み

どうやらいろいろ試しているとクラスのメンバ関数で並列演算するのにPoolは使えなくとも同じmultiprocessingのProcessは使えることが分かったので、それを使って並列処理。

返り値の処理はmultiprocessingのPipeを利用。

割り当てられた数のスレッド分だけ指示された関数を走らせ、走っている分がすべて終われば残りの処理をまた並列で行ないます。

実際使ってみて

MyPoolで並列演算しながらtopで各プロセスCPU占有率などをみていましたが8スレッド指定をしてもどうやらいつも8スレッドフルパワーで動いてくれているわけではないようです。なんでかなぁ。

処理が速くなったことは確かですが。

MyPoolの使用に関して

上のmypool.pyはご自由に使用・改変していただいて構いません。ただしその動作や安全性に関して私は一切の保証をしません。

参考リンク

6.6. multiprocessing — Process-based “threading” interface

http://docs.python.org/library/multiprocessing.html

Multiprocessing の使いどころ。 - Twisted Mind

http://d.hatena.ne.jp/Voluntas/20081115/1226779786

Python 3.0 Hacks 第5回 multiprocessingモジュールによるプロセス間通信 - gihyo.jp

http://gihyo.jp/dev/serial/01/pythonhacks/0005

nHandnHand 2010/10/27 00:05 初めまして、python勉強の参考にさせて頂いております。貴重な情報ありがとうございます。

python素人の間違った意見かもしれませんが「for conn in clist: ret.append(conn.recv())」が、ネックになっており、0番~5番の子プロセスが動いていた場合、親プロセスは0番が終わるま「ret.append(conn.recv())」ステップで実行停止されているように思います。
 ※先に3番や4番が終わっていても、前の番号が終わるまでは待機。
0番が終わると次は1番が終わるのを待つ、以後1つづカウントアップ。

結果として五重並列処理なら、5個の子プロセスが全て終了するまで、次のセットの処理が実行できなくなっています。
「for p in plist: p.join()」に処理が来る時には、子プロセスの並列処理は1セット分終了しているので、join()は無駄な処理となっているようです。
その後、一番上のwhileに戻って並列処理の次のセットを実行しているようです。

ご参考までに。

zyxzyx 2011/05/31 01:09 MyPoolを試させていただきました。下記が動いてくれました。
p = MyPool(8)
a = 1
v = p.map(lambda i: myfunc(i, a), range(1024))

はてなダイアリーの記事の更新機能、はてなダイアリープラスを停止しました