2017-10 / 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
一ヶ月ほど前の社内のインフラ共有会でタイトルの話をしました。記録の
ために記事を書いておきます。
Gist に置いてあるので、コードは git clone で取得可能です。
$ git clone https://gist.github.com/c0a4234a5264c89655c40adcf7c27cb2.git
例えば Ruby で 30 個の処理をするコードがあったとします。こんな素朴
なコードです。それぞれ 3 秒かかる処理が 30 あるので、とても遅いです。
| #!/usr/bin/env ruby | |
| # Serial processing | |
| def thread_num | |
| Thread.list.select {|thread| thread.status == 'run'}.count | |
| end | |
| def heavy_process(i) | |
| sleepSecond = 3 | |
| print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num] | |
| sleep(sleepSecond) | |
| end | |
| 30.times do |i| | |
| heavy_process(i) | |
| end |
Thread を使って 5 並列にしました。明らかに速くなりました。
ついでにそれぞれの結果(というほどのものではありませんが)を
results に代入し、最後にまとめて表示しました。
results は共有リソースになるので、Thread::Mutex#synchronize でロッ
クをかけて安全に書き込んでいます。ロックをかけないと results に同
時に書き込まれるケースを救うことが出来ません。
| #!/usr/bin/env ruby | |
| # parallels processing v1 | |
| def thread_num | |
| Thread.list.select {|thread| thread.status == 'run'}.count | |
| end | |
| def heavy_process(i) | |
| sleepSecond = 3 | |
| print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num] | |
| sleep(sleepSecond) | |
| end | |
| # 5 parallels | |
| thread_num = 5 | |
| threads = [] | |
| results = [] | |
| mutex = Mutex::new | |
| (30 / thread_num).times do |a| | |
| thread_num.times do |b| | |
| threads << Thread.start do | |
| i = (a * thread_num) + b | |
| heavy_process(i) | |
| mutex.synchronize { results << i } | |
| end | |
| end | |
| threads.each(&:join) | |
| end | |
| # Print saved results | |
| print "------------\n" | |
| printf "result.count: %d\n", results.count | |
| results.each do |v| | |
| printf "i: %d\n", v | |
| end |
parallel gem を使うと、ずいぶんとすっきり書くことが出来ます。
| #!/usr/bin/env ruby | |
| # parallels processing v2 using third party gem | |
| require 'parallel' | |
| def thread_num | |
| Thread.list.select {|thread| thread.status == 'run'}.count | |
| end | |
| def heavy_process(i) | |
| sleepSecond = 3 | |
| print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num] | |
| sleep(sleepSecond) | |
| end | |
| # 5 parallels | |
| thread_num = 5 | |
| results = [] | |
| mutex = Mutex::new | |
| Parallel.each(0..29, in_threads: thread_num) do |i| | |
| heavy_process(i) | |
| mutex.synchronize { results << i } | |
| end | |
| # Print saved results | |
| print "------------\n" | |
| printf "result.count: %d\n", results.count | |
| results.each do |v| | |
| printf "i: %d\n", v | |
| end |
直列処理に関しては、Ruby と同様に素朴です。goroutineNum(あとで説
明します)は常に 1 です。
| // Serial processing | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| for i := 0; i < 30; i++ { | |
| heavyProcess(i) | |
| } | |
| } |
何も考えずに heavyProcess() を goroutine で動かします。19 行目に
"go " を追加しただけです。驚いたことに何も表示されずに終了します。
| // parallels processing v1 (broken code) | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| for i := 0; i < 30; i++ { | |
| go heavyProcess(i) | |
| } | |
| } |
実は何も表示されなかったのは、goroutine の終了を待たずに main() が
終了してしまったためです。シェルのバックグラウンド実行(&)とよく
似ていると思いました。
今度は sync パッケージを使って、goroutine の終了を待ちましたが、
今回も何も表示されません。
| // parallels processing v2 (broken code) | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go heavyProcess(i) | |
| wg.Done() | |
| } | |
| wg.Wait() | |
| } |
これは heavyProcess() の終了を待たずに WaitGroup.Done しているからです。
WaitGroup.Done も含めた処理を goroutine で実行すると(23-26行目)、
出力されます。ただしこの時点では 30 並列です。goroutineNum は最大
31 です。
| // parallels processing v3 | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| heavyProcess(i) | |
| wg.Done() | |
| }(i) | |
| } | |
| wg.Wait() | |
| } |
defer を使ってリファクタリングしました。23 行目の無名関数の最後で
必ず WaitGroup.Done を実行してくれることが明示&保証されました。
| // parallels processing v4 | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| defer wg.Done() | |
| heavyProcess(i) | |
| }(i) | |
| } | |
| wg.Wait() | |
| } |
5 並列にしました。サイズ 5 で int 型の channel を作り、同時実行数
を制限しました。変数 semaphore には 1 が書き込まれ、heavyProcess()
が終わったら、出ていきます。
実行すると、goroutineNum が最大 31 なのは変わりませんが、徐々に減っ
ていくのがわかると思います。
| // parallels processing v5 | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| // 5 parallels | |
| semaphore := make(chan int, 5) | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| defer wg.Done() | |
| semaphore <- 1 | |
| heavyProcess(i) | |
| <-semaphore | |
| }(i) | |
| } | |
| wg.Wait() | |
| } |
少し脱線して並列数を CPU コア数にしました。通常はこれで良いと思い
ますが、Web API へのアクセスを伴う処理であれば、もっと増やして良い
と思います。
| // parallels processing v5b | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| cpuNum := runtime.NumCPU() | |
| fmt.Printf("cpuNum: %d\n", cpuNum) | |
| // CPU num parallels | |
| semaphore := make(chan int, cpuNum) | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| defer wg.Done() | |
| semaphore <- 1 | |
| heavyProcess(i) | |
| <-semaphore | |
| }(i) | |
| } | |
| wg.Wait() | |
| } |
parallels_v1.rb と同じように、結果を最後にまとめて表示します。
sync.Mutex でロック用の変数を作り、34~36 行目でロックをかけながら
results に書き込んでいます。
| // parallels processing v6 | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| var results []int | |
| var mu sync.Mutex | |
| // 5 parallels | |
| semaphore := make(chan int, 5) | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| defer wg.Done() | |
| semaphore <- 1 | |
| heavyProcess(i) | |
| <-semaphore | |
| mu.Lock() | |
| defer mu.Unlock() | |
| results = append(results, i) | |
| }(i) | |
| } | |
| wg.Wait() | |
| // Print saved results | |
| fmt.Print("------------\n") | |
| fmt.Printf("len(result): %d\n", len(results)) | |
| for _, v := range results { | |
| fmt.Printf("i: %d\n", v) | |
| } | |
| } |
少しリファクタリングして、semaphore に詰める値を 1 から i に変えま
した。少し自身がありませんが...。
| // parallels processing v7 | |
| package main | |
| import ( | |
| "fmt" | |
| "runtime" | |
| "sync" | |
| "time" | |
| ) | |
| func heavyProcess(i int) { | |
| sleepSecond := 3 * time.Second | |
| fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine()) | |
| time.Sleep(sleepSecond) | |
| } | |
| func main() { | |
| var wg sync.WaitGroup | |
| var results []int | |
| var mu sync.Mutex | |
| // 5 parallels | |
| semaphore := make(chan int, 5) | |
| for i := 0; i < 30; i++ { | |
| wg.Add(1) | |
| go func(i int) { | |
| defer wg.Done() | |
| semaphore <- i | |
| heavyProcess(i) | |
| mu.Lock() | |
| defer mu.Unlock() | |
| results = append(results, <-semaphore) // Is this safe? | |
| }(i) | |
| } | |
| wg.Wait() | |
| // Print saved results | |
| fmt.Print("------------\n") | |
| fmt.Printf("len(result): %d\n", len(results)) | |
| for _, v := range results { | |
| fmt.Printf("i: %d\n", v) | |
| } | |
| } |
何か処理を "go" で括れば、main() とは別のスレッド(という表現で良
いか分からない)で動作すると理解しました。シェルのバックグラウンド
実行とよく似ていますし、これ自体はシンプルだと思います。
そのままだと main() が先に終わってしまうので、sync.WaitGroup など
で待つ必要があります。
ちなみに sync.WaitGroup を使わずに channel で同じことができます。
でも、私は読みやすさの点から sync.WaitGroup を使うほうが好きです。
今回の Ruby と golang のコードを実行すると、Ruby は並列数分の処理
が全部終わってから次に進み、golang は並列処理のうち 1 つでも終わる
と、次の処理が並列処理に加わります。
擬音にすると、Ruby は「ガッガッガッガッ」で、golang は「スルスルス
ルスル」です。分かるかな?
実は main() 自体も goroutine で動いているようです。例えば以下の変
更を加えて"$ GOTRACEBACK=2 go run parallels_v7.go" を実行すると、
なんとなく分かります。
diff --git a/parallels_v7.go b/parallels_v7.go index f17339a..d34b0b1 100644 --- a/parallels_v7.go +++ b/parallels_v7.go @@ -33,6 +33,7 @@ func main() { mu.Lock() defer mu.Unlock() results = append(results, <-semaphore) // Is this safe? + select {} }(i) } wg.Wait()
参考: Go の並行処理 - Block Rockin’ Codes
2017-10 / 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
最終更新時間: 2017-10-15 15:58