Contents
並行処理と並列処理の違い
概念整理
| 用語 | 定義 | 主な利用シーン |
|---|---|---|
| Concurrency(並行処理) | 複数タスクを 設計上 同時に進めること。スレッド数が足りなくても、実装は単一スレッドで動くことがある。 | I/O 待ちやユーザー入力の同時受付 |
| Parallelism(並列処理) | 複数タスクを 物理的に 同時に実行すること。CPU コアが複数あれば、OS スレッドがそれぞれ別コアで走る。 | CPU バウンドな計算・画像加工 |
Go のランタイムは M:N スケジューラ(G = goroutine, M = OS スレッド, P = 実行単位)を採用しており、並行 に書いたコードは実行時に自動で 並列 にスケジュールされます。
|
1 2 3 4 |
CPU: 4 コア goroutine A,B,C が同時に走る → 並行設計 OS スレッドが 2 本だけの場合、A と B は同時に実行され C は待機 → 実際の並列度は 2 |
Qiita 記事「【Go 入門】 Goで並列処理を体感してみよう」でも同様の図解が掲載されています。
https://qiita.com/manabito76/items/448b77e2fc557016eadf
実務上のポイント
- タスク分割は 並行 設計で考える(「何を同時にやるか」)。
- CPU 数に応じた
runtime.GOMAXPROCSの設定だけで、ランタイムが最適な 並列 実行へ切り替わります。
goroutine の基礎
1. goroutine の作成
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package main import ( "fmt" "time" ) func hello(id int) { fmt.Printf("goroutine %d start\n", id) time.Sleep(500 * time.Millisecond) fmt.Printf("goroutine %d end\n", id) } func main() { for i := 1; i <= 5; i++ { go hello(i) // ← ここで goroutine を生成 } // メインがすぐに終了しないように待機(テスト用) time.Sleep(2 * time.Second) } |
goキーワードひとつで 非同期 に実行でき、スタックサイズやスケジューリングはランタイムが自動管理します。
2. スタックサイズの自動拡張
| 項目 | 内容 |
|---|---|
| 初期サイズ | 約 2 KB(Go 1.22 では 2 KB) |
| 最大上限 | 1 GB(公式ドキュメント: https://go.dev/ref/spec#Size_and_alignment_of_variables に記載) |
| 拡張方式 | 必要に応じて 倍増 し、不要になれば縮小される(「スプリット」アルゴリズム) |
|
1 2 3 4 5 6 7 8 9 10 |
func deepRecursion(n int) { if n == 0 { return } fmt.Println("depth:", n) deepRecursion(n - 1) } // 実行例: deepRecursion(10000) を goroutine 内で呼び出すと自動的に拡張される |
ポイント: 開発者がスタックサイズを意識する必要はほぼありません。メモリ使用量が気になる場合は
runtime.Stackで実行時のスタック情報を取得できます。
channel と select の活用法
1. バッファ有無(unbuffered vs buffered)
|
1 2 3 4 5 6 7 8 9 |
// unbuffered (バッファなし) ch := make(chan int) // サイズ 0 go func() { ch <- 42 // 受信側が待つまでブロック }() fmt.Println(<-ch) // ← ここで受信し、送信が完了 |
|
1 2 3 4 5 6 7 8 |
// buffered (バッファあり) bufCh := make(chan string, 3) // バッファサイズ 3 bufCh <- "a" bufCh <- "b" // まだ受信しなくてもブロックされない fmt.Println(<-bufCh) // a |
- unbuffered は「通信相手が揃うまで待つ」ことでレースコンディションを防止。
- buffered はスループット向上とバックプレッシャー制御に有効。
2. 方向限定チャネル
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func producer(out chan<- int) { for i := 0; i < 5; i++ { out <- i } close(out) } func consumer(in <-chan int) { for v := range in { fmt.Println(v) } } |
chan<-(送信専用)と<-chan(受信専用)は API の安全性 を高め、誤ったデータフローをコンパイル時に検出できます。
3. select 文でマルチチャネル待ち合わせ & タイムアウト
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
func fetch(ctx context.Context, url string) (string, error) { resultCh := make(chan string, 1) go func() { // 本来は http.Get 等の実装 time.Sleep(2 * time.Second) select { case <-ctx.Done(): return // キャンセルされたら何もしない default: resultCh <- "OK" } }() select { case <-ctx.Done(): return "", ctx.Err() case res := <-resultCh: return res, nil case <-time.After(1 * time.Second): return "", fmt.Errorf("timeout") } } |
selectは 複数の待ち と タイムアウト/キャンセル をシンプルに記述でき、デフォルトケース (default) の有無で「ノンブロッキング」かどうかを制御できます。
同期・キャンセル機構
1. sync.WaitGroup と sync.Mutex
|
1 2 3 4 5 6 7 8 9 10 11 12 |
var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("worker %d start\n", id) time.Sleep(time.Duration(id) * time.Second) fmt.Printf("worker %d done\n", id) }(i) } wg.Wait() // 全ワーカーが終了するまで待つ |
|
1 2 3 4 5 6 7 8 9 10 11 |
var ( mu sync.Mutex data = make(map[string]int) ) func inc(key string) { mu.Lock() defer mu.Unlock() data[key]++ } |
WaitGroupは タスク完了の集合的監視、Mutexは 共有リソースへの排他制御 を提供します。
2. context パッケージでキャンセルとデッドライン管理
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
func main() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() go func(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("goroutine stopped:", ctx.Err()) return default: time.Sleep(500 * time.Millisecond) fmt.Print(".") } } }(ctx) <-ctx.Done() // メインはタイムアウトまで待機 } |
context.WithCancel,WithTimeout,WithDeadlineは 階層的にキャンセル情報を伝搬 し、長時間走る goroutine のリーク防止に必須です。
代表的な並行パターン
1. Worker Pool(固定数ワーカー)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("worker %d processing job %d\n", id, j) time.Sleep(time.Second) // 擬似処理 results <- j * 2 } } func main() { const numWorkers = 3 jobs := make(chan int, 10) results := make(chan int, 10) for w := 1; w <= numWorkers; w++ { go worker(w, jobs, results) } for j := 1; j <= 5; j++ { jobs <- j } close(jobs) for i := 0; i < 5; i++ { fmt.Println("result:", <-results) } } |
- 利点: 同時実行数を一定に保ち、CPU コア数と合わせてリソース消費の上限が予測しやすい。
2. Fan‑out / Fan‑in(分散 & 集約)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
func fetch(id int) string { time.Sleep(time.Duration(id) * 100 * time.Millisecond) return fmt.Sprintf("data-%d", id) } func main() { n := 4 out := make(chan string, n) for i := 0; i < n; i++ { go func(i int) { out <- fetch(i) }(i) } for i := 0; i < n; i++ { fmt.Println(<-out) // fan‑in } } |
- fan‑out: 複数の goroutine にタスクを分散させ、I/O 待ち時間を相殺。
- fan‑in: 結果を 1 本のチャネルに集約し、呼び出し側は単一受信で完結。
3. Pipeline(ステージング)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func square(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func main() { nums := gen(1, 2, 3, 4) squares := square(nums) for v := range squares { fmt.Println(v) // 1, 4, 9, 16 } } |
- 特徴: 各ステージが独立して動作できるため、保守性とスループットが向上。
- 実装ポイント: ステージ間は必ずチャネルで接続し、
closeによって下流へ終了シグナルを伝搬させる。
Go 1.22 の新機能と実務ベストプラクティス
1. errgroup パッケージの現状(誤解の訂正)
Go 1.22 でも golang.org/x/sync/errgroup の API は変更されていません。
- 従来どおり errgroup.WithContext(ctx) が必要です。
- Group.Go のシグネチャは依然として func() error(context.Context を受け取らない)です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import ( "context" "net/http" "golang.org/x/sync/errgroup" ) func main() { g, ctx := errgroup.WithContext(context.Background()) urls := []string{"a.com", "b.com", "c.com"} for _, u := range urls { u := u // ループ変数のスコープ対策 g.Go(func() error { // ← 引数はなし req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://"+u, nil) if err != nil { return err } resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() // ...処理... return nil }) } if err := g.Wait(); err != nil { fmt.Println("failed:", err) } } |
errgroup.WithContextが返す 子コンテキスト (ctx) は、どの goroutine でもキャンセルやタイムアウトを共有できる点が便利です。- 今後のバージョンでシグネチャ変更があればリリースノートに明示されますので、常に公式ドキュメント(https://pkg.go.dev/golang.org/x/sync/errgroup)を参照してください。
2. デッドロック・リーク回避チェックリスト
| 項目 | 確認ポイント | 推奨ツール |
|---|---|---|
| Mutex の取得順序 | 複数の Lock が同一順序で呼ばれているか |
静的解析 (go vet -run=atomic) |
| channel の対称性 | 送信側と受信側が必ず対になるか(バッファサイズ・クローズ) | staticcheck |
| goroutine の終了パス | 全ての goroutine が ctx.Done() または wg.Done() に到達するか |
カスタムテスト (TestLeak) |
| タイムアウト設定 | 長時間ブロックしうる操作に context.WithTimeout を使用しているか |
コードレビュー |
| errgroup/WaitGroup のカウント漏れ | Add と Done が 1:1 になるか |
-race オプションで実行 |
3. 実務向けサンプルコードの取得
- GitHub リポジトリ(Go 1.22 対応)
https://github.com/manabito76/go-concurrency-samples
リポジトリには以下が同梱されています。
| ディレクトリ | 内容 |
|---|---|
example/workerpool |
Worker Pool の実装とベンチマーク |
example/pipeline |
3 ステージ Pipeline とテスト |
example/fanoutin |
Fan‑out / Fan‑in パターン |
example/errgroup |
正しい errgroup.WithContext 使用例 |
|
1 2 3 4 |
git clone https://github.com/manabito76/go-concurrency-samples.git cd go-concurrency-samples/example/workerpool go test -bench=. |
4. まとめのベストプラクティス
- 設計は並行性で考える → タスク分割・データフローを明確化。
- 実装は標準ライブラリだけで完結 →
goroutine,channel,select,sync,context,errgroupを組み合わせる。 - リソース制御はコンテキストと WaitGroup で一元管理 → キャンセル・タイムアウトが漏れないようにする。
- テストとベンチマークは必須 →
go test -race,-benchでデッドロックやリークを早期検出。 - 公式ドキュメントとリリースノートを定期的にチェック → API 変更(例: errgroup のシグネチャ)への対応が楽になる。
参考リンク
- Go 言語仕様 – スタックサイズ上限: https://go.dev/ref/spec#Size_and_alignment_of_variables
runtimeパッケージ – GOMAXPROCS: https://pkg.go.dev/runtime#GOMAXPROCSsync/errgroupパッケージ: https://pkg.go.dev/golang.org/x/sync/errgroupcontextパッケージ: https://pkg.go.dev/context
以上が Go 1.22 に合わせて改訂した並行処理入門と実務パターンです。ぜひローカル環境でサンプルを動かし、実際のプロジェクトへ応用してみてください。