KotlinConf 2017 の情報を追ったり、Ktor を見たりしているとコルーチンがよく出てくる。 コルーチンについては概要は知っているが詳細を追いかけていなかったので、コルーチンについて学んでメモ程度に記録しておく。
Kotlin 1.1 においてコルーチンは実験段階で、将来にはこのページの情報は古くなっているかもしれないので注意されたし。
本記事に書かれている内容
- コルーチンとは何か
- コルーチンの実装がどこにあるのか (言語サポートと標準ライブラリと外部ライブラリ)
- コルーチンの基本的な使い方
- コルーチンのキャンセル処理について
- コルーチンコンテキストについて
- チャンネル
- 並行性の問題
- Select 文
Coroutine (コルーチン) って何?
まずは公式リファレンスの情報を追う。
- 公式リファレンス : Coroutines - Kotlin Programming Language
コルーチンは軽量なスレッドのようなもの。 スレッドの場合は、非同期処理を行う際に呼び出し側はスレッドをブロック (blocking) して待機するが、コルーチンの場合は非同期処理の呼び出しでコルーチンを中断 (suspension) することができる。 コルーチンの中断は、スレッドのブロックと比べて安く、より制御しやすい。
Kotlin 言語やライブラリとコルーチンの関係
言語機能 : suspending 関数 (suspending functions)
コルーチンに関する言語機能として、suspending 関数がある。 suspend
修飾子が付けられた関数である。
suspend fun foo(): Bar { ... }
このような関数の呼び出し時に、コルーチンの中断が発生する可能性がある。 (中断されない可能性もある。) Suspending 関数を呼ぶことができるのは、コルーチンの中や他の suspending 関数の中からだけである。
無名ラムダも suspending 関数になりえる。 kotlinx.coroutines
に含まれる async
関数の宣言は以下のようになっており、引数のラムダは suspending 関数である。
fun <T> async(block: suspend () -> T)
上記関数に渡したラムダは suspending ラムダとなる。
コルーチンの低レベルなコア API について
コルーチンの低レベルなコア API は、主にコルーチンを扱うライブラリのためのもので、基本的にはアプリケーションコードでは使わない。 (buildSequence
と buildIterator
だけはアプリケーションコードからの使用が想定されているらしい。)
低レベルなコア API についての詳細は以下にある。
コルーチンの使い方 (高レベル API)
kotlinx.coroutines
のリポジトリの中に詳しいドキュメントがあるので、それを読んでいく。 個人的にはコルーチンについて学ぶのに最初に読むドキュメントとしてはこれが一番わかりやすいと思う。
最初の例
最初の例として、launch
関数が使われた例が書かれている。
fun main(args: Array<String>) { launch { // 新しいコルーチンの起動。 // スレッドを使う場合のスレッドの作成・開始に相当する。 delay(1000L) // コルーチンの中断 (1 秒間)。 // スレッドを使う場合の `Thread.sleep(1000L) 相当だが、 // スレッドを使う場合と違って非ブロッキング。 println("World!") // ここに処理が来るのは 1 秒間の中断の後。 } println("Hello,") // 上記コルーチンが中断していてもここに処理は来る。 Thread.sleep(2000L) // アプリケーション全体が終了してしまうのを防ぐ。 }
スレッドとの対比を考えるとわかりやすいだろう。
最後に Thread.sleep
しているのは、コメントにあるようにアプリケーション全体が終了してしまうことを防ぐためである。 コルーチンはデーモンスレッドのような感じあり、アクティブなコルーチンが存在していてもそれによってプロセスが生き続けるわけではない。
上記の例では main
関数全体はコルーチン上で動かされていない (ので Thread.sleep
が使われている) が、全体をコルーチンで動かすために runBlocking
関数が紹介されている。 新しいコルーチンを起動し、コルーチンの処理が完了するまで現在のスレッドをブロックする、というもの。 コルーチンを使った処理とそうでない処理の橋渡しのために設計された関数である。
fun main(args: Array<String>) = runBlocking<Unit> { launch { /* ... この中は上の例と同じ ... */ } println("Hello,") // 上記コルーチンが中断していてもここに処理は来る。 delay(2000L) // アプリケーション全体が終了してしまうのを防ぐ。 }
上記の例では launch
で起動したコルーチンを待つために delay(200)
しているが、本来はコルーチン上の処理完了を明示的に待ちたい。 スレッドで Thread#join
するのと同じように、launch
の返り値である Job
の join
メソッドを呼ぶことで、コルーチンの処理が完了するのを (非ブロッキングに) 待つことができる。
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch { /* ... この中は上の例と同じ ... */ } println("Hello,") // 上記コルーチンが中断していてもここに処理は来る。 job.join() // 上記コルーチンの処理完了を待つ。 }
Suspending 関数の導入
ここまでの例では全て suspending ラムダを使ってきたが、実際のコードでは関数やメソッドとして処理を記述したいことが多い。 この記事の最初の方で紹介した suspending 関数として記述できる。
fun main(args: Array<String>) = runBlocking { val job = launch { sayWorld() } println("Hello,") // 上記コルーチンが中断していてもここに処理は来る。 job.join() } suspend fun sayWorld() { delay(1000L) // コルーチンの中断 (1 秒間)。 println("World!") // ここに処理が来るのは 1 秒間の中断の後。 }
キャンセルとタイムアウト
Job#cancel
メソッドを使ってコルーチンのキャンセル処理が可能。
キャンセル処理は協調的な処理である。 すなわち、コルーチンの処理がキャンセル処理に対応していなければならない。
具体的に言うと、下記のようなコードを書くとコルーチンの処理の中でキャンセルされるタイミングがないために、キャンセルがリクエストされてもコルーチンの処理が最後まで続いてしまう。
fun main(args: Array<String>) = runBlocking { val job = launch { sayWorld() println("Complete!") // `sayWorld` 処理中にキャンセルリクエストされても、 // `sayWorld` がキャンセルに対応してないのでここも処理される。 } println("Hello,") job.cancelAndJoin() } suspend fun sayWorld() { val startTimestamp = System.currentTimeMillis() while (System.currentTimeMillis() < startTimestamp + 5000L) { // Computing... // キャンセル処理に対応していないので、 // キャンセルがリクエストされても 5 秒間動き続ける } println("World!") // キャンセルされるタイミングがないので、 // キャンセルがリクエストされてもここも処理される。 }
キャンセルに対応する一つの方法としては、定期的にキャンセルに対応している suspending 関数を呼ぶことである。 例としては yield
関数が挙げられている。 他の方法として、自分でキャンセルされているかどうかを明示的に確認する、というものもある。
ちなみにキャンセルに対応した suspending 関数は、キャンセルされた際に CancellationException
例外を送出する。 そのため、try-finally
による終了処理を書いておけば、キャンセル時にも終了処理が行われる。
ちなみにキャンセルされたコルーチンから suspending 関数を呼ぶと、(既にキャンセルされているので) CancellationException
例外が送出されてしまう。 なので、めったにないことではあるが、終了処理で suspending 関数を呼ぶことは (普通にやろうとしても) 不可能である。 この問題に対応するには、run
関数 に NonCancellable
コンテキストを渡し、処理を実行してやる必要がある。
コルーチンをキャンセルしたい理由としてタイムアウト処理があるので、withTimeout
関数というものも用意されている。
async/await
TypeScript や ECMAScript を使っている人には async
/await
キーワードはなじみ深いものだと思う。 Kotlin でもライブラリで async
関数や await
メソッドといったものが提供されているが、コルーチンとの関係がいまいちよくわかっていなくて自分にとっては混乱のもとだった。
async
関数は、launch
関数と同じで新しいコルーチンを起動するものである。 launch
関数とは違って、返り値として Deferred
が返される。 JS 界隈の人にとっては Deferred
や Promise
というと馴染み深いであろう。 コルーチンから値を受け取ることができるのである。
そして、Deferred
からの値の取り出しに使われるのが Deferred#await
メソッドである。 この値の取り出しの待ち合わせも非ブロッキングである。
// 下記のようにコルーチン上で新しい非同期処理を開始して待合せたり val deferredValue1 = async { /* 何らかの非同期処理 */ } val deferredValue2 = async { /* 何らかの非同期処理 */ } println("${deferredValue1.await()}, ${deferredValue2.await()}") // 下記のように関数定義を行って、 fun asyncFoo() = async { /* 何らかの非同期処理 */ } // コルーチンから使ったり、 val fooValue = asyncFoo().await() // コルーチンの外で使ったりできる val deferredFooValue = asyncFoo() // ただし await メソッドは suspending 関数なので // コルーチンの外では使えない
上のような使い方を見ると JS の async
/await
をより柔軟に使えるようにしたもの、というような印象を受けるが、実際はもう少しややこしい気がしている。 JS ではメインのイベントループが 1 つ回っているだけなので、async 関数の本体の処理と呼び出し側の処理は同じスレッド的なものの上で動く。 一方で、Kotlin の場合に JS のイベントループに相当するものがコルーチンだと考える *1 と、単純にコルーチン内で suspending 関数を呼び出すのが JS の async
/await
との対比になるような気がする。
つまり、TypeScript で以下のように書くのが、
// 下記のような関数を async function asyncFoo(): string { /* ... */ } // 下記のように使用する let foo = await asyncFoo();
Kotlin における下記のコードに相当する、という考え方もできる。
// 下記のような suspending 関数を suspend fun foo(): String = /* ... */ // コルーチン内で下記のように使用する val foo = foo()
ともかく、JS 界隈の async
/await
との対比で理解しようとするよりは、コルーチンの仕組みをおさえた方が理解しやすいと感じた。
コルーチンコンテキストとディスパッチャ
- コルーチンコンテキストとディスパッチャの詳細 : kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub
- コルーチンは、
CoroutineContext
で表現されるコンテキストで実行される。- コルーチンコンテキストは、マップと集合によって値を持つ。
- (
CoroutineContext.Element
がCoroutineContext
を継承してるのは設計がイケてないという気がする……。)
- コルーチンコンテキストはジョブの情報を持っているし、コルーチンディスパッチャ (
CoroutineDispatcher
) の情報も持っている。coroutinContext[Job]
って感じでコンテキストからジョブ情報を取れる。- コルーチンディスパッチャは、どのスレッド (またはスレッド群) でコルーチンが実行されるかを決めるもの。
launch
やasync
のようなコルーチンビルダは、オプションでコルーチンコンテキストを受け取る。- デフォルトで使用されるディスパッチャは
DefaultDispatcher
で、現在の実装ではCommonPool
ディスパッチャと同じ。 - 親のコルーチンと同じコンテキストを使いたい場合は
coroutineContext
で参照すればよい。 Unconfined
ディスパッチャというものもあり、これはコルーチンを開始したり再開したりしたディスパッチャ上で動くもの。 つまり、コルーチンが中断して再開した場合、中断前と再開後で別のディスパッチャによって動かされる可能性がある。
- デフォルトで使用されるディスパッチャは
- コルーチン内でコンテキストを変化させることもできる :
run
関数 - コルーチンに親子関係を持たせたい場合は、親コルーチンのコンテキストを渡せばよい。 (コンテキストとしてジョブを指定してやればそれだけで良さそう。 詳細は下記)
- コルーチンに親子関係があるとき、親コルーチンの終了は子のコルーチンの終了を待つし、親コルーチンがキャンセルされたときは子のコルーチンもキャンセルされる。
- コンテキストは
+
演算子で合成できる。 右側にあるコンテキストが左側のコンテキストの関係する箇所を置き換える。 (置き換えられなかったものは引き継がれる。) - デバッグの話
- JVM オプションに
-Dkotlinx.coroutines.debug
を追加すると、スレッド名にコルーチン名が追加される。 CoroutineName
コンテキスト要素を使うことで、コルーチンの名前を指定できる。
- JVM オプションに
コルーチンの親子関係
上で説明したように、launch
関数などに指定するコンテキストにジョブが含まれていると、そのジョブが親コルーチンとなる。 そして、親コルーチンの終了は子のコルーチンが終了するのを待つ。
val job = launch { println("Parent coroutine") launch(coroutineContext) { // coroutineContext に親となるコルーチンのジョブが含まれている delay(1000L) println("Child coroutine") } } job.join() // ここに処理が来る前に 「Child coroutine」 は出力される。
また、親のコルーチンがキャンセルされると子のコルーチンもキャンセルされることを応用して、Android の Activity などのライフサイクルに紐づけてコルーチンをキャンセルさせたい場合などに、ライフサイクルに紐づくジョブを作っておいて、それを親にするという手法を取ることができる。
// Activity のライフサイクルに紐づくジョブ val activityRelatedJob = Job() val networkJob = launch(activityRelatedJob) { // 非同期通信など } // Activity 終了時に親ジョブをキャンセルすることで子も全てキャンセルできる。 activityRelatedJob.cancel()
チャンネル (Channels)
- チャンネルの詳細 : kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub
- Deferred は単一の値をコルーチン間でやりとりする便利な方法。 値のストリームをコルーチン間でやり取りするのに使えるのがチャンネル。
Channel
はBlockingQueue
のようなもの。 主要な違いは値の追加と取り出しが suspending 関数になっていること。- それ以上値がないことを示すためにチャンネルを閉じることができる。 受け取り側は
for
ループで受け取ることができる。 - Producer-consumer パターンとして一般的なパターン。
- チャンネルを生成する便利関数として
produce
関数がある。 - 受け取り側の便利拡張関数としては
consumeEach
メソッド がある。
- チャンネルを生成する便利関数として
- あるコルーチンが (おそらく無限に) 値のストリームを流し続けて、他のコルーチンが値を消費したり値に変換をかけたりするパイプラインというパターンもよく使われる。
- バッファのないチャンネルの場合、送信が先に呼ばれると受信が呼ばれるまで送信処理が中断されるし、受信が先に呼ばれると送信が呼ばれるまで受信が中断される。
- チャンネル作成時にバッファの大きさを指定できる。
- 複数のコルーチンが受信や送信を呼んで中断している場合、先に呼んだものから順に値を受け取ったり送信したりできる。 (Channels are fair)
変更可能な状態の共有と並行性
- 詳細 : kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub
- コルーチンも複数スレッド上で動きうるので、並行性の問題がある。
- Java でのマルチスレッド用の対策も 1 つの方法。
- 特定の値を参照するのを特定のスレッドからだけにするのも 1 つの方法。 (Thread confinement)
- 例えば UI に関するオブジェクトは UI スレッドからしか参照しないようにする、とか。
Mutex
による排他制御という方法もある。- これはスレッドの世界における
synchronized
やReentrantLock
相当のもの。 Mutex
は非ブロッキング。
- これはスレッドの世界における
- コルーチンと状態、そして他のコルーチンとやり取りするためのチャンネルをまとめたアクター (actor) という概念を用いても良い。
- アクターを生成するための
actor
コルーチンビルダが用意されている。 - 状態を触るのをアクターに限定することで並行性の問題を解消する。
- アクターを生成するための
おすすめ書籍
JVM 上での並行性・マルチスレッド対応については、下記の書籍がおすすめである。
Java並行処理プログラミング ―その「基盤」と「最新API」を究める―
- 作者: Brian Goetz,Joshua Bloch,Doug Lea
- 出版社/メーカー: ソフトバンククリエイティブ
- 発売日: 2006/11/22
- メディア: 単行本
- 購入: 30人 クリック: 442回
- この商品を含むブログ (174件) を見る
書評も書いたので参考にどうぞ。
Select expression
- 詳細 : kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub
- 呼び出すと中断する複数の suspending 関数を同時に待ち、利用可能になった最初の一つを選択する、ということもできる。
select
関数を利用する。Job#join
に対応する select 文を表すJob#onJoin
プロパティや、ReceiveChannel#receive
に対応する select 文を表すReceiveChannel#onReceive
プロパティがある。select
に渡すラムダの中でそれらの select 文を使って利用可能になったときに実行される処理を定義していく。
例
ドキュメントからの引用。
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { select<Unit> { // <Unit> means that this select expression does not produce any result fizz.onReceive { value -> // this is the first select clause println("fizz -> '$value'") } buzz.onReceive { value -> // this is the second select clause println("buzz -> '$value'") } } }
どういう仕組みなのか初見ではわからなかったので簡単に解説しておく。
fizz.onReceive
は、上で説明したように ReceiveChannel#receive
に対応する select 文 (SelectClause1
オブジェクト) を返す。 そして、fizz.onReceive
と後続のブロック部分は、省略せずに書くと fizz.onReceive.invoke({ ... })
という形式になっている。
SelectBuilder
に SelectClause1#invoke
拡張関数が定義されている。 select
関数が受け取る引数の定義が SelectBuilder.() -> Unit
なので、fizz.onReceive.invoke({ ... })
というコードを記述できるのである。
詳細は別記事に書いた。
終わり
というわけで、Kotlin のコルーチンについて、下記のドキュメントを見ながら学んだことをまとめてみた。
- 公式リファレンス : Coroutines - Kotlin Programming Language
- ライブラリのガイド : kotlinx.coroutines/coroutines-guide.md at master · Kotlin/kotlinx.coroutines · GitHub
この 2 つのドキュメントを読むことで、コルーチンについて基本的な部分はおおよそ理解できるだろう。 本記事が、皆さんがコルーチンを理解するための一助となれば。
その他参考になるページ
- 実例によるkotlinx.coroutinesの手引き(日本語訳) - Qiita : 本ページでも参考にしたガイドの日本語訳。
- Androidの非同期処理をKotlinコルーチンで行う // Speaker Deck : Android にコルーチンを導入するのに参考になる。
*1:それはそれで正しくなくて、より正確にはイベントループはコルーチンコンテキストの一種だと考えるのが対比としては一番良い気はする