Skip to content

Go 并发编程教程

📖 关于本教程本教程系统讲解 Go 并发编程的核心知识:goroutine、channel、sync 原语、并发模式、MPG 模型、context 等,从基础到实战,配合大量代码示例与注释。


1. 启动协程

go
package main

import (
    "fmt"
    "time"
)

// ==================== goroutine 基础 ====================
// goroutine 是 Go 的轻量级线程(协程)
// 一个 goroutine 初始栈只有 2KB(线程通常 1~8MB)
// Go 程序启动时,main 函数本身就运行在一个 goroutine 中

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("[%s] 第 %d 次打招呼\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // ==================== 用 go 关键字启动协程 ====================
    go sayHello("Alice") // 启动一个新协程,非阻塞,立即返回
    go sayHello("Bob")   // 再启动一个

    // 匿名函数协程
    go func() {
        fmt.Println("匿名协程运行中")
    }()

    // 带参数的匿名函数(注意闭包陷阱)
    for i := 0; i < 3; i++ {
        // ❌ 错误:闭包捕获循环变量 i 的引用
        // go func() { fmt.Println(i) }() // 可能全部输出 3

        // ✅ 正确:通过参数传值
        go func(n int) {
            fmt.Println("协程参数:", n)
        }(i)
    }

    // ⚠️ main 退出后,所有协程立即被杀死!
    // 这里用 Sleep 等待只是演示,实际项目用 WaitGroup/channel
    time.Sleep(1 * time.Second)
    fmt.Println("main 退出")
}

// 启动大量协程(Go 可以轻松启动数十万个协程)
func manyGoroutines() {
    for i := 0; i < 100000; i++ {
        go func(id int) {
            // 每个协程做少量工作
            _ = id * id
        }(i)
    }
}

2. 协程的生命周期与 WaitGroup

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // ==================== sync.WaitGroup ====================
    // WaitGroup 用于等待一组协程全部完成
    // 三个方法:Add(n)、Done()、Wait()

    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1) // 计数器 +1(在启动协程之前调用)

        go func(id int) {
            defer wg.Done() // 计数器 -1(协程结束时调用)

            fmt.Printf("协程 %d 开始\n", id)
            time.Sleep(time.Duration(id*100) * time.Millisecond)
            fmt.Printf("协程 %d 结束\n", id)
        }(i)
    }

    wg.Wait() // 阻塞,直到计数器归零
    fmt.Println("所有协程已完成")
}

❌ WaitGroup 常见错误

go
// 错误1:在协程内部调用 Add(可能 Wait 先于 Add 执行)
go func() {
    wg.Add(1) // ❌ 太晚了!main 可能已经 Wait 返回
    defer wg.Done()
}()

// 错误2:Add 和启动协程数量不匹配
wg.Add(10)
for i := 0; i < 5; i++ { // 只启动了 5 个
    go func() { defer wg.Done() }()
}
wg.Wait() // 永远阻塞!计数器不会归零

// 错误3:Done 调用次数超过 Add
wg.Add(1)
wg.Done()
wg.Done() // panic: negative WaitGroup counter

// 错误4:复制 WaitGroup(值类型,复制后计数器独立)
wg2 := wg // ❌ 不能复制!
go
// ==================== 协程的生命周期 ====================
//
// 1. 创建:go func() 启动,进入可运行队列
// 2. 运行:被调度器分配到某个 OS 线程上执行
// 3. 阻塞:遇到 channel、锁、IO、Sleep 等进入等待状态
// 4. 唤醒:阻塞条件满足,重新进入可运行队列
// 5. 结束:函数返回 / panic / main 退出
//
// ⚠️ goroutine 没有 ID,没有 Kill 方法
// 要停止一个协程,只能通过 channel 或 context 通知它自己退出

3. error group

go
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

// ==================== errgroup:WaitGroup + error 收集 ====================
// 需要安装:go get golang.org/x/sync

func fetchURL(url string) error {
    time.Sleep(100 * time.Millisecond) // 模拟请求
    if url == "bad://url" {
        return fmt.Errorf("fetch %s: connection refused", url)
    }
    fmt.Printf("成功获取: %s\n", url)
    return nil
}

func main() {
    // ==================== 基本用法 ====================
    var g errgroup.Group

    urls := []string{
        "https://go.dev",
        "https://google.com",
        "https://github.com",
    }

    for _, url := range urls {
        url := url // Go 1.22 之前需要拷贝
        g.Go(func() error {
            return fetchURL(url)
        })
    }

    // Wait 等待所有协程完成,返回第一个非 nil 的 error
    if err := g.Wait(); err != nil {
        fmt.Println("发生错误:", err)
    } else {
        fmt.Println("全部成功")
    }

    // ==================== 带 context(一个失败,取消其他)====================
    fmt.Println("\n=== 带 context ===")
    g2, ctx := errgroup.WithContext(context.Background())

    tasks := []string{"task1", "task2", "bad://url", "task4"}

    for _, task := range tasks {
        task := task
        g2.Go(func() error {
            // 检查 context 是否已取消
            select {
            case <-ctx.Done():
                fmt.Printf("[%s] 已取消,跳过\n", task)
                return ctx.Err()
            default:
            }

            err := fetchURL(task)
            if err != nil {
                return err // 这会导致 ctx 被取消
            }
            return nil
        })
    }

    if err := g2.Wait(); err != nil {
        fmt.Println("错误:", err)
    }

    // ==================== 限制并发数 ====================
    fmt.Println("\n=== 限制并发 ===")
    g3 := new(errgroup.Group)
    g3.SetLimit(3) // 最多 3 个协程同时运行

    for i := 0; i < 10; i++ {
        i := i
        g3.Go(func() error {
            fmt.Printf("任务 %d 开始(并发限制为 3)\n", i)
            time.Sleep(200 * time.Millisecond)
            return nil
        })
    }
    g3.Wait()

    // ==================== 收集多个错误 ====================
    fmt.Println("\n=== 收集所有错误 ===")
    var allErrors []error
    var mu sync.Mutex
    var g4 errgroup.Group

    for _, url := range []string{"good", "bad://url", "bad://url2"} {
        url := url
        g4.Go(func() error {
            err := fetchURL(url)
            if err != nil {
                mu.Lock()
                allErrors = append(allErrors, err)
                mu.Unlock()
            }
            return err
        })
    }
    g4.Wait()

    if len(allErrors) > 0 {
        combined := errors.Join(allErrors...)
        fmt.Println("所有错误:", combined)
    }
}

4. 并发安全性与原子操作

4.1 竞态条件

go
package main

import (
    "fmt"
    "sync"
)

func main() {
    // ==================== 竞态条件演示 ====================
    // 多个协程同时读写同一个变量,结果不可预测

    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // ❌ 竞态!counter++ 不是原子操作
            // 实际是:读取 → 加 1 → 写回(三步,可能被打断)
        }()
    }
    wg.Wait()
    fmt.Println("counter:", counter) // 结果 < 1000(每次运行可能不同)
}

// 用 go run -race 检测竞态:
// go run -race main.go
// 输出: WARNING: DATA RACE

4.2 互斥锁 sync.Mutex

go
package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()   // 加锁:同一时间只有一个协程能进入
            counter++
            mu.Unlock() // 解锁:让其他协程可以进入
        }()
    }
    wg.Wait()
    fmt.Println("counter:", counter) // 准确得到 1000

    // ==================== defer Unlock(推荐)====================
    // 防止忘记解锁或 panic 导致死锁
    mu.Lock()
    defer mu.Unlock()
    // ... 临界区代码 ...
}

4.3 原子操作 sync/atomic

go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    // ==================== atomic 基本操作 ====================
    // 原子操作在底层由 CPU 指令保证,比锁更轻量

    var counter int64 = 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1) // 原子加 1
        }()
    }
    wg.Wait()
    fmt.Println("atomic counter:", atomic.LoadInt64(&counter)) // 1000

    // ==================== 常用原子操作 ====================
    var val int64 = 10

    atomic.StoreInt64(&val, 42)                    // 原子写入
    loaded := atomic.LoadInt64(&val)               // 原子读取
    old := atomic.SwapInt64(&val, 100)             // 原子交换,返回旧值
    swapped := atomic.CompareAndSwapInt64(&val, 100, 200) // CAS 操作

    fmt.Println(loaded, old, swapped, val)

    // ==================== atomic.Value(存储任意类型)====================
    var config atomic.Value

    type Config struct {
        Server string
        Port   int
    }

    // 原子存储
    config.Store(Config{Server: "localhost", Port: 8080})

    // 原子读取
    cfg := config.Load().(Config)
    fmt.Printf("Config: %+v\n", cfg)

    // 适用场景:配置热更新(一个协程写,多个协程读)
}

// ==================== Mutex vs Atomic 选择 ====================
//
// atomic:
//   ✅ 简单的计数器、标志位
//   ✅ 单个变量的读写
//   ✅ 性能更好(无锁)
//
// Mutex:
//   ✅ 保护多个变量
//   ✅ 保护复杂的临界区(多步操作)
//   ✅ 读写需要一致性的场景

5. 读写锁

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// ==================== sync.RWMutex ====================
// 读写锁:多读单写
// - 多个 goroutine 可以同时持有读锁(RLock)
// - 写锁(Lock)是独占的,持有时不允许任何读或写
// - 适用于读多写少的场景(如缓存、配置)

// 线程安全的缓存
type SafeCache struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewSafeCache() *SafeCache {
    return &SafeCache{data: make(map[string]string)}
}

func (c *SafeCache) Get(key string) (string, bool) {
    c.mu.RLock()         // 加读锁(允许多个 goroutine 同时读)
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *SafeCache) Set(key, value string) {
    c.mu.Lock()          // 加写锁(独占,阻塞所有读和写)
    defer c.mu.Unlock()
    c.data[key] = value
}

func (c *SafeCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.data, key)
}

// 只读的批量操作
func (c *SafeCache) Keys() []string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    keys := make([]string, 0, len(c.data))
    for k := range c.data {
        keys = append(keys, k)
    }
    return keys
}

func main() {
    cache := NewSafeCache()
    var wg sync.WaitGroup

    // 启动 10 个写协程
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key_%d", id)
            cache.Set(key, fmt.Sprintf("value_%d", id))
            time.Sleep(10 * time.Millisecond)
        }(i)
    }

    // 启动 100 个读协程
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key_%d", id%10)
            if val, ok := cache.Get(key); ok {
                _ = val // 使用值
            }
        }(i)
    }

    wg.Wait()
    fmt.Println("缓存大小:", len(cache.Keys()))
}

⚠️ 读写锁注意事项

  • 不要在持有读锁时尝试获取写锁(同一协程内),会死锁
  • 写多读少时 RWMutex 性能不如 Mutex(因为 RWMutex 内部开销更大)
  • 不要递归加锁(Go 的锁不是可重入的)

6. 如何并行修改结构体、切片、map

6.1 并发修改结构体

go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// ==================== 方案1:Mutex 保护整个结构体 ====================
type Stats struct {
    mu         sync.Mutex
    TotalReqs  int
    FailedReqs int
    TotalBytes int64
}

func (s *Stats) Record(bytes int64, failed bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.TotalReqs++
    s.TotalBytes += bytes
    if failed {
        s.FailedReqs++
    }
}

// ==================== 方案2:各字段独立用 atomic ====================
type AtomicStats struct {
    TotalReqs  atomic.Int64
    FailedReqs atomic.Int64
    TotalBytes atomic.Int64
}

func (s *AtomicStats) Record(bytes int64, failed bool) {
    s.TotalReqs.Add(1)
    s.TotalBytes.Add(bytes)
    if failed {
        s.FailedReqs.Add(1)
    }
}

// ==================== 方案3:不同字段由不同协程负责(最安全)====================
// 如果结构体的不同字段分别由不同协程独占修改,不需要锁
type Processor struct {
    InputCount  int // 只有 inputWorker 修改
    OutputCount int // 只有 outputWorker 修改
}

func main() {
    // 方案1
    stats := &Stats{}
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            stats.Record(1024, false)
        }()
    }
    wg.Wait()
    fmt.Printf("Mutex Stats: %+v\n", stats)

    // 方案2
    astats := &AtomicStats{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            astats.Record(1024, false)
        }()
    }
    wg.Wait()
    fmt.Printf("Atomic Stats: reqs=%d, bytes=%d\n",
        astats.TotalReqs.Load(), astats.TotalBytes.Load())
}

6.2 并发修改切片

go
package main

import (
    "fmt"
    "sync"
)

func main() {
    // ==================== ❌ 切片不是并发安全的 ====================
    // s := make([]int, 0)
    // go func() { s = append(s, 1) }() // 竞态!
    // go func() { s = append(s, 2) }() // 竞态!

    // ==================== 方案1:Mutex ====================
    var mu sync.Mutex
    result := make([]int, 0, 1000)
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            mu.Lock()
            result = append(result, n)
            mu.Unlock()
        }(i)
    }
    wg.Wait()
    fmt.Println("方案1 长度:", len(result))

    // ==================== 方案2:预分配索引(无需锁)====================
    // 如果知道每个协程写哪个位置,可以避免锁
    result2 := make([]int, 1000) // 预分配

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            result2[idx] = idx * idx // 每个协程写不同索引,无竞态
        }(i)
    }
    wg.Wait()
    fmt.Println("方案2 长度:", len(result2))

    // ==================== 方案3:channel 收集结果 ====================
    ch := make(chan int, 1000)

    for i := 0; i < 1000; i++ {
        go func(n int) {
            ch <- n * n
        }(i)
    }

    result3 := make([]int, 0, 1000)
    for i := 0; i < 1000; i++ {
        result3 = append(result3, <-ch)
    }
    fmt.Println("方案3 长度:", len(result3))
}

6.3 并发修改 map

go
package main

import (
    "fmt"
    "sync"
)

func main() {
    // ==================== ❌ Go 的 map 不是并发安全的 ====================
    // 并发读写 map 会导致 panic: concurrent map read and map write
    // m := map[string]int{}
    // go func() { m["a"] = 1 }()
    // go func() { _ = m["a"] }()

    // ==================== 方案1:sync.Map(标准库)====================
    var sm sync.Map

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            sm.Store(fmt.Sprintf("key_%d", n), n)
        }(i)
    }
    wg.Wait()

    // 遍历
    sm.Range(func(key, value any) bool {
        // fmt.Printf("%s: %v\n", key, value)
        return true // 返回 false 停止遍历
    })

    // 其他操作
    val, ok := sm.Load("key_42")
    fmt.Println("Load:", val, ok)

    sm.Delete("key_42")

    actual, loaded := sm.LoadOrStore("key_new", 999)
    fmt.Println("LoadOrStore:", actual, loaded)

    // ==================== 方案2:Mutex + 普通 map(更常用)====================
    // sync.Map 适用于:key 稳定的读多写少场景
    // 大多数场景还是 Mutex + map 更简单可控

    type SafeMap struct {
        mu sync.RWMutex
        m  map[string]int
    }

    sm2 := &SafeMap{m: make(map[string]int)}

    // 写
    sm2.mu.Lock()
    sm2.m["key"] = 42
    sm2.mu.Unlock()

    // 读
    sm2.mu.RLock()
    v := sm2.m["key"]
    sm2.mu.RUnlock()
    fmt.Println("value:", v)
}

// ==================== sync.Map vs Mutex+map 选择 ====================
//
// sync.Map 适用于:
//   - key 集合基本稳定(很少有新 key 写入)
//   - 读远多于写
//   - 多个协程读写不相交的 key
//
// Mutex + map 适用于:
//   - 大多数场景(更简单、更可控)
//   - 需要批量操作(遍历 + 修改)
//   - 写操作频繁

7. recover 与协程

❌ 协程中的 panic 不会被外部协程的 recover 捕获!每个协程必须自己处理 panic。

go
package main

import (
    "fmt"
    "runtime/debug"
    "sync"
)

// ==================== 问题:协程 panic 会杀死整个进程 ====================
func main() {
    // ❌ 这个 recover 捕获不了子协程的 panic
    // defer func() {
    //     if r := recover(); r != nil {
    //         fmt.Println("主协程 recover:", r) // 不会执行!
    //     }
    // }()
    //
    // go func() {
    //     panic("子协程崩溃") // 直接杀死整个进程
    // }()

    // ==================== 正确做法:每个协程自己 recover ====================
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            defer func() {
                if r := recover(); r != nil {
                    fmt.Printf("协程 %d panic 已恢复: %v\n", id, r)
                    fmt.Println(string(debug.Stack()))
                }
            }()

            // 模拟其中一个协程 panic
            if id == 3 {
                panic("协程 3 出错了")
            }
            fmt.Printf("协程 %d 正常完成\n", id)
        }(i)
    }

    wg.Wait()
    fmt.Println("所有协程处理完毕,程序继续运行")
}
go
// ==================== 通用安全启动函数 ====================
// 实际项目中,封装一个安全的 go 启动器

func safeGo(fn func()) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                // 实际项目中:记录日志、上报监控
                fmt.Printf("[PANIC] %v\n%s\n", r, debug.Stack())
            }
        }()
        fn()
    }()
}

// 带 WaitGroup 的安全启动
func safeGoWg(wg *sync.WaitGroup, fn func()) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("[PANIC] %v\n", r)
            }
        }()
        fn()
    }()
}

// 使用
func example() {
    var wg sync.WaitGroup

    safeGoWg(&wg, func() {
        // 即使 panic 也不会影响其他协程
        panic("something went wrong")
    })

    safeGoWg(&wg, func() {
        fmt.Println("正常运行")
    })

    wg.Wait()
}

8. channel 的阻塞与遍历

8.1 channel 的阻塞规则

go
package main

import "fmt"

func main() {
    // ==================== 无缓冲 channel:同步通信 ====================
    // 发送和接收必须同时就绪,否则阻塞
    ch := make(chan int)

    go func() {
        ch <- 42 // 阻塞,直到有人接收
        fmt.Println("发送完成")
    }()

    val := <-ch // 阻塞,直到有人发送
    fmt.Println("收到:", val)

    // ==================== 有缓冲 channel:异步通信 ====================
    // 缓冲区未满时发送不阻塞,缓冲区为空时接收阻塞
    bufCh := make(chan int, 3)

    bufCh <- 1 // 不阻塞(缓冲区有空间)
    bufCh <- 2 // 不阻塞
    bufCh <- 3 // 不阻塞
    // bufCh <- 4 // 阻塞!缓冲区已满

    fmt.Println(<-bufCh) // 1(不阻塞,缓冲区有数据)
    fmt.Println(<-bufCh) // 2
    fmt.Println(<-bufCh) // 3
    // fmt.Println(<-bufCh) // 阻塞!缓冲区为空
}

阻塞规则总结:

text
操作                    无缓冲 channel         有缓冲 channel
──────────────────────────────────────────────────────────
发送 ch <- val         阻塞直到有接收者         缓冲区满时阻塞
接收 val := <-ch       阻塞直到有发送者         缓冲区空时阻塞
关闭后发送              panic                   panic
关闭后接收              返回零值, false          先读完缓冲区再返回零值
nil channel 发送       永远阻塞                 永远阻塞
nil channel 接收       永远阻塞                 永远阻塞

8.2 channel 遍历

go
package main

import "fmt"

func main() {
    ch := make(chan int, 5)

    // 生产者
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- i
        }
        close(ch) // 关闭 channel,通知消费者没有更多数据了
    }()

    // ==================== 方式1:for range(推荐)====================
    // 自动在 channel 关闭后退出循环
    for val := range ch {
        fmt.Println("收到:", val)
    }
    // 如果 channel 未关闭,for range 会永远阻塞!

    // ==================== 方式2:for + ok 检查 ====================
    ch2 := make(chan string, 3)
    go func() {
        ch2 <- "a"
        ch2 <- "b"
        close(ch2)
    }()

    for {
        val, ok := <-ch2
        if !ok {
            fmt.Println("channel 已关闭")
            break
        }
        fmt.Println("收到:", val)
    }

    // ==================== 关闭 channel 的规则 ====================
    // 1. 只有发送方应该关闭 channel(接收方不要关闭)
    // 2. 关闭已关闭的 channel 会 panic
    // 3. 向已关闭的 channel 发送会 panic
    // 4. 从已关闭的 channel 接收:
    //    - 缓冲区有数据:正常读取
    //    - 缓冲区为空:返回零值和 false
}

9. 阻塞代码的 5 种方法及导致死锁的根本原因

9.1 五种阻塞方式

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // ==================== 方式1:channel 阻塞 ====================
    done := make(chan struct{})
    go func() {
        time.Sleep(time.Second)
        close(done) // 通知解除阻塞
    }()
    <-done // 阻塞直到 done 被关闭
    fmt.Println("channel 解除阻塞")

    // ==================== 方式2:WaitGroup ====================
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second)
    }()
    wg.Wait() // 阻塞直到计数器归零
    fmt.Println("WaitGroup 解除阻塞")

    // ==================== 方式3:Mutex ====================
    var mu sync.Mutex
    mu.Lock()
    go func() {
        time.Sleep(time.Second)
        mu.Unlock() // 解除阻塞
    }()
    mu.Lock() // 阻塞,因为已经被锁定
    fmt.Println("Mutex 解除阻塞")
    mu.Unlock()

    // ==================== 方式4:time.Sleep ====================
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Sleep 解除阻塞")

    // ==================== 方式5:select ====================
    ch := make(chan int)
    go func() {
        time.Sleep(time.Second)
        ch <- 42
    }()
    select {
    case val := <-ch:
        fmt.Println("select 收到:", val)
    case <-time.After(2 * time.Second):
        fmt.Println("select 超时")
    }
}

9.2 死锁的根本原因

❌ 死锁 = 所有 goroutine 都在阻塞,没有任何一个能继续执行 Go 运行时会检测到这种情况并 panic:fatal error: all goroutines are asleep - deadlock!

go
package main

func main() {
    // ==================== 死锁案例1:无缓冲 channel 自己等自己 ====================
    // ch := make(chan int)
    // ch <- 1  // 阻塞:等待接收者,但没有其他协程
    // <-ch     // 永远不会执行

    // ==================== 死锁案例2:互相等待 ====================
    // ch1 := make(chan int)
    // ch2 := make(chan int)
    // go func() {
    //     val := <-ch1  // 等 ch1 的数据
    //     ch2 <- val    // 永远到不了这里
    // }()
    // val := <-ch2      // 等 ch2 的数据
    // ch1 <- val        // 永远到不了这里

    // ==================== 死锁案例3:WaitGroup 不匹配 ====================
    // var wg sync.WaitGroup
    // wg.Add(1)
    // wg.Wait() // 没有任何协程调用 Done,永远阻塞

    // ==================== 死锁案例4:锁的互相等待 ====================
    // var mu1, mu2 sync.Mutex
    // go func() {
    //     mu1.Lock()
    //     time.Sleep(time.Millisecond)
    //     mu2.Lock() // 等待 mu2,但 mu2 被另一个协程持有
    // }()
    // go func() {
    //     mu2.Lock()
    //     time.Sleep(time.Millisecond)
    //     mu1.Lock() // 等待 mu1,但 mu1 被另一个协程持有
    // }()
}

// ==================== 死锁的根本原因 ====================
//
// 形成死锁的四个必要条件(Coffman 条件):
// 1. 互斥:资源同一时间只能被一个协程持有
// 2. 持有并等待:持有资源的协程在等待另一个资源
// 3. 不可抢占:资源不能被强制夺走
// 4. 循环等待:A 等 B,B 等 A(或更长的链)
//
// 预防死锁的方法:
// 1. 统一加锁顺序(打破循环等待)
// 2. 尽量减小锁的粒度和持有时间
// 3. 使用超时机制(select + time.After)
// 4. 使用 channel 代替锁
// 5. 用 go vet 和 -race 检测

10. 用 channel 实现广播和 CountDownLatch

10.1 广播(close 实现一对多通知)

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // ==================== 广播:close channel 通知所有等待者 ====================
    // close 后,所有阻塞在 <-ch 的协程都会立即收到零值
    // 这是 Go 中实现「一对多通知」的惯用方式

    ready := make(chan struct{}) // 用空结构体,不传数据只传信号
    var wg sync.WaitGroup

    // 启动 5 个工人,都等待开始信号
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("工人 %d 就绪,等待开始...\n", id)
            <-ready // 阻塞,等待广播信号
            fmt.Printf("工人 %d 开始工作!\n", id)
        }(i)
    }

    time.Sleep(500 * time.Millisecond)
    fmt.Println("=== 发送广播 ===")
    close(ready) // 一次 close,所有工人同时开始

    wg.Wait()
    fmt.Println("所有工人完成")
}

10.2 CountDownLatch

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// CountDownLatch 等待 N 个事件全部发生后继续
// 类似 Java 的 CountDownLatch
type CountDownLatch struct {
    count int
    ch    chan struct{}
    once  sync.Once
    mu    sync.Mutex
}

func NewCountDownLatch(count int) *CountDownLatch {
    return &CountDownLatch{
        count: count,
        ch:    make(chan struct{}),
    }
}

// CountDown 事件完成,计数减 1
func (l *CountDownLatch) CountDown() {
    l.mu.Lock()
    l.count--
    if l.count <= 0 {
        l.once.Do(func() {
            close(l.ch) // 计数归零,通知所有等待者
        })
    }
    l.mu.Unlock()
}

// Wait 阻塞直到计数归零
func (l *CountDownLatch) Wait() {
    <-l.ch
}

func main() {
    // 等待 3 个服务全部就绪后启动
    latch := NewCountDownLatch(3)

    services := []string{"数据库", "缓存", "消息队列"}

    for _, svc := range services {
        go func(name string) {
            // 模拟初始化
            time.Sleep(time.Duration(100+len(name)*50) * time.Millisecond)
            fmt.Printf("[%s] 初始化完成\n", name)
            latch.CountDown()
        }(svc)
    }

    fmt.Println("等待所有服务就绪...")
    latch.Wait()
    fmt.Println("✅ 所有服务就绪,开始接收请求")
}

11. 招人嫌的 sync.Cond

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// ==================== sync.Cond 是什么 ====================
// Cond 是条件变量,用于协程间的条件等待
// 当条件不满足时阻塞,条件满足时被唤醒
//
// 为什么"招人嫌":
// 1. 容易误用(忘记加锁、虚假唤醒)
// 2. 大多数场景可以用 channel 更简洁地实现
// 3. 没有超时机制
// 4. 调试困难

// ==================== 生产者-消费者队列 ====================
type Queue struct {
    items []int
    mu    sync.Mutex
    cond  *sync.Cond
    max   int
}

func NewQueue(maxSize int) *Queue {
    q := &Queue{
        items: make([]int, 0, maxSize),
        max:   maxSize,
    }
    q.cond = sync.NewCond(&q.mu) // Cond 绑定到 Mutex
    return q
}

func (q *Queue) Put(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()

    // ⚠️ 必须用 for 循环判断条件,不能用 if
    // 因为 Wait 返回后条件可能又不满足了(虚假唤醒)
    for len(q.items) >= q.max {
        q.cond.Wait() // 释放锁 → 等待唤醒 → 重新获取锁
    }

    q.items = append(q.items, item)
    q.cond.Signal() // 唤醒一个等待的协程
}

func (q *Queue) Get() int {
    q.mu.Lock()
    defer q.mu.Unlock()

    for len(q.items) == 0 {
        q.cond.Wait() // 队列为空,等待
    }

    item := q.items[0]
    q.items = q.items[1:]
    q.cond.Signal() // 唤醒等待 Put 的协程
    return item
}

func main() {
    q := NewQueue(5)
    var wg sync.WaitGroup

    // 生产者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 20; i++ {
            q.Put(i)
            fmt.Printf("放入: %d\n", i)
            time.Sleep(50 * time.Millisecond)
        }
    }()

    // 消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 20; i++ {
            val := q.Get()
            fmt.Printf("取出: %d\n", val)
            time.Sleep(100 * time.Millisecond)
        }
    }()

    wg.Wait()

    // ==================== Cond 的三个方法 ====================
    // Wait()     释放锁 → 阻塞等待 → 被唤醒后重新获取锁
    // Signal()   唤醒一个等待的协程
    // Broadcast() 唤醒所有等待的协程

    // ==================== 大多数情况用 channel 更好 ====================
    // 上面的队列可以直接用 buffered channel 实现:
    ch := make(chan int, 5)
    // 发送 → ch <- item
    // 接收 → item := <-ch
    // 简洁太多了!
    _ = ch
}

12. MPG 并发模型

text
Go 的调度器使用 MPG(也叫 GMP)模型:

  M (Machine)  = OS 线程
  P (Processor) = 逻辑处理器(调度上下文)
  G (Goroutine) = 协程

┌─────────────────────────────────────────────┐
│                  Go 调度器                    │
│                                              │
│   ┌─────┐   ┌─────┐   ┌─────┐              │
│   │  M  │   │  M  │   │  M  │   OS 线程     │
│   └──┬──┘   └──┬──┘   └──┬──┘              │
│      │         │         │                   │
│   ┌──┴──┐   ┌──┴──┐   ┌──┴──┐              │
│   │  P  │   │  P  │   │  P  │   逻辑处理器   │
│   └──┬──┘   └──┬──┘   └──┬──┘              │
│      │         │         │                   │
│   ┌──┴──┐   ┌──┴──┐   ┌──┴──┐              │
│   │ LRQ │   │ LRQ │   │ LRQ │   本地运行队列 │
│   │G G G│   │G G G│   │G G  │              │
│   └─────┘   └─────┘   └─────┘              │
│                                              │
│   ┌────────────────────────────┐            │
│   │         GRQ (全局运行队列)    │            │
│   │      G  G  G  G  G         │            │
│   └────────────────────────────┘            │
└─────────────────────────────────────────────┘

P 的数量 = GOMAXPROCS(默认等于 CPU 核数)

调度流程:
  1. 新创建的 G 放入当前 P 的本地队列(LRQ)
  2. P 从自己的 LRQ 取 G 执行
  3. LRQ 为空时,从全局队列(GRQ)偷取
  4. GRQ 也为空时,从其他 P 的 LRQ 偷取(work stealing)
  5. G 阻塞(channel/锁/系统调用)时,M 放弃 P,P 绑定新的 M

关键设计:
  - 抢占式调度(Go 1.14+):长时间运行的 G 会被强制让出
  - 每个 P 有本地队列,减少全局锁竞争
  - Work Stealing 保证负载均衡
  - 系统调用时 M 和 P 解绑,不阻塞其他 G
go
package main

import (
    "fmt"
    "runtime"
)

func main() {
    // 查看和设置 P 的数量
    fmt.Println("CPU 核数:", runtime.NumCPU())
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 0 = 只查询不设置

    // 设置为 1:所有协程在一个线程上交替执行(并发不并行)
    // runtime.GOMAXPROCS(1)

    // 查看当前协程数量
    fmt.Println("协程数:", runtime.NumGoroutine())

    // 主动让出当前协程的执行权
    runtime.Gosched() // 类似 yield,当前协程重新排队
}

13. 协程与线程对比

text
特性              goroutine             OS 线程
────────────────────────────────────────────────────
初始栈大小        2 KB(可动态增长)      1~8 MB(固定)
创建开销          ~0.3 μs               ~10 μs
切换开销          ~100 ns               ~1000 ns
数量上限          轻松百万级              通常数千
调度方式          Go 运行时调度           OS 内核调度
通信方式          channel(CSP 模型)    共享内存 + 锁
阻塞影响          不阻塞 OS 线程          阻塞 OS 线程
创建方式          go func()             pthread_create 等
ID               无                     有 tid
取消/终止         只能通过信号自行退出     可以强制 kill

为什么 goroutine 这么轻量?
  1. 用户态调度,不需要陷入内核
  2. 栈可以动态伸缩(2KB → 最大 1GB)
  3. 复用少量 OS 线程(M:N 模型)
  4. 阻塞时不占用 OS 线程(和 P 解绑)

14. 用 channel 并行处理海量文件

go
package main

import (
    "crypto/md5"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// FileResult 存放处理结果
type FileResult struct {
    Path string
    Hash string
    Size int64
    Err  error
}

// ParallelFileProcessor 并行文件处理器
func ParallelFileProcessor(root string, workers int) []FileResult {
    // 阶段1:扫描文件路径(单个协程)
    paths := make(chan string, 100)
    go func() {
        defer close(paths)
        filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
            if err != nil || d.IsDir() {
                return nil
            }
            paths <- path
        })
    }()

    // 阶段2:并行处理(多个 worker 协程)
    results := make(chan FileResult, 100)
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for path := range paths { // 从 channel 取任务
                result := processFile(path)
                results <- result
            }
        }(i)
    }

    // 关闭 results(所有 worker 完成后)
    go func() {
        wg.Wait()
        close(results)
    }()

    // 阶段3:收集结果
    var allResults []FileResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

func processFile(path string) FileResult {
    f, err := os.Open(path)
    if err != nil {
        return FileResult{Path: path, Err: err}
    }
    defer f.Close()

    info, _ := f.Stat()
    h := md5.New()
    io.Copy(h, f)

    return FileResult{
        Path: path,
        Hash: fmt.Sprintf("%x", h.Sum(nil)),
        Size: info.Size(),
    }
}

func main() {
    start := time.Now()
    results := ParallelFileProcessor(".", 8) // 8 个 worker

    var totalSize int64
    var errorCount int
    for _, r := range results {
        if r.Err != nil {
            errorCount++
            continue
        }
        totalSize += r.Size
    }

    fmt.Printf("处理了 %d 个文件,总大小 %d 字节\n", len(results), totalSize)
    fmt.Printf("错误: %d,耗时: %s\n", errorCount, time.Since(start))
}

15. 用 channel 限制接口的并发请求量

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// ==================== 信号量模式:buffered channel 限流 ====================

// RateLimiter 基于 channel 的并发限制器
type RateLimiter struct {
    sem chan struct{} // 信号量
}

func NewRateLimiter(maxConcurrent int) *RateLimiter {
    return &RateLimiter{
        sem: make(chan struct{}, maxConcurrent),
    }
}

// Acquire 获取许可(满了就阻塞)
func (r *RateLimiter) Acquire() {
    r.sem <- struct{}{} // 缓冲区满时阻塞
}

// Release 释放许可
func (r *RateLimiter) Release() {
    <-r.sem
}

// 模拟 API 处理器
func handleRequest(id int) {
    fmt.Printf("[%s] 请求 %d 开始处理\n",
        time.Now().Format("15:04:05.000"), id)
    time.Sleep(500 * time.Millisecond) // 模拟耗时操作
    fmt.Printf("[%s] 请求 %d 处理完成\n",
        time.Now().Format("15:04:05.000"), id)
}

func main() {
    limiter := NewRateLimiter(3) // 最多同时处理 3 个请求
    var wg sync.WaitGroup

    // 模拟 20 个并发请求
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            limiter.Acquire()       // 获取许可
            defer limiter.Release() // 完成后释放

            handleRequest(id)
        }(i)
    }

    wg.Wait()
    fmt.Println("所有请求处理完毕")
}

16. 用 channel 限制协程的总数

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// ==================== Worker Pool 模式 ====================
// 固定数量的 worker 消费任务,无论任务多少,协程数恒定

func workerPool() {
    const numWorkers = 5
    const numTasks = 50

    tasks := make(chan int, numTasks)
    var wg sync.WaitGroup

    // 启动固定数量的 worker
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range tasks { // 从 channel 取任务,关闭后退出
                fmt.Printf("Worker %d 处理任务 %d\n", workerID, task)
                time.Sleep(100 * time.Millisecond)
            }
        }(w)
    }

    // 提交任务
    for i := 0; i < numTasks; i++ {
        tasks <- i
    }
    close(tasks) // 所有任务已提交,关闭 channel

    wg.Wait()
    fmt.Println("所有任务完成")
}

// ==================== 带结果收集的 Worker Pool ====================
type Task struct {
    ID    int
    Input string
}

type Result struct {
    TaskID int
    Output string
    Err    error
}

func workerPoolWithResults() {
    const numWorkers = 3

    tasks := make(chan Task, 100)
    results := make(chan Result, 100)
    var wg sync.WaitGroup

    // 启动 worker
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                // 处理任务
                output := fmt.Sprintf("处理了 %s", task.Input)
                results <- Result{TaskID: task.ID, Output: output}
            }
        }()
    }

    // 提交任务
    go func() {
        for i := 0; i < 10; i++ {
            tasks <- Task{ID: i, Input: fmt.Sprintf("数据_%d", i)}
        }
        close(tasks)
    }()

    // 等待所有 worker 完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for r := range results {
        fmt.Printf("任务 %d: %s\n", r.TaskID, r.Output)
    }
}

func main() {
    fmt.Println("=== Worker Pool ===")
    workerPool()

    fmt.Println("\n=== Worker Pool With Results ===")
    workerPoolWithResults()
}

17. select 多路监听

go
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    // ==================== 基本 select ====================
    // select 同时监听多个 channel,哪个就绪就执行哪个
    // 多个同时就绪时随机选一个

    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "来自 ch1"
    }()
    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "来自 ch2"
    }()

    select {
    case msg := <-ch1:
        fmt.Println("收到:", msg)
    case msg := <-ch2:
        fmt.Println("收到:", msg)
    }

    // ==================== 超时控制 ====================
    ch3 := make(chan int)
    go func() {
        time.Sleep(2 * time.Second)
        ch3 <- 42
    }()

    select {
    case val := <-ch3:
        fmt.Println("收到:", val)
    case <-time.After(1 * time.Second):
        fmt.Println("超时!") // 1 秒内没收到就超时
    }

    // ==================== 非阻塞操作(default)====================
    ch4 := make(chan int, 1)

    // 非阻塞发送
    select {
    case ch4 <- 42:
        fmt.Println("发送成功")
    default:
        fmt.Println("channel 已满,跳过")
    }

    // 非阻塞接收
    select {
    case val := <-ch4:
        fmt.Println("接收:", val)
    default:
        fmt.Println("channel 为空,跳过")
    }

    // ==================== 多路合并(Fan-In)====================
    fmt.Println("\n=== Fan-In ===")
    merge := func(channels ...<-chan string) <-chan string {
        out := make(chan string)
        var wg sync.WaitGroup

        for _, ch := range channels {
            wg.Add(1)
            go func(c <-chan string) {
                defer wg.Done()
                for val := range c {
                    out <- val
                }
            }(ch)
        }

        go func() {
            wg.Wait()
            close(out)
        }()

        return out
    }

    // 创建三个数据源
    src1 := make(chan string, 3)
    src2 := make(chan string, 3)
    src1 <- "A1"
    src1 <- "A2"
    close(src1)
    src2 <- "B1"
    src2 <- "B2"
    close(src2)

    merged := merge(src1, src2)
    for val := range merged {
        fmt.Println("合并:", val)
    }

    // ==================== 心跳检测 ====================
    fmt.Println("\n=== 心跳 ===")
    heartbeat := time.NewTicker(300 * time.Millisecond)
    defer heartbeat.Stop()

    work := make(chan int)
    go func() {
        for i := 0; i < 3; i++ {
            time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
            work <- i
        }
        close(work)
    }()

    for {
        select {
        case val, ok := <-work:
            if !ok {
                fmt.Println("工作完成")
                return
            }
            fmt.Println("处理:", val)
        case <-heartbeat.C:
            fmt.Println("💓 心跳")
        }
    }
}

18. 不使用 once 的单例模式

go
package main

import (
    "fmt"
    "sync"
)

// ==================== 方式1:sync.Once(标准方式)====================
type Database struct {
    DSN string
}

var (
    dbInstance *Database
    dbOnce     sync.Once
)

func GetDB() *Database {
    dbOnce.Do(func() {
        fmt.Println("初始化数据库连接(只执行一次)")
        dbInstance = &Database{DSN: "localhost:3306"}
    })
    return dbInstance
}

// ==================== 方式2:init 函数(最简单)====================
// 缺点:不能延迟初始化,程序启动就执行
var configInstance *Config

type Config struct {
    AppName string
}

func init() {
    configInstance = &Config{AppName: "MyApp"}
}

func GetConfig() *Config {
    return configInstance
}

// ==================== 方式3:包级变量(编译期单例)====================
// 适用于不需要复杂初始化的场景
var defaultLogger = &Logger{Level: "INFO"}

type Logger struct {
    Level string
}

func GetLogger() *Logger {
    return defaultLogger
}

// ==================== 方式4:Mutex + 双重检查(不推荐,用 Once 更好)====================
var (
    cacheInstance *Cache
    cacheMu       sync.Mutex
)

type Cache struct {
    Data map[string]string
}

func GetCache() *Cache {
    if cacheInstance != nil { // 第一次检查(无锁,快速路径)
        return cacheInstance
    }

    cacheMu.Lock()
    defer cacheMu.Unlock()

    if cacheInstance != nil { // 第二次检查(持锁,防止重复初始化)
        return cacheInstance
    }

    fmt.Println("初始化缓存(只执行一次)")
    cacheInstance = &Cache{Data: make(map[string]string)}
    return cacheInstance
}

// ==================== 方式5:channel(不用锁也不用 Once)====================
type Service struct {
    Name string
}

func newServiceSingleton() func() *Service {
    ch := make(chan *Service, 1)
    ch <- &Service{Name: "singleton-service"} // 预先放入一个实例

    // 第一次调用取出实例并缓存
    var instance *Service
    var once sync.Once // 这里还是依赖了 once,纯 channel 方案见下

    return func() *Service {
        once.Do(func() {
            instance = <-ch
            fmt.Println("Service 单例已创建")
        })
        return instance
    }
}

func main() {
    // sync.Once 方式
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db // 所有协程拿到同一个实例
        }()
    }
    wg.Wait()
    fmt.Printf("DB 地址: %p\n", GetDB())

    // Mutex 双重检查方式
    for i := 0; i < 5; i++ {
        go func() {
            cache := GetCache()
            _ = cache
        }()
    }
}

💡 实际项目推荐 99% 的场景直接用 sync.Once,它简洁、安全、高效。其他方式了解即可。


19. context 用法详解

19.1 context 是什么

text
context 是 Go 并发编程的核心工具,用于:
  1. 传递取消信号(超时、手动取消)
  2. 传递请求级别的元数据(如 request_id、user_id)
  3. 控制协程的生命周期

context 是树形结构:
  Background
  └── WithCancel
      ├── WithTimeout
      │   └── WithValue
      └── WithDeadline

规则:
  - context 作为函数的第一个参数传递
  - 不要存储在结构体中
  - 不要传 nil,用 context.Background() 或 context.TODO()

19.2 四种 context

go
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // ==================== context.Background ====================
    // 根 context,不可取消,无超时,无值
    // 通常在 main、init、测试中使用
    ctx := context.Background()
    _ = ctx

    // ==================== context.WithCancel ====================
    // 手动取消
    ctx1, cancel1 := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("WithCancel: 收到取消信号,退出")
                return
            default:
                fmt.Println("WithCancel: 工作中...")
                time.Sleep(200 * time.Millisecond)
            }
        }
    }(ctx1)

    time.Sleep(600 * time.Millisecond)
    cancel1() // 发送取消信号
    time.Sleep(100 * time.Millisecond)

    // ==================== context.WithTimeout ====================
    // 超时自动取消
    ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel2() // 即使超时也要调用 cancel 释放资源

    go func(ctx context.Context) {
        select {
        case <-ctx.Done():
            fmt.Println("WithTimeout:", ctx.Err()) // context deadline exceeded
        case <-time.After(1 * time.Second):
            fmt.Println("WithTimeout: 完成工作")
        }
    }(ctx2)

    time.Sleep(700 * time.Millisecond)

    // ==================== context.WithDeadline ====================
    // 在指定时间点取消
    deadline := time.Now().Add(300 * time.Millisecond)
    ctx3, cancel3 := context.WithDeadline(context.Background(), deadline)
    defer cancel3()

    <-ctx3.Done()
    fmt.Println("WithDeadline:", ctx3.Err())

    // ==================== context.WithValue ====================
    // 传递请求级别的数据
    type ctxKey string
    const requestIDKey ctxKey = "request_id"
    const userIDKey ctxKey = "user_id"

    ctx4 := context.WithValue(context.Background(), requestIDKey, "req-abc-123")
    ctx4 = context.WithValue(ctx4, userIDKey, 42)

    // 取值
    reqID := ctx4.Value(requestIDKey).(string)
    userID := ctx4.Value(userIDKey).(int)
    fmt.Printf("request_id=%s, user_id=%d\n", reqID, userID)
}

19.3 context 实战

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ==================== 实战1:HTTP 请求超时控制 ====================
func fetchData(ctx context.Context, url string) (string, error) {
    // 模拟网络请求
    resultCh := make(chan string, 1)

    go func() {
        time.Sleep(300 * time.Millisecond) // 模拟耗时
        resultCh <- "response data from " + url
    }()

    select {
    case result := <-resultCh:
        return result, nil
    case <-ctx.Done():
        return "", ctx.Err() // 超时或被取消
    }
}

// ==================== 实战2:级联取消多个协程 ====================
func pipeline(ctx context.Context) {
    var wg sync.WaitGroup

    // 启动多个阶段的协程
    stages := []string{"fetch", "process", "save"}

    for _, stage := range stages {
        wg.Add(1)
        go func(name string) {
            defer wg.Done()
            for i := 0; ; i++ {
                select {
                case <-ctx.Done():
                    fmt.Printf("[%s] 收到取消信号,已处理 %d 个任务\n", name, i)
                    return
                default:
                    time.Sleep(100 * time.Millisecond)
                }
            }
        }(stage)
    }

    wg.Wait()
}

// ==================== 实战3:优雅关闭服务 ====================
func startServer(ctx context.Context) {
    fmt.Println("服务启动")

    <-ctx.Done()
    fmt.Println("收到关闭信号,开始清理...")

    // 给清理操作一个超时
    cleanupCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // 模拟清理
    done := make(chan struct{})
    go func() {
        time.Sleep(1 * time.Second)
        fmt.Println("清理完成")
        close(done)
    }()

    select {
    case <-done:
        fmt.Println("服务优雅关闭")
    case <-cleanupCtx.Done():
        fmt.Println("清理超时,强制关闭")
    }
}

func main() {
    // 实战1
    ctx1, cancel1 := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer cancel1()
    result, err := fetchData(ctx1, "https://api.example.com")
    if err != nil {
        fmt.Println("请求失败:", err) // 200ms 超时,300ms 请求 → 超时
    } else {
        fmt.Println("请求成功:", result)
    }

    // 实战2
    fmt.Println("\n=== 级联取消 ===")
    ctx2, cancel2 := context.WithCancel(context.Background())
    go pipeline(ctx2)
    time.Sleep(500 * time.Millisecond)
    cancel2() // 一个 cancel 取消所有阶段
    time.Sleep(200 * time.Millisecond)

    // 实战3
    fmt.Println("\n=== 优雅关闭 ===")
    ctx3, cancel3 := context.WithCancel(context.Background())
    go startServer(ctx3)
    time.Sleep(500 * time.Millisecond)
    cancel3() // 模拟收到 SIGTERM
    time.Sleep(2 * time.Second)
}

19.4 context 最佳实践

⚠️ context 使用规范

  • context 始终作为函数第一个参数,命名为 ctx
  • 不要把 context 存到结构体里
  • 不要传 nil context,用 context.TODO() 占位
  • WithValue 只传请求级别数据,不要传可选参数
  • WithValue 的 key 用自定义类型,避免冲突
  • cancel 函数必须调用(即使超时也要 defer cancel)

附录:并发模式速查

text
场景                              推荐模式
──────────────────────────────────────────────────
等待一组协程完成                    sync.WaitGroup
等待 + 收集错误                    errgroup.Group
保护共享变量                       sync.Mutex / RWMutex
简单计数器/标志                    sync/atomic
一对多通知                         close(channel)
限制并发数                         buffered channel 信号量
固定协程池                         Worker Pool 模式
超时控制                           context.WithTimeout
级联取消                           context.WithCancel
多路监听                           select
生产者-消费者                      buffered channel
单例                              sync.Once
配置热更新                         atomic.Value