LINE Engineering
Blog
Akka HTTPの仕組みを理解する
初めまして、Ads Platform開発チームの岡田(@ocadaruma)です。
この記事はLINE Advent Calendar 2017の17日目の記事です。
今回、個人的に以前から気になっていたAkka HTTPの内部構造について、この機会に調べましたので紹介いたします。
Akka HTTPは、Lightbend社によって開発されている、Scala/Java用のHTTP toolkitです。
現在はメンテナンスが終了したsprayの後継と位置付けられており、特徴的なRouting DSLをsprayから受け継いでいます。
また、Play Frameworkは2.6系より、Akka HTTPをデフォルトのバックエンドとして採用しています。
Routing DSLを始めとしたAkka HTTPのAPIは、シンプルかつ高いComposability(組み立て可能性。小さく再利用しやすい構成要素を複数組み合わせて使えること)を提供する一方、その抽象度の高さから、低レイヤーでのHTTPリクエストの処理の仕組みまで含めて理解するには、若干のハードルがあるように感じていました。
このことは、例えば高負荷時におけるパフォーマンスの問題の調査などで問題となりえます。
これが、今回Akka HTTPについて調べることにしたきっかけです。
以下のバージョンのソースコードを対象とします。
また、この記事で着目するのはAkka HTTP Serverのみとし、Client-Sideは対象外とします。
なお、Scala、Akka、およびjava.nioに関する基本的な解説は省略します。
この記事では、次のような仕様を持つ小さなサンプルコードを例に、仕組みを追っていくことにします。
package com.example
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
//========
// (1) Routing DSL
//========
val route: Route = get {
extractUri { uri =>
complete(uri.toString())
}
} ~ put {
complete("PUT")
}
//========
// (2) HTTPサーバー起動
//========
Http().bindAndHandle(route, "localhost", 8080)
}
}
公式ドキュメント:Directives
サンプルコードの(1)
の部分では、Routing DSLを使ってRoute
を構成しています。
Route
はRequestContext => Future[RouteResult]
の型エイリアスであり、いわゆる普通のリクエストハンドラー的なシグネチャです。
get
、extractUri
、およびput
は、Directive[L]のインスタンスです。
Directive[L]
は大雑把にいえば(L => Route) => Route
というシグネチャを持つ関数のようなものです。
例えばextractUri
はDirective1[Uri]
のインスタンスであり、Uri => Route
を受け取ってRouteを返します。
extractUri
が行うのは、Uri
をリクエストから抽出してinner Uri => Route
に渡すことだけです。
以下はBasicDirectives.scala#L154の記述です。
def textract[L: Tuple](f: RequestContext ⇒ L): Directive[L] =
Directive { inner ⇒ ctx ⇒ inner(f(ctx))(ctx) }
一方、get
およびput
はDirective[Unit]
のインスタンスで、以下のように条件に応じて異なるDirectiveを返すDirectiveです(ややこしい)。
=> Route
をそのまま評価するDirectiveRouteResult.Rejected
を返すRoute」を返すDirective以下はMethodDirectives.scala#L83の記述です。
def method(httpMethod: HttpMethod): Directive0 =
extractMethod.flatMap[Unit] {
case `httpMethod` ⇒ pass
case _ ⇒ reject(MethodRejection(httpMethod))
} & cancelRejections(classOf[MethodRejection])
そして、(1)
のコードではget
とput
による2つのRouteを~
で結合して、1つのRoute
を作っています。
~
は、次のような動きを持つRouteのメソッドです(正確にはenrich my libraryパターンで足されたメソッドですが)。
RouteResult.Complete
ならそれを返す。RouteResult.Rejected
なら引数のRouteを評価した結果を返す。このように、Directiveをネストさせたり結合したりすることで、リクエストハンドラーを直感的に組み立てられるようになっています。
次に、HTTPリクエストを受け付けてリクエストハンドラーに渡すサーバー部分について追っていきます。
サンプルコードの(2)
の部分は、Http()
によって、Akka ExtensionsであるHttpExt
を取得し、bindAndHandle
でハンドラーを渡してサーバーを起動するコードです。
bindAndHandle
の実装を参照すると、次のようにネストしたAkka Streams Graphが存在することがわかります。
Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]]
Tcp.IncomingConnection#flow: Flow[ByteString, ByteString, NotUsed]
公式ドキュメント:Working with streaming IO
(Tcp.IncomingConnection#flow
は生のTCPレイヤーなので、Route
でハンドリングできるようにfullLayer
(HTTP/TLSレイヤー)と接続されていますが、今回はfullLayer
のことは置いておきます)
これらのGraphがどのように作られているかを見れば、Akka HTTPの仕組みについておおまかに理解できるはずです。
ところで、Akka(HTTP)はjava.nioによるIO多重化を用いており、低レイヤーでやっていること自体は、下記のような典型的なTCPサーバーと同様です。
package com.example
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import scala.collection.JavaConverters._
object EchoServer {
def main(args: Array[String]): Unit = {
//========
// ソケットを監視するSelectorをopen
//========
val selector = Selector.open()
//========
// ポートをListenするChannelをopen
//========
val listeningChannel = ServerSocketChannel
.open()
.bind(new InetSocketAddress(args.head.toInt))
listeningChannel.configureBlocking(false)
listeningChannel
.register(selector, SelectionKey.OP_ACCEPT)
try {
//========
// イベントループ
//========
while (selector.select() > 0) {
val keys = selector.selectedKeys()
keys.asScala.foreach { key =>
if (key.isAcceptable) {
val connectionChannel = listeningChannel.accept()
connectionChannel
.configureBlocking(false)
.register(key.selector(), SelectionKey.OP_READ)
} else {
val channel = key.channel().asInstanceOf[SocketChannel]
if (key.isReadable) {
val byteBuffer = ByteBuffer.allocate(4096)
channel.read(byteBuffer)
byteBuffer.flip()
channel.write(byteBuffer)
}
}
}
keys.clear()
}
} finally {
selector.keys().asScala.foreach(_.channel().close())
}
}
}
つまり、Akkaでも次のような処理を行なっている箇所が存在します。
それがActor based TCP handlingです。
上記を念頭に置きつつ、まずSource[Tcp.IncomingConnection, Future[Tcp.ServerBinding]]
から掘り下げていきます。
このSourceは、カスタムGraphStageであるConnectionSourceStage
から作られています(参照:Tcp.scala#L111)。
IO(IoTcp)(system)
によってTcpManager
Actorが生成され、この時点で、ソケットの監視を行うSelector
が以下の流れで作成されます。
SelectionHandler
Actorが生成される。SelectionHandler
のregistry
メンバーとして、ChannelRegistryImplが生成される。次に、ConnectionSourceStage
のGraphStageLogicを見ると、preStart hookでTcpManager
ActorへTcp.Bind
メッセージを送信しています。
これをトリガーとして、次のようにTCP Listenerが形成されます。
TcpManager
はTcp.Bind
を受けて、子ActorであるSelectionHandler
にWorkerForCommand
を送信し、SelectionHandler
はそれを受けて、自身の子ActorとしてTcpListener
Actorを生成する。TcpListener
生成時に、Listening ChannelのopenおよびInetSocketAddressへのbindと、channelRegistryへ登録(つまりSelectorへの登録)を行い、TCPコネクションを受け付けることができるようになる。ここでChannelRegistryImpl#register
に着目すると、SelectionKey#attachment
にActorを格納していることがわかります。
これにより、イベントループ中でActorを取り出し、Actorを介してイベントのハンドリングを行うことができます。
ここまでで、Selectorの生成とイベントループの開始およびコネクションの待ち受けを行う流れが把握できました。
Listening Channelがacceptableになると、以下のフローでTcp.IncomingConnection
が生成され、ConnectionSourceStage
の出力となります。
ChannelRegistryImpl
のイベントループでOP_ACCEPTを検出し、TcpListener
ActorへChannelAcceptable
を送信する。TcpListener
はそれを受けてacceptしたのちにSelectionHandler
ActorへWorkerForCommand
を送信する。SelectionHandler
はWorkerForCommand
を受け、自身の子ActorとしてTcpIncomingConnection
Actorを生成する。TcpIncomingConnection
インスタンス生成時にregistry
への登録を行い、ChannelRegistryImpl
はTcpIncomingConnection
に対してChannelRegistration
メッセージを送り返す。TcpIncomingConnection
はそれを受けて、ConnectionSourceStage
のlogicにConnected
メッセージを送信した後、送受信待ちへ移行する。ConnectionSourceStage
のLogic
は、IncomingConnectionStage
GraphからFlow[ByteString, ByteString]
およびTcp.IncomingConnection
を生成し、ConnectionSourceStage
の出力としてpushする。例えば2つのTCPコネクションがある場合、Actor treeは以下のようになります。
system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 4 children
⌊-> IO-TCP RepointableActorRef class akka.io.TcpManager status=0 1 children
⌊-> selectors RoutedActorRef class akka.routing.RouterPoolActor status=0 1 children
⌊-> $a LocalActorRef class akka.io.SelectionHandler status=0 3 children
⌊-> 0 LocalActorRef class akka.io.TcpListener status=0 no children
⌊-> 12 LocalActorRef class akka.io.TcpIncomingConnection status=2 no children
⌊-> 13 LocalActorRef class akka.io.TcpIncomingConnection status=0 no children
これが、コネクションをacceptし、Source[Tcp.IncomingConnection]
の出力を生成するまでのフローです。
イベントループを用いたノンブロッキング処理を、Akka Actorによってうまくハンドリングしている印象です。
今回、Akka HTTPをjava.nioのレベルまで掘り下げて処理の流れを一通り追ったことで、トラブルシュートの際に有用な知見が得られたと感じています。
加えて、Play FrameworkもAkka HTTPを採用しているため、Play Frameworkを深く理解する際にも今回調べたことが役立つ気がします。
明日はKagayaさんによる「Redis Lua scriptingをatomicな処理とcache stampede対策に使った話」です。お楽しみに!