使用 amqp-contract 构建类型安全的 AMQP 消息传递

发布: (2025年12月25日 GMT+8 09:49)
7 min read
原文: Dev.to

Source: Dev.to

如果你使用过 RabbitMQ 或在 TypeScript 中进行 AMQP 消息传递,你可能已经体会到未类型化消息、分散的验证逻辑以及因数据结构不匹配而导致的运行时错误的持续担忧。有没有更好的办法?

今天,我很高兴向大家介绍 amqp‑contract —— 一个 TypeScript 库,为 AMQP 消息传递带来契约优先开发、端到端类型安全以及自动验证。

传统 AMQP 开发的问题

发布者(无类型安全)

// ❌ Traditional approach – no type safety
import amqp from 'amqplib';

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

await channel.assertExchange('orders', 'topic', { durable: true });

channel.publish(
  'orders',
  'order.created',
  Buffer.from(
    JSON.stringify({
      orderId: 'ORD-123',
      amount: 99.99,
      // Did I forget any required fields?
    })
  )
);

消费者(无类型信息)

// ❌ No type information
channel.consume('order-processing', (msg) => {
  const data = JSON.parse(msg.content.toString()); // unknown type
  console.log(data.orderId); // No autocomplete, no validation
  // Is this the right field name? Who knows!
});

问题

  • 没有类型安全 – 在消息边界上 TypeScript 的优势消失。
  • 手动校验 – 每条消息都必须手动验证,否则会出现运行时错误。
  • 定义分散 – 消息结构是隐式的或分布在代码库各处。
  • 重构噩梦 – 重命名字段意味着要逐一查找所有使用位置。
  • 文档漂移 – 代码和文档很快不同步。

进入 amqp‑contract

amqp‑contract 通过契约优先(contract‑first)的方法解决这些问题,灵感来源于诸如 tRPCoRPCts‑rest 等库。它将它们的端到端类型安全理念迁移到消息队列上。

1️⃣ 定义你的契约

import {
  defineContract,
  defineExchange,
  defineQueue,
  definePublisher,
  defineConsumer,
  defineMessage,
  defineQueueBinding,
} from '@amqp-contract/contract';
import { z } from 'zod';

// AMQP 资源
const ordersExchange = defineExchange('orders', 'topic', { durable: true });
const orderProcessingQueue = defineQueue('order-processing', { durable: true });

// 消息模式
const orderMessage = defineMessage(
  z.object({
    orderId: z.string(),
    customerId: z.string(),
    items: z.array(
      z.object({
        productId: z.string(),
        quantity: z.number().int().positive(),
        price: z.number().positive(),
      })
    ),
    totalAmount: z.number().positive(),
    status: z.enum(['pending', 'processing', 'completed']),
  })
);

// 契约组合
export const contract = defineContract({
  exchanges: { orders: ordersExchange },
  queues: { orderProcessing: orderProcessingQueue },
  bindings: {
    orderBinding: defineQueueBinding(
      orderProcessingQueue,
      ordersExchange,
      { routingKey: 'order.created' }
    ),
  },
  publishers: {
    orderCreated: definePublisher(ordersExchange, orderMessage, {
      routingKey: 'order.created',
    }),
  },
  consumers: {
    processOrder: defineConsumer(orderProcessingQueue, orderMessage),
  },
});

2️⃣ 类型安全的发布

import { TypedAmqpClient } from '@amqp-contract/client';
import { contract } from './contract';

const clientResult = await TypedAmqpClient.create({
  contract,
  urls: ['amqp://localhost'],
});

if (clientResult.isError()) {
  throw clientResult.error;
}

const client = clientResult.value;

// ✅ 完全类型化 – TypeScript 完全知道哪些字段是必需的
const result = await client.publish('orderCreated', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  items: [{ productId: 'PROD-789', quantity: 2, price: 49.99 }],
  totalAmount: 99.98,
  status: 'pending',
});

result.match({
  Ok: () => console.log('✅ Published'),
  Error: (error) => console.error('❌ Failed:', error),
});

3️⃣ 类型安全的消费

import { TypedAmqpWorker } from '@amqp-contract/worker';
import { contract } from './contract';

const workerResult = await TypedAmqpWorker.create({
  contract,
  handlers: {
    // ✅ `message` 根据模式已完整类型化
    processOrder: async (message) => {
      console.log(`Processing order: ${message.orderId}`);
      console.log(`Customer: ${message.customerId}`);
      console.log(`Total: $${message.totalAmount}`);

      // ✅ 所有字段都有自动补全
      message.items.forEach((item) => {
        console.log(`- ${item.quantity}x Product ${item.productId}`);
      });
    },
  },
  urls: ['amqp://localhost'],
});

workerResult.match({
  Ok: () => console.log('✅ Worker ready'),
  Error: (error) => { throw error; },
});

使 amqp‑contract 与众不同的关键特性

🔒 端到端类型安全

TypeScript 类型会自动从你的合同流向发布者和消费者。无需手动类型注解。如果你重构模式,TypeScript 会立即显示所有需要更新的地方。

✅ 自动验证

消息在网络边界使用 Standard Schema v1 自动进行验证。这与 ZodValibotArkType 兼容,让你可以灵活选择喜欢的验证库。

🛠️ 编译时检查

TypeScript 在运行时之前捕获错误:

// ❌ TypeScript error – "orderDeleted" doesn't exist
await client.publish('orderDeleted', { orderId: '123' });

// ❌ TypeScript error – missing handler
await TypedAmqpWorker.create({
  contract,
  handlers: {}, // forgot processOrder!
  urls: ['amqp://localhost'],
});

📄 AsyncAPI 3.0 生成

自动从你的合同生成 AsyncAPI 规范:

import { AsyncAPIGenerator } from '@amqp-contract/asyncapi';
import { ZodToJsonSchemaConverter } from '@orpc/zod/zod4';

const generator = new AsyncAPIGenerator({
  schemaConverters: [new ZodToJsonSchemaConverter()],
});

const spec = await generator.generate(contract, {
  info: {
    title: 'Order Processing API',
    version: '1.0.0',
  },
  servers: {
    production: {
      host: 'rabbitmq.example.com:5672',
      protocol: 'amqp',
    },
  },
});

🎯 一流的 NestJS 支持

如果你在使用 NestJSamqp-contract 提供了专用的集成包:

import { Module } from '@nestjs/common';
import { AmqpWorkerModule } from '@amqp-contract/worker-nestjs';
import { AmqpClientModule } from '@amqp-contract/client-nestjs';

@Module({
  imports: [
    AmqpWorkerModule.forRoot({
      contract,
      handlers: {
        processOrder: async (message) => {
          console.log('Processing:', message.orderId);
        },
      },
      connection: process.env.RABBITMQ_URL,
    }),
    AmqpClientModule.forRoot({
      contract,
      connection: process.env.RABBITMQ_URL,
    }),
  ],
})
export class AppModule {}

为什么选择 amqp‑contract

与原始 amqplib 对比

Featureamqp‑contractamqplib
✅ Type safety
✅ Automatic validation
✅ Compile‑time checks
✅ Refactoring support
✅ Documentation from code

与其他解决方案对比

与其他 AMQP 库不同,amqp‑contract

  • 首先关注 类型安全 —— 类型来源于你的合同。
  • 使用 Standard Schema v1 —— 与多种验证库兼容。
  • 生成 AsyncAPI 规范 —— 自动文档。
  • 提供 显式错误处理 —— 使用 Result 类型。
  • 框架无关 —— 可独立使用,也可与 NestJS 配合。

入门

安装

# Core packages
pnpm add @amqp-contract/contract @amqp-contract/client @amqp-contract/worker

# Choose your schema library
pnpm add zod          # or valibot, or arktype

# AMQP client
pnpm add amqplib @types/amqplib

快速开始

  1. 使用 schema 定义你的 contract
  2. 创建一个 client 用于发布消息。
  3. 创建一个 worker 用于消费消息。
  4. 享受端到端的类型安全

立即尝试!

amqp‑contract 是开源的(MIT 许可证),并可在 npm 上获取:

查看 完整文档 以获取详细指南、API 参考和示例。

结论

类型安全不应仅限于应用程序边界。使用 amqp‑contract,你可以将在 TypeScript 中享受的同等级别的类型安全和开发者体验带入 AMQP 消息层。

  • 不再与运行时错误搏斗。
  • 不再手动验证消息。
  • 不再担心重构带来的问题。

立即开始构建类型安全、经过验证且易于维护的消息系统吧。

你有什么想法? 你尝试过 amqp‑contract 吗?在评论中分享你在类型安全消息方面的经验吧!

Back to Blog

相关文章

阅读更多 »

从混乱到秩序的前端

它是如何工作的 - 后端为微服务更新 GraphQL schema。 - 前端拉取最新的 schema,创建查询/变更并重新生成 type。 - 任何…