Amazon Kinesis + Reactive Extensionsによる簡易CEP
AWSのAmazon Kinesis!大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス。うーん、いかにもわくわくしそうなキーワードが並んでいます。そしてついに先日、東京リージョンでも利用可能になったということでAWS Summitの最中もwktkして、どうやって利用したもんかと考えてました。だって、リアルタイムにイベントデータが流れてくる→オブザーバブルシーケンス→Reactive Extensions(Rx)、という連想になるのは自然なことですよね?
Kinesisとは
Rx、の前にKinesisとは。【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理を見れば事足りますが、表現するなら土管、ですかね。イベントデータの。以下ぽんち絵
Streamの中はShardという単位で分かれていて、データを放り込む時はPartitionKeyを元に、どのShardに突っ込まれるか決まる。読み書き性能自体は完全にShardの数で決まっていて、1シャード毎にWriteは1MB/sec - 1000Req/sec, Readは2MB/sec - 5Req/secとなってます。事前に負荷状況を予測していくのと、随時、Split(Shardの分割)とMerge(Shardの統合)してスケーリングしていく、って感じですかねえ。API自体は単純で、あんま数もないので簡単に理解できるかと。
APIが単純なのはやれることが少ないから。土管。情報を左から右に流すだけのパイプ。その代わり入力は限りなく無限にスケールしていく(Shardを増やしまくれば)。では出力は?というと、Kinesis Applicationとよばれる、といっても実体は、別にAPIをほぼほぼポーリングで叩いてデータ取り出して何か処理するものをそう呼んでるだけ。で、取り出すのはAPI叩いて保存されたデータを読むだけ。
そう、ポーリング。Kinesis自体は一時保管所であって、本当のリアルタイムでPubSub配信するわけじゃあない(用途としては問題ないレベルで低遅延にはなるけれど)。保存時間は24時間で、その間はStream中のどこから(最初からでも最新からでも任意の位置から)でも取り出すことができる。一時保管所がわりにS3を使ったりすると、ゴミは貯まるしどこまで取ったかとか煩わしくなるけれど、Kinesisの場合はStreamの形状になっているのでとてもやりやすい。ただしKinesisは制限として1レコード辺り50KBまで。更にHTTPで投げる際にBase64になってブヨっと膨らむ。
ObservableKinesisClient
C#でKinesisを使うには、AWS SDK for .NETを使えばAmazonKinesisClient入ってます。ソースコードも公開されてるしNuGetでも入れられるし、APIはとりあえずAsyncに対応してるし、APIデザインもちょっと奇妙なところもあるけれど、一応全て統一されたモデルでデザインされてるので、割と結構良いと思ってます。
Kinesis、データの登録はPutRecordでバイナリ投げるだけなので単純なのですが、取り出しの方はいささか面倒で、DescribeStreamによるStream内のShard情報の取得、GetShardIteratorによるShardIterator(どの位置から取得開始するか、の情報)の取得、それを元にGetRecord、そして延々とポーリングのためのループ。と、繰り返す必要があります。
というわけかで、まずは利用例の方を。
// とりあえずAWSのキーと、ストリーム名で生成する感じ var client = new ObservableKinesisClient("awsAccessId", "awsSecretAccessKey", RegionEndpoint.APNortheast1, streamName: "KinesisTest"); // データの登録。オブジェクトを投げ込むとJSONシリアライズしたのを叩き込む。 await client.PutRecordAsync(new { Date = DateTime.Now, Value = "ほげほげほげほげ" }); // ObserveRecordDynamicでJSONのストリームとして購読できる client.ObserveRecordDynamic() .Where(x => x.Value != "ほげ") // xはdynamicなのでどんなSchemaのJSONも自由に辿れる .Select(x => x.Date + ":" + x.Value) .Subscribe(Console.WriteLine);
はい。ObserveRecordDynamicで、リアルタイムに流れてくるデータを簡単に購読できます。IObservableなので、Rxによって自由にクエリを書くことが可能。また、何のデータが流れてくるか分からないストリームのために、JSONはdynamicの形でデシリアライズされています。(IntelliSenseの補助は効きませんが)スキーマレスに、あらゆるデータをRxで処理できます。もちろん、型付けされたものが欲しければObserverRecord<T>を、今は実装してないですが、まあ簡単につくれます:)
以下ObservableKinesisClient本体。
// JSON.NET, AWSSDK, Rx-Mainの参照が必要 public class ObservableKinesisClient { readonly UTF8Encoding encoding = new UTF8Encoding(false); readonly JsonSerializer serializer = new JsonSerializer() { Formatting = Newtonsoft.Json.Formatting.None }; // ThreadSafeだよ readonly string streamName; readonly AmazonKinesisClient kinesis; // ThreadSafeなのかは知らない(ぉぃ // コンストラクタはもっとまぢめにやりましょう public ObservableKinesisClient(string awsAccessId, string awsSecretAccessKey, RegionEndpoint endPoint, string streamName) { this.kinesis = new AmazonKinesisClient(awsAccessId, awsSecretAccessKey, endPoint); this.streamName = streamName; } // ようするにObjectを1レコードずつJSONで突っ込むもの public async Task<PutRecordResponse> PutRecordAsync(object value) { using (var ms = new MemoryStream()) using (var sw = new StreamWriter(ms, encoding)) using (var jw = new JsonTextWriter(sw) { Formatting = Formatting.None }) { serializer.Serialize(jw, value); jw.Flush(); ms.Position = 0; var request = new PutRecordRequest { StreamName = streamName, Data = ms, PartitionKey = Guid.NewGuid().ToString() // PartitionKeyは適当にランダム }; // つまり1レコード1HTTP POSTということになる。 // 大量に投げる際は素朴すぎてアレゲ感があるので、実際にやるときはまとめてから放り込んで // 取り出す側も↑の構造を前提にして取り出すよーな感じにしたほうがいーかもデスネー return await kinesis.PutRecordAsync(request).ConfigureAwait(false); } } // Dynamicが嫌な場合はSerialize<T>でおk。とりあえずこの例ではdynamicでやります。 // Client内部で分配しちゃったほうがきっと自然にやさしい(Publish().RefCount()) public IObservable<dynamic> ObserveRecordDynamic() { return Observable.Create<dynamic>(async (observer, cancellationToken) => { var isRunningNextPipeline = false; try { // まずShard一覧を取得する // TODO:これを使いまわしちゃうとShardsの増減には対応してないよ! // 毎回DescribeStream読むのもアレだしたまに問い合わせとかがいいの? var describeStreamResponse = await kinesis.DescribeStreamAsync(new DescribeStreamRequest { StreamName = streamName }).ConfigureAwait(false); var shards = describeStreamResponse.StreamDescription.Shards; var nextIterators = new List<string>(); foreach (var shard in shards) { if (cancellationToken.IsCancellationRequested) return; // CancellationTokenの監視だいぢだいぢ // ShardIteratorTypeは実際は取り出した位置を記録しておいてAFTER_SEQUENCE_NUMBERでやるか、LATESTでやるかがいーんじゃないでしょーか? var shardIterator = await kinesis.GetShardIteratorAsync(new GetShardIteratorRequest { StreamName = streamName, ShardId = shard.ShardId, ShardIteratorType = ShardIteratorType.TRIM_HORIZON, // TRIM_HORIZON = 最初から, LATEST = 最新, AT_SEQUENCE_NUMBER = そこから, AFTER_SEQUENCE_NUMBER = 次から }).ConfigureAwait(false); var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator.ShardIterator }).ConfigureAwait(false); // Shardの順番で回してるので、このPushの順番は必ずしも「時系列ではない」ことにチューイ! foreach (var item in record.Records) { PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push! } nextIterators.Add(record.NextShardIterator); } // NextShardIteratorがある状態で無限ぐるぐる do { if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part2 for (int i = 0; i < nextIterators.Count; i++) { if (cancellationToken.IsCancellationRequested) return; // ところどころCancellationTokenの監視 Part3 var shardIterator = nextIterators[i]; var record = await kinesis.GetRecordsAsync(new GetRecordsRequest { ShardIterator = shardIterator }).ConfigureAwait(false); // こちらでも、やはりShardの順番で回してるので、状況によって必ずしも時系列にはならないことにチューイ! foreach (var item in record.Records) { PushRecord(item, observer, ref isRunningNextPipeline); // ObserverでPush!Push!Push! } nextIterators[i] = record.NextShardIterator; } await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // 実質ポーリングなのでなんとなくDelayをちょっと入れてみる nextIterators = nextIterators.Where(x => x != null).ToList(); // 明らかに非効率なこの実装はテキトーなんで真面目にやるなら真面目に書いてください:) } while (nextIterators.Any()); } catch (Exception ex) { if (isRunningNextPipeline) { throw; } else { observer.OnError(ex); } return; } observer.OnCompleted(); }); } void PushRecord(Record record, IObserver<dynamic> observer, ref bool isRunningNextPipeline) { using (var sr = new StreamReader(record.Data, encoding)) // item.DataにMemoryStreamの形で1レコードが受け取れる using (var jr = new JsonTextReader(sr)) { var obj = serializer.Deserialize(jr); isRunningNextPipeline = true; observer.OnNext(obj); // 1レコードをPush isRunningNextPipeline = false; } } }
PutRecordAsyncはまんま、JSONにシリアライズしたデータを投げ込んでるだけです。ObserverRecordDynamicのほうはちょっと複雑っぽいですが、やってることは順に、DescribeStreamAsyncでShard一覧を取得→それぞれのShardでGetShardIteratorAsyncで始点の取得・GetRecordsAsyncで最初のデータを取得しobserverに配信→取得できたNextShardIteratorを元にデータ取得と配信の無限ループ。です。
コメントで色々書いてありますが、Shard単位で処理していくのでレコードのSequenceNumberの順にPushされているわけではないことと、ShardがSplitやMergeで変動することへの対応は必要よね、とか考えることは色々ありますね。あと、Readの制限が5Req/secとかなり少ないので、複数処理する必要があるなら、できればリクエストは分配してやりたいところ。RxならPublishで分配、ついでにRefCountでSubscriberが0になったら購読解除というのが自然に書けるので、その辺も入れてやるといいかなー、なんて思います。とはいえ、基本的にはデータ取ってOnNextで垂れ流すという、それだけに収まってはいます(ほんとだよ!)。
従来はこの手のコードはyield returnで処理するはずですが、それがOnNextに変わっているという事実が面白い!勿論、同期API + yield returnにすることも可能ですが、AWS SDKの同期APIは非同期のものを.Resultで取ってるだけで非同期のほうがネイティブになるので、同期API使うのはお薦めしません。非同期時代のLINQ、非同期時代のイテレータ。中々面白くありません?UniRx - Reactive Extensions for UnityのFromCoroutineでも、IObserverをyielderとして渡して、非同期のイテレータを作れる(コンバートできる)ようにしています。こういうのも一つのデザイン。
like CEP(with LINQPad)
CEP(Complex Event Processing)は最近良く聞くようになりましたねー、MicrosoftにもStreamInsightというかなり立派なプロダクトがあるのですが、あんまり話を聞かないし将来性もビミョーそうなので見なかったことにしましょう。ちなみにStreamInsightは2.1からRxと統合されたりして、この手のイベントストリームとRxとが相性良いこと自体は証明済みです。
そんなわけでMicrosoft周辺では全然聞きませんが、日本だとLINEでのEsper CEPの活用例とかNorikra:Schema-less Stream Processing with SQLで盛んに聞いて、まーたMicrosoft周辺によくある、一歩先を行ったと思ったら周回遅れ現象か!とか思ったり思わなかったり。
というわけで、Norikraの紹介スライドのクエリ5つをRxで書いてみましょう。また、動作確認はLINQPadのDumpでリアルタイムに表示が可能です(asynchronousにクエリが走ってる最中はResultsのところにリアルタイムにグリッドが追加されていく!)
// Queries:(1) client.ObserveRecordDynamic() .Select(x => new{ x.Name, x.Age }) .Dump(); // Queries:(2) client.ObserveRecordDynamic() .Where(x => x.Current == "Shibuya") .Select(x => new{ x.Name, x.Age }) .Dump(); // Queries:(3) client.ObserveRecordDynamic() .Buffer(TimeSpan.FromMinutes(5)) .Select(xs => xs.GroupBy(x => x.Age).Select(x => new { Age = x.Key, Count = x.Count() })) .Dump(); // Queries:(4) client.ObserveRecordDynamic() .Buffer(TimeSpan.FromMinutes(5)) .Select(xs => xs.Max(x => x.Age)) .Dump(); // Queries:(5) client.ObserveRecordDynamic() .Where(x => x.Current == "Kyoto" && x.Attend[0] && x.Attend[1]) .Buffer(TimeSpan.FromMinutes(5)) .Select(xs => xs.GroupBy(x => x.User.Age).Select(x => new { Age = x.Key, Count = x.Count() })) .Dump();
5分間だったらBufferもしくはWindowが使えます(量が少なそうならBufferのほうが、後続クエリにLINQ to Objectsが使えて分かりやすい、量が多いならWindowで、同様にRxで集計クエリが書ける)。他に何ができるかはRxJavaのWikiのOperator一覧でもどうぞ。めちゃくちゃ何でもできます。
SQL vs Rx
SQLである必要は、あるようで、ない。テキストベースのDSLを作るならSQLが共通知識として期待できるので、SQLに寄せる必要性はかなり高い。けれど、Rxならば、LINQとしての共通知識と、C#そのものであるというコンパイルセーフな点と何でもできること、メソッドチェーン(+IntelliSense)による書きやすさ。SQLライクなものを使いたい理由は全くない。
(とはいえ勿論いちだいのRxがぶんさんごりごりのに勝てるとは思ってないんで、そこはまぁかじゅあるなはなしです)
TODO
というわけで見てきたわけですが、まあ所詮まだ単純なコードによるコンセプトレベルの話ですね!本格的にこれからやるとしたら
- ObservableKinesisClientをもっとしっかりしたものに
- Kinesis ApplicationをホストするためのServiceとプラグイン機構
- ログ転送側としてSLABのKinesis用Sink
ですかねえ。まぁ、これらはJavaですでに用意されているamazon-kinesis-clientやamazon-kinesis-connectorsを.NET環境で代替するために必要だ、といったところですね。素直にJava書けば?っていうのは一理あるけれど、どーなんですかね、C#でやりたいんですよ(笑)
Semantic Logging Application Block(SLAB)というのは構造化ロガー(正確にはロガーは含まれないけれど)と収集サービスがセットになったライブラリです。面白いのはOut-Of-Processでの動作が選べて、その場合はWindowsネイティブのEvent Tracing for Windows (ETW)経由でログが運ばれるので、非常に高速に動作する、というところ。Sinkというのは出力用プラグインみたいなものです。なので、アプリケーション→EventSourceロガー→SLAB Service(+ KinesisSink)→Kinesis という構造を作ることで、データをリアルタイムに投下するところまでは行ける。あとはRedShiftに送って解析(amazon-kinesis-connectorsには既にありますね)するなり、他のKinesis Application作るなりよしなに出来るかなぁ、できればいいかなぁ、と。ラムダアーキテクチャ、というホドデハ・モチロンナイ。
AWS + Windows(C#)
先週の木・金に開催されたAWS Summit Tokyo 2014にて、AWS + Windows(C#)で構築する.NET最先端技術によるハイパフォーマンスウェブアプリケーション開発実践と題して、セッションを行いました。
まとめで書きましたが、C#+AWSは現実解、だと思ってます。そしてAWSだからといって特別なこともなく、そしてC#だからといって特別なこともない。Kinesisもちゃんと使えるわけだし、結構面白いことがまだまだ出来るんじゃないかな、って思ってます。なんでAzure使わないんですか?というのには、よく聞かれるのでお茶を濁して答えないとして(!)、AzureにもKinesisのようなAzure Event Hubsというものが先週プレビューリリースされました。C#からの活用という点では、こちらにも注目していきたいところです。Event Hubs Developer Guideなんか見ると普通に色々参考になるし、機能的にはHTTP以外にAMQP使えたり、ちょっと強そうではある。