概要
RxAndroid 0.22.0ではAndroidObservableというActivityやFragmentの状態を考慮してコールバックしてくれるObservableを作れるUtilityがある(最新のRxAndroidではAppObservableという名前になってるぽい)。
subscribeOn(newThread())なObservable、つまりバックグラウンドで動作するObservableをAndroidObservable.bindActivity()等を使ってくるむと、「Activityが既に終了していたらonNext()を呼ばない」といった事を自動でやってくれるようになる。
問題
以下のように、バックグラウンドで動作するObservableをAndroidObservable.bindActivity()で安全にした上で、map()してsubscribe()すると、map()がUIスレッドで実行される。map()で通信処理等をやっていると爆死する。以下だと1が別スレッド、2,3はUIスレッドで実行される。
AndroidObservable.bindActivity(this, http.get(url) // 1. subscribeOn(newThread())なObservableを返すとする ).map({ body -> // 2. なんかパースして再度通信処理をする parseAndLoad(body) }) .subscribe({ result -> //.3 })
原因
以下はAndroidObservable.bindActivity()の実装(ver 0.22.0)。最初にobserveOn(mainThread())を実行している。
これによりここ以降のOnSubscribe.call()はUIスレッドで実行される。
public static <T> Observable<T> bindActivity(Activity activity, Observable<T> source) {
Assertions.assertUiThread();
return source.observeOn(mainThread()).lift(new OperatorConditionalBinding<T, Activity>(activity, ACTIVITY_VALIDATOR));
}
対応
subscribeOn(newThread())で処理したい処理はAndroidObservable.bindActivity()の内側に置く。基本的にAndroidObservable.bindActivity()は最後にあてる感じすればいいんだと思う。
AndroidObservable.bindActivity(this, http.get(url) .map({ body -> parseAndLoad(body) })) .subscribe({ result -> //.3 })
まとめ
observeOn(),subscribeOn()はセットするタイミングが重要っぽい。