この記事の結論
globalなExecutionContextではブロックする処理をblockingで包むとスレッド数が勝手に増えるから空きスレッドが無くて実行できないといったことを防げる。ExecutionContext.fromExecutorService(new ForkJoinPool(100))で生成されるThreadはBlockContextトレイトを継承してないのでblockingを使ってもスレッド数を増やした方が良いという情報がスケジューラに伝わらない。akkaの
dispatcherをExecutionContextとして使うとBlockContext付きのForkJoinPoolを簡単に作れる。ExecutionContext.fromExecutorServiceでもForkJoinPoolのコンストラクタに自前定義したThreadFactoryを渡すようにすればblockContextを渡せる。scala 2.11.xの実装はあまり重大でない(?)バグがあるから2.12.xからコピペしても良いかもしれない。
blockingとは・・・
def blocking[T](body: ⇒ T): T
Scala Standard Library API (Scaladoc) 2.10.0
Used to designate a piece of code which potentially blocks, allowing the current BlockContext to adjust the runtime's behavior.
Properly marking blocking code may improve performance or avoid deadlocks.
ランタイムビヘイビアーが調節されますって言われても困る(´;ω;`)
ということで blocking っていうメソッドが謎に包まれていたのでちょっと調べました。
解説というより自分用メモなのあえてまとめて無くてリファレンスとして持ってこれるように冗長になっています。なのでこの記事を副読本がてらコードを実際に追ったほうが速いと思います(◞‸◟) そして、調べ初めなので間違っている可能性もあります。(特にスケジューラ周りがあやふや)
ForkJoinPool, ManagedBlocker#onBlock, BlockContext については下記のエントリが詳しいです。
ForkJoinPoolとblocking - テストステ論
以下は引用です。
ForkJoinPoolについてはよく調べられていないのですが、今のところの理解は以下のようになりました。
work-stealing:
fork()したタスクをjoin()するとタスクの終了までブロックするが、これはThread.sleepやIO待ちのブロックのようにスレッドが完全に何もしなくなるのとは違い「一度タスク実行を中断した上で、スレッドが他のタスクの処理を開始してくれるようになる」*1動的なスレッド生成: スケジューラが必要と判断したらスレッドを増やしてくれる。(ただしタスクがブロックしていることを自動で検出してくれたりはしないのでヒントを伝える必要がある)
スレッドを使いまわしてスレッドの生成コストやコンテキストスイッチを抑えつつ、本当に必要ならスレッド生成もしてくれる便利な奴といったところでしょうか。
blockingの中身
さて、肝心のblockingメソッドは(上の引用にもありますが)、scala.concurrentのパッケージオブジェクト内で定義されています。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/package.scala#L123
def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
実装を見るとBlockContext内のメソッドを呼んでいるようです。BlockContextは先程も出てきました。
BlockContextはどこにあるか
さてBlockContextがどこかに存在するらしいのでどこにあるのかを探すと以下の様なものが見つかります。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L41-L49 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L51-L65
trait BlockContext { def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T } object BlockContext { private object DefaultBlockContext extends BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk } private val contextLocal = new ThreadLocal[BlockContext]() /** Obtain the current thread's current `BlockContext`. */ def current: BlockContext = contextLocal.get match { case null => Thread.currentThread match { case ctx: BlockContext => ctx case _ => DefaultBlockContext } case some => some } // ... }
blockingメソッドで呼んでいたBlockContext.currentは現在のスレッドが「BlockContextを継承している場合は現在のスレッド」を、そうでない場合は「DefaultBlockContextというblockOnメソッドの中が何もせず中身を評価するだけのメソッドになっているBlockContextを返す」ようになっています。
これを見るとBlockContextはThreadに継承させて使う物のようです。
ということは、blockOnが実際に何をやるかについて知りたい場合はここを見ても仕方がなさそうです。
IntelliJを使って気合で探すと ExecutionContextImpl 内で見つかります。*2
def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false override def block(): Boolean = { result = try thunk finally { isdone = true } true } override def isReleasable = isdone }) result } })
overrideしたblockOnでは色々やっていますがつまるところ ForkJoinPool.ManagedBlocker#blockでブロックするかもしれない処理を包んだ後にForkJoinPool.managedBlockというメソッドに渡しているようです。名前からしてblockする処理を渡すと何かマネージしてくれそうなオーラを感じます。
ManagedBlockerについて
ManagedBlockerについては上述した記事でも触れられていますが、 ForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 の「ForkJoinTaskの中で協調的に動作するSleepの実装例」が分かりやすかったです。
詳しい解説は元記事によりますが、isReleasableでまだブロックする必要があるかを判定し、終わっていなかったらblockメソッド呼んでくれるようです。
Sleepの例ではblockが何度も実行されるようになっていましたがExecutionContextImpl#newThreadの実装ではthunkの評価が終わるまでブロックする(そして終わり次第isReleasableがtrueになる)ので一度しか実行されないようです。
ManagedBlockerの内部実装は読んでいないのですが、 ForkJoinPool.ManagedBlocker (Java Platform SE 7 ) では
they allow more efficient internal handling of cases in which additional workers may be, but usually are not, needed to ensure sufficient parallelism
とあります。(usuallyとは・・。)ForkJoinPoolは分割統治アルゴリズムに向けたフレームワークということなのでDB操作でブロックするとかそういうことに使うのはusuallyから逸脱しているとかあるのでしょうか・・。どのくらいスレッドが増えてくれるのかは謎ですが、以下の記事にもスレッドが増えそうな言及がありました。
This will give the thread pool a chance to spawn new threads in order to prevent starvation
When informed that one of those threads is about to block, it compensates by starting an additional thread.
Welcome, ManagedBlocker — the way to signal to the ForkJoinPool that it should extend its parallelism, to compensate for potential blocked worker threads.
とりあえずどれだけ増えるか分からないけどManagedBlockerに渡せば必要なら増えてくれるという理解になりました。
ここまでのまとめ
ここまでの流れを整理すると以下のようになります。
blockingメソッドでブロックする処理を包むとThreadがBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)ExecutionContextImplではonBlock内でManagedBlockerにブロックする処理を渡すようなThreadを生成するような実装を行っている場所がどうやらあるらしい。ManagedBlockerにブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。
ExecutionContextImplは何者なのか
ExecutionContextImplが出てきましたが、一部分しか見ていないのでもう少し詳しく見てみます。
ExecutionContextImplは名前の通りExecutionContextの元です。
実装上は以下のように利用されています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L119-L164
def global: ExecutionContextExecutor = Implicits.global object Implicits { implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) } def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService = impl.ExecutionContextImpl.fromExecutorService(e, reporter) def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter) def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(e, reporter)
import scala.concurrent.ExecutionContext.Implicits.globalで使えるようになるExecutionContextはfromExecutorにnullを渡すことで生成されているようです。
fromExecutorServiceもありますが、これはJavaと同じようにExecutorにいくつかのメソッドが追加されたExecutorServiceが生成されるというだけで大きくは違わず、最終的にはExecutionContextImplがnewされるようです。(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L132-L150)
ExecutionContextImpl は渡されたes: Executorを以下のようにパターンマッチしてnullの場合に備えています。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L27-L30
val executor: Executor = es match { case null => createExecutorService case some => some }
Implicits.globalやglobalの際はnullが渡されるので、このcreateExecutorService が呼ばれるようです。(試してないですが、ExecutionContext.fromExecutor(null)とかすると標準のと同じExecutionContextが作れそうです。)
createExecutorServiceは以下のようなっています。(長いので抜粋)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L74-L97
val threadFactory = new DefaultThreadFactory(daemonic = true) try { new ForkJoinPool( desiredParallelism, threadFactory, uncaughtExceptionHandler, true) // Async all the way baby } catch {...}
ついにForkJoinPoolが生成されました。そしてForkJoinPoolにはデフォルトの並列数*3と、DefaultThreadFactoryが渡されています。uncaughtExceptionHandlerは名前のとおりです。最後のasyncModeをtrueにすると通常スタックとして積まれるタスクがキューのようにFIFOになるようです。
asyncModeについての余談
playのconfigについてのドキュメントにこの設定について書いてありました。ThreadPools
# Setting this to LIFO changes the fork-join-executor # to use a stack discipline for task scheduling. This usually # improves throughput at the cost of possibly increasing # latency and risking task starvation (which should be rare). task-peeking-mode = LIFO
この設定は正確にはakkaのdispatcherの設定です。akkaではtask-peeking-modeというキーでasyncModeが決められています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L443-L448
val asyncMode = config.getString("task-peeking-mode") match { case "FIFO" ⇒ true case "LIFO" ⇒ false case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " + """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") }
LIFOにすると(少なくともplayでは)レイテンシが悪化してスループットが向上するようです。ちなみにakkaでは何も指定しないとFIFOになるようです。(https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/resources/reference.conf#L351)
DefaultThreadFactory
impl.ExecutionContextImpl.fromExecutorにnullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryの実装を見てみます。これはExecutionContextImplの内部クラスとして定義されています。
class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def wire[T <: Thread](thread: T): T = { thread.setDaemon(daemonic) thread.setUncaughtExceptionHandler(uncaughtExceptionHandler) thread } def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false override def block(): Boolean = { result = try thunk finally { isdone = true } true } override def isReleasable = isdone }) result }
何か見覚えがあるなと思ったら、このDefaultThreadFactoryがまさに先ほどIntelliJを使って気合で探してきたBlockContextを使っている部分でした。
ここまでのまとめ
ここまでの流れを整理すると以下のようになります。
blockingメソッドでブロックする処理を包むとThreadがBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)impl.ExecutionContextImpl.fromExecutorにnullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryではonBlock内でManagedBlockerにブロックする処理を渡すようなBlockContextを継承したForkJoinWorkerThreadを生成するThreadFactoryを定義してForkJoinPoolに渡している。scala.concurrent.ExecutionContext.Implicit.globalで取得できるExecutionContextはimpl.ExecutionContextImpl.fromExecutorにnullを渡した結果生成された物で。ManagedBlockerにブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。
ForkJoinPoolとfromExecutorService
ここまででglobalなExecutionContextだとBlockContextが継承されたThreadが生成されて、blockingが良い感じになるということがわかりましたが、
ExecutionContext.fromExecutorService(new ForkJoinPool(100))とした場合はどうなるのか気になったので調べてみました。
scalaリポジトリ内のForkJoinPoo.java *4 を見ると以下のようになっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
// L1403~1404 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; // L881~886 static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } // L3678~3747 static { defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); } // L2838~2939 public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } //
これを見るとForkJoinPoolにForkJoinWorkerThreadFactoryを渡さない時にデフォルトで指定されるDefaultForkJoinWorkerThreadFactoryはBlockContextを継承しないForkJoinWorkerThreadを生成するようです。(そもそもBlockContextはscalaの物なのでJavaのクラスがデフォルトで継承してないのは当たり前な気がしてきました。)
ということはExecutionContext.fromExecutorService(new ForkJoinPool(100))のように指定してしまうとblockingで包んでも何もしてくれないDefaultBlockContextで処理されることになりそうです。
globalでないExecutionContextを使う際は予めブロックすることを前提にスレッドを多めに作ったりするものの少し注意が必要なようです。(動的にスレッドを増やすより、予め作っておいたほうが早く処理できると思うのでblockingで包まない戦略もある程度ありな気がしています)
akkaのdispatcher
akkaのDispatcherはExecutionContextとして使えるのですが、そちらを使えば自分でForkJoinWorkerThreadFactoryを定義しなくても済みそうです。
akkaのThreadFactoryはMonitorableThreadFactoryになっていてこれが、AkkaForkJoinWorkerThreadを生成しています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L183-L188
final case class MonitorableThreadFactory(name: String, daemonic: Boolean, contextClassLoader: Option[ClassLoader], exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing, protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool)) // Name of the threads for the ForkJoinPool are not customizable. Change it here. t.setName(name + "-" + counter.incrementAndGet()) t }
そしてAkkaForkJoinWorkerThreadはBlockContextを継承しています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L161-L174
private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext { override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = { val result = new AtomicReference[Option[T]](None) ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { def block(): Boolean = { result.set(Some(thunk)) true } def isReleasable = result.get.isDefined }) result.get.get // Exception intended if None } }
blockOnの中身はExecutioContext.globalで生成されるものとそんなに変わらなさそうです。
MonitorableThreadFactoryはActorSystemImplで定義されて、Dispatchersに渡されています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L572-L573
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L642-L643
final val threadFactory: MonitorableThreadFactory = MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler) val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
ActorSystem.applyからdispatchers経由でAkkaForkJoinPoolを生成しつつ指定がなければAkkaForkJoinPoolが生成されるようです。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L321
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L380-L396
// 断片のみ executor match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) }
final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler, asyncMode: Boolean) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics { def this(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true) override def execute(r: Runnable): Unit = if (r ne null) super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) else throw new NullPointerException("Runnable was null") def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() }
この中間部分も調べたのですが、この記事の分量が倍になるくらい色々経由していたので割愛します(;´Д`)
結論としては、akkaのdispatcherでfork-join-executorを指定しておけば、blockingが使えるExecutionContextが取得できそうです。
おまけ: scala.2.12.xでのDefaultThreadFactoryの改善
akkaのDispatcherを使わなくてもForkJoinPoolのコンストラクタにExecutionContextImpl.DefaultThreadFactoryと同じものを渡せば問題はないはずです。
注意点としてscala2.12.0-M3のコードを見ると以下の様な注意書きが見つかります。
https://github.com/scala/scala/blob/v2.12.0-M3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L78-L79
// When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }
どうもscala2.11.xのコードではblockingをネストさせるとスケジューラにヒントが行き過ぎてしまう問題があるようです。コピペして使う際には注意が必要そうです。
おまけ: Future.applyとExecutionContext
よく考えたらFutureでExecutionContextがどうやって使われてるのかよく知らなかったので調べてみました。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/Future.scala#L492
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/Future.scala#L18-L34
class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { promise complete { try Success(body) catch { case NonFatal(e) => Failure(e) } } } } def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) executor.prepare.execute(runnable) runnable.promise.future }
コードを見る限りexecuteが呼ばれているようです。*5
executeメソッドはExecutionContextImplを見てみると以下の様な実装になっていました。(DefaultThreadFactoryではないのでfromExecutorServiceに非nullを渡してもこのメソッドが呼ばれます)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L99-L110
def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => val fjt: ForkJoinTask[_] = runnable match { case t: ForkJoinTask[_] => t case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) } Thread.currentThread match { case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() case _ => fj execute fjt } case generic => generic execute runnable }
AdaptedForkJoinTaskは大雑把にはrunnableをForkJoinTaskでラップするような実装です。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L118-L130
全体としてはプールがForkJoinPoolならrunnableをとりあえずForkJoinTaskにしてfork()を呼ぶという感じになっているようです。
joinを明示的に行っている部分は見つからなかったので、そういったwork-stealな効果は使っていないのか別の部分で使っているのかは不明です。特にFutureやPromise
の実装をほとんど見てないのでこの部分は適当です。
感想
以上です。結論は一番上に書いてあります。 akkaのdispatcherもそれなりに追ったのですがコードがわりと飛んででいるので紹介するのは厳しさを感じ断念しました。(コンフィグファイル越しにかなり柔軟に設定できるので仕方ない気がします) コードリーディングの結果をブログに書こうとすると死ぬけど、何かしら読んだメモは残したいしどうしようみたいな気持ちになった。(小学生以下の感想)
*1:かぎかっこ内の表現はForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 を意識しました
*2:BatchingExecutorでも見つかる(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BatchingExecutor.scala#L47-L99)のですが、ForkJoinPoolの話とは少しずれてしまうので割愛します
*3:Runtime.availableProcessorsによって得られる論理コア数と同じ数になるようです。http://docs.scala-lang.org/overviews/core/futures.html
*4:scala 2.11.xではJava8以前の処理系にも対応するためにforkjoinパッケージを自前で保持していますが、2.12.xではJava8を前提とするためこのコミットで依存が取り除かれているようです
*5:executor.prepareメソッドについても調べたのですが、https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L75-L89のコメントがなるほど分からんという感じだったのでスルーしています