RxJava Advent Calendar 2015 の23日分です。
はい、さっそくですがRxInTheBoxにSchedulerを実装しました。 observeOn() と subscribeOn() が本家RxJava同様の振る舞いをします。
add subscribeOn() and observeOn() to RxInTheBox by gfx · Pull Request #2 · gfx/RxInTheBox · GitHub
これは何
半年ほど前に社内勉強会のネタとしてRxJavaのミニマムな実装をしてみました。それがRxInTheBoxで、主な実装が約20行です。
RxJava的なものを最小限に実装してコンポーネントの関係を理解する - Qiita
そして時は流れ、subscribeOn()とobserveOn()の挙動にハマったりしながらなんとかRxJavaをそれなりに使えるかなと思えてきたので、RxInTheBoxにもsubscribeOn()とobserveOn()を実装してみることにしました。
軽い気持ちで始めたら差分が200行を突破してちょっと驚きましたが、これはsubscribeOn()の実装のために lift()と OperatorとSchedulerなどなど多くの実装が必要だったからですね。
実装を読む前に
まずこれらのエントリを読むといいと思います。後者は長いので、とりあえず observeOn() に言及しているところだけでもいいかも。
RxInTheBoxのおさらい
まずおさらいですが、 Observble<T> は Observable<T>.OnSubscribe を1つ保持しているだけのオブジェクトでした。
そして、 Observable<T>#subscribe(Subscriber<T> subscriber) の中身はただ1行、 onSubscribe.call(subscriber) でしたね。
class Observable<T> { // ... final OnSubscribe<T> onSubscribe; Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; } interface OnSubscribe<T> { void call(Subscriber<T> subscriber); } void subscribe(Subscriber<T> subscriber) { onSubscribe.call(subscriber); } // ... }
subscribeOn() と observeOn() の実装
これから subscriber が度々出てきますが、これは様々なラッパーをかましつつ、最終的にはユーザーが #subscribe() に与えたオブジェクトで、つまりは Observable<T> に与えるイベントリスナです。
さて、今回実装したコア部分は Observable#lift(), #subscribeOn(), #observeOn() です。これらの実装は非常に簡単です。
class Observable<T> { // ... public interface Operator<R, T> { Subscriber<T> call(Subscriber<R> value); } public <R> Observable<R> lift(final Operator<R, T> operator) { return new Observable<>(new OnSubscribe<R>() { @Override public void call(Subscriber<R> subscriber) { onSubscribe.call(operator.call(subscriber)); } }); } public Observable<T> subscribeOn(Scheduler scheduler) { return Observable.just(this).lift(new OperatorSubscribeOn<T>(scheduler)); } public Observable<T> observeOn(Scheduler scheduler) { return this.lift(new OperatorObserveOn<T>(scheduler)); } } class OperatorSubscribeOn<T> implements Observable.Operator<T, Observable<T>> { /* ... */ } class OperatorObserveOn<T> implements Observable.Operator<T, T> { /* ... */ }
lift() については前述のエントリなどで語られているから省略するとして、どうやら subscribeOn() と observeOn() の実体は Operator なのですね。
OperatorSubscribeOn<T>
ここで Operator<引数の型, 戻り値の型> なので、まず OperatorSubscribeOn<T> は Subscriber
実装は次のとおり。 worker.schedule(Runnable) は特定のスレッドのもとでRunnableを実行するとしましょう。 observable.subscribe(subscriber) の実体は observable.onSubscribe.call(subscriber) でしたね。
つまり、OperatorSubscribeOn の仕事は、 Observable#subscribe() を特定のスレッドのもとで実行するというものです。
class OperatorSubscribeOn<T> implements Observable.Operator<T, Observable<T>> { final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber<Observable<T>> call(final Subscriber<T> subscriber) { final Scheduler.Worker worker = scheduler.createWorker(); return new Subscriber<Observable<T>>() { @Override public void onNext(final Observable<T> observable) { worker.schedule(new Runnable() { @Override public void run() { observable.subscribe(subscriber); } }); } @Override public void onComplete() { } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } }
OperatorObserveOn
これに対して OperatorObserveOn<T> は次のとおりです。
OperatorSubscribeOn<T> とは対照的に、subscriber の各種リスナである onNext(), onComplete(), onError() のために Worker#schedule() でスレッドを指定しています。つまり、ユーザーが指定したイベントリスナがどのスレッドで実行するかを制御しているというわけです。
class OperatorObserveOn<T> implements Observable.Operator<T, T> { final Scheduler scheduler; public OperatorObserveOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber<T> call(final Subscriber<T> subscriber) { final Scheduler.Worker worker = scheduler.createWorker(); return new Subscriber<T>() { @Override public void onNext(final T value) { worker.schedule(new Runnable() { @Override public void run() { subscriber.onNext(value); } }); } @Override public void onComplete() { worker.schedule(new Runnable() { @Override public void run() { subscriber.onComplete(); } }); } @Override public void onError(final Throwable e) { worker.schedule(new Runnable() { @Override public void run() { subscriber.onError(e); } }); } }; } }
Scheduler
Scheduler は特定のスレッドでタスクを実行するためのインターフェイスです。RxInTheBoxでは、JavaのExecutorService ベースのものと、Androidの Handler ベースのものを実装しておきました。長くなるのでコードは紹介しませんが、これらはミニマム実装だと非常に簡単なのでソースコードを参照してみてください。
おわりに
RxInTheBoxはRxJavaの実装を眺めて理解の妨げになる部分を削ったコードなので、RxJava自体の実装にかなり近いものになっています。
しかし実際のRxJavaのコードはこれよりもずっと複雑で、特に OperatorObserveOn はback pressureに対応するために一見して何をしているかわからないコードになってしまっています。
lift(), subscribeOn(), observeOn() はRxJavaの中でも特に難しい部分なので、まずは単純化されたコードをじっくり眺めると理解の一助となるかもしれません。
簡単ですが以上です。