Kotlin
RxJava

Kotlin の Channel と Rx の Observable/Subject の対比

これ の通り、Rx には本来の目的のみを遂行してもらいたいので考えみました。

まずはこれを読もう

下記に書いてある以上の説明はないです(圧倒的感謝っ)

Kotlin の Channel と BroadcastChannel

Channel は「キューのようなもの」と例えられます、BroadcastChannel も同じく。

Channel<T> は、送信者と受信者が 1:1 で、
BroadcastChannel<T> は、送信者と受信者が 1:n です。 Broadcast と言われるように。

両方使ってみます。

// Channel の例
val c = ArrayChannel<String>(2) // capacity=2 は具体的に何に効くのか不明…
launch(CommonPool) {
    // delay(2000) // キューに追加されてから受信してもok
    c.consumeEach {
        Log.d("KT","${it}")
    }
}

launch(UI) {
    c.send("a")
    c.send("b")
    c.send("c")
}

// BroadcastChannel の例
launch(UI) {
    val bc = ArrayBroadcastChannel<String>(10)
    bc.send("A")
    bc.send("B")

    // 受信者1
    launch(newSingleThreadContext("threadA")) {
        val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
        subscription1.consumeEach {
            Log.d("KT-BROADCAST-1","${it}")
        }
        // ここに何か書いても実行されないよん
    }

    // 受信者2
    launch(newSingleThreadContext("threadB")) {
        val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
        subscription2.consumeEach {
            Log.d("KT-BROADCAST-2","${it}")
        }
        // ここに何か書いても実行されないよん
    }

    delay(2000) // 受信側のために少し待ってから
    bc.send("C")
    bc.send("D")
}

上記の出力:

KT a
   b
   c

KT-BROADCAST-1  C
KT-BROADCAST-2  C
                D
KT-BROADCAST-1  D

Channelsend されたものを consumeEach { } で受信します。1:1 です。
ここで使っている ArrayChannel は単純なキューなので、send が先、あとから consume でも問題なくすべて受信できます(capacity=2 は??)。

一方 BroadcastChannel は、. openSubscription() を呼ぶことで複数の受信者を持てます。受信者1と2が、それぞれ同じ値を受信できることがわかるでしょう。

ArrayBroadcastChannel では、受信登録前に追加した "A", "B" が受信されていません。
これは BroadcastChannel というよりは ArrayBroadcastChannel の特性で、 受信者が誰も居ない状態で send() された値はそのまま捨てられます。

Channel. send()offer()

どちらも「キューに要素を追加する」という役割ですが、次の違いがあります。

send()

  • コルーチンの中(launch(xx) { })でしか使えない
  • Channel のキャパシティを超える要素を追加できる(※半信半疑。ArrayCannel では追加できてしまった)

offer()

  • コルーチン外でも使える
  • Channel のキャパシティを超える要素を追加できない(false が返却される)

参考 − Channel - kotlinx-coroutines-core

ArrayBroadcastChannel と ConflatedBroadcastChannel

BroadcastChannel には ArrayBroadcastChannel の他に ConflatedBroadcastChannel というものがあります。

これは、 最近 send() された値をひとつだけキャッシュしておく BroadcastChannel です、どっかで聞いたことありますね。

launch(UI) {
    val bc = ConflatedBroadcastChannel<String>()
    bc.send("A")
    bc.send("B")

    // 受信者1
    launch(newSingleThreadContext("threadA")) {
        val subscription1: SubscriptionReceiveChannel<String> = bc.openSubscription()
        subscription1.consumeEach {
            Log.d("KT-BROADCAST-1","${it}")
        }
        // ここに何か書いても実行されないよん
    }

    // 受信者2
    launch(newSingleThreadContext("threadB")) {
        val subscription2: SubscriptionReceiveChannel<String> = bc.openSubscription()
        subscription2.consumeEach {
            Log.d("KT-BROADCAST-2","${it}")
        }
        // ここに何か書いても実行されないよん
    }

    delay(2000)
    bc.send("C")
    bc.send("D")

    Log.d("KT-BROADCAST-LATEST","${bc.valueOrNull}")
}

上記の出力:

KT-BROADCAST-1  B
KT-BROADCAST-2  B
KT-BROADCAST-1  C
KT-BROADCAST-2  C
                D
KT-BROADCAST-LATEST  D
KT-BROADCAST-1  D

上記のように、受信登録前に send された A, B の内、最後に送信された B が受信できています。
また、最後に送信された値を .valueOrNull プロパティで取り出すことができます。

Rx との対比

さてさて、 Channel と BroadcastChannel を次のようにまとめてみました。

  • Channel は 送信1 : 受信1 のキュー
  • BroadcastChannel は 送信1 : 受信n
  • ArrayBroadcastChannel は誰も受信者が居ない時に send された値は捨てられる
  • ConflatedBroadcastChannel は(受信者が居なくても) send された最新の値を一つキャッシュして新しい受信者に配信する

次に Rx の Observable/Subject、Hot/Cold について次のようにまとめます(Observable/Flowable/Single/Completable などの違いについては述べません)。

  • Cold Observable は 「あなただけの」 Observable(ニコ動1)
  • Hot Observable は 「みんなの」 Observable(ニコ生)
  • PublishSubject は onNext された値を subscriber に配信する(subscriber が居なければ無視される)
  • BehaviorSubject は subscriber が居なくても onNext された最新の値をキャッシュして新しい subscriber に配信する

そうすると、Channel と Rx は次のように対比できると考えます。

  • Channel - Cold Observable
  • ArrayBroadcastChannel - PublishSubject(Hot Observable)
  • ConflatedBroadcastChannel - BehaviorSubject(Hot Observable) ※これは冒頭のリンクにも説明あり

具体的には、

  • メソッドの返値に Observable<T> を使っている → Channel<T> に替える
  • 公開プロパティとして Observable<T>(実体は Subject<T>) を用意している → BroadcastChannel<T> に替える

ことができると思います。おまけですが、

  • メソッドの返値に Single<T>Completable を使っている → Continuation<T> に替える

も。

適当にリポジトリクラスを書いてみると、こんな感じかなと。

// 「アドレス帳」 のリポジトリクラスのインターフェース
interface AddressRepository {
   // アドレスリスト帳を通知する(1件のAddressの変更通知をするものなら完璧)
   val addresses: BroadcastChannel<Array<Address>>

   // なんかの条件でアドレス帳を非同期で検索して結果を返す
   fun filter(criteria: String): Channel<Array<Address>>

   // 特定の名前の人がアドレス帳に存在するかを非同期で調べて居るなら true を返す
   suspend fun exists(name: String): Boolean
}

MV* の「つなぎ」に RxJava を使うのをやめたい からは少し前進できたかなと。もっと勉強が必要です。