Goの並列処理の動作を理解する
Goやるなら並列処理やるでしょ
Goのイメージ = 並列処理というイメージがある人は多いと思います。Goはケアが難しかった並列処理を他の言語よりも比較的扱いがしやすいようになっていて、わりと手軽に書けたりします。ですが、なんとなくで使っていると思わぬメモリリークの発生などが発生したりします。パフォーマンスアップのつもりが、パフォーマンスダウンになってしまうことも・・・。
私自身もちゃんと理解できていないなと感じることがあったので、今回はざっくり理解していきたいと思います。
今回の記事のソースは以下のRepoになります。
並列処理って何が良いの
何が嬉しいのかって話をまずしますと、以下のような合計で6秒どうしてもかかってしまう処理があったとします。確かに順次処理でやっていくと、各処理は2秒ずつかかってしまいます。ですが、例えばこれが同時に処理できたらどうでしょう?最大でも2秒で終わる処理に変わりますよねっていうのが並列処理です。
package main import ( "fmt" "testing" "time" ) func main() { result := testing.Benchmark(func(b *testing.B) { run() }) fmt.Println(result.T) } func run() { fmt.Println("Start!") process("A") process("B") process("C") fmt.Println("Finish!") } func process(name string) { time.Sleep(2 * time.Second) fmt.Println(name) }
$ go run exp1/main.go Start! A B C Finish! 6.010750402s
よくあるミス
Goはgo
キーワードを使って関数を実行するだけで、goroutine(サブスレッドのようなもの)が生成され、並列に処理が実行されますが、使い方には注意が必要です。最初に意味もわからずよくやってしまいがちなミスの例を示すと。
package main import ( "fmt" "time" ) func main() { fmt.Println("Start!") go process("A") <------ goキーワードで関数実行するとgoroutineが生成される go process("B") go process("C") fmt.Println("Finish!") } func process(name string) { time.Sleep(2 * time.Second) fmt.Println(name) }
$ go run exp2/main.go Start! Finish!
上記のような感じで、一瞬で終わりますが、さっきまで出力されていたA,B,Cの文字列が消えています。何故こうなるかについて説明すると、go
キーワードによって生成されたgoroutineに処理を任せてるので、最初のgo run
で生成されたgoroutineと同期を取っていないので勝手に終わってしまっているからです。これを防ぐには何らかの連絡手段を使って同期を取る必要があります。それがchannel
です。
channelを使って同期させよう
ここからの説明では、聞き覚えのあるワードを使って説明します。
スレッド名 | 実行のされ方 | 具体例 |
---|---|---|
メインスレッド | go run *.go | go run exp1/main.go |
サブスレッド | go hgoe() | go process(“A”) |
channel
は何かと言うと、簡単に言うと専用の電話線のようなものです。これを使うことでメインスレッドとサブスレッドの連絡が取れるようになります。以下にサブスレッドの処理と同期をするためのシンプルな例があります。
package main import ( "fmt" "time" ) func main() { fmt.Println("Start!") // boolの型でchannelを作成する ch := make(chan bool) // goroutineを生成して、サブスレッドで処理する go func() { time.Sleep(2 * time.Second) // chに対してtrueを投げる(送信) ch <- true }() // chに受信があったらisFinに返事する。 // 受信があるまで、処理をブロックし続ける(これで同期が取れる) isFin := <-ch // <-chだけでもブロック出来る // chをクローズする close(ch) // 受信した値をprintする fmt.Println(isFin) fmt.Println("Finish!") }
$ go run exp3/main.go Start! true Finish!
以下のどちらかの記述を使うことでブロックすることが出来るのがポイントとなります。
isFin <- ch
<- ch
channelの動作を使って、並列処理をすると以下のようになります。
package main import ( "fmt" "time" ) func main() { // それぞれとの連絡のためのchを作成する isFin1 := make(chan bool) isFin2 := make(chan bool) isFin3 := make(chan bool) fmt.Println("Start!") go func() { process("A") isFin1 <- true }() go func() { process("B") isFin2 <- true }() go func() { process("C") isFin3 <- true }() // 全部が終わるまでブロックし続ける <-isFin1 <-isFin2 <-isFin3 fmt.Println("Finish!") } func process(name string) { time.Sleep(2 * time.Second) fmt.Println(name) }
$ go run exp4/main.go Start! A B C Finish! 2.003493386s
約2秒で終了しました。これだけで、約4秒短く終わらせることが出来るのもすごいですが、非常に簡単に記述することが出来るのもGoのすごいところです。ですが、いまの記述だと非常にひ弱なコードになっています。
例えば、今までとは違い処理の数が変動する場合どうでしょう?chの数が固定だと破綻します。そういったイレギュラーに対応するには、処理の数とchのブロックをうまく使えば、同期処理が可能ですが、直感的でないので、sync.WaitGroup
を使うことをおすすめします。
sync.WaitGroupでいい感じにする
package main import ( "fmt" "testing" "time" "sync" ) func main() { result := testing.Benchmark(func(b *testing.B) { run("A", "B", "C", "D", "E") }) fmt.Println(result.T) } func run(name ...string) { fmt.Println("Start!") // WaitGroupを作成する wg := new(sync.WaitGroup) // channelを処理の数分だけ作成する isFin := make(chan bool, len(name)) for _, v := range name { // 処理1つに対して、1つ数を増やす(この例の場合は5になる) wg.Add(1) // サブスレッドに処理を任せる go process(v, isFin, wg) } // wg.Doneが5回通るまでブロックし続ける wg.Wait() close(isFin) fmt.Println("Finish!") } func process(name string, isFin chan bool, wg *sync.WaitGroup) { // wgの数を1つ減らす(この関数が終了した時) defer wg.Done() time.Sleep(2 * time.Second) fmt.Println(name) isFin <- true }
$ go run exp5/main.go Start! B D E A C Finish! 2.005301726s
sync.WaitGroup
は、処理は走る回数分だけ、wg.Add(int)
します。基本的には、1処理に対してwg.Add(1)
で問題ないと思います。そして、その処理が終わったら、wg.Done
で終了を知らせます。
最後の方に書いているwg.Wait()
はwg.Add(int)
の数分だけのwg.Done()
が通るまで、処理をブロックし続けます。そして、決められた回数処理が終わるとブロックを終了します。
まとめ
以上で簡単な並列処理なら問題なく実行出来ると思います。ただ、実際に使うとなると、例えば並列処理の数は5つまでにしたいとか、ある処理が終わるまでスタートしてほしくないとか、色々あると思います。次回はそこらへんもやってみたいと思います。