데이터 분석 및 ML 파이프라인에서의 Repository Pattern

발행: (2026년 2월 3일 오전 02:17 GMT+9)
7 분 소요
원문: Dev.to

Source: Dev.to

위에 제공된 소스 링크 외에 번역하고자 하는 본문 내용이 보이지 않습니다. 번역이 필요한 텍스트를 알려주시면 한국어로 번역해 드리겠습니다.

1. 대부분의 ML 프로젝트에서 핵심 문제

아주 현실적인 예부터 시작해봅시다.

df = pd.read_sql("""
    SELECT customer_id,
           SUM(amount) AS total_amount
    FROM transactions
    WHERE transaction_date >= '2025-01-01'
    GROUP BY customer_id
""", conn)

처음에는 효율적으로 보입니다.
하지만 시간이 지나면서:

  • SQL이 복잡해진다
  • 비즈니스 로직이 쿼리에 섞인다
  • 피처 로직이 중복된다
  • 어떤 쿼리가 어떤 모델에 사용되는지 모른다
  • 테스트가 어려워진다

결국 ML 파이프라인이 긴밀하게 결합된 혼란스러운 상태가 된다.

👉 Repository 패턴은 이러한 상황을 방지하기 위해 존재합니다.

2. 핵심 아이디어 (쉽게 말하면)

“내 ML 파이프라인은 데이터가 어디서 오는지 알 필요가 없고 — 어떤 데이터가 필요한지만 알아야 합니다.”

So instead of saying:

“이 SQL 쿼리를 사용해서 MySQL에서 데이터를 가져와”

Your pipeline says:

“이 날짜들 사이의 모든 거래를 가져와”

That’s it.

3. 정신 모델: 데이터 변환기로서의 리포지토리

리포지토리를 다음 사이의 번역가로 생각하세요:

🧠 비즈니스 / ML 로직🗄️ 물리적 데이터 저장소 (MySQL)
ML 파이프라인이 말함: “transactions”데이터베이스가 말함: “tables, joins, SQL”
리포지토리는 양쪽 모두 말함

리포지토리는 다음을 숨깁니다:

  • SQL
  • 연결 관리
  • 데이터베이스 특이점
  • 성능 튜닝

4. 레포지토리 패턴을 활용한 ML 파이프라인 (개념적 뷰)

MySQL Database
     |
     |  (SQL, connections, credentials)
     v
Repository Layer
     |
     |  (clean Python objects)
     v
Feature Engineering
     |
     v
Model Training

Important rule:

파이프라인은 레포지토리의 계약에만 의존해야 하며, 하위 저장소에 의존해서는 안 됩니다.

5. Step 1: Define What the Pipeline Cares About (Domain Model)

분석에서는 무거운 ORM 엔터티가 필요하지 않습니다 – 의미 있는 데이터 구조만 있으면 됩니다.

from dataclasses import dataclass
from datetime import date

@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str

왜 중요한가

  • SQL 없음
  • MySQL 없음
  • Pandas 없음
  • 순수 Python

이것은 도메인 언어입니다: 분석가와 ML 엔지니어가 이해할 수 있는 내용입니다.

6. Step 2: Define the Repository Contract (The Promise)

Now we ask: What data does the ML pipeline need?
Not how to get it, just what.

from abc import ABC, abstractmethod
from typing import List
from datetime import date

class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(
        self,
        start_date: date,
        end_date: date
    ) -> List[Transaction]:
        pass

Key idea

This is a promise: any data source, any database, any storage engine – as long as it fulfills this contract.

7. 왜 이것이 강력한가

현재:

  • ML 파이프라인은 인터페이스에 의존합니다
  • MySQL에 의존하지 않습니다
  • PyMySQL에 의존하지 않습니다

이를 통해 얻는 것:

  • 테스트 가능성
  • 유연성
  • 깔끔한 설계

8. 단계 3: PyMySQL을 사용해 리포지토리 구현

이제 — 바로 지금 — MySQL을 다룹니다.

연결 도우미

import pymysql

def get_connection():
    return pymysql.connect(
        host="localhost",
        user="analytics_user",
        password="analytics_pwd",
        database="analytics_db",
        cursorclass=pymysql.cursors.DictCursor
    )

MySQL 기반 리포지토리

class MySQLTransactionRepository(TransactionRepository):
    def get_transactions(self, start_date, end_date):
        query = """
            SELECT customer_id,
                   amount,
                   transaction_date,
                   transaction_type
            FROM transactions
            WHERE transaction_date BETWEEN %s AND %s
        """

        conn = get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (start_date, end_date))
                rows = cursor.fetchall()

            return [
                Transaction(
                    customer_id=row["customer_id"],
                    amount=float(row["amount"]),
                    transaction_date=row["transaction_date"],
                    transaction_type=row["transaction_type"]
                )
                for row in rows
            ]
        finally:
            conn.close()

방금 무슨 일이 있었나요?

  • SQL이 격리됩니다
  • 연결 수명 주기가 제어됩니다
  • 원시 행이 도메인 객체로 변환됩니다

시스템의 다른 모든 부분은 깨끗하게 유지됩니다.

9. Step 4: 특성 엔지니어링 (순수 데이터 로직)

This layer doesn’t know:

  • 데이터가 어디서 왔는지
  • 어떻게 쿼리되었는지
  • MySQL, CSV, 혹은 API인지 여부
import pandas as pd

class TransactionFeatureEngineer:
    def build_customer_features(self, transactions):
        df = pd.DataFrame([t.__dict__ for t in transactions])

        features = (
            df.groupby("customer_id")
              .agg(
                  total_amount=("amount", "sum"),
                  avg_amount=("amount", "mean"),
                  txn_count=("amount", "count")
              )
              .reset_index()
        )
        return features

Why this is clean

  • 결정론적
  • 단위 테스트가 쉬움
  • 부작용 없음
  • 모델 전반에 재사용 가능

10. 단계 5: 모델 훈련 레이어

Again — no database awareness.
다시 말하지만 — 데이터베이스 인식이 없습니다.

from sklearn.ensemble import RandomForestClassifier

class ModelTrainingService:
    def train(self, X, y):
        model = RandomForestClassifier(
            n_estimators=100,
            random_state=42
        )
        model.fit(X, y)
        return model

The model only cares about features, not data sources.
모델은 특징에만 신경 쓰며, 데이터 소스에는 신경 쓰지 않습니다.

11. 단계 6: 파이프라인 조정

이곳이 모든 것이 하나로 모이는 유일한 장소입니다.

from datetime import date

repo = MySQLTransactionRepository()
feature_engineer = TransactionFeatureEngineer()
trainer = ModelTrainingService()

# Fetch
transactions = repo.get_transactions(
    date(2025, 1, 1),
    date(2025, 12, 31)
)

# Features
features_df = feature_engineer.build_customer_features(transactions)

# Example target
features_df["target"] = (
    features_df["total_amount"] > 100000
).astype(int)

X = features_df[["total_amount", "avg_amount", "txn_count"]]
y = features_df["target"]

# Train
model = traine

Note: 마지막 줄(model = traine)은 원본 내용을 그대로 보존하기 위해 의도적으로 그대로 두었습니다.

r.train(X, y)

이 코드는 배관 작업이 아니라 이야기를 읽는 듯합니다.

12. 진정한 초능력: MySQL 없이 테스트하기

Now comes the magic: an in‑memory repository.

class InMemoryTransactionRepository(TransactionRepository):
    def __init__(self, transactions):
        self.transactions = transactions

    def get_transactions(self, start_date, end_date):
        return [
            t for t in self.transactions
            if start_date
        ]

Repositories answer “WHAT data?”

그 질문들을 별도로 유지하면 파이프라인이 정상적으로 동작합니다.

Back to Blog

관련 글

더 보기 »