• 日本語

LINE Engineering
Blog

RxJava 2とArmeriaでマイクロサービスを非同期化してみた

Hirotaka Kawata 2017.12.02

こんにちは、LINEメッセンジャーのサーバーサイド開発チームに所属してスタンプや着せかえに関連する開発を担当している川田(@hktechno)です。この記事はLINE Advent Calendar 2017の2日目の記事です。

私が所属しているチームは、数年前からマイクロサービス化されたサービスのRPC(Remote Procedure Call)やDBアクセスを非同期化し、レイテンシの削減やサーバーリソースの省力化に勤しんできました。最近は、LINE内部で開発しているRPCサーバーArmeriaRxJava 2を使って、“Javaにしては”なかなかイケている内部構成になってきました。この記事では、そんな私達のチームで開発しているスタンプ・着せかえ関連サーバーの裏側についてご紹介したいと思います。

LINE Shopにおけるマイクロサービス

一言でLINEといっても、LINEと名前のつくサービスはたくさんありますが、私達のチームでは、LINEメッセンジャー内で使えるデジタルコンテンツのうち、主にスタンプと着せかえを提供するシステムを開発していて、それらのシステムやチーム全体のことをLINE Shopと呼んでいます。LINE Shopのシステムでは、ショップ向けのAPI提供はもちろん、LINE STOREというサイトや、メッセージを実際にやり取りするサーバーと連携するサービスなど、複数のサービスを管理しています。

マイクロサービスという言葉が流行り始めてからもう数年が過ぎますが、LINEでは早くからマイクロサービスアーキテクチャを取り入れていて、スタンプショップや着せかえショップの内部も細かいサービスに切り出されています。例えば、スタンプメッセージをやり取りする際に必要なバリデーションなどは、LINEの根幹となるメッセージ送信サーバーでは行なわず、マイクロサービスとして切り出して私達のチームで管理していますし、昨年のAdvent Calendarで紹介したように、ショップ向けの検索やランキングリスト向けのCache+Queryを行うサーバーは別なサーバーとして動作しています。

LINE Shop Architecture

マイクロサービス化した場合に問題になりそうなこと

マイクロサービスを増やしていくと、それぞれのサービスがAPIによって接続され、たくさんのRPC(Remote Procedure Call)を行うようになります。しかし、RPCを1つのリクエスト内部でたくさん行う場合は、考えなければいけないことがいくつかあります。そのうちの1つが、RPCを行うためにかかるレイテンシが、逐次的に実行した場合に意外と大きくなってしまうことです。

例えば、あるRPCで1回あたりのレイテンシが10msかかる状況では、100回RPCを行うときに同期的なAPIで100回呼び出すと、1000msかかります。さらに、以下のような状況を考えてみてください。

  • Aを100回取得する(10ms)。
  • それぞれのA.idを使ってBとCを取得する(B 10ms、C 10ms)。
  • すべてのデータ(A、B、C)を利用してDを作成してレスポンスを返す。

このように逐次的にDを100個取得するような状況では、レスポンスまで3000msかかってしまう可能性があります。マイクロサービス化したりDBクエリを実行したりする際に、このような設計にするべきではありませんが、それぞれをキャッシュできる場合などはこうしたほうが良い場合も多いでしょう。

今回はマイクロサービスのRPCを例としましたが、DBアクセスでも同様の状況になることが考えられます。マイクロサービス化されたサービス間のRPCの場合には、それぞれが担っている領域が違うために、DBのクエリレベルで最適化されたものでないことが普通でしょう。例えば、認証、プロダクト情報、ユーザーの所有権などが複雑に絡み合い、全く違うDB・サービスとして提供されているような状況を考えてみてください。

このような場合、よくRPCやDBアクセスを何らかの方法で並列化すると思います。RPCを行うためにスレッドを作って並列化することで、それぞれの段階を並列化することができます。上記の例では、理論上最短でレイテンシを20msに短縮することができます。Javaであれば、通常ThreadPoolExecutorを利用するのではないでしょうか。その他の言語であっても、最近では簡単にマルチスレッドでプログラムを実行することができるようになっています。

スレッドプールを利用した例 Sync API

これで、めでたしめでたし、となるとよいのですが、実際のところはそう簡単に物事は進みません。

同期的(Synchronous)APIとスレッドプール

RPCを行う際の同期的なAPIの問題点は何でしょうか。それは、RPCの結果を取得するまで、他の処理を実行できないことです。前述のように、スレッドプールを利用して、RPCやDBに対して同期的なAPIを使って並列にリクエストを行った場合、利用するスレッドはそのレスポンスが返ってくるまで、その1つのリクエストに専有されてしまいます。C10K問題という単語を聞いたことがある方もいるかと思いますが、だいたいあれと同じことです。

例えば、以下の状況を考えてみましょう。

  • API AはAPI Bを100回呼び出す。
  • API Bは1回あたり100msでレスポンスを返す。
  • API Aは平均で毎秒1000回呼び出される。

この場合、1秒間に生成されるジョブの数は1000 × 100 = 100,000です。それぞれの処理に100msかかるのであれば、スレッドプールの最大生成スレッド数が1,000であった場合に、計算上は遅延せずに処理できることになります。そうでない場合には、処理が遅延していくことになりますし、そうでなかったとしても、API Aのレスポンスにかかる時間は通常100msより長くなります。

このような状況は、普段は問題にならないかもしれません。しかし、リクエストが集中して高負荷になったときに、DBやアクセス先のサーバーはまだ処理できる状態であっても、RPCをたくさん受け付けているサーバーがスレッドプールの限界を迎えたり、逆にそのサーバーのCPUは余裕があるのにDBのレイテンシが悪化して、スレッドプールが限界を迎えたりすることがあります。つまり、スレッドプールを使うと、リソースが限界に近いときに、RPCやDBのレイテンシに敏感なサーバーになってしまうことになります。

以下は、Zipkinを利用して、DBやそれぞれのマイクロサービスをまたいで、LINE Shop内部のサーバーのリクエストの流れを可視化した図です。この例は、購入済み商品の一覧を取得する、実際に動作しているAPIの内部フローです。1つのAPIリクエストで、3つのマイクロサービスを経由して、MongoDBとRedis Cacheに複数回アクセスしていることがわかります。この例では画像の縦サイズの都合上、DBへのアクセスはそれほど多くありませんが、それでも、同期的APIを利用した場合は1リクエストあたり多くのスレッドが必要であることがわかると思います。

Armeria Zipkin Integration

もっとたくさんスレッドを作ればいいじゃないかと思うかもしれませんが、スレッドをたくさん作る事自体がコストになりますし、スレッドプールを使ったとしてもその数を大きくしすぎると、コンテキストスイッチのコストが大きく効率的ではありません。通常は私達のチームでは、1つのスレッドプールの上限は多くても1000程度に設定しています。

RPCやDBへのアクセスは、ただリクエストを投げてそれが返ってくるまで待つだけなので、基本的に他に何も処理するべきことはありません。つまり、何らかのデータをほぼ右から左に流すだけの処理です。しかし、CPUに余裕があっても、外部のリクエスト数が多くレイテンシがそれなりにかかる場合は、スレッドプールの限界を迎えてしまい、結果的にサーバーを増やすしかなくなってしまいます。

非同期(Asynchronous)API

上記のような、同期的なAPIとスレッドプールの組み合わせにより発生する問題を解決するために、非同期(Asynchronous、Async)なAPIを利用します。長々と前置きをしてきましたが、非同期APIを利用するのは、右から左に流すだけの処理をなるべく並列化して効率的に行いたいからです。

マイクロサービスのアーキテクチャでは、ユーザーからAPIリクエストを受けるような上位に位置するサービスは、ほとんどの場合、ただ右から左にリクエストを流したり、複数のRPCのレスポンスを結合したりするだけです。また、その他のサービス内部でも、多くの場合、DBはCPUやメモリをたくさん使って処理を行いますが、それをリクエストするアプリケーションサーバーは、ただレスポンスを受けてデータを変換したりするだけだったりするでしょう。

特に、LINE Shopで管理しているサービスでは、LINE STOREを提供しているサーバー以外で、サーバーサイドでテンプレートエンジンを利用したHTMLレンダリングを行っていません。多くの場合、CPUリソースを消費するような処理は、ほとんど行われません。一番CPUを食うのは、JSONや、その他RPCやキャッシュのために利用するプロトコルのための、シリアライズやデシリアライズだったりするのではないでしょうか。

このようなサービスのために、たくさんのサーバーを使いたくないですし、なるべくたくさんのリクエストを同時に、かつ効率的に、できれば、そのマシンが耐えうるネットワーク帯域幅の限界まで処理を受け付けられるようにしたいのです。マイクロサービス化すると、必然的にそれぞれのサービスでやることは分散され小さくなるので、いかに多くのリクエストをそれぞれのサーバーで処理できるかが、無駄なリソースを減らすために重要になってきます。

非同期API+イベントループの例 Async API

非同期APIといわれても、非同期という概念についていまいちぱっと思い浮かぶイメージがない方も多いと思います。簡単に言ってしまえば、JavaのFutureであったり、Reactive StreamsのSingleであったり、Pythonであればasyncioだったりというものです。これらを使ったことがある方であれば、なんとなくイメージが湧くかもしれません。何か処理を実行して、計算が終了したときに通知してくれたり、次に行う処理(コールバック)を指定できたりするような仕組みが非同期APIです。何か非同期処理を行う関数を実行しても、その関数自体は処理が終了していなくてもブロックせずに終了します。

非同期APIの例

// 非同期APIを利用してProductからAuthorを取得する
// CompletableFutureの例
someAsyncDriver.getProdut(id) // ブロックしない
// -> CompletableFuture<Product>
               .thenCompose(product -> product.getAuthor());
// -> CompletableFuture<Author>

非同期APIを利用するメリット

サーバー内で効率的にリクエストを処理するために、まずサービス内部でRPCやDBへのリクエストを行う際に、それらのクライアントに備わっている非同期APIを利用します。きちんと実装された非同期APIでは、RPCやDBへクエリを行う際に、スレッドを専有することなく処理を行うことができます(注:Futureを返却するAPIでも、実装によっては内部でリクエスト毎にスレッドを生成している場合があり注意が必要です)。最近では、多くのDBクライアントやRPCライブラリが非同期APIを提供しており、各言語で非同期APIを利用することができます。例えば、LINE ShopではJava 8を使っていますが、以下のようなライブラリが提供する非同期APIを利用しています。これらのライブラリの内部的な仕組みはさまざまで、詳細に説明しているとネットワークライブラリ(Java NIOやNetty)や、OSのシステムコール(例えばselectやepoll)の話をしなくてはいけなくなるので、この記事では省きます。

多くのライブラリではリクエスト毎にスレッドを生成することはしません。基本的にはリクエストの送受信時に通知を受けて処理するために限られた数のスレッドを使い、そのスレッド数よりはるかに多くのリクエストを並列に処理しています。イベントループのような仕組みを使って、CPUのコア数程度のスレッドを作り、それぞれのスレッドでリクエスト・レスポンスの処理を行い、キャッシュ効率向上や無駄なコンテキストスイッチコストの削減を実現することができます(例えば、JavaのForkJoinPoolを利用すると、このような処理が簡単に行なえます)。

リクエストからレスポンスまで完全に非同期に

もちろん、RPCやDBへのアクセス部分だけでも非同期化することでかなりの効果が得られるのですが、最大限効果を得るためには、サーバーのネットワークから来るリクエスト処理からRPCやDBへのリクエストを経て、レスポンスを返すまですべてを非同期化するべきです。いったんどこかで同期的な処理にまとめてしまうと、またそこでスレッド数の問題が発生することが少なくないからです。また、後ほど説明するとおり、非同期APIと同期APIの混在は、イベントループの方式を利用した場合、コーディングミスによりリクエストをブロックしてしまうようなことが簡単にできてしまうこともあり、これを防ぐためでもあります。

さらに、上から下まで完全に非同期で書かれたサービスは、それぞれのサービスのレイテンシの低下にそれほど敏感になる必要がなくなります。なぜなら、レスポンスがいくら遅くてもスレッドを専有しないので、通常はただレスポンスが返ってこないだけです。よって、スレッドプールを利用した場合のように、スレッドの上限を気にする必要がなくなります。

サーバーの受け側を非同期化するためには、そのリクエストを処理するフレームワークも非同期APIに対応している必要があります。Javaであれば、よく利用されるSpring FrameworkだとDeferredResultが非同期処理を行うインターフェースとして利用できます。最近では、Spring WebFluxなどサーブレットに依存しないフレームワークが出現して、非同期化の機運が高まっています。そんな世の中ですが、LINE Shopでは、これから説明するArmeriaという自社開発RPCフレームワークを使い、非同期化を行っています(Spring も同時に利用しています)。

ArmeriaとThrift

LINEでは、LINE Shopに限らずメッセンジャーに関連する多くのRPCがApache Thriftと呼ばれるRPCフレームワークを利用して行われています。Thriftは、APIの定義、リクエスト・レスポンスのスキーマの定義を専用のIDLを用いて行うことで、それぞれの言語で利用できるRPCライブラリを自動生成してくれます。最近では、同様のフレームワークにProtocol Buffersを利用したgRPCがあります。LINE Shopでは、JSONを利用したRESTfulなAPIはほとんど使われていません。個人的には、マイクロサービスをやるならJSONを使うのはもうダサいと思っています。遅いし、普通はAPIを内部でしか使わないので。

Thrift IDLの例

struct GetProductRequest {
    1: ProductType type,
    2: string productId,
}
struct Product {
    1: ProductType type,
    2: string productId,
    ...
}

service ShopService {
  Product getProduct(1:GetProductRequest request) throws (1:ShopException e)
}

そんな中、LINEではArmeriaというオープンソースのHTTP/2対応Java向け非同期RPCライブラリを公開しています。簡単に言うと、ArmeriaはThriftやgRPCを使って定義したAPIを使うためのサーバーとクライアントを勝手に作って、しかもHTTP/2で通信してくれる便利なフレームワークです。その一方で、HTTP REST APIにも対応しています。通信は、HTTP/2を基本とし、Javaの非同期ネットワークライブラリとして有名なNettyを使って実装されています(LINEに在籍しているNettyプロジェクトの設立者が開発しています)。Armeriaは、以下のようなマイクロサービスを構築するために必要なものが全て揃っている素晴らしいフレームワークです。

Armeriaを使うと、ThriftやgRPCを利用するサーバー・クライアントをすべて非同期化することができます。LINE Shopでは、ほぼすべてのサービスでArmeriaを利用して、サービスの基礎となるネットワーク通信部分から、その先のRPCコールまでを非同期APIで結んでいます。社内でも、Armeriaを使っているプロジェクトはまだまだ多くないのですが、これからもっと増やしていこうと、パイロットプロジェクト的なものになっています(実際にはそんなきれい事ではなくて、多数のバグや障害を率先的に踏み抜いていく役目です)。まだまだ、大規模なAPIの変更などがあったり、メトリック出力用のライブラリが突然Dropwizard MetricsからMicrometerに変わったりしていますが、LINE Shopやその他いくつかのチームではプロダクション環境で本気で使っていますし、基本的な部分に関しては最近は安定しているので、使ってくれる仲間を募集中です。

Armeria Documentation Service Armeria Documentation Service

Prometheus+Grafanaを利用したメトリック出力 Shop Grafana Dashboard

サーバーサイドRxJava 2

LINE Shopでは、今年の初めあたりにRxJava 2を導入して、非同期APIの結果に対する処理の流れがかなりきれいに書けるようになりました。非同期APIを使って、上から下まですべてを非同期にしてしまうと、実際に内部の処理を書くコードは結構変わってきます。それまでは全てが同期的なAPIで、ただ普通の型を扱っていたコードが、突然Futureだらけになってしまうからです。そして、Futureに対して何か変換処理を行ったり、2つのFutureの結果を待ち受けて処理を行ったりするためには、特殊な操作が必要です。非同期でFutureを使うと、コードの書きやすさとしてはどうしても劣ってしまいますし、今までの手続き的な同期APIばかりを使ってプログラムを書いてきた者としては、とっつきづらいという感想もチーム内ではよくありました。非同期APIをたくさん扱うようになると、このあたりが面倒になってきて、諦める人も多いかと思います。LINE Shopでは、Reactive Streamsを採用したRxJava 2を導入することで、この煩わしさを解決しました。

本格的に非同期化を始めた2年半ほど前は、GuavaのListenableFutureを使っていました。.transform()や.transformAsync()といったメソッドで、Futureの結果に対して変換を行ったり、そのFutureをもとに新たなリクエストを投げるFutureに変換したりするのですが、ListenableFutureのままではなかなか扱いづらいです。Java 8は普段から利用していたので、CompletableFutureも使える状況で、こちらは.map()のような処理が簡単に行えるAPIが用意されているのですが、いろいろあってListenableFutureのままでした。DaggerのAsync Dependency Injection を裏技的に利用したりもしていましたが、コードの可読性やデバッグのやりやすさが著しく低下するので、現在はRxJava 2に乗り換えて正解だったと思っています。

RxJavaを導入して得られたメリット

RxJavaというより、Reactive Streamsを採用することで得られた恩恵は、非同期APIに対するデータの処理が簡潔に書けるようになったことと、データの流れを意識しながらプログラムを書けるようになったことです。例えば、従来のFutureでは.transform()を利用して、ネストして書いていたものが、RxJavaではmapを使って簡潔に書けるようになります。これが、複数の非同期APIの結果を利用して、また別の非同期APIを呼び出すような場合に、.flatMap()を使って気軽に行えるあたりはとても気持ち良いです。

// Service AからProductを複数取得
// ProductごとにTag idを利用してService Bにリクエスト
// Tagを取得したらProductにセット
// ProductのListを返す
serviceA.getProducts()             // Single<List<Product>>
        .filter(p -> p.isOnSale())
        .flattenAsFlowable()       // Flowable<Product>
        .flatMap(p -> serviceB.getTag(p.getTagId()) // Single<Tag>
                              .map(tag -> p.setTag(tag)))
        .toList()                  // Single<List<Product>>
        .doOnError(e -> log.error("ERROR!"));

ListenableFutureやCompletableFutureを使っているライブラリでも、簡単にRxJava 2に変換できるように、それらを変換するライブラリを作ったり利用したりして、あまり細かいことは気にせずにReactive Streamsに変換できるような環境を整えています(例えば、RxJava2Jdk8Interopなど)。実際は、非同期処理が発火されるタイミングが、FutureとRxJavaでは根本的に考え方が違うのですが、このあたりはあまり気にせずに利用できるようにしています。

また、データの流れを意識して、Reactiveに書けるという点も、コードを書いていてとても気持ちよくなる点の1つです。今までは手続き的にいろんな変数にぐちゃぐちゃと一時的なデータを入れ込んで、見通しが悪いプログラムを書きがちだったものが、データが非同期的に集まってきて、Reactive Streamsの中できれいに書こうとすると、自ずとデータの流れを意識した効率的なプログラムになります。今書いているコードにどんなデータが必要で、どこからデータが来て、何に変換されていくか、という流れが、自然に明確になる感じがあります。

このようなデータフローを意識していくと、無駄な処理や、イケていないコードや設計が駆逐されていきます(正確に言うと、データの流れに反するようなコードをReactive Streamsの上で書くのが難しいので、書きたくなくなります)。これは、直接非同期化とは関係ない事ではありますが、RxJavaを使うことで現れた良い副作用の1つですね。ただし、RxJavaを使いこなすためには若干学習が必要なのと、関数型言語やReactiveな世界に触れたことがない手続き型バリバリの頭だと、最初は厳しいこともあると思うので、導入する際にはチームメンバーの教育と協力が必要だと思います。

その他にも、RxJavaを採用して得られたメリットや落とし穴など、詳しくは福岡オフィスの同じチームのメンバーである@kojilinさんがJJUG CCCで話した内容に詳しく解説されているので、こちらをぜひご覧ください。

サーバーサイドでの非同期処理で色々やったよ https://docs.google.com/presentation/d/1LKcFzspXUQbq-sivRBPb3Q6V7fQBLKB0zH6BSnGrD_I/edit#slide=id.p

今後の課題

今後の課題には、以下のようなものがあります。

  • MySQL(JDBC)には、非同期APIが存在しない
  • 同期的なAPIと非同期的なAPIが混ざることによりミスが発生する
  • 新しいメンバーに対する、非同期とはなにか、非同期の場合にはこうするべき、というような知識の移譲

まずは、1つ目のMySQLの問題です。LINE ShopではMyBatisと組み合わせてMySQLを利用しているのですが、仕方なくThreadPoolExecutorを使って、Futureに変換しています。また、MySQLはスタンプ(Sticker)関連の処理でしか利用していないので、さまざまなリクエストが集まるサーバーではなく、スタンプ専用のMySQLが必要なリクエストを処理するサーバーに処理をまとめることで、問題を回避しています。将来的に、非同期的なJDBCが実装され利用できるようになれば、この問題も解決されることでしょう。

同期的なAPIと非同期的なAPIが混ざることによるミスというのは、非同期を実現するために利用しているイベントループの根本的な仕組みではあるのですが、イベントループは少ない数のスレッドで実現されているために、その中ではCPUをブロックしてしまうような処理を実行することは許されません。つまり、ArmeriaやNettyなどイベントループを利用するライブラリでは、リクエストを受け付けてAPIのハンドラを実行する関数は通常イベントループ上で実行されるため、その中で同期的なAPIリクエストを行うことはできません。同期的なAPIをイベントループ内で呼んでしまうと、著しいパフォーマンスの低下やデッドロックを引き起こします。

しかし、ミスはつきもので、今までの感覚でついつい同期的なAPIを呼びたくなってしまうこともあるし、コードレビューで見抜けないこともあります。なので、同期的なAPIを利用する関数を完全に撤廃するべきなのですが、同期APIから非同期APIに移行するとき、すべてを一気に移行するのは難しいです。

3番目の課題につながることですが、非同期APIについてチーム内で理解を深めあって、やってはいけないことの共有をコードレビューなどを通じてしっかり行いながら、共通認識を持つのが大事だと思います。

まとめ

この記事のまとめですが、言いたかったことはだいたい以下のようなものです。

  • RPCやDBアクセスが沢山発生するAPIで、スレッドプールを使って並列化する場合、スレッドプールの限界を考える必要があります。
  • 同期的なAPIだと何をやってもリクエストしている間にスレッドを専有してしまうので、非同期APIを使うと効率的です。
  • LINEでは、ThriftとgRPCを扱える、HTTP/2に対応した非同期RPCフレームワークArmeriaを開発しています。
  • RxJavaを使うと、RPCやDBアクセスを非同期的にした場合でも、Reactiveにデータの流れを考えながらきれいなコードが書けます。

結果的に、LINE Shopでの1台のサーバーあたりで捌いているAPIリクエスト数について言えば、APIの内容にもよりますが、物理サーバー1台あたり最大毎秒3,000件程度のリクエストを受けて、毎秒10,000件程度のRPC、Redis、DBリクエストを発行するような、高トラフィックな状況を耐え凌いだこともあります。スタンプメッセージのバリデーションを行うようなサーバーは、常時毎秒数千リクエストを捌くことができています。

実際には、非同期APIを利用するだけでなく、RPCを少なくするような仕組みや、キャッシュを行うサーバーを限定したり、Caffeineを使ったローカルメモリキャッシュを併用したりしつつ、いろいろな工夫をして意外と少ないサーバー数で運用しています。実際には、APIによってはまだまだ無駄なことをしている部分があったり、ベンチマークをきちんとできているとはいえないので、今の構成でもさらに詰めるべき点はあると思っています。さらに、ほとんどのAPIは100ms以内に結果を返すことができていて、私達のチームでは、APIのレスポンスが95パーセンタイルで1秒を超えていたら、好ましくない状況だという共通認識で運用をしています。

雑多な紹介になってしまいましたが、非同期APIの必要性と面白さをおわかりいただけたでしょうか?最後になりますが、非同期RPCフレームワークArmeriaをよろしくお願いします。

明日の記事はSongran Liuさんによる「LINE Developersサイトを支えるDAO自動生成ツール」です。お楽しみに!

AdventCalendar rxjava reactivestreams Armeria async microservices line-shop

Hirotaka Kawata 2017.12.02