什么是 Process-Time Temporal Joins?

发布: (2025年12月2日 GMT+8 11:00)
6 min read
原文: Dev.to

Source: Dev.to

想象一下,你运营一家在线商店,商品价格经常变动。顾客下单后,五分钟你就更新了他们购买的商品的价格。审查订单时,如何确定顾客实际支付的正确价格?普通的 数据库连接 往往会显示最新的价格,这对过去的订单来说是错误的。

这在 实时数据系统 中是一个常见难题。你需要知道数据在被处理的那一刻到底是什么样子。在 RisingWave 中,你可以轻松使用 处理时间(process‑time)时态连接 来解决这个问题。

什么是处理时间时态连接?

处理时间时态连接是一种特殊的连接方式,它根据系统处理每个事件的时间,将数据流与表进行关联。它不是与记录的最新版本进行连接,而是与事件在流中被处理时该记录所处的版本进行连接。

这正好适用于电商场景:我们可以将 orders 流与 product_prices 表进行连接,找到每笔订单在被处理的那一刻所使用的价格。

什么是处理时间时态连接

步骤演示:为订单追踪价格

我们来构建一个实时物化视图,使用购买时的商品价格来正确计算订单的总费用。

步骤 1:创建表

首先,需要两个数据结构。一个用于存储商品价格(允许更新),另一个用于追加写入的新订单流。

-- 1. 使用自动处理时间跟踪商品价格
CREATE TABLE product_prices (
    product_id INT PRIMARY KEY,   -- 必须有主键
    price FLOAT
);
-- 2. 包含历史订单的追加写入源
CREATE TABLE orders (
    order_id   INT,
    product_id INT,
    quantity   INT
) APPEND ONLY;

步骤 2:插入示例数据

先插入一些商品及其初始价格,然后立即更新它们,以模拟价格变动。

-- 初始价格(08:00 更新)
INSERT INTO product_prices VALUES
(101, 110.0),
(102, 200.0);

接着插入两笔在价格更新后处理的客户订单。

-- 08:30 处理的订单
INSERT INTO orders VALUES
(1, 101, 2),
(2, 102, 1);

步骤 3:创建带时态连接的物化视图

FOR SYSTEM_TIME AS OF PROCTIME() 语法告诉 RisingWave 将每笔订单与订单处理时刻有效的商品价格版本进行连接。

CREATE MATERIALIZED VIEW order_with_price AS
SELECT
    o.order_id,
    o.product_id,
    p.price AS price_at_purchase
FROM orders o
JOIN product_prices FOR SYSTEM_TIME AS OF PROCTIME() p
    ON o.product_id = p.product_id;

步骤 4:查询结果并观察效果

SELECT * FROM order_with_price;

结果(示例):

order_id | product_id | price_at_purchase | price_update_time | order_process_time
--------+------------+-------------------+-------------------+-------------------
1       | 101        | 110               | 08:00:00          | 08:30:00
2       | 102        | 200               | 08:00:00          | 08:30:00

现在再次更新价格。

-- 09:00 更新的价格
INSERT INTO product_prices VALUES
(101, 150.0),
(102, 250.0);

再次查询视图,已有的行保持不变,因为时态连接“锁定”了每笔订单首次处理时的价格。

SELECT * FROM order_with_price;
-- 输出与之前相同

在最新的价格变动后再插入一笔新订单。

-- 09:30 处理的订单
INSERT INTO orders VALUES
(3, 102, 3);
SELECT * FROM order_with_price;

最终结果:

order_id | product_id | price_at_purchase | price_update_time | order_process_time
--------+------------+-------------------+-------------------+-------------------
1       | 101        | 110               | 08:00:00          | 08:30:00
2       | 102        | 200               | 08:00:00          | 08:30:00
3       | 102        | 250               | 09:00:00          | 09:30:00

优化你的连接:追加写入 vs. 非追加写入

RisingWave 会根据左侧流的属性选择不同的连接实现方式:

  • 追加写入时态连接 – 本演示使用的方式。因为左侧(orders)从不更新或删除行,系统无需维护大量状态,效率更高。
  • 非追加写入时态连接 – 当左侧可能收到更新或删除时支持此方式。此时需要更多资源,因为 RisingWave 必须跟踪变化并可能撤回之前的结果。

为了获得最佳性能,尽可能在时态连接的左侧使用追加写入源或表。

结论

处理时间时态连接是 RisingWave 中构建需要时间点准确性的应用的强大特性。通过使用 FOR SYSTEM_TIME AS OF PROCTIME() 子句,你可以确保每个事件都与正确版本的参考数据进行连接,在保持历史正确性的同时仍然受益于实时处理。

Back to Blog

相关文章

阅读更多 »

切换账户

@blink_c5eb0afe3975https://dev.to/blink_c5eb0afe3975 正如大家所知,我正重新开始记录我的进展,我认为最好在一个不同的…

Strands 代理 + Agent Core AWS

入门指南:Amazon Bedrock AgentCore 目录 - 前置要求(requisitos‑previos) - 工具包安装(instalación‑del‑toolkit) - 创建…