これ の通り、Rx には本来の目的のみを遂行してもらいたいので考えみました。
まずはこれを読もう
下記に書いてある以上の説明はないです(圧倒的感謝っ)
- 実例によるkotlinx.coroutinesの手引き(日本語訳) - Qiita
- [Kotlin]コルーチンのChannelのハマり所 | Developers.IO
- Kotlinコルーチンによるリアクティブストリームのガイド (日本語訳) - Qiita
Kotlin の Channel と BroadcastChannel
Channel
は「キューのようなもの」と例えられます、BroadcastChannel
も同じく。
Channel<T>
は、送信者と受信者が 1:1 で、
BroadcastChannel<T>
は、送信者と受信者が 1:n です。 Broadcast と言われるように。
両方使ってみます。
// Channel の例
val c = ArrayChannel<String>(2) // capacity=2 は具体的に何に効くのか不明…
launch(CommonPool) {
// delay(2000) // キューに追加されてから受信してもok
c.consumeEach {
Log.d("KT","${it}")
}
}
launch(UI) {
c.send("a")
c.send("b")
c.send("c")
}
// BroadcastChannel の例
launch(UI) {
val bc = ArrayBroadcastChannel<String>(10)
bc.send("A")
bc.send("B")
// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}
// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}
delay(2000) // 受信側のために少し待ってから
bc.send("C")
bc.send("D")
}
上記の出力:
KT a
b
c
KT-BROADCAST-1 C
KT-BROADCAST-2 C
D
KT-BROADCAST-1 D
Channel
は send
されたものを consumeEach { }
で受信します。1:1 です。
ここで使っている ArrayChannel
は単純なキューなので、send が先、あとから consume でも問題なくすべて受信できます(capacity=2
は??)。
一方 BroadcastChannel
は、. openSubscription()
を呼ぶことで複数の受信者を持てます。受信者1と2が、それぞれ同じ値を受信できることがわかるでしょう。
ArrayBroadcastChannel
では、受信登録前に追加した "A", "B"
が受信されていません。
これは BroadcastChannel というよりは ArrayBroadcastChannel
の特性で、 受信者が誰も居ない状態で send()
された値はそのまま捨てられます。
Channel. send()
と offer()
どちらも「キューに要素を追加する」という役割ですが、次の違いがあります。
send()
- コルーチンの中(
launch(xx) { }
)でしか使えない - Channel のキャパシティを超える要素を追加できる(※半信半疑。
ArrayCannel
では追加できてしまった)
offer()
- コルーチン外でも使える
- Channel のキャパシティを超える要素を追加できない(
false
が返却される)
参考 − Channel - kotlinx-coroutines-core
ArrayBroadcastChannel と ConflatedBroadcastChannel
BroadcastChannel には ArrayBroadcastChannel
の他に ConflatedBroadcastChannel
というものがあります。
これは、 最近 send()
された値をひとつだけキャッシュしておく BroadcastChannel です、どっかで聞いたことありますね。
launch(UI) {
val bc = ConflatedBroadcastChannel<String>()
bc.send("A")
bc.send("B")
// 受信者1
launch(newSingleThreadContext("threadA")) {
val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription1.consumeEach {
Log.d("KT-BROADCAST-1","${it}")
}
// ここに何か書いても実行されないよん
}
// 受信者2
launch(newSingleThreadContext("threadB")) {
val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
subscription2.consumeEach {
Log.d("KT-BROADCAST-2","${it}")
}
// ここに何か書いても実行されないよん
}
delay(2000)
bc.send("C")
bc.send("D")
Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}")
}
上記の出力:
KT-BROADCAST-1 B
KT-BROADCAST-2 B
KT-BROADCAST-1 C
KT-BROADCAST-2 C
D
KT-BROADCAST-LATEST D
KT-BROADCAST-1 D
上記のように、受信登録前に send
された A, B
の内、最後に送信された B
が受信できています。
また、最後に送信された値を .valueOrNull
プロパティで取り出すことができます。
Rx との対比
さてさて、 Channel と BroadcastChannel を次のようにまとめてみました。
- Channel は 送信1 : 受信1 のキュー
- BroadcastChannel は 送信1 : 受信n
- ArrayBroadcastChannel は誰も受信者が居ない時に send された値は捨てられる
- ConflatedBroadcastChannel は(受信者が居なくても) send された最新の値を一つキャッシュして新しい受信者に配信する
次に Rx の Observable/Subject、Hot/Cold について次のようにまとめます(Observable/Flowable/Single/Completable
などの違いについては述べません)。
- Cold Observable は 「あなただけの」 Observable(ニコ動1)
- Hot Observable は 「みんなの」 Observable(ニコ生)
- PublishSubject は
onNext
された値を subscriber に配信する(subscriber が居なければ無視される) - BehaviorSubject は subscriber が居なくても
onNext
された最新の値をキャッシュして新しい subscriber に配信する
そうすると、Channel と Rx は次のように対比できると考えます。
- Channel - Cold Observable
- ArrayBroadcastChannel - PublishSubject(Hot Observable)
- ConflatedBroadcastChannel - BehaviorSubject(Hot Observable) ※これは冒頭のリンクにも説明あり
具体的には、
- メソッドの返値に
Observable<T>
を使っている →Channel<T>
に替える - 公開プロパティとして
Observable<T>
(実体はSubject<T>
) を用意している →BroadcastChannel<T>
に替える
ことができると思います。おまけですが、
- メソッドの返値に
Single<T>
やCompletable
を使っている →Continuation<T>
に替える
も。
適当にリポジトリクラスを書いてみると、こんな感じかなと。
// 「アドレス帳」 のリポジトリクラスのインターフェース
interface AddressRepository {
// アドレスリスト帳を通知する(1件のAddressの変更通知をするものなら完璧)
val addresses: BroadcastChannel<Array<Address>>
// なんかの条件でアドレス帳を非同期で検索して結果を返す
fun filter(criteria: String): Channel<Array<Address>>
// 特定の名前の人がアドレス帳に存在するかを非同期で調べて居るなら true を返す
suspend fun exists(name: String): Boolean
}
MV* の「つなぎ」に RxJava を使うのをやめたい からは少し前進できたかなと。もっと勉強が必要です。