Golang 并发模式:Fan-out / Fan-in
发布: (2026年2月23日 GMT+8 04:40)
4 分钟阅读
原文: Dev.to
Source: Dev.to
请提供您希望翻译的具体文本内容,我将按照要求保留原始的 Markdown 格式、代码块和链接,仅翻译正文部分。谢谢!
此模式可以解决的问题
- 你有 8 核心,但数据处理阶段在单个 goroutine 中顺序执行。CPU 空闲而任务队列不断增长。通过创建 N 个工作者(每个核心一个),让它们并行地从共享通道拉取任务,所有核心都能得到利用。
- 管道的某个阶段调用外部 API 或进行图像缩放,这一步很慢。如果仍在单线程中执行,整个管道都会在该阶段形成瓶颈。
- 你已经为网页抓取启动了 100 个 goroutine。需要将所有结果收集到一个地方,以便最终写入数据库,而不使用全局变量和互斥锁。
- 为每个进入的请求创建一个新的 goroutine,在高峰负载下可能导致服务崩溃(因内存耗尽而 panic)。
本质
一个由两个阶段组成、协同工作的模式,用于并行化任务。
- Fan‑out – 启动多个 goroutine(工作者)从单一输入通道读取任务,分配负载。
- Fan‑in – 将多个 goroutine 的结果合并到单一输出通道,汇总数据以进行最终处理。
思路: 通过将相似任务分配给固定数量的工作池来并行执行,然后通过复用输出通道聚合结果。
与 Pipeline 的区别
- Fan‑out/Fan‑in(分叉/合并): 对单个阶段的水平扩展。将一个步骤在多个执行器上并行复制。
- Pipeline(流水线): 将过程垂直拆分为不同的步骤(读取 → 处理 → 写入)。Fan‑out/Fan‑in 通常位于流水线的特定阶段内部。
与工作池的区别
- Fan‑out = 将任务分派到池中。
- Fan‑in = 从池中收集结果。
与 Pub/Sub(发布‑订阅)的区别
- Fan‑out/Fan‑in: 每个来自输入通道的任务仅被一个工作者接收(工作分配,任务竞争)。
- Pub/Sub: 每条消息会被所有订阅者接收(广播分发)。
package main
import (
"context"
"fmt"
"sync"
)
// Task generation
func generateJobs(n int) <-chan int {
ch := make(chan int)
go func() {
for i := 1; i <= n; i++ {
ch <- i
}
close(ch)
}()
return ch
}
// Fan‑out: Distributing tasks among workers
func fanOut(ctx context.Context, jobs <-chan int, numWorkers int) []<-chan int {
workerChannels := make([]<-chan int, 0, numWorkers)
for i := 0; i < numWorkers; i++ {
resultCh := make(chan int)
go func() {
defer close(resultCh)
for {
select {
case job, ok := <-jobs:
if !ok {
return // Task channel closed
}
// Task processing (example: squaring)
resultCh <- job * job
case <-ctx.Done():
return // Cancellation via context
}
}
}()
workerChannels = append(workerChannels, resultCh)
}
return workerChannels
}
// Fan‑in: Merging results
func fanIn(channels []<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for res := range c {
merged <- res
}
}(ch)
}
// Close the final channel when all workers are done
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// 1. Generate tasks
jobs := generateJobs(5)
// 2. Fan‑out: distribute tasks among 3 workers
resultChannels := fanOut(context.Background(), jobs, 3)
// 3. Fan‑in: merge results from all channels
mergedResults := fanIn(resultChannels)
// 4. Results
for res := range mergedResults {
fmt.Printf("Получен результат: %d\n", res)
}
}