実際にいくつかのオペレータを実装してみたらRxの気持ちがわかるかと思い実践してみました。
簡素化するために以下の方針とします。
unsubscribeしないerrorハンドリングしない
実装してみたのは以下です。
ofmapsubjectfilterdelayfromPromisecombineLatestswitchMap
Observable
何はともあれ、まずはObservableを実装します。
class Observable { constructor(producer) { this.subscribe = producer } }
コードはこれだけで、producerを受け取って、自身のsubscribeに接続します。
producerはobserverを引数にとって、次に、どんなタイミングで、どんな値を流すか決定する関数です。
現時点ではイメージもわかないと思うので次ofを眺めたほうがわかりやすいかと思います。
of
次にofを実装します。ofは引数で受け取った値を順に流していくだけの最もシンプルなOperatorの一つです。
Observable.prototype.of = function (...values) { const producer = observer => { values.forEach(v => observer.next(v)); observer.complete(); } return new Observable(producer) }
observerを引数にとるproducerを作成し、引数の値を順にobserver.nextで流し。完了すればobserber.completeします。
使用例は以下のようになります。1,2,3と値が流れ、completeします。冒頭で述べましたが、observerは本来errorをハンドリングする関数を含みますが簡素化のため削除しています。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).subscribe(observer);
実際の動作を以下で確認することができます。
observerを渡し、subscribeすることで初めて、producerが実行され、値が流れます。
map
もうひとつシンプルかつ、多用するoperatorであるmapを実装してみます。
Observable.prototype.map = function (f) { const producer = observer => { return this.subscribe({ next(value) { observer.next(f(value)) }, complete() { observer.complete() } }) }; return new Observable(producer); }
関数を引数にとり、producerの中で受け取った関数を適用した値をobserver.nextで流します。
先程のofと合わせて以下のように使用します。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).map(x => x * x).subscribe(observer); // 1, 4, 9, complete
1, 4, 9と値が流れcompleteします。
動作は以下で確認できます。
Subject
ここまで来るとあとは応用で、粛々とOperatorを追加していくだけなんですが、先にHotであるSubjectを実装しておきます。
class Subject extends Observable { constructor() { super(function (observer) { this.observers.push(observer); }); this.observers = []; } next(x) { this.observers.forEach((observer) => observer.next(x)); } complete() { this.observers.forEach((observer) => observer.complete()); } }
SubjectはObservableを継承しsubscribeされると配信先を自身のリストに登録されるようにします。
また、登録された配信先に値を流せるようnextを生やします。nextでは登録済の配信先全てに値を流します。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } const subject$ = new Subject(); subject$.subscribe(observer); subject$.next('hoge'); // hoge subject$.next('fuga'); // fuga subject$.complete(); // complete
良さそう。
filter
後は上記の応用でほとんどのものは実装できます。 名前の通りなんですが、マーブルダイアグラムがあるとわかりやすいですね。
Observable.prototype.filter = function (f) { const producer = observer => { return this.subscribe({ next(value) { if (f(value)) observer.next(value) }, complete() { observer.complete() } }) }; return new Observable(producer); }
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).map(x => x * x).filter(x => x % 2 === 0).subscribe(observer); // 4, complete
delay
こちらも名前の通り指定時間出力を遅延させるoperatorです。
Observable.prototype.delay = function (time) { const producer = observer => { return this.subscribe({ next(value) { setTimeout(() => observer.next(value), time) }, complete() { setTimeout(() => observer.complete(), time) } }) }; return new Observable(producer); }
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1).delay(1000).subscribe(observer); // 1 after 1sec
fromPromise
promiseからobservableに変換するoperator
Observable.prototype.fromPromise = function (promised) { const producer = observer => { return this.subscribe({ next(value) { promised.then((a) => { observer.next(a) }) }, complete() { promised.then(() => { observer.complete() }) } }) }; return new Observable(producer); }
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable() .of(null) .fromPromise(new Promise(resolve => { setTimeout(() => { resolve(1)}, 5000) })) .subscribe(observer); // 1 after 5sec
combineLatest
頻出ですが、このあたりから結構複雑ですね。
みんなだいすきcombineLatest。
本来combineLatestは最後の引数が値を変換するtransform関数なんですが、省略も可能になっていて、今回はObservableの結合のみ行うOperatorとして実装しています。
* http://rxmarbles.com/#combineLatestより
Observable.prototype.combineLatest = function (...observables) { const length = observables.length + 1; const producer = outObserver => { const values = [...Array(length)].map(_ => undefined); const hasValue = [...Array(length)].map(_ => false); const hasComplete = [...Array(length)].map(_ => false); const next = (x, index) => { values[index] = x; hasValue[index] = true; if (hasValue.every(x => x === true)) outObserver.next(values); }; const complete = (index) => { hasComplete[index] = true; if (hasComplete.every(x => x === true)) outObserver.complete(); }; observables.forEach((observable, index) => { observable.subscribe({ next: (x) => next(x, index + 1), complete: () => complete(index + 1), }); }); this.subscribe({ next: (x) => next(x, 0), complete: () => complete(0), }); }; return new Observable(producer); }
以下使用例。
new Observable() .of(0) .combineLatest( new Observable().of(1, 4), new Observable().of(2), new Observable().of(3).delay(1000), ) .subscribe(observer); // [0, 4, 2, 3] after 1sec
動作を文書で説明するのがなかなか難しいOperatorだと思います。
1の値が流れていってしまうことに注意してください。初回のみ各streamの値が出揃うまで値は流れません。
次回以降値が流れてくる度に、他のstreamの最新の値と合わせて配列にパッキングされます。
以下のマーブルダイアグラムを動かしてみるのが一番しっくりくるかもしれません。
動作確認は以下。
こっちはRx版
switchMap
こちらも頻出、みんなだいすきswitchMapです。
実装も泥臭くて若干怪しいですが、おもちゃレベルでは動いてそうです。
switchMapは分かりやすいマーブルダイアグラムがないですね。。
Observable.prototype.switchMap = function (f) { const producer = outObserver => { let i = 0; let hasSourceCompleted = false; const completed = []; this.subscribe({ next: (x) => { i++; completed[i] = false; f(x).subscribe({ next: ((index, y) => { if (index === i) { outObserver.next(y) } }).bind(this, i), complete: ((index) => { completed[index] = true; if (hasSourceCompleted && completed.every(x => x)) outObserver.complete(); }).bind(this, i), }); }, complete: (() => { hasSourceCompleted = true; if (hasSourceCompleted && completed.every(x => x)) outObserver.complete(); }), }); }; return new Observable(producer); };
RxのswitchMapはPromiseも展開してつないでくれますが、今回はObservableのみ対応しています。
使用例は以下。
const observer = { next(value) { console.log(value) }, complete() { console.log('Done') } } new Observable().of(1, 2).switchMap((v) => { if (v === 1) return new Observable().of(v).delay(400); if (v === 2) return new Observable().of(v).delay(200); }).subscribe(observer); // 2 after 200ms
1と2の値が順次流れてきて、1のときは400ms後に1が返るObservableが、2のときは200ms後に2が返るObservableがreturnされます。
200ms後に2が400ms後に1が流れてきそうですが、2がswitchMapに流れてきた時点でstreamは200ms後に2が返るObservableにswitchされるのでobserverまで1が流れてくることはありません。
動作確認用
こっちはRx版
さいごに
業務ではAngularを使用しているため、Observableの扱いにはいつも悩んでいて、もう少し仲良くなるために今回は実装してみました。
若干複雑なOperatorもありますが、mapやfilterはかなりシンプルで、仕組みを知るにはちょうどいい題材ではないかと思います。
また趣味ではredux-observableを使用していて、わりと気に入っているのでもう少し使いこなせるようになりたいですね。