Go 并发模式:Worker Pool 与 Fan-out/Fan-in,从 goroutine 泛滥到受控并行
一、goroutine 泛滥的工程风险:资源耗尽的隐性炸弹
Go 语言的 goroutine 以轻量著称,单个 goroutine 仅占 2-8KB 栈空间,这使得开发者倾向于"遇到并发就 go"。然而,无节制的 goroutine 创建在生产环境中是定时炸弹:当并发请求量激增时,数百万 goroutine 同时运行会耗尽内存与 CPU 调度资源,导致服务整体不可用。更危险的是,每个 goroutine 可能持有数据库连接、HTTP 客户端等外部资源,goroutine 泛滥会引发连接池耗尽、文件描述符耗尽等级联故障。
Worker Pool 模式通过固定数量的 worker goroutine 消费任务队列,将并发度控制在可预测的范围内。Fan-out/Fan-in 模式则将任务分发到多个并行 worker,再聚合结果,适用于可并行化的计算密集型任务。
二、Worker Pool 与 Fan-out/Fan-in 的数据流模型
flowchart LR A[任务生产者] --> B[任务队列 Channel] B --> C1[Worker 1] B --> C2[Worker 2] B --> C3[Worker N] C1 --> D[结果 Channel] C2 --> D C3 --> D D --> E[结果聚合] subgraph Worker Pool C1 C2 C3 end subgraph Fan-out A --> C1 A --> C2 A --> C3 end subgraph Fan-in C1 --> E C2 --> E C3 --> E endWorker Pool 的核心约束:worker 数量固定,任务通过 buffered channel 排队。当队列满时,生产者被阻塞,形成天然的背压机制。Fan-out/Fan-in 在 Worker Pool 基础上增加了结果聚合层,通过sync.WaitGroup或errgroup.Group协调并行任务的完成。
三、工程实现:生产级 Worker Pool 与 Fan-out/Fan-in
// workerpool.go — 受控并发的 Worker Pool package pool import ( "context" "fmt" "sync" ) type Task func(ctx context.Context) (interface{}, error) type Result struct { Value interface{} Err error } type WorkerPool struct { workers int taskCh chan Task resultCh chan Result wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } // 创建 Worker Pool func New(parentCtx context.Context, workers int, queueSize int) *WorkerPool { ctx, cancel := context.WithCancel(parentCtx) p := &WorkerPool{ workers: workers, taskCh: make(chan Task, queueSize), resultCh: make(chan Result, queueSize), ctx: ctx, cancel: cancel, } // 启动固定数量的 worker for i := 0; i < workers; i++ { p.wg.Add(1) go p.worker(i) } return p } func (p *WorkerPool) worker(id int) { defer p.wg.Done() for { select { case <-p.ctx.Done(): return case task, ok := <-p.taskCh: if !ok { return } // 执行任务,捕获 panic 防止单个任务崩溃影响整个 Pool func() { defer func() { if r := recover(); r != nil { p.resultCh <- Result{ Err: fmt.Errorf("worker %d panic: %v", id, r), } } }() value, err := task(p.ctx) p.resultCh <- Result{Value: value, Err: err} }() } } } // 提交任务(非阻塞,队列满时返回错误) func (p *WorkerPool) Submit(task Task) error { select { case p.taskCh <- task: return nil default: return fmt.Errorf("任务队列已满,请稍后重试") } } // 优雅关闭:等待所有任务完成 func (p *WorkerPool) Shutdown() { close(p.taskCh) p.wg.Wait() close(p.resultCh) } // 强制取消:立即停止所有 worker func (p *WorkerPool) Cancel() { p.cancel() }// fanout_fanin.go — Fan-out/Fan-in 并行计算模式 package fanout import ( "context" "sync" "golang.org/x/sync/errgroup" ) // Fan-out:将任务分发到多个并行 worker,Fan-in 聚合结果 func ProcessParallel[T any, R any]( ctx context.Context, workers int, items []T, processFn func(ctx context.Context, item T) (R, error), ) ([]R, error) { // 创建带缓冲的结果切片,避免 Fan-in 阶段的锁竞争 results := make([]R, len(items)) errs := make([]error, len(items)) g, ctx := errgroup.WithContext(ctx) g.SetLimit(workers) // 限制并发度 for i, item := range items { i, item := i, item // 捕获循环变量 g.Go(func() error { // 检查上下文是否已取消,避免无谓计算 select { case <-ctx.Done(): return ctx.Err() default: } result, err := processFn(ctx, item) results[i] = result errs[i] = err return nil // 不将业务错误传给 errgroup,避免取消其他任务 }) } // 等待所有任务完成 if err := g.Wait(); err != nil { return nil, err } // 检查是否有业务错误 var firstErr error for _, err := range errs { if err != nil && firstErr == nil { firstErr = err } } return results, firstErr }// usage_example.go — 使用示例:并行查询多个数据源 func FetchUserData(ctx context.Context, userID string) (*UserProfile, error) { // 并行查询用户基本信息、订单历史、积分记录 results, err := fanout.ProcessParallel(ctx, 3, []string{ "basic_info", "order_history", "points_record", }, func(ctx context.Context, dataType string) (interface{}, error) { switch dataType { case "basic_info": return userDB.GetBasicInfo(ctx, userID) case "order_history": return orderDB.GetRecentOrders(ctx, userID, 10) case "points_record": return pointsDB.GetPointsSummary(ctx, userID) default: return nil, fmt.Errorf("未知数据类型: %s", dataType) } }) if err != nil { return nil, err } // 组装结果 profile := &UserProfile{UserID: userID} profile.BasicInfo = results[0].(*BasicInfo) profile.Orders = results[1].([]*Order) profile.Points = results[2].(*PointsSummary) return profile, nil }四、并发模式的边界与权衡
Worker Pool 的队列大小选择:队列过小导致生产者频繁阻塞,队列过大占用内存且增加任务等待延迟。建议队列大小设为 worker 数量的 2-4 倍,并根据监控指标(队列使用率、生产者阻塞时间)动态调整。
Fan-out 的错误传播策略:errgroup默认行为是任一任务出错即取消其他任务(Fail-Fast)。但部分场景需要"尽力而为"——即使部分任务失败,也收集成功的结果。上述实现通过不将业务错误传给errgroup,实现了"全部执行,统一检查错误"的策略,但需根据业务语义选择合适的错误传播模式。
goroutine 泄漏风险:如果 worker 中的任务阻塞在 I/O 操作上且不检查ctx.Done(),即使调用Cancel()也无法回收该 goroutine。所有阻塞操作必须传入 context 并检查取消信号,这是 Go 并发编程的基本纪律。
结果顺序保证:Fan-out/Fan-in 模式中,结果通过索引位置保持与输入的对应关系。如果不需要顺序保证,可以使用无序聚合(如 channel 收集),减少内存分配。
五、总结
Worker Pool 与 Fan-out/Fan-in 是 Go 并发编程的两种核心模式:前者通过固定 worker 数量与任务队列控制并发度,后者通过并行分发与结果聚合加速可并行化任务。工程落地的关键在于:Worker Pool 的背压机制防止资源耗尽、panic 恢复保障 Pool 稳定性、errgroup 简化 Fan-out 的错误管理、context 取消信号防止 goroutine 泄漏。并发不是越多越好,而是"受控的并行"——在资源预算内最大化吞吐量,同时保障服务的稳定性与可预测性。