本記事では、Fluentd で fluent-plugin-datacounter を使って行っていた集計処理を Norikra に委譲する設定方法を解説します。
置き換えるべき理由
そもそもなぜ置き換える必要があるのかというと、ズバリ Fluentd は「分散集計処理」をするのには向いていない、と考えているためです。 その理由をつらつら書き始めると非常に長くなるので、この記事では詳細は割愛しますが、以下の要因があると思っています。
- Fluentd は CRuby で動くため、CPUコアをすべて有効活用するためには、Fluentd プロセスを複数立ち上げなければならず、複雑になる
- Fluentd プロセス間で、メモリ空間を共有できないため、要件によっては多段アーキテクチャを取らざるを得ず、複雑となる
- Fluentd プロセス間で、データを集計する time window の起点がバラけるため(起動タイミング依存)、集計結果が正確ではなくなる。
- 多段アーキテクチャを取った場合に、プロセス間のネットワークトラフィック、コネクション数が増大し、運用が困難になる
- 多段アーキテクチャを取った場合に、ネットワーク状況に加え、受け手のFluentdプロセスの処理状況によっては、データを受け取るタイミングが遅延し、集計結果が正確ではなくなる。
解決策を提示すると以下のようになりそうです(実際に試して解決したものではないので、考えがまだ甘い可能性が高いです)
- JRuby, Rubinious を使う?
- fluent-plugin-datacounter を redis 対応する
- fluent-plugin-datacounter で集計する際に メッセージの time フィールドを見るようにする
- out_forward の heartbeat を止める、コネクション数を減らす工夫を凝らした上で データ送信を keepalive に変える (実施済)
- 解決策思いつかず
地道に改善していくことで Fluentd でも最終的に分散集計処理を正確に行うことができるようになるかもしれませんが、Norikra を使ったほうが話が早い、ということですね。※ そもそも Fluentd はストリーム集計処理をするためになんて作られてないんやで ...
また、Norikra の場合、新しく集計対象を登録、削除する場合でも、再起動が不要、というのも大きな魅力です。 Fluentd の場合、新しい集計処理を登録しようと conf を書き換えた場合、再起動が必要になりますからね。graceful restart の機能欲しい(チラッ
fluent-plugin-datacounter を用いた集計処理
置き換え対象とする fluent-plugin-datacounter 設定は次のようになります。
要件
以下のような要件があるものとします。
- ログに含まれる status フィールドを元に、単位時間あたりのHTTPステータスコードのカウントを出したい
- 1 をホスト毎に出したい
- ホスト全体の合算値としても出したい
入力データの仕様
この記事では、入力データは「visualizer.ログ名.ホスト名」というタグで送信されてくるものとします。 ここで、「ログ名」とはログを識別するために運用者が任意でつけた文字列のことを指します。
また、メッセージには time, status, reqtime, method, uri フィールドが以下のような形式で入っているものとします。
例) visualizer.api_restful.host001
{"status":"403","reqtime":"0.123","method":"GET","uri":"http://example.com/v1/restful/people?id=1"}
設定
fluent-plugin-datacounter を利用する場合は、以下のようにして実現が可能です。
<source>
type forward
port 24224
</source>
<match visualizer.api_restful.*>
type copy
# ホスト毎
<store>
type record_reformer
enable_ruby false
tag ${tag_prefix[-2]}.hosts.${tag_parts[-1]}
</store>
# 全体
<store>
type record_reformer
enable_ruby false
tag ${tag_prefix[-2]}.all.all
</store>
</match>
<match visualizer.api_restful.{hosts,all}.*>
type datacounter
count_interval 60
aggregate tag
output_per_tag yes
tag_prefix status_count
count_key status
pattern1 2xx ^2\d\d$
pattern2 3xx ^3\d\d$
pattern3 400 ^400$
pattern4 4xx ^4\d\d$
pattern5 503 ^503$
pattern6 5xx ^5\d\d$
</match>
出力結果は以下のようになります(不要なフィールドの記述を省略しています)。
status_count.visualizer.api_restful.hosts.host001:
{"2xx_count":5,"3xx_count":0,"400_count":0,"4xx_count":0,"503_count":0,"5xx_count":0}
これを Norikra に置き換えることを考えます。
Norikra に置き換えてみる
Norikra のドキュメントおよび fluent-plguin-norikra の README を参考に、 fluent-plugin-norikra を out_norikra, Norikra Query (正確には Esper の EPL と呼ばれるもの)、in_norikra に置き換えてみます.
out_norikra
データを受け取って、Norikra にデータを流す Fluentd の設定を以下のようにします。
データのタグ「visualizer.ログ名.ホスト名」のうち「ログ名」を Norikra の target (RDBMS でいう table のようなもの) に指定するようにしています。 また「ホスト名」をタグから取り出して、メッセージの host フィールドに設定しています。
<source>
type forward
port 24224
</source>
<match visualizer.api_restful.**>
type record_reformer
enable_ruby false
tag reformed.${tag_prefix[-2]} # タグからホスト名を取り除き、
<record>
host ${tag_parts[-1]} # ホスト名は host フィールドに設定. GROUP BY 句で利用するため
</record>
</match>
<match reformed.visualizer.api_restful>
type norikra
norikra localhost:26571
remove_tag_prefix reformed.visualizer
target_map_tag true # target が api_restful になる
<default>
auto_field false # norikra includes fields only used in queries.
</default>
</match>
Norikra Query
Norikra クエリにはJava コードを書けるので、String#matches を使うことによって fluent-plugin-datacounter と、同等な条件を実現することができます。 また、GROUP BY 句を使って、ホスト毎の集計を取ることができます。秀でている。
# target の作成
norikra-client target open api_restful
# 全体。AS `2xx_count` とはできないので AS count_2xx にしている
norikra-client query add status_count.all.api_restful "$(cat <<EOF
SELECT \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM api_restful.win:time_batch(60 sec)
EOF
)"
# ホスト毎
norikra-client query add status_count.host.api_restful "$(cat <<EOF
SELECT \
host, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM api_restful.win:time_batch(60 sec) \
GROUP BY host
EOF
)"
クエリを以下のようにしても実現できますが、クエリの数が多くても扱いづらいので、 上記のように EPL の COUNT(1, 条件)文法を使ってまとめて1つのクエリで集計するように工夫しています。
# これだと辛い
$ norikra-client query add status_count.all.2xx.api_restful \
"SELECT COUNT(status) AS count_2xx \
FROM api_restful.win:time_batch(60 sec) \
WHERE status.matches('^2\d\d$')"
$ norikra-client query add status_count.all.3xx.api_restful \
"SELECT COUNT(status) AS count_3xx \
FROM api_restful.win:time_batch(60 sec) \
WHERE status.matches('^3\d\d$')"
# 400 (略
# 4xx (略
# 503 (略
# 5xx (略
in_norikra
Norikra が計算してくれた結果を 60s 毎に取り出すには、Fluentd の設定を以下のようにします。 ここでは、取り出したメッセージのタグ名が、query add で指定した「クエリ名」になるように tag query_name を指定しています。
<source>
type norikra
norikra localhost:26571
<fetch>
method event
target status_count.all.api_restful # クエリ名
tag query_name
# tag string api_restful.status_count.all
# tag field field_name
interval 60s
</fetch>
<fetch>
method event
target status_count.host.api_restful # クエリ名
tag query_name
interval 60s
</fetch>
</source>
これで完成です。
出力は次のようになります。(datacounter と同じくフィールド名を "2xx_count" のようにしたかったのですが、Norikra の仕様上、数字始まりにはできないようです。残念。。。)
status_count.host.api_restful:
{"host":"host001","count_2xx":5,"count_3xx":0,"count_400":0,"count_4xx":0,"count_503":0,"count_5xx":0}
Norikra に置き換えてみる - 汎化版
さきほどの設定で、置き換えはできたのですが、このままでは、 新たなログを集計対象に追加したい場合に、out_norikra、in_norikra の設定ファイルを書き換え、Fluentd を再起動する必要があります。
Fluentd を再起動することなく、集計処理の対象を追加できる汎用的な状態を目指してみます。
out_norikra
データを受け取って、Norikra にデータを流す Fluentd の設定を以下のように変更します。 「ログ名」を label というフィールドに入れています。
<source>
type forward
port 24224
</source>
<match visualizer.**>
type record_reformer
enable_ruby true
tag reformed.visualizer
<record>
host ${tag_parts[-1]} # ホスト名
label ${tag_parts[1..-2].join('.')} # ログ名
</record>
</match>
<match reformed.visualizer>
type norikra
norikra localhost:26571
target_string visualizer
<default>
auto_field false
</default>
</match>
Norikra Query
タグ名が visualizer で始まるメッセージを全て、 Norikra の visualizer ターゲットに投入しています。
そして、GROUP BY label で「ログ名」ごとに集計を出すようにしています。
# target の作成
norikra-client target open visualizer
# 全体
norikra-client query add status_count.all "$(cat <<EOF
SELECT \
label, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM visualizer.win:time_batch(60 sec) \
GROUP BY label
EOF
)"
# ホスト毎
norikra-client query add status_count.host "$(cat <<EOF
SELECT \
label, host, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM visualizer.win:time_batch(60 sec) \
GROUP BY label, host
EOF
)"
in_norikra
tag query_name ではなく tag field label として「ログ名」をタグとして取り出すようにします。
<source>
type norikra
norikra localhost:26571
<fetch>
method event
target status_count.all # クエリ名
tag field label # 「ログ名」をタグ名として取り出す
tag_prefix status_count.all # status_count.all.ログ名
interval 60s
</fetch>
<fetch>
method event
target status_count.host # クエリ名
tag field label # 「ログ名」をタグ名として取り出す
tag_prefix status_count.host # status_count.host.ログ名
interval 60s
</fetch>
</source>
これで新しいログを集計対象に加える場合でも、新たに設定を追加する必要がなくなりました。
Norikra に置き換えてみる - 特化版
アプリによってステータスコード体系が異なるなどして、汎用的な共通設定項目が1つあるだけでは不十分な場合があります。1つ具体的な話をすると JSON-RPC を使うアプリの場合 status コード体系が HTTP API とは異なるのです。 See JSON-RPC 2.0 Specification - Error Object
そのような要件も満たせるように、特別な設定項目に特化できるようにしつつ、それでもなお、Fluentd の再起動は必要とされない姿を目指してみます。
ここでは、例としてログ名が「api_restful」なログに加えて、新たに「api_jsonrpc」なログを加えるような状況を想定します。
out_norikra
データを受け取って、Norikra にデータを流す Fluentd の設定を以下のようにします。
<source>
type forward
port 24224
</source>
<match visualizer.**>
type record_reformer
enable_ruby true
tag reformed.${tag_prefix[-2]} # ホスト名を取り除く
<record>
host ${tag_parts[-1]} # ホスト名
</record>
</match>
<match reformed.visualizer.**>
type norikra
norikra localhost:26571
remove_tag_prefix reformed.visualizer
target_map_tag true # 「ログ名」が target 名になる
<default>
auto_field false
</default>
</match>
Norikra Query
「ログ名」をターゲット名にしたため、ログの種類が増えた場合、Norikra に target を追加する必要がありますが、再起動は不要です。
クエリグループを利用し、api_restful も api_jsonrpc も同じ名前で取り出せるようにしています。 新しく api_jsonrpc 用のクエリを追加する必要はありますが、やはり再起動は不要です。
# target の作成
norikra-client target open api_restful
norikra-client target open api_jsonrpc
# api_restful 全体
norikra-client query add -g status_count.all status_count.all.api_restful "$(cat <<EOF
SELECT \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM api_restful.win:time_batch(5 sec)
EOF
)"
# api_restful ホスト毎
norikra-client query add -g status_count.host status_count.host.api_restful "$(cat <<EOF
SELECT \
host, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM api_restful.win:time_batch(5 sec) \
GROUP BY host
EOF
)"
# api_jsonrpc 全体: AS count_-32603 はエラーとなるため、AS count_32603 としている
norikra-client query add -g status_count.all status_count.all.api_jsonrpc "$(cat <<EOF
SELECT \
COUNT(1, status.matches('^-32603$')) AS count_32603, \
COUNT(1, status.matches('^-3(?!2603)\d\d\d\d$')) AS count_3xxxx \
FROM api_jsonrpc.win:time_batch(60 sec)
EOF
)"
# api_jsonrpc ホスト毎
norikra-client query add -g status_count.host status_count.host.api_jsonrpc "$(cat <<EOF
SELECT \
host, \
COUNT(1, status.matches('^-32603$')) AS count_32603, \
COUNT(1, status.matches('^-3(?!2603)\d\d\d\d$')) AS count_3xxxx \
FROM api_jsonrpc.win:time_batch(60 sec) \
GROUP BY host
EOF
)"
in_norikra
method event を method sweep に変更すると、target にクエリグループ名を指定することができます。
<source>
type norikra
norikra localhost:26571
<fetch>
method sweep
target status_count.all
tag query_name
interval 60s
</fetch>
<fetch>
method sweep
target status_count.host
tag query_name
interval 60s
</fetch>
</source>
総じて、Fluentd を再起動することなく、ログ毎に特別な設定項目を作れるようになりました。
おまけ:win:ext_timed_batch
win:time_batch の代わりに、win:ext_timed_batch を利用すると、メッセージの到着時間ではなく、メッセージに含まれる time フィールドの値を元に集計してくれるようになります。 Fluentd の out_forward による伝送遅延があるような場合でも、win:ext_timed_batch を使えば大きな誤差なく、集計結果を得られる可能性があります。※ 筆者注:現在検証中です
例えば、「汎化してみる」の設定を以下のように変更することで利用可能となります。
out_norikra
メッセージが持っている epoch 時間を time フィールドに追加しています。
<source>
type forward
port 24224
</source>
<match visualizer.**>
type record_reformer
enable_ruby true
tag reformed.visualizer
<record>
host ${tag_parts[-1]} # ホスト名
label ${tag_parts[1..-2].join('.')} # ログ名
time ${time.to_i}
</record>
</match>
<match reformed.visualizer>
type norikra
norikra localhost:26571
target_string visualizer
<default>
auto_field false
</default>
</match>
Norikra Query
fluent-plugin-record-reformer が typecast に対応しておらず文字列として epoch time を出力するので、target を作成する際に time フィールドを明示的に integer 指定しています。cast する fluentd プラグインもあるのでそちらを使ってもよいと思います.
また、クエリで win:ext_timed_batch を利用するように置き換えています。第一引数は milliseconds epoch time とのことなので * 1000 しています。
# target の作成
norikra-client target open visualizer time:integer
# 全体
norikra-client query add status_count.all "$(cat <<EOF
SELECT \
label, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM visualizer.win:ext_timed_batch(time * 1000, 60 sec) \
GROUP BY label
EOF
)"
# ホスト毎
norikra-client query add status_count.host "$(cat <<EOF
SELECT \
label, host, \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM visualizer.win:ext_timed_batch(time * 1000, 60 sec) \
GROUP BY label, host
EOF
)"
in_norikra
こちらは変更ありません
<source>
type norikra
norikra localhost:26571
<fetch>
method event
target status_count.all # クエリ名
tag field label # 「ログ名」をタグ名として取り出す
tag_prefix status_count.all # status_count.all.ログ名
interval 60s
</fetch>
<fetch>
method event
target status_count.host # クエリ名
tag field label # 「ログ名」をタグ名として取り出す
tag_prefix status_count.host # status_count.host.ログ名
interval 60s
</fetch>
</source>
課題もしくは公開質問
以下、解説に便乗して、まだ解決できていない課題について、公開質問させて頂きたいと思います。
Q. 1台で捌ききれなくなった場合に、どう分散させたら良いか
これはすでに Norikra 界(ってなんだ)のパイオニア @kazunori_279 氏より貴重な知見を頂いていて、
cf. https://twitter.com/kazunori_279/status/461756753932472320
とりあえず全部のノードにNorikra入れておけばむずかしいこと考えず集計値のみ後段Norikraに転送していくらでもスケールすると思うのだけど。
ということで、拙作 fluent-plugin-hash-forward を使うなどして、Fluentd 側で同じログは同じプロセスにルーティングするようにした上で、 ローカルの Norikra に投入して集計結果を取り出せば良さそう、ということでその方針でやってみようと思っています。
Q. クエリ結果を target に保存できるか
InfluxDB には、Continuous Query と呼ばれる、 あらかじめ Query を仕込んでおくと、クエリの結果を別途 Series (RDBMS でいう table のようなもの)に保存してくれる機能があります。 この Series に対して、さらにクエリを発行することができるわけです(はず)。
Norikra でもこれができると、ステータスコードの集計結果に対して、
SELECT count_5xx FROM status_count.all.api_restful WHERE count_5xx > 1
のようなクエリを発行することで、5xx なログが1件でもあったらアラートを飛ばす、 というような処理を、あまりコストをかけずにできるようになると思います。
一度、in_norikra で取り出して、再度 out_norikra で Norikra に投入する形になるのでしょうか? 直接 target に保存できると良いのですが。
Q. GROUP BY に Java コードを書きたいような場合、どうしたらよいか
以下のような(とても辛い)要求があります。
- エンドポイント毎にもステータスコードの集計を出したい。ただし、エンドポイントは method と uri フィールドから取り出す必要がある。
例えばこのようなログデータの場合、
{status":"403","reqtime":"0.123","method":"GET","uri":"http://example.com/v1/restful/people?id=1"}
method と uri から、GET_people という文字列をエンドポイントとして抜き出す必要があります。(うぅ、辛い...
そこで、以下のように GROUP BY 句に対して、Java コードを書こうとしてみた所、GROUP BY 句には Java コードを使えません、と怒られてしまいました。
# エンドポイント毎
$ norikra-client query add status_count.endpoint.api_restful "$(cat <<EOF
SELECT \
COUNT(1, status.matches('^2\d\d$')) AS count_2xx, \
COUNT(1, status.matches('^3\d\d$')) AS count_3xx, \
COUNT(1, status.matches('^400$')) AS count_400, \
COUNT(1, status.matches('^4(?!00)\d\d$')) AS count_4xx, \
COUNT(1, status.matches('^503$')) AS count_503, \
COUNT(1, status.matches('^5(?!03)\d\d$')) AS count_5xx \
FROM api_restful.win:time_batch(60 sec) \
GROUP BY method.concat("_").concat(Arrays.asList(Arrays.asList(uri.split("\\?")).get(0).split("/")).get(4))
EOF
)"
何かうまい手はないでしょうか?アプリ直せよって?ははは..
Q. スレッド数のベストプラクティスは?
Norikra に起動オプション、inbound-threads, outbound-threads, route-threads, timer-threads があるが、 例えば CPU コアが 32 だとした場合、それぞれいくらぐらいにすると良いのかベストプラクティスが欲しいですね。それぞれ 3:3:2:3 ぐらいの比率で振り分けるとよさそう??
また、inbound-thread-capacity, outbound-thread-capacity, route-thread-capacity, timer-thread-capacity は一体なんのパラメータなのかすらわかっていないので、Esper に詳しい人にご教授お願いしたい。。。
なお、Norikra の JVM パラメータについては NorikraのJVMチューンで苦労している話 - SlideShare を参考にさせていただいております :D
まとめ
fluent-plugin-datacounter と同等の集計処理を Norikra で実施する設定方法を解説しました。
まえおきとして、なぜ Norikra に置き換える必要があるのか、私の考えを述べました。
その後、fluent-plugin-datacounter を用いた集計処理を Norikra に置き換える設定方法を述べました。
- 設定変更が不要な、汎化された設定方法、
- 特別な設定項目を作れるようにしつつ、Fluentd の再起動は不要とする設定方法、
- おまけとして win:ext_timed_batch を用いた設定
の解説をしました。
まだ、本番への投入はもちろんのこと、パフォーマンス測定もおわっていなければ、 「公開質問」のセクションで書いたような残課題も残っている状態なので、もっと触っていきます。