MySQL 到 VeloDB 实时数据同步概览

发布: (2025年12月3日 GMT+8 04:40)
3 min read
原文: Dev.to

Source: Dev.to

Flink 可以在将数据从 MySQL 兼容数据库(例如 Amazon Aurora)迁移到 VeloDB 时,充当实时数据同步引擎。
它的高吞吐、低延迟流处理能力能够实现:

  • 全量数据初始加载 – 将 MySQL/Aurora 中的现有表导入 VeloDB。
  • 增量变更捕获 – 使用 MySQL binlog(CDC)捕获 INSERT/UPDATE/DELETE 事件,并持续写入 VeloDB。

整体架构如下图所示:

Architecture diagram

示例工作流

1. 创建 AWS RDS Aurora MySQL 实例

Create Aurora instance

2. 创建 MySQL 数据库及相应表

CREATE DATABASE test_db;

CREATE TABLE test_db.student (
    id          INT PRIMARY KEY,
    name        VARCHAR(100) NOT NULL,
    age         INT,
    email       VARCHAR(255),
    phone       VARCHAR(20),
    score       DECIMAL(5,2),
    created_at  TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO test_db.student (id, name, age, email, phone, score, created_at)
VALUES
    (1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
    (2, 'Bob Li',      21, 'bob@example.com',   '13900139000', 76.80, NOW()),
    (3, 'Charlie Wang',23, 'charlie@example.com','13600136000',92.00, NOW()),
    (4, 'David Chen',  20, 'david@example.com', '13500135000', 85.60, NOW()),
    (5, 'Emma Liu',    22, 'emma@example.com',  '13700137000', 78.90, NOW());

3. 创建 VeloDB 仓库

Create VeloDB warehouse

4. 修改 MySQL(Aurora)配置

  1. 创建参数组 并启用 binlog。

    Parameter group

  2. binlog_format 设置为 ROW

    binlog_format=ROW

  3. 将参数组关联到 DB 集群并重启实例

    Apply parameter group

5.1 下载预构建包

可以下载并解压包含 Doris 连接器的 Flink 1.17 发行版,直接使用。

下载以下构件:

  • Flink 1.17(二进制发行版)
  • Flink MySQL CDC 连接器
  • Flink Doris 连接器
  • MySQL JDBC 驱动

解压 Flink 发行版:

tar -zxvf flink-1.17.2-bin-scala_2.12.tgz

将连接器 JAR 包和 MySQL 驱动复制到 flink-1.17.2-bin/lib

Copy JARs to lib directory

作业运行时,Doris 连接器会根据源 MySQL 的 schema 自动在 VeloDB 中创建目标表。Flink 可部署在 Local、Standalone、YARN 或 Kubernetes 模式下。

6.1 本地环境示例

cd flink-1.17.2-bin
bin/flink run -t local \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-25.1.0.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=YOUR_PASSWORD \
    --mysql-conf server-id=5400 \
    --mysql-conf server-time-zone=UTC \
    --doris-conf fenodes=YOUR_VELODB_ENDPOINT \
    --doris-conf username=YOUR_VELODB_USER \
    --doris-conf password=YOUR_VELODB_PASSWORD \
    --doris-conf table.identifier=test_db.student \
    --doris-conf sink.enable-delete=true

将占位符(YOUR_PASSWORDYOUR_VELODB_ENDPOINT 等)替换为实际的凭证和端点信息。

该作业将:

  1. test_db.student 执行一次快照并加载到 VeloDB。
  2. 持续读取 Aurora MySQL 的 binlog 事件,并实时将相应的 INSERT、UPDATE、DELETE 应用到 VeloDB 表中。
Back to Blog

相关文章

阅读更多 »

切换账户

@blink_c5eb0afe3975https://dev.to/blink_c5eb0afe3975 正如大家所知,我正重新开始记录我的进展,我认为最好在一个不同的…

Strands 代理 + Agent Core AWS

入门指南:Amazon Bedrock AgentCore 目录 - 前置要求(requisitos‑previos) - 工具包安装(instalación‑del‑toolkit) - 创建…