Go 并发编程
近期重新学习了Go的并发编程,解决了一些困惑,以下为整理好的学习笔记。
引言
现代 CPU 通常包含多个核心,且每个核心可能支持多线程,这使得计算机能够同时执行多条指令流水线。为了充分利用 CPU 的计算能力,我们需要编写支持并发的程序。并发是指多个计算任务可能在同一个时间片段内同时运行,它们可能位于同一个程序、同一台计算机或同一个网络中。
Go 语言在设计之初就将并发作为核心特性,提供了简单、高效且安全的并发编程模型。Go 的并发哲学是:“不要通过共享内存来通信,而应通过通信来共享内存。” 这一思想指导我们使用通道在协程之间传递数据,从而避免复杂的锁机制。
本文将全面介绍 Go 的并发编程,涵盖协程基础、通道、同步原语、调度机制、恐慌恢复、上下文、常见并发模式、调试技巧以及陷阱与最佳实践。通过本文,读者将能够编写出健壮、高效的并发程序。
协程基础
什么是协程
协程有时也被称为绿色线程,它是由程序的运行时(runtime)管理的轻量级线程。与操作系统线程相比,协程的内存开销非常小(初始栈仅约 2KB),上下文切换开销也极低。因此,一个 Go 程序可以轻松创建成千上万个协程,而无需担心资源耗尽。
在 Go 中,我们无法直接创建系统线程,协程是唯一的并发实现方式。每个 Go 程序启动时,会自动创建一个对用户可见的协程,称为主协程(即 main 函数所在的协程)。
启动协程
使用 go 关键字可以在任何函数调用前启动一个新的协程,使该函数在新协程中并发执行。新协程不会阻塞当前协程,主协程会继续执行后续代码。
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
// 直接调用(顺序执行)
fmt.Println("Direct call:")
say("world")
// 启动协程(并发执行)
fmt.Println("\nWith goroutine:")
go say("hello") // 新协程在后台运行
say("world") // 主协程继续执行
// 等待协程执行,否则主程序可能提前退出
time.Sleep(500 * time.Millisecond)
}重要说明:
协程调用的所有返回值(如果有)必须被舍弃,因为无法从外部直接获取返回值。
主协程一旦结束,整个程序立即终止,所有其他协程都会被强制结束。因此,通常需要同步机制来等待协程完成。
主协程与程序退出
主协程是程序的入口,也是程序退出的决定者。如果主协程执行完毕,无论其他协程是否完成,程序都会终止。下面的示例展示了这一特性:
package main
import (
"fmt"
"time"
)
func main() {
go func() {
time.Sleep(2 * time.Second)
fmt.Println("Goroutine finished")
}()
fmt.Println("Main exiting")
// 程序立即退出,不会打印 "Goroutine finished"
}为了避免这种情况,我们需要使用同步机制(如 WaitGroup、通道等)来确保主协程等待所有子协程完成后再退出。
协程间通信:通道
通道是 Go 中协程之间通信的首选方式。它是一个类型安全的队列,遵循先进先出的规则。通道可以是有缓冲的,也可以是无缓冲的。
通道的创建与基本操作
使用 make 函数创建通道,指定通道中元素的类型。<- 运算符用于发送和接收数据。
package main
import "fmt"
func main() {
// 创建无缓冲的 int 通道
ch := make(chan int)
// 启动协程发送数据
go func() {
ch <- 42 // 发送
}()
// 从通道接收数据(阻塞直到有数据)
value := <-ch
fmt.Println("Received:", value)
}带缓冲与无缓冲通道
无缓冲通道:发送和接收操作必须同时准备好,否则会阻塞。这使得同步和通信同时发生。
带缓冲通道:发送操作仅在缓冲区满时阻塞,接收操作仅在缓冲区空时阻塞。缓冲区大小在创建时指定。
func main() {
// 带缓冲的通道,容量为2
ch := make(chan string, 2)
ch <- "first"
ch <- "second"
// ch <- "third" // 此时缓冲区已满,会阻塞
fmt.Println(<-ch) // first
fmt.Println(<-ch) // second
}通道方向与关闭
可以在函数参数中指定通道的方向(只发送或只接收),提高代码的类型安全性。使用 close 函数关闭通道,关闭后不能再发送数据,但可以继续接收剩余数据。
func producer(ch chan<- int) { // 只发送通道
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 关闭通道
}
func consumer(ch <-chan int) { // 只接收通道
for v := range ch { // 循环直到通道关闭
fmt.Println("Consumed:", v)
}
}
func main() {
ch := make(chan int)
go producer(ch)
consumer(ch)
}注意:向已关闭的通道发送数据会引发 panic。从已关闭的通道接收数据时,可以继续读取缓冲区中的值,当缓冲区为空时,会返回通道元素类型的零值,并且第二个返回值(ok)为 false。
select 语句
select 语句允许一个协程同时等待多个通道操作,类似于 switch 但专门用于通道。当多个 case 同时满足条件时,select 会随机选择一个执行。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}使用 default 实现非阻塞操作:
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message received")
}空 select(select {})会永久阻塞,常用于防止主协程退出。
超时控制:结合 time.After 实现超时等待。
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(2 * time.Second):
fmt.Println("Timeout")
}同步机制
除了通道,Go 还提供了多种同步原语,主要用于保护共享内存或协调协程执行顺序。
WaitGroup
sync.WaitGroup 用于等待一组协程完成。它维护一个内部计数器,通过 Add、Done 和 Wait 方法控制。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成时计数器减1
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // 启动前计数器加1
go worker(i, &wg)
}
wg.Wait() // 阻塞直到计数器归零
fmt.Println("All workers completed")
}注意:Add 必须在启动协程之前调用,Done 通常用 defer 确保在协程结束时调用。计数器不能为负数,否则会引发 panic。
互斥锁
sync.Mutex 提供基础的互斥锁,用于保护共享资源,防止数据竞争。
package main
import (
"fmt"
"sync"
)
var counter int
var mu sync.Mutex
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final counter:", counter)
}读写锁
sync.RWMutex 区分读锁和写锁,允许多个读操作并发,但写操作独占。适用于读多写少的场景。
var rwMu sync.RWMutex
var data map[string]string
func read(key string) string {
rwMu.RLock()
defer rwMu.RUnlock()
return data[key]
}
func write(key, value string) {
rwMu.Lock()
defer rwMu.Unlock()
data[key] = value
}原子操作
sync/atomic 包提供了针对基本类型的原子操作,无需加锁即可实现并发安全。适合简单的计数器、标志位等。
import "sync/atomic"
var counter int64
func increment(wg *sync.WaitGroup) {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}条件变量
sync.Cond 用于在多个协程之间等待或通知事件的发生。它通常与互斥锁结合使用,实现复杂的同步模式(如生产者-消费者队列)。
package main
import (
"fmt"
"sync"
"time"
)
var cond = sync.NewCond(&sync.Mutex{})
var queue []int
func producer() {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
cond.L.Lock()
queue = append(queue, i)
cond.Signal() // 唤醒一个等待的消费者
cond.L.Unlock()
}
}
func consumer() {
for i := 0; i < 5; i++ {
cond.L.Lock()
for len(queue) == 0 {
cond.Wait() // 等待条件满足
}
val := queue[0]
queue = queue[1:]
fmt.Println("Consumed:", val)
cond.L.Unlock()
}
}
func main() {
go producer()
go consumer()
time.Sleep(2 * time.Second)
}一次性初始化
sync.Once 保证某个函数只执行一次,常用于单例模式或初始化全局资源。
var once sync.Once
var config map[string]string
func loadConfig() {
// 模拟加载配置
config = map[string]string{"key": "value"}
}
func getConfig() map[string]string {
once.Do(loadConfig)
return config
}协程调度
Go 运行时拥有自己的调度器,它将协程多路复用到操作系统线程上。了解调度模型有助于编写高效的并发程序。
GMP 模型简介
Go 调度器包含三个核心概念:
G(Goroutine):协程,包含栈、程序计数器、寄存器等信息。
M(Machine):代表操作系统线程,由操作系统调度。
P(Processor):逻辑处理器,持有可运行的 G 队列,并关联一个 M。P 的数量决定了同时并行执行的协程数量。
默认情况下,P 的数量等于 CPU 核心数(可通过 runtime.GOMAXPROCS 设置)。调度器会周期性地将 G 调度到 M 上执行,并在 G 阻塞时(如系统调用、通道操作)将其移出,让 M 执行其他 G。
GOMAXPROCS
runtime.GOMAXPROCS(n) 可以设置逻辑处理器的数量。在 Go 1.5 之后,默认值为 CPU 核心数。增大该值可能提高并行能力,但过多的 P 也会带来调度开销。通常保持默认即可,但在某些 IO 密集型场景,适当增加可能有益。
import "runtime"
func main() {
fmt.Println("NumCPU:", runtime.NumCPU())
fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 获取当前值
runtime.GOMAXPROCS(2) // 设置为2
}协程状态
协程在其生命周期中可以处于以下几种状态:
运行中(Running):协程正在 M 上执行。
可运行(Runnable):协程已准备好执行,正在等待被调度到 M 上。
阻塞(Blocked):协程正在等待某个条件(如通道读写、互斥锁、系统调用等)。
结束(Dead):协程已退出。
协程只能从运行状态退出,不能直接从阻塞状态退出。如果所有协程都陷入阻塞且没有其他协程能唤醒它们,程序将发生死锁。
死锁检测
Go 运行时可以检测到死锁。当所有协程都处于阻塞状态且无法被唤醒时,程序会崩溃并打印死锁信息。例如:
package main
func main() {
ch := make(chan int)
<-ch // 永远不会有数据发送,主协程阻塞,死锁
}输出类似:
fatal error: all goroutines are asleep - deadlock!恐慌与恢复
Go 使用恐慌(panic)表示程序中发生了不可恢复的严重错误。当函数调用 panic 时,正常的执行流程会停止,并开始执行当前协程中已注册的延迟函数(defer)。如果恐慌未被恢复,程序将打印堆栈信息并终止。
恢复(recover) 允许在 defer 函数中捕获恐慌,使程序有机会清理资源并优雅地退出,或者继续执行。
panic 与 recover 机制
package main
import "fmt"
func mayPanic() {
panic("a problem occurred") // 触发恐慌
}
func main() {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
mayPanic()
fmt.Println("After panic (won't execute)")
}关键规则:
recover只有在defer函数中调用才有效。recover只能捕获同一协程中发生的恐慌,无法跨协程恢复。恐慌被恢复后,程序会从恐慌点跳出,继续执行当前函数
defer之后的代码,但恐慌点之后的代码不会执行。
在协程中保护
每个可能发生恐慌的协程都应该设置自己的 recover 保护,否则一个协程的恐慌会导致整个程序崩溃。
package main
import (
"fmt"
"time"
)
func safeGoroutine(id int) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Goroutine %d recovered: %v\n", id, r)
}
}()
if id%2 == 0 {
panic(fmt.Sprintf("panic in goroutine %d", id))
}
fmt.Printf("Goroutine %d finished normally\n", id)
}
func main() {
for i := 0; i < 4; i++ {
go safeGoroutine(i)
}
time.Sleep(time.Second)
}上下文:context 包
context 包用于在协程之间传递取消信号、超时控制以及请求范围的值。它常被用于管理一组关联的协程的生命周期。
取消信号
使用 context.WithCancel 创建一个可取消的上下文,当调用取消函数时,所有监听该上下文的协程都会收到通知。
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d stopped\n", id)
return
default:
fmt.Printf("Worker %d working...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // 通知所有 worker 停止
time.Sleep(1 * time.Second) // 等待输出
}超时控制
context.WithTimeout 或 context.WithDeadline 可以在指定时间后自动取消。
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 通常 defer 调用 cancel 以释放资源
go worker(ctx, 1)
select {
case <-ctx.Done():
fmt.Println("Main: timeout or cancelled")
}传递值
context.WithValue 可以在上下文中携带请求范围的值,但注意这些值必须是线程安全的。
type key string
func main() {
ctx := context.WithValue(context.Background(), key("user"), "Alice")
go func(ctx context.Context) {
user := ctx.Value(key("user")).(string)
fmt.Println("Hello,", user)
}(ctx)
time.Sleep(100 * time.Millisecond)
}注意:不应使用 context 传递可选参数,它更适合传递跨 API 和协程的元数据(如认证令牌、请求 ID)。
常用并发模式
工作池
工作池模式通过固定数量的协程处理任务,避免无限制地创建协程。通道用于分发任务和收集结果。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, j)
time.Sleep(time.Second) // 模拟耗时操作
results <- j * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动工作者
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 等待所有工作者完成
wg.Wait()
close(results)
// 收集结果
for r := range results {
fmt.Println("Result:", r)
}
}扇出/扇入
扇出(Fan-out):将同一个通道的数据分发给多个协程处理。
扇入(Fan-in):将多个通道的数据合并到一个通道。
// 扇出示例:多个协程从同一个通道读取
func fanOut(input <-chan int, workers int) {
for i := 0; i < workers; i++ {
go func(id int) {
for v := range input {
fmt.Printf("Worker %d got %d\n", id, v)
}
}(i)
}
}
// 扇入示例:合并多个通道
func fanIn(chs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range chs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}管道模式
管道模式将程序分解为多个阶段,每个阶段由协程处理,并通过通道连接。每个阶段可以独立并发执行。
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 设置管道
c := gen(2, 3)
out := sq(c)
// 消费输出
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}定时器与打点器
time.Timer 和 time.Ticker 常与 select 结合使用,实现定时任务或超时控制。
Timer:在指定时间后发送一个值到通道。
Ticker:按指定间隔周期性发送值到通道。
// 超时示例
timer := time.NewTimer(2 * time.Second)
select {
case <-timer.C:
fmt.Println("Timer expired")
}
// 周期性任务
ticker := time.NewTicker(500 * time.Millisecond)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()
time.Sleep(2 * time.Second)
ticker.Stop() // 停止 ticker并发安全的数据结构
sync.Map
sync.Map 是 Go 提供的并发安全的 map,适合读多写少或 key 不经常更新的场景。它有以下常用方法:
Load(key any) (value any, ok bool)Store(key, value any)LoadOrStore(key, value any) (actual any, loaded bool)Delete(key any)Range(f func(key, value any) bool)
var m sync.Map
// 写入
m.Store("key", "value")
// 读取
if val, ok := m.Load("key"); ok {
fmt.Println(val)
}
// 遍历
m.Range(func(key, value any) bool {
fmt.Println(key, value)
return true // 继续遍历
})注意:sync.Map 不适合频繁写入的场景,此时使用普通的 map 加锁性能更好。
通道作为队列
通道本质上是一个并发安全的 FIFO 队列,可以用于传递数据或任务。无缓冲通道提供同步保证,带缓冲通道可以解耦生产者和消费者。
调试与监控
获取协程数量
使用 runtime.NumGoroutine() 可以获取当前存在的协程数量,用于检测协程泄漏。
package main
import (
"fmt"
"runtime"
"time"
)
func main() {
fmt.Println("Before goroutines:", runtime.NumGoroutine())
go func() {
time.Sleep(time.Hour)
}()
fmt.Println("After one goroutine:", runtime.NumGoroutine())
// 输出可能类似:Before: 1, After: 2
}pprof 分析
Go 内置了性能分析工具 pprof,可以分析 CPU、内存、阻塞、协程等信息。通过导入 net/http/pprof 包并启动 HTTP 服务器,即可在运行时获取分析数据。
import _ "net/http/pprof"
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// ... 程序逻辑
}然后可用 go tool pprof 或浏览器访问 http://localhost:6060/debug/pprof/goroutine 查看协程堆栈信息。
检测协程泄漏
协程泄漏是指协程在完成任务后仍被阻塞,无法退出,导致资源占用。可以通过 runtime.NumGoroutine 监控协程数量变化,结合 pprof 找出泄漏点。常见泄漏原因包括:
通道发送/接收未正确关闭
无限循环缺少退出条件
互斥锁未释放导致协程永久阻塞
常见陷阱与最佳实践
循环变量捕获
在循环中启动协程时,如果直接使用循环变量,可能会捕获到变量的最终值,导致所有协程使用同一个值。
// 错误示例
for i := 1; i <= 3; i++ {
go func() {
fmt.Println(i) // 可能打印 4,4,4 或类似
}()
}正确做法:将变量作为参数传递给匿名函数,或在循环内重新声明变量。
// 方式一:传参
for i := 1; i <= 3; i++ {
go func(n int) {
fmt.Println(n)
}(i)
}
// 方式二:局部变量
for i := 1; i <= 3; i++ {
i := i // 重新声明
go func() {
fmt.Println(i)
}()
}通道关闭规则
向已关闭的通道发送数据会引发 panic。
从已关闭的通道接收数据,可以继续读取缓冲区中的值,缓冲区为空时返回零值,第二个返回值(ok)为 false。
关闭通道应由发送方负责,避免在接收方关闭通道。
使用
range循环从通道接收数据时,通道必须由发送方关闭,否则会死锁。
协程泄漏场景
以下情况容易导致协程泄漏:
协程在无缓冲通道上发送数据,但没有接收方。
协程在
select中等待永远不会到达的信号。协程持有互斥锁后未释放,导致其他等待锁的协程永久阻塞。
使用
time.After但未及时处理通道,导致定时器资源未释放。
解决方法:确保每个协程都有明确的退出路径,使用 context 控制生命周期,避免无限阻塞。
锁复制问题
sync.Mutex、sync.RWMutex、sync.WaitGroup 等类型不应被复制(通过值传递),因为复制会创建新的实例,失去同步效果。应始终使用指针传递。
type Counter struct {
mu sync.Mutex
val int
}
func (c *Counter) Inc() { // 必须是指针接收者
c.mu.Lock()
defer c.mu.Unlock()
c.val++
}合理使用 select
避免在
select中使用nil通道,否则该 case 会被忽略。使用
default分支实现非阻塞操作时,需注意可能造成忙循环,浪费 CPU。当多个通道都准备好时,
select随机选择,不应依赖特定顺序。
优雅退出与资源清理
监听系统信号
在生产环境中,程序应能够响应系统信号(如 SIGINT、SIGTERM),进行优雅关闭。可以使用 os/signal 包捕获信号。
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func worker(ctx context.Context, wg *sync.WaitGroup, id int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d stopping\n", id)
return
default:
fmt.Printf("Worker %d working...\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// 启动多个 worker
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(ctx, &wg, i)
}
// 监听信号
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh // 等待信号
fmt.Println("Received shutdown signal, stopping workers...")
cancel() // 通知所有 worker 退出
wg.Wait() // 等待所有 worker 完成
fmt.Println("All workers stopped, exiting.")
}协调协程退出
除了使用 context,还可以通过专用的退出通道(chan struct{})或关闭通道来通知协程退出。关闭通道可以广播给所有监听该通道的协程。
stopCh := make(chan struct{})
go func() {
for {
select {
case <-stopCh:
return
default:
// 工作
}
}
}()
close(stopCh) // 触发协程退出0