channelとsync.Poolを使ってgoroutineの同時実行数を制御する

元ネタ

どちらも、固定長のバッファを持ったチャネルを共有リソースとすることでgoroutineの同時実行数を制御するパターンの例として紹介されています。

sync.Poolを使うともうすこし可読性があがると思ったので、サンプルコードを書いてみました。
今回ベースにしたのはmotemenさんの方で、実行数の制御とワーカーリソースのプールを主題としています。

sync.Pool

https://golang.org/pkg/sync/#Pool

使い方ですが、sync.Pool内の生成関数Newにほしいリソース取得処理を書きます。
※生成関数が返すのは変数のポインタです。

pool := sync.Pool{New: func() interface{} {
    t := time.Now()
    return &t
}}

取り出したものは型キャストして使い、必要なくなったらプールに戻します。

t1 := pool.Get().(*time.Time)
time.Sleep(100 * time.Millisecond)

t2 := pool.Get().(*time.Time)
fmt.Println(t1 == t2) // output: false

pool.Put(t1)

t3 := pool.Get().(*time.Time)
fmt.Println(t3)
fmt.Println(t1 == t3) // output: true

残念ながら sync.Pool に数の制限のしくみはないので独自のプールが必要になります。
今回はworkerリソース制御のしくみをつくりました。

ワーカープールを作る

同時実行数制御のlimitとワーカープールを内包したstruct workers をつくります。

type workers struct {
    limit chan struct{}
    pool  sync.Pool
}

func newWorkers(n int) *workers {
    ws := workers{}
    ws.limit = make(chan struct{}, n)
    ws.pool = sync.Pool{New: func() interface{} {
        return &worker{}
    }}
    return &ws
}

プールからのGetはlimitチャネルの空き状況によって制限されます(待たされます)。

func (ws *workers) Get() *worker {
    ws.limit <- struct{}{} // 空くまで待つ
    return ws.pool.Get().(*worker)
}

戻すときは同時にlimitチャネルから解放します。

func (ws *workers) Put(w *worker) {
    ws.pool.Put(w)
    <-ws.limit // 解放
}

このworkersを使ったmain関数はこうなります。
goroutineからfor/selectが消えてすっきりしました。

func main() {
    ws := newWorkers(5)
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            w := ws.Get()
            log.Printf("[worker %v] --> goroutine %d", w, i)
            w.work()
            log.Printf("[worker %v] <-- goroutine %d", w, i)
            ws.Put(w)
        }(i)
    }
    wg.Wait()
}

全体像はこんな感じ。

今回はcancel処理を省略しましたが、実際はworkerPoolにキャンセル用の仕組みを組み込む必要が有ってもうすこし複雑になりそうです。

mattnさんのコードをもとにしたものはこんな感じです。