Go 并发编程

Published on
6 34.6~44.5 min

近期重新学习了Go的并发编程,解决了一些困惑,以下为整理好的学习笔记。

引言

现代 CPU 通常包含多个核心,且每个核心可能支持多线程,这使得计算机能够同时执行多条指令流水线。为了充分利用 CPU 的计算能力,我们需要编写支持并发的程序。并发是指多个计算任务可能在同一个时间片段内同时运行,它们可能位于同一个程序、同一台计算机或同一个网络中。

Go 语言在设计之初就将并发作为核心特性,提供了简单、高效且安全的并发编程模型。Go 的并发哲学是:“不要通过共享内存来通信,而应通过通信来共享内存。” 这一思想指导我们使用通道在协程之间传递数据,从而避免复杂的锁机制。

本文将全面介绍 Go 的并发编程,涵盖协程基础、通道、同步原语、调度机制、恐慌恢复、上下文、常见并发模式、调试技巧以及陷阱与最佳实践。通过本文,读者将能够编写出健壮、高效的并发程序。


协程基础

什么是协程

协程有时也被称为绿色线程,它是由程序的运行时(runtime)管理的轻量级线程。与操作系统线程相比,协程的内存开销非常小(初始栈仅约 2KB),上下文切换开销也极低。因此,一个 Go 程序可以轻松创建成千上万个协程,而无需担心资源耗尽。

在 Go 中,我们无法直接创建系统线程,协程是唯一的并发实现方式。每个 Go 程序启动时,会自动创建一个对用户可见的协程,称为主协程(即 main 函数所在的协程)。

启动协程

使用 go 关键字可以在任何函数调用前启动一个新的协程,使该函数在新协程中并发执行。新协程不会阻塞当前协程,主协程会继续执行后续代码。

package main
​
import (
    "fmt"
    "time"
)
​
func say(s string) {
    for i := 0; i < 3; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}
​
func main() {
    // 直接调用(顺序执行)
    fmt.Println("Direct call:")
    say("world")
​
    // 启动协程(并发执行)
    fmt.Println("\nWith goroutine:")
    go say("hello") // 新协程在后台运行
    say("world")    // 主协程继续执行
​
    // 等待协程执行,否则主程序可能提前退出
    time.Sleep(500 * time.Millisecond)
}

重要说明

  • 协程调用的所有返回值(如果有)必须被舍弃,因为无法从外部直接获取返回值。

  • 主协程一旦结束,整个程序立即终止,所有其他协程都会被强制结束。因此,通常需要同步机制来等待协程完成。

主协程与程序退出

主协程是程序的入口,也是程序退出的决定者。如果主协程执行完毕,无论其他协程是否完成,程序都会终止。下面的示例展示了这一特性:

package main
​
import (
    "fmt"
    "time"
)
​
func main() {
    go func() {
        time.Sleep(2 * time.Second)
        fmt.Println("Goroutine finished")
    }()
    fmt.Println("Main exiting")
    // 程序立即退出,不会打印 "Goroutine finished"
}

为了避免这种情况,我们需要使用同步机制(如 WaitGroup、通道等)来确保主协程等待所有子协程完成后再退出。


协程间通信:通道

通道是 Go 中协程之间通信的首选方式。它是一个类型安全的队列,遵循先进先出的规则。通道可以是有缓冲的,也可以是无缓冲的。

通道的创建与基本操作

使用 make 函数创建通道,指定通道中元素的类型。<- 运算符用于发送和接收数据。

package main
​
import "fmt"
​
func main() {
    // 创建无缓冲的 int 通道
    ch := make(chan int)
​
    // 启动协程发送数据
    go func() {
        ch <- 42 // 发送
    }()
​
    // 从通道接收数据(阻塞直到有数据)
    value := <-ch
    fmt.Println("Received:", value)
}

带缓冲与无缓冲通道

  • 无缓冲通道:发送和接收操作必须同时准备好,否则会阻塞。这使得同步和通信同时发生。

  • 带缓冲通道:发送操作仅在缓冲区满时阻塞,接收操作仅在缓冲区空时阻塞。缓冲区大小在创建时指定。

func main() {
    // 带缓冲的通道,容量为2
    ch := make(chan string, 2)
    ch <- "first"
    ch <- "second"
    // ch <- "third" // 此时缓冲区已满,会阻塞
​
    fmt.Println(<-ch) // first
    fmt.Println(<-ch) // second
}

通道方向与关闭

可以在函数参数中指定通道的方向(只发送或只接收),提高代码的类型安全性。使用 close 函数关闭通道,关闭后不能再发送数据,但可以继续接收剩余数据。

func producer(ch chan<- int) { // 只发送通道
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // 关闭通道
}
​
func consumer(ch <-chan int) { // 只接收通道
    for v := range ch { // 循环直到通道关闭
        fmt.Println("Consumed:", v)
    }
}
​
func main() {
    ch := make(chan int)
    go producer(ch)
    consumer(ch)
}

注意:向已关闭的通道发送数据会引发 panic。从已关闭的通道接收数据时,可以继续读取缓冲区中的值,当缓冲区为空时,会返回通道元素类型的零值,并且第二个返回值(ok)为 false。

select 语句

select 语句允许一个协程同时等待多个通道操作,类似于 switch 但专门用于通道。当多个 case 同时满足条件时,select 会随机选择一个执行。

package main
​
import (
    "fmt"
    "time"
)
​
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
​
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()
​
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
}

使用 default 实现非阻塞操作

select {
case msg := <-ch:
    fmt.Println("Received:", msg)
default:
    fmt.Println("No message received")
}

空 selectselect {})会永久阻塞,常用于防止主协程退出。

超时控制:结合 time.After 实现超时等待。

select {
case msg := <-ch:
    fmt.Println(msg)
case <-time.After(2 * time.Second):
    fmt.Println("Timeout")
}

同步机制

除了通道,Go 还提供了多种同步原语,主要用于保护共享内存或协调协程执行顺序。

WaitGroup

sync.WaitGroup 用于等待一组协程完成。它维护一个内部计数器,通过 AddDoneWait 方法控制。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时计数器减1
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1) // 启动前计数器加1
        go worker(i, &wg)
    }
    wg.Wait() // 阻塞直到计数器归零
    fmt.Println("All workers completed")
}

注意Add 必须在启动协程之前调用,Done 通常用 defer 确保在协程结束时调用。计数器不能为负数,否则会引发 panic。

互斥锁

sync.Mutex 提供基础的互斥锁,用于保护共享资源,防止数据竞争。

package main

import (
    "fmt"
    "sync"
)

var counter int
var mu sync.Mutex

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

读写锁

sync.RWMutex 区分读锁和写锁,允许多个读操作并发,但写操作独占。适用于读多写少的场景。

var rwMu sync.RWMutex
var data map[string]string

func read(key string) string {
    rwMu.RLock()
    defer rwMu.RUnlock()
    return data[key]
}

func write(key, value string) {
    rwMu.Lock()
    defer rwMu.Unlock()
    data[key] = value
}

原子操作

sync/atomic 包提供了针对基本类型的原子操作,无需加锁即可实现并发安全。适合简单的计数器、标志位等。

import "sync/atomic"

var counter int64

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    atomic.AddInt64(&counter, 1)
}

条件变量

sync.Cond 用于在多个协程之间等待或通知事件的发生。它通常与互斥锁结合使用,实现复杂的同步模式(如生产者-消费者队列)。

package main

import (
    "fmt"
    "sync"
    "time"
)

var cond = sync.NewCond(&sync.Mutex{})
var queue []int

func producer() {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        cond.L.Lock()
        queue = append(queue, i)
        cond.Signal() // 唤醒一个等待的消费者
        cond.L.Unlock()
    }
}

func consumer() {
    for i := 0; i < 5; i++ {
        cond.L.Lock()
        for len(queue) == 0 {
            cond.Wait() // 等待条件满足
        }
        val := queue[0]
        queue = queue[1:]
        fmt.Println("Consumed:", val)
        cond.L.Unlock()
    }
}

func main() {
    go producer()
    go consumer()
    time.Sleep(2 * time.Second)
}

一次性初始化

sync.Once 保证某个函数只执行一次,常用于单例模式或初始化全局资源。

var once sync.Once
var config map[string]string

func loadConfig() {
    // 模拟加载配置
    config = map[string]string{"key": "value"}
}

func getConfig() map[string]string {
    once.Do(loadConfig)
    return config
}

协程调度

Go 运行时拥有自己的调度器,它将协程多路复用到操作系统线程上。了解调度模型有助于编写高效的并发程序。

GMP 模型简介

Go 调度器包含三个核心概念:

  • G(Goroutine):协程,包含栈、程序计数器、寄存器等信息。

  • M(Machine):代表操作系统线程,由操作系统调度。

  • P(Processor):逻辑处理器,持有可运行的 G 队列,并关联一个 M。P 的数量决定了同时并行执行的协程数量。

默认情况下,P 的数量等于 CPU 核心数(可通过 runtime.GOMAXPROCS 设置)。调度器会周期性地将 G 调度到 M 上执行,并在 G 阻塞时(如系统调用、通道操作)将其移出,让 M 执行其他 G。

GOMAXPROCS

runtime.GOMAXPROCS(n) 可以设置逻辑处理器的数量。在 Go 1.5 之后,默认值为 CPU 核心数。增大该值可能提高并行能力,但过多的 P 也会带来调度开销。通常保持默认即可,但在某些 IO 密集型场景,适当增加可能有益。

import "runtime"

func main() {
    fmt.Println("NumCPU:", runtime.NumCPU())
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 获取当前值
    runtime.GOMAXPROCS(2) // 设置为2
}

协程状态

协程在其生命周期中可以处于以下几种状态:

  • 运行中(Running):协程正在 M 上执行。

  • 可运行(Runnable):协程已准备好执行,正在等待被调度到 M 上。

  • 阻塞(Blocked):协程正在等待某个条件(如通道读写、互斥锁、系统调用等)。

  • 结束(Dead):协程已退出。

协程只能从运行状态退出,不能直接从阻塞状态退出。如果所有协程都陷入阻塞且没有其他协程能唤醒它们,程序将发生死锁。

死锁检测

Go 运行时可以检测到死锁。当所有协程都处于阻塞状态且无法被唤醒时,程序会崩溃并打印死锁信息。例如:

package main

func main() {
    ch := make(chan int)
    <-ch // 永远不会有数据发送,主协程阻塞,死锁
}

输出类似:

fatal error: all goroutines are asleep - deadlock!

恐慌与恢复

Go 使用恐慌(panic)表示程序中发生了不可恢复的严重错误。当函数调用 panic 时,正常的执行流程会停止,并开始执行当前协程中已注册的延迟函数(defer)。如果恐慌未被恢复,程序将打印堆栈信息并终止。

恢复(recover) 允许在 defer 函数中捕获恐慌,使程序有机会清理资源并优雅地退出,或者继续执行。

panic 与 recover 机制

package main

import "fmt"

func mayPanic() {
    panic("a problem occurred") // 触发恐慌
}

func main() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered from panic:", r)
        }
    }()
    mayPanic()
    fmt.Println("After panic (won't execute)")
}

关键规则

  • recover 只有在 defer 函数中调用才有效。

  • recover 只能捕获同一协程中发生的恐慌,无法跨协程恢复。

  • 恐慌被恢复后,程序会从恐慌点跳出,继续执行当前函数 defer 之后的代码,但恐慌点之后的代码不会执行。

在协程中保护

每个可能发生恐慌的协程都应该设置自己的 recover 保护,否则一个协程的恐慌会导致整个程序崩溃。

package main

import (
    "fmt"
    "time"
)

func safeGoroutine(id int) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Goroutine %d recovered: %v\n", id, r)
        }
    }()
    if id%2 == 0 {
        panic(fmt.Sprintf("panic in goroutine %d", id))
    }
    fmt.Printf("Goroutine %d finished normally\n", id)
}

func main() {
    for i := 0; i < 4; i++ {
        go safeGoroutine(i)
    }
    time.Sleep(time.Second)
}

上下文:context 包

context 包用于在协程之间传递取消信号、超时控制以及请求范围的值。它常被用于管理一组关联的协程的生命周期。

取消信号

使用 context.WithCancel 创建一个可取消的上下文,当调用取消函数时,所有监听该上下文的协程都会收到通知。

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d stopped\n", id)
            return
        default:
            fmt.Printf("Worker %d working...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    time.Sleep(2 * time.Second)
    cancel() // 通知所有 worker 停止
    time.Sleep(1 * time.Second) // 等待输出
}

超时控制

context.WithTimeoutcontext.WithDeadline 可以在指定时间后自动取消。

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 通常 defer 调用 cancel 以释放资源

go worker(ctx, 1)

select {
case <-ctx.Done():
    fmt.Println("Main: timeout or cancelled")
}

传递值

context.WithValue 可以在上下文中携带请求范围的值,但注意这些值必须是线程安全的。

type key string

func main() {
    ctx := context.WithValue(context.Background(), key("user"), "Alice")
    go func(ctx context.Context) {
        user := ctx.Value(key("user")).(string)
        fmt.Println("Hello,", user)
    }(ctx)
    time.Sleep(100 * time.Millisecond)
}

注意:不应使用 context 传递可选参数,它更适合传递跨 API 和协程的元数据(如认证令牌、请求 ID)。


常用并发模式

工作池

工作池模式通过固定数量的协程处理任务,避免无限制地创建协程。通道用于分发任务和收集结果。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, j)
        time.Sleep(time.Second) // 模拟耗时操作
        results <- j * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    var wg sync.WaitGroup

    // 启动工作者
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // 等待所有工作者完成
    wg.Wait()
    close(results)

    // 收集结果
    for r := range results {
        fmt.Println("Result:", r)
    }
}

扇出/扇入

  • 扇出(Fan-out):将同一个通道的数据分发给多个协程处理。

  • 扇入(Fan-in):将多个通道的数据合并到一个通道。

// 扇出示例:多个协程从同一个通道读取
func fanOut(input <-chan int, workers int) {
    for i := 0; i < workers; i++ {
        go func(id int) {
            for v := range input {
                fmt.Printf("Worker %d got %d\n", id, v)
            }
        }(i)
    }
}

// 扇入示例:合并多个通道
func fanIn(chs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, ch := range chs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

管道模式

管道模式将程序分解为多个阶段,每个阶段由协程处理,并通过通道连接。每个阶段可以独立并发执行。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 设置管道
    c := gen(2, 3)
    out := sq(c)

    // 消费输出
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

定时器与打点器

time.Timertime.Ticker 常与 select 结合使用,实现定时任务或超时控制。

  • Timer:在指定时间后发送一个值到通道。

  • Ticker:按指定间隔周期性发送值到通道。

// 超时示例
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
    fmt.Println("Timer expired")
}

// 周期性任务
ticker := time.NewTicker(500 * time.Millisecond)
go func() {
    for t := range ticker.C {
        fmt.Println("Tick at", t)
    }
}()
time.Sleep(2 * time.Second)
ticker.Stop() // 停止 ticker

并发安全的数据结构

sync.Map

sync.Map 是 Go 提供的并发安全的 map,适合读多写少或 key 不经常更新的场景。它有以下常用方法:

  • Load(key any) (value any, ok bool)

  • Store(key, value any)

  • LoadOrStore(key, value any) (actual any, loaded bool)

  • Delete(key any)

  • Range(f func(key, value any) bool)

var m sync.Map

// 写入
m.Store("key", "value")

// 读取
if val, ok := m.Load("key"); ok {
    fmt.Println(val)
}

// 遍历
m.Range(func(key, value any) bool {
    fmt.Println(key, value)
    return true // 继续遍历
})

注意sync.Map 不适合频繁写入的场景,此时使用普通的 map 加锁性能更好。

通道作为队列

通道本质上是一个并发安全的 FIFO 队列,可以用于传递数据或任务。无缓冲通道提供同步保证,带缓冲通道可以解耦生产者和消费者。


调试与监控

获取协程数量

使用 runtime.NumGoroutine() 可以获取当前存在的协程数量,用于检测协程泄漏。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    fmt.Println("Before goroutines:", runtime.NumGoroutine())
    go func() {
        time.Sleep(time.Hour)
    }()
    fmt.Println("After one goroutine:", runtime.NumGoroutine())
    // 输出可能类似:Before: 1, After: 2
}

pprof 分析

Go 内置了性能分析工具 pprof,可以分析 CPU、内存、阻塞、协程等信息。通过导入 net/http/pprof 包并启动 HTTP 服务器,即可在运行时获取分析数据。

import _ "net/http/pprof"

func main() {
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    // ... 程序逻辑
}

然后可用 go tool pprof 或浏览器访问 http://localhost:6060/debug/pprof/goroutine 查看协程堆栈信息。

检测协程泄漏

协程泄漏是指协程在完成任务后仍被阻塞,无法退出,导致资源占用。可以通过 runtime.NumGoroutine 监控协程数量变化,结合 pprof 找出泄漏点。常见泄漏原因包括:

  • 通道发送/接收未正确关闭

  • 无限循环缺少退出条件

  • 互斥锁未释放导致协程永久阻塞


常见陷阱与最佳实践

循环变量捕获

在循环中启动协程时,如果直接使用循环变量,可能会捕获到变量的最终值,导致所有协程使用同一个值。

// 错误示例
for i := 1; i <= 3; i++ {
    go func() {
        fmt.Println(i) // 可能打印 4,4,4 或类似
    }()
}

正确做法:将变量作为参数传递给匿名函数,或在循环内重新声明变量。

// 方式一:传参
for i := 1; i <= 3; i++ {
    go func(n int) {
        fmt.Println(n)
    }(i)
}

// 方式二:局部变量
for i := 1; i <= 3; i++ {
    i := i // 重新声明
    go func() {
        fmt.Println(i)
    }()
}

通道关闭规则

  • 向已关闭的通道发送数据会引发 panic。

  • 从已关闭的通道接收数据,可以继续读取缓冲区中的值,缓冲区为空时返回零值,第二个返回值(ok)为 false。

  • 关闭通道应由发送方负责,避免在接收方关闭通道。

  • 使用 range 循环从通道接收数据时,通道必须由发送方关闭,否则会死锁。

协程泄漏场景

以下情况容易导致协程泄漏:

  • 协程在无缓冲通道上发送数据,但没有接收方。

  • 协程在 select 中等待永远不会到达的信号。

  • 协程持有互斥锁后未释放,导致其他等待锁的协程永久阻塞。

  • 使用 time.After 但未及时处理通道,导致定时器资源未释放。

解决方法:确保每个协程都有明确的退出路径,使用 context 控制生命周期,避免无限阻塞。

锁复制问题

sync.Mutexsync.RWMutexsync.WaitGroup 等类型不应被复制(通过值传递),因为复制会创建新的实例,失去同步效果。应始终使用指针传递。

type Counter struct {
    mu sync.Mutex
    val int
}

func (c *Counter) Inc() { // 必须是指针接收者
    c.mu.Lock()
    defer c.mu.Unlock()
    c.val++
}

合理使用 select

  • 避免在 select 中使用 nil 通道,否则该 case 会被忽略。

  • 使用 default 分支实现非阻塞操作时,需注意可能造成忙循环,浪费 CPU。

  • 当多个通道都准备好时,select 随机选择,不应依赖特定顺序。


优雅退出与资源清理

监听系统信号

在生产环境中,程序应能够响应系统信号(如 SIGINT、SIGTERM),进行优雅关闭。可以使用 os/signal 包捕获信号。

package main
​
import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)
​
func worker(ctx context.Context, wg *sync.WaitGroup, id int) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d stopping\n", id)
            return
        default:
            fmt.Printf("Worker %d working...\n", id)
            time.Sleep(500 * time.Millisecond)
        }
    }
}
​
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
​
    // 启动多个 worker
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(ctx, &wg, i)
    }
​
    // 监听信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
​
    <-sigCh // 等待信号
    fmt.Println("Received shutdown signal, stopping workers...")
    cancel() // 通知所有 worker 退出
​
    wg.Wait() // 等待所有 worker 完成
    fmt.Println("All workers stopped, exiting.")
}

协调协程退出

除了使用 context,还可以通过专用的退出通道(chan struct{})或关闭通道来通知协程退出。关闭通道可以广播给所有监听该通道的协程。

stopCh := make(chan struct{})
go func() {
    for {
        select {
        case <-stopCh:
            return
        default:
            // 工作
        }
    }
}()
close(stopCh) // 触发协程退出



0