You have 2 free stories left this month.
Golangを理解し、Grafanaを使って監視する方法(後半)
前編では、goチャンネルとは何か、そしてその仕組みやスケーラブルな実装方法についてお話しました。
後半では、それらを監視する方法を紹介し、ワーカーごとの処理時間のような有用なメトリクスを取得する方法をご紹介します。1分間に実行されているジョブの数か、など他にも色々ご紹介したいと思います。
Stack
では、使用するツールを簡単に説明しましょう。
・Prometheus:オープンソースの時系列メトリックシステムです。また、アラートにも対応していますが、今日はこれについては言及しません。
・Grafana :多くのデータソースに対応しているオープンソースのアナリティクス&モニターソリューションで、Prometheusと一緒に使用して、Goアプリから発動したメトリックを表示します。
Prometheus
まず、アプリ内のメトリクスを設定してPrometheusパッケージに登録する必要がありますが、メトリクスの種類とそれぞれの目的を設定してみましょう。
- Counter:増やすかゼロに戻すかのどちらかしかできない数字です。あなたが送ったメールの数のように、5が3になることもあります。増加することしかできません。
- Gauge: 任意に上下することができる単一の数値を表しています。これらの数値は上下するので、例えばメモリ使用量や同時使用ユーザーのような数値を表し、ここでは実行中のワーカーと実行中のジョブを表します。
- Histogram: オブザーバーの中にあります。サンプルに基づいて、レスポンスタイム、レスポンスサイズ、リクエスト時間のような平均値を教えてくれます。
- Summary: ヒストグラムのようなものですが、オブザベーションの合計とオブザベーションされたすべての値の合計をカウントしてくれます。
上記のSummaryについてはよくわからないのですが、どっちみちこの記事には必要ありません。
// prometheus.gopackage queuevar (JobsProcessed *prometheus.CounterVecRunningJobs *prometheus.GaugeVecProcessingTime *prometheus.HistogramVecRunningWorkers *prometheus.GaugeVec)var collectorContainer []prometheus.Collector//InitPrometheus ... initalize prometheusfunc InitPrometheus() {prometheus.MustRegister(collectorContainer...)}//PushRegister ... Push collectores to prometheus before inializingfunc PushRegister(c ...prometheus.Collector) {collectorContainer = append(collectorContainer, c...)}func InitMetrics() {JobsProcessed = prometheus.NewCounterVec(prometheus.CounterOpts{Namespace: "worker",Subsystem: "jobs",Name: "processed_total",Help: "Total number of jobs processed by the workers",},[]string{"worker_id", "type"},)RunningJobs = prometheus.NewGaugeVec(prometheus.GaugeOpts{Namespace: "worker",Subsystem: "jobs",Name: "running",Help: "Number of jobs inflight",},[]string{"type"},)RunningWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{Namespace: "worker",Subsystem: "workers",Name: "running",Help: "Number of workers inflight",},[]string{"type"},)ProcessingTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{Namespace: "worker",Subsystem: "jobs",Name: "process_time_seconds",Help: "Amount of time spent processing jobs",},[]string{"worker_id", "type"},)metrics.PushRegister(ProcessingTime, RunningJobs, JobsProcessed, RunningWorkers)}
メトリクスを定義した後、Prometheus がメトリクスデータを取得するために呼び出すエンドポイントを実行する必要があります。
r.Handle("GET", "/metrics", gin.WrapH(promhttp.Handler()))
いくつかの呼び出しをしてみると
JobsProcessed.WithLabelValues("Worker-1", "ahmedash.com").Inc()JobsProcessed.WithLabelValues("Worker-1", "ahmedash.com").Inc()JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()JobsProcessed.WithLabelValues("Worker-2", "ahmedash.com").Inc()
Prometheusのエンドポイントにヒットしたときの結果 http://localhost/metrics
Prometheus のダッシュボードを開くと、以下のようなチャートが表示されます。
これがPrometheus との連携方法です。Prometheus にデータを送ってダッシュボードをチェックして簡単に可視化します。
しかし、Prometheusのダッシュボードは見ての通り、それほど立派ではありません。そこで、そのシステムで何が起こっているのかを誰もが簡単に把握できるように、素敵なダッシュボードを作成する必要があります
Push the metric from Go to Prometheus
最初の部分から、メトリクスに必要なデータを送信するためのコードを修正していきます。
Follow arrows
矢印に従ってください。
func (d *Dispatcher) Run() {for i := 0; i < d.maxWorkers; i++ {// increase the number of running workersRunningWorkers.WithLabelValues("Emails").Inc() ⬅️⬅️⬅️⬅️⬅️⬅️⬅️⬅️worker := NewWorker(d.WorkerPool)worker.Start()d.Workers = append(d.Workers, worker)}go d.dispatch()}
ジョブごとに、キュー内のジョブを増やす必要があります。
func (d *Dispatcher) dispatch() {for {select {case job := <-JobQueue:// Increase running jobs GaugeRunningJobs.WithLabelValues("Emails").Inc() ️⬅️⬅️⬅️⬅️⬅️⬅️⬅️go func(job Queuable) {jobChannel := <-d.WorkerPooljobChannel <- job}(job)}}}
・First metric: ワーカーごとに処理されたジョブ数を格納します。
・Second metric: 前のコードスニペットから実行中のジョブの数を減らします。
・Third metric: 各ジョブの処理時間を格納します。
func (w Worker) Start() {go func() {for {w.WorkerPool <- w.JobChannelselect {case job := <-w.JobChannel:startTime := time.Now()// track the total number of jobs processed by the workerJobsProcessed.WithLabelValues(w.Name, "Emails").Inc() ️⬅️⬅️⬅️⬅️⬅️⬅️⬅️if err := job.Handle(); err != nil {log.Fatal("Error in job: %s", err.Error())}// Decrease the number of running jobs once we finishRunningJobs.WithLabelValues("Emails").Dec() ️⬅️⬅️⬅️⬅️⬅️⬅️⬅️// ⬇️ Register the proccesing time in the Histogram ⬇️ProcessingTime.WithLabelValues(w.Name, "Emails").Observe(time.Now().Sub(startTime).Seconds()) ️⬅️⬅️⬅️⬅️⬅️⬅️⬅️}}}()}
Grafana
これですべての準備が整い、アプリは期待していた通りに動作しています。最後の手順では、Grafanaで美しいダッシュボードを作成します。
Grafanaダッシュボードから、データソースとしてPrometheusを追加する必要があります。
そして、トップバーのHOMEから新しいダッシュボードを作成します。
そして、カウンターとゲージのメトリクスのための Singlestat を追加する必要があります。
現在:
1.データソースからPrometheusを選択します。
2.メトリックの名前を探します。
3.完璧に正しいメトリックの名前ができました。
次に、オプションタブから、現在の値か合計値かを選択します。私たちの場合は、現在の値を選択します。
最後に Generalタブからコンポーネントをリネームします。
また、パネル内の各コンポーネントのサイズをコントロールすることもできます。
worker_workers_running メトリック名を使って、実行中のワーカーのために別のワーカーを作成してみましょう。
では、1分間に何個のジョブがあるかを示すグラフパネルを作ってみましょう。
これで、アプリにいくつかのリクエストをして、リアルタイムでデータを見ることができるようになりました。auto-refreshが利用可能かどうかを確認してください。
これでダッシュボードの最初の部分が完成しました。
第二部は、それぞれのワーカーごとの処理時間をグラフにして表示するだけのものです。
このメトリックに対して、その値は、worker_jobs_process_time_seconds_sum / worker_jobs_process_time_seconds_count
となります。
値をミリ秒単位で表示している確認してください。
そして最終結果は…
これでダッシュボードには、稼働中のワーカー、キューに入っているジョブの数、ジョブの処理時間などが表示されます。
ジョブにスパイクが発生していないか、システムが期待通りに動作しているかを簡単に確認できます。
まとめ
2つのパートを含むこの記事では、以下のやり方について学びました。
・チャンネルの作成
・ディスパッチ/ワーカーパターン
・ワーカーを生成&キルする方法
・メトリックをプッシュし、素敵なダッシュボードで表示する方法
説明していないこと
・実行できなかったジョブに関しては扱わないので、どんなジョブでもエラーが発生した場合、再試行する方法や格納する方法がありません。
・データの永続性がない。アプリが再起動した場合、メモリ内にあるジョブをすべて失うことになりますとくのも、それらはメモリーの中にあるからです。
・RedisやRabbitMQのようなサードパーティのメッセージング/キューイングシステムで動作させる方法は伝えていませんでした。
Orangesys.ioでは、kuberneteの運用、DevOps、監視のお手伝いをさせていただいています。ぜひ私たちにおまかせください。