缺失的环节:使用 AWS DMS 从传统数据库触发无服务器事件
Source: Dev.to
我们生活在一个希望一切都是 事件驱动 的世界里。SQL 数据库中新注册的用户应当立即:
- 通过 SES 触发欢迎邮件,
- 通过 API 更新 CRM,且
- 启动 Step Functions 工作流。
如果你在 DynamoDB 上从零构建(greenfield),这很容易实现(DynamoDB Streams)。但如果你的数据存放在传统的 MySQL 单体、内部部署的 Oracle 数据库,或是普通的 PostgreSQL 实例中呢?
你需要 变更数据捕获(Change Data Capture,CDC)——需要把这些变更流式传输到云端。
于是你自然会想到 AWS DMS(数据库迁移服务)。它非常适合迁移数据,但你很快会碰到瓶颈:
问题
AWS DMS 不能直接以 AWS Lambda 函数为目标。
您无法简单地配置一个任务,声明“当表 X 中插入一行时,调用函数 Y”。
那么,如何在“旧世界”(SQL)和“新世界”(无服务器)之间架起桥梁?虽然很多人建议使用 Kinesis,但最稳健且具成本效益的答案是 Amazon S3。
下面是我用于在不重写的情况下现代化传统后端的架构模式。
架构:“S3 Drop”模式
- Source – DMS 连接到您的遗留数据库,并通过事务日志捕获更改(INSERT/UPDATE/DELETE)。
- Target – DMS 将这些更改写入 S3 bucket 中的 JSON 文件。
- Trigger – S3 检测到新文件并触发 event notification。
- Compute – 您的 Lambda function 接收事件,读取文件并处理业务逻辑。

为什么选择 S3 而不是 Kinesis 或 Airbyte?
为什么不使用 Kinesis Data Streams?
- 成本 – 与预置的 Kinesis 流相比,S3 的费用要低得多,尤其是在旧数据库写入较少时。
- 可观测性 – 你可以直接在桶中看到文件形式的变更,调试难度降低约 10 倍。
- 批处理 – DMS 以批次方式写入 S3,在写入高峰期间自然限制 Lambda 的调用频率。
为什么不使用 Airbyte 或 Fivetran?
- 这些工具擅长 ELT 流水线(例如每 15–60 分钟将数据加载到 Snowflake)。
- 我们的目标是 事件驱动 处理——尽可能接近“实时”触发 Lambda。
- AWS DMS 提供持续的 CDC,输出细粒度的事件流,而基于批处理的 ELT 工具往往会遗漏这些事件。
- 完全使用 AWS 原生服务可以在严格的企业环境中简化 IAM 管理。
实施指南
DMS 端点设置
在 DMS 中创建 目标端点(S3)时,不要依赖默认设置。使用以下 额外连接属性,使输出更适合 Lambda 使用:
dataFormat=json;
datePartitionEnabled=true;
dataFormat=json– DMS 默认使用 CSV;JSON 更容易被 Lambda 解析。datePartitionEnabled=true– 按日期组织文件(/2023/11/02/...),避免单个文件夹中出现数百万个对象。
理解事件结构
典型的 DMS 生成文件如下(行分隔的 JSON,也称为 NDJSON):
{
"data": { "id": 101, "username": "jdoe", "status": "active" },
"metadata": { "operation": "insert", "timestamp": "2023-11-02T10:00:00Z" }
}
{
"data": { "id": 102, "username": "asmith", "status": "pending" },
"metadata": { "operation": "update", "timestamp": "2023-11-02T10:05:00Z" }
}
每一行都包含 操作(insert、update、delete)和 负载(data),结构清晰。
Lambda 逻辑
由于 DMS 写入的是 NDJSON,不能一次性对整个文件调用 json.loads()。必须逐行遍历。
下面是一段能够正确处理该文件的 Python 示例代码:
import boto3
import json
s3 = boto3.client('s3')
def handler(event, context):
# 1️⃣ 从 S3 事件中提取 bucket 与 key
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print(f"Processing file: s3://{bucket}/{key}")
# 2️⃣ 获取 DMS 生成的文件
obj = s3.get_object(Bucket=bucket, Key=key)
content = obj['Body'].read().decode('utf-8')
# 3️⃣ 解析 NDJSON(行分隔的 JSON)
for line in content.splitlines():
if not line.strip():
continue # 跳过空行
row = json.loads(line)
# 4️⃣ 根据 operation 类型进行过滤/处理
operation = row.get('metadata', {}).get('operation')
if operation == 'insert':
user_data = row.get('data')
# TODO: 添加插入操作的业务逻辑
print(f"INSERT: {user_data}")
elif operation == 'update':
user_data = row.get('data')
# TODO: 添加更新操作的业务逻辑
print(f"UPDATE: {user_data}")
elif operation == 'delete':
# 如有需要,处理删除操作
print("DELETE operation received")
关键点
- 不要 对整个文件调用
json.loads(content)。 - 使用
content.splitlines()逐行遍历,并单独解析每一行。 - 利用
metadata.operation字段来分发你的业务逻辑。
TL;DR
- 使用 AWS DMS 从任何传统关系型数据库 捕获 CDC。
- 将更改写入 JSON 文件到 S3(按日期分区)。
- 通过 S3 事件通知 触发 Lambda。
- 逐行解析 NDJSON 负载并实现基于事件的业务逻辑。
这种 “S3 Drop” 模式为您提供了一个低成本、可观测且完全 AWS 原生的桥梁,将传统数据库与现代无服务器工作流连接起来。 🚀
print(f"New User Detected: {user_data['username']}")
# trigger_welcome_email(user_data)
elif operation == 'update':
print(f"User Updated: {row['data']['id']}")
摘要
您不需要重构整个遗留数据库即可获得无服务器的好处。通过使用 AWS DMS 解锁数据,并将 S3 作为可靠的缓冲区,您可以从已有 20 年历史的数据库触发现代 Lambda 工作流,几乎没有摩擦。此模式将稳定性和可观测性置于原始速度之上——在企业迁移中,这种权衡通常是值得的。