PlayFrameworkやAkka HTTPでのスレッドプール分割のススメ

  • 18
    Like
  • 0
    Comment

概要

PlayFrameworkやAkka HTTPをつかっていて、うまくパフォーマンスがでなかった経験ありませんか?
もしかしたらそれ、スレッドプールをうまく分割できていなかったからかもしれません。

PlayFrameworkやAkka HTTPなど、ノンブロッキングI/Oを前提につくられたライブラリやフレームワークでは、ブロッキングI/Oに十分な注意を払う必要があります。

具体的に言うと、ノンブロッキングI/Oの処理のみを実行するスレッドプールと、ブロッキングI/O(I/O待ちによりスレッドを占有する)を含む処理を実行するスレッドプールは分割し、それぞれ適切に設定する必要があります。

なぜスレッドプールをわけるか

ノンブロッキングI/Oはその特性上、I/O待機中にCPUやスレッドを専有せず、他の処理に割り当てることができます。
この特性を活かし、PlayFrameworkやAkka HTTPなどのノンブロッキングI/Oでリクエストを受け付けるWebフレームワークは、大量の同時接続を少ないスレッドで効率的に処理することができます。

ここにブロッキングI/Oを含む処理を同じスレッドプールのスレッドで実行すると、I/O待ちなスレッドがじわじわと増えていき、ともなってスレッド割当待ちの処理が増加し、全体的に動作が悪くなってしまいます。
(最初は調子よくさばいているのにじわじわレスポンスタイムが増えていって...などはこの症状かもしれません。)

Akkaのコアコミッターである、@ktosopl (Konrad Malawski) さんもScalaMatsuri2016の「The Zen of Akka」で言及されています。
http://www.slideshare.net/ktoso/zen-of-akka#39

PlayFrameworkやAkka HTTPでのデフォルトのスレッドプールは、ノンブロッキングI/O用のものになっているので、たとえばPlayFrameworkのAction内でブロッキングI/O処理を意識せずに書いてしまい、上記のような状態になっているかもしれません。

たいていのWebアプリケーションはデータの永続化層にRDBを用いると思います。
しかしJavaやScalaで一般的に利用されるJDBCはインターフェイスが同期的なシグネチャになっており、内部実装もそれに合わせてブロッキングI/Oとなっており、特に注意が必要です。
(SlickなどのFutureを返すライブラリでも、内部で利用しているJDBCがブロッキングI/Oなので、そのFutureを実行するスレッドに気をつける必要があります。)

どのような処理がブロッキング処理(スレッドをブロックする処理)になるのか

  • java.lang.Thread#sleep()
  • scala.concurrent.Await.result()
    • Futureをユーザープログラム内でAwaitするのはやめたほうが極力さけたほうがよいです
    • 主要なWebフレームワークはFutureのまま結果を扱えるようになっています
  • synchronized
  • java.ioパッケージでのIO処理
  • 多くのJDBC実装

どのようにスレッドプールをわけるか

スレッドプールを分割するには、

  1. スレッドプールを生成する
  2. 生成したスレッドプールの実行コンテキストを利用する

上記で可能です。

スレッドプールの生成

スレッドプールを簡単に作成する方法は、Akkadispatcher(ExecutionContextExecutor)を利用する方法です。
application.confに定義するだけで、設定したスレッドプールが割り当てられたdispatcherを、ActorSystemから取得できます。

(参考) Akka Dispatchers

application.conf
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}

non-blocking-io-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}
val system = ActorSystem()

// dispatcher(ExecutionContextExecutor)の取得
// ExecutionContextExecutorはExecutionContextを継承している
val defaultDispatcher = system.dispatcher // akka-actor default dispatcher
val blockingDispatcher = system.dispatchers.lookup("blocking-io-dispatcher")
val nonBlockingDispatcher = system.dispatchers.lookup("non-blocking-io-dispatcher")

なお、PlayFrameworkの場合は、play.api.libs.concurrent.Akkaというobjectが、PlayFramework内部で利用しているActorSystemを提供しています。

import play.api.libs.concurrent.Akka

val defaultDispatcher = Akka.system.dispatcher // akka-actor default dispatcher
val blockingDispatcher = Akka.system.dispatchers.lookup("blocking-io-dispatcher")
val nonBlockingDispatcher = Akka.system.dispatchers.lookup("non-blocking-io-dispatcher")

この記事では言及しませんが、ブロッキングI/O、ノンブロッキングI/Oそれぞれ、下記のスレッドプールを扱うほうが良いようです。

  • ノンブロッキングI/Oを扱うスレッドプール => ForkJoinPool
  • ブロンキングI/Oを扱うスレッドプール => ThreadPoolExecutor

Futureの実行スレッドを変更する

取得したdispatcher(ExectionContext)を使って、applymapflatMapを行います。

val future = Future {
  User.findById(userId) // jdbc access
}(blockingDispatcher).map { user =>
  s"Hello ${user.name}!"
}(nonBlockingDispatcher)

akka-actorの実行スレッドを変更する

Actorを生成する際に、利用するdispatcherを指定できます。
(指定がない場合は、default-dispatcherが利用されます。)

val actor = context.actorOf(Props[UseJdbcActor].withDispatcher("blocking-io-dispatcher"))

akka-streamの実行スレッドを変更する

Graph(Source, Flow, Sinkの基底)のAttributeを変更して、利用するdispatcherを指定できます。

val blockingFlow = Flow[Long].map { id =>
  User.findById(userId) // jdbc access
}.addAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))

まとめ

  • ブロッキングIOを含む処理のスレッドプールとノンブロッキングな処理のスレッドプールはわける
  • ActorSystemAkka.systemdispatchers.lookupをつかえば楽に任意のスレッドプールを生成し、利用できる