2017-10 / 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31

2017-10-15 (日)

golang の並列処理を Ruby と比較しながら学ぶ [golang]

一ヶ月ほど前の社内のインフラ共有会でタイトルの話をしました。記録の
ために記事を書いておきます。

Gist に置いてあるので、コードは git clone で取得可能です。

$ git clone https://gist.github.com/c0a4234a5264c89655c40adcf7c27cb2.git




Ruby


例えば Ruby で 30 個の処理をするコードがあったとします。こんな素朴
なコードです。それぞれ 3 秒かかる処理が 30 あるので、とても遅いです。

#!/usr/bin/env ruby
# Serial processing
def thread_num
Thread.list.select {|thread| thread.status == 'run'}.count
end
def heavy_process(i)
sleepSecond = 3
print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num]
sleep(sleepSecond)
end
30.times do |i|
heavy_process(i)
end
view raw serial.rb hosted with ❤ by GitHub



Thread を使って 5 並列にしました。明らかに速くなりました。

ついでにそれぞれの結果(というほどのものではありませんが)を
results に代入し、最後にまとめて表示しました。

results は共有リソースになるので、Thread::Mutex#synchronize でロッ
クをかけて安全に書き込んでいます。ロックをかけないと results に同
時に書き込まれるケースを救うことが出来ません。

#!/usr/bin/env ruby
# parallels processing v1
def thread_num
Thread.list.select {|thread| thread.status == 'run'}.count
end
def heavy_process(i)
sleepSecond = 3
print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num]
sleep(sleepSecond)
end
# 5 parallels
thread_num = 5
threads = []
results = []
mutex = Mutex::new
(30 / thread_num).times do |a|
thread_num.times do |b|
threads << Thread.start do
i = (a * thread_num) + b
heavy_process(i)
mutex.synchronize { results << i }
end
end
threads.each(&:join)
end
# Print saved results
print "------------\n"
printf "result.count: %d\n", results.count
results.each do |v|
printf "i: %d\n", v
end
view raw parallels_v1.rb hosted with ❤ by GitHub



parallel gem を使うと、ずいぶんとすっきり書くことが出来ます。

#!/usr/bin/env ruby
# parallels processing v2 using third party gem
require 'parallel'
def thread_num
Thread.list.select {|thread| thread.status == 'run'}.count
end
def heavy_process(i)
sleepSecond = 3
print "i: %2d, sleep: %ds, thread_num: %d\n" % [i, sleepSecond, thread_num]
sleep(sleepSecond)
end
# 5 parallels
thread_num = 5
results = []
mutex = Mutex::new
Parallel.each(0..29, in_threads: thread_num) do |i|
heavy_process(i)
mutex.synchronize { results << i }
end
# Print saved results
print "------------\n"
printf "result.count: %d\n", results.count
results.each do |v|
printf "i: %d\n", v
end
view raw parallels_v2.rb hosted with ❤ by GitHub



golang


直列処理に関しては、Ruby と同様に素朴です。goroutineNum(あとで説
明します)は常に 1 です。

// Serial processing
package main
import (
"fmt"
"runtime"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
for i := 0; i < 30; i++ {
heavyProcess(i)
}
}
view raw serial.go hosted with ❤ by GitHub



何も考えずに heavyProcess() を goroutine で動かします。19 行目に
"go " を追加しただけです。驚いたことに何も表示されずに終了します。

// parallels processing v1 (broken code)
package main
import (
"fmt"
"runtime"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
for i := 0; i < 30; i++ {
go heavyProcess(i)
}
}
view raw parallels_v1.go hosted with ❤ by GitHub



実は何も表示されなかったのは、goroutine の終了を待たずに main() が
終了してしまったためです。シェルのバックグラウンド実行(&)とよく
似ていると思いました。

今度は sync パッケージを使って、goroutine の終了を待ちましたが、
今回も何も表示されません。

// parallels processing v2 (broken code)
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 30; i++ {
wg.Add(1)
go heavyProcess(i)
wg.Done()
}
wg.Wait()
}
view raw parallels_v2.go hosted with ❤ by GitHub



これは heavyProcess() の終了を待たずに WaitGroup.Done しているからです。

WaitGroup.Done も含めた処理を goroutine で実行すると(23-26行目)、
出力されます。ただしこの時点では 30 並列です。goroutineNum は最大
31 です。

// parallels processing v3
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
heavyProcess(i)
wg.Done()
}(i)
}
wg.Wait()
}
view raw parallels_v3.go hosted with ❤ by GitHub



defer を使ってリファクタリングしました。23 行目の無名関数の最後で
必ず WaitGroup.Done を実行してくれることが明示&保証されました。

// parallels processing v4
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
heavyProcess(i)
}(i)
}
wg.Wait()
}
view raw parallels_v4.go hosted with ❤ by GitHub



5 並列にしました。サイズ 5 で int 型の channel を作り、同時実行数
を制限しました。変数 semaphore には 1 が書き込まれ、heavyProcess()
が終わったら、出ていきます。

実行すると、goroutineNum が最大 31 なのは変わりませんが、徐々に減っ
ていくのがわかると思います。

// parallels processing v5
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
// 5 parallels
semaphore := make(chan int, 5)
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
semaphore <- 1
heavyProcess(i)
<-semaphore
}(i)
}
wg.Wait()
}
view raw parallels_v5.go hosted with ❤ by GitHub



少し脱線して並列数を CPU コア数にしました。通常はこれで良いと思い
ますが、Web API へのアクセスを伴う処理であれば、もっと増やして良い
と思います。

// parallels processing v5b
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
cpuNum := runtime.NumCPU()
fmt.Printf("cpuNum: %d\n", cpuNum)
// CPU num parallels
semaphore := make(chan int, cpuNum)
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
semaphore <- 1
heavyProcess(i)
<-semaphore
}(i)
}
wg.Wait()
}
view raw parallels_v5b.go hosted with ❤ by GitHub



parallels_v1.rb と同じように、結果を最後にまとめて表示します。
sync.Mutex でロック用の変数を作り、34~36 行目でロックをかけながら
results に書き込んでいます。

// parallels processing v6
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
var results []int
var mu sync.Mutex
// 5 parallels
semaphore := make(chan int, 5)
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
semaphore <- 1
heavyProcess(i)
<-semaphore
mu.Lock()
defer mu.Unlock()
results = append(results, i)
}(i)
}
wg.Wait()
// Print saved results
fmt.Print("------------\n")
fmt.Printf("len(result): %d\n", len(results))
for _, v := range results {
fmt.Printf("i: %d\n", v)
}
}
view raw parallels_v6.go hosted with ❤ by GitHub



少しリファクタリングして、semaphore に詰める値を 1 から i に変えま
した。少し自身がありませんが...。

// parallels processing v7
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func heavyProcess(i int) {
sleepSecond := 3 * time.Second
fmt.Printf("i: %2d, sleep: %v, goroutineNum: %d\n", i, sleepSecond, runtime.NumGoroutine())
time.Sleep(sleepSecond)
}
func main() {
var wg sync.WaitGroup
var results []int
var mu sync.Mutex
// 5 parallels
semaphore := make(chan int, 5)
for i := 0; i < 30; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
semaphore <- i
heavyProcess(i)
mu.Lock()
defer mu.Unlock()
results = append(results, <-semaphore) // Is this safe?
}(i)
}
wg.Wait()
// Print saved results
fmt.Print("------------\n")
fmt.Printf("len(result): %d\n", len(results))
for _, v := range results {
fmt.Printf("i: %d\n", v)
}
}
view raw parallels_v7.go hosted with ❤ by GitHub



考察など


goroutine とは


何か処理を "go" で括れば、main() とは別のスレッド(という表現で良
いか分からない)で動作すると理解しました。シェルのバックグラウンド
実行とよく似ていますし、これ自体はシンプルだと思います。

そのままだと main() が先に終わってしまうので、sync.WaitGroup など
で待つ必要があります。

ちなみに sync.WaitGroup を使わずに channel で同じことができます。
でも、私は読みやすさの点から sync.WaitGroup を使うほうが好きです。

Ruby と golang の並列処理の違い


今回の Ruby と golang のコードを実行すると、Ruby は並列数分の処理
が全部終わってから次に進み、golang は並列処理のうち 1 つでも終わる
と、次の処理が並列処理に加わります。

擬音にすると、Ruby は「ガッガッガッガッ」で、golang は「スルスルス
ルスル」です。分かるかな?

main() も goroutine


実は main() 自体も goroutine で動いているようです。例えば以下の変
更を加えて"$ GOTRACEBACK=2 go run parallels_v7.go" を実行すると、
なんとなく分かります。

diff --git a/parallels_v7.go b/parallels_v7.go
index f17339a..d34b0b1 100644
--- a/parallels_v7.go
+++ b/parallels_v7.go
@@ -33,6 +33,7 @@ func main() {
 			mu.Lock()
 			defer mu.Unlock()
 			results = append(results, <-semaphore) // Is this safe?
+			select {}
 		}(i)
 	}
 	wg.Wait()



参考: Go の並行処理 - Block Rockin’ Codes

2017-10 / 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31

最終更新時間: 2017-10-15 15:58

検索


最近の話題
- 2017-10-15
  golang の並列処理を Ruby と比較しながら学ぶ
- 2017-09-24
  『応仁の乱』を読んだ
- 2017-08-31
  ZenHub と GitHub の Issue/PR URL をトグルするブックマークレットを作った
- 2017-08-07
  github-nippou v3.0.0 released
- 2017-08-02
  『サーバーレスシングルページアプリケーション』を読んだ
- 2017-07-10
  emacs-helm の標準添付から外された helm-elscreen を Melpa に追加した
- 2017-07-05
  WEB+DB PRESS Vol.99の「実践Kubernetes」の第4章でつまづいたメモ #wdpress
最近追記された記事
- 2017-06-02-1 (106日前)
- 2017-04-29-1 (168日前)
- 2017-04-13-1 (182日前)
- 2017-04-13-1 (184日前)
- 2017-03-02-1 (226日前)
- 2017-02-25-1 (231日前)
- 2017-02-21-1 (235日前)
- 2015-06-07-1 (241日前)
- 2016-10-19-1 (250日前)
- 2016-01-01-1 (262日前)
カテゴリ
- Anthy (3)
- Apache (11)
- Apple (1)
- ATOK (4)
- au (3)
- AWS (19)
- Bazaar (1)
- Berkshelf (2)
- BigQuery (1)
- BitBar (3)
- Book (88)
- Boxen (2)
- Bugsnag (1)
- capistrano (4)
- chalow (56)
- ChatWork (1)
- Chef (17)
- Chrome (3)
- Chromecast (1)
- CircleCI (10)
- clang (26)
- Comics (2)
- Cooking (10)
- cvs (15)
- cygwin (12)
- D3.js (1)
- Debian (55)
- Docker (3)
- E-mail (8)
- elasticsearch (4)
- Emacs (220)
- Emacs講座 (10)
- English (4)
- feedforce (7)
- fetchmail (3)
- Firefox (20)
- Fluentd (4)
- ftp (1)
- Game (20)
- GCP (1)
- Gem (5)
- Git (9)
- GitHub (17)
- golang (6)
- Google (1)
- gpg (4)
- GrowthForecast (7)
- Health (3)
- Heroku (9)
- Homebrew (10)
- HTML (6)
- iBook (1)
- iPhone (15)
- IRC (1)
- Jenkins (8)
- JS (1)
- Karabiner (1)
- KeySnail (3)
- Kibana (1)
- Kindle (1)
- Kubernetes (2)
- Langrich (7)
- LDAP (6)
- Life (19)
- Linux (6)
- Mackerel (1)
- Mew (18)
- MongoDB (1)
- Mozilla (19)
- Music (1)
- MySQL (1)
- NAS (4)
- nginx (6)
- NHK (1)
- Node (1)
- ntp (4)
- OOP (1)
- OpenID (2)
- openssl (1)
- Opera (2)
- OSX (41)
- Perl (14)
- PHP (19)
- PostgreSQL (1)
- procmail (4)
- Programing (3)
- Puppet (1)
- Python (2)
- Rails (12)
- Rake (2)
- RaspberryPi (1)
- RedHat (29)
- Redmine (3)
- Rspec (1)
- Ruby (50)
- samba (3)
- screen (7)
- sed (5)
- serverspec (6)
- sh (8)
- Slack (2)
- Solaris9 (22)
- Spring (2)
- ssh (4)
- StatusNet (21)
- svn (12)
- Swift (1)
- Tablet (1)
- tdiary (3)
- Twitter (14)
- Twmode (6)
- Ubuntu (5)
- UNIX (102)
- vagrant (8)
- Video (21)
- vim (1)
- Wercker (9)
- Windows (29)
- Wine (3)
- XML (11)
- XP (1)
- zsh (25)
- インストールメモ (33)
- クイックシェイプ (12)
- ネタ (15)
- 勉強会 (15)
- 携帯 (6)
- 正規表現 (4)
過去ログ
2017 : 01 02 03 04 05 06 07 08 09 10 11 12
2016 : 01 02 03 04 05 06 07 08 09 10 11 12
2015 : 01 02 03 04 05 06 07 08 09 10 11 12
2014 : 01 02 03 04 05 06 07 08 09 10 11 12
2013 : 01 02 03 04 05 06 07 08 09 10 11 12
2012 : 01 02 03 04 05 06 07 08 09 10 11 12
2011 : 01 02 03 04 05 06 07 08 09 10 11 12
2010 : 01 02 03 04 05 06 07 08 09 10 11 12
2009 : 01 02 03 04 05 06 07 08 09 10 11 12
2008 : 01 02 03 04 05 06 07 08 09 10 11 12
2007 : 01 02 03 04 05 06 07 08 09 10 11 12
2006 : 01 02 03 04 05 06 07 08 09 10 11 12
2005 : 01 02 03 04 05 06 07 08 09 10 11 12
2004 : 01 02 03 04 05 06 07 08 09 10 11 12
2003 : 01 02 03 04 05 06 07 08 09 10 11 12
2002 : 01 02 03 04 05 06 07 08 09 10 11 12
2001 : 01 02 03 04 05 06 07 08 09 10 11 12
Google+