缺失的环节:使用 AWS DMS 从传统数据库触发无服务器事件

发布: (2026年1月8日 GMT+8 03:09)
7 min read
原文: Dev.to

Source: Dev.to

by khaldoun488

我们生活在一个希望一切都是 事件驱动 的世界里。SQL 数据库中新注册的用户应当立即:

  • 通过 SES 触发欢迎邮件,
  • 通过 API 更新 CRM,且
  • 启动 Step Functions 工作流。

如果你在 DynamoDB 上从零构建(greenfield),这很容易实现(DynamoDB Streams)。但如果你的数据存放在传统的 MySQL 单体、内部部署的 Oracle 数据库,或是普通的 PostgreSQL 实例中呢?

你需要 变更数据捕获(Change Data Capture,CDC)——需要把这些变更流式传输到云端。

于是你自然会想到 AWS DMS(数据库迁移服务)。它非常适合迁移数据,但你很快会碰到瓶颈:

问题

AWS DMS 不能直接以 AWS Lambda 函数为目标。
您无法简单地配置一个任务,声明“当表 X 中插入一行时,调用函数 Y”。

那么,如何在“旧世界”(SQL)和“新世界”(无服务器)之间架起桥梁?虽然很多人建议使用 Kinesis,但最稳健且具成本效益的答案是 Amazon S3

下面是我用于在不重写的情况下现代化传统后端的架构模式。

架构:“S3 Drop”模式

  1. Source – DMS 连接到您的遗留数据库,并通过事务日志捕获更改(INSERT/UPDATE/DELETE)。
  2. Target – DMS 将这些更改写入 S3 bucket 中的 JSON 文件。
  3. Trigger – S3 检测到新文件并触发 event notification
  4. Compute – 您的 Lambda function 接收事件,读取文件并处理业务逻辑。

Architecture diagram

为什么选择 S3 而不是 Kinesis 或 Airbyte?

为什么不使用 Kinesis Data Streams?

  • 成本 – 与预置的 Kinesis 流相比,S3 的费用要低得多,尤其是在旧数据库写入较少时。
  • 可观测性 – 你可以直接在桶中看到文件形式的变更,调试难度降低约 10 倍。
  • 批处理 – DMS 以批次方式写入 S3,在写入高峰期间自然限制 Lambda 的调用频率。

为什么不使用 Airbyte 或 Fivetran?

  • 这些工具擅长 ELT 流水线(例如每 15–60 分钟将数据加载到 Snowflake)。
  • 我们的目标是 事件驱动 处理——尽可能接近“实时”触发 Lambda。
  • AWS DMS 提供持续的 CDC,输出细粒度的事件流,而基于批处理的 ELT 工具往往会遗漏这些事件。
  • 完全使用 AWS 原生服务可以在严格的企业环境中简化 IAM 管理。

实施指南

DMS 端点设置

在 DMS 中创建 目标端点(S3)时,不要依赖默认设置。使用以下 额外连接属性,使输出更适合 Lambda 使用:

dataFormat=json;
datePartitionEnabled=true;
  • dataFormat=json – DMS 默认使用 CSV;JSON 更容易被 Lambda 解析。
  • datePartitionEnabled=true – 按日期组织文件(/2023/11/02/...),避免单个文件夹中出现数百万个对象。

理解事件结构

典型的 DMS 生成文件如下(行分隔的 JSON,也称为 NDJSON):

{
    "data": { "id": 101, "username": "jdoe", "status": "active" },
    "metadata": { "operation": "insert", "timestamp": "2023-11-02T10:00:00Z" }
}
{
    "data": { "id": 102, "username": "asmith", "status": "pending" },
    "metadata": { "operation": "update", "timestamp": "2023-11-02T10:05:00Z" }
}

每一行都包含 操作insertupdatedelete)和 负载data),结构清晰。

Lambda 逻辑

由于 DMS 写入的是 NDJSON,不能一次性对整个文件调用 json.loads()。必须逐行遍历。

下面是一段能够正确处理该文件的 Python 示例代码:

import boto3
import json

s3 = boto3.client('s3')

def handler(event, context):
    # 1️⃣ 从 S3 事件中提取 bucket 与 key
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key    = record['s3']['object']['key']

        print(f"Processing file: s3://{bucket}/{key}")

        # 2️⃣ 获取 DMS 生成的文件
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj['Body'].read().decode('utf-8')

        # 3️⃣ 解析 NDJSON(行分隔的 JSON)
        for line in content.splitlines():
            if not line.strip():
                continue  # 跳过空行

            row = json.loads(line)

            # 4️⃣ 根据 operation 类型进行过滤/处理
            operation = row.get('metadata', {}).get('operation')
            if operation == 'insert':
                user_data = row.get('data')
                # TODO: 添加插入操作的业务逻辑
                print(f"INSERT: {user_data}")

            elif operation == 'update':
                user_data = row.get('data')
                # TODO: 添加更新操作的业务逻辑
                print(f"UPDATE: {user_data}")

            elif operation == 'delete':
                # 如有需要,处理删除操作
                print("DELETE operation received")

关键点

  • 不要 对整个文件调用 json.loads(content)
  • 使用 content.splitlines() 逐行遍历,并单独解析每一行。
  • 利用 metadata.operation 字段来分发你的业务逻辑。

TL;DR

  • 使用 AWS DMS 从任何传统关系型数据库 捕获 CDC
  • 将更改写入 JSON 文件到 S3(按日期分区)。
  • 通过 S3 事件通知 触发 Lambda
  • 逐行解析 NDJSON 负载并实现基于事件的业务逻辑。

这种 “S3 Drop” 模式为您提供了一个低成本、可观测且完全 AWS 原生的桥梁,将传统数据库与现代无服务器工作流连接起来。 🚀

print(f"New User Detected: {user_data['username']}")
# trigger_welcome_email(user_data)

elif operation == 'update':
    print(f"User Updated: {row['data']['id']}")

摘要

您不需要重构整个遗留数据库即可获得无服务器的好处。通过使用 AWS DMS 解锁数据,并将 S3 作为可靠的缓冲区,您可以从已有 20 年历史的数据库触发现代 Lambda 工作流,几乎没有摩擦。此模式将稳定性和可观测性置于原始速度之上——在企业迁移中,这种权衡通常是值得的。

Back to Blog

相关文章

阅读更多 »