Integrador de Market Data: Consumindo Dados Reais com Python, MongoDB e Kafka
Source: Dev.to
Olá, pessoal!
Dando continuidade à série My Broker B3, hoje vamos falar sobre o componente que alimenta todo o ecossistema com dados reais do mercado financeiro brasileiro: o Broker Market Data API.
Este microserviço em Python atua como um ingestor, conectando o mundo externo (API Brappi) à nossa infraestrutura interna.
🏗️ A Solução e o Fluxo de Dados
O objetivo aqui é garantir que os preços dos ativos estejam sempre atualizados para os outros serviços. O fluxo foi desenhado em três etapas principais:
- Ingestão Agendada: O serviço percorre uma Watchlist de 50 ativos (incluindo Blue Chips como PETR4 e VALE3, além de FIIs e ETFs).
- Persistência Histórica (MongoDB): Antes de qualquer processamento, o payload completo é salvo no MongoDB. Isso garante auditoria e dados para análises futuras.
- Streaming de Eventos (Kafka): O preço atualizado é publicado no tópico
trading-assets-market-data-v1, permitindo que qualquer outro microserviço reaja em tempo real.
🛠️ Detalhes da Implementação
Escolhi Python 3.12 pela sua agilidade em lidar com requisições HTTP e integração com drivers de dados.
A importância da Chave no Kafka (Message Key)
Uma decisão técnica vital neste serviço foi o uso do ticker do ativo como chave (key) da mensagem enviada ao Kafka.
Por que isso é importante?
O Kafka garante a ordenação das mensagens apenas dentro de uma partição. Ao definir o ticker (ex.: PETR4) como chave, o Kafka assegura que todas as mensagens desse ativo caiam sempre na mesma partição, garantindo que qualquer consumidor leia os eventos na ordem exata em que ocorreram e evitando que um preço antigo seja processado após um preço mais recente.
Destaques do Código
- Rate Limiting: Como utilizo o plano gratuito da Brapi, o código implementa um
time.sleep(0.5)entre as chamadas para respeitar os limites da API e evitar throttling. - Data Mapping: O payload é transformado para um formato padronizado antes de ser enviado para o Kafka, garantindo que os serviços consumidores recebam apenas o necessário (ticker, price, volume e timestamp).
# Trecho do mapeamento de dados no main.py
payload = {
"ticker": result.get("symbol"),
"price": result.get("regularMarketPrice"),
"volume": result.get("regularMarketVolume"),
"updated_at": result.get("regularMarketTime")
}✅ Validando a Execução
Para garantir que a integração está funcionando, validei os dois pontos de saída:
MongoDB
Verifiquei a coleção price_history no banco market_data_db, onde os documentos são inseridos com o campo created_at gerado pelo repositório.

Kafka
Através da interface de gestão, confirmei a chegada das mensagens com as chaves (tickers) e valores corretos no tópico.

🚀 Conclusão
Com este serviço rodando, nosso simulador agora “enxerga” o mercado em tempo real. O próximo desafio é consumir esses eventos de dentro das APIs Java para atualizar as carteiras dos usuários.
Ficou com alguma dúvida sobre a estratégia de ingestão ou sobre o uso do Kafka com Python? Deixe nos comentários!
Para acompanhar essa série, clique aqui.