Building Type-Safe AMQP Messaging with amqp-contract
Source: Dev.to
If you’ve worked with RabbitMQ or AMQP messaging in TypeScript, you’ve probably experienced the frustration of untyped messages, scattered validation logic, and the constant fear of runtime errors from mismatched data structures. What if there was a better way?
Today, I’m excited to introduce amqp‑contract — a TypeScript library that brings contract‑first development, end‑to‑end type safety, and automatic validation to AMQP messaging.
The Problem with Traditional AMQP Development
Publisher (no type safety)
// ❌ Traditional approach – no type safety
import amqp from 'amqplib';
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange('orders', 'topic', { durable: true });
channel.publish(
  'orders',
  'order.created',
  Buffer.from(
    JSON.stringify({
      orderId: 'ORD-123',
      amount: 99.99,
      // Did I forget any required fields?
    })
  )
);
Consumer (no type information)
// ❌ No type information
channel.consume('order-processing', (msg) => {
  if (!msg) return; // amqplib passes null when the consumer is cancelled
  const data = JSON.parse(msg.content.toString()); // typed as `any`
  console.log(data.orderId); // No autocomplete, no validation
  // Is this the right field name? Who knows!
});
Issues
- No Type Safety – TypeScript benefits disappear at the messaging boundary.
- Manual Validation – Every message must be validated by hand, or you risk runtime errors.
- Scattered Definitions – Message shapes are implicit or spread across the codebase.
- Refactoring Nightmares – Renaming a field means hunting down every usage.
- Documentation Drift – Code and docs quickly fall out of sync.
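To make the "Manual Validation" point concrete, here is a minimal sketch of the hand-written checking that the traditional approach tends to need. The `OrderCreated` interface and both function names are hypothetical, chosen only to match the fields in the example above.

```typescript
// Hypothetical message shape, based on the fields used in the example above.
interface OrderCreated {
  orderId: string;
  amount: number;
}

// Hand-written type guard: every field has to be checked explicitly,
// and the guard must be kept in sync with the interface by hand.
function isOrderCreated(value: unknown): value is OrderCreated {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.orderId === 'string' &&
    typeof candidate.amount === 'number' &&
    Number.isFinite(candidate.amount)
  );
}

// Parse a raw message body and reject anything that does not match.
function parseOrderCreated(raw: string): OrderCreated {
  const data: unknown = JSON.parse(raw);
  if (!isOrderCreated(data)) {
    throw new Error('Invalid order.created payload');
  }
  return data;
}
```

Every new field means touching the interface, the guard, and every publisher, which is the duplication a contract is meant to remove.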
Enter amqp‑contract
amqp‑contract solves these problems with a contract‑first approach, inspired by libraries such as tRPC, oRPC, and ts‑rest. It adapts their end‑to‑end type‑safety philosophy to message queues.
1️⃣ Define Your Contract
import {
  defineContract,
  defineExchange,
  defineQueue,
  definePublisher,
  defineConsumer,
  defineMessage,
  defineQueueBinding,
} from '@amqp-contract/contract';
import { z } from 'zod';
// AMQP resources
const ordersExchange = defineExchange('orders', 'topic', { durable: true });
const orderProcessingQueue = defineQueue('order-processing', { durable: true });
// Message schema
const orderMessage = defineMessage(
  z.object({
    orderId: z.string(),
    customerId: z.string(),
    items: z.array(
      z.object({
        productId: z.string(),
        quantity: z.number().int().positive(),
        price: z.number().positive(),
      })
    ),
    totalAmount: z.number().positive(),
    status: z.enum(['pending', 'processing', 'completed']),
  })
);
// Contract composition
export const contract = defineContract({
  exchanges: { orders: ordersExchange },
  queues: { orderProcessing: orderProcessingQueue },
  bindings: {
    orderBinding: defineQueueBinding(
      orderProcessingQueue,
      ordersExchange,
      { routingKey: 'order.created' }
    ),
  },
  publishers: {
    orderCreated: definePublisher(ordersExchange, orderMessage, {
      routingKey: 'order.created',
    }),
  },
  consumers: {
    processOrder: defineConsumer(orderProcessingQueue, orderMessage),
  },
});
2️⃣ Type‑Safe Publishing
import { TypedAmqpClient } from '@amqp-contract/client';
import { contract } from './contract';
const clientResult = await TypedAmqpClient.create({
  contract,
  urls: ['amqp://localhost'],
});
if (clientResult.isError()) {
  throw clientResult.error;
}
const client = clientResult.value;
// ✅ Fully typed – TypeScript knows exactly what fields are required
const result = await client.publish('orderCreated', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  items: [{ productId: 'PROD-789', quantity: 2, price: 49.99 }],
  totalAmount: 99.98,
  status: 'pending',
});
result.match({
  Ok: () => console.log('✅ Published'),
  Error: (error) => console.error('❌ Failed:', error),
});
3️⃣ Type‑Safe Consuming
import { TypedAmqpWorker } from '@amqp-contract/worker';
import { contract } from './contract';
const workerResult = await TypedAmqpWorker.create({
  contract,
  handlers: {
    // ✅ `message` is fully typed based on the schema
    processOrder: async (message) => {
      console.log(`Processing order: ${message.orderId}`);
      console.log(`Customer: ${message.customerId}`);
      console.log(`Total: $${message.totalAmount}`);
      // ✅ Autocomplete for all fields
      message.items.forEach((item) => {
        console.log(`- ${item.quantity}x Product ${item.productId}`);
      });
    },
  },
  urls: ['amqp://localhost'],
});
workerResult.match({
  Ok: () => console.log('✅ Worker ready'),
  Error: (error) => { throw error; },
});
Key Features That Make amqp‑contract Special
🔒 End-to-End Type Safety
TypeScript types flow automatically from your contract to publishers and consumers. No manual type annotations needed. If you refactor your schema, TypeScript immediately shows you every place that needs updating.
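To illustrate how types can flow from a contract object to a `publish` call, here is a simplified sketch. It is not amqp‑contract's actual implementation: `PublisherDef`, `MessageOf`, and `makePublish` are hypothetical names, and the `parse` function is a stand-in that only casts, where a real schema would validate.

```typescript
// Hypothetical, simplified publisher definition: a routing key plus a parser
// whose return type is treated as the message type.
type PublisherDef<T> = { routingKey: string; parse: (input: unknown) => T };
type Contract = { publishers: Record<string, PublisherDef<any>> };

// Look up the message type that belongs to a given publisher name.
type MessageOf<C extends Contract, K extends keyof C['publishers']> =
  C['publishers'][K] extends PublisherDef<infer T> ? T : never;

// Build a publish function whose second argument depends on the first.
function makePublish<C extends Contract>(
  contract: C,
  send: (routingKey: string, body: string) => void,
) {
  return function publish<K extends keyof C['publishers'] & string>(
    name: K,
    message: MessageOf<C, K>,
  ): void {
    const def = contract.publishers[name];
    send(def.routingKey, JSON.stringify(def.parse(message)));
  };
}

const demoContract = {
  publishers: {
    orderCreated: {
      routingKey: 'order.created',
      // Stand-in parser: casts only, performs no validation.
      parse: (input: unknown) => input as { orderId: string; totalAmount: number },
    },
  },
};

// Collect outgoing messages in memory instead of talking to a broker.
const sent: Array<{ routingKey: string; body: string }> = [];
const publish = makePublish(demoContract, (routingKey, body) => {
  sent.push({ routingKey, body });
});

publish('orderCreated', { orderId: 'ORD-123', totalAmount: 99.98 });
// publish('orderDeleted', { orderId: 'ORD-123' }) would not compile:
// 'orderDeleted' is not a key of demoContract.publishers.
```

Because the message type is looked up from the publisher name, renaming a field in the contract surfaces as a compile error at each call site.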
✅ Automatic Validation
Messages are automatically validated at network boundaries using Standard Schema v1. This works with Zod, Valibot, and ArkType, giving you the flexibility to choose your preferred validation library.
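For readers unfamiliar with Standard Schema v1, the sketch below spells out a minimal subset of its interface by hand and validates a message body against it. The `orderIdSchema` object and the `decodeMessageSync` helper are hypothetical and written for illustration. They do not show how amqp‑contract calls the schema internally.

```typescript
// Minimal subset of the Standard Schema v1 interface, written out by hand
// so the example has no dependencies.
interface StandardIssue {
  readonly message: string;
}
type StandardResult<T> =
  | { readonly value: T; readonly issues?: undefined }
  | { readonly issues: ReadonlyArray<StandardIssue> };

interface StandardSchemaV1<Output> {
  readonly '~standard': {
    readonly version: 1;
    readonly vendor: string;
    readonly validate: (
      value: unknown,
    ) => StandardResult<Output> | Promise<StandardResult<Output>>;
  };
}

// Hypothetical hand-written schema; Zod, Valibot, and ArkType schemas
// expose the same '~standard' property.
const orderIdSchema: StandardSchemaV1<{ orderId: string }> = {
  '~standard': {
    version: 1,
    vendor: 'example',
    validate: (value) => {
      const candidate = value as { orderId?: unknown } | null;
      if (typeof candidate === 'object' && candidate !== null && typeof candidate.orderId === 'string') {
        return { value: { orderId: candidate.orderId } };
      }
      return { issues: [{ message: 'orderId must be a string' }] };
    },
  },
};

// Decode a raw body with any Standard Schema; this sketch supports
// synchronous validators only.
function decodeMessageSync<T>(schema: StandardSchemaV1<T>, raw: string): T {
  const result = schema['~standard'].validate(JSON.parse(raw));
  if (result instanceof Promise) {
    throw new TypeError('Async schemas are not supported in this sketch');
  }
  if (result.issues) {
    throw new Error(result.issues.map((issue) => issue.message).join('; '));
  }
  return result.value;
}
```

Because the helper depends only on the `~standard` property, the same code path can serve any library that implements the spec.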
🛠️ Compile‑Time Checks
TypeScript catches errors before runtime:
// ❌ TypeScript error – "orderDeleted" doesn't exist
await client.publish('orderDeleted', { orderId: '123' });
// ❌ TypeScript error – missing handler
await TypedAmqpWorker.create({
  contract,
  handlers: {}, // forgot processOrder!
  urls: ['amqp://localhost'],
});
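The missing-handler check can be sketched with a mapped type that requires one handler per consumer key. This is an assumption about how such a check could be built, not the library's actual types: `ConsumerDef`, `HandlersFor`, and `createWorker` are hypothetical names, and the parser is a stand-in that only casts.

```typescript
// Hypothetical, simplified consumer definition: a queue name plus a parser.
type ConsumerDef<T> = { queue: string; parse: (input: unknown) => T };
type Consumers = Record<string, ConsumerDef<any>>;

// One handler is required for every consumer key in the contract.
type HandlersFor<C extends Consumers> = {
  [K in keyof C]: (message: ReturnType<C[K]['parse']>) => void;
};

function createWorker<C extends Consumers>(consumers: C, handlers: HandlersFor<C>) {
  // Run the raw body through the consumer's parser, then call its handler.
  return function handle<K extends keyof C & string>(name: K, raw: string): void {
    const message = consumers[name].parse(JSON.parse(raw));
    handlers[name](message);
  };
}

const demoConsumers = {
  processOrder: {
    queue: 'order-processing',
    // Stand-in parser: casts only, performs no validation.
    parse: (input: unknown) => input as { orderId: string },
  },
};

// Record handled order IDs in memory so the behaviour can be inspected.
const seen: string[] = [];
const handle = createWorker(demoConsumers, {
  processOrder: (message) => {
    seen.push(message.orderId);
  },
});
// createWorker(demoConsumers, {}) would not compile: `processOrder` is missing.

handle('processOrder', JSON.stringify({ orderId: 'ORD-123' }));
```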
📄 AsyncAPI 3.0 Generation
Automatically generate AsyncAPI specifications from your contracts:
import { AsyncAPIGenerator } from '@amqp-contract/asyncapi';
import { ZodToJsonSchemaConverter } from '@orpc/zod/zod4';
const generator = new AsyncAPIGenerator({
  schemaConverters: [new ZodToJsonSchemaConverter()],
});
const spec = await generator.generate(contract, {
  info: {
    title: 'Order Processing API',
    version: '1.0.0',
  },
  servers: {
    production: {
      host: 'rabbitmq.example.com:5672',
      protocol: 'amqp',
    },
  },
});
🎯 First‑Class NestJS Support
If you’re using NestJS, amqp-contract provides dedicated integration packages:
import { Module } from '@nestjs/common';
import { AmqpWorkerModule } from '@amqp-contract/worker-nestjs';
import { AmqpClientModule } from '@amqp-contract/client-nestjs';
import { contract } from './contract';
@Module({
  imports: [
    AmqpWorkerModule.forRoot({
      contract,
      handlers: {
        processOrder: async (message) => {
          console.log('Processing:', message.orderId);
        },
      },
      connection: process.env.RABBITMQ_URL,
    }),
    AmqpClientModule.forRoot({
      contract,
      connection: process.env.RABBITMQ_URL,
    }),
  ],
})
export class AppModule {}
Why Choose amqp‑contract?
Compared to Raw amqplib
| Feature | amqp‑contract | amqplib |
|---|---|---|
| Type safety | ✅ | ❌ |
| Automatic validation | ✅ | ❌ |
| Compile‑time checks | ✅ | ❌ |
| Refactoring support | ✅ | ❌ |
| Documentation from code | ✅ | ❌ |
Compared to Other Solutions
Unlike other AMQP libraries, amqp‑contract:
- Focuses on type safety first – types are derived from your contract.
- Uses Standard Schema v1 – compatible with multiple validation libraries.
- Generates AsyncAPI specs – automatic documentation.
- Provides explicit error handling – uses Result types.
- Is framework agnostic – works standalone or with NestJS.
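The "explicit error handling" point refers to returning failures as values instead of throwing. The sketch below is a minimal, self-contained `Result` type with a `match` helper, written for illustration. The names `Result`, `ok`, `err`, `match`, and `parsePositive` are hypothetical and are not amqp‑contract's own types.

```typescript
// A Result is either a success carrying a value or a failure carrying an error.
type Result<T, E> =
  | { readonly tag: 'Ok'; readonly value: T }
  | { readonly tag: 'Error'; readonly error: E };

function ok<T>(value: T): Result<T, never> {
  return { tag: 'Ok', value };
}

function err<E>(error: E): Result<never, E> {
  return { tag: 'Error', error };
}

// Force the caller to handle both branches in one place.
function match<T, E, R>(
  result: Result<T, E>,
  cases: { Ok: (value: T) => R; Error: (error: E) => R },
): R {
  return result.tag === 'Ok' ? cases.Ok(result.value) : cases.Error(result.error);
}

// Example producer: failure is part of the return type, not an exception.
function parsePositive(input: string): Result<number, string> {
  const parsed = Number(input);
  return Number.isFinite(parsed) && parsed > 0
    ? ok(parsed)
    : err(`not a positive number: ${input}`);
}

const doubled = match(parsePositive('2'), {
  Ok: (value) => value * 2,
  Error: () => -1,
});
```

Since the error branch appears in the function signature, a caller cannot reach the value without deciding what to do on failure.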
Getting Started
Installation
# Core packages
pnpm add @amqp-contract/contract @amqp-contract/client @amqp-contract/worker
# Choose your schema library
pnpm add zod # or valibot, or arktype
# AMQP client (type definitions go in devDependencies)
pnpm add amqplib
pnpm add -D @types/amqplib
Quick Start
- Define your contract with schemas.
- Create a client to publish messages.
- Create a worker to consume messages.
- Enjoy type safety end‑to‑end!
Try It Today!
amqp‑contract is open source (MIT license) and available on npm.
Check out the full documentation for detailed guides, API reference, and examples.
Conclusion
Type safety shouldn’t stop at your application boundaries. With amqp‑contract, you can bring the same level of type safety and developer experience you enjoy with TypeScript to your AMQP messaging layer.
- Stop fighting runtime errors.
- Stop manually validating messages.
- Stop worrying about refactoring.
Start building type‑safe, validated, and maintainable messaging systems today.
What do you think? Have you tried amqp‑contract? Share your experiences with type‑safe messaging in the comments!