我们的 Pipeline 处理同一天数据的次数 47 次

发布: (2025年12月17日 GMT+8 23:29)
12 min read
原文: Dev.to

Source: Dev.to

我在周一早上注意到我们的 Airflow 日志中有些异常。我们的每日数据管道在周末多次运行,而不是每天只运行一次。

不仅仅是几次额外的运行——47 次执行,全部处理同一天的数据:12 月 3 日

每次运行都显示成功。没有错误。没有警报。只是同一天的数据被一次又一次地处理。

以下是发生的情况以及我对重试逻辑的领悟——我真希望早些知道这些。

我是怎么发现的

星期一上午,我在例行检查中审查我们周末的管道运行情况。Airflow 仪表板显示出异常模式——我们的主转换 DAG 的执行次数远超预期。

进一步查看后,我发现该 DAG 在周六上午到周一之间运行了 47 次,尽管我们把它安排在每天 凌晨 2 点 运行一次。

引起我注意的是:每一次运行都在处理 12 月 3 日 的数据。不是 12 月 4 日、5 日或 6 日——仅仅是 12 月 3 日,一次又一次。

所有运行都显示为成功(绿色状态),没有失败的任务。日志显示正常处理——读取数据、转换、写入数据仓库、标记完成。

调查

I checked the obvious things first:

  • 有人手动触发重跑吗?
    没有。审计日志显示所有运行都是自动的,由调度器触发。

  • 源数据有变动吗?
    没有。S3 时间戳显示 12 月 3 日的数据自创建后未被修改。

  • 调度器配置有问题吗?
    调度看起来是正确的:每天凌晨 2 AM。

Then I noticed something in the run history. The pattern started on Saturday. Our pipeline ran at 2 AM (normal), then again at 4 AM, 6 AM, 8 AM… every two hours through the weekend.

That’s when I realized: these weren’t scheduled runs. These were retries.

背景

上周五,我们部署了一个新的分析功能——按客户细分计算平均交易价值。市场部希望将高端客户的行为与普通客户分开跟踪。

代码已经经过彻底测试。我们使用过去一周的样本数据运行了它。所有测试都通过了。我们在周五下午部署了。

我们没有测试的内容: 周末的数据模式。

根本原因

我们的管道使用 Airflow 的 execution_date 来确定要处理的数据分区:

execution_date = context['execution_date']
data_date = execution_date.strftime('%Y-%m-%d')
s3_path = f"s3://bucket/data/date={data_date}/"

管道步骤

  1. 从 S3 读取数据
  2. 转换并校验记录
  3. 计算每日指标
  4. 写入数据仓库

第 3 步是在周末出现问题的地方。

我们新计算的指标是“每个客户细分的平均交易价值”:

# Calculate average for our premium customer segment
target_customers = df[df['customer_segment'] == 'premium']
total_value = target_customers['amount'].sum()
customer_count = target_customers['customer_id'].nunique()
avg_value = total_value / customer_count

在我们测试的工作日,这段代码运行正常:

日期高级客户数结果
12 月 3 日(周三)8,500成功
12 月 4 日(周四)7,200成功
12 月 5 日(周五)6,800成功
12 月 6 日(周六)0失败

我们的高级细分全部是 B2B 客户——企业账户、企业客户。由于企业在周末关闭,它们不会在周末进行交易。

在周六我们只有 0 位高级客户,这导致了除以零的错误:

customer_count = target_customers['customer_id'].nunique()  # Returns 0
avg_value = total_value / 0  # Division by zero error

任务失败,Airflow 随即调度了重试。我们编写的重试逻辑如下:

if task_instance.try_number > 1:
    # If this is a retry, process the last successful date
    # to avoid reprocessing potentially corrupted data
    last_successful = get_last_successful_date()
    data_date = last_successful
else:
    data_date = execution_date.strftime('%Y-%m-%d')

本意是好的:如果任务在处理过程中失败,就不要重新处理可能已损坏的数据,而是回到上一次已知的成功日期。

实际发生的情况:

  1. 12 月 6 日 处理失败(除以零)。
  2. 触发重试;try_number = 2,代码获取 last_successful = 12 月 3 日
  3. 重试处理了 12 月 3 日 的数据(当时有高级交易)。
  4. 计算成功,Airflow 将 12 月 6 日的运行标记为完成。

同样的情况也发生在 12 月 7 日(周日),并在整个周末持续,直到我在周一上午手动停止。

影响

当时的直接问题是 数据重复。我们把 12 月 3 日的交易加载到了数据仓库 47 次

  • 我们的去重逻辑捕获了大部分重复——我们使用交易 ID 作为主键,数据库会直接覆盖相同的记录。
  • 但是,并不是所有下游报表都做了去重。一些聚合表把每一次加载都算作新数据。于是,在周一早上的几个小时里,我们的仪表盘显示 12 月 3 日的交易量是 正常的 47 倍

更大的问题是:这个 bug 暴露了我们重试策略的缺陷——当周末(或任何段落缺少数据的时期)导致任务失败时,历史数据可能会在不知情的情况下被破坏。

经验教训

  • 在真实的周末数据上进行测试——尤其是当新逻辑依赖于某些天可能不存在的数据时。
  • 重试逻辑应具备幂等性,并避免在不确定不会导致重复的情况下重新处理旧日期。
  • 防止除零错误(或任何在空数据集上会失败的操作),需要显式检查。
  • 为异常的重试模式添加告警(例如,在短时间窗口内出现大量重试),以便及早发现问题。

通过修复除零检查并简化重试逻辑,使其只重新处理相同的 execution_date(或在没有数据时直接跳过运行),我们阻止了进一步的重复加载,并恢复了对管道的信心。

问题

我们没有 December 6thDecember 7th 的数据。管道误以为已经成功处理了这些日期(因为它实际上处理了 December 3rd),于是继续处理 December 8th

我们在没有意识到的情况下跳过了两天的周末数据,直到业务用户询问为何我们的周末销售报告为空。

修复

我修复了两件事:

1️⃣ 立即修复的 bug – 在计算中处理计数为零的情况

target_customers = df[df['customer_segment'] == 'premium']
customer_count = target_customers['customer_id'].nunique()

if customer_count > 0:
    avg_value = target_customers['amount'].sum() / customer_count
else:
    # 该细分市场没有客户 – 设置为 NULL 而不是报错
    avg_value = None

2️⃣ 重试逻辑 – 完全移除

# 始终处理执行日期,无论重试次数多少
data_date = execution_date.strftime('%Y-%m-%d')

关键洞察: 重试应该重新处理相同的数据,而不是回退到旧数据。如果是真正的数据问题,重试也无济于事;如果是瞬时性问题,重试同一操作即可解决。

周末特定更新

# 周末数据说明:Premium 细分(B2B)在周末没有活动
# 这属于预期行为 – 对周末指标记录为 NULL

我学到的

  • 使用真实的数据模式进行测试。 我们只测试了工作日数据,因为很方便。我们还应该测试周末、假日和月末数据——所有的边缘情况。
  • 重试逻辑需要仔细考虑。 假设“上一次成功的日期”是安全的回退是错误的。重试必须重新处理相同的数据,而不是不同的数据。
  • 除以零在分析中很常见。 每当你计算平均值或比率时,要显式处理计数为零的情况,而不是让它直接报错。
  • 监控成功运行,而不仅仅是失败。 我们所有的警报都只关注失败。这些运行成功了,所以没有警报。问题只能通过手动查看日志发现。
  • 执行日期与数据日期不同。 Airflow 的执行日期是作业运行的时间;你处理的数据日期可能不同,尤其在重试时。要在代码中将它们区分开。

事后

修复后,管道正常处理了周末数据:

  • 星期六: 处理了 12月13日 – Premium metrics = NULL(预期)。成功。
  • 星期日: 处理了 12月14日 – Premium metrics = NULL(预期)。成功。
  • 无重试。无重复处理。

我手动回填了缺失的 12月6日7日 数据,并在我们的测试套件中添加了针对周末情形的测试用例。

  • 调试总时长: ~3 小时
  • 修复缺失周末数据所用时间: ~2 小时

经验教训: 始终测试边缘情况,尤其是像周末这样可预测的情况。

讨论

你是否在星期五部署了代码,结果在周末出现故障?或者使用了重试逻辑,却让情况变得更糟而不是更好?我很想听听大家如何处理具有可变数据模式的指标的数据质量验证。

LinkedIn上与我联系,或查看我的作品集

感谢阅读!关注以获取更多实用的数据工程故事和生产系统的经验教训。

Back to Blog

相关文章

阅读更多 »

仓库利用的权威指南

引言 仓库本质上只是一个 3‑D 盒子。利用率只是衡量你实际使用了该盒子多少的指标。虽然物流 c...