Chapter 08

2-2.Observable

lacolaco
lacolaco
2023.09.06に更新

この章では、Angular の重要な要素であるObservableについて、その概念を学びます。理解を深めるために、以下の関連する公式ドキュメントもあわせて読みながら学習することをおすすめします。

Observable の成り立ち

Angular で利用される RxJS の Observable を理解するには、RxJS の元でもある Reactive Extensions を生んだ C#言語にさかのぼる必要があります。ですので、ここから先は C#のサンプルコードを交えながら説明しますが、基本的に TypeScript と似た構文なので C#の経験がなくても問題ないはずです。

IEnumerable<T> と LINQ

まず知っておきたいことは、C#が持つ IEnumerable<T>というインターフェースと、LINQ (統合言語クエリ)という言語機能の関係です。次の C#のコードを見てみましょう。

public class Program
{
  public static void Main()
  {
    int[] numbers = {1, 2, 3, 4, 5, 6};
    IEnumerable<int> results = numbers
      .Where(i => i % 2 == 0)
      .Select(i => i * 10);
    foreach(int i in results)
    {
      Console.WriteLine(i); // 20, 40, 60
    }
  }
}

numbers という int[]型の配列を宣言し、Where() メソッドによって偶数の数値だけに絞り込んだあと、Select()メソッドで 10 倍に変換しています。JavaScript でいえば Arrayオブジェクトと filter() メソッド、map()メソッドの関係に似ていますね。

このWhere()Select()といったメソッドは int[] 型にもともと備わっているものではありません。これは LINQ によって IEnumerable<T>インターフェースへ提供されている拡張メソッド[1]です。numbers に LINQ の拡張メソッドが適用されているということは、配列はIEnumerable<T> インターフェースを実装しているわけです。

次に、同じく IEnumerable<T> を使ったコードをもうひとつ見てみましょう。こちらは配列ではなく、反復子 yield return を使った IEnumerable<T>の例です。

public class Program
{
  public static void Main()
  {
    IEnumerable<int> numbers = GetNumbers(6);
    IEnumerable<int> results = numbers
      .Where(i => i % 2 == 0)
      .Select(i => i * 10);
    foreach(int i in results)
    {
      Console.WriteLine(i); // 20, 40, 60
    }
  }

  static IEnumerable<int> GetNumbers(int to)
  {
    for (int i = 1; i <= to; i++)
    {
      yield return i;
    }
  }
}

GetNumbers() 関数の戻り値は、1 からtoまでの整数を順番に返す IEnumerable<int> です。先ほどは、反復する値は配列の要素として事前に生成されていましたが、この yield を使った例では反復ごとに for 文が 1 回実行され、その都度生成された値をyield returnします。

このように値の生成方法が異なりますが、どちらも IEnumerable<T> として扱うことができます。そのため、こちらの例でも LINQ が提供する Where()Select() といった拡張メソッドが使えます。

この 2 つの例から IEnumerable<T> というインターフェースがとても強力であることがわかるでしょうか。foreach や LINQ といった「連続した値をひとつずつ順番に取り出したい」だけの利用者にとって、その対象が IEnumerable<T> でさえあれば、取り出される値がどのように生成されているかは関心の外になるのです。それが事前に決定されている配列でも、取り出すときに動的に生成されたものでも関係ありません。

このような抽象化はIterator パターンとしてよく知られているものです。C#では基本的な言語機能として Iterator パターンがサポートされており、その具体的な実装が IEnumerable<T> というインターフェースだと言えるでしょう。

そして、LINQ という言語機能は、IEnumerable<T>という汎用的なインターフェースをベースにすることで、データソースの種類にかかわらず絞り込みや変換、並べ替え、グループ化などの操作を統一的なプログラミングモデルで記述できるようにするライブラリであるわけです。

さて、IEnumerable<T>のことが少しわかってきたところで、これがどのように Angular で使う Observable と関わるのでしょうか。それがわかるまでにはもう一段階の回り道が必要です。もう少し C#の世界を見ていきましょう。(以下、IEnumerable<T>IE<T> と略記します)

IObservable<T> と Reactive Extensions

IE<T>とそれを拡張する LINQ は、C#の基本概念であり強く愛される言語機能でもあります。しかし、これらが適用できるのは同期的な値に限られているのが弱点でした。そのため、「非同期的な値を扱える LINQ のようなもの」が望まれるようになります。そうして考案されたのが Reactive Extensions (以下 Rx)です。

IE<T> というインターフェースが表現するのは「値を順番に取り出せる」ことだけですが、LINQ はこれを 連続した値(シーケンス) だとみなして統一的なクエリ操作を提供していました。ですので、LINQ の概念を非同期的な値にも適用するために、まずは非同期的な値を時間軸上のシーケンスであると捉えます。

たとえば、ボタンをマウスクリックすることを考えてみましょう。ボタンが 1 秒おきに 3 回クリックされたということは、時間軸の上に 3 つのクリックイベントが要素として並ぶシーケンスだと考えられます。

同様に、 setInterval() のようなタイマーについても、時間軸上に一定間隔でイベントが並んだシーケンスだとみなすことができます。さらに、HTTP リクエストのような非同期タスクについても、タスクの開始・終了・中断などのイベントが時間軸上に並んだシーケンスとして考えることができますし、もっと言えば、同期的な配列すらも極小の時間差で連続する要素として時間軸上の要素として扱うことができます。

このように抽象化することで、同期的であれ非同期的であれ、あらゆる値を時間軸上のシーケンスとして解釈しなおすのが、Rx のもっとも根底にある考え方です。この時間軸上のシーケンスはしばしばストリームとも呼ばれます。

時系列上に並ぶ値

この考えに基づき実装されたのが、IObservable<T> インターフェースと Rx ライブラリです。IObservable<T>が定義するメソッドは Subscribe() ただひとつです。その名のとおり、いわゆるObserver パターンを実装するのに役立つ非常にシンプルなインターフェースにすぎません。

// IObservable<T>インターフェースの定義
public interface IObservable<out T>
{
  IDisposable Subscribe(IObserver<T> observer);
}

この IObservable<T>インターフェースが表現するのは「値が非同期的に渡される」ということだけですが、Rx はこれを LINQ のように時間軸上のシーケンスだとみなして、LINQ とほとんど同じ拡張メソッドやユーティリティを提供します。具体的にコードで見てみましょう。

public class Program
{
  public static void Main()
  {
    IObservable<int> numbers = GetNumbers(6);
    IObservable<int> results = numbers
      .Where(i => i % 2 == 0)
      .Select(i => i * 10);
    results.Subscribe(i => Console.WriteLine(i));
  }

  static IObservable<int> GetNumbers(int to)
  {
    // 0から始まるカウントを1秒おきに生成する
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(1000), Scheduler.CurrentThread);
    return timer.Select(i => (int)i + 1).TakeWhile(i => i <= to);
  }
}

先ほどのサンプルコードでは GetNumbers() 関数はIE<int>を返していましたが、今度は IObservable<int>を返しています。この IObservable<int> は Rx の Observable.Interval() を使って、1 秒おきに次の値を生成します。値の生成が非同期的になりましたが、numbersから results を得る部分のコードはほとんど変わってないことがわかるでしょうか。非同期的な値を時間軸上のシーケンスとして扱うことによって、イベントソースにかかわらず統一的なプログラミングモデルによって非同期処理を記述できるようになっています。

このように C#の文脈においては、 IE<T>と LINQ の関係を非同期的なイベントに拡張したものが、IObservable<T>と Rx ということになります。そして非同期的な値を統一的なモデルで扱う Rx の考え方は C#に限らず多くの言語で支持され、RxJS のような各言語ごとのライブラリが実装されているというわけです。

ようやく Angular の話に帰ってきました。つまり、Angular アプリケーション開発で利用することになる RxJS のObservableとは、C#における IObservable<T> を模倣したものなのです。そしてそれ自体は Observer パターンを実現するためのシンプルなインターフェースにすぎません。RxJS の肝は、UI イベントやタイマー、非同期タスクなどあらゆる非同期的な値の連なりを Observable というインターフェースで表現することで、イベントソースによらず非同期処理の記述方法を統一できることにあるのです。

RxJS の Observable

ここまで見てきたように、Observableという概念はさまざまなイベントソースを統一的に扱うためのインターフェースとして生まれ、広がってきました。そして、その源流は配列や反復子などのさまざまなデータソースを統一的に扱う IE<T> にあることも学びました。

これらを踏まえると、JavaScriptで Observable を理解するために対比するべきものも、 Promise ではなく イテレーター であることがわかるでしょう。イテレーターは JavaScript においてシーケンスを表現する概念で、C#の IE<T> に相当します。for...of 文や スプレッド構文 [...items]などで反復できるString ArrayMapなどのオブジェクトはどれも [Symbol.iterator] を実装しています(詳しくは反復処理プロトコルを参照してください)。

たとえば、先ほど C#の反復子 yield return を使った IE<T>の例を TypeScript で書き換えると次のようになります。ただし JavaScript には LINQ がないため、このnumbers: Iterable<number>Array<number> と同じようなfiltermapといった操作はできません。とはいえ、IE<T>に対応する概念としてイテレーターや反復可能オブジェクトというものがあることは理解しておきましょう。

function main() {
  const numbers: Iterable<number> = getNumbers(6);
  const results: Iterable<number> = numbers
    .filter((i) => i % 2 === 0) // 存在しない
    .map((i) => i * 10); // 存在しない
  for (const i of results) {
    console.log(i); // 20, 40, 60
  }
}

function* getNumbers(to: number): Iterable<number> {
  for (let i = 1; i <= to; i++) {
    yield i;
  }
}

イテレーターの概念をよく理解すると、Observable はそれを非同期的な値に拡張したものだと理解できるでしょう。先ほどの C# の IObservable<T> のサンプルコードも、TypeScript で書き換えると次のようになります。C# では LINQ が提供していた mapfilter といった操作は、RxJS ではオペレーターと呼ばれるものになっていますが、概念的な違いがないことは見て取れるでしょう。

import { interval } from 'rxjs';
import { filter, map, takeWhile } from 'rxjs/operators';

function main() {
  const numbers: Observable<number> = getNumbers(6);
  const results: Observable<number> = numbers.pipe(
    filter((i) => i % 2 === 0),
    map((i) => i * 10)
  );
  results.subscribe((i) => {
    console.log(i); // 20, 40, 60
  });
}

function getNumbers(to: number): Observable<number> {
  // 0から始まるカウントを1秒おきに生成する
  return interval(1000).pipe(
    map((i) => i + 1),
    takeWhile((i) => i <= to)
  );
}

Observable を学ぶためのヒント

RxJS 全体を深く理解しようとする前に、まずは Observableという基本概念に絞ってその特徴や性質を理解することから始めましょう。オペレーターやその他の API のことを覚えるのはその後でかまいません。オペレーターを使わずにsubscribe() メソッドのコールバックに処理を書き連ねてもいいのです(もちろん学習しながらリファクタリングすることを前提に)。

また、そのようにObservableを本質的に理解すれば、非同期処理の実装においてObservableを使うべき場面なのか、Promiseを使うべき場面なのかの判断もしやすくなるでしょう。どちらにも一長一短ありますが、Angular の API でも Observableを使っている部分と Promiseを使っている部分があり、その違いが自分でも理解できるようになれば完璧です。

Angular を学ぶ上で重要なのは、Observable を理解することと、RxJS というライブラリを使いこなすことを分けて考えることです。実際のところ、RxJS を使いこなさなくても Angular アプリケーションを実装できますが、Observableがわからなくては Angular の標準 API をうまく扱えません。はじめは Observable という概念を自然に理解できることを目指しましょう。つまりそれは多くのプログラミング言語で用いられている、基本的な技法としてのObserver パターンを習得するということです。

[コラム] イテレーターをベースにした Rx: IxJS

RxJS から派生したプロジェクトとして、ReactiveX/IxJSというものがあります。これはObservableではなく JavaScript 標準のイテレーターを使って Rx の思想を表現したものです。

IxJS を使うと直前のサンプルコードは実際に動かせるものになります。from() 関数はデータソースを IxJS で拡張されたイテレーターに変換し、pipe()メソッドで統一的なクエリ操作が可能になります。

import { from } from 'ix/iterable';
import { filter, map } from 'ix/iterable/operators';

function main() {
  const numbers: Iterable<number> = getNumbers(6);
  const results: Iterable<number> = from(numbers).pipe(
    filter((i) => i % 2 === 0),
    map((i) => i * 10)
  );
  for (const i of results) {
    console.log(i); // 20, 40, 60
  }
}

function* getNumbers(to: number): Iterable<number> {
  for (let i = 1; i <= to; i++) {
    yield i;
  }
}

Observable のライフサイクル

ここからは、RxJS が提供する Observable の性質や特徴についてもう少し具体的に学びましょう。まずは、Observable のライフサイクルについて見ていきます。

Observable にはいくつかの分類があります。もっともよく分類される基準は、Hot と Cold[2]でしょう。Hot Observable は、購読前から存在するストリームを購読しますが、Cold Observable は購読を始めることで新しいストリームを開始します。このように、Observable<T> という単純な型だけでは表現できない振る舞いの違いは、Observable を扱うときには意識する必要があります。

このページでは、Hot と Cold ではないもうひとつの視点、有限 / 無限 というライフサイクルによる分類について、その特徴と注意点を解説します。

有限の Observable

Angular アプリケーションでもっともよく目にする Observable といえば、HttpClient のメソッドから返される HTTP レスポンスを表現した Observable でしょう。次の例は、Tour of Heroes に登場する HttpClient#get メソッドのサンプルコードです。

getHeroes(): Observable<Hero[]> {
  return this.http.get<Hero[]>(this.heroesUrl);
}

ここで HttpClient#get メソッドが返している Observable は、 有限の Observableです。有限の Observable は、内包した非同期処理の終了とともに自動的に完了する Observable です。Observable が完了すると、 subscribe メソッドの第 3 引数、あるいは complete フィールドに渡したコールバック関数が呼び出され、その Observable を購読しているすべてのリスナー関数は解放されます。

つまり、有限の Observable は、開始と終了がある非同期処理と 1:1 で対応します。いうなれば、非同期タスクを抽象化するための Observable です。この特徴は Promise と同じものです。また、多くの場合、有限の Observable は Cold な Observable でもあります。購読に合わせてタスクを開始し、必要な値をストリームに流したあと、タスクの終了をもって Observable が完了するのです。

有限の Observable と Promise は本質的に似ていますが、機能的な面で次のような違いがあります。

  • Promise は作成した時点で処理が開始するが、Observable は購読まで実行が遅延される
  • Promise は(現状の仕様では)キャンセルできないが、Observable は unsubscribe によって中断できる
  • Promise は完了時に一回だけ値を流せるが、Observable は何回でも値を流せる

ある処理が非同期タスクであるときに、Promise と Observable のどちらを返すようにするか迷うかもしれません。依存ライブラリとして RxJS を前提とできるならば、Observable で実装しておくことが推奨されます。なぜなら有限の Observable は簡単に Promise へ変換できるからです。さきほどの getHeroes メソッドの例は、次のように書き換えることもできます。lastValueFrom メソッドによって、Observable を Promise に変換しています。

getHeroes(): Promise<Hero[]> {
  return lastValueFrom(this.http.get<Hero[]>(this.heroesUrl));
}

Angular のライブラリが Observable を使って機能を提供しているからといって、アプリケーションも同じように Observable を多用することを強制しているわけでは決してありません。あくまでもライブラリ間の共通インターフェースとして、Promise よりも多機能な Observable を利用しているだけです。あなたがアプリケーションの開発で利点を感じなければ、Promise に変換し async/await 構文などと組み合わせることで、TypeScript としてシンプルなソースコードを実現してよいのです。これは RxJS に振り回されずに Angular アプリケーションを開発する上で大切な考え方です。

無限の Observable

一方で、無限にストリームが続いて完了しない Observable もあります。Angular アプリケーションのなかでは、Router の Router#events の Observable などが典型的なものです。次の例は、ルーティングの完了イベントを購読して、ナビゲーション後の URL を取得するサンプルコードです。

export class AppComponent {
  constructor(private router: Router) {}

  ngOnInit() {
    this.router.events.pipe(filter((event) => event instanceof NavigationEnd)).subscribe((event: NavigationEnd) => {
      console.log(event.url);
    });
  }
}

ここで Router#events プロパティが返している Observable は、 無限の Observableです。無限の Observable は、自動的に完了することのない Observable です。購読者の有無にかかわらず発生するイベントを流し続けます。つまり、 イベントストリームを抽象化するための Observable です。HTMLElement の addEventListenerメソッドは無限の Observable ととてもよく似たものです。また、完了することがないためそのまま Promise に変換することはできません。

無限の Observable は DOM のイベントリスナーと同じように、購読を終えるための購読解除が必要になります。購読者よりもイベントストリームのほうが長く存在するため、購読したままにしておくと不要なリスナー関数がメモリリークの原因となるうえに、予期せぬリスナー関数の呼び出しがバグを引き起こす可能性もあります。

それぞれの注意点

Observable をライフサイクルの観点で分類した 2 つの種類、非同期タスクを抽象化した有限の Observable と、イベントストリームを抽象化した無限の Observable があることを学びました。ここからはそれぞれの注意点を整理しておきましょう。

Promise への変換

どちらの Observable であっても lastValueFrom/firstValueFrom 関数は Promise を返しますが、その Promise の then コールバックは Observable が完了するまで呼び出されません。つまり、無限の Observable から作られた Promise の then コールバックは呼び出されません。

もし、無限の Observable から Promise を作成したい場合は、taketakeUntil といったストリームを完了させるオペレーターを利用して、Observable を有限にしてから Promise に変換する必要があります。

購読解除の必要性

Observable に紐づく Subscription は、Observable が完了すると自動的に購読解除されます。つまり、有限の Observable が完了するまでの期間が十分に短い場合、リスナー関数を参照し続けることによるメモリリークは問題になりません。そのため、明示的に購読解除をする必要性は低くなります。

一方で、無限の Observable に紐づく Subscription は自動的に購読解除されることがありません。そのため先述のとおり明示的に購読を解除する必要があります。

使用できるオペレーターの違い

RxJS が提供するビルトインオペレーターの中には、Observable の完了を必要とするものがあります。つまり、無限の Observable には使えないオペレーターがあるということです。例えば、次のようなオペレーターです。

  • last , takeLast , skipLast などの、ストリームに流れた最後の値を扱うオペレーターは、無限の Observable で値を返しません
  • max , min , count , toArray , reduce などの、ストリーム全体を計算するオペレーターは、無限の Observable で値を返しません

Observable のライフサイクルを見分ける

このように、多くの場合で注意が必要なのは無限の Observable です。そのため、無限の Observable をライブラリから提供する際には、それが無限であることをわかりやすくしておくことが重要です。実は、 Angular の公式 API において、getter から返される Observable は原則的に無限の Observable です。 たとえば、Router のevents プロパティや、FormControl の valueChanges プロパティなどがその一例です。一方で、有限の Observable を返す HttpClient の get()post() はメソッドとして実装されています。getter による Observable の取得はすでに存在するストリームであるという印象を与え、メソッド呼び出しは非同期タスクを開始する印象を与えます。サードパーティのライブラリを作る際や、アプリケーションの実装の中でも、Observable を提供する API はこの慣習に従っておくことをおすすめします。

脚注
  1. ある型に対して外部からメソッドを追加するような言語機能。C#だけでなくいろいろな言語に存在する。 https://learn.microsoft.com/ja-jp/dotnet/csharp/programming-guide/classes-and-structs/extension-methods ↩︎

  2. RxJS の Hot / Cold についての詳細は、RxJS 開発チームの Ben Lesh 氏によるブログを参照してください。 ↩︎