Market Data 통합기: Python, MongoDB 및 Kafka로 실제 데이터 소비

발행: (2026년 3월 15일 오전 05:08 GMT+9)
5 분 소요
원문: Dev.to

Source: Dev.to

안녕하세요, 여러분!

My Broker B3 시리즈를 이어서, 오늘은 브라질 금융 시장의 실제 데이터를 전체 생태계에 공급하는 구성 요소인 Broker Market Data API에 대해 이야기하겠습니다.

이 파이썬 마이크로서비스는 인게스터 역할을 하며 외부 세계(API Brappi)와 우리 내부 인프라를 연결합니다.

🏗️ 솔루션 및 데이터 흐름

여기서 목표는 자산 가격이 항상 최신 상태로 다른 서비스에 제공되도록 하는 것입니다. 흐름은 세 가지 주요 단계로 설계되었습니다:

  • 스케줄된 인게스트: 서비스는 50개의 자산(예: PETR4, VALE3 같은 블루칩, FIIs 및 ETFs 포함)으로 구성된 Watchlist를 순회합니다.
  • 히스토리 영속화 (MongoDB): 어떤 처리도 하기 전에 전체 페이로드를 MongoDB에 저장합니다. 이는 감사와 향후 분석을 위한 데이터를 보장합니다.
  • 이벤트 스트리밍 (Kafka): 최신 가격이 trading-assets-market-data-v1 토픽에 게시되어 다른 마이크로서비스가 실시간으로 반응할 수 있게 합니다.

🛠️ 구현 상세

요청 HTTP와 데이터 드라이버 통합에 빠른 파이썬 3.12를 선택했습니다.

Kafka에서 키(Message Key)의 중요성

이 서비스에서 중요한 기술적 결정은 자산 티커를 Kafka 메시지의 키(key) 로 사용하는 것이었습니다.

왜 중요한가요?
Kafka는 파티션 내부에서만 메시지 순서를 보장합니다. 티커(예: PETR4)를 키로 지정하면 Kafka는 해당 자산의 모든 메시지가 같은 파티션에 들어가도록 하여, 소비자가 정확한 발생 순서대로 이벤트를 읽을 수 있게 하고, 오래된 가격이 최신 가격 뒤에 처리되는 상황을 방지합니다.

코드 하이라이트

  • Rate Limiting: Brapi 무료 플랜을 사용하므로, API 호출 사이에 time.sleep(0.5)를 삽입해 제한을 준수하고 throttling을 방지합니다.
  • Data Mapping: 페이로드를 표준화된 형식으로 변환한 뒤 Kafka에 전송하여, 소비 서비스가 티커, 가격, 거래량, 타임스탬프만 받도록 합니다.
# Trecho do mapeamento de dados no main.py
payload = {
    "ticker": result.get("symbol"),
    "price": result.get("regularMarketPrice"),
    "volume": result.get("regularMarketVolume"),
    "updated_at": result.get("regularMarketTime")
}

✅ 실행 검증

통합이 정상 작동하는지 확인하기 위해 두 가지 출력 지점을 검증했습니다:

MongoDB

market_data_db 데이터베이스의 price_history 컬렉션을 확인했으며, 문서는 저장소에서 생성한 created_at 필드와 함께 삽입됩니다.

MongoDB validation

Kafka

관리 인터페이스를 통해 토픽에 키(티커)와 값이 올바르게 들어오는 것을 확인했습니다.

Kafka validation

🚀 결론

이 서비스를 운영함으로써 우리 시뮬레이터는 이제 실시간으로 시장을 “볼” 수 있게 되었습니다. 다음 과제는 Java API 내부에서 이 이벤트를 소비하여 사용자 포트폴리오를 업데이트하는 것입니다.

인게스트 전략이나 파이썬과 Kafka 사용에 대해 궁금한 점이 있나요? 댓글로 알려 주세요!

이 시리즈를 계속 보려면 여기를 클릭하세요.

링크

0 조회
Back to Blog

관련 글

더 보기 »

트라비고

Gemini와 함께 말하는 속도만큼 빠르게 여행하세요! 라이브 에이전트가 몰입형 스토리텔링 및 3D 내비게이션과 만나는 곳. 이 프로젝트는 Gemini Live Ag...에 진입하기 위해 만들어졌습니다.