Part 3: AMQP Setup and Calls - Ktor Native Worker Tutorial
Source: Dev.to
Asynchronous Message Processing with AMQP (RabbitMQ)
Architecture Overview
The messaging system consists of:
- MessageBroker – Interface defining message broker operations
- RabbitMqMessageBroker – RabbitMQ implementation using the Kourier AMQP client
- MessageHandler – Interface for message handlers
- SendNotificationHandler – Handler for processing notification events
- SendNotificationEvent – Data class representing notification events
Constants Configuration
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt
object Constants {
const val RABBITMQ_EXCHANGE = "notifications_exchange"
const val RABBITMQ_QUEUE = "notifications_queue"
const val RABBITMQ_ROUTING_KEY = "notifications.send"
}
These constants define:
- Exchange name for routing messages
- Queue name for storing messages
- Routing key for binding the queue to the exchange
MessageBroker Interface
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt
interface MessageBroker {
suspend fun initialize()
suspend fun publish(exchange: String, routingKey: String, message: String)
suspend fun startConsuming(queue: String, handler: MessageHandler)
}
The interface defines three core operations:
initialize()– Set up the connection and declare exchanges/queuespublish()– Send a message to an exchange with a routing keystartConsuming()– Start consuming messages from a queue with a handler
RabbitMQ Implementation
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt
class RabbitMqMessageBroker(
private val coroutineScope: CoroutineScope,
private val host: String,
private val port: Int,
private val user: String,
private val password: String,
) : MessageBroker {
private lateinit var amqpConnection: AMQPConnection
private lateinit var amqpChannel: AMQPChannel
override suspend fun initialize() {
amqpConnection = createRobustAMQPConnection(coroutineScope) {
server {
host = this@RabbitMqMessageBroker.host
port = this@RabbitMqMessageBroker.port
user = this@RabbitMqMessageBroker.user
password = this@RabbitMqMessageBroker.password
}
}
amqpChannel = amqpConnection.openChannel().also {
it.exchangeDeclare {
name = Constants.RABBITMQ_EXCHANGE
type = BuiltinExchangeType.TOPIC
}
it.queueDeclare {
name = Constants.RABBITMQ_QUEUE
durable = true
}
it.queueBind {
queue = Constants.RABBITMQ_QUEUE
exchange = Constants.RABBITMQ_EXCHANGE
routingKey = Constants.RABBITMQ_ROUTING_KEY
}
}
}
override suspend fun publish(exchange: String, routingKey: String, message: String) {
amqpChannel.basicPublish(
body = message.toByteArray(),
exchange = exchange,
routingKey = routingKey,
properties = properties {
deliveryMode = 2u // Make message persistent
}
)
}
override suspend fun startConsuming(queue: String, handler: MessageHandler) {
amqpChannel.basicConsume(
queue = queue,
noAck = false,
onDelivery = { delivery ->
handler(delivery.message.routingKey, delivery.message.body.decodeToString())
amqpChannel.basicAck(delivery.message.deliveryTag, false)
}
)
}
}
Initialization Flow
| Step | Description |
|---|---|
| Connection Creation | Uses createRobustAMQPConnection() for automatic reconnection; configured with RabbitMQ server credentials; scoped to a coroutine scope for lifecycle management |
| Channel Setup | Opens an AMQP channel; declares a TOPIC exchange for flexible routing; declares a durable queue (survives server restarts); binds the queue to the exchange with a routing key |
| Publishing Messages | Converts the message string to bytes; sets delivery mode to 2 (persistent messages); routes the message via exchange and routing key |
| Consuming Messages | Sets up a consumer on the queue; uses manual acknowledgment (noAck = false); delegates handling to the provided MessageHandler; acknowledges the message after successful processing (basicAck) |
Using the Kourier AMQP Client
The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which provides:
- Kotlin Multiplatform support
- Native compatibility (JVM and Native targets)
- Robust connection with automatic reconnection
- Coroutine‑based API
Dependency Configuration
build.gradle.kts
sourceSets {
commonMain.dependencies {
implementation(libs.amqp)
// ... other dependencies
}
}
gradle/libs.versions.toml
[versions]
amqp = "0.4.0"
[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }
MessageHandler Interface
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt
interface MessageHandler {
suspend operator fun invoke(routingKey: String, body: String)
}
With this setup, your application can publish notification events to RabbitMQ and process them asynchronously using a dedicated worker, improving responsiveness and scalability.
Handler Invocation Details
The handler uses the invoke operator, allowing instances to be called like functions. It receives:
routingKey– the routing key of the messagebody– the message body as a string
Notification Event and Handler
Event Data Class
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt
@Serializable
data class SendNotificationEvent(
val title: String,
val body: String,
val token: String,
)
Handler Implementation
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt
class SendNotificationHandler(
private val notificationService: NotificationService,
) : MessageHandler {
override suspend fun invoke(routingKey: String, body: String) {
val event = Serialization.json.decodeFromString(body)
notificationService.sendNotification(
token = event.token,
title = event.title,
body = event.body,
)
}
}
Handler Flow
- Receives the message body as a JSON string.
- Deserializes it to
SendNotificationEventusing Kotlinx Serialization. - Calls
NotificationServiceto send the notification. - The service is injected via constructor (dependency injection).
Serialization Setup
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt
object Serialization {
val json = Json {
ignoreUnknownKeys = true
explicitNulls = false
}
}
This configured JSON instance is used for:
- Serializing events before publishing to RabbitMQ.
- Deserializing events when consuming messages.
- Providing consistent JSON handling across the application.
Configuration details
ignoreUnknownKeys = true– allows adding fields without breaking old consumers.explicitNulls = false– omitsnullvalues from JSON output.
Integration with the Application
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt
fun Application.configureMessageBroker() = runBlocking {
val messageBroker by inject()
messageBroker.initialize()
messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get())
}
This configuration function:
- Injects the
MessageBrokerinstance from Koin. - Initializes the connection and channel.
- Starts consuming messages with
SendNotificationHandler.
Environment Configuration
RabbitMQ connection parameters are supplied via environment variables in the Koin module:
single {
RabbitMqMessageBroker(
coroutineScope = this@mainModule,
host = getEnv("RABBITMQ_HOST") ?: "localhost",
port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
user = getEnv("RABBITMQ_USER") ?: "guest",
password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
)
}
Default values
| Variable | Default |
|---|---|
RABBITMQ_HOST | localhost |
RABBITMQ_PORT | 5672 |
RABBITMQ_USER | guest |
RABBITMQ_PASSWORD | guest |
Message Flow
- An HTTP request arrives at the API endpoint.
- The route publishes a
SendNotificationEventto RabbitMQ. - The message is stored in the queue.
- The consumer receives the message.
SendNotificationHandlerdeserializes and processes it.NotificationServicesends the FCM notification.
Benefits of this architecture
- Decouples HTTP request handling from notification sending.
- Provides reliability through message persistence.
- Enables scalability via asynchronous processing.
- Isolates errors (notification failures don’t affect HTTP responses).
Summary
The AMQP setup demonstrates:
- A clean, interface‑based design for message brokers.
- RabbitMQ integration using the Kourier AMQP client.
- A message‑handler pattern for processing events.
- Serialization with Kotlinx Serialization.
- Environment‑based configuration.
- A worker pattern for asynchronous processing.
In the next part, we’ll explore how to define HTTP routes that publish messages to this message broker.