Go 并发编程教程
📖 关于本教程本教程系统讲解 Go 并发编程的核心知识:goroutine、channel、sync 原语、并发模式、MPG 模型、context 等,从基础到实战,配合大量代码示例与注释。
1. 启动协程
go
package main
import (
"fmt"
"time"
)
// ==================== goroutine 基础 ====================
// goroutine 是 Go 的轻量级线程(协程)
// 一个 goroutine 初始栈只有 2KB(线程通常 1~8MB)
// Go 程序启动时,main 函数本身就运行在一个 goroutine 中
func sayHello(name string) {
for i := 0; i < 3; i++ {
fmt.Printf("[%s] 第 %d 次打招呼\n", name, i+1)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
// ==================== 用 go 关键字启动协程 ====================
go sayHello("Alice") // 启动一个新协程,非阻塞,立即返回
go sayHello("Bob") // 再启动一个
// 匿名函数协程
go func() {
fmt.Println("匿名协程运行中")
}()
// 带参数的匿名函数(注意闭包陷阱)
for i := 0; i < 3; i++ {
// ❌ 错误:闭包捕获循环变量 i 的引用
// go func() { fmt.Println(i) }() // 可能全部输出 3
// ✅ 正确:通过参数传值
go func(n int) {
fmt.Println("协程参数:", n)
}(i)
}
// ⚠️ main 退出后,所有协程立即被杀死!
// 这里用 Sleep 等待只是演示,实际项目用 WaitGroup/channel
time.Sleep(1 * time.Second)
fmt.Println("main 退出")
}
// 启动大量协程(Go 可以轻松启动数十万个协程)
func manyGoroutines() {
for i := 0; i < 100000; i++ {
go func(id int) {
// 每个协程做少量工作
_ = id * id
}(i)
}
}2. 协程的生命周期与 WaitGroup
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// ==================== sync.WaitGroup ====================
// WaitGroup 用于等待一组协程全部完成
// 三个方法:Add(n)、Done()、Wait()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 计数器 +1(在启动协程之前调用)
go func(id int) {
defer wg.Done() // 计数器 -1(协程结束时调用)
fmt.Printf("协程 %d 开始\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("协程 %d 结束\n", id)
}(i)
}
wg.Wait() // 阻塞,直到计数器归零
fmt.Println("所有协程已完成")
}❌ WaitGroup 常见错误
go
// 错误1:在协程内部调用 Add(可能 Wait 先于 Add 执行)
go func() {
wg.Add(1) // ❌ 太晚了!main 可能已经 Wait 返回
defer wg.Done()
}()
// 错误2:Add 和启动协程数量不匹配
wg.Add(10)
for i := 0; i < 5; i++ { // 只启动了 5 个
go func() { defer wg.Done() }()
}
wg.Wait() // 永远阻塞!计数器不会归零
// 错误3:Done 调用次数超过 Add
wg.Add(1)
wg.Done()
wg.Done() // panic: negative WaitGroup counter
// 错误4:复制 WaitGroup(值类型,复制后计数器独立)
wg2 := wg // ❌ 不能复制!go
// ==================== 协程的生命周期 ====================
//
// 1. 创建:go func() 启动,进入可运行队列
// 2. 运行:被调度器分配到某个 OS 线程上执行
// 3. 阻塞:遇到 channel、锁、IO、Sleep 等进入等待状态
// 4. 唤醒:阻塞条件满足,重新进入可运行队列
// 5. 结束:函数返回 / panic / main 退出
//
// ⚠️ goroutine 没有 ID,没有 Kill 方法
// 要停止一个协程,只能通过 channel 或 context 通知它自己退出3. error group
go
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
// ==================== errgroup:WaitGroup + error 收集 ====================
// 需要安装:go get golang.org/x/sync
func fetchURL(url string) error {
time.Sleep(100 * time.Millisecond) // 模拟请求
if url == "bad://url" {
return fmt.Errorf("fetch %s: connection refused", url)
}
fmt.Printf("成功获取: %s\n", url)
return nil
}
func main() {
// ==================== 基本用法 ====================
var g errgroup.Group
urls := []string{
"https://go.dev",
"https://google.com",
"https://github.com",
}
for _, url := range urls {
url := url // Go 1.22 之前需要拷贝
g.Go(func() error {
return fetchURL(url)
})
}
// Wait 等待所有协程完成,返回第一个非 nil 的 error
if err := g.Wait(); err != nil {
fmt.Println("发生错误:", err)
} else {
fmt.Println("全部成功")
}
// ==================== 带 context(一个失败,取消其他)====================
fmt.Println("\n=== 带 context ===")
g2, ctx := errgroup.WithContext(context.Background())
tasks := []string{"task1", "task2", "bad://url", "task4"}
for _, task := range tasks {
task := task
g2.Go(func() error {
// 检查 context 是否已取消
select {
case <-ctx.Done():
fmt.Printf("[%s] 已取消,跳过\n", task)
return ctx.Err()
default:
}
err := fetchURL(task)
if err != nil {
return err // 这会导致 ctx 被取消
}
return nil
})
}
if err := g2.Wait(); err != nil {
fmt.Println("错误:", err)
}
// ==================== 限制并发数 ====================
fmt.Println("\n=== 限制并发 ===")
g3 := new(errgroup.Group)
g3.SetLimit(3) // 最多 3 个协程同时运行
for i := 0; i < 10; i++ {
i := i
g3.Go(func() error {
fmt.Printf("任务 %d 开始(并发限制为 3)\n", i)
time.Sleep(200 * time.Millisecond)
return nil
})
}
g3.Wait()
// ==================== 收集多个错误 ====================
fmt.Println("\n=== 收集所有错误 ===")
var allErrors []error
var mu sync.Mutex
var g4 errgroup.Group
for _, url := range []string{"good", "bad://url", "bad://url2"} {
url := url
g4.Go(func() error {
err := fetchURL(url)
if err != nil {
mu.Lock()
allErrors = append(allErrors, err)
mu.Unlock()
}
return err
})
}
g4.Wait()
if len(allErrors) > 0 {
combined := errors.Join(allErrors...)
fmt.Println("所有错误:", combined)
}
}4. 并发安全性与原子操作
4.1 竞态条件
go
package main
import (
"fmt"
"sync"
)
func main() {
// ==================== 竞态条件演示 ====================
// 多个协程同时读写同一个变量,结果不可预测
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // ❌ 竞态!counter++ 不是原子操作
// 实际是:读取 → 加 1 → 写回(三步,可能被打断)
}()
}
wg.Wait()
fmt.Println("counter:", counter) // 结果 < 1000(每次运行可能不同)
}
// 用 go run -race 检测竞态:
// go run -race main.go
// 输出: WARNING: DATA RACE4.2 互斥锁 sync.Mutex
go
package main
import (
"fmt"
"sync"
)
func main() {
counter := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // 加锁:同一时间只有一个协程能进入
counter++
mu.Unlock() // 解锁:让其他协程可以进入
}()
}
wg.Wait()
fmt.Println("counter:", counter) // 准确得到 1000
// ==================== defer Unlock(推荐)====================
// 防止忘记解锁或 panic 导致死锁
mu.Lock()
defer mu.Unlock()
// ... 临界区代码 ...
}4.3 原子操作 sync/atomic
go
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
// ==================== atomic 基本操作 ====================
// 原子操作在底层由 CPU 指令保证,比锁更轻量
var counter int64 = 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加 1
}()
}
wg.Wait()
fmt.Println("atomic counter:", atomic.LoadInt64(&counter)) // 1000
// ==================== 常用原子操作 ====================
var val int64 = 10
atomic.StoreInt64(&val, 42) // 原子写入
loaded := atomic.LoadInt64(&val) // 原子读取
old := atomic.SwapInt64(&val, 100) // 原子交换,返回旧值
swapped := atomic.CompareAndSwapInt64(&val, 100, 200) // CAS 操作
fmt.Println(loaded, old, swapped, val)
// ==================== atomic.Value(存储任意类型)====================
var config atomic.Value
type Config struct {
Server string
Port int
}
// 原子存储
config.Store(Config{Server: "localhost", Port: 8080})
// 原子读取
cfg := config.Load().(Config)
fmt.Printf("Config: %+v\n", cfg)
// 适用场景:配置热更新(一个协程写,多个协程读)
}
// ==================== Mutex vs Atomic 选择 ====================
//
// atomic:
// ✅ 简单的计数器、标志位
// ✅ 单个变量的读写
// ✅ 性能更好(无锁)
//
// Mutex:
// ✅ 保护多个变量
// ✅ 保护复杂的临界区(多步操作)
// ✅ 读写需要一致性的场景5. 读写锁
go
package main
import (
"fmt"
"sync"
"time"
)
// ==================== sync.RWMutex ====================
// 读写锁:多读单写
// - 多个 goroutine 可以同时持有读锁(RLock)
// - 写锁(Lock)是独占的,持有时不允许任何读或写
// - 适用于读多写少的场景(如缓存、配置)
// 线程安全的缓存
type SafeCache struct {
mu sync.RWMutex
data map[string]string
}
func NewSafeCache() *SafeCache {
return &SafeCache{data: make(map[string]string)}
}
func (c *SafeCache) Get(key string) (string, bool) {
c.mu.RLock() // 加读锁(允许多个 goroutine 同时读)
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func (c *SafeCache) Set(key, value string) {
c.mu.Lock() // 加写锁(独占,阻塞所有读和写)
defer c.mu.Unlock()
c.data[key] = value
}
func (c *SafeCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
}
// 只读的批量操作
func (c *SafeCache) Keys() []string {
c.mu.RLock()
defer c.mu.RUnlock()
keys := make([]string, 0, len(c.data))
for k := range c.data {
keys = append(keys, k)
}
return keys
}
func main() {
cache := NewSafeCache()
var wg sync.WaitGroup
// 启动 10 个写协程
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key_%d", id)
cache.Set(key, fmt.Sprintf("value_%d", id))
time.Sleep(10 * time.Millisecond)
}(i)
}
// 启动 100 个读协程
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key_%d", id%10)
if val, ok := cache.Get(key); ok {
_ = val // 使用值
}
}(i)
}
wg.Wait()
fmt.Println("缓存大小:", len(cache.Keys()))
}⚠️ 读写锁注意事项
- 不要在持有读锁时尝试获取写锁(同一协程内),会死锁
- 写多读少时 RWMutex 性能不如 Mutex(因为 RWMutex 内部开销更大)
- 不要递归加锁(Go 的锁不是可重入的)
6. 如何并行修改结构体、切片、map
6.1 并发修改结构体
go
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// ==================== 方案1:Mutex 保护整个结构体 ====================
type Stats struct {
mu sync.Mutex
TotalReqs int
FailedReqs int
TotalBytes int64
}
func (s *Stats) Record(bytes int64, failed bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.TotalReqs++
s.TotalBytes += bytes
if failed {
s.FailedReqs++
}
}
// ==================== 方案2:各字段独立用 atomic ====================
type AtomicStats struct {
TotalReqs atomic.Int64
FailedReqs atomic.Int64
TotalBytes atomic.Int64
}
func (s *AtomicStats) Record(bytes int64, failed bool) {
s.TotalReqs.Add(1)
s.TotalBytes.Add(bytes)
if failed {
s.FailedReqs.Add(1)
}
}
// ==================== 方案3:不同字段由不同协程负责(最安全)====================
// 如果结构体的不同字段分别由不同协程独占修改,不需要锁
type Processor struct {
InputCount int // 只有 inputWorker 修改
OutputCount int // 只有 outputWorker 修改
}
func main() {
// 方案1
stats := &Stats{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
stats.Record(1024, false)
}()
}
wg.Wait()
fmt.Printf("Mutex Stats: %+v\n", stats)
// 方案2
astats := &AtomicStats{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
astats.Record(1024, false)
}()
}
wg.Wait()
fmt.Printf("Atomic Stats: reqs=%d, bytes=%d\n",
astats.TotalReqs.Load(), astats.TotalBytes.Load())
}6.2 并发修改切片
go
package main
import (
"fmt"
"sync"
)
func main() {
// ==================== ❌ 切片不是并发安全的 ====================
// s := make([]int, 0)
// go func() { s = append(s, 1) }() // 竞态!
// go func() { s = append(s, 2) }() // 竞态!
// ==================== 方案1:Mutex ====================
var mu sync.Mutex
result := make([]int, 0, 1000)
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
mu.Lock()
result = append(result, n)
mu.Unlock()
}(i)
}
wg.Wait()
fmt.Println("方案1 长度:", len(result))
// ==================== 方案2:预分配索引(无需锁)====================
// 如果知道每个协程写哪个位置,可以避免锁
result2 := make([]int, 1000) // 预分配
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
result2[idx] = idx * idx // 每个协程写不同索引,无竞态
}(i)
}
wg.Wait()
fmt.Println("方案2 长度:", len(result2))
// ==================== 方案3:channel 收集结果 ====================
ch := make(chan int, 1000)
for i := 0; i < 1000; i++ {
go func(n int) {
ch <- n * n
}(i)
}
result3 := make([]int, 0, 1000)
for i := 0; i < 1000; i++ {
result3 = append(result3, <-ch)
}
fmt.Println("方案3 长度:", len(result3))
}6.3 并发修改 map
go
package main
import (
"fmt"
"sync"
)
func main() {
// ==================== ❌ Go 的 map 不是并发安全的 ====================
// 并发读写 map 会导致 panic: concurrent map read and map write
// m := map[string]int{}
// go func() { m["a"] = 1 }()
// go func() { _ = m["a"] }()
// ==================== 方案1:sync.Map(标准库)====================
var sm sync.Map
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
sm.Store(fmt.Sprintf("key_%d", n), n)
}(i)
}
wg.Wait()
// 遍历
sm.Range(func(key, value any) bool {
// fmt.Printf("%s: %v\n", key, value)
return true // 返回 false 停止遍历
})
// 其他操作
val, ok := sm.Load("key_42")
fmt.Println("Load:", val, ok)
sm.Delete("key_42")
actual, loaded := sm.LoadOrStore("key_new", 999)
fmt.Println("LoadOrStore:", actual, loaded)
// ==================== 方案2:Mutex + 普通 map(更常用)====================
// sync.Map 适用于:key 稳定的读多写少场景
// 大多数场景还是 Mutex + map 更简单可控
type SafeMap struct {
mu sync.RWMutex
m map[string]int
}
sm2 := &SafeMap{m: make(map[string]int)}
// 写
sm2.mu.Lock()
sm2.m["key"] = 42
sm2.mu.Unlock()
// 读
sm2.mu.RLock()
v := sm2.m["key"]
sm2.mu.RUnlock()
fmt.Println("value:", v)
}
// ==================== sync.Map vs Mutex+map 选择 ====================
//
// sync.Map 适用于:
// - key 集合基本稳定(很少有新 key 写入)
// - 读远多于写
// - 多个协程读写不相交的 key
//
// Mutex + map 适用于:
// - 大多数场景(更简单、更可控)
// - 需要批量操作(遍历 + 修改)
// - 写操作频繁7. recover 与协程
❌ 协程中的 panic 不会被外部协程的 recover 捕获!每个协程必须自己处理 panic。
go
package main
import (
"fmt"
"runtime/debug"
"sync"
)
// ==================== 问题:协程 panic 会杀死整个进程 ====================
func main() {
// ❌ 这个 recover 捕获不了子协程的 panic
// defer func() {
// if r := recover(); r != nil {
// fmt.Println("主协程 recover:", r) // 不会执行!
// }
// }()
//
// go func() {
// panic("子协程崩溃") // 直接杀死整个进程
// }()
// ==================== 正确做法:每个协程自己 recover ====================
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("协程 %d panic 已恢复: %v\n", id, r)
fmt.Println(string(debug.Stack()))
}
}()
// 模拟其中一个协程 panic
if id == 3 {
panic("协程 3 出错了")
}
fmt.Printf("协程 %d 正常完成\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有协程处理完毕,程序继续运行")
}go
// ==================== 通用安全启动函数 ====================
// 实际项目中,封装一个安全的 go 启动器
func safeGo(fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
// 实际项目中:记录日志、上报监控
fmt.Printf("[PANIC] %v\n%s\n", r, debug.Stack())
}
}()
fn()
}()
}
// 带 WaitGroup 的安全启动
func safeGoWg(wg *sync.WaitGroup, fn func()) {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("[PANIC] %v\n", r)
}
}()
fn()
}()
}
// 使用
func example() {
var wg sync.WaitGroup
safeGoWg(&wg, func() {
// 即使 panic 也不会影响其他协程
panic("something went wrong")
})
safeGoWg(&wg, func() {
fmt.Println("正常运行")
})
wg.Wait()
}8. channel 的阻塞与遍历
8.1 channel 的阻塞规则
go
package main
import "fmt"
func main() {
// ==================== 无缓冲 channel:同步通信 ====================
// 发送和接收必须同时就绪,否则阻塞
ch := make(chan int)
go func() {
ch <- 42 // 阻塞,直到有人接收
fmt.Println("发送完成")
}()
val := <-ch // 阻塞,直到有人发送
fmt.Println("收到:", val)
// ==================== 有缓冲 channel:异步通信 ====================
// 缓冲区未满时发送不阻塞,缓冲区为空时接收阻塞
bufCh := make(chan int, 3)
bufCh <- 1 // 不阻塞(缓冲区有空间)
bufCh <- 2 // 不阻塞
bufCh <- 3 // 不阻塞
// bufCh <- 4 // 阻塞!缓冲区已满
fmt.Println(<-bufCh) // 1(不阻塞,缓冲区有数据)
fmt.Println(<-bufCh) // 2
fmt.Println(<-bufCh) // 3
// fmt.Println(<-bufCh) // 阻塞!缓冲区为空
}阻塞规则总结:
text
操作 无缓冲 channel 有缓冲 channel
──────────────────────────────────────────────────────────
发送 ch <- val 阻塞直到有接收者 缓冲区满时阻塞
接收 val := <-ch 阻塞直到有发送者 缓冲区空时阻塞
关闭后发送 panic panic
关闭后接收 返回零值, false 先读完缓冲区再返回零值
nil channel 发送 永远阻塞 永远阻塞
nil channel 接收 永远阻塞 永远阻塞8.2 channel 遍历
go
package main
import "fmt"
func main() {
ch := make(chan int, 5)
// 生产者
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch) // 关闭 channel,通知消费者没有更多数据了
}()
// ==================== 方式1:for range(推荐)====================
// 自动在 channel 关闭后退出循环
for val := range ch {
fmt.Println("收到:", val)
}
// 如果 channel 未关闭,for range 会永远阻塞!
// ==================== 方式2:for + ok 检查 ====================
ch2 := make(chan string, 3)
go func() {
ch2 <- "a"
ch2 <- "b"
close(ch2)
}()
for {
val, ok := <-ch2
if !ok {
fmt.Println("channel 已关闭")
break
}
fmt.Println("收到:", val)
}
// ==================== 关闭 channel 的规则 ====================
// 1. 只有发送方应该关闭 channel(接收方不要关闭)
// 2. 关闭已关闭的 channel 会 panic
// 3. 向已关闭的 channel 发送会 panic
// 4. 从已关闭的 channel 接收:
// - 缓冲区有数据:正常读取
// - 缓冲区为空:返回零值和 false
}9. 阻塞代码的 5 种方法及导致死锁的根本原因
9.1 五种阻塞方式
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// ==================== 方式1:channel 阻塞 ====================
done := make(chan struct{})
go func() {
time.Sleep(time.Second)
close(done) // 通知解除阻塞
}()
<-done // 阻塞直到 done 被关闭
fmt.Println("channel 解除阻塞")
// ==================== 方式2:WaitGroup ====================
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Second)
}()
wg.Wait() // 阻塞直到计数器归零
fmt.Println("WaitGroup 解除阻塞")
// ==================== 方式3:Mutex ====================
var mu sync.Mutex
mu.Lock()
go func() {
time.Sleep(time.Second)
mu.Unlock() // 解除阻塞
}()
mu.Lock() // 阻塞,因为已经被锁定
fmt.Println("Mutex 解除阻塞")
mu.Unlock()
// ==================== 方式4:time.Sleep ====================
time.Sleep(100 * time.Millisecond)
fmt.Println("Sleep 解除阻塞")
// ==================== 方式5:select ====================
ch := make(chan int)
go func() {
time.Sleep(time.Second)
ch <- 42
}()
select {
case val := <-ch:
fmt.Println("select 收到:", val)
case <-time.After(2 * time.Second):
fmt.Println("select 超时")
}
}9.2 死锁的根本原因
❌ 死锁 = 所有 goroutine 都在阻塞,没有任何一个能继续执行 Go 运行时会检测到这种情况并 panic:fatal error: all goroutines are asleep - deadlock!
go
package main
func main() {
// ==================== 死锁案例1:无缓冲 channel 自己等自己 ====================
// ch := make(chan int)
// ch <- 1 // 阻塞:等待接收者,但没有其他协程
// <-ch // 永远不会执行
// ==================== 死锁案例2:互相等待 ====================
// ch1 := make(chan int)
// ch2 := make(chan int)
// go func() {
// val := <-ch1 // 等 ch1 的数据
// ch2 <- val // 永远到不了这里
// }()
// val := <-ch2 // 等 ch2 的数据
// ch1 <- val // 永远到不了这里
// ==================== 死锁案例3:WaitGroup 不匹配 ====================
// var wg sync.WaitGroup
// wg.Add(1)
// wg.Wait() // 没有任何协程调用 Done,永远阻塞
// ==================== 死锁案例4:锁的互相等待 ====================
// var mu1, mu2 sync.Mutex
// go func() {
// mu1.Lock()
// time.Sleep(time.Millisecond)
// mu2.Lock() // 等待 mu2,但 mu2 被另一个协程持有
// }()
// go func() {
// mu2.Lock()
// time.Sleep(time.Millisecond)
// mu1.Lock() // 等待 mu1,但 mu1 被另一个协程持有
// }()
}
// ==================== 死锁的根本原因 ====================
//
// 形成死锁的四个必要条件(Coffman 条件):
// 1. 互斥:资源同一时间只能被一个协程持有
// 2. 持有并等待:持有资源的协程在等待另一个资源
// 3. 不可抢占:资源不能被强制夺走
// 4. 循环等待:A 等 B,B 等 A(或更长的链)
//
// 预防死锁的方法:
// 1. 统一加锁顺序(打破循环等待)
// 2. 尽量减小锁的粒度和持有时间
// 3. 使用超时机制(select + time.After)
// 4. 使用 channel 代替锁
// 5. 用 go vet 和 -race 检测10. 用 channel 实现广播和 CountDownLatch
10.1 广播(close 实现一对多通知)
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// ==================== 广播:close channel 通知所有等待者 ====================
// close 后,所有阻塞在 <-ch 的协程都会立即收到零值
// 这是 Go 中实现「一对多通知」的惯用方式
ready := make(chan struct{}) // 用空结构体,不传数据只传信号
var wg sync.WaitGroup
// 启动 5 个工人,都等待开始信号
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("工人 %d 就绪,等待开始...\n", id)
<-ready // 阻塞,等待广播信号
fmt.Printf("工人 %d 开始工作!\n", id)
}(i)
}
time.Sleep(500 * time.Millisecond)
fmt.Println("=== 发送广播 ===")
close(ready) // 一次 close,所有工人同时开始
wg.Wait()
fmt.Println("所有工人完成")
}10.2 CountDownLatch
go
package main
import (
"fmt"
"sync"
"time"
)
// CountDownLatch 等待 N 个事件全部发生后继续
// 类似 Java 的 CountDownLatch
type CountDownLatch struct {
count int
ch chan struct{}
once sync.Once
mu sync.Mutex
}
func NewCountDownLatch(count int) *CountDownLatch {
return &CountDownLatch{
count: count,
ch: make(chan struct{}),
}
}
// CountDown 事件完成,计数减 1
func (l *CountDownLatch) CountDown() {
l.mu.Lock()
l.count--
if l.count <= 0 {
l.once.Do(func() {
close(l.ch) // 计数归零,通知所有等待者
})
}
l.mu.Unlock()
}
// Wait 阻塞直到计数归零
func (l *CountDownLatch) Wait() {
<-l.ch
}
func main() {
// 等待 3 个服务全部就绪后启动
latch := NewCountDownLatch(3)
services := []string{"数据库", "缓存", "消息队列"}
for _, svc := range services {
go func(name string) {
// 模拟初始化
time.Sleep(time.Duration(100+len(name)*50) * time.Millisecond)
fmt.Printf("[%s] 初始化完成\n", name)
latch.CountDown()
}(svc)
}
fmt.Println("等待所有服务就绪...")
latch.Wait()
fmt.Println("✅ 所有服务就绪,开始接收请求")
}11. 招人嫌的 sync.Cond
go
package main
import (
"fmt"
"sync"
"time"
)
// ==================== sync.Cond 是什么 ====================
// Cond 是条件变量,用于协程间的条件等待
// 当条件不满足时阻塞,条件满足时被唤醒
//
// 为什么"招人嫌":
// 1. 容易误用(忘记加锁、虚假唤醒)
// 2. 大多数场景可以用 channel 更简洁地实现
// 3. 没有超时机制
// 4. 调试困难
// ==================== 生产者-消费者队列 ====================
type Queue struct {
items []int
mu sync.Mutex
cond *sync.Cond
max int
}
func NewQueue(maxSize int) *Queue {
q := &Queue{
items: make([]int, 0, maxSize),
max: maxSize,
}
q.cond = sync.NewCond(&q.mu) // Cond 绑定到 Mutex
return q
}
func (q *Queue) Put(item int) {
q.mu.Lock()
defer q.mu.Unlock()
// ⚠️ 必须用 for 循环判断条件,不能用 if
// 因为 Wait 返回后条件可能又不满足了(虚假唤醒)
for len(q.items) >= q.max {
q.cond.Wait() // 释放锁 → 等待唤醒 → 重新获取锁
}
q.items = append(q.items, item)
q.cond.Signal() // 唤醒一个等待的协程
}
func (q *Queue) Get() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 队列为空,等待
}
item := q.items[0]
q.items = q.items[1:]
q.cond.Signal() // 唤醒等待 Put 的协程
return item
}
func main() {
q := NewQueue(5)
var wg sync.WaitGroup
// 生产者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
q.Put(i)
fmt.Printf("放入: %d\n", i)
time.Sleep(50 * time.Millisecond)
}
}()
// 消费者
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
val := q.Get()
fmt.Printf("取出: %d\n", val)
time.Sleep(100 * time.Millisecond)
}
}()
wg.Wait()
// ==================== Cond 的三个方法 ====================
// Wait() 释放锁 → 阻塞等待 → 被唤醒后重新获取锁
// Signal() 唤醒一个等待的协程
// Broadcast() 唤醒所有等待的协程
// ==================== 大多数情况用 channel 更好 ====================
// 上面的队列可以直接用 buffered channel 实现:
ch := make(chan int, 5)
// 发送 → ch <- item
// 接收 → item := <-ch
// 简洁太多了!
_ = ch
}12. MPG 并发模型
text
Go 的调度器使用 MPG(也叫 GMP)模型:
M (Machine) = OS 线程
P (Processor) = 逻辑处理器(调度上下文)
G (Goroutine) = 协程
┌─────────────────────────────────────────────┐
│ Go 调度器 │
│ │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ M │ │ M │ │ M │ OS 线程 │
│ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │
│ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ │
│ │ P │ │ P │ │ P │ 逻辑处理器 │
│ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │
│ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ │
│ │ LRQ │ │ LRQ │ │ LRQ │ 本地运行队列 │
│ │G G G│ │G G G│ │G G │ │
│ └─────┘ └─────┘ └─────┘ │
│ │
│ ┌────────────────────────────┐ │
│ │ GRQ (全局运行队列) │ │
│ │ G G G G G │ │
│ └────────────────────────────┘ │
└─────────────────────────────────────────────┘
P 的数量 = GOMAXPROCS(默认等于 CPU 核数)
调度流程:
1. 新创建的 G 放入当前 P 的本地队列(LRQ)
2. P 从自己的 LRQ 取 G 执行
3. LRQ 为空时,从全局队列(GRQ)偷取
4. GRQ 也为空时,从其他 P 的 LRQ 偷取(work stealing)
5. G 阻塞(channel/锁/系统调用)时,M 放弃 P,P 绑定新的 M
关键设计:
- 抢占式调度(Go 1.14+):长时间运行的 G 会被强制让出
- 每个 P 有本地队列,减少全局锁竞争
- Work Stealing 保证负载均衡
- 系统调用时 M 和 P 解绑,不阻塞其他 Ggo
package main
import (
"fmt"
"runtime"
)
func main() {
// 查看和设置 P 的数量
fmt.Println("CPU 核数:", runtime.NumCPU())
fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 0 = 只查询不设置
// 设置为 1:所有协程在一个线程上交替执行(并发不并行)
// runtime.GOMAXPROCS(1)
// 查看当前协程数量
fmt.Println("协程数:", runtime.NumGoroutine())
// 主动让出当前协程的执行权
runtime.Gosched() // 类似 yield,当前协程重新排队
}13. 协程与线程对比
text
特性 goroutine OS 线程
────────────────────────────────────────────────────
初始栈大小 2 KB(可动态增长) 1~8 MB(固定)
创建开销 ~0.3 μs ~10 μs
切换开销 ~100 ns ~1000 ns
数量上限 轻松百万级 通常数千
调度方式 Go 运行时调度 OS 内核调度
通信方式 channel(CSP 模型) 共享内存 + 锁
阻塞影响 不阻塞 OS 线程 阻塞 OS 线程
创建方式 go func() pthread_create 等
ID 无 有 tid
取消/终止 只能通过信号自行退出 可以强制 kill
为什么 goroutine 这么轻量?
1. 用户态调度,不需要陷入内核
2. 栈可以动态伸缩(2KB → 最大 1GB)
3. 复用少量 OS 线程(M:N 模型)
4. 阻塞时不占用 OS 线程(和 P 解绑)14. 用 channel 并行处理海量文件
go
package main
import (
"crypto/md5"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
)
// FileResult 存放处理结果
type FileResult struct {
Path string
Hash string
Size int64
Err error
}
// ParallelFileProcessor 并行文件处理器
func ParallelFileProcessor(root string, workers int) []FileResult {
// 阶段1:扫描文件路径(单个协程)
paths := make(chan string, 100)
go func() {
defer close(paths)
filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
return nil
}
paths <- path
})
}()
// 阶段2:并行处理(多个 worker 协程)
results := make(chan FileResult, 100)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for path := range paths { // 从 channel 取任务
result := processFile(path)
results <- result
}
}(i)
}
// 关闭 results(所有 worker 完成后)
go func() {
wg.Wait()
close(results)
}()
// 阶段3:收集结果
var allResults []FileResult
for r := range results {
allResults = append(allResults, r)
}
return allResults
}
func processFile(path string) FileResult {
f, err := os.Open(path)
if err != nil {
return FileResult{Path: path, Err: err}
}
defer f.Close()
info, _ := f.Stat()
h := md5.New()
io.Copy(h, f)
return FileResult{
Path: path,
Hash: fmt.Sprintf("%x", h.Sum(nil)),
Size: info.Size(),
}
}
func main() {
start := time.Now()
results := ParallelFileProcessor(".", 8) // 8 个 worker
var totalSize int64
var errorCount int
for _, r := range results {
if r.Err != nil {
errorCount++
continue
}
totalSize += r.Size
}
fmt.Printf("处理了 %d 个文件,总大小 %d 字节\n", len(results), totalSize)
fmt.Printf("错误: %d,耗时: %s\n", errorCount, time.Since(start))
}15. 用 channel 限制接口的并发请求量
go
package main
import (
"fmt"
"sync"
"time"
)
// ==================== 信号量模式:buffered channel 限流 ====================
// RateLimiter 基于 channel 的并发限制器
type RateLimiter struct {
sem chan struct{} // 信号量
}
func NewRateLimiter(maxConcurrent int) *RateLimiter {
return &RateLimiter{
sem: make(chan struct{}, maxConcurrent),
}
}
// Acquire 获取许可(满了就阻塞)
func (r *RateLimiter) Acquire() {
r.sem <- struct{}{} // 缓冲区满时阻塞
}
// Release 释放许可
func (r *RateLimiter) Release() {
<-r.sem
}
// 模拟 API 处理器
func handleRequest(id int) {
fmt.Printf("[%s] 请求 %d 开始处理\n",
time.Now().Format("15:04:05.000"), id)
time.Sleep(500 * time.Millisecond) // 模拟耗时操作
fmt.Printf("[%s] 请求 %d 处理完成\n",
time.Now().Format("15:04:05.000"), id)
}
func main() {
limiter := NewRateLimiter(3) // 最多同时处理 3 个请求
var wg sync.WaitGroup
// 模拟 20 个并发请求
for i := 0; i < 20; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
limiter.Acquire() // 获取许可
defer limiter.Release() // 完成后释放
handleRequest(id)
}(i)
}
wg.Wait()
fmt.Println("所有请求处理完毕")
}16. 用 channel 限制协程的总数
go
package main
import (
"fmt"
"sync"
"time"
)
// ==================== Worker Pool 模式 ====================
// 固定数量的 worker 消费任务,无论任务多少,协程数恒定
func workerPool() {
const numWorkers = 5
const numTasks = 50
tasks := make(chan int, numTasks)
var wg sync.WaitGroup
// 启动固定数量的 worker
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks { // 从 channel 取任务,关闭后退出
fmt.Printf("Worker %d 处理任务 %d\n", workerID, task)
time.Sleep(100 * time.Millisecond)
}
}(w)
}
// 提交任务
for i := 0; i < numTasks; i++ {
tasks <- i
}
close(tasks) // 所有任务已提交,关闭 channel
wg.Wait()
fmt.Println("所有任务完成")
}
// ==================== 带结果收集的 Worker Pool ====================
type Task struct {
ID int
Input string
}
type Result struct {
TaskID int
Output string
Err error
}
func workerPoolWithResults() {
const numWorkers = 3
tasks := make(chan Task, 100)
results := make(chan Result, 100)
var wg sync.WaitGroup
// 启动 worker
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range tasks {
// 处理任务
output := fmt.Sprintf("处理了 %s", task.Input)
results <- Result{TaskID: task.ID, Output: output}
}
}()
}
// 提交任务
go func() {
for i := 0; i < 10; i++ {
tasks <- Task{ID: i, Input: fmt.Sprintf("数据_%d", i)}
}
close(tasks)
}()
// 等待所有 worker 完成后关闭 results
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for r := range results {
fmt.Printf("任务 %d: %s\n", r.TaskID, r.Output)
}
}
func main() {
fmt.Println("=== Worker Pool ===")
workerPool()
fmt.Println("\n=== Worker Pool With Results ===")
workerPoolWithResults()
}17. select 多路监听
go
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
// ==================== 基本 select ====================
// select 同时监听多个 channel,哪个就绪就执行哪个
// 多个同时就绪时随机选一个
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 ch2"
}()
select {
case msg := <-ch1:
fmt.Println("收到:", msg)
case msg := <-ch2:
fmt.Println("收到:", msg)
}
// ==================== 超时控制 ====================
ch3 := make(chan int)
go func() {
time.Sleep(2 * time.Second)
ch3 <- 42
}()
select {
case val := <-ch3:
fmt.Println("收到:", val)
case <-time.After(1 * time.Second):
fmt.Println("超时!") // 1 秒内没收到就超时
}
// ==================== 非阻塞操作(default)====================
ch4 := make(chan int, 1)
// 非阻塞发送
select {
case ch4 <- 42:
fmt.Println("发送成功")
default:
fmt.Println("channel 已满,跳过")
}
// 非阻塞接收
select {
case val := <-ch4:
fmt.Println("接收:", val)
default:
fmt.Println("channel 为空,跳过")
}
// ==================== 多路合并(Fan-In)====================
fmt.Println("\n=== Fan-In ===")
merge := func(channels ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 创建三个数据源
src1 := make(chan string, 3)
src2 := make(chan string, 3)
src1 <- "A1"
src1 <- "A2"
close(src1)
src2 <- "B1"
src2 <- "B2"
close(src2)
merged := merge(src1, src2)
for val := range merged {
fmt.Println("合并:", val)
}
// ==================== 心跳检测 ====================
fmt.Println("\n=== 心跳 ===")
heartbeat := time.NewTicker(300 * time.Millisecond)
defer heartbeat.Stop()
work := make(chan int)
go func() {
for i := 0; i < 3; i++ {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
work <- i
}
close(work)
}()
for {
select {
case val, ok := <-work:
if !ok {
fmt.Println("工作完成")
return
}
fmt.Println("处理:", val)
case <-heartbeat.C:
fmt.Println("💓 心跳")
}
}
}18. 不使用 once 的单例模式
go
package main
import (
"fmt"
"sync"
)
// ==================== 方式1:sync.Once(标准方式)====================
type Database struct {
DSN string
}
var (
dbInstance *Database
dbOnce sync.Once
)
func GetDB() *Database {
dbOnce.Do(func() {
fmt.Println("初始化数据库连接(只执行一次)")
dbInstance = &Database{DSN: "localhost:3306"}
})
return dbInstance
}
// ==================== 方式2:init 函数(最简单)====================
// 缺点:不能延迟初始化,程序启动就执行
var configInstance *Config
type Config struct {
AppName string
}
func init() {
configInstance = &Config{AppName: "MyApp"}
}
func GetConfig() *Config {
return configInstance
}
// ==================== 方式3:包级变量(编译期单例)====================
// 适用于不需要复杂初始化的场景
var defaultLogger = &Logger{Level: "INFO"}
type Logger struct {
Level string
}
func GetLogger() *Logger {
return defaultLogger
}
// ==================== 方式4:Mutex + 双重检查(不推荐,用 Once 更好)====================
var (
cacheInstance *Cache
cacheMu sync.Mutex
)
type Cache struct {
Data map[string]string
}
func GetCache() *Cache {
if cacheInstance != nil { // 第一次检查(无锁,快速路径)
return cacheInstance
}
cacheMu.Lock()
defer cacheMu.Unlock()
if cacheInstance != nil { // 第二次检查(持锁,防止重复初始化)
return cacheInstance
}
fmt.Println("初始化缓存(只执行一次)")
cacheInstance = &Cache{Data: make(map[string]string)}
return cacheInstance
}
// ==================== 方式5:channel(不用锁也不用 Once)====================
type Service struct {
Name string
}
func newServiceSingleton() func() *Service {
ch := make(chan *Service, 1)
ch <- &Service{Name: "singleton-service"} // 预先放入一个实例
// 第一次调用取出实例并缓存
var instance *Service
var once sync.Once // 这里还是依赖了 once,纯 channel 方案见下
return func() *Service {
once.Do(func() {
instance = <-ch
fmt.Println("Service 单例已创建")
})
return instance
}
}
func main() {
// sync.Once 方式
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDB()
_ = db // 所有协程拿到同一个实例
}()
}
wg.Wait()
fmt.Printf("DB 地址: %p\n", GetDB())
// Mutex 双重检查方式
for i := 0; i < 5; i++ {
go func() {
cache := GetCache()
_ = cache
}()
}
}💡 实际项目推荐 99% 的场景直接用 sync.Once,它简洁、安全、高效。其他方式了解即可。
19. context 用法详解
19.1 context 是什么
text
context 是 Go 并发编程的核心工具,用于:
1. 传递取消信号(超时、手动取消)
2. 传递请求级别的元数据(如 request_id、user_id)
3. 控制协程的生命周期
context 是树形结构:
Background
└── WithCancel
├── WithTimeout
│ └── WithValue
└── WithDeadline
规则:
- context 作为函数的第一个参数传递
- 不要存储在结构体中
- 不要传 nil,用 context.Background() 或 context.TODO()19.2 四种 context
go
package main
import (
"context"
"fmt"
"time"
)
func main() {
// ==================== context.Background ====================
// 根 context,不可取消,无超时,无值
// 通常在 main、init、测试中使用
ctx := context.Background()
_ = ctx
// ==================== context.WithCancel ====================
// 手动取消
ctx1, cancel1 := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WithCancel: 收到取消信号,退出")
return
default:
fmt.Println("WithCancel: 工作中...")
time.Sleep(200 * time.Millisecond)
}
}
}(ctx1)
time.Sleep(600 * time.Millisecond)
cancel1() // 发送取消信号
time.Sleep(100 * time.Millisecond)
// ==================== context.WithTimeout ====================
// 超时自动取消
ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2() // 即使超时也要调用 cancel 释放资源
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println("WithTimeout:", ctx.Err()) // context deadline exceeded
case <-time.After(1 * time.Second):
fmt.Println("WithTimeout: 完成工作")
}
}(ctx2)
time.Sleep(700 * time.Millisecond)
// ==================== context.WithDeadline ====================
// 在指定时间点取消
deadline := time.Now().Add(300 * time.Millisecond)
ctx3, cancel3 := context.WithDeadline(context.Background(), deadline)
defer cancel3()
<-ctx3.Done()
fmt.Println("WithDeadline:", ctx3.Err())
// ==================== context.WithValue ====================
// 传递请求级别的数据
type ctxKey string
const requestIDKey ctxKey = "request_id"
const userIDKey ctxKey = "user_id"
ctx4 := context.WithValue(context.Background(), requestIDKey, "req-abc-123")
ctx4 = context.WithValue(ctx4, userIDKey, 42)
// 取值
reqID := ctx4.Value(requestIDKey).(string)
userID := ctx4.Value(userIDKey).(int)
fmt.Printf("request_id=%s, user_id=%d\n", reqID, userID)
}19.3 context 实战
go
package main
import (
"context"
"fmt"
"sync"
"time"
)
// ==================== 实战1:HTTP 请求超时控制 ====================
func fetchData(ctx context.Context, url string) (string, error) {
// 模拟网络请求
resultCh := make(chan string, 1)
go func() {
time.Sleep(300 * time.Millisecond) // 模拟耗时
resultCh <- "response data from " + url
}()
select {
case result := <-resultCh:
return result, nil
case <-ctx.Done():
return "", ctx.Err() // 超时或被取消
}
}
// ==================== 实战2:级联取消多个协程 ====================
func pipeline(ctx context.Context) {
var wg sync.WaitGroup
// 启动多个阶段的协程
stages := []string{"fetch", "process", "save"}
for _, stage := range stages {
wg.Add(1)
go func(name string) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-ctx.Done():
fmt.Printf("[%s] 收到取消信号,已处理 %d 个任务\n", name, i)
return
default:
time.Sleep(100 * time.Millisecond)
}
}
}(stage)
}
wg.Wait()
}
// ==================== 实战3:优雅关闭服务 ====================
func startServer(ctx context.Context) {
fmt.Println("服务启动")
<-ctx.Done()
fmt.Println("收到关闭信号,开始清理...")
// 给清理操作一个超时
cleanupCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// 模拟清理
done := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
fmt.Println("清理完成")
close(done)
}()
select {
case <-done:
fmt.Println("服务优雅关闭")
case <-cleanupCtx.Done():
fmt.Println("清理超时,强制关闭")
}
}
func main() {
// 实战1
ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel1()
result, err := fetchData(ctx1, "https://api.example.com")
if err != nil {
fmt.Println("请求失败:", err) // 200ms 超时,300ms 请求 → 超时
} else {
fmt.Println("请求成功:", result)
}
// 实战2
fmt.Println("\n=== 级联取消 ===")
ctx2, cancel2 := context.WithCancel(context.Background())
go pipeline(ctx2)
time.Sleep(500 * time.Millisecond)
cancel2() // 一个 cancel 取消所有阶段
time.Sleep(200 * time.Millisecond)
// 实战3
fmt.Println("\n=== 优雅关闭 ===")
ctx3, cancel3 := context.WithCancel(context.Background())
go startServer(ctx3)
time.Sleep(500 * time.Millisecond)
cancel3() // 模拟收到 SIGTERM
time.Sleep(2 * time.Second)
}19.4 context 最佳实践
⚠️ context 使用规范
- context 始终作为函数第一个参数,命名为
ctx - 不要把 context 存到结构体里
- 不要传 nil context,用
context.TODO()占位 - WithValue 只传请求级别数据,不要传可选参数
- WithValue 的 key 用自定义类型,避免冲突
- cancel 函数必须调用(即使超时也要 defer cancel)
附录:并发模式速查
text
场景 推荐模式
──────────────────────────────────────────────────
等待一组协程完成 sync.WaitGroup
等待 + 收集错误 errgroup.Group
保护共享变量 sync.Mutex / RWMutex
简单计数器/标志 sync/atomic
一对多通知 close(channel)
限制并发数 buffered channel 信号量
固定协程池 Worker Pool 模式
超时控制 context.WithTimeout
级联取消 context.WithCancel
多路监听 select
生产者-消费者 buffered channel
单例 sync.Once
配置热更新 atomic.Value