Fluentd + Kinesis + Elasticsearch + Kibana / Grafanaでのリアルタイムログ解析基盤

こんにちは、SUUMOスマホサイトの開発チームに所属しているエンジニアの上野です。

今回は、リアルタイムログ解析基盤を紹介します。

背景

皆様はwebサーバログ監視(アクセスログ・エラーログなど)をどのように行われているでしょうか?
スーモスマホサイトでは、アクセス数増加に伴いサーバ台数が増え、csshX地獄に陥りました。(以下の図のような状態のことです。)

ログの確認が必要になった際に、このcsshXを使用して秘伝のワンライナーで確認したい部分をtailして抽出していましたが、属人的な作業で作業時間もかかるため、だれもが嫌がる作業となっていました。(一人で視力の低下と戦っていました)

リアルタイムログ解析基盤

そこで、リアルタイムログ解析基盤を構築しました。

なにがうれしいか?

以下が可能になり、とても幸せになりました。

1. ログが一箇所に集約され、誰でもブラウザ上で確認できるようになった
2. 単に検索するだけでなくグラフなどでビジュアライズできるようになった

 

実際の構成がこちら

構成要素

データ分析では、
1.収集 → 2.変換 → 3.保存 → 4.分析 → 5.表示 → 6.運用
という流れがあります。

また、最初の収集・変換などに関して以下の役割の考え方があります。

  • Forwarder(転送元ノード)
  • Aggregator(集約ノード)
  • Processor(フィルタ処理ノード)

今回の構成では、それぞれ以下のツールを使用しています。

役割 使用ツール
Forwarder fluentd fluentd
Aggregator Amazon Kinesis
Processor Kinesis App

 

残りの「3.保存 → 4.分析 → 5.表示」に関してですが、それぞれ以下のようにしています。

役割 使用ツール
保存・分析 Elasticsearch
ビジュアライズ Kibana
Grafana

 

構築にあたって

今回のシステムでは以下のログを集約しています。

  • apache access_log
  • apache error_log
  • application_log
  • php_log
  • CloudWatchログ(今回の話には無関係)

など

 

Fluentd

まず、これらのログをfluentdでtailさせ、aws-fluent-plugin-kinesisでAmazon Kienesisに転送するのですが、以下の2点で詰まりました。

1. fluentd起動時のCPU張り付き

大量に生成されるログで起動時にCPUが100%張り付き状態に陥りました。

1つのCPUに処理が偏るため、処理を分散させる必要がありました。

そこで、fluent-plugin-multiprocessを使用して各ログのtail処理を別々の子プロセスで実行させるようにしました。

実際の設定が以下になります。

...(省略)...

<source>
  type multiprocess

  <process>  ★1つ目のログをtailするためのプロセス
    cmdline -c /etc/td-agent/[別のログをtailするための設定ファイル] --log /var/log/td-agent/xxx.log
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>

  <process> ★2つ目のログをtailするためのプロセス
    cmdline -c /etc/td-agent/[別のログをtailするための設定ファイル] --log /var/log/td-agent/xxx.log
    sleep_before_start 1s
    sleep_before_shutdown 5s
  </process>

...(省略)...

 

2. applicatioin_logの複数行にわたるログ
アプリケーションログでは、ログにdumpしたものを吐かせていたためFluentdでtailした際に、スタックトレースとして吐かれた連想配列などは細切れにされて転送されていました。

Array(
  [timestamp] => 2007-04-06T07:16:37-07:00
  [message] => 通知メッセージ
  [priority] => 6
  [priorityName] => INFO
)

そこで、formatのmultilineを使用して複数行のログを取得できるようにしています。

...(省略)...
<source>
  type tail
  path /path/to/logfile
  format multiline
  format_firstline /正規表現/  ★ここの正規表現に次にマッチするまでを複数行として取得
  format1 /ログの取得フォーマットの正規表現/
  time_format %Y/%m/%d %H:%M:%S.%L
  ...(省略)...
</source>
...(省略)...

上記の設定の際に、正規表現と戦うことになるのですが、こちらのサービスで少し楽をできます。実際にtailしたいログを準備してテストできます。
http://fluentular.herokuapp.com/

Amazon Kinesis

次にAmazon Kinesisですが、1つのストリームに複数のシャードを作成しています。
集約口なので全部一緒くたにしています。(こうしたほうがコスト効率も良いです。)
実際にfluentdから転送するために、aws-fluent-plugin-kinesisを使用しています。

...(省略)...
<match **>
  type kinesis
  stream_name [kinesisで作成したストリーム名]
  aws_key_id xxxxxxxxxxx
  aws_sec_key xxxxxxxxxxx
  use_yajl true
  region [作成したリージョン]
  ...(省略)...
</match>
...(省略)... 

Kinesis App

今回の構成では、以下のようにAmazon Lambdaを使用していません。
Amazon Kinesis で AWS Lambda を使用する

EC2にSDKを使用して構築したほうが運用コストが少ないため、こっちの選択肢を取っています。

実際のコードの一部が以下になります。

...(省略)... 

  # ★シャードを取得する部分
  def get_shard_iterators
    response = @client.describe_stream({
      stream_name: @stream_name,
      limit: 1,
    })
    shard_ids = []
    response.stream_description.shards.each do |shard|
      shard_ids.push(shard.shard_id)
    end

    shard_iterators = []
    shard_ids.each do |shard_id|
      response = @client.get_shard_iterator({
        stream_name: @stream_name,
        shard_id: shard_id,
        shard_iterator_type: "LATEST",
      })
      shard_iterators.push(response.shard_iterator)
    end
    return shard_iterators
  end

  ...(省略)... 

  # ★シャードイテレータを取得する部分
  def get_records shard_iterator
    response = @client.get_records({
      shard_iterator: shard_iterator,
      limit: @record_limit,
    })
    return response
  end

  ...(省略)...

  # ★レコードを取得する部分
 def get_records shard_iterator
    response = @client.get_records({
      shard_iterator: shard_iterator,
      limit: @record_limit,
    })
    return response
  end
...(省略)...

また、送信する部分ですが、ある程度まとめて送信することも考えられるのですが、リアルタイム性が損なわれるため、1つ1つのレコードを逐次送信するようにしています。

このときに、単純に1つ1つのレコードを送信していたら、処理に時間がかかってしまうため、em-synchronyを使用して並列的に送信するようにしています。

Elasticsearch

加工処理まで済むと次は保存になります。

ここでも詰まったポイントが2つありましたが、その問題と解決方法を書いていきます。

デフォルトのマッピングのままで、すべてのデータを保存しようとすると容量を多く使用してしまいます。
そこで、マッピングに“_all”:{“enabled”:false}を追加することで、容量を大幅に削減できます。
こちらの記事によると24%程度削減できるようです。

Elasticsearchインデクシングパフォーマンスのための考慮事項

また、削除期限も設定可能です。先ほどの_allの使用も含めたマッピングが以下になります。

...(省略)...
    "mappings" : {
      "access_log" : {
        "_all" : {
          "enabled" : false ★_allの使用をやめる
        },
        "_ttl" : {
          "enabled" : true,
          "default" : 691200000   ★TTL(Time To Live)を指定
        },
...(省略)...

また、メモリが逼迫して検索できないことがありますが、“doc_value”: trueとすることで、ディスクに保存・使用することが可能となり、メモリ問題が解消できます。

Kibana / Grafana

最後にデータをビジュアライズするKibana / Grafanaですが、今回のケースでは運用・セキュリティなどの観点からElasticsearchと別のインスタンスに構築して、Elasticsearchへの直アクセスを遮断しています。

この場合、Elasticsearchの各種プラグインが使用できなくなってしまいます。

そこで、nginxをリバースプロキシとして利用することで、Kibana自体とElasticsearchプラグインだけでなく、grafanaなどを1つのサーバで提供できるようにしています。

こだわった点

以下、2つにこだわりました。

kibanaをいじってスーモ仕様にしました

実際の画面が以下になります。(*Kibanaのソースをいじればグラフの色なども自由に変更できます)

Amazon Elasticsearchを使用していません
Amazon Elasticsearchはフルマネージドなので、スクリプトを埋め込むことができません。
今回のケースだと、groovy scriptでデータの加工を行う必要性があったため、自前で構築しています。

 

まとめ・感想

今回構築したリアルタイムログ解析基盤ですが、インフラ担当者だけでなく、開発者やそれ以外の方にも利用していただける結果になりました。

現在は、開発者の席付近に置いている大画面に、1)4XXエラー推移などエラー系情報や、2)Time To First Byteなど速度指標などを表示させて開発者さん含めいろんな方に意識してもらうようにしています。

さらに、ログ調査だけでなくリリースしたサービスが正常に動作しているかどうかなども確認することができ、開発者の精神安定剤としても役に立っています。

最後にログ情報はもっと他の用途にも利用可能だと思っています。
例えば、ElasticsearchではAPIを利用してデータを抽出することができるため、異常検知システムも可能ですし、
リアルタイムな情報をを生かして、サイト内でのレコメンドなども可能だと思います。

今後

今回ざっと作成してみての課題と展望になります。

  • fluentdでログの欠損が生じており、すべてのログが拾えているわけではないので欠損率を0に
  • 今回のシステムだとそこまでリアルタイム性が求められるものでもないため、Elasticsearchへbulkで送信してもよかったかな

 

オーケストレーションツールConsulの紹介(1)

はじめに Webアプリケーション屋さんの皆様こんにちは、tomitaで…

SUUMOスマホサイトでのChatOps事例

こんにちは。SUUMOスマホサイトの開発チームに所属している新人エンジ…

テックブログをHTTP/2+PHP7+WordPressで構築してみた

こんにちは。スマホ最適化サイトの開発チームに所属している新人エンジニア…