停止重新运行所有内容:DuckDB 中的本地增量管道

发布: (2026年1月10日 GMT+8 22:26)
9 min read
原文: Dev.to

Source: Dev.to

请提供您希望翻译的具体文本内容(除代码块和 URL 之外),我将为您翻译成简体中文并保持原有的 Markdown 格式。

Source:

增量模型 + 缓存的 DAG 运行(仅限 DuckDB)

我热爱本地优先的数据工作……直到我发现自己第 12 次这样做时:

“我改了一个模型。最好重新运行整个流水线。”

本文是一个简短的演示,展示如何使用 增量模型缓存的 DAG 运行 来改掉这个习惯——全部在你的笔记本电脑上,使用 DuckDB。示例是现有 incremental_demo 项目的简化、仅限 DuckDB 的版本。

我们将进行三次运行:

  1. seed v1 → 初始构建
  2. 再次运行(未改动)→ 大部分被跳过
  3. seed v2(更新 + 新增行)→ 增量合并/Upsert

就这些。无需云服务。无需繁琐的流程。

用一句话概括整个演示

我们从 CSV 中种子一个小的 raw.events 表,构建一个 staging 模型,然后构建只处理 “足够新” 行(基于 updated_at)的增量事实表,并根据 event_id 应用更新。

小项目包含哪些内容

关键部分有三块:

  1. 两个种子快照

    • v1 – 3 行。
    • v2 – 将 event_id = 2updated_at 更新为更晚的时间并更改 value,并新增 event_id = 4
  2. 源映射 – 项目定义了一个源 raw.events,指向名为 seed_events 的种子表。

  3. 若干模型(SQL + Python)

模型类型描述
events_baseStaging(SQL)将时间戳转换为正确类型,保持列整洁
fct_events_sql_inlineIncremental SQL(inline)增量逻辑直接写在模型文件中
fct_events_sql_yamlIncremental SQL(YAML)增量配置存放在 project.yml
fct_events_py_incrementalIncremental Python(DuckDB)在 pandas 中添加 value_x10 列,并返回增量数据帧

上述所有内容都包含在导出的演示中。

仅 DuckDB 设置

演示的 DuckDB 配置文件很简单:它写入本地 DuckDB 文件。

profiles.yml(DuckDB 配置文件)

dev_duckdb:
  engine: duckdb
  duckdb:
    path: "{{ env('FF_DUCKDB_PATH', '.local/incremental_demo.duckdb') }}"

.env.dev_duckdb(可选的便利设置)

FF_DUCKDB_PATH=.local/incremental_demo.duckdb
FF_DUCKDB_SCHEMA=inc_demo_schema

模型(有趣的部分)

Staging: events_base

{{ config(materialized='table') }}

select
  event_id,
  cast(updated_at as timestamp) as updated_at,
  value
from {{ source('raw', 'events') }};

Incremental SQL(内联配置):fct_events_sql_inline

{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental={ 'updated_at_column': 'updated_at' },
) }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base
{% if is_incremental() %}
where updated_at > (
  select coalesce(max(updated_at), timestamp '1970-01-01 00:00:00')
  from {{ this }}
)
{% endif %};

它的作用

  • materialized='incremental'
  • unique_key='event_id'
  • 水印列:updated_at

在增量运行时,它只会选择比目标中已存在的最大 updated_at 更新的行。这假设 updated_at 在行变化时递增(仅用于演示;真实管道可能需要处理延迟到达)。

Incremental SQL(YAML‑配置风格):fct_events_sql_yaml

{{ config(materialized='incremental') }}

with base as (
  select *
  from {{ ref('events_base.ff') }}
)
select
  event_id,
  updated_at,
  value
from base;

project.yml 中的增量参数

models:
  incremental:
    fct_events_sql_yaml.ff:
      unique_key: "event_id"
      incremental:
        enabled: true
        updated_at_column: "updated_at"

选择更适合你的风格。

Incremental Python(DuckDB):fct_events_py_incremental

from fastflowtransform import engine_model
import pandas as pd

@engine_model(
    only="duckdb",
    name="fct_events_py_incremental",
    deps=["events_base.ff"],
)
def build(events_df: pd.DataFrame) -> pd.DataFrame:
    df = events_df.copy()
    df["value_x10"] = df["value"] * 10
    return df[["event_id", "updated_at", "value", "value_x10"]]

该模型的增量行为(合并/更新)在 project.yml 中配置。

Source:

三次运行演练

我们将严格按照演示的“故事线”进行:首次构建、空操作构建,然后通过种子文件的更改触发增量更新。

步骤 0:选择本地 seeds 文件夹

Makefile 使用本地 seeds 目录,并在 v1 与 v2 之间切换 seed_events.csv

mkdir -p .local/seeds

一个仍能证明“增量”的小数据集

提供了同一 seed 文件的两个版本。v2 更新了一行已有记录并新增了一行——这样你可以看到增量模型同时完成 upsert(更新插入)和 insert(插入)操作。

seeds/seed_events_v1.csv

event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-02 00:00:00,20
3,2024-01-03 00:00:00,30

seeds/seed_events_v2.csv

event_id,updated_at,value
1,2024-01-01 00:00:00,10
2,2024-01-05 00:00:00,999
3,2024-01-03 00:00:00,30
4,2024-01-06 00:00:00,40

当你从 v1 切换到 v2 并再次运行时,最终应得到:

  • event_id = 2 被更新(updated_at 更新,value = 999
  • event_id = 4 被插入(全新记录)

1️⃣ 第一次运行(seed v1 → 初始构建)

将 v1 复制到目标位置:

cp seeds/seed_events_v1.csv .local/seeds/seed_events.csv

种子并运行:

FFT_SEEDS_DIR=.local/seeds fft seed . --env dev_duckdb
FFT_SEEDS_DIR=.local/seeds fft run . --env dev_duckdb

你应该会看到所有模型首次物化。

2️⃣ 第二次运行(种子未变 → 空操作)

再次执行相同命令:

FFT_SEEDS_DIR=.local/seeds fft run . --env dev_duckdb

所有模型应 被跳过(已缓存),因为没有任何更改。

3️⃣ 第三次运行(seed v2 → 增量合并/upsert)

换入更新后的种子文件:

cp seeds/seed_events_v2.csv .local/seeds/seed_events.csv

重新种子并运行:

FFT_SEEDS_DIR=.local/seeds fft seed . --env dev_duckdb
FFT_SEEDS_DIR=.local/seeds fft run . --env dev_duckdb

你会看到:

  • fct_events_sql_inlinefct_events_sql_yaml 只处理 updated_at > 已存在的最大 updated_at 的行。
  • Python 模型 (fct_events_py_incremental) 仅接收增量行,给 value 乘以 10 并将结果合并回去。

检查最终表:

select * from inc_demo_schema.fct_events_sql_inline;

结果:

event_idupdated_atvalue
12024‑01‑01 00:00:0010
22024‑01‑05 00:00:00999
32024‑01‑03 00:00:0030
42024‑01‑06 00:00:0040

…and the Python model will have an extra column value_x10 with the multiplied values.

回顾

  • Incremental models 只处理新行或已更改的行。
  • Cached DAG runs(通过 FastFlowTransform)在没有变化时跳过工作。
  • 所有这些都在本地使用 DuckDB 运行——无需云服务,也无需额外的繁琐步骤。

试一试,调整 watermark 逻辑,或实验迟到数据的处理。祝你增量建模愉快!

使用缓存选项运行 (v_duckdb)

fft run . --env dev_duckdb --cache=rw

预期结果

  • events_base 将变为普通表。
  • 增量模型首次创建目标表(相当于第一次完整构建)。

空操作运行(相同的 seed v1;大部分应被跳过)

fft run . --env dev_duckdb --cache=rw

演示中将其称为 “空操作运行…大部分应被跳过”,这是本地数据开发中最好的体验。

更换 seed(v2 快照)并运行增量模型

# 替换 seed 文件
cp seeds/seed_events_v2.csv .local/seeds/seed_events.csv

# 加载新的 seed
FFT_SEEDS_DIR=.local/seeds fft seed . --env dev_duckdb

# 运行增量模型
fft run . --env dev_duckdb --cache=rw

结果

  • event_id = 2 带有 更新的 updated_atvalue = 999
  • event_id = 4 第一次出现。

你的增量事实表应 更新 event_id = 2 的行,并 插入 event_id = 4,依据 unique_key = event_id

在 DuckDB 中进行检查

查询增量 SQL 表

duckdb .local/incremental_demo.duckdb \
  "SELECT * FROM inc_demo_schema.fct_events_sql_inline ORDER BY event_id;"

在 v2 之后你应该看到:

  • event_id = 2updated_at = 2024-01-05value = 999
  • 新增的 event_id = 4updated_at = 2024-01-06value = 40

查询 Python 表

duckdb .local/incremental_demo.duckdb \
  "SELECT * FROM inc_demo_schema.fct_events_py_incremental ORDER BY event_id;"

你还应看到派生列 value_x10(例如,更新行的值为 9990)。

让 DAG 可视化

fft docs serve --env dev_duckdb --open

DAG from local docs server

可选的简易“质量检查”

fft test . --env dev_duckdb --select tag:incremental

你刚刚买下的东西

使用此设置,你的本地开发循环变为:

  1. 运行一次 – 构建所有内容。
  2. 再次运行 – 跳过大部分工作(无操作)。
  3. 更改输入数据 – 只更新必要的部分。
  4. 安全地更新已有行(通过 unique_key),而不是“追加并祈祷”。

所有这些都在单个本地 DuckDB 文件中完成,使实验既便宜又快速。

Back to Blog

相关文章

阅读更多 »