管道思维:构建 Go 系统的更好方式

发布: (2026年5月4日 GMT+8 00:55)
12 分钟阅读
原文: Dev.to

Source: Dev.to

(未提供需要翻译的正文内容。如需翻译,请粘贴文章的其余部分。)

Source:

我最近在做一个个人项目——职位爬虫

在这个过程中,我遇到了一种真正改变我对 Go 后端系统结构思考方式的模式。
它叫做 Pipeline(管道)模式,在很多场景中都会出现——支付、分析、API 等等。

在本文中,我将使用我的职位爬虫项目来演示它,这正是该模式的完美用例。

我们想要避免的混乱

在展示模式之前,先看看如果不使用它,代码会是什么样子。

我的爬虫要完成四件事:

  1. 从多个来源抓取职位列表
  2. 对它们进行标准化(即清理)
  3. 根据关键字给它们打分(最符合我技能的职位得分最高)
  4. 将它们保存到数据库

一个天真的实现可能是这样(简化版):

for _, raw := range rawJobs {
    // normalize
    raw.Title = strings.TrimSpace(raw.Title)
    raw.Location = strings.ReplaceAll(raw.Location, "NYC", "New York")

    // score
    score := 0
    for _, keyword := range keywords {
        if strings.Contains(raw.Title, keyword) {
            score++
        }
    }

    // save
    s.Repo.Create(raw.Title, raw.Location, score)
}

这在技术上是可行的,但会产生几个问题:

  • 没有明确的阶段。 标准化何时结束、评分何时开始?你必须阅读全部代码才能理解任何一点。
  • 难以测试。 如何只测试评分逻辑?做不到——它与循环中的其他所有代码粘在一起。
  • 难以修改。 想添加新的评分规则?你得在循环里挖来挖去,可能会触碰到标准化代码,甚至破坏保存逻辑。所有东西都耦合在一起。
  • 难以复用。 如果在别处需要相同的标准化逻辑,你只能复制粘贴,导致代码重复。
  • 并发几乎不可能。 想并发处理职位吗?循环结构纠缠不清,根本不知道哪些部分可以安全并行。

领悟

仔细观察这个循环,你会发现它并不是在解决单一问题,而是对每个职位重复同一套步骤

抓取 → 清理 → 评估 → 保存

于是问题变成——如果我们把这些步骤显式化会怎样?

管道(Pipeline)

我们不再使用一个大循环完成所有工作,而是把工作拆分成独立的阶段:

抓取 → 标准化 → 打分 → 存储

每个阶段只做一件事。数据在各阶段之间流动、被转换,然后继续前进。

下面是我爬虫实际使用的管道实现。

type Pipeline struct {
    scorer         scoring.Scorer
    jobService     JobService
    companyService CompanyService
    logger         *slog.Logger
}

func NewPipeline(
    scorer scoring.Scorer,
    jobService JobService,
    companyService CompanyService,
    logger *slog.Logger,
) *Pipeline {
    return &Pipeline{
        scorer:         scorer,
        jobService:     jobService,
        companyService: companyService,
        logger:         logger,
    }
}

注意 NewPipeline 中发生了什么。我们没有硬编码任何特定的爬虫或存储实现,而是通过参数传入。稍后我们会看到这为何重要。

运行管道

func (p *Pipeline) Run(ctx context.Context, scraper Scraper) error {
    // 1. Scrape
    rawJobs, err := scraper.Scrape(ctx)
    if err != nil {
        return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
    }

    var saved, failed int

    for _, rawJob := range rawJobs {
        // 2. Normalize
        normalizedJob, err := normalize.Normalize(rawJob)
        if err != nil {
            failed++
            continue
        }

        // 3. Score
        normalizedJob.Score = p.scorer.Score(normalizedJob)

        // 4. Save
        if err := p.jobService.Save(ctx, normalizedJob); err != nil {
            failed++
            continue
        }
        saved++
    }

    p.logger.Info("pipeline finished", "saved", saved, "failed", failed)
    return nil
}

逻辑与混乱版本完全相同,但现在…

Source:

each stage lives in its own function. Reading the Run method top‑to‑bottom instantly tells you what the system does: scrape → normalize → score → save. No digging, no guessing. The structure itself tells the story.

可替换性

因为每个阶段都是独立的,并通过接口进行连接,你可以在不触碰其他部分的情况下替换管道的任意部分。

替换爬虫

// testing
scraper := &FakeScraper{}

// production
scraper := &RemotiveScraper{}

管道代码保持完全不变。

替换存储

// development
store := NewInMemoryStore()

// production
store := NewPostgresStore()

同样,管道保持不变。

替换评分器

// simple keyword scorer
scorer := &KeywordScorer{keywords: []string{"Go", "backend", "remote"}}

// later, a smarter scorer
scorer := &MLScorer{}

管道仍然可以正常工作,无需修改。

为什么这很重要

  • 明确的关注点分离 让代码库更易阅读和理解。
  • 可测试性 大幅提升;你可以对每个阶段进行单元测试,互不干扰。
  • 可扩展性 只需实现一个新接口并接入即可。
  • 并发性 变得直观——可以并行化任何无状态的阶段(例如归一化或评分),而不会导致其他阶段的竞争条件。

TL;DR

管道模式将纠结的单体循环转变为清晰、可组合的阶段序列。通过明确的边界——爬取、归一化、评分、存储——你获得了可读性、可测试性、可替换性以及并行运行系统部分的能力。

在你的下一个 Go 后端项目中试试看吧;你会惊讶于以前是怎么没有它就活下来的。

并发 – 工作池

现在系统已经干净且灵活了。但性能呢?

目前,一次只运行一个任务:抓取一个,规范化它,打分,保存,然后再处理下一个。对于小数据集还行,但如果有上千个任务就会很慢。

提升层级

思路很简单:不要一次只处理一个任务,而是启动一组工作者(goroutine),让它们并发处理任务。如果你之前没用过 goroutine,它们基本上是 Go 中的轻量级线程。可以启动很多而几乎没有额外开销。这里的关键字是 many(很多),而不是 unlimited(无限)。稍后会详细说明。

它的大致结构如下:

func (p *Pipeline) Run(ctx context.Context, scraper Scraper, numWorkers int) error {
    rawJobs, err := scraper.Scrape(ctx)
    if err != nil {
        return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
    }

    jobs := make(chan RawJob)
    var wg sync.WaitGroup

    // spin up workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for raw := range jobs {
                // normalize, score, save
            }
        }()
    }

    // feed jobs into the channel
    for _, raw := range rawJobs {
        jobs <- raw
    }
    close(jobs)

    wg.Wait()
    return nil
}

可以把它想象成一个队列和一支团队。channel 就是队列:原始任务从一端进入,工作者从另一端取出。每个工作者独立地把任务跑完流水线的各个阶段。sync.WaitGroup 只负责确保在所有工作者完成之前不会返回。

numWorkers 参数是关键所在。决定启动多少工作者,而不是由 Go 运行时决定。这一点很重要,因为无限制的并发会带来真实的成本——一千个 goroutine 同时向数据库写入会比帮助更糟。通常控制在三到十个工作者是比较合适的选择。

概念上流水线并没有改变;它只是现在以并行方式运行而已。

Source:

管道无处不在

一旦你内化了这种模式,就会在各处看到它的身影。

  • 支付: 验证卡片 → 扣款 → 保存交易。
  • 分析: 收集事件 → 清洗数据 → 存储。
  • API: 接收请求 → 处理 → 发送响应。

不同领域,却拥有相同的形态。数据进入后,经过多个阶段的流转,最终以转化后的形式出现。这就是管道模式,它之所以频繁出现,是因为它很好地映射了现实工作:一步一步、阶段之间交接明确。

如果你学会识别它,就可以不仅在爬虫项目中使用,在任何需要对某物进行一系列处理并产生结果的代码中都能应用。

Source:

那么,为什么要这么做?

你完全可以写一个没有这些结构的系统。很多可用的软件就是一个大循环把所有事情都搞定,对于小型一次性项目来说这没问题。

但一旦你的系统需要扩展,这个大循环就会开始与你作对:

  • 你想添加一个新的数据源,但抓取逻辑已经和归一化混在一起。
  • 你想单独测试评分,但它被埋在了三层深的调用里。
  • 你想把内存存储换成真实的数据库,却找不到干净的接入口。

我在开发爬虫以及其他最近的项目时都遇到过这些问题。管道模式让这些问题变得可控。

四个值得保留的理念

  1. 拆分为阶段。 每个阶段只做一件事。
  2. 保持阶段专注。 如果一个阶段很难起名,说明它可能做得太多。
  3. 使部件可替换。 通过接口而不是具体类型来连接。
  4. 控制并发。 使用工作池,而不是无限制的 goroutine。

以管道的思维来构建系统可以让人更容易理解,这在你正处于构建或更新的过程中尤为重要。

这就是这个模式。希望对你有帮助!

0 浏览
Back to Blog

相关文章

阅读更多 »

Go 零值:var 与 :=

Go 有两种声明变量的方式:var 和 :=。它们存在的原因不同,规则也不同。了解何时使用每一种可以避免低级错误和 c…