ここ数年、特にモバイルアプリ開発で流行ってるUIデザインパターンならなんでもですが、MVVM を例にすると、Usecase における Repository からの結果の受信、ViewModel における Usecase からの通知、あるいは View の変更の通知に RxJava の Observable<T>
を使用する例は多いと思います(かくいう自分もそう作ってきました)。
DroidKaigi 2018 のアプリもそうですね。
via DroidKaigi 2018 official Android app
しかし最近、この「つなぎ」の役割に RxJava を使うのはやり過ぎでは?と思うようになっています。その理由を次に書きます。
RxJava を使うのをやめたい理由
1. Rx は、できることが多すぎる
RxJava の学習コストが高いことは知られています。
つなぎの型が Observable<T>
であるだけで、多くの機能が使えてしまい、利用者(=開発者)を混乱させるでしょう。
Rx の真髄は豊富な Operator を組合せて、ストリーミングデータをエレガントに扱うことなので、たた単に「pub して sub するだけ」ならオレの出番じゃねぇよ、なんですよね。
2. 依存モジュールは最小にすべきだ
「つなぎ」に Observable<T>
を使うことで、つながっている両者は RxJava に依存してしまいます。
Google I/O 2018 で発表された Android App Bundle では、機能をモジュール化して、モジュール毎の(オンデマンドな)動的配信が可能になります。
モジュール化するにあたり、各モジュールが依存するモジュールはできるだけ最小にすべきで、「つなぎ」の役割のためだけに決して小さくはない RxJava に依存することには疑問があります。
3. Java に対する不安、Pure Kotlin への期待
の通り、今後の "疑似Java" の使用には幾ばくかの不安があります。
一方で、もはや Android アプリ開発の標準言語となった Kotlin は、言語自体は JVM が必須ではありません。例えば Kotlinマルチプラットフォームプロジェクト では、複数プラットフォームで使いまわせる「共有モジュール」は Kotlin の標準APIsのみを使って開発します。
RxJava はその名の通り、Java向けのライブラリであり JVM に依存します。
不安のあるJava成分を削除し、来るべきマルチプラットフォーム時代のためにモジュールを共通化しようと考えたら、JVM に依存している RxJava は足枷になるはずです(Pure Kotlin で書かれた Rx-Kotlin には期待したいですがそれは別な話で)。
とはいえ一番マッチしたのが RxJava だったんです
Java や Android の API にいわゆる Promise/Future や、ストリームを扱う共通インターフェースがなかったところに、「Rx(RxJava)なんか便利だぞ」って流行りだして、そのまま便利に使われちゃってる、というのが現状だと思います(自分も Android で Rx を使い始めたきっかけは Promise/Future の代わりでした)。
それはそれでベストな選択だったわけで何も間違ってはいないです。
ただ今はもっとベストな選択があるんじゃないか?と。
RxJava を Kotlin Coroutines に置き換えたら良いんじゃないかな?
前述の通り Kotlin はもはや Android アプリ開発では必須ですし、もしかしたら私の知っている以上にサーバーサイドでも使われているのかも知れません。
Kotlin には Coroutines という、非同期に特化したAPIセットがあります。まだ experimental(実験段階)だけど、プロダクトにぶっこんでる人も多いのではないかと。
そしてこの記事
では、RxJava と Coroutines の対比が解説されているではありませんか。
代表的なものにしぼって Rx → Coroutine の対応を挙げると、
- 1件だけ値を受信する
Single<T>
→Continuation<T>
- 完了したかのみを受信する
Completable
→Continuation<Unit>
- 複数の値を通知する
Subject<T>
→SendChannel<T>
- 複数の値を受信する
Observable<T>/Flowable<T>
→ReceiveChannel<T>
- 処理を実行する疑似スレッド
Scheduler
→CoroutineContext
となります。
このように置き換えれば RxJava への依存は切ることができそうです(ただし現在の Coroutines は JVM に依存してるみたいなので共通モジュールでは使えなさそう)。
RxJava は局所的に、本当に必要な場所だけで使おう
RxJava が完全に不要になるかといえばそうでもなく、そのオペレータはやっぱり便利です。
例えば、
- 大量に流れてくるデータを「間引き」する
debounce
-
複数のストリームの「どれか」が変わったら通知する
combineLatest
とか。
これら「RxJava でしかできない機能」が必要なら使うべきで、ただし「局所的に」するのがよいと思います。
DDD よく知らないけどカッコつけて言うなら「Cohesive Mechanisms(凝集されたメカニズム)パターン」でしょうか、Rx は What じゃなくて How の領域なのでそこだけ分離する、と。
幸い、
を使うと、 RxJava2 と Kotlin Coroutines の相互変更ができるようです。つなぎは Continuation<T>
や ReceiveChannel<T>
を使い、必要な箇所で Single<T>
や Flowable<T>
に変換して使えばよさそうです。
実際にやってみた
冒頭の DroidKaigi 2018 のアプリから RxJava を追い出して、代わりに Kotlin Coroutine を使ってみました。はじめは「DroidKaigiApp から Rx 全部抜く!」の意気込みで取り掛かりましたが、意外と RxJava にガッツリ依存していたのであきらめ 、一つの画面だけやってみました。
イメージ的にはこんな感じです。
やってみたのはスタッフ一覧、NavDrawer → Staff で出てくる画面です。
この画面は StaffViewModel
が StaffDataRepository
を使ってスタッフ一覧データを読み、それを画面に表示させています。
StaffDataRepository
から RxJava を追い出す
こちらの修正前のソースが以下です。見やすさ向上のため関係のない一部のコードは省いています。
class StaffDataRepository @Inject constructor(
private val context: Context,
private val schedulerProvider: SchedulerProvider
) : StaffRepository {
override fun loadStaff(): Completable = getStaff()
.subscribeOn(schedulerProvider.io())
.toCompletable()
override val staff: Flowable<List<Staff>>
get() = getStaff().toFlowable().subscribeOn(schedulerProvider.io())
private fun getStaff(): Single<List<Staff>> {
return Single.create { emitter ->
try {
val asset = LocalJsonParser.loadJsonFromAsset(context, "staff.json")
emitter.onSuccess(StaffJsonMapper.mapToStaffList(asset))
} catch (e: Exception) {
Timber.e(e)
emitter.onError(e)
}
}
}
}
staff: Flowable<List<Staff>>
が、読み出したスタッフリストを外部へ通知する Observable ですね。
そして loadStaff()
が、読み出しを非同期で実行するメソッドです(これ自体も戻り値が Completable
になっていますが、あまり関係ないので省略します)。
さてここから RxJava をやめて代わりに Coroutine を使ってみたのが次です。
class StaffDataRepository @Inject constructor(
private val context: Context,
private val schedulerProvider: SchedulerProvider
) : StaffRepository {
private val sender = ConflatedBroadcastChannel<List<Staff>>();
override val staff: ReceiveChannel<List<Staff>> = sender.openSubscription()
override fun loadStaff() {
launch(CommonPool) {
try {
val asset = LocalJsonParser.loadJsonFromAsset(
this@StaffDataRepository.context, "staff.json")
sender.offer(StaffJsonMapper.mapToStaffList(asset))
yield()
} catch (e: Exception) {
Timber.e(e)
sender.close(e)
}
}
}
}
まず staff
プロパティが ReceiveChannel<List<Staff>>
になりました。
そしてそれは ConflatedBroadcastChannel
である sender
を openSubscription()
して得ています。
sender : ConflatedBroadcastChannel
は値を送信する側、staff: ReceiveChannel
は値を受信するためだけのインターフェースです。これは Rx の Subject<T>
と Observable<T>
に似ていますね。
loadStaff()
は少し簡略化しました。
launch(CommonPool) { }
で非同期処理を開始し、Json を読み出した後、sender.offer()
でそれを送信します。
注目なのは、その次に yield()
を呼び出している点で、これにより非同期処理を開始したコルーチンに処理を戻します。yield()
を忘れると通知が受信できないので要注意です。
StaffViewModel
から RxJava を追い出す
次は ViewModel ですね。StaffDataRepository
からスタッフリストを受信してそれを LiveData<T>
に変換します。LiveData はリストとデータバインドされているので一覧に表示される仕組みです。
class StaffViewModel @Inject constructor(
private val repository: StaffRepository,
private val schedulerProvider: SchedulerProvider
) : ViewModel(), LifecycleObserver {
private val compositeDisposable: CompositeDisposable = CompositeDisposable()
val staff: LiveData<Result<List<Staff>>> by lazy {
repository.staff
.toResult(schedulerProvider)
.toLiveData()
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun onCreate() {
repository.loadStaff()
.subscribeBy(onError = defaultErrorHandler())
.addTo(compositeDisposable)
}
override fun onCleared() {
super.onCleared()
compositeDisposable.clear()
}
}
staff: LiveData<>
の宣言で StaffRepository.staff : Flowable<>
を LiveData に変換しています、宣言だけで完結する、いいコードですね。
あとは画面の表示時である onCreate
で StaffRepository.loadStaff()
を呼び出します。
で、こちらも RxJava をやめて代わりに Coroutine を使ってみたのが次です。
class StaffViewModel @Inject constructor(
private val repository: StaffRepository,
private val schedulerProvider: SchedulerProvider
) : ViewModel(), LifecycleObserver {
private val compositeDisposable: CompositeDisposable = CompositeDisposable()
val staff: LiveData<Result<List<Staff>>> by lazy {
val liveData = MutableLiveData<Result<List<Staff>>>()
launch(Unconfined) {
liveData.postValue(Result.inProgress())
repository.staff.consumeEach {
liveData.postValue(Result.success(it))
}
}
liveData
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun onCreate() {
repository.loadStaff()
}
override fun onCleared() {
super.onCleared()
compositeDisposable.clear()
}
}
staff: LiveData<>
の定義が少し長くなってしまいましたが、これは .toResult()
、 .toLiveData()
の拡張関数に頼れなかったためです。
launch(Unconfined) { }
で、呼び出し元と同じスレッドで処理を開始し、repository.staff.consumeEach { }
で値を受信しつづけます。値を受信したら liveData.postValue(it)
で値をViewに通知します。
この処理は ReceiveChannel<T>.toLiveData()
な拡張関数が欲しいですね。
さて、これらの修正で StaffFragment
- StaffViewModel
- StaffDataRepository
のラインでは RxJava を使わず Kotlin の Coroutine で完結させることができました。
修正前後の完全な差分は、
を見てください。
- Channel を使うために kotlinx-coroutines-core を追加
-
ConflatedBroadcastChannel
でデータを送信するとき、エラーが発生したら.close(throwable)
を呼ぶ -
StaffDataRepository
クラスはStaffRepository
インターフェースの実装なので、StaffRepository
も修正
などをしています。
疑問
これでよいんだろか?と思う点、いくつもあります。
sender.offer()
の後、 yield()
を呼ぶしかないのか
通知を受信するために yield()
が必須!とは言ったものの、絶対忘れそう…。
あと「スレッドを呼び出し元に戻す」ことで受信が可能になるという仕組みもなんだかハマりそうな予感。
SendChannel.openSubscription()
したら、誰が・いつ Close するの?
SendChannel.openSubscription()
で得られる ReceiverChannel
には close
メソッドがあります。
StaffDataRepository
で Open したんだから、StaffDataRepository
で Close すべき? なら StaffDataRepository
は Disposable であるべき?
CoroutineContext の扱い
launch(xxx) { }
の xxx に与える CoroutineContext、Rx では Scheduler にあたるわけですが、これはアプリ全体で統一感を持たせて管理すべきでしょう。修正前のコードでは SchedulerProvider
に ui/computation/io
などが用意されていました。同じようにアプリ UI/計算実行用/IO処理 など個別に CoroutineContext を用意して、SchedulerProvider
に持たせるとよいのかな、と思います。
すべて CommonPool
に頼るとどこかで衝突・デッドロックが発生しそうです。
おわり
とりあえずこんな感じで、次に Android アプリをスクラッチで開発するときには、つなぎに RxJava を使わない方向でやってみようかなーと思っています。
識者のコメント、お待ちしております。