たにしきんぐダム

プログラミングやったりアニメやゲーム見たり京都に住んだりしてます

Golangで簡易的ワーカー/ジョブキュー

  • goroutineの数を制限しつつ並列にジョブを実行したい
  • ジョブは定期的に投入できたい
  • ジョブが空っぽになっても終了しないで欲しい
  • SIGTERMとか送ったら実行中のジョブが完了するまで待ってから終了してほしい

というようなものが欲しくなることが最近よくある!のでメモ

参考

実装

package main
import (
"context"
"sync"
)
type Dispatcher struct {
sem chan struct{} // semaphore
jobBuffer chan *Job
worker Worker
wg sync.WaitGroup
}
func NewDispatcher(worker Worker, maxWorkers int, buffers int) *Dispatcher {
return &Dispatcher{
// buffered channel で goroutine の数を制限
sem: make(chan struct{}, maxWorkers),
jobBuffer: make(chan *Job, buffers),
worker: worker,
}
}
func (d *Dispatcher) Start(ctx context.Context) {
d.wg.Add(1)
go d.loop(ctx)
}
func (d *Dispatcher) Wait() {
d.wg.Wait()
}
func (d *Dispatcher) Add(job *Job) {
d.jobBuffer <- job
}
func (d *Dispatcher) stop() {
d.wg.Done()
}
func (d *Dispatcher) loop(ctx context.Context) {
var wg sync.WaitGroup
Loop:
for {
select {
case <-ctx.Done():
// すべてのjobが終了するまで待つ
wg.Wait()
break Loop
case job := <-d.jobBuffer:
// 実行中のジョブ数+1
wg.Add(1)
// semaphorを1つ減らす
d.sem <- struct{}{}
go func(job *Job) {
defer wg.Done()
// jobが終了したらsemaphorを1つ増やす
defer func() { <-d.sem }()
d.worker.Work(job)
}(job)
}
}
d.stop()
}
view raw dispatcher.go hosted with ❤ by GitHub
package main
type Job struct {
URL string
}
view raw job.go hosted with ❤ by GitHub
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
defer close(sigCh)
// signalをトラップしてcancelを呼ぶ
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT)
go func() {
<-sigCh
cancel()
}()
p := NewPrinter()
// goroutineの最大数は10
// jobBufferは1000
d := NewDispatcher(p, 10, 1000)
d.Start(ctx)
for i := 0; i < 100; i++ {
url := fmt.Sprintf("http://example.com/%d", i)
job := &Job{URL: url}
d.Add(job)
}
d.Wait()
}
view raw main.go hosted with ❤ by GitHub
package main
import (
"fmt"
"math/rand"
"time"
)
type Printer struct{}
func NewPrinter() *Printer {
return &Printer{}
}
func (p *Printer) Work(j *Job) {
// ダミーのワーカー、ランダムに数秒待つだけ
t := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second)
defer t.Stop()
<-t.C
fmt.Println(j.URL)
}
view raw printer.go hosted with ❤ by GitHub
package main
type Worker interface {
Work(j *Job)
}
view raw worker.go hosted with ❤ by GitHub

gistbe9fac4aa02419d68c6770a85e53c936

この実装ではmainの中のforループでのみジョブをenqueueしているが、

  • 定期的に外部リソース(APIたたいたり、DBを眺めにいったり)からデータを取得してジョブをenqueueしたり
  • HTTPサーバーをたててHTTP経由でジョブをenqueueできるようにして、別のプロセスから定期的にジョブをenqueueしたり

とかいろいろできて便利〜