Kontiki,Python 中的异步微服务框架

发布: (2026年3月30日 GMT+8 18:01)
6 分钟阅读
原文: Dev.to

Source: Dev.to

Kontiki 是一个基于 AMQP (RabbitMQ)、使用 aio‑pikaasyncio 的 Python 微服务框架。
对于熟悉 Nameko 的团队来说,其整体理念会感到熟悉:

  • 基于 RabbitMQ 的消息驱动服务
  • RPC 与事件作为一等公民

Kontiki 属于同一“家族”的框架,同时侧重于:

  • asyncio‑native 实现
  • delegate‑oriented 服务结构
  • configuration‑driven 运行器

该项目是开源的(Apache‑2.0),代码托管在 GitHub 上:
github.com/kontiki-org/kontiki

服务结构

Kontiki 鼓励将服务类保持为 入口层,并将领域逻辑移入 委托ServiceDelegate),使用 setup/start/stop 生命周期钩子。

# delegate.py
from kontiki.delegate import ServiceDelegate
from kontiki.messaging import Messenger, on_event, rpc

class MyDelegate(ServiceDelegate):
    async def setup(self):
        # initialize from self.container.config
        pass

    async def start(self):
        # optional: start background tasks / open connections
        pass

    async def stop(self):
        # optional: stop background tasks / close connections
        pass

    def do_something(self, x: int) -> int:
        return x * 2
# service.py
class MyService:
    name = "compute-api"          # optional; defaults to class name
    delegate = MyDelegate()
    messenger = Messenger()       # publish events / call other services

    @rpc
    async def compute(self, x: int):
        return self.delegate.do_something(x)

    @on_event("thing_happened")
    async def on_thing(self, payload):
        await self.messenger.publish("thing_processed", {"payload": payload})

运行服务

通过 cli.run(...) 将服务暴露为 CLI 入口点 并将其注册为脚本。

# myapp/main.py
from kontiki.runner import cli
from myapp.service import MyService

def run():
    cli.run(MyService, "Example Kontiki service.", version="1.0.0")

将入口点添加到 pyproject.toml(Poetry 示例):

[tool.poetry.scripts]
my_service = "myapp.main:run"

使用一个(或多个)配置文件运行服务:

my_service --config config.yaml

事件(Fire‑and‑Forget)

消费者使用 @on_event("event_type") 进行订阅;发布者通过 Messenger.publish(...) 触发事件。

# Server‑side (service)
class UserEventsService:
    @on_event("user.created")
    async def on_user_created(self, payload: dict):
        ...
# Client‑side (stand‑alone script)
async def publish_event_from_script():
    async with Messenger(standalone=True) as messenger:
        await messenger.publish("user.created", {"id": "u_123"})
# Client‑side (from another service)
class BillingService:
    messenger = Messenger()

    @rpc
    async def create_invoice(self, user_id: str):
        await self.messenger.publish("user.created", {"id": user_id})
        return "ok"

RPC(请求/回复)

Kontiki RPC 是基于 AMQP 的同步请求/回复。

  • 服务器端 – 使用 @rpc 暴露处理器(可以返回 rpc_error(code, message))。
  • 客户端 – 使用 RpcProxy 调用远程方法。
# Server‑side (service)
class RpcService:
    @rpc
    async def rpc_example(self, feature: str):
        if feature == "bad_input":
            return rpc_error("USER_INPUT_ERROR", "Invalid feature value")
        return "ok"
# Client‑side (stand‑alone script)
async def call_rpc_from_script():
    async with Messenger(standalone=True) as messenger:
        rpc_service = RpcProxy(messenger, service_name="RpcService")
        return await rpc_service.rpc_example("bad_input")
# Client‑side (from another service)
class ApiGatewayService:
    messenger = Messenger()

    @rpc
    async def compute(self, feature: str):
        rpc_service = RpcProxy(self.messenger, service_name="RpcService")
        return await rpc_service.rpc_example(feature)

HTTP 端点

Kontiki 可以直接使用 @http(...) 来暴露 HTTP 路由。
可选地使用 Pydantic 模型验证请求体,并生成 OpenAPI/Swagger UI。

class SimpleHttpService:
    http_error_handlers = {HttpExampleError: (400, "Example error occurred")}

    @http("/health", "GET", version="v1", response_model=HelloResponse)
    async def health(self, request):
        return HelloResponse(message="ok").model_dump()

定期任务

使用 @task(interval=..., immediate=...) 在服务循环中运行周期性的异步工作。

class TaskService:
    @task(interval=10, immediate=True)
    async def task_example(self):
        ...

配置

服务可以通过可重复的 --config 使用多个 YAML 文件启动(例如,共享的 base.yaml 加上环境特定的 env.yaml)。

# base.yaml
kontiki:
  amqp:
    url: amqp://guest:guest@localhost/

app:
  cache:
    ttl_seconds: 60
# env.yaml
app:
  cache:
    enabled: true
my_service --config base.yaml --config env.yaml

配置值通常在 setup() 中使用点分路径加载:

from kontiki.configuration import get_parameter
from kontiki.delegate import ServiceDelegate

class MyDelegate(ServiceDelegate):
    async def setup(self):
        self.cache_ttl_seconds = get_parameter(
            self.container.config,
            "app.cache.ttl_seconds",
            60,
        )

健康与降级模式检查

Kontiki 注册服务 运行时,服务可以注册并暴露运行状态:

  • 可配置间隔的 心跳
  • 降级模式 检查(自定义逻辑)
  • 可选的 事件/异常跟踪 以提升可视性
from pathlib import Path
from kontiki.decorators import degraded_on

class MyService:
    @degraded_on
    def is_degraded(self) -> bool:
        required_dir = Path("/var/lib/my-service/uploads")
        return not required_dir.exists()

为了提供运维友好的视图,兄弟仓库 kontiki-tui 提供了一个终端 UI,用于检查和操作 Kontiki 生态系统。

TL;DR

Kontiki 非常适合在 RabbitMQ 上构建 基于 asyncio 的、消息驱动的微服务,它提供了用于 事件RPC 的一流原语,同时抽象掉了底层 AMQP 接线、生命周期管理和配置处理。

为每个服务提供合理的管道

本文提供了一个快速概览。Kontiki 还围绕核心功能提供了高级选项:

  • 会话
  • 广播投递
  • 消息头部
  • 错误/重试行为
  • 以及更多…

仓库中包含了一套完整的可运行示例。你可以在本地启动 RabbitMQ 并通过 Makefile 目标运行大多数示例,例如:

make run-amqp
make run-

(将 “ 替换为所需的 run-* 命令。)

0 浏览
Back to Blog

相关文章

阅读更多 »

我 Fork 了 Httpx

我无法从提供的 URL 中检索内容,因此无法按要求清理和格式化文章。如果您能直接提供文章的文本,...