このエントリは RecoChoku Tech Night 5社合同 AWS re:Invent 2019 参加報告会 - connpass での発表内容、及びはてなエンジニア Advent Calendar 2019の20日目のブログ(若干フライングで投稿しています!)です。
AWS re:Inventのセッションの一つである SVS323-R1 - REPEAT 1 Mastering AWS Lambda streaming event sources
を元ネタに、re:Invent 2019直前に出たKinesis Data Streamsの新機能の解説をします。
- 基本的な用語説明
- Kinesis + Lambdaの活用事例
- Kinesis + Lambdaのスループットが思ったように出ない時、及びその対処方法
- シャードを増やすのは万能なのか?他の手はないのか?新機能のご紹介
- デモ
- ParallelizationFactorを増やすことの注意点
- 最後に
- 免責
- 最後の最後に
基本的な用語説明
このエントリを読むのに必要な最低限の用語を解説します。 詳しくはAmazon Kinesis Data Streams の用語と概念 - Amazon Kinesis Data Streamsが画像、内容共に参考になります。
Kinesis Data Streams
はシャードの集合です。AWSのサービスとしてのKinesisにはData Streams以外にも色々なサービスがあるのですが、このエントリでは特に誤解の懸念はないため、以下 Kinesis
と呼びます。
Kinesisに投げ込まれるデータ1件1件のことをデータレコード
と呼びます。データレコードをKinesisに投げ込むのは プロデューサー
です。
シャードが複数ある状態でデータレコードがKinesisに挿入されられると、データレコード内にある パーティションキー
を元に、各シャードにグループ分けされます。
Kinesisに投げ込まれたデータは、コンシューマ
が取得して処理します。
このエントリではコンシューマがLambdaである場合について記述します。
シャード1つ1つに対して読み取りや書き込みのスループットに関する制限があり、また、Kinesis + Lambdaの場合、1つのシャードに1つのLambda関数が実行されるというモデル(このエントリで紹介する新機能の前の話ですが)であるため、性能を上げたい場合はシャードを増やす、というのが基本的な戦略になります。
あるデータレコードがKinesisに投げ込まれてからLambdaによって処理されるまでの時間は、CloudWatchでLambdaの IteratorAge
として確認することができます。IteratorAgeが上がった場合、一般的には投げ込まれるスループットよりも処理するスループットが少ないことが多いのでIteratorAgeはどんどん上昇していくことになります。
ちなみにこのエントリではKinesis + Lambdaについて語っていますが、ほとんどそのままDynamoDB Streams + Lambdaにも当てはまります。
Kinesis + Lambdaの活用事例
AWSの導入事例でも、Kinesis + Lambdaを、スケールするデータ分析機能の一部として使っている事例が紹介されています。 Bustle Case Study - Amazon Web Services (AWS)
はてなでも、 時系列データベースという概念をクラウドの技で再構築する - ゆううきブログ にあるように、Kinesis + Lambdaの構成はバリバリ使っています。
Kinesis + Lambdaのスループットが思ったように出ない時、及びその対処方法
Kinesis + Lambdaの構成を実際に運用していると、思うようにスループットが出ない、つまりKinesisに投げ込まれたデータ量に対して出口のLambdaの処理量が足りなくなり、Lambdaが処理していないデータがKinesis内にどんどん増えていく状況が起きる場合があります。 どのようなときにそのようなことが起きるのでしょうか。
インターネットに書いてある事例や私のみたことのある事例や私の想像力から、以下に起きそうなことと、その対処方法を以下に列挙します。この列挙と対処法 ついては Lambda 関数の IteratorAge メトリクスを下げる に書いてある内容を大いに参考にしています。
- Lambdaの同時実行数制限を超えてしまい、Lambdaの実行にスロットリングが発生する
- Lambdaの同時実行数制限を上げましょう
- Lambdaが特定のデータレコードに対してエラーになり、そのデータレコードが生きている間(デフォルトでは24時間)延々と同じデータレコードの再試行を続けて全くデータの読み取りが進まなくなる
- 対象のデータレコードが来てもLambdaがエラーにならないようにLambdaを修正してデプロイしましょう
- もしくは AWS Lambda Supports Failure-Handling Features for Kinesis and DynamoDB Event Sources に書いてあるようなLambdaのエラー周りの新機能をうまく使って回避しましょう
- こちらも実はre:Invent直前に出た新機能です。エラーを無限に再試行しないような設定をしたり、エラーが発生したら1つのLambdaが受け取るデータ量を減らして再試行したりすることができるようになりました
- Kinesisのコンシューマーが複数存在する際に、コンシューマーが1つのシャードからデータを読んでくることでシャードのスループット制限を超えてデータが読めなくなる
- Kinesisの拡張ファンアウトを使う、もしくはシャードを増やして1シャードあたりのスループットを減らしましょう
- コンシューマーであるLambdaが遅すぎて1shardに入るデータ量よりもLambdaが処理するデータ量が少ない
- Lambdaで行われている処理に対してCPUリソースが足りない場合は、Lambdaのメモリを増量することでCPUを強化できるのでしましょう。もしくはシャードを増やすことでデータ量あたりのLambdaの実行数を増やしましょう
- Lambdaで行われている処理以外に時間がかかっている場合(ex. sleep, 外部のネットワークへのI/O等)は、シャードを増やすことでデータ量あたりのLambdaの実行数を増やしましょう
シャードを増やすのは万能なのか?他の手はないのか?新機能のご紹介
シャードを増やすことの問題
上記で、シャードを増やす、という解決策がいくつか出てきます。実際Kinesisはシャードを増やすことでだいたいのスループットの問題は解決するのですが、ここで1つ大きな問題点があります。 それは、詰まってる状態でシャードを増やしてもシャードが増えた効果はすぐには発揮されないということです。以下で詳しく解説します。
Kinesisのシャードを増やすというのはどういうことかというと、1つのシャードを分割することを意味します。分割する前のシャードを親、分割した後のシャードを子と表現します。 Kinesisにおいて1つのシャードを2つに分割すると、それ以降に投げ込まれるデータレコードは新しい2つの子シャードに入っていくのですが、すでにKinesisが親シャードで受け取っていたデータは親シャードに残り、1並列でLambdaが処理することとなります。
すなわち、投げ込まれるデータレコードのスループットよりも処理されるデータレコードのスループットが少ない状態で時間が経過して、lambdaで処理しているデータレコードが、x時間前にKinesisに入ったデータだった場合(=LambdaのIteratorAgeがx時間)、シャードを増やしたとしてもx時間経過して親シャードのデータがすべて読み取られるまでは遅延が続くことになります。
対応するまでに膨れ上がったIteratorAgeの時間が、シャードを増やした後にKinesis + Lambdaの遅延が解消するまでの時間にそのまま直結するため、素早い復旧のためには如何に遅れを素早く検知してシャードを増やすかが重要になってきます。IteratorAgeを見つつシャードをオートスケールさせるのが解の一つなのかもしれませんが、Lambdaの更に裏側のなにか(DBなり外部APIなり)が詰まる可能性も考えると、オートスケールさせるのも少し不安になりますね。 かといって人間が対応しようとすると、普通の障害以上に、如何に早く検知して早く対応するかの戦いになってしまいます。大変ですね。 もちろん、Kinesis + Lambdaの構成はそもそも非同期であるため、少しの遅延が即座に問題になるわけではないようにシステムを構築するのが基本ではありますが、いくら遅延が大丈夫と言っても数分の遅延は許せるが数時間の遅延になってくると問題になるケースは少なくないのではないでしょうか。
上記の悩みを解決する新機能のご紹介
今回の問題は、Lambdaの並列数を増やすためにはシャードを増やすくらいしかないが、シャードを増やすしても効果が出始めるのは問題が発生し始めてからシャードを増やすまでの時間だけかかってしまうということでした。何か、対応したらすぐに効果が出る方法はないものでしょうか。 ということでre:Invent2019直前に発表された新機能のご紹介です。 AWS Lambda Supports Parallelization Factor for Kinesis and DynamoDB Event Sourcesです。
これはなにかというと、これまでできなかった1シャードあたりのLambdaの実行数を増やすことができるという機能となります。これによって、シャードを増やさなくてもLambdaの並列数を増やすことができるようになります。
それでは、本当にこの機能によって、これまでの シャードを増やす
という対応策では実現できなかった、すでにKinesisのシャードに投げ込まれたデータの処理の並列数を後から増やすことはできるのでしょうか。
以下デモをご覧ください。
デモ
RecoChoku Tech Night 5社合同 AWS re:Invent 2019 参加報告会
では以下をデモします。
このブログを読んでいる皆さんも以下を参考にデモをしてみることが可能だと思いますので一度やってみてください。
以下をcloneし、README通りに環境を設定し、make put_records
まで実施してKinesisにデータを投げ入れます。
github.com
その後、template.ymlを編集して2つあるLambdaのうちの片方のParallelizationFactor
部分を2以上に増やし make deploy
して待つと、CloudWatch上の、ParallelizationFactorを増やした方のLambdaのIteratorAgeの上限がもう片方と比べて下がるのが確認できるのではないかと思います。
このデモでは、1シャードのKinesisに1件のレコードを取得し毎回1秒スリープするLambdaが付与されておりそこに1000件のデータを流し込んでいるので、ParallelizationFactorが1だとIteratorAgeは1000秒付近になりますが、ParallelizationFactorを増やすとその分IteratorAgeの上限が下がっているのがわかるかと思います。
事前に私の手元で実施した環境でのそれぞれのIteratorAgeの推移(サーバ管理ツールMackerelで取得)は以下のとおりです。グラフが途切れてる部分が処理完了を表しています。以下グラフの時刻が同じ時刻になってるのを見るとわかると思いますが、1つのKinesisに対して2つのLambdaをコンシューマーとして登録し、片方のLambdaのParallelizationFactorを増やしています。
ParallelizationFactor: 1
で全レコード処理するのに1000秒かかってる様子(単位: ms)
ParallelizationFactor: 1
を ParallelizationFactor: 10
に変更して200秒ほどで全レコードが処理されている様子
ParallelizationFactorを増やすことの注意点
- Lambdaの並列実行数が増えることになりますので、Lambdaの並列実行数制限には注意しましょう
- シャードを増やす時同様、Lambdaの更に後ろにあるサービス(DB, 外部API, etc)がつまらないかどうかには注意しましょう
最後に
以上、AWS re:Invent 2019の直前に発表されたKinesisの機能の紹介でした。そんなに派手な機能ではないと思いますが、切り札の一つとして持っておくといいのではないかなと思います。 本エントリへのツッコミなど激しく募集しています
免責
- この実験情報はあくまでも私の環境で行ったもので、あまねくどの環境でもそのようなことが起きるかは不明です。本番環境で使用する前にAWSのサポートに問い合わせていただけたらと思います。
最後の最後に
はてなエンジニア Advent Calendar 2019ですが、 前日は id:Pasta-K さんのウェブページの表示を遅くなくしたい時の道標 - ぱすたけ日記でした。 明日は id:papix さんになります。