<< Home

本番導入出来なかったけどGoでちょっと早いfluent-loggerを作った時の話

2017-12-15

この記事はGo4 Advent Calendar 2017の12/15のエントリです。

Go2 Advent Calendar 2017の1日目の記事で、go-fluent-clientの紹介 という lestrrat さんの投稿があり、そういえば今年の初めに転職やら色々あって導入までは出来なかった Go の fluent-logger 作ったなということを思い出したので、当時どんな感じで作っていたかを踏まえて簡単に紹介してみようと思います。

daichirata/fluent-logger-go

元のコードに関しては導入しないと決めた時にとりあえずファイルだけ Github に上げてるだけの状態だったので一旦別ブランチに退避して、今回は当時を再現しつつ1からコミットし直してみたいと思います。

そもそもなんでわざわざ作ったかというと、最近は Go をそもそもあまり触っていないので当時と同じ状況なのかどうかはわかりませんが、その時のモチベーションとして

  • fluentdに対して非同期にメッセージを送信したい (ここの送信でレスポンスに影響を出したくない)
  • 送信できなかったメッセージは、そのイベントをキャッチしてファイルや別経路の出力に退避させたい
  • embedされた構造体を、そのまま logger に渡しても上手くエンコードして送信して欲しい
    • ちょっとここは記憶が曖昧ですが、確か何らかの制限があって Post するときには自分で map にして渡すのが一番安全という感じだったような気がします

という所があって、オフィシャルで対応するのは大改造が必要っぽくてちょっと厳しそう + logger だけなら作るのそんなに大変じゃなそうだったので作ったとかだったような。

最終的には結構いい感じの速さになったんですが、不真面目なのでわりと感で最適化するアンチパターンで作っているので最適化の余地はかなりありそう。そもそも実戦投入出来ていないので導入もオススメ出来ません。

というわけでそれでは後発らしく、より速く、よりちょっと便利を目指してやって行きましょう。

Initial Commit

884b834e213fc999a3484feeba77fbeb68d2a942

最初のコミットに関しては、とりあえず動くことを目標に作ったので基本的に全ての機能が logger.go に同期処理で書かれてます。

type Logger struct {
 	conf Config
	conn io.WriteCloser
	bmu	 sync.Mutex // buffer mutex
	cmu	 sync.Mutex // connection mutex
	buf	 []byte
}

func (logger *Logger) PostWithTime(tag string, t time.Time, obj interface{}) error {
	record := []interface{}{
		tag,
		t.Unix(),
		obj,
	}

	buf := bytes.NewBuffer([]byte{})
	enc := msgpack.NewEncoder(buf)
	if err := enc.Encode(record); err != nil {
		return err
	}
	raw := buf.Bytes()

	logger.bmu.Lock()
	logger.buf = append(logger.buf, raw...)
	logger.bmu.Unlock()

	return logger.send()
}

PostWithTime でエンコードした後に logger.buf にデータを詰めて後は send() で送信してる感じで、一旦 buf に詰めてるのは送信に失敗した時にそのまま pending として扱う為です。 Mutex を2つ持っているところとかが中々にダサいですが、最初としてはまあこんな所でしょう。この状態で一旦公式とベンチマークを取ってみます。(ちなみに、benchmarkはこのファイルを最初からコミットしているので、今後は常にこちらを実行しています。)

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  100000	     14693 ns/op	    1958 B/op	      14 allocs/op
BenchmarkStructOfficial-4     	  100000	     18444 ns/op	    4891 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  100000	     15340 ns/op	    1671 B/op	      13 allocs/op
BenchmarkMapOfficial-4        	  100000	     19802 ns/op	    5971 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	7.535s

意外な事に現段階で既にオフィシャルより早いですが、そもそも機能が少ないという所もあるのでこの値より遅くならない事を目標にやって行きましょう。

送信を非同期に

38301f437e598c84b097a7a4487d6fe927f56403

先ずは、最初の目標として送信部分の非同期対応を行います。対応内容としては

  • fluentdへの接続を確立出来た後、送信用の goroutine を1つ立ち上げる
  • buffer に対する dirty channel を用意し、Post 実行時に send を呼び出す代わりに dirty に非同期で通知する
  • 送信用 goroutine は、 dirty channel か ticker を契機に送信処理を実行する
    • dirty -> 新規書き込み
    • ticker -> 送信失敗時の pending データの送信

という感じです。正直まだまだ現段階では問題が沢山有ります。(この実装だとdirtyへの通知多いよねとか) が、これからコードをどんどん変えていくことになるのでどんどん先に行っちゃいましょう。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	     10172 ns/op	    1389 B/op	      14 allocs/op
BenchmarkStructOfficial-4     	  100000	     20515 ns/op	    4839 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	     11976 ns/op	    1107 B/op	      13 allocs/op
BenchmarkMapOfficial-4        	   50000	     23154 ns/op	    5929 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	8.343s

ナイーブな非同期対応でも、正常系だけ見ればそれなりに効果は出るっぽいですね。

Buffer を別の構造体として管理

e0f40b163ac68657d734463a30a601190d64a498

この先作り込んでいく前に、早めの段階で logger の buffer を外に切り出してます。これで書込みの時と送信の時に logger 側から直接 Mutex を操作する必要がなくなったので、少し見通しが良くなったでしょうか。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      7210 ns/op	    1601 B/op	      14 allocs/op
BenchmarkStructOfficial-4     	  100000	     18486 ns/op	    4896 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      7657 ns/op	    1194 B/op	      13 allocs/op
BenchmarkMapOfficial-4        	  100000	     20273 ns/op	    5967 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	7.419s

パフォーマンス向上への影響も大きですね。 buffer を切り出すことでロックの粒度が細かくなったお陰で send のスループットが上がったからでしょうか。

エンコード用の byte slice を Pool で管理

d0cdf96d8d53cee100097ee77d07bd4519452f10

基本的に logger などのように文字列など場合によっては大きなデータを扱いつつ、返り値としてはエラーなどしか返さないようなライブラリの場合は大抵 sync.Pool による最適化が可能です(ベストでは無いかもしれませんが)。 今回のケースで言うと Post で受け取った構造体を MessagePack にエンコードする際の byte slice は送信後その領域自体を使いますことが可能です。なので buffer に Add する際のデータを []byte から Message 構造体に変え、その構造体を使いまわすように変更してみます。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      6122 ns/op	    1311 B/op	       8 allocs/op
BenchmarkStructOfficial-4     	  100000	     18993 ns/op	    4889 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      6570 ns/op	     988 B/op	       7 allocs/op
BenchmarkMapOfficial-4        	  100000	     20283 ns/op	    5974 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	7.036s

狙い通り、 allocation をかなり抑えることが出来てるっぽいですね。

ついでに、 MessagePack のデコーダー自体も byte slice と対になるようにして使いまわすようにしてみました。

2620523219a24ad483ee54883cf1f60d86e6ef0e

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      5872 ns/op	    1257 B/op	       6 allocs/op
BenchmarkStructOfficial-4     	  100000	     18894 ns/op	    4890 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      6557 ns/op	     908 B/op	       5 allocs/op
BenchmarkMapOfficial-4        	  100000	     20657 ns/op	    5969 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	6.993s

更に抑えられてていい感じですね。

buffer の dirty 通知を抑える

b81248d643ed44e2b7619bf47be88e285c33e05b

最初の方で話していたとおり、今の実装だと dirty への書き込みが多すぎて、無駄に goroutine を抱えることになります。(と言うか send が止まると無限に増える) buffer への読み書きは常にロックを取ったシーケンシャルな処理なので、そこで対処できそうです。

  • buffer の領域を新規書き込みと、再送中のデータに分離
  • 新規書き込み時、新規書き込みの領域にデータが溜まっていない(そのデータのみ)場合のみ、dirty 通知を行う
    • データが既にある場合、通知済みで pop されるのを待っている状態なので不要です

という感じで対応してみました。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      5759 ns/op	    1516 B/op	       8 allocs/op
BenchmarkStructOfficial-4     	  100000	     18354 ns/op	    4897 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      5508 ns/op	    1220 B/op	       7 allocs/op
BenchmarkMapOfficial-4        	  100000	     20165 ns/op	    5960 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	6.666s

領域が増えたことで allocation とメモリの消費量が増えてしまっていますが、対応しなければ行けない所なので諦めましょう。ちょっとだけ性能が改善しているのは誤差か或いは goroutine の起動が抑えられているからかもしれません。

再送系の処理を CircuitBreaker で改善

ネットワークの再送といえばAWSでよく使われるので Exponential Backoff の概念が最近だとよく見かけます。

MAX_RETRIES = 10
retries = 0
begin
    // 何かの処理
rescue => e
  if retries < MAX_RETRIES
    retries += 1
    sleep 2 ** i 
    retry
  else
    raise
  end
end

今回は更に、書き込みでエラーが起きた際には別の出力に切り替えるみたいな処理を間に挟みたいと思っているので、上記リトライに近いことが出来て更にブロックせずに扱いやすい CircuitBreaker を導入します。

3112bd1ec8a12ed164560fa34bd5598b0caabcf1

先ずはバッファ周りの処理と書き込み処理を分離。

cd0e0b7e95988cd7416b9386cfa74c22c7f64c36

次に CircuitBreaker を導入します。

func (logger *Logger) Subscribe() <-chan circuit.BreakerEvent {

で channel でイベントを受け取れるようにしているので、 fluent の書き込みに失敗した時にログを吐いて監視システムで拾うとかすると便利かもしれません。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      5970 ns/op	    1447 B/op	       8 allocs/op
BenchmarkStructOfficial-4     	  100000	     19022 ns/op	    4856 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      6034 ns/op	    1091 B/op	       6 allocs/op
BenchmarkMapOfficial-4        	  100000	     21286 ns/op	    5938 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	7.005s

パフォーマンス的にも特に問題なさそうですね。

ErrorHandler の概念を追加

af41bfd784ca84efef94a7b09d024953367c6657

最後に、一番やりたかったエラーが起きたときにハンドルする為のAPIを追加していきます。Go には http.HandlerFunc という関数型に対して関数を定義するという中々カッコイイ機能があるので、それを参考にします。

type Logger struct {
	ErrorHandler ErrorHandler

// ......

if logger.ErrorHandler != nil && len(messages) > logger.conf.PendingLimit {
	err = logger.ErrorHandler.HandleError(err, data)
}

// ......

type ErrorHandler interface {
	HandleError(error, []byte) error
}

type ErrorHandlerFunc func(error, []byte) error

func (f ErrorHandlerFunc) HandleError(err error, data []byte) error {
	return f(err, data)
}

このように定義しており、無名関数を ErrorHandlerFunc にキャストするか或いは HandleError を実装した構造体で、エラーが発生した場合に処理を受けれるようになっています。

具体的な使用例を幾つか追加していて、例えば書き込みに失敗した場合、もう一つの logger にフォールバックしたり、データをJSONとして指定した io.Writer に流すとかを出来るようにしています。

error_handler.go

logger, err := fluent.NewLogger(fluent.Config{})
if err != nil {
	// TODO: Handle error.
}

// Logging error.
logger.ErrorHandler = fluent.ErrorHandlerFunc(func(err error, _ []byte) error {
	log.Println(err)
	return err
})

// Fallback logger.
fallback, err := fluent.NewLogger(fluent.Config{})
if err != nil {
	// TODO: Handle error.
}
logger.ErrorHandler = fluent.NewFallbackHandler(fallback)

// Fallback json to stdout.
logger.ErrorHandler = fluent.NewFallbackJSONHandler(os.Stdout)

この機能の特徴として、HandleError と送信のエラー判定を結合させています。つまり、この関数がエラーを返さなかった場合はメッセージは pending にならずに成功したものとして扱います。逆にいうとエラーを返した場合は同じメッセージが何回も流れて来ます。フォールバックさせる場合には再送周りをコントロールする必要があるので、この方が何かと都合が良かったりするので一旦この形で落ち着いています。

パフォーマンス

というわけで、色々やってきましたが最終的なベンチマークはこんな感じになりました。

cd benchmark && go test -bench . -benchmem
enable dummay daemon
goos: darwin
goarch: amd64
pkg: github.com/daichirata/fluent-logger-go/benchmark
BenchmarkStructDaichirata-4   	  200000	      6027 ns/op	    1474 B/op	       8 allocs/op
BenchmarkStructOfficial-4     	  100000	     18551 ns/op	    4889 B/op	      35 allocs/op
BenchmarkMapDaichirata-4      	  200000	      7070 ns/op	     929 B/op	       6 allocs/op
BenchmarkMapOfficial-4        	  100000	     21664 ns/op	    5930 B/op	      62 allocs/op
PASS
ok  	github.com/daichirata/fluent-logger-go/benchmark	7.227s

結構誤差が出るので参考程度ですが、そこそこ良い結果にはなっているのではないでしょうか。ついでに lestrrat さんの奴のもやってみました。

// +build bench

package fluent_test

import (
	"testing"

	daichirata "github.com/daichirata/fluent-logger-go"
	official "github.com/fluent/fluent-logger-golang/fluent"
	k0kubun "github.com/k0kubun/fluent-logger-go"
	lestrrat "github.com/lestrrat/go-fluent-client"
)

const tag = "debug.test"
const postsPerIter = 1

func BenchmarkK0kubun(b *testing.B) {
	c := k0kubun.NewLogger(k0kubun.Config{})
	for i := 0; i < b.N; i++ {
		for j := 0; j < postsPerIter; j++ {
			c.Post(tag, map[string]interface{}{"count": j})
		}
	}
}

func BenchmarkDaichirata(b *testing.B) {
	c, _ := daichirata.NewLogger(daichirata.Config{})
	for i := 0; i < b.N; i++ {
		for j := 0; j < postsPerIter; j++ {
			c.Post(tag, map[string]interface{}{"count": j})
		}
	}
	c.Close()
}

func BenchmarkLestrrat(b *testing.B) {
	c, _ := lestrrat.New()
	for i := 0; i < b.N; i++ {
		for j := 0; j < postsPerIter; j++ {
			if c.Post(tag, map[string]interface{}{"count": j}) != nil {
				b.Logf("whoa Post failed")
			}
		}
	}
	c.Shutdown(nil)
}

func BenchmarkLestrratUnbuffered(b *testing.B) {
	c, _ := lestrrat.New(lestrrat.WithBuffered(false))
	for i := 0; i < b.N; i++ {
		for j := 0; j < postsPerIter; j++ {
			if c.Post(tag, map[string]interface{}{"count": j}) != nil {
				b.Logf("whoa Post failed")
			}
		}
	}
	c.Shutdown(nil)
}

func BenchmarkOfficial(b *testing.B) {
	c, _ := official.New(official.Config{})
	for i := 0; i < b.N; i++ {
		for j := 0; j < postsPerIter; j++ {
			if c.Post(tag, map[string]interface{}{"count": j}) != nil {
				b.Logf("whoa Post failed")
			}
		}
	}
	c.Close()
}
$ go test -run=none -bench=. -benchmem -tags bench

goos: darwin
goarch: amd64
pkg: github.com/lestrrat/go-fluent-client
BenchmarkK0kubun-4              	  500000	      3191 ns/op	    1679 B/op	      13 allocs/op
BenchmarkDaichirata-4           	  500000	      3152 ns/op	     829 B/op	      10 allocs/op
BenchmarkLestrrat-4             	  500000	      3838 ns/op	     529 B/op	       7 allocs/op
BenchmarkLestrratUnbuffered-4   	  300000	      8682 ns/op	     512 B/op	       7 allocs/op
BenchmarkOfficial-4             	  200000	      8893 ns/op	     896 B/op	       9 allocs/op
PASS
ok  	github.com/lestrrat/go-fluent-client	10.610s

というわけで、 github.com/k0kubun/fluent-logger-go と大体同じくらいでしょうか。(ただ、こちらは終了時に buffer を flush することが出来ないっぽいのでちょっと実用的には微妙かなという感じはありますが) それにしても lestrrat さんのやつはエンコーダーとデコーダーを自作してるみたいですし 7 allocs/op 凄いですね。

終わり

最近 Go を全然書いていないので久しぶりに触る機会で来て楽しかった。おわり。