실시간 Kafka·Cassandra 파이프라인 구축
Source: Dev.to
소개
Apache Kafka와 Apache Cassandra는 서로의 강점을 보완하기 때문에 효과적으로 짝을 이룹니다. Kafka는 높은 처리량의 실시간 이벤트 스트리밍 및 수집을 담당하고, Cassandra는 처리된 데이터를 위한 확장 가능하고 내결함성 있으며 저지연 영구 저장소를 제공합니다.
예시: 영화 스트리밍 회사는 플랫폼에서 하루에 수십억 개의 이벤트(사용자 시청 행동, 재생 메트릭, 콘텐츠 추천 등)를 스트리밍합니다. Kafka는 이러한 이벤트를 실시간으로 스트리밍하고 처리할 수 있게 해줍니다. 그런 고속 스트림은 Cassandra에 소비되어 저장되며, Cassandra는 시계열 데이터와 사용자 활동 로그를 저장하는 고도로 확장 가능하고 내결함성 데이터베이스 역할을 합니다. 이 조합을 통해 영화 회사는 막대한 쓰기 처리량, 추천 엔진의 저지연 읽기, 전 세계 트래픽에 대한 신뢰성 있는 처리를 달성하면서 높은 가용성을 유지합니다. 바로 Netflix가 사용하는 방식입니다.
Cassandra는 다중 노드에 걸쳐 대용량 데이터를 처리하도록 설계된 오픈소스 NoSQL 데이터베이스이며, 컬럼형 저장 구조를 사용합니다. 각 노드(클러스터 내에서 데이터를 저장하고 읽기·쓰기 요청을 처리하는 단일 서버 또는 머신)에서 읽기·쓰기 작업을 지원하고, 노드 간 데이터 복제를 통해 단일 장애 지점 없이 높은 가용성을 보장합니다.
아래 단계에서는 Cassandra를 다운로드하고 시작하는 방법을 보여줍니다.
Cassandra 폴더 만들기
mkdir cassandra
Cassandra 다운로드
wget https://dlcdn.apache.org/cassandra/5.0.8/apache-cassandra-5.0.8-bin.tar.gz
압축 해제
tar -xvf apache-cassandra-5.0.8-bin.tar.gz
환경 변수 설정
export CASSANDRA_HOME=~/cassandra/apache-cassandra-5.0.8-bin
export PATH=$PATH:$CASSANDRA_HOME/bin
Cassandra 시작
압축 해제 후 생성된 cassandra 폴더로 이동한 뒤 bin/cassandra 명령을 실행합니다.
cd apache-cassandra-5.0.8
bin/cassandra
CQL 셸 시작
참고: Cassandra Query Language(CQL)는 Apache Cassandra의 기본 쿼리 언어로, SQL과 유사한 문법을 제공하면서도 Cassandra의 분산 와이드 컬럼 데이터 모델에 맞게 설계되었습니다. 전통적인 SQL과 달리 CQL은 키스페이스(데이터베이스), 테이블(와이드 컬럼 구조) 위에서 동작하며 파티션 키와 클러스터링 컬럼을 통해 데이터 분산 및 디스크 정렬을 지원합니다. CQL을 사용하면 테이블 생성, 데이터 삽입,
SELECT,WHERE,ORDER BY등을 이용한 조회, 경량 트랜잭션 등을 수행할 수 있습니다.
키스페이스(데이터베이스) 생성 및 사용
CREATE KEYSPACE weather_data2 WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
테이블 생성 후 SELECT 문으로 확인
(테이블 생성 구문은 생략)
발생 가능한 오류
- 시작 시 Cassandra 프로세스가 강제 종료됨: 메모리 부족이 원인일 수 있습니다.
jvm-clients.options파일을 편집하고 다음 옵션을 추가해 메모리 사용량을 지정하십시오. 초기값은 전체 메모리의 절반으로 계산됩니다. - 루트 사용자로 Cassandra 실행 금지: 오류는 아니지만 권장되지 않는 설정입니다. 루트 권한으로 실행하면 문제 발생 가능성이 있습니다.
bin/cassandra 로 성공적으로 시작된 경우 다음과 같이 보입니다. 해당 터미널을 열어 두고, 다음 단계 진행을 위해 새 터미널을 엽니다.
pkill -f cassandra
Kafka-Cassandra 파이프라인
다음 예시에서는 OpenWeather API에서 실시간 날씨 데이터를 Kafka로 스트리밍하고, 이를 Cassandra 데이터베이스에 저장하는 과정을 보여줍니다.
1. Kafka Producer + OpenWeather API
from kafka import KafkaProducer
import requests
from dotenv import load_dotenv
import os
import json
load_dotenv()
API_KEY = os.getenv('API_KEY')
def pull_weather_data():
cities = ['New York', 'London', 'Johannesburg', 'Nairobi', 'Cairo', 'Doha', 'Tokyo', 'Sydney']
cities_weather_data = []
for city in cities:
url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}"
response = requests.get(url)
weather_data = response.json()
cities_weather_data.append({
'City': weather_data['name'],
'Country': weather_data['sys']['country'],
'Temparature': weather_data['main']['temp'],
'Humidity': weather_data['main']['humidity'],
'Feels_Like': weather_data['main']['feels_like'],
'Last_update_time': weather_data['dt']
})
return cities_weather_data
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic = 'open_weather_api_cities_data'
while True:
weather_data = pull_weather_data()
producer.send(topic, weather_data)
print(f'From openweather: {weather_data}')
time.sleep(10)
위 파이썬 코드(Producer)는 OpenWeather API를 호출해 지정된 도시들의 날씨 데이터를 수집하고, 이를 Kafka 토픽에 전송합니다.
실시간 데이터 스트리밍 초보자 가이드 – Apache Kafka
(코드 실행 결과는 이미지 등으로 제공될 수 있음)
2. Kafka Consumer + Cassandra에 삽입
from kafka import KafkaConsumer
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from datetime import datetime
import json
cluster = Cluster(['localhost'])
session = cluster.connect()
session.set_keyspace('kafka_data')
print('Connected to cassandra')
insert_query = SimpleStatement(
'''
INSERT INTO cities_weather_data(
city,
country,
last_update_time,
temperature,
feels_like,
humidity
)
values (%s, %s, %s, %s, %s, %s)
'''
)
consumer = KafkaConsumer(
'open_weather_api_cities_data',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print('Connected to Kafka and Consumer started...')
for message in consumer:
weather_dict_data = message.value
print(weather_dict_data)
for data in weather_dict_data:
timestamp = datetime.fromtimestamp(data["Last_update_time"])
session.execute(
insert_query,
(
data['City'],
data['Country'],
timestamp,
data['Temparature'],
data['Feels_Like'],
data['Humidity']
)
)
print(f"Inserted: {data['City']}")
위 코드는 다음 순서로 동작합니다.
- 의존성 import –
KafkaConsumer로 Kafka와 연결해 토픽 메시지를 소비하고,Cluster로 Python을 Cassandra 클러스터에 연결합니다.SimpleStatement로 CQL 문을 준비·실행하고,datetime으로 타임스탬프를 변환하며,json으로 Kafka에서 전달된 JSON 메시지를 파이썬 딕셔너리로 역직렬화합니다. - Cassandra 연결 –
Cluster(['localhost'])로 로컬 Cassandra 인스턴스에 연결하고,cluster.connect()로 세션을 생성합니다.session.set_keyspace('kafka_data')로 작업할 키스페이스를 지정합니다. - INSERT 쿼리 준비 –
SimpleStatement()로 INSERT 문을 미리 준비해 변수에 저장합니다. - Kafka 연결 –
KafkaConsumer()로 지정된 토픽(open_weather_api_cities_data)을 구독하고,bootstrap_servers='localhost:9092'로 로컬 Kafka 브로커에 연결합니다.auto_offset_reset='earliest'로 토픽의 첫 번째 메시지부터 읽기 시작하도록 설정하고, 수신된 메시지를 JSON 딕셔너리로 변환합니다. - 무한 루프 – 토픽에 새로운 메시지가 도착할 때마다
for message in consumer가 실행됩니다. 각 메시지에서 날씨 데이터를 추출하고,datetime.fromtimestamp로 UNIX 타임스탬프를 파이썬datetime객체로 변환한 뒤, 준비된 INSERT 쿼리를 사용해 Cassandra에 저장합니다.
이 과정을 통해 실시간으로 수집된 날씨 데이터가 Kafka를 통해 흐르고, Cassandra에 영구 저장됩니다.