Market Data 集成器:使用 Python、MongoDB 和 Kafka 消费真实数据
Source: Dev.to
大家好!
继续 My Broker B3 系列,今天我们来讨论为整个生态系统提供巴西金融市场真实数据的组件:Broker Market Data API。
这个用 Python 编写的微服务充当 ingestor(数据摄取器),将外部世界(API Brappi)连接到我们的内部基础设施。
🏗️ 解决方案与数据流
这里的目标是确保资产价格始终保持最新,以供其他服务使用。数据流设计为三个主要阶段:
- 定时摄取:服务遍历包含 50 只资产的 Watchlist(包括 PETR4、VALE3 等蓝筹股,以及 FIIs 和 ETFs)。
- 历史持久化(MongoDB):在任何处理之前,完整的 payload 会保存到 MongoDB。这保证了审计能力并为后续分析提供数据。
- 事件流(Kafka):更新后的价格会发布到主题
trading-assets-market-data-v1,使任何其他微服务能够实时响应。
🛠️ 实现细节
我选择 Python 3.12,因为它在处理 HTTP 请求和集成数据驱动方面非常灵活。
Kafka 中键(Message Key)的重要性
此服务的关键技术决策是使用 资产代码(ticker)作为发送到 Kafka 的消息键(key)。
为什么这很重要?
Kafka 只在同一分区内保证消息的顺序。将 ticker(例如 PETR4)设为键后,Kafka 确保该资产的所有消息始终落在 同一分区,从而保证任何消费者都能按事件发生的准确顺序读取,并避免旧价格在新价格之后被处理。
代码亮点
- 速率限制:由于使用 Brapi 的免费套餐,代码在调用之间实现了
time.sleep(0.5),以遵守 API 限制并避免 throttling。 - 数据映射:在发送到 Kafka 之前,payload 会被转换为标准化格式,确保消费者服务只收到必要的信息(ticker、price、volume 和 timestamp)。
# Trecho do mapeamento de dados no main.py
payload = {
"ticker": result.get("symbol"),
"price": result.get("regularMarketPrice"),
"volume": result.get("regularMarketVolume"),
"updated_at": result.get("regularMarketTime")
}✅ 验证执行
为确保集成正常工作,我验证了两个输出点:
MongoDB
我检查了 market_data_db 数据库中的 price_history 集合,文档会使用仓库生成的 created_at 字段插入。

Kafka
通过管理界面,我确认了消息已正确带有键(ticker)和值到达相应主题。

🚀 结论
该服务运行后,我们的模拟器现在能够实时“看到”市场。下一个挑战是从 Java API 中消费这些事件,以更新用户的投资组合。
对摄取策略或在 Python 中使用 Kafka 有任何疑问吗?欢迎在评论区留言!
想要跟进本系列,请点击这里。