管道思维:构建 Go 系统的更好方式
Source: Dev.to
(未提供需要翻译的正文内容。如需翻译,请粘贴文章的其余部分。)
Source: …
我最近在做一个个人项目——职位爬虫
在这个过程中,我遇到了一种真正改变我对 Go 后端系统结构思考方式的模式。
它叫做 Pipeline(管道)模式,在很多场景中都会出现——支付、分析、API 等等。
在本文中,我将使用我的职位爬虫项目来演示它,这正是该模式的完美用例。
我们想要避免的混乱
在展示模式之前,先看看如果不使用它,代码会是什么样子。
我的爬虫要完成四件事:
- 从多个来源抓取职位列表
- 对它们进行标准化(即清理)
- 根据关键字给它们打分(最符合我技能的职位得分最高)
- 将它们保存到数据库
一个天真的实现可能是这样(简化版):
for _, raw := range rawJobs {
// normalize
raw.Title = strings.TrimSpace(raw.Title)
raw.Location = strings.ReplaceAll(raw.Location, "NYC", "New York")
// score
score := 0
for _, keyword := range keywords {
if strings.Contains(raw.Title, keyword) {
score++
}
}
// save
s.Repo.Create(raw.Title, raw.Location, score)
}
这在技术上是可行的,但会产生几个问题:
- 没有明确的阶段。 标准化何时结束、评分何时开始?你必须阅读全部代码才能理解任何一点。
- 难以测试。 如何只测试评分逻辑?做不到——它与循环中的其他所有代码粘在一起。
- 难以修改。 想添加新的评分规则?你得在循环里挖来挖去,可能会触碰到标准化代码,甚至破坏保存逻辑。所有东西都耦合在一起。
- 难以复用。 如果在别处需要相同的标准化逻辑,你只能复制粘贴,导致代码重复。
- 并发几乎不可能。 想并发处理职位吗?循环结构纠缠不清,根本不知道哪些部分可以安全并行。
领悟
仔细观察这个循环,你会发现它并不是在解决单一问题,而是对每个职位重复同一套步骤:
抓取 → 清理 → 评估 → 保存
于是问题变成——如果我们把这些步骤显式化会怎样?
管道(Pipeline)
我们不再使用一个大循环完成所有工作,而是把工作拆分成独立的阶段:
抓取 → 标准化 → 打分 → 存储
每个阶段只做一件事。数据在各阶段之间流动、被转换,然后继续前进。
下面是我爬虫实际使用的管道实现。
type Pipeline struct {
scorer scoring.Scorer
jobService JobService
companyService CompanyService
logger *slog.Logger
}
func NewPipeline(
scorer scoring.Scorer,
jobService JobService,
companyService CompanyService,
logger *slog.Logger,
) *Pipeline {
return &Pipeline{
scorer: scorer,
jobService: jobService,
companyService: companyService,
logger: logger,
}
}
注意
NewPipeline中发生了什么。我们没有硬编码任何特定的爬虫或存储实现,而是通过参数传入。稍后我们会看到这为何重要。
运行管道
func (p *Pipeline) Run(ctx context.Context, scraper Scraper) error {
// 1. Scrape
rawJobs, err := scraper.Scrape(ctx)
if err != nil {
return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
}
var saved, failed int
for _, rawJob := range rawJobs {
// 2. Normalize
normalizedJob, err := normalize.Normalize(rawJob)
if err != nil {
failed++
continue
}
// 3. Score
normalizedJob.Score = p.scorer.Score(normalizedJob)
// 4. Save
if err := p.jobService.Save(ctx, normalizedJob); err != nil {
failed++
continue
}
saved++
}
p.logger.Info("pipeline finished", "saved", saved, "failed", failed)
return nil
}
逻辑与混乱版本完全相同,但现在…
Source: …
each stage lives in its own function. Reading the Run method top‑to‑bottom instantly tells you what the system does: scrape → normalize → score → save. No digging, no guessing. The structure itself tells the story.
可替换性
因为每个阶段都是独立的,并通过接口进行连接,你可以在不触碰其他部分的情况下替换管道的任意部分。
替换爬虫
// testing
scraper := &FakeScraper{}
// production
scraper := &RemotiveScraper{}
管道代码保持完全不变。
替换存储
// development
store := NewInMemoryStore()
// production
store := NewPostgresStore()
同样,管道保持不变。
替换评分器
// simple keyword scorer
scorer := &KeywordScorer{keywords: []string{"Go", "backend", "remote"}}
// later, a smarter scorer
scorer := &MLScorer{}
管道仍然可以正常工作,无需修改。
为什么这很重要
- 明确的关注点分离 让代码库更易阅读和理解。
- 可测试性 大幅提升;你可以对每个阶段进行单元测试,互不干扰。
- 可扩展性 只需实现一个新接口并接入即可。
- 并发性 变得直观——可以并行化任何无状态的阶段(例如归一化或评分),而不会导致其他阶段的竞争条件。
TL;DR
管道模式将纠结的单体循环转变为清晰、可组合的阶段序列。通过明确的边界——爬取、归一化、评分、存储——你获得了可读性、可测试性、可替换性以及并行运行系统部分的能力。
在你的下一个 Go 后端项目中试试看吧;你会惊讶于以前是怎么没有它就活下来的。
并发 – 工作池
现在系统已经干净且灵活了。但性能呢?
目前,一次只运行一个任务:抓取一个,规范化它,打分,保存,然后再处理下一个。对于小数据集还行,但如果有上千个任务就会很慢。
提升层级
思路很简单:不要一次只处理一个任务,而是启动一组工作者(goroutine),让它们并发处理任务。如果你之前没用过 goroutine,它们基本上是 Go 中的轻量级线程。可以启动很多而几乎没有额外开销。这里的关键字是 many(很多),而不是 unlimited(无限)。稍后会详细说明。
它的大致结构如下:
func (p *Pipeline) Run(ctx context.Context, scraper Scraper, numWorkers int) error {
rawJobs, err := scraper.Scrape(ctx)
if err != nil {
return fmt.Errorf("scraping %s: %w", scraper.Source(), err)
}
jobs := make(chan RawJob)
var wg sync.WaitGroup
// spin up workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for raw := range jobs {
// normalize, score, save
}
}()
}
// feed jobs into the channel
for _, raw := range rawJobs {
jobs <- raw
}
close(jobs)
wg.Wait()
return nil
}
可以把它想象成一个队列和一支团队。channel 就是队列:原始任务从一端进入,工作者从另一端取出。每个工作者独立地把任务跑完流水线的各个阶段。sync.WaitGroup 只负责确保在所有工作者完成之前不会返回。
numWorkers 参数是关键所在。你决定启动多少工作者,而不是由 Go 运行时决定。这一点很重要,因为无限制的并发会带来真实的成本——一千个 goroutine 同时向数据库写入会比帮助更糟。通常控制在三到十个工作者是比较合适的选择。
概念上流水线并没有改变;它只是现在以并行方式运行而已。
Source: …
管道无处不在
一旦你内化了这种模式,就会在各处看到它的身影。
- 支付: 验证卡片 → 扣款 → 保存交易。
- 分析: 收集事件 → 清洗数据 → 存储。
- API: 接收请求 → 处理 → 发送响应。
不同领域,却拥有相同的形态。数据进入后,经过多个阶段的流转,最终以转化后的形式出现。这就是管道模式,它之所以频繁出现,是因为它很好地映射了现实工作:一步一步、阶段之间交接明确。
如果你学会识别它,就可以不仅在爬虫项目中使用,在任何需要对某物进行一系列处理并产生结果的代码中都能应用。
Source:
那么,为什么要这么做?
你完全可以写一个没有这些结构的系统。很多可用的软件就是一个大循环把所有事情都搞定,对于小型一次性项目来说这没问题。
但一旦你的系统需要扩展,这个大循环就会开始与你作对:
- 你想添加一个新的数据源,但抓取逻辑已经和归一化混在一起。
- 你想单独测试评分,但它被埋在了三层深的调用里。
- 你想把内存存储换成真实的数据库,却找不到干净的接入口。
我在开发爬虫以及其他最近的项目时都遇到过这些问题。管道模式让这些问题变得可控。
四个值得保留的理念
- 拆分为阶段。 每个阶段只做一件事。
- 保持阶段专注。 如果一个阶段很难起名,说明它可能做得太多。
- 使部件可替换。 通过接口而不是具体类型来连接。
- 控制并发。 使用工作池,而不是无限制的 goroutine。
以管道的思维来构建系统可以让人更容易理解,这在你正处于构建或更新的过程中尤为重要。
这就是这个模式。希望对你有帮助!