预算有限的事件驱动架构:Kotlin 协程 + Redis Streams
I’m sorry, but I don’t have the ability to retrieve the content from external links. If you can provide the text you’d like translated, I’ll be happy to translate it into Simplified Chinese while preserving the formatting and code blocks as requested.
我们将构建的内容
今天我们将使用 Kotlin 协程和 Redis Streams 构建一个轻量级的事件驱动管道。完成后,你将拥有一个能够缓冲和批处理事件的生产者、基于 Kotlin Flows 的消费者,以及一个干净的抽象层,使你以后可以在不重写应用的情况下换成 Kafka。
前置条件
- Kotlin 1.9+ 与协程 (
kotlinx-coroutines-core) - 正在运行的 Redis 6.2+ 实例(Docker 也可以:
docker run -p 6379:6379 redis:7) - 支持协程的 Lettuce Redis 客户端 (
lettuce-core) - 对 Kotlin 协程和
Flow有基本了解
第一步:构建缓冲生产者
生产者使用 Kotlin Channel 实现本地背压,然后将事件批量刷新到 Redis Streams。
class EventProducer(
private val redis: RedisCoroutinesCommands,
private val stream: String,
bufferSize: Int = 1024
) {
private val channel = Channel(bufferSize)
suspend fun emit(event: Map) {
channel.send(event)
}
fun startFlusher(scope: CoroutineScope, batchSize: Int = 100) {
scope.launch {
val batch = mutableListOf()
for (event in channel) {
batch.add(event)
if (batch.size >= batchSize || channel.isEmpty) {
batch.forEach { redis.xadd(stream, it) }
batch.clear()
}
}
}
}
}Channel 在缓冲区满时会挂起调用者,而不是丢弃事件。这种背压传播使得该设置在生产环境中安全。
第2步:使用 Flow 构建消费者
每个消费者通过 XREADGROUP 从 Redis 消费组读取数据,并将消息发送到 Kotlin 的 Flow 中。
fun CoroutineScope.consumeStream(
redis: RedisCoroutinesCommands,
stream: String,
group: String,
consumer: String
): Flow = flow {
runCatching { redis.xgroupCreate(stream, group, "0") }
while (currentCoroutineContext().isActive) {
val messages = redis.xreadgroup(
Consumer.from(group, consumer),
XReadArgs.Builder.count(50).block(Duration.ofSeconds(2)),
XReadArgs.StreamOffset.lastConsumed(stream)
)
messages.forEach { emit(it) }
}
}一个消费者协程大约占用 200 字节的堆内存。实际使用中,单个 512 MB 容器可以运行 500 个并发消费者,处理约 12 000 条事件/秒,p99 延迟低于 15 ms。
第三步:抽象事件总线
从一开始就将所有内容封装在接口后面,以避免后期昂贵的重写。
interface EventBus {
suspend fun publish(topic: String, event: Map)
fun subscribe(topic: String, group: String): Flow
}您的 RedisEventBus 实现了此接口。当您不再满足于 Redis Streams 时,可以编写一个 KafkaEventBus 并以最小的改动进行替换。
注意事项
- 保留受内存限制。 Redis Streams 位于 RAM 中。如果需要保留时间超过 48–72 小时,或流的深度超出可用内存,Kafka 更加经济。使用
MAXLEN与XADD来限制内存使用。 - 没有模式注册表。 需要自行管理模式演进(例如,在事件映射中添加版本字段
"v")。 - 跨数据中心复制脆弱。 Redis 复制适用于单区域部署;多区域场景更适合使用 Kafka。
channel.isEmpty不是原子操作。 批量刷新检查是尽力而为的;在极高吞吐量下,可能会刷新比预期更小的批次。这对正确性没有影响。- 消费者组创建默认不是幂等的。
runCatching包裹的xgroupCreate可防止在组已存在时出现错误。
何时迁移到 Kafka
监控两个指标:持续事件速率和保留深度。当你持续达到约 40 K 事件/秒,或需要超过 72 小时的保留时,开始规划迁移。在此阈值以下,Kafka 会额外增加约 $400/月 的基础设施成本和运营开销,而这些可能并非必要。
结论
如果每秒事件数低于 50 K,建议从 Redis Streams 开始。使用 Kotlin 协程 Channel 来实现背压,并使用 Flow 进行消费。从一开始就将事件总线抽象为接口。这种方法能够以 Kafka 成本和复杂度的一小部分,提供生产就绪的事件驱动工作流。