Postgres Pro 中的消息队列:摆脱外部代理,实现真正的事务可靠性
Source: Dev.to
在分布式系统的时代——每个组件不仅要快,还要可预测——数据交换的可靠性变得至关重要。想象一下:用户点击 “生成报告”,瞬间就有十几项流程需要同步——从生成文档到发送邮件。但如果邮件服务器暂时宕机呢?或者任务处理器在操作中途崩溃?
这正是消息队列发挥作用的地方:它们把混乱的请求风暴转化为受控的流,确保没有任务在途中丢失。
创建 PostgreSQL 内置队列的背后故事源于一个熟悉的痛点:外部代理如 RabbitMQ 或 Kafka——虽然强大——却带来复杂性。它们需要专用服务器、集群、监控、备份……整个动物园。在拥有成千上万部署的企业环境中,每增加一个组件都会提升运维风险和管理负担。
于是自然会产生这样的问题:为什么要在已有的数据库内部实现队列时,还要额外加装一个独立的代理? 这不仅节省时间,还消除了数据一致性的问题:如果事务回滚,消息也随之回滚——无需额外代码,也不需要神奇的粘合剂。
两种队列方法:基于日志 vs AMQP/JMS
分布式系统已经收敛为两种主要的消息处理模型:
-
基于日志的队列(例如 Kafka)。
这些队列以追加写入的方式维护事件日志。数据严格按顺序写入,消费者也以相同的顺序读取。非常适合微服务之间的数据同步或数据库复制。它们最大的优势——线性顺序——在需要灵活性时会成为短板。无法轻松按优先级挑选消息或即时过滤。 -
AMQP/JMS 中间件(例如 RabbitMQ)。
这些中间件让你能够管理消息生命周期:优先级、过滤、错误处理。你可以模拟异步 RPC,并可靠地重试失败的操作。它们的弱点不在协议本身,而在架构上:没有外部中间件能够保证与数据库的完整事务一致性。
外部经纪人(消息中间件)的问题:事务与复杂性
在企业环境中使用 Kafka 或 RabbitMQ 往往会导致一些细微但非常真实的问题:
数据不匹配 – 想象一下:应用向 RabbitMQ 发送消息后立即开启数据库事务。消息已经进入经纪人,但数据库事务在提交时失败。结果如何?出现不一致:任务已经执行,但其数据并未保存。可以通过两阶段提交(2PC)来解决,但这会再引入一个协调者,降低整体性能,并使架构更加复杂。
运维开销 – 外部经纪人需要单独的安装、调优、监控和备份。它们会增加新的故障点(例如,应用与经纪人之间的网络分区)。支持难度也会提升——尤其是当不同团队分别负责数据库和经纪人时。
Postgres Pro Enterprise 队列:事务性、本地、自动化
全新的 pgpro_queue 扩展是对那些厌倦了处理一致性问题的开发者的直接回应。通过将队列集成到数据库内部,它彻底消除了对外部组件的需求。
消息存储在普通表中,通过标准的 PostgreSQL 机制进行复制,并且参与与应用逻辑相同的事务。
安装与设置
设置 pgpro_queue 十分简便:
-
在
shared_preload_libraries中启用(postgresql.conf)shared_preload_libraries = 'pgpro_queue' -
创建扩展
CREATE EXTENSION pgpro_queue; -
初始化内部对象
SELECT queue_initialize();这会创建专用模式
pgpro_queue_data,确保pg_dump和复制过程顺畅。
关键特性及其工作原理
回滚重试(自动重试)
这是 pgpro_queue 的核心优势。如果处理消息的事务回滚(例如外部服务不可用),消息不会丢失——它会自动返回队列。
配置
使用 CREATE_QUEUE 并设置 q_retries 与 q_retrydelay 来为每个队列指定重试策略。
重要 – 要启用重试,必须在 postgresql.conf 中设置数据库名称:
pgpro_queue.database_with_managed_retry = 'your_database_name'
如果未设置此项,回滚时消息将被删除。
过滤与优先级
- 优先级 – 通过
INSERT_MESSAGE插入消息时,可设置q_msg_priority。数值越小,优先级越高。高优先级的消息会先被处理。 - 过滤 –
READ_MESSAGE和READ_MESSAGE_XML接受q_msg_hfilter与q_msg_pfilter参数,允许根据头部或属性内容获取消息。
JSON 与 XML 支持
独立的 API 简化了与外部系统的集成:
| 格式 | 插入 API | 读取 API |
|---|---|---|
| JSON | INSERT_MESSAGE | READ_MESSAGE |
| XML | INSERT_MESSAGE_XML | READ_MESSAGE_XML |
| 任意 | — | READ_MESSAGE_ANY |
延迟执行
使用 q_msg_enable_time 可以推迟消息的可用时间。
Source: …
底层原理
pgpro_queue 基于一个简单的原则:消息存放在普通的 PostgreSQL 表中。这使得能够进行 WAL 记录、崩溃安全,并且完整参与数据库的复制和备份机制。
恢复与流复制
每条消息都会写入 WAL,确保即使断电后也能恢复。其核心亮点是与 PostgreSQL 事务引擎紧密集成的 回滚重试 机制。如果处理失败,消息会透明地返回以供再次尝试——无需自定义逻辑。
另一个优势是它与内置的 Postgres Pro Enterprise 调度器 的协同工作。这个内部类似 cron 的系统可以启动周期性任务,这些任务本身会生成子任务并将其推入队列。
示例: 生成数千份地区报告。调度器创建地区任务,这些任务会根据当地时区将消息入队。
外部消息中间件如 Kafka 确实很强大,但往往是孤立使用的。在真实系统中,薄弱环节往往出现在集成层:网络、配置以及事务边界。使用 pgpro_queue,队列与数据处于同一宇宙中。得益于 WAL 与复制,消息在崩溃后仍能存活。管理员不再需要单独的仪表盘;所有操作都可以通过熟悉的 PostgreSQL 工具集完成。
重要技术说明
- 隔离级别 – 仅在
READ COMMITTED事务中受支持。 - 预准备事务(2PC) – 如果包含
READ_MESSAGE()的事务通过PREPARE TRANSACTION进行预准备,则该消息会保持锁定状态,直至执行COMMIT PREPARED或ROLLBACK PREPARED。- 注意:
ROLLBACK PREPARED会解锁消息,但 不会 触发重试。
- 注意:
路线图:内置队列的下一步
即将推出的功能包括:
- Pub/Sub 订阅系统 – 类似 RabbitMQ 交换机:生产者向主题发布,所有订阅者收到副本。这实现了完整的事件驱动架构。
- 回调通知 – 数据库将在消息到达时发送 HTTP 回调——不再需要轮询循环。
- 死信队列(DLQ) – 计划在后续版本中实现:用于存放有问题的消息的安全位置。
结论
对于企业系统,主要的好处是 可预测性。每条消息都在数据库事务中处理,消除了不一致性。基础设施变得更简单:不再是服务的丛林,而是一个统一的环境,队列、业务逻辑和调度都是一等公民。
这种方法非常适合错误代价高昂的领域——银行、医疗、政府登记。如果你的应用依赖原子操作且需要低运维开销,PostgreSQL 内置的队列不仅仅是便利——它们是必不可少的。它们不仅传递消息——它们为分布式系统的混乱带来秩序。
