Part 3: AMQP Setup and Calls - Ktor Native Worker Tutorial

Published: December 21, 2025 at 05:44 PM EST
5 min read
Source: Dev.to

Asynchronous Message Processing with AMQP (RabbitMQ)

Architecture Overview

The messaging system consists of:

  • MessageBroker – Interface defining message broker operations
  • RabbitMqMessageBroker – RabbitMQ implementation using the Kourier AMQP client
  • MessageHandler – Interface for message handlers
  • SendNotificationHandler – Handler for processing notification events
  • SendNotificationEvent – Data class representing notification events

Constants Configuration

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt

object Constants {
    const val RABBITMQ_EXCHANGE   = "notifications_exchange"
    const val RABBITMQ_QUEUE      = "notifications_queue"
    const val RABBITMQ_ROUTING_KEY = "notifications.send"
}

These constants define:

  • Exchange name for routing messages
  • Queue name for storing messages
  • Routing key for binding the queue to the exchange

MessageBroker Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

The interface defines three core operations:

  • initialize() – Set up the connection and declare exchanges/queues
  • publish() – Send a message to an exchange with a routing key
  • startConsuming() – Start consuming messages from a queue with a handler
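
One practical consequence of hiding RabbitMQ behind this interface is that the broker can be swapped for a test double. The following is a minimal sketch under stated assumptions, not part of the tutorial project: InMemoryMessageBroker, runNonSuspending and inMemoryBrokerDemo are hypothetical names, the two interfaces are redeclared so the block compiles on its own, and the fake ignores exchanges and bindings entirely. To stay dependency-free it drives the suspend calls with the standard library's startCoroutine; inside the real application these calls would run in an ordinary coroutine.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Redeclared from the article so the sketch compiles on its own.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

// Hypothetical test double: one in-memory queue, no exchanges or bindings.
class InMemoryMessageBroker : MessageBroker {
    private val pending = ArrayDeque<Pair<String, String>>() // (routingKey, body)
    private var consumer: MessageHandler? = null
    private var initialized = false

    override suspend fun initialize() {
        initialized = true
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        check(initialized) { "initialize() must be called before publish()" }
        pending.addLast(routingKey to message)
        drain()
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        check(initialized) { "initialize() must be called before startConsuming()" }
        consumer = handler
        drain() // deliver anything published before the consumer attached
    }

    // Hands every queued message to the consumer, oldest first.
    private suspend fun drain() {
        val handler = consumer ?: return
        while (pending.isNotEmpty()) {
            val (routingKey, body) = pending.removeFirst()
            handler(routingKey, body)
        }
    }
}

// Runs a suspend block that never actually suspends, using only the standard library.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; use a real coroutine runner" }.getOrThrow()
}

// Publishes one message before and one after the consumer attaches.
fun inMemoryBrokerDemo(): List<String> {
    val received = mutableListOf<String>()
    val handler = object : MessageHandler {
        override suspend fun invoke(routingKey: String, body: String) {
            received += "$routingKey -> $body"
        }
    }
    runNonSuspending {
        val broker: MessageBroker = InMemoryMessageBroker()
        broker.initialize()
        broker.publish("notifications_exchange", "notifications.send", "first")
        broker.startConsuming("notifications_queue", handler)
        broker.publish("notifications_exchange", "notifications.send", "second")
    }
    return received
}
```

Code that depends only on MessageBroker cannot tell the two implementations apart, which is the point of defining the interface before the RabbitMQ class.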

RabbitMQ Implementation

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt

class RabbitMqMessageBroker(
    private val coroutineScope: CoroutineScope,
    private val host: String,
    private val port: Int,
    private val user: String,
    private val password: String,
) : MessageBroker {

    private lateinit var amqpConnection: AMQPConnection
    private lateinit var amqpChannel: AMQPChannel

    override suspend fun initialize() {
        amqpConnection = createRobustAMQPConnection(coroutineScope) {
            server {
                host = this@RabbitMqMessageBroker.host
                port = this@RabbitMqMessageBroker.port
                user = this@RabbitMqMessageBroker.user
                password = this@RabbitMqMessageBroker.password
            }
        }

        amqpChannel = amqpConnection.openChannel().also {
            it.exchangeDeclare {
                name = Constants.RABBITMQ_EXCHANGE
                type = BuiltinExchangeType.TOPIC
            }
            it.queueDeclare {
                name = Constants.RABBITMQ_QUEUE
                durable = true
            }
            it.queueBind {
                queue = Constants.RABBITMQ_QUEUE
                exchange = Constants.RABBITMQ_EXCHANGE
                routingKey = Constants.RABBITMQ_ROUTING_KEY
            }
        }
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        amqpChannel.basicPublish(
            body = message.toByteArray(),
            exchange = exchange,
            routingKey = routingKey,
            properties = properties {
                deliveryMode = 2u // Make message persistent
            }
        )
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        amqpChannel.basicConsume(
            queue = queue,
            noAck = false,
            onDelivery = { delivery ->
                handler(delivery.message.routingKey, delivery.message.body.decodeToString())
                amqpChannel.basicAck(delivery.message.deliveryTag, false)
            }
        )
    }
}

Initialization Flow

  • Connection Creation – Uses createRobustAMQPConnection() for automatic reconnection; configured with RabbitMQ server credentials; scoped to a coroutine scope for lifecycle management.
  • Channel Setup – Opens an AMQP channel; declares a TOPIC exchange for flexible routing; declares a durable queue (survives server restarts); binds the queue to the exchange with a routing key.
  • Publishing Messages – Converts the message string to bytes; sets delivery mode to 2 (persistent messages); routes the message via exchange and routing key.
  • Consuming Messages – Sets up a consumer on the queue; uses manual acknowledgment (noAck = false); delegates handling to the provided MessageHandler; acknowledges the message after successful processing (basicAck).
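
The "flexible routing" of a TOPIC exchange comes from how binding patterns are compared with routing keys: keys are split into dot-separated words, "*" in a pattern stands for exactly one word, and "#" stands for zero or more words. The sketch below illustrates those matching rules in plain Kotlin. topicMatches is a hypothetical helper written for this explanation; it is not a Kourier or RabbitMQ API, and the broker performs this matching itself.

```kotlin
// Returns true when a topic-exchange binding pattern matches a routing key.
// Words are dot-separated; "*" matches exactly one word, "#" matches zero or more.
fun topicMatches(pattern: String, routingKey: String): Boolean {
    val p = pattern.split('.')
    val k = routingKey.split('.')

    // pi and ki are the positions of the next unmatched word in p and k.
    fun match(pi: Int, ki: Int): Boolean = when {
        pi == p.size -> ki == k.size
        // "#" either matches nothing, or swallows one word and stays in place.
        p[pi] == "#" -> match(pi + 1, ki) || (ki < k.size && match(pi, ki + 1))
        ki == k.size -> false
        p[pi] == "*" || p[pi] == k[ki] -> match(pi + 1, ki + 1)
        else -> false
    }
    return match(0, 0)
}
```

The tutorial binds the queue with the literal key notifications.send, so only that exact routing key reaches it. A binding such as notifications.* or notifications.# would let the same queue receive other notification events without touching the publishers.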

Using the Kourier AMQP Client

The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which provides:

  • Kotlin Multiplatform support
  • Native compatibility (JVM and Native targets)
  • Robust connection with automatic reconnection
  • Coroutine‑based API

Dependency Configuration

build.gradle.kts

sourceSets {
    commonMain.dependencies {
        implementation(libs.amqp)
        // ... other dependencies
    }
}

gradle/libs.versions.toml

[versions]
amqp = "0.4.0"

[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }

MessageHandler Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt

interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

With this setup, your application can publish notification events to RabbitMQ and process them asynchronously using a dedicated worker, improving responsiveness and scalability.

Handler Invocation Details

The handler uses the invoke operator, allowing instances to be called like functions. It receives:

  • routingKey – the routing key of the message
  • body – the message body as a string
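
To make the call syntax concrete, here is a small sketch of a handler being invoked like a function, followed by an acknowledgment that only happens when the handler returns normally. It mirrors the shape of the onDelivery callback in RabbitMqMessageBroker but is not taken from the project: RecordingHandler, deliver, runNonSuspending and handlerDemo are hypothetical names, and the ack lambda merely stands in for amqpChannel.basicAck. The suspend calls are driven with the standard library's startCoroutine so the block has no dependencies.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Redeclared from the article so the sketch compiles on its own.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

// Hypothetical handler that records what it receives and rejects blank bodies.
class RecordingHandler : MessageHandler {
    val seen = mutableListOf<String>()

    override suspend fun invoke(routingKey: String, body: String) {
        require(body.isNotBlank()) { "empty message body" }
        seen += "$routingKey:$body"
    }
}

// Handle first, acknowledge second. `ack` is a stand-in for the channel's basicAck.
suspend fun deliver(handler: MessageHandler, routingKey: String, body: String, ack: () -> Unit) {
    handler(routingKey, body) // operator invoke: same as handler.invoke(routingKey, body)
    ack()                     // not reached if the handler threw
}

// Runs a suspend block that never actually suspends, using only the standard library.
fun <T> runNonSuspending(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; use a real coroutine runner" }.getOrThrow()
}

// Delivers one valid and one invalid message; returns what was handled and how many acks fired.
fun handlerDemo(): Pair<List<String>, Int> {
    val handler = RecordingHandler()
    var acked = 0
    runNonSuspending { deliver(handler, "notifications.send", "hello") { acked++ } }
    runCatching { runNonSuspending { deliver(handler, "notifications.send", "") { acked++ } } }
    return handler.seen to acked
}
```

In this sketch the second delivery fails inside the handler, so only one acknowledgment is counted. With manual acknowledgment on a real broker, a message that is never acknowledged stays outstanding and RabbitMQ redelivers it once the channel or connection closes.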

Notification Event and Handler

Event Data Class

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt

@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)

Handler Implementation

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt

class SendNotificationHandler(
    private val notificationService: NotificationService,
) : MessageHandler {
    override suspend fun invoke(routingKey: String, body: String) {
        val event = Serialization.json.decodeFromString<SendNotificationEvent>(body)
        notificationService.sendNotification(
            token = event.token,
            title = event.title,
            body = event.body,
        )
    }
}

Handler Flow

  • Receives the message body as a JSON string.
  • Deserializes it to SendNotificationEvent using Kotlinx Serialization.
  • Calls NotificationService to send the notification.
  • The service is injected via constructor (dependency injection).

Serialization Setup

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt

object Serialization {
    val json = Json {
        ignoreUnknownKeys = true
        explicitNulls = false
    }
}

This configured JSON instance is used for:

  • Serializing events before publishing to RabbitMQ.
  • Deserializing events when consuming messages.
  • Providing consistent JSON handling across the application.

Configuration details

  • ignoreUnknownKeys = true – allows adding fields without breaking old consumers.
  • explicitNulls = false – omits null values from JSON output.

Integration with the Application

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt

fun Application.configureMessageBroker() = runBlocking {
    val messageBroker by inject<MessageBroker>()
    messageBroker.initialize()
    messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get<SendNotificationHandler>())
}

This configuration function:

  • Injects the MessageBroker instance from Koin.
  • Initializes the connection and channel.
  • Starts consuming messages with SendNotificationHandler.

Environment Configuration

RabbitMQ connection parameters are supplied via environment variables in the Koin module:

single {
    RabbitMqMessageBroker(
        coroutineScope = this@mainModule,
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )
}

Default values

  • RABBITMQ_HOST – localhost
  • RABBITMQ_PORT – 5672
  • RABBITMQ_USER – guest
  • RABBITMQ_PASSWORD – guest
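
The fallback logic can be checked in isolation by passing the environment lookup in as a function. This is a sketch under assumptions: RabbitMqSettings and loadRabbitMqSettings are hypothetical names, and the getEnv parameter stands in for the project's own getEnv helper, whose implementation the article does not show.

```kotlin
// Hypothetical holder for the four connection settings.
data class RabbitMqSettings(
    val host: String,
    val port: Int,
    val user: String,
    val password: String,
)

// `getEnv` is injected so the lookup can be faked in tests.
fun loadRabbitMqSettings(getEnv: (String) -> String?): RabbitMqSettings =
    RabbitMqSettings(
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        // toIntOrNull() returns null for a non-numeric value, so a bad port falls back to 5672.
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )

// Example: a custom host combined with an unparsable port.
fun settingsDemo(): RabbitMqSettings {
    val env = mapOf("RABBITMQ_HOST" to "mq.internal", "RABBITMQ_PORT" to "not-a-number")
    return loadRabbitMqSettings { env[it] }
}
```

Note that the Elvis operator only reacts to null: a variable that is set but empty is passed through as an empty string rather than replaced by the default.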

Message Flow

  1. An HTTP request arrives at the API endpoint.
  2. The route publishes a SendNotificationEvent to RabbitMQ.
  3. The message is stored in the queue.
  4. The consumer receives the message.
  5. SendNotificationHandler deserializes and processes it.
  6. NotificationService sends the FCM notification.

Benefits of this architecture

  • Decouples HTTP request handling from notification sending.
  • Provides reliability through message persistence.
  • Enables scalability via asynchronous processing.
  • Isolates errors (notification failures don’t affect HTTP responses).

Summary

The AMQP setup demonstrates:

  • A clean, interface‑based design for message brokers.
  • RabbitMQ integration using the Kourier AMQP client.
  • A message‑handler pattern for processing events.
  • Serialization with Kotlinx Serialization.
  • Environment‑based configuration.
  • A worker pattern for asynchronous processing.

In the next part, we’ll explore how to define HTTP routes that publish messages to this message broker.
