Market Data 集成器:使用 Python、MongoDB 和 Kafka 消费真实数据

发布: (2026年3月15日 GMT+8 04:08)
4 分钟阅读
原文: Dev.to

Source: Dev.to

大家好!

继续 My Broker B3 系列,今天我们来讨论为整个生态系统提供巴西金融市场真实数据的组件:Broker Market Data API

这个用 Python 编写的微服务充当 ingestor(数据摄取器),将外部世界(API Brappi)连接到我们的内部基础设施。

🏗️ 解决方案与数据流

这里的目标是确保资产价格始终保持最新,以供其他服务使用。数据流设计为三个主要阶段:

  • 定时摄取:服务遍历包含 50 只资产的 Watchlist(包括 PETR4、VALE3 等蓝筹股,以及 FIIs 和 ETFs)。
  • 历史持久化(MongoDB):在任何处理之前,完整的 payload 会保存到 MongoDB。这保证了审计能力并为后续分析提供数据。
  • 事件流(Kafka):更新后的价格会发布到主题 trading-assets-market-data-v1,使任何其他微服务能够实时响应。

🛠️ 实现细节

我选择 Python 3.12,因为它在处理 HTTP 请求和集成数据驱动方面非常灵活。

Kafka 中键(Message Key)的重要性

此服务的关键技术决策是使用 资产代码(ticker)作为发送到 Kafka 的消息键(key)

为什么这很重要?
Kafka 只在同一分区内保证消息的顺序。将 ticker(例如 PETR4)设为键后,Kafka 确保该资产的所有消息始终落在 同一分区,从而保证任何消费者都能按事件发生的准确顺序读取,并避免旧价格在新价格之后被处理。

代码亮点

  • 速率限制:由于使用 Brapi 的免费套餐,代码在调用之间实现了 time.sleep(0.5),以遵守 API 限制并避免 throttling
  • 数据映射:在发送到 Kafka 之前,payload 会被转换为标准化格式,确保消费者服务只收到必要的信息(ticker、price、volume 和 timestamp)。
# Trecho do mapeamento de dados no main.py
payload = {
    "ticker": result.get("symbol"),
    "price": result.get("regularMarketPrice"),
    "volume": result.get("regularMarketVolume"),
    "updated_at": result.get("regularMarketTime")
}

✅ 验证执行

为确保集成正常工作,我验证了两个输出点:

MongoDB

我检查了 market_data_db 数据库中的 price_history 集合,文档会使用仓库生成的 created_at 字段插入。

MongoDB validation

Kafka

通过管理界面,我确认了消息已正确带有键(ticker)和值到达相应主题。

Kafka validation

🚀 结论

该服务运行后,我们的模拟器现在能够实时“看到”市场。下一个挑战是从 Java API 中消费这些事件,以更新用户的投资组合。

对摄取策略或在 Python 中使用 Kafka 有任何疑问吗?欢迎在评论区留言!

想要跟进本系列,请点击这里

0 浏览
Back to Blog

相关文章

阅读更多 »