前提
いま参加しているプロジェクトのネットワーク通信部分を大幅に書き換える必要があり、次のような理由からRxJavaの採用を検討した。
- 複数のAPI呼び出しを並行でやったり待ち合わせて逐次処理したり使い分ける必要がある
- API呼び出しによっては複数の非同期処理をすべて待ち合わせて処理する必要がある
- これらの複雑な一連の呼び出しをリトライ処理を考慮して書くことが簡単にできそう
- 各社の採用実績から思いとどまる理由がない
- 筆者はRxJava(1)を業務で使ったことがある
書き直した結果これらの要求を非常に簡単に満たすことができたが、RxJavaに不慣れなメンバーには「いつ非同期処理が開始されたのか分かりにくい」ことが分かった。
本ドキュメントは元々社内Qiita Teamに書き始めたものであるが、世の中の同じようなチームにも益があるかも知れないと考え公開することにした。
なお、現在RxJavaはバージョン2なのでとくに断りなくRxJava2(2017年6月時点の最新版)を前提として書いている。またJava8のラムダ式を使っている。Kotlinは弊社プロジェクトに取り込めていないので使っていない。
また、これは予防線であるが、「最低限の知識をパパッと詰め込む」という点に注力しているので、割り切って説明を省いている部分が多い。(たとえば後述のFuture(Promise)はObservableの一面にしかすぎないかもしれないが。あえて言い切ることで単純化を試みた)
理解の取っ掛かりになりさえすれば本望。
Observableとは何か
ObservableとはFuture(Promise)である。まだ完了していない計算結果へのプロキシオブジェクトである。
また、Future同士をつなぎ合わせてパイプライン化(要するに逐次処理)できる。未来に起こることを順番に繋げられるということ。
「まだ完了していない」というのがポイントで、値や値を変換するための手続きを(将来確定する)という文脈につつんだまま受け渡しできる。したがって、Observableオブジェクトを引数や返り値で受け渡ししてもその時点では何も起きていないんだということを理解する必要がある。
しつこいけどObservableそれ自体を引き渡したり連結したりしてもその時点ではまだ何も起こっていない。Observableは将来なにかが起こるという設計図にすぎない。そこからデータをくれと言ったときに(後述するが、RxJavaの場合は subscribe
メソッド呼び出し)はじめてデータが流れてくる。
これは受け売りであるが、Observableは水道の配管の図面のようなものだ。水がどうながれるか定めているが、そこに水はまだ流れていない。蛇口を捻った瞬間に実際の中身が流れ、指示されたとおり形を変え、目的の場所にたどり着く。
Retrofitで初めてのObservable
Observableの作り方を覚えるのはもう少し後でよい。まずはとりあえずObservableを使ってみよう。
RetrofitはJavaやAndroidで利用できるRESTクライアントである。
Retrofit準備
インストール。
def retrofit_version = '2.3.0'
compile "com.squareup.retrofit2:retrofit:${retrofit_version}"
次のように使う。
1) BASE URLを定めてRetrofitインスタンスを作る。
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.github.com/")
.build();
2) REST APIに合わせてアノテーションを使って宣言的にAPI呼び出しを設定する。
public interface GitHubService {
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
}
3) 先のRetrofitインスタンスを使って呼び出す。
GitHubService service = retrofit.create(GitHubService.class);
Call<List<Repo>> repos = service.listRepos("srym");
// synchronous call
Response<List<Repo>> response = repos.execute();
// asynchronous call
repos.enqueue( /* callback here */ );
見ての通り execute
は同期呼び出し、 enqueue
は非同期呼び出しである。
上のコードに出てくる retrofit2.Call<T>
はソースを見てもらえば分かるが okhttp3.Call
とほとんど同じで型引数を与えたようなものだ。OkHttpに馴染みのある方はすぐに使い方が想像つくだろう。
で、RetrofitはCallAdapterという仕組みでAPI呼び出しの返り値をObservableとして受け取ることができる。次のようにする。
1) インストール
// CallAdapter for RxJava
compile "com.squareup.retrofit2:adapter-rxjava2:${retrofit_version}"
// GSON converter
compile "com.squareup.retrofit2:converter-gson:${retrofit_version}"
// Explicitly install RxJava
compile "io.reactivex.rxjava2:rxjava:2.1.1"
// for Android
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
2) RetrofitインスタンスにCallAdapterを設定
Retrofit retrofit = new Retrofit.Builder()
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create(gson))
.baseUrl("https://api.github.com/")
.build();
3) 返り値をObservableに変更
public interface GitHubService {
@GET("users/{user}/repos")
Observable<List<Repo>> listRepos(@Path("user") String user);
}
これだけだ。早速使おう。
Observableからデータを取り出す
Observableからデータを取り出すのは簡単だ。まず例示し、順次解説する。
Observable<List<Repo>> repos = service.listRepos("srym")
repos
.subscribe(
list -> doSomethingToList(list),
throwable -> Timber.d(throwable.getMessage(), throwable),
() -> Timber.d("complete")
);
Observable#subscribe()
メソッドを呼び出した瞬間(購読という)に非同期通信が走り、データが流れてくる。
最初の引数のラムダ式 list -> doSomethingToList(list)
は成功パターンだ。ここに流れてくるのは List<Repo>
の具象型である。好きに加工するがよろしい。
2つ目の引数のラムダ式 throwable -> Timber.d(throwable.getMessage(), throwable)
は失敗パターンだ。任意のエラーハンドリングをするがよろしい。
3つ目の引数のラムダ式 () -> Timber.d("complete")
はこのObservableがすべての処理を終えこれ以上なにも返してこないときに呼ばれる。ここでは特に何もしていない。
onNext, onError, onComplete
少しずつObservableの深い所に入っていく。
冒頭でObservableはFuture(Promise)だと言い切ったが、将来に渡ってデータを送り続けてくる可能性がある点が少し異なる。たとえばユーザのタッチイベントをObservableで表現した場合、ユーザがスマホをタッチしつづける限りタッチイベントはずっと飛んでくるだろうし終わりがない(言い換えるとこのObservableはアプリ実装者からはタイミングが読めないPUSH型といえる)。
また、API通信のように要求したタイミングで1回だけデータが飛んでくるというものもあるだろう(こちらはPULL型)。
Observableはそのどちらも表現できるように次の3つのイベントコールバックを用意している。
onNext
Observableを購読して、データが正常に送られてきたときに呼ばれる。 ラムダ式の引数にはこのObservableの持つ(プロキシする)データTがひとつコールバックされる。先程のsubscribeの第1引数に相当。
onNextは、こちらが購読をやめるまでの間、このObservableがデータを放出しつづける限り呼ばれる。データのある限りと言っても良い。
onComplete
Observableを購読してonNextでデータが運ばれてくるが、もうこれ以上送り届けるデータがないときに呼ばれる。先程のsubscribeの第3引数に相当。
たとえばAPI通信の場合は、onNextが1度だけ呼ばれてHTTP通信の結果が届けられるとそれ以上届けるものがないのでonCompleteが1度呼ばれるだろう。
onError
何らかの理由でObservableの中で例外が上がった場合に呼ばれる。ラムダ式の引数にはその例外がひとつ渡される。先程のsubscribeの第2引数に相当。
例外の種類によってエラーハンドリングを変えたい場合は instanceOf
等で分岐する。
なお、onErrorとonCompleteは互い排他的なのでonErrorが呼ばれた場合はonCompleteが呼ばれることはないしその逆も同様である。
詳しくは The Observable Contract に書いてある。
Observableの種類
HTTP GETのような「成功パターンは必ずonNextにデータがひとつだけ運ばれて、その後即座にonCompleteが呼ばれる」ような性質のものは、onNextさえ受け取れれば成功と見做すことができる。
同様に、レスポンスとして返すものがないHTTP POSTのような通信はonNextで受け取るデータがなく、oncHCompleteかonErrorかどちらかの可能性しかない。
RxJavaにはこういった場合にうってつけのデータ型が用意されている。
Single
onSuccessかonErrorのどちらかが1回だけ呼ばれる。
HTTP GETのように成功してレスポンスを1度だけ受け取るか、または失敗するようなものに向いている。
Completable
onCompleteかonErrorのどちらかが1回だけ呼ばれる。
レスポンスが空のHTTP POSTなどに向いている。
Maybe
onSuccessかonCompleteかonErrorのどれか1つが1回だけ呼ばれる。
用途はObservableの作り次第だが、成功時はキャッシュの有無によってonSuccessとonCompleteを返しわけ、失敗時はonErrorというような場合に使えるかもしれない。
参考: RxJava2.0 Observable, Single, Maybe, Completableの使い分けメモ
これまで例に出してきたAPI通信は次のように書き換えた方が意図がわかりやすいだろう。
(後述するオペレータも、よりSingleに最適化されたものが提供されている)
public interface GitHubService {
@GET("users/{user}/repos")
Single<List<Repo>> listRepos(@Path("user") String user);
}
スケジューラ
Retrofitの Call<T>
を紹介した時に同期呼び出しと非同期呼び出しの2つのマナーを紹介したが、Observableの場合はどうだろうか。Observableはスケジューラという仕組みで処理を実行するスレッドを指定する。
指定する対象は2箇所、「Observableがデータを生産する処理を指定するスレッド」と「処理した結果を購読者が受け取る(または加工する)スレッド」である。
まずは例をお目にかけよう。
service.listRepos("srym")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(list -> Stream.of(list).forEach(repo -> Timber.d("repo: " + repo.getFullName())));
subscribeOn
subscribeOn()
で指定するのがこのObservableがデータを生産するスレッドである。この例ではつまりAPI通信をするスレッドを指定するということになる。
Schedulers.io()
は非同期のブロッキングI/Oに向いたスケジューラを指定している。つまりここではAPI通信を非同期に実行するという意味になる。
observeOn
observeOn()
で指定するのがこのObservableが吐き出したデータを受け取って加工する場所を指定するスレッドだ。この例では AndroidSchedulers.mainThread()
としてAndroidのUIスレッドで結果を受け取っている。
典型的なAndroidアプリでは非同期通信結果をもってUIを更新したりする。その際にUIスレッド以外でUIの更新ができないためだ。
subscribeOn, observeOn 補足
subscribeOn()
はひとつのObservableに対し複数指定しても最初に指定したものしか使われない。
observeOn()
は複数指定できる(指定する意味がある)。
次のように指定した場合、
// notice this is a pseudo code
.observeOn // 1
.filter
.observeOn // 2
.map
.observeOn // 3
1〜2の間にある filter
オペレータは1で指定されたスケジューラが、2〜3の間にある map
には2で指定されたスケジューラが、3以降のオペレータは3で指定されたスケジューラが使われる。
まだオペレータについて解説していないので読み飛ばして良い。あとで戻ってきて欲しい。
よく使うスケジューラ一覧
- Schedulers.io()…非同期なブロッキングI/Oに向いたスケジューラ。内部的にはスレッドプールが作られワーカを再利用する。ネットワーク通信等に使われることが多いが、必要に応じてスレッドプールを大きくしていくのでOutOfMemoryErrorに注意する。
- Schedulers.computation()…CPUをガンガン使う計算に向いたスケジューラ。ブロックするI/O処理に向かない。ワーカ数は固定(デフォルトで
Runtime#availableProcessors()
と同数)なので枯渇するとブロックする点に注意。 - Schedulers.newThread()…タスクごとに新しいスレッドを立ち上げるスケジューラ。従ってioスケジューラ同様無制限にタスクが増えすぎてパフォーマンスの低下やOOMに注意する。
- Schedulers.trampoline()…FIFOでキューに積まれた順にタスクを実行するスケジューラ
- AndroidSchedulers.mainThread()…タスクをAndroidのメインスレッドで実行するスケジューラ
参考: Understanding RxJava Scheduler
オペレータ
オペレータとはObservableを作ったり加工したり変換したりするための命令である。
膨大なオペレータが存在するが、ここでは割り切って極々々々少数のみ紹介する。
just
justは T
からObservableを作る。
10個以上のTは fromArray
オペレータを使う。 fromIterable
というのもある。
Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(name -> Timber.d("name: " + name));
map
mapはObservableをTからUへ1対1に変換するオペレータ。どういうことか?
たとえば次の例では人名のStringを文字数のIntegerに変換する。
Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
.subscribeOn(Schedulers.io())
.map(name -> name.length()) // String::length
.observeOn(AndroidSchedulers.mainThread())
.subscribe(length -> Timber.d("length: " + length));
この例ではコンソールに length: 5, length: 3, length: 7
と出力される。
reduce
mapと対になる関数で、Observableの中身を畳み込む。次の例では人名の文字数を足し合わせる。
Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
.subscribeOn(Schedulers.io())
.map(name -> name.length()) // String::length
.reduce((accum, length) -> accum + length)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(length -> Timber.d("total length: " + length));
この例では total length: 15
とコンソールに出力される。
flatMap
独断と偏見で選ぶ、もっとも大切なオペレータ。
ObservableをObservableに変換するが、1対1の変換に限らない。
例えば次の例はListの中に入ったListをぺったんこにして1つのリストに変換している。
Observable<List<String>> listInList = Observable.just(
Arrays.asList("Alice", "Bob", "Charlie"),
Arrays.asList("Dave", "Ellen", "Frank")
);
listInList
.subscribeOn(Schedulers.io())
.flatMap(list -> Observable.fromIterable(list))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(name -> Timber.d("name: " + name));
元はListが2つ入ったObservableだったのが2つのListの要素がすべて入ったObservableに変換されている。
これはコンソールにAlice〜Frankまでの名前を順に出力する。
まさに flatten
して map
するイメージ。伝われ!(祈り)
もうひとつflatMapの重要な役割として、Observable同士の逐次実行がある。
次のように、「まずAPI Aを呼び出し、その結果をもってAPI Bを呼び出す」というような処理をひとまとめにするのにflatMapを使う。
service.apiA
.subscribeOn(Schedulers.io())
.flatMap(result -> service.apiB(result.getFoo()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> Timber.d("result: " + result.getBar()));
zip
これも弊社で使っているというだけで取り上げるオペレータ。
zipは複数のObservableの完了を待ち合わせ、それを処理する関数に渡して変換するためのオペレータ。
Observable.zip(service.apiA, service.apiB,
(resultA, resultB) -> doSomething(resultA, resultB))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> Timber.d("result: " + result));
この例では service.apiA
と service.apiB
の結果はそれぞれ (resultA, resultB) -> {}
のように同数の引数をとるラムダ式に渡されるので好きなように加工する。
複数のAPI呼び出しを平行に走らせて、尚且つすべての成功結果を待ち合わせてUIを変更するといった用途で非常に役に立つ。
なお、上の例は
service.apiA.zipWith(service.apiB, (resultA, resultB) -> resultA + resultB)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> Timber.d("result: " + result));
と等価である。
retry
リトライしてくれる。 .retry(3)
と書いておけば3回までonErrorを捕まえてリトライし、4回目でonErrorにエラー理由を出力して停止する。
その他のオペレータ
他にも数え切れないほど重要なオペレータがあるが何もかも紹介するわけにはいかない。
適宜加筆するかもしれない。
購読解除
Android技術者ならば、非同期に実行した Observable#subscribe
が未完了のままユーザが画面を去るときのことが気になって仕方がないはずだ。
RxJavaにはこのために外部から購読を解除するための機構が備わっている。
Disposable
subscribeメソッドは返り値としてDisposableを返す。
Disposable#dispose()
を呼ぶと以後のイベントは通知されて来なくなる。
Disposable disposable = service.apiA
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> Timber.d("result: " + result));
disposable.dispose();
Activity等のフィールドに持っておき onDestroy()
等で破棄するのが典型的な使い方だろう。
CompositeDisposable
複数のDisposableをまとめて購読を解除するためのCompositeDisposableも提供されている。
次のように使う。
CompositeDisposable compositeDisposable = new CompositeDisposable();
Disposable disposable = service.apiA
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> Timber.d("result: " + result));
compositeDisposable.add(disposable);
// non-reusable
compositeDisposable.dispose();
// reusable
compositeDisposable.clear();
こちらも典型的にはフィールドに持っておき、画面を去るタイミングでまとめて購読を解除する使い方が一般的だろう。
ちなみに、 CompositeDisposable#dispose
してしまうと以後このCompositeDisposableは再利用できない(addしても即座にdisposeが呼ばれ、偽が返る)。したがって resume/pause で再利用するようなことができない。
この用途の場合、CompositeDisposable#clear
が使える。
clearの場合はそのタイミングで積まれたDisposableはすべてdisposeするものの再びaddを受け付けることができる。
ObservableとFlowable
ここまで敢えて触れてこなかったが、Observableにはほとんど同じような機能とオペレータを提供し、対になるFlowableというものがある。
両者の違いはすばり「バックプレッシャー」の有無である。
バックプレッシャー
バックプレッシャーとは、データを送る側(生産者)とデータを受け取る側(消費者)との間で、著しく消費スピードに差がある場合のバッファリングや次のデータの要求のための仕組みである。
なんと、ここでは詳しく触れない!Retrofitが返すのはObservableだけだからだ。
しかしながら、FlowableはReactive Streamsという仕様に則っており、これに準拠している限りにおいてその他の準拠API(Java 9 Flow API, Akka Streams)と相互に連携可能らしい。
このまとめを卒業した人は是非調べてみていただきたい。
参考) What's New in RxJava 2
参考) What's different in 2.0
これらの参考リンクには自分でFlowableまたはObservableを作るときに、どのケースでどちらを使うべきかの指針が書いてある。
まとめ
RxJavaのObservableは概念から包括的に理解しようとすると馴染みのないメンバーはつまづきがちという経験から、敢えて現場で利用するRetrofitから逆算して使う部分のみの説明にとどめた。
このまとめは従ってかなり雑なまとめである感は否めないが、ここを出発点として 公式サイト を読みながら理解を深めて欲しい。
最後に、詳しい内容は原点にあたるにしくはないのであるが、日本語で手っ取り早く体系的な理解を得るには次の書籍が参考になった。
参考書籍) RxJavaリアクティブプログラミング
適宜加筆修正する。また、編集リクエストを歓迎する。
以上。