- goroutineの数を制限しつつ並列にジョブを実行したい
- ジョブは定期的に投入できたい
- ジョブが空っぽになっても終了しないで欲しい
- SIGTERMとか送ったら実行中のジョブが完了するまで待ってから終了してほしい
というようなものが欲しくなることが最近よくある!のでメモ
参考
実装
|
package main |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
) |
|
|
|
type Dispatcher struct { |
|
sem chan struct{} // semaphore |
|
jobBuffer chan *Job |
|
worker Worker |
|
wg sync.WaitGroup |
|
} |
|
|
|
func NewDispatcher(worker Worker, maxWorkers int, buffers int) *Dispatcher { |
|
return &Dispatcher{ |
|
// buffered channel で goroutine の数を制限 |
|
sem: make(chan struct{}, maxWorkers), |
|
jobBuffer: make(chan *Job, buffers), |
|
worker: worker, |
|
} |
|
} |
|
|
|
func (d *Dispatcher) Start(ctx context.Context) { |
|
d.wg.Add(1) |
|
go d.loop(ctx) |
|
} |
|
|
|
func (d *Dispatcher) Wait() { |
|
d.wg.Wait() |
|
} |
|
|
|
func (d *Dispatcher) Add(job *Job) { |
|
d.jobBuffer <- job |
|
} |
|
|
|
func (d *Dispatcher) stop() { |
|
d.wg.Done() |
|
} |
|
|
|
func (d *Dispatcher) loop(ctx context.Context) { |
|
var wg sync.WaitGroup |
|
Loop: |
|
for { |
|
select { |
|
case <-ctx.Done(): |
|
// すべてのjobが終了するまで待つ |
|
wg.Wait() |
|
break Loop |
|
case job := <-d.jobBuffer: |
|
// 実行中のジョブ数+1 |
|
wg.Add(1) |
|
// semaphorを1つ減らす |
|
d.sem <- struct{}{} |
|
go func(job *Job) { |
|
defer wg.Done() |
|
// jobが終了したらsemaphorを1つ増やす |
|
defer func() { <-d.sem }() |
|
d.worker.Work(job) |
|
}(job) |
|
} |
|
} |
|
d.stop() |
|
} |
|
package main |
|
|
|
type Job struct { |
|
URL string |
|
} |
|
package main |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"os" |
|
"os/signal" |
|
"syscall" |
|
) |
|
|
|
func main() { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
|
|
sigCh := make(chan os.Signal, 1) |
|
defer close(sigCh) |
|
|
|
// signalをトラップしてcancelを呼ぶ |
|
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT) |
|
go func() { |
|
<-sigCh |
|
cancel() |
|
}() |
|
|
|
p := NewPrinter() |
|
// goroutineの最大数は10 |
|
// jobBufferは1000 |
|
d := NewDispatcher(p, 10, 1000) |
|
d.Start(ctx) |
|
|
|
for i := 0; i < 100; i++ { |
|
url := fmt.Sprintf("http://example.com/%d", i) |
|
job := &Job{URL: url} |
|
d.Add(job) |
|
} |
|
|
|
d.Wait() |
|
} |
|
package main |
|
|
|
import ( |
|
"fmt" |
|
"math/rand" |
|
"time" |
|
) |
|
|
|
type Printer struct{} |
|
|
|
func NewPrinter() *Printer { |
|
return &Printer{} |
|
} |
|
|
|
func (p *Printer) Work(j *Job) { |
|
// ダミーのワーカー、ランダムに数秒待つだけ |
|
t := time.NewTimer(time.Duration(rand.Intn(5)) * time.Second) |
|
defer t.Stop() |
|
<-t.C |
|
fmt.Println(j.URL) |
|
} |
|
package main |
|
|
|
type Worker interface { |
|
Work(j *Job) |
|
} |
gistbe9fac4aa02419d68c6770a85e53c936
この実装ではmainの中のforループでのみジョブをenqueueしているが、
- 定期的に外部リソース(APIたたいたり、DBを眺めにいったり)からデータを取得してジョブをenqueueしたり
- HTTPサーバーをたててHTTP経由でジョブをenqueueできるようにして、別のプロセスから定期的にジョブをenqueueしたり
とかいろいろできて便利〜