我们的 Pipeline 处理同一天数据的次数 47 次
Source: Dev.to
我在周一早上注意到我们的 Airflow 日志中有些异常。我们的每日数据管道在周末多次运行,而不是每天只运行一次。
不仅仅是几次额外的运行——47 次执行,全部处理同一天的数据:12 月 3 日。
每次运行都显示成功。没有错误。没有警报。只是同一天的数据被一次又一次地处理。
以下是发生的情况以及我对重试逻辑的领悟——我真希望早些知道这些。
我是怎么发现的
星期一上午,我在例行检查中审查我们周末的管道运行情况。Airflow 仪表板显示出异常模式——我们的主转换 DAG 的执行次数远超预期。
进一步查看后,我发现该 DAG 在周六上午到周一之间运行了 47 次,尽管我们把它安排在每天 凌晨 2 点 运行一次。
引起我注意的是:每一次运行都在处理 12 月 3 日 的数据。不是 12 月 4 日、5 日或 6 日——仅仅是 12 月 3 日,一次又一次。
所有运行都显示为成功(绿色状态),没有失败的任务。日志显示正常处理——读取数据、转换、写入数据仓库、标记完成。
调查
I checked the obvious things first:
-
有人手动触发重跑吗?
没有。审计日志显示所有运行都是自动的,由调度器触发。 -
源数据有变动吗?
没有。S3 时间戳显示 12 月 3 日的数据自创建后未被修改。 -
调度器配置有问题吗?
调度看起来是正确的:每天凌晨 2 AM。
Then I noticed something in the run history. The pattern started on Saturday. Our pipeline ran at 2 AM (normal), then again at 4 AM, 6 AM, 8 AM… every two hours through the weekend.
That’s when I realized: these weren’t scheduled runs. These were retries.
背景
上周五,我们部署了一个新的分析功能——按客户细分计算平均交易价值。市场部希望将高端客户的行为与普通客户分开跟踪。
代码已经经过彻底测试。我们使用过去一周的样本数据运行了它。所有测试都通过了。我们在周五下午部署了。
我们没有测试的内容: 周末的数据模式。
根本原因
我们的管道使用 Airflow 的 execution_date 来确定要处理的数据分区:
execution_date = context['execution_date']
data_date = execution_date.strftime('%Y-%m-%d')
s3_path = f"s3://bucket/data/date={data_date}/"
管道步骤
- 从 S3 读取数据
- 转换并校验记录
- 计算每日指标
- 写入数据仓库
第 3 步是在周末出现问题的地方。
我们新计算的指标是“每个客户细分的平均交易价值”:
# Calculate average for our premium customer segment
target_customers = df[df['customer_segment'] == 'premium']
total_value = target_customers['amount'].sum()
customer_count = target_customers['customer_id'].nunique()
avg_value = total_value / customer_count
在我们测试的工作日,这段代码运行正常:
| 日期 | 高级客户数 | 结果 |
|---|---|---|
| 12 月 3 日(周三) | 8,500 | 成功 |
| 12 月 4 日(周四) | 7,200 | 成功 |
| 12 月 5 日(周五) | 6,800 | 成功 |
| 12 月 6 日(周六) | 0 | 失败 |
我们的高级细分全部是 B2B 客户——企业账户、企业客户。由于企业在周末关闭,它们不会在周末进行交易。
在周六我们只有 0 位高级客户,这导致了除以零的错误:
customer_count = target_customers['customer_id'].nunique() # Returns 0
avg_value = total_value / 0 # Division by zero error
任务失败,Airflow 随即调度了重试。我们编写的重试逻辑如下:
if task_instance.try_number > 1:
# If this is a retry, process the last successful date
# to avoid reprocessing potentially corrupted data
last_successful = get_last_successful_date()
data_date = last_successful
else:
data_date = execution_date.strftime('%Y-%m-%d')
本意是好的:如果任务在处理过程中失败,就不要重新处理可能已损坏的数据,而是回到上一次已知的成功日期。
实际发生的情况:
- 12 月 6 日 处理失败(除以零)。
- 触发重试;
try_number= 2,代码获取last_successful = 12 月 3 日。 - 重试处理了 12 月 3 日 的数据(当时有高级交易)。
- 计算成功,Airflow 将 12 月 6 日的运行标记为完成。
同样的情况也发生在 12 月 7 日(周日),并在整个周末持续,直到我在周一上午手动停止。
影响
当时的直接问题是 数据重复。我们把 12 月 3 日的交易加载到了数据仓库 47 次。
- 我们的去重逻辑捕获了大部分重复——我们使用交易 ID 作为主键,数据库会直接覆盖相同的记录。
- 但是,并不是所有下游报表都做了去重。一些聚合表把每一次加载都算作新数据。于是,在周一早上的几个小时里,我们的仪表盘显示 12 月 3 日的交易量是 正常的 47 倍。
更大的问题是:这个 bug 暴露了我们重试策略的缺陷——当周末(或任何段落缺少数据的时期)导致任务失败时,历史数据可能会在不知情的情况下被破坏。
经验教训
- 在真实的周末数据上进行测试——尤其是当新逻辑依赖于某些天可能不存在的数据时。
- 重试逻辑应具备幂等性,并避免在不确定不会导致重复的情况下重新处理旧日期。
- 防止除零错误(或任何在空数据集上会失败的操作),需要显式检查。
- 为异常的重试模式添加告警(例如,在短时间窗口内出现大量重试),以便及早发现问题。
通过修复除零检查并简化重试逻辑,使其只重新处理相同的 execution_date(或在没有数据时直接跳过运行),我们阻止了进一步的重复加载,并恢复了对管道的信心。
问题
我们没有 December 6th 或 December 7th 的数据。管道误以为已经成功处理了这些日期(因为它实际上处理了 December 3rd),于是继续处理 December 8th。
我们在没有意识到的情况下跳过了两天的周末数据,直到业务用户询问为何我们的周末销售报告为空。
修复
我修复了两件事:
1️⃣ 立即修复的 bug – 在计算中处理计数为零的情况
target_customers = df[df['customer_segment'] == 'premium']
customer_count = target_customers['customer_id'].nunique()
if customer_count > 0:
avg_value = target_customers['amount'].sum() / customer_count
else:
# 该细分市场没有客户 – 设置为 NULL 而不是报错
avg_value = None
2️⃣ 重试逻辑 – 完全移除
# 始终处理执行日期,无论重试次数多少
data_date = execution_date.strftime('%Y-%m-%d')
关键洞察: 重试应该重新处理相同的数据,而不是回退到旧数据。如果是真正的数据问题,重试也无济于事;如果是瞬时性问题,重试同一操作即可解决。
周末特定更新
# 周末数据说明:Premium 细分(B2B)在周末没有活动
# 这属于预期行为 – 对周末指标记录为 NULL
我学到的
- 使用真实的数据模式进行测试。 我们只测试了工作日数据,因为很方便。我们还应该测试周末、假日和月末数据——所有的边缘情况。
- 重试逻辑需要仔细考虑。 假设“上一次成功的日期”是安全的回退是错误的。重试必须重新处理相同的数据,而不是不同的数据。
- 除以零在分析中很常见。 每当你计算平均值或比率时,要显式处理计数为零的情况,而不是让它直接报错。
- 监控成功运行,而不仅仅是失败。 我们所有的警报都只关注失败。这些运行成功了,所以没有警报。问题只能通过手动查看日志发现。
- 执行日期与数据日期不同。 Airflow 的执行日期是作业运行的时间;你处理的数据日期可能不同,尤其在重试时。要在代码中将它们区分开。
事后
修复后,管道正常处理了周末数据:
- 星期六: 处理了 12月13日 – Premium metrics =
NULL(预期)。成功。 - 星期日: 处理了 12月14日 – Premium metrics =
NULL(预期)。成功。 - 无重试。无重复处理。
我手动回填了缺失的 12月6日 和 7日 数据,并在我们的测试套件中添加了针对周末情形的测试用例。
- 调试总时长: ~3 小时
- 修复缺失周末数据所用时间: ~2 小时
经验教训: 始终测试边缘情况,尤其是像周末这样可预测的情况。
讨论
你是否在星期五部署了代码,结果在周末出现故障?或者使用了重试逻辑,却让情况变得更糟而不是更好?我很想听听大家如何处理具有可变数据模式的指标的数据质量验证。
感谢阅读!关注以获取更多实用的数据工程故事和生产系统的经验教训。
