MySQL 到 VeloDB 实时数据同步概览
发布: (2025年12月3日 GMT+8 04:40)
3 min read
原文: Dev.to
Source: Dev.to
Flink 可以在将数据从 MySQL 兼容数据库(例如 Amazon Aurora)迁移到 VeloDB 时,充当实时数据同步引擎。
它的高吞吐、低延迟流处理能力能够实现:
- 全量数据初始加载 – 将 MySQL/Aurora 中的现有表导入 VeloDB。
- 增量变更捕获 – 使用 MySQL binlog(CDC)捕获 INSERT/UPDATE/DELETE 事件,并持续写入 VeloDB。
整体架构如下图所示:
示例工作流
1. 创建 AWS RDS Aurora MySQL 实例
2. 创建 MySQL 数据库及相应表
CREATE DATABASE test_db;
CREATE TABLE test_db.student (
id INT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
score DECIMAL(5,2),
created_at TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO test_db.student (id, name, age, email, phone, score, created_at)
VALUES
(1, 'Alice Zhang', 22, 'alice@example.com', '13800138000', 89.50, NOW()),
(2, 'Bob Li', 21, 'bob@example.com', '13900139000', 76.80, NOW()),
(3, 'Charlie Wang',23, 'charlie@example.com','13600136000',92.00, NOW()),
(4, 'David Chen', 20, 'david@example.com', '13500135000', 85.60, NOW()),
(5, 'Emma Liu', 22, 'emma@example.com', '13700137000', 78.90, NOW());
3. 创建 VeloDB 仓库
4. 修改 MySQL(Aurora)配置
-
创建参数组 并启用 binlog。

-
将
binlog_format设置为ROW。
-
将参数组关联到 DB 集群并重启实例。

5. 安装带 Doris 连接器的 Flink
5.1 下载预构建包
可以下载并解压包含 Doris 连接器的 Flink 1.17 发行版,直接使用。
5.2 手动安装(如果已有 Flink 环境)
下载以下构件:
- Flink 1.17(二进制发行版)
- Flink MySQL CDC 连接器
- Flink Doris 连接器
- MySQL JDBC 驱动
解压 Flink 发行版:
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz
将连接器 JAR 包和 MySQL 驱动复制到 flink-1.17.2-bin/lib:
6. 提交 Flink 同步作业
作业运行时,Doris 连接器会根据源 MySQL 的 schema 自动在 VeloDB 中创建目标表。Flink 可部署在 Local、Standalone、YARN 或 Kubernetes 模式下。
6.1 本地环境示例
cd flink-1.17.2-bin
bin/flink run -t local \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-25.1.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=database-test.cluster-ro-ckbuyoqerz2c.us-east-1.rds.amazonaws.com \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=YOUR_PASSWORD \
--mysql-conf server-id=5400 \
--mysql-conf server-time-zone=UTC \
--doris-conf fenodes=YOUR_VELODB_ENDPOINT \
--doris-conf username=YOUR_VELODB_USER \
--doris-conf password=YOUR_VELODB_PASSWORD \
--doris-conf table.identifier=test_db.student \
--doris-conf sink.enable-delete=true
将占位符(YOUR_PASSWORD、YOUR_VELODB_ENDPOINT 等)替换为实际的凭证和端点信息。
该作业将:
- 对
test_db.student执行一次快照并加载到 VeloDB。 - 持续读取 Aurora MySQL 的 binlog 事件,并实时将相应的 INSERT、UPDATE、DELETE 应用到 VeloDB 表中。



