ストリーム表現とその変換

データをストリームとして表現する方法と、ストリームを変換する方法を紹介する。

ストリームはメッセージが流れる川である

Pub/Subメッセージングモデルでメッセージを流すためのオブジェクトのことをストリームと呼ぶことにする。ストリームにはメッセージをPublishでき、またメッセージを受け取ったときの処理をSubscribeできる。例えばキーボードからの入力をPublishして、内容をコンソールに出力するような処理をSubscribeできる。

kamo.jsでストリームを表現する

ストリームについて説明するために、kamo.jsというストリームを表現するためのライブラリをつくった。kamo.jsは、ストリームを作成するためのkamo.Streamというコンストラクタ関数を提供する。このコンストラクタ関数から作成されたオブジェクトは、publishsubscribeというメソッド(※プロパティとして参照できる関数のことをメソッドと呼ぶことにする)を持っている。

// ストリームを作成する
var stream = new kamo.Stream();

// ストリームに処理をSubscribeする
stream.subscribe(function (message) {
  console.log(message);
});

// ストリームにメッセージをPublishする
stream.publish('test message');

毎秒メッセージがPublishされるストリーム

ストリームの使い方を学ぶために、例を1つ見ておこう。以下のコードで、毎秒メッセージがPublishされるストリームが得られる。

// ストリームを作成する
var stream = new kamo.Stream();

// 1秒ごとにストリームにメッセージをPublishする
window.setInterval(
  function () {
    stream.publish('test message');
  },
  1000
);

こういう「第1引数で関数を受け取る高階関数(上の例ではsetIntervalのこと)からストリームを作成する」というパターンの処理は頻繁に利用することになるため、kamo.jsは高階関数からストリームを作成するための便利なクラスメソッド(※コンストラクタ関数のプロパティとして呼び出し可能な関数のことをクラスメソッドと呼ぶことにする)を提供している。上の例は、このクラスメソッドを利用して以下のように書き換えられる。

var stream = kamo.Stream.fromEventHandlerFunction(window, 'setInterval', 1000);

画面のクリックを検知するストリーム

もう1つだけストリームの例を見ておこう。以下のコードで、画面がクリックされたときにメッセージがPublishされるストリームが得られる(※見出しの「検知する」という言葉は「そういう状態が発生したときにイベントがPublishされる」という意味だ)。

// ストリームを作成する
var stream = new kamo.Stream();

// window.onclickプロパティにPublish用の関数を格納する
window.onclick = function (event) {
  stream.publish(event);
};

こういう「プロパティに関数を格納することでストリームを作成する」というパターンの処理も頻繁に利用することになるため、kamo.jsはこの処理のためのクラスメソッドを用意している。上の例は、このクラスメソッドを利用して以下のように書き換えられる。

var stream = kamo.Stream.fromEventHandlerSetter(window, 'onclick');

ストリームを変換する

単純なストリームを作成する方法を説明したので、次はこのストリームを「変換」する方法を学ぶ。「変換」というのは、あるストリームをもとに別のストリームを作成するということ。例えば、毎秒メッセージがPublishされるストリームから偶数秒のときだけメッセージがPublishされるストリームをつくったり、画面のクリックを検知するストリームから画面のダブルクリックを検知するストリームをつくったり、という具合になる。

Stream#map

1, 2, 3がPublishされるストリームを、2, 4, 6という2倍の値がPublishされるストリームに変換してみよう。kamo.jsのStreamクラスには、この単純な変換を行うためにmapというメソッドを持っている。mapメソッドは「メッセージをどのように変換するか」を定義するための関数を受け取り、新たなStreamクラスのインスタンスを返す。図で表すと以下のようになる。

stream        : --1--2--3-->
                  |  |  |
stream.map(f) : --2--4--6-->

mapメソッドを利用したコードは以下のようになる。

// 元となるストリームを作成する
var stream = new kamo.Stream();

// 受け取った値を2倍に変換するストリームを作成する
var mappedStream = stream.map(function (message) {
  return message * 2;
});

// 変換後のストリームをSubscribeする
mappedStream.subscribe(function (message) {
  console.log(message);
});

// 変換後のストリームにメッセージがPublishされる
stream.publish(1); // => 2
stream.publish(2); // => 4
stream.publish(3); // => 6

Stream#filter

1, 2, 3がPublishされるストリームを、1, 3という奇数の値だけがPublishされるストリームに変換してみよう。この処理を実現するには、Streamクラスのfilterメソッドが利用できる。filterメソッドは「どのようなメッセージであれば許可するか」を定義するための関数を受け取り、新たなStreamクラスのインスタンスを返す。図で表すと以下のようになる。

stream           : --1--2--3-->
                     |     |
stream.filter(f) : --1-----3-->

filterメソッドを利用したコードは以下のようになる。

// 元となるストリームを作成する
var stream = new kamo.Stream();

// 受け取ったメッセージをフィルターするストリームを作成する
var filteredStream = stream.filter(function (message) {
  return message % 2 === 1;
});

// 変換後のストリームをSubscribeする
filteredStream.subscribe(function (message) {
  console.log(message);
});

// 変換後のストリームにメッセージがPublishされる
stream.publish(1); // => 1
stream.publish(2);
stream.publish(3); // => 3

Stream#scan

1, 2, 3がPublishされるストリームを、1, 3, 6というこれまでの値の総和がPublishされるストリームに変換してみよう。この処理を実現するには、Streamクラスのscanメソッドが利用できる。streamメソッドは「前回返したメッセージと現在のメッセージを受け取り次のメッセージを返す」という関数、それから最初のシードとなる値(最初だけは「前回のメッセージ」が存在しないので必要になる)を受け取り、新たなStreamクラスのインスタンスを返す。つまりストリームが「前回のメッセージ」という状態を持つということになる(外部からアクセスすることは出来ないが)。図で表すと以下のようになる。

a            : --1--2--3-->
                 |  |  |
a.scan(0, f) : --1--3--6-->

scanメソッドを利用したコードは以下のようになる。

// 元となるストリームを作成する
var stream = new kamo.Stream();

// メッセージを足し合わせていくストリームを作成する
var scannedStream = stream.scan(0, function (previousMessage, message) {
  return previousMessage + message;
});

// 変換後のストリームをSubscribeする
scannedStream.subscribe(function (message) {
  console.log(message);
});

// 変換後のストリームにメッセージがPublishされる
stream.publish(1); // => 1
stream.publish(2); // => 3
stream.publish(3); // => 6

コナミコマンドを検知するストリーム

ここまでに紹介したクラスとメソッドを利用して、「画面のクリックを検知する」という単純なストリームを変換して「コナミコマンドを検知する」という複雑なストリームを実現してみよう。コナミコマンドというのは「十字キーとABボタンで"上上下下左右左右BA"の順に入力すると自機が強化される」というものだが、ここではそのキー入力を検知するストリームを作成することにする。この例を通してストリームの変換の概念を学ぶことで、まずストリームとして表現する→別のストリームに変換する→変換されたストリームを利用して任意の処理を実現する、という一連の流れの理解が深まることを期待している。概念を理解することを目標としているので、実行効率のことはとりあえず忘れておくことにする。

// コナミコマンドに対応するキーコードの配列
var codes = [
  38, // up
  38, // up
  40, // down
  40, // down
  37, // left
  39, // right
  37, // left
  39, // right
  66, // b
  65  // a
];


// まず画面に対する全てのキー入力イベントを流すストリームを作る
kamo.Stream.fromEventHandlerSetter(window, 'onkeyup')

// どのキーが押されたかという情報が欲しいので、キーコードが流れるストリームに変換する
.map(function (event) { return event.keyCode; })

// これまでに入力されたキーコードの履歴がストリームに流れるようにする
// 最大長が10のキュー形式で表現するために Array#concat と Array#sliceを使う
.scan([], function (queue, code) { return queue.concat([code]).slice(-codes.length); })

// キューの長さが10に満たない場合はメッセージを流さないようにする
.filter(function (queue) { return queue.length === codes.length })

// キューの内容が↑↑↓↓←→←→BAのときだけメッセージが流れるようにする
// 配列同士の比較が面倒なのでJSON.stringifyでゆっくり比較する
.filter(function (queue) { return JSON.stringify(queue) === JSON.stringify(codes) })

// めでたい
.subscribe(function () { alert('Conguratulation!') });

もっと学びたい人へ

Streamクラスには他にもStreamを変換するための様々なメソッドが用意されている。実はStreamクラスの全てのメソッドがStreamを返すようになっている。kamo.jsでは全てが川なのだ。興味があればr7kamura/kamo.jsを見てみよう。ソースコードを読んでみるとより理解が深まるかもしれないし、雑に書いたコードなので何も深まらないかもしれない。特に言わなかったけど、この記事で学んだことはFunctional Reactive Programmingという概念に繋がっている。最近ninjinkunがあなたが求めていたリアクティブプログラミング入門という翻訳記事を書いていたから、それを見てみるのもいいと思う。