Kontiki, an async microservices framework in Python
Source: Dev.to
Kontiki is a Python micro‑services framework built on AMQP (RabbitMQ) via aio‑pika and asyncio.
For teams familiar with Nameko, the overall philosophy will feel familiar:
- Message‑driven services on top of RabbitMQ
- RPC and events as first‑class primitives
Kontiki lives in the same “family” of frameworks while leaning into:
- an asyncio‑native implementation
- a delegate‑oriented service structure
- a configuration‑driven runner
The project is open‑source (Apache‑2.0) and the code is available on GitHub:github.com/kontiki-org/kontiki
Service Structure
Kontiki encourages keeping the service class as an entry‑point layer and moving domain logic into delegates (ServiceDelegate) with setup/start/stop lifecycle hooks.
# delegate.py
from kontiki.delegate import ServiceDelegate
from kontiki.messaging import Messenger, on_event, rpc
class MyDelegate(ServiceDelegate):
async def setup(self):
# initialize from self.container.config
pass
async def start(self):
# optional: start background tasks / open connections
pass
async def stop(self):
# optional: stop background tasks / close connections
pass
def do_something(self, x: int) -> int:
return x * 2# service.py
class MyService:
name = "compute-api" # optional; defaults to class name
delegate = MyDelegate()
messenger = Messenger() # publish events / call other services
@rpc
async def compute(self, x: int):
return self.delegate.do_something(x)
@on_event("thing_happened")
async def on_thing(self, payload):
await self.messenger.publish("thing_processed", {"payload": payload})Running a Service
Expose the service as a CLI entry‑point via cli.run(...) and register it as a script.
# myapp/main.py
from kontiki.runner import cli
from myapp.service import MyService
def run():
cli.run(MyService, "Example Kontiki service.", version="1.0.0")Add the entry‑point to pyproject.toml (Poetry example):
[tool.poetry.scripts]
my_service = "myapp.main:run"Run the service with one (or more) configuration files:
my_service --config config.yamlEvents (Fire‑and‑Forget)
Consumers subscribe with @on_event("event_type"); publishers emit events via Messenger.publish(...).
# Server‑side (service)
class UserEventsService:
@on_event("user.created")
async def on_user_created(self, payload: dict):
...# Client‑side (stand‑alone script)
async def publish_event_from_script():
async with Messenger(standalone=True) as messenger:
await messenger.publish("user.created", {"id": "u_123"})# Client‑side (from another service)
class BillingService:
messenger = Messenger()
@rpc
async def create_invoice(self, user_id: str):
await self.messenger.publish("user.created", {"id": user_id})
return "ok"RPC (Request/Reply)
Kontiki RPC is synchronous request/reply over AMQP.
- Server side – expose handlers with
@rpc(can returnrpc_error(code, message)). - Client side – use
RpcProxyto call remote methods.
# Server‑side (service)
class RpcService:
@rpc
async def rpc_example(self, feature: str):
if feature == "bad_input":
return rpc_error("USER_INPUT_ERROR", "Invalid feature value")
return "ok"# Client‑side (stand‑alone script)
async def call_rpc_from_script():
async with Messenger(standalone=True) as messenger:
rpc_service = RpcProxy(messenger, service_name="RpcService")
return await rpc_service.rpc_example("bad_input")# Client‑side (from another service)
class ApiGatewayService:
messenger = Messenger()
@rpc
async def compute(self, feature: str):
rpc_service = RpcProxy(self.messenger, service_name="RpcService")
return await rpc_service.rpc_example(feature)HTTP Endpoints
Kontiki can expose HTTP routes directly using @http(...).
Optionally validate request bodies with Pydantic models and generate OpenAPI/Swagger UI.
class SimpleHttpService:
http_error_handlers = {HttpExampleError: (400, "Example error occurred")}
@http("/health", "GET", version="v1", response_model=HelloResponse)
async def health(self, request):
return HelloResponse(message="ok").model_dump()Periodic Tasks
Use @task(interval=..., immediate=...) to run periodic async work inside the service loop.
class TaskService:
@task(interval=10, immediate=True)
async def task_example(self):
...Configuration
Services can be started with multiple YAML files via repeatable --config (e.g., a shared base.yaml plus an environment‑specific env.yaml).
# base.yaml
kontiki:
amqp:
url: amqp://guest:guest@localhost/
app:
cache:
ttl_seconds: 60# env.yaml
app:
cache:
enabled: truemy_service --config base.yaml --config env.yamlConfiguration values are typically loaded in setup() using dot‑based paths:
from kontiki.configuration import get_parameter
from kontiki.delegate import ServiceDelegate
class MyDelegate(ServiceDelegate):
async def setup(self):
self.cache_ttl_seconds = get_parameter(
self.container.config,
"app.cache.ttl_seconds",
60,
)Health & Degraded‑Mode Checks
When the Kontiki registry service is running, services can register and expose operational state:
- Heartbeats at a configurable interval
- Degraded mode checks (custom logic)
- Optional event/exception tracking for visibility
from pathlib import Path
from kontiki.decorators import degraded_on
class MyService:
@degraded_on
def is_degraded(self) -> bool:
required_dir = Path("/var/lib/my-service/uploads")
return not required_dir.exists()For an operator‑friendly view, the sibling repository kontiki-tui provides a terminal UI to inspect and operate the Kontiki ecosystem.
TL;DR
Kontiki is a good fit for asyncio‑native, message‑driven micro‑services on RabbitMQ, offering first‑class primitives for events and RPC while abstracting away the low‑level AMQP wiring, lifecycle management, and configuration handling.
Rational plumbing for every service
This article provides a quick overview. Kontiki also ships advanced options around the core features:
- Sessions
- Broadcast delivery
- Message headers
- Error/retry behavior
- And more…
The repository includes a solid set of runnable examples. You can start RabbitMQ locally and run most of them via the Makefile targets, for example:
make run-amqp
make run-(Replace “ with the desired run-* command.)