什么是 Process-Time Temporal Joins?
Source: Dev.to
想象一下,你运营一家在线商店,商品价格经常变动。顾客下单后,五分钟你就更新了他们购买的商品的价格。审查订单时,如何确定顾客实际支付的正确价格?普通的 数据库连接 往往会显示最新的价格,这对过去的订单来说是错误的。
这在 实时数据系统 中是一个常见难题。你需要知道数据在被处理的那一刻到底是什么样子。在 RisingWave 中,你可以轻松使用 处理时间(process‑time)时态连接 来解决这个问题。
什么是处理时间时态连接?
处理时间时态连接是一种特殊的连接方式,它根据系统处理每个事件的时间,将数据流与表进行关联。它不是与记录的最新版本进行连接,而是与事件在流中被处理时该记录所处的版本进行连接。
这正好适用于电商场景:我们可以将 orders 流与 product_prices 表进行连接,找到每笔订单在被处理的那一刻所使用的价格。

步骤演示:为订单追踪价格
我们来构建一个实时物化视图,使用购买时的商品价格来正确计算订单的总费用。
步骤 1:创建表
首先,需要两个数据结构。一个用于存储商品价格(允许更新),另一个用于追加写入的新订单流。
-- 1. 使用自动处理时间跟踪商品价格
CREATE TABLE product_prices (
product_id INT PRIMARY KEY, -- 必须有主键
price FLOAT
);
-- 2. 包含历史订单的追加写入源
CREATE TABLE orders (
order_id INT,
product_id INT,
quantity INT
) APPEND ONLY;
步骤 2:插入示例数据
先插入一些商品及其初始价格,然后立即更新它们,以模拟价格变动。
-- 初始价格(08:00 更新)
INSERT INTO product_prices VALUES
(101, 110.0),
(102, 200.0);
接着插入两笔在价格更新后处理的客户订单。
-- 08:30 处理的订单
INSERT INTO orders VALUES
(1, 101, 2),
(2, 102, 1);
步骤 3:创建带时态连接的物化视图
FOR SYSTEM_TIME AS OF PROCTIME() 语法告诉 RisingWave 将每笔订单与订单处理时刻有效的商品价格版本进行连接。
CREATE MATERIALIZED VIEW order_with_price AS
SELECT
o.order_id,
o.product_id,
p.price AS price_at_purchase
FROM orders o
JOIN product_prices FOR SYSTEM_TIME AS OF PROCTIME() p
ON o.product_id = p.product_id;
步骤 4:查询结果并观察效果
SELECT * FROM order_with_price;
结果(示例):
order_id | product_id | price_at_purchase | price_update_time | order_process_time
--------+------------+-------------------+-------------------+-------------------
1 | 101 | 110 | 08:00:00 | 08:30:00
2 | 102 | 200 | 08:00:00 | 08:30:00
现在再次更新价格。
-- 09:00 更新的价格
INSERT INTO product_prices VALUES
(101, 150.0),
(102, 250.0);
再次查询视图,已有的行保持不变,因为时态连接“锁定”了每笔订单首次处理时的价格。
SELECT * FROM order_with_price;
-- 输出与之前相同
在最新的价格变动后再插入一笔新订单。
-- 09:30 处理的订单
INSERT INTO orders VALUES
(3, 102, 3);
SELECT * FROM order_with_price;
最终结果:
order_id | product_id | price_at_purchase | price_update_time | order_process_time
--------+------------+-------------------+-------------------+-------------------
1 | 101 | 110 | 08:00:00 | 08:30:00
2 | 102 | 200 | 08:00:00 | 08:30:00
3 | 102 | 250 | 09:00:00 | 09:30:00
优化你的连接:追加写入 vs. 非追加写入
RisingWave 会根据左侧流的属性选择不同的连接实现方式:
- 追加写入时态连接 – 本演示使用的方式。因为左侧(
orders)从不更新或删除行,系统无需维护大量状态,效率更高。 - 非追加写入时态连接 – 当左侧可能收到更新或删除时支持此方式。此时需要更多资源,因为 RisingWave 必须跟踪变化并可能撤回之前的结果。
为了获得最佳性能,尽可能在时态连接的左侧使用追加写入源或表。
结论
处理时间时态连接是 RisingWave 中构建需要时间点准确性的应用的强大特性。通过使用 FOR SYSTEM_TIME AS OF PROCTIME() 子句,你可以确保每个事件都与正确版本的参考数据进行连接,在保持历史正确性的同时仍然受益于实时处理。