데이터 분석 및 ML 파이프라인에서의 Repository Pattern
Source: Dev.to
위에 제공된 소스 링크 외에 번역하고자 하는 본문 내용이 보이지 않습니다. 번역이 필요한 텍스트를 알려주시면 한국어로 번역해 드리겠습니다.
1. 대부분의 ML 프로젝트에서 핵심 문제
아주 현실적인 예부터 시작해봅시다.
df = pd.read_sql("""
SELECT customer_id,
SUM(amount) AS total_amount
FROM transactions
WHERE transaction_date >= '2025-01-01'
GROUP BY customer_id
""", conn)
처음에는 효율적으로 보입니다.
하지만 시간이 지나면서:
- SQL이 복잡해진다
- 비즈니스 로직이 쿼리에 섞인다
- 피처 로직이 중복된다
- 어떤 쿼리가 어떤 모델에 사용되는지 모른다
- 테스트가 어려워진다
결국 ML 파이프라인이 긴밀하게 결합된 혼란스러운 상태가 된다.
👉 Repository 패턴은 이러한 상황을 방지하기 위해 존재합니다.
2. 핵심 아이디어 (쉽게 말하면)
“내 ML 파이프라인은 데이터가 어디서 오는지 알 필요가 없고 — 어떤 데이터가 필요한지만 알아야 합니다.”
So instead of saying:
“이 SQL 쿼리를 사용해서 MySQL에서 데이터를 가져와”
Your pipeline says:
“이 날짜들 사이의 모든 거래를 가져와”
That’s it.
3. 정신 모델: 데이터 변환기로서의 리포지토리
리포지토리를 다음 사이의 번역가로 생각하세요:
| 🧠 비즈니스 / ML 로직 | 🗄️ 물리적 데이터 저장소 (MySQL) |
|---|---|
| ML 파이프라인이 말함: “transactions” | 데이터베이스가 말함: “tables, joins, SQL” |
| 리포지토리는 양쪽 모두 말함 |
리포지토리는 다음을 숨깁니다:
- SQL
- 연결 관리
- 데이터베이스 특이점
- 성능 튜닝
4. 레포지토리 패턴을 활용한 ML 파이프라인 (개념적 뷰)
MySQL Database
|
| (SQL, connections, credentials)
v
Repository Layer
|
| (clean Python objects)
v
Feature Engineering
|
v
Model Training
Important rule:
파이프라인은 레포지토리의 계약에만 의존해야 하며, 하위 저장소에 의존해서는 안 됩니다.
5. Step 1: Define What the Pipeline Cares About (Domain Model)
분석에서는 무거운 ORM 엔터티가 필요하지 않습니다 – 의미 있는 데이터 구조만 있으면 됩니다.
from dataclasses import dataclass
from datetime import date
@dataclass
class Transaction:
customer_id: int
amount: float
transaction_date: date
transaction_type: str
왜 중요한가
- SQL 없음
- MySQL 없음
- Pandas 없음
- 순수 Python
이것은 도메인 언어입니다: 분석가와 ML 엔지니어가 이해할 수 있는 내용입니다.
6. Step 2: Define the Repository Contract (The Promise)
Now we ask: What data does the ML pipeline need?
Not how to get it, just what.
from abc import ABC, abstractmethod
from typing import List
from datetime import date
class TransactionRepository(ABC):
@abstractmethod
def get_transactions(
self,
start_date: date,
end_date: date
) -> List[Transaction]:
pass
Key idea
This is a promise: any data source, any database, any storage engine – as long as it fulfills this contract.
7. 왜 이것이 강력한가
현재:
- ML 파이프라인은 인터페이스에 의존합니다
- MySQL에 의존하지 않습니다
- PyMySQL에 의존하지 않습니다
이를 통해 얻는 것:
- 테스트 가능성
- 유연성
- 깔끔한 설계
8. 단계 3: PyMySQL을 사용해 리포지토리 구현
이제 — 바로 지금 — MySQL을 다룹니다.
연결 도우미
import pymysql
def get_connection():
return pymysql.connect(
host="localhost",
user="analytics_user",
password="analytics_pwd",
database="analytics_db",
cursorclass=pymysql.cursors.DictCursor
)
MySQL 기반 리포지토리
class MySQLTransactionRepository(TransactionRepository):
def get_transactions(self, start_date, end_date):
query = """
SELECT customer_id,
amount,
transaction_date,
transaction_type
FROM transactions
WHERE transaction_date BETWEEN %s AND %s
"""
conn = get_connection()
try:
with conn.cursor() as cursor:
cursor.execute(query, (start_date, end_date))
rows = cursor.fetchall()
return [
Transaction(
customer_id=row["customer_id"],
amount=float(row["amount"]),
transaction_date=row["transaction_date"],
transaction_type=row["transaction_type"]
)
for row in rows
]
finally:
conn.close()
방금 무슨 일이 있었나요?
- SQL이 격리됩니다
- 연결 수명 주기가 제어됩니다
- 원시 행이 도메인 객체로 변환됩니다
시스템의 다른 모든 부분은 깨끗하게 유지됩니다.
9. Step 4: 특성 엔지니어링 (순수 데이터 로직)
This layer doesn’t know:
- 데이터가 어디서 왔는지
- 어떻게 쿼리되었는지
- MySQL, CSV, 혹은 API인지 여부
import pandas as pd
class TransactionFeatureEngineer:
def build_customer_features(self, transactions):
df = pd.DataFrame([t.__dict__ for t in transactions])
features = (
df.groupby("customer_id")
.agg(
total_amount=("amount", "sum"),
avg_amount=("amount", "mean"),
txn_count=("amount", "count")
)
.reset_index()
)
return features
Why this is clean
- 결정론적
- 단위 테스트가 쉬움
- 부작용 없음
- 모델 전반에 재사용 가능
10. 단계 5: 모델 훈련 레이어
Again — no database awareness.
다시 말하지만 — 데이터베이스 인식이 없습니다.
from sklearn.ensemble import RandomForestClassifier
class ModelTrainingService:
def train(self, X, y):
model = RandomForestClassifier(
n_estimators=100,
random_state=42
)
model.fit(X, y)
return model
The model only cares about features, not data sources.
모델은 특징에만 신경 쓰며, 데이터 소스에는 신경 쓰지 않습니다.
11. 단계 6: 파이프라인 조정
이곳이 모든 것이 하나로 모이는 유일한 장소입니다.
from datetime import date
repo = MySQLTransactionRepository()
feature_engineer = TransactionFeatureEngineer()
trainer = ModelTrainingService()
# Fetch
transactions = repo.get_transactions(
date(2025, 1, 1),
date(2025, 12, 31)
)
# Features
features_df = feature_engineer.build_customer_features(transactions)
# Example target
features_df["target"] = (
features_df["total_amount"] > 100000
).astype(int)
X = features_df[["total_amount", "avg_amount", "txn_count"]]
y = features_df["target"]
# Train
model = traine
Note: 마지막 줄(
model = traine)은 원본 내용을 그대로 보존하기 위해 의도적으로 그대로 두었습니다.
r.train(X, y)
이 코드는 배관 작업이 아니라 이야기를 읽는 듯합니다.
12. 진정한 초능력: MySQL 없이 테스트하기
Now comes the magic: an in‑memory repository.
class InMemoryTransactionRepository(TransactionRepository):
def __init__(self, transactions):
self.transactions = transactions
def get_transactions(self, start_date, end_date):
return [
t for t in self.transactions
if start_date
]
Repositories answer “WHAT data?”
그 질문들을 별도로 유지하면 파이프라인이 정상적으로 동작합니다.