Golang 并发模式:Fan-out / Fan-in

发布: (2026年2月23日 GMT+8 04:40)
4 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的具体文本内容,我将按照要求保留原始的 Markdown 格式、代码块和链接,仅翻译正文部分。谢谢!

此模式可以解决的问题

  • 你有 8 核心,但数据处理阶段在单个 goroutine 中顺序执行。CPU 空闲而任务队列不断增长。通过创建 N 个工作者(每个核心一个),让它们并行地从共享通道拉取任务,所有核心都能得到利用。
  • 管道的某个阶段调用外部 API 或进行图像缩放,这一步很慢。如果仍在单线程中执行,整个管道都会在该阶段形成瓶颈。
  • 你已经为网页抓取启动了 100 个 goroutine。需要将所有结果收集到一个地方,以便最终写入数据库,而不使用全局变量和互斥锁。
  • 为每个进入的请求创建一个新的 goroutine,在高峰负载下可能导致服务崩溃(因内存耗尽而 panic)。

本质

一个由两个阶段组成、协同工作的模式,用于并行化任务。

  • Fan‑out – 启动多个 goroutine(工作者)从单一输入通道读取任务,分配负载。
  • Fan‑in – 将多个 goroutine 的结果合并到单一输出通道,汇总数据以进行最终处理。

思路: 通过将相似任务分配给固定数量的工作池来并行执行,然后通过复用输出通道聚合结果。

与 Pipeline 的区别

  • Fan‑out/Fan‑in(分叉/合并): 对单个阶段的水平扩展。将一个步骤在多个执行器上并行复制。
  • Pipeline(流水线): 将过程垂直拆分为不同的步骤(读取 → 处理 → 写入)。Fan‑out/Fan‑in 通常位于流水线的特定阶段内部。

与工作池的区别

  • Fan‑out = 将任务分派到池中。
  • Fan‑in = 从池中收集结果。

与 Pub/Sub(发布‑订阅)的区别

  • Fan‑out/Fan‑in: 每个来自输入通道的任务仅被一个工作者接收(工作分配,任务竞争)。
  • Pub/Sub: 每条消息会被所有订阅者接收(广播分发)。
package main

import (
    "context"
    "fmt"
    "sync"
)

// Task generation
func generateJobs(n int) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 1; i <= n; i++ {
            ch <- i
        }
        close(ch)
    }()
    return ch
}

// Fan‑out: Distributing tasks among workers
func fanOut(ctx context.Context, jobs <-chan int, numWorkers int) []<-chan int {
    workerChannels := make([]<-chan int, 0, numWorkers)

    for i := 0; i < numWorkers; i++ {
        resultCh := make(chan int)

        go func() {
            defer close(resultCh)
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return // Task channel closed
                    }
                    // Task processing (example: squaring)
                    resultCh <- job * job
                case <-ctx.Done():
                    return // Cancellation via context
                }
            }
        }()

        workerChannels = append(workerChannels, resultCh)
    }
    return workerChannels
}

// Fan‑in: Merging results
func fanIn(channels []<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for res := range c {
                merged <- res
            }
        }(ch)
    }

    // Close the final channel when all workers are done
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // 1. Generate tasks
    jobs := generateJobs(5)

    // 2. Fan‑out: distribute tasks among 3 workers
    resultChannels := fanOut(context.Background(), jobs, 3)

    // 3. Fan‑in: merge results from all channels
    mergedResults := fanIn(resultChannels)

    // 4. Results
    for res := range mergedResults {
        fmt.Printf("Получен результат: %d\n", res)
    }
}
0 浏览
Back to Blog

相关文章

阅读更多 »

Go的秘密生活:select 语句

如何阻止快速数据在慢通道上等待 第25部分:Multiplexer、Timeout 和 Non‑Blocking Read 伊森正在观看他的终端输出 dri...

测试,是鸡蛋还是鸡?

测试的封面图片:是先有鸡还是先有蛋? https://media2.dev.to/dynamic/image/width=1000,height=420,fit=cover,gravity=auto,format=auto/https%3A%2F%2F...