asyncioでPythonの非同期処理を書いてみる
こんにちは、CX事業本部の夏目です。
LambdaでAWSリソースへアクセスする際に直列的に処理するのではなく、並列的に処理したいことは度々あります。
NodeだとPromiseを使えば簡単にできるのですが、Pythonではどうやるのか気になったので調べてみました。
注意
ここではPython3.5以降を使うことを想定して書いてます。
asyncio モジュール
Pythonではasyncioモジュールを使って並列的に処理を書くことができます。
超ざっくり言うと、イベントループを使ってコルーチンを実行しているようです。
それ以上の説明は下記記事を見てください。
簡単に使ってみる
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | import asyncioimport timeasync def sleeping(sec): loop = asyncio.get_event_loop() print(f'start: {sec}秒待つよ') await loop.run_in_executor(None, time.sleep, sec) print(f'finish: {sec}秒待つよ')def main(): array = [5, 1, 8, 3, 4] loop = asyncio.get_event_loop() print('=== 一つだけ実行してみよう ===') loop.run_until_complete(sleeping(2)) print('\n=== 5つ並列的に動かしてみよう') gather = asyncio.gather( sleeping(5), sleeping(1), sleeping(8), sleeping(3), sleeping(4) ) loop.run_until_complete(gather)if __name__ == '__main__': main() |
実行結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | === 一つだけ実行してみよう ===start: 2秒待つよfinish: 2秒待つよ=== 5つ並列的に動かしてみようstart: 4秒待つよstart: 1秒待つよstart: 8秒待つよstart: 5秒待つよstart: 3秒待つよfinish: 1秒待つよfinish: 3秒待つよfinish: 4秒待つよfinish: 5秒待つよfinish: 8秒待つよ |
説明
基本的には、
asyncio.get_event_loop()でイベントループを取得- 並列的に動かしたい関数は
asyncをつけて定義 - 時間がかかる処理は、
awaitと宣言してからイベントループのrun_in_executorで呼び出す - イベントループの
run_until_completeで並列的に実行しつつ終わるまで待つ - 複数同時に処理したい場合は
gatherでくくってから、run_until_completeにわたす
といった感じです。
並列数を制限しながら実行する
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | import asyncioimport functoolsimport timeasync def sleeping(sec): loop = asyncio.get_event_loop() func = functools.partial(time.sleep, sec) print(f'start: {sec}秒待つよ') await loop.run_in_executor(None, func) print(f'finish: {sec}秒待ったよ')async def limited_parallel_call(sec_list, limit): sem = asyncio.Semaphore(limit) async def call(sec): with await sem: return await sleeping(sec) return await asyncio.gather(*[call(x) for x in sec_list])def main(): loop = asyncio.get_event_loop() options = [5, 1, 8, 3, 4] print('=== 並列実行数制限なし ===') loop.run_until_complete(asyncio.gather(*[sleeping(x) for x in options])) print('=== 2並列に制限 ===') loop.run_until_complete(limited_parallel_call(options, 2)) print('=== finish ===')if __name__ == '__main__': main() |
実行結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | === 並列実行数制限なし ===start: 3秒待つよstart: 4秒待つよstart: 1秒待つよstart: 8秒待つよstart: 5秒待つよfinish: 1秒待ったよfinish: 3秒待ったよfinish: 4秒待ったよfinish: 5秒待ったよfinish: 8秒待ったよ=== 2並列に制限 ===start: 4秒待つよstart: 8秒待つよfinish: 4秒待ったよstart: 3秒待つよfinish: 3秒待ったよstart: 5秒待つよfinish: 8秒待ったよstart: 1秒待つよfinish: 1秒待ったよfinish: 5秒待ったよ=== finish === |
説明
並列的に処理できるからって全部一度に実行するばかりだと困ることがあります。
なので、今回は並列数を制限してやってみます。
並列数の制限にはSemaphoreを使います。
これはオブジェクトを作成する際に最大並列数を指定して、同時に実行できる数を制限できます。
並列に実行した処理の結果を受け取る
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import asyncioimport timeimport functoolsimport requestsdef sleep(sec): time.sleep(sec) return secasync def get_global_ip(): loop = asyncio.get_event_loop() print('get_ip') return resp.textasync def parallel_sleep(seconds): loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, sleep, seconds) print(f'sleep {resp}sec') return respdef main(): loop = asyncio.get_event_loop() gather = asyncio.gather( parallel_sleep(10), get_global_ip(), parallel_sleep(1) ) results = loop.run_until_complete(gather) print(results)if __name__ == '__main__': main() |
実行結果
1 2 3 4 | get_ipsleep 1secsleep 10sec[10, '121.101.70.247', 1] |
説明
並列的に処理した結果を使いたいことは多々あると思います。
取得する方法は簡単で
await loop.run_in_executorの返り値は渡した関数の返り値run_until_completeで一つだけ実行する場合は、実行したものの返り値をそのまま貰えるrun_until_completeに複数の処理を渡した場合は、渡した順番で結果が格納された配列を貰える
になります。
関数の引数の指定の方法
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import asyncioimport timeimport functoolsdef delayed_print(mes, sec): time.sleep(sec) print(mes)async def call_1(mes, sec): loop = asyncio.get_event_loop() await loop.run_in_executor(None, delayed_print, mes, sec)async def call_2(mes, sec): loop = asyncio.get_event_loop() func = functools.partial(delayed_print, mes, sec) await loop.run_in_executor(None, func)async def call_3(message, seconds): loop = asyncio.get_event_loop() func = functools.partial(delayed_print, mes=message, sec=seconds) await loop.run_in_executor(None, func)def main(): loop = asyncio.get_event_loop() gather = asyncio.gather( call_1('333', 3), call_2('222', 2), call_1('111', 1) ) loop.run_until_complete(gather)if __name__ == '__main__': main() |
実行結果
1 2 3 | 111222333 |
説明
引数を指定する際、いくつか方法があります。
- 順番に引数を渡す場合、
run_in_executorに関数を渡したあと順番に引数書く - 順番に引数を渡す場合、
functoolモジュールのpartialに関数を渡したあと順番に引数を書き、partialの返り値をrun_in_executorに渡す - 名前付き引数を使う場合、
functoolモジュールのpartialに関数を渡したあと名前付き引数を書いて、partialの返り値をrun_in_executorに渡す
順番に引数を渡すだけならrun_in_executorだけで事足りるのですが、名前付き引数を使おうとするとfunctoolモジュールのpartialを使用する必要があります。
まとめ
Pythonで非同期処理を行う方法について簡単にまとめてみました。
ぜひ使ってみてください。