Repository Pattern in a Data Analytics / ML Pipeline

Published: February 2, 2026 at 12:17 PM EST
4 min read
Source: Dev.to

1. The Core Problem in Most ML Projects

Let’s start with something very real.

df = pd.read_sql("""
    SELECT customer_id,
           SUM(amount) AS total_amount
    FROM transactions
    WHERE transaction_date >= '2025-01-01'
    GROUP BY customer_id
""", conn)

At first, this feels efficient.
But over time:

  • The SQL grows
  • Business logic sneaks into queries
  • Feature logic is duplicated
  • Nobody knows which query feeds which model
  • Testing becomes painful

Eventually, the ML pipeline becomes a tightly‑coupled mess.

👉 The Repository Pattern exists to stop this from happening.

2. The Big Idea (In Plain English)

“My ML pipeline should not know where the data comes from — only what data it needs.”

So instead of saying:

“Give me data from MySQL using this SQL query”

Your pipeline says:

“Give me all transactions between these dates”

That’s it.

3. Mental Model: Repositories as Data Translators

Think of a repository as a translator between:

🧠 Business / ML logic | 🗄️ Physical data storage (MySQL)
ML pipeline speaks: “transactions” | Database speaks: “tables, joins, SQL”

The repository speaks both.

The repository hides:

  • SQL
  • Connection handling
  • Database quirks
  • Performance tuning

4. ML Pipeline with Repository Pattern (Conceptual View)

MySQL Database
     |
     |  (SQL, connections, credentials)
     v
Repository Layer
     |
     |  (clean Python objects)
     v
Feature Engineering
     |
     v
Model Training

Important rule:

The pipeline should only depend on the repository’s contract, not on the underlying storage.
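A minimal sketch of this rule, using only the standard library. The names `TransactionSource`, `count_transactions`, and `FakeSource` are hypothetical and exist only for this illustration; the point is that the pipeline step calls the contract and nothing else.

```python
from datetime import date
from typing import Protocol, Sequence


class TransactionSource(Protocol):
    """Hypothetical contract: anything that returns transactions for a date range."""

    def get_transactions(self, start_date: date, end_date: date) -> Sequence[dict]:
        ...


def count_transactions(source: TransactionSource, start_date: date, end_date: date) -> int:
    # The pipeline step only calls the contract; it never sees SQL or connections.
    return len(source.get_transactions(start_date, end_date))


class FakeSource:
    """Stand-in storage used only for this illustration."""

    def get_transactions(self, start_date, end_date):
        return [
            {"customer_id": 1, "amount": 10.0},
            {"customer_id": 2, "amount": 5.0},
        ]


n = count_transactions(FakeSource(), date(2025, 1, 1), date(2025, 12, 31))
```

Swapping `FakeSource` for a database-backed class would not require any change to `count_transactions`.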

5. Step 1: Define What the Pipeline Cares About (Domain Model)

In analytics, we don’t need heavy ORM entities – we just need meaningful data structures.

from dataclasses import dataclass
from datetime import date

@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str

Why this matters

  • No SQL
  • No MySQL
  • No Pandas
  • Pure Python

This is domain language: something analysts and ML engineers understand.
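Because the model is plain Python, it can be created and inspected without a database connection. The sketch below uses made-up values; `frozen=True` is an optional addition (not part of the definition above) that makes instances immutable.

```python
from dataclasses import asdict, dataclass
from datetime import date


@dataclass(frozen=True)  # assumption: immutability is optional, added for illustration
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


txn = Transaction(
    customer_id=42,
    amount=199.99,
    transaction_date=date(2025, 3, 14),
    transaction_type="purchase",
)

# Dataclasses provide value equality and a dict view without extra code.
same = Transaction(42, 199.99, date(2025, 3, 14), "purchase")
record = asdict(txn)
```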

6. Step 2: Define the Repository Contract (The Promise)

Now we ask: What data does the ML pipeline need?
Not how to get it, just what.

from abc import ABC, abstractmethod
from typing import List
from datetime import date

class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(
        self,
        start_date: date,
        end_date: date
    ) -> List[Transaction]:
        pass

Key idea

This is a promise: any data source, database, or storage engine can back the pipeline, as long as it fulfills this contract.
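To illustrate "any data source", here is a sketch of a second implementation of the same contract backed by CSV text instead of a database. `CsvTransactionRepository`, the column names, and the ISO date format are assumptions made for this example; the domain model and contract are repeated so the block runs on its own.

```python
import csv
import io
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from typing import List, TextIO


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(self, start_date: date, end_date: date) -> List[Transaction]:
        ...


class CsvTransactionRepository(TransactionRepository):
    """Hypothetical CSV-backed implementation of the same contract."""

    def __init__(self, csv_file: TextIO):
        self._csv_file = csv_file

    def get_transactions(self, start_date: date, end_date: date) -> List[Transaction]:
        self._csv_file.seek(0)  # allow repeated calls on the same file object
        result = []
        for row in csv.DictReader(self._csv_file):
            txn_date = date.fromisoformat(row["transaction_date"])
            if start_date <= txn_date <= end_date:  # inclusive on both ends
                result.append(
                    Transaction(
                        customer_id=int(row["customer_id"]),
                        amount=float(row["amount"]),
                        transaction_date=txn_date,
                        transaction_type=row["transaction_type"],
                    )
                )
        return result


sample = io.StringIO(
    "customer_id,amount,transaction_date,transaction_type\n"
    "1,100.0,2025-01-15,purchase\n"
    "2,50.0,2024-12-31,refund\n"
)
repo = CsvTransactionRepository(sample)
txns = repo.get_transactions(date(2025, 1, 1), date(2025, 12, 31))
```

Code that accepts a `TransactionRepository` cannot tell this class apart from a database-backed one.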

7. Why This Is Powerful

At this point:

  • The ML pipeline depends on an interface
  • It does NOT depend on MySQL
  • It does NOT depend on PyMySQL

This gives you:

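  • Testability: a fake repository can stand in for MySQL in unit tests
  • Flexibility: the storage can change without touching pipeline code
  • Clean design: SQL and connection handling live in one place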

8. Step 3: Implement the Repository Using PyMySQL

Now — and only now — we touch MySQL.

Connection helper

import pymysql

def get_connection():
    return pymysql.connect(
        host="localhost",
        user="analytics_user",
        password="analytics_pwd",
        database="analytics_db",
        cursorclass=pymysql.cursors.DictCursor
    )

MySQL‑backed Repository

class MySQLTransactionRepository(TransactionRepository):
    def get_transactions(self, start_date, end_date):
        query = """
            SELECT customer_id,
                   amount,
                   transaction_date,
                   transaction_type
            FROM transactions
            WHERE transaction_date BETWEEN %s AND %s
        """

        conn = get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (start_date, end_date))
                rows = cursor.fetchall()

            return [
                Transaction(
                    customer_id=row["customer_id"],
                    amount=float(row["amount"]),
                    transaction_date=row["transaction_date"],
                    transaction_type=row["transaction_type"]
                )
                for row in rows
            ]
        finally:
            conn.close()

What just happened?

  • SQL is isolated
  • Connection lifecycle is controlled
  • Raw rows are converted into domain objects

Everything else in the system stays clean.
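One possible variation, sketched under assumptions: instead of calling a module-level helper, the repository receives a `connection_factory` callable. That parameter, the class name `SqlTransactionRepository`, and the `FakeConnection` / `FakeCursor` stand-ins are hypothetical, not part of the code above. The fakes implement only the methods the repository calls (`cursor()`, `execute()`, `fetchall()`, `close()`), so the row-to-object mapping can be checked without a running MySQL server.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from typing import Callable, List


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class SqlTransactionRepository:
    """Same logic as the MySQL repository, with an injected connection factory."""

    QUERY = """
        SELECT customer_id, amount, transaction_date, transaction_type
        FROM transactions
        WHERE transaction_date BETWEEN %s AND %s
    """

    def __init__(self, connection_factory: Callable[[], object]):
        self._connection_factory = connection_factory

    def get_transactions(self, start_date: date, end_date: date) -> List[Transaction]:
        conn = self._connection_factory()
        try:
            with conn.cursor() as cursor:
                cursor.execute(self.QUERY, (start_date, end_date))
                rows = cursor.fetchall()
            return [
                Transaction(
                    customer_id=row["customer_id"],
                    amount=float(row["amount"]),  # e.g. Decimal -> float
                    transaction_date=row["transaction_date"],
                    transaction_type=row["transaction_type"],
                )
                for row in rows
            ]
        finally:
            conn.close()  # always release the connection


class FakeCursor:
    """Test double that records executed queries and returns canned rows."""

    def __init__(self, rows):
        self._rows = rows
        self.executed = []

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False

    def execute(self, query, params):
        self.executed.append((query, params))

    def fetchall(self):
        return self._rows


class FakeConnection:
    def __init__(self, rows):
        self.cursor_obj = FakeCursor(rows)
        self.closed = False

    def cursor(self):
        return self.cursor_obj

    def close(self):
        self.closed = True


fake = FakeConnection([
    {
        "customer_id": 7,
        "amount": Decimal("12.50"),
        "transaction_date": date(2025, 2, 1),
        "transaction_type": "purchase",
    }
])
repo = SqlTransactionRepository(lambda: fake)
txns = repo.get_transactions(date(2025, 1, 1), date(2025, 12, 31))
```

In production code the factory would be the `get_connection` helper shown earlier; in tests it can be any callable that returns an object with the same methods.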

9. Step 4: Feature Engineering (Pure Data Logic)

This layer doesn’t know:

  • Where data came from
  • How it was queried
  • Whether it was MySQL, CSV, or API

import pandas as pd

class TransactionFeatureEngineer:
    def build_customer_features(self, transactions):
        df = pd.DataFrame([t.__dict__ for t in transactions])

        features = (
            df.groupby("customer_id")
              .agg(
                  total_amount=("amount", "sum"),
                  avg_amount=("amount", "mean"),
                  txn_count=("amount", "count")
              )
              .reset_index()
        )
        return features

Why this is clean

  • Deterministic
  • Easy to unit test
  • No side effects
  • Reusable across models
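The "easy to unit test" claim can be checked with a few hand-built transactions. This is a sketch with made-up values; it repeats the class definitions so it runs on its own and assumes pandas is installed.

```python
from dataclasses import dataclass
from datetime import date

import pandas as pd


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class TransactionFeatureEngineer:
    def build_customer_features(self, transactions):
        df = pd.DataFrame([t.__dict__ for t in transactions])
        return (
            df.groupby("customer_id")
              .agg(
                  total_amount=("amount", "sum"),
                  avg_amount=("amount", "mean"),
                  txn_count=("amount", "count"),
              )
              .reset_index()
        )


# Hand-built input: no database, no files, no network.
transactions = [
    Transaction(1, 100.0, date(2025, 1, 5), "purchase"),
    Transaction(1, 50.0, date(2025, 1, 9), "purchase"),
    Transaction(2, 30.0, date(2025, 2, 1), "refund"),
]

features = TransactionFeatureEngineer().build_customer_features(transactions)
by_customer = features.set_index("customer_id")

# Customer 1 has two transactions (100 + 50); customer 2 has one.
assert by_customer.loc[1, "total_amount"] == 150.0
assert by_customer.loc[1, "avg_amount"] == 75.0
assert by_customer.loc[2, "txn_count"] == 1
```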

10. Step 5: Model Training Layer

Again — no database awareness.

from sklearn.ensemble import RandomForestClassifier

class ModelTrainingService:
    def train(self, X, y):
        model = RandomForestClassifier(
            n_estimators=100,
            random_state=42
        )
        model.fit(X, y)
        return model

The model only cares about features, not data sources.

11. Step 6: Orchestrating the Pipeline

This is the only place everything comes together.

from datetime import date

repo = MySQLTransactionRepository()
feature_engineer = TransactionFeatureEngineer()
trainer = ModelTrainingService()

# Fetch
transactions = repo.get_transactions(
    date(2025, 1, 1),
    date(2025, 12, 31)
)

# Features
features_df = feature_engineer.build_customer_features(transactions)

# Example target
features_df["target"] = (
    features_df["total_amount"] > 100000
).astype(int)

X = features_df[["total_amount", "avg_amount", "txn_count"]]
y = features_df["target"]

# Train
model = trainer.train(X, y)

This code reads like a story, not plumbing.

12. The Real Superpower: Testing Without MySQL

Now comes the magic: an in‑memory repository.

class InMemoryTransactionRepository(TransactionRepository):
    def __init__(self, transactions):
        self.transactions = transactions

    def get_transactions(self, start_date, end_date):
        return [
            t for t in self.transactions
            if start_date <= t.transaction_date <= end_date
        ]
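A sketch of how an in-memory repository could be used to test pipeline logic without MySQL. The block repeats the definitions so it runs on its own. The inclusive date filter is an assumption chosen to mirror SQL `BETWEEN`, and `total_amount_by_customer` is a hypothetical pipeline step written in plain Python for this example.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import date


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class InMemoryTransactionRepository:
    def __init__(self, transactions):
        self.transactions = transactions

    def get_transactions(self, start_date, end_date):
        # Inclusive on both ends, like SQL BETWEEN (assumption).
        return [
            t for t in self.transactions
            if start_date <= t.transaction_date <= end_date
        ]


def total_amount_by_customer(repo, start_date, end_date):
    """Hypothetical pipeline step that depends only on get_transactions()."""
    totals = defaultdict(float)
    for t in repo.get_transactions(start_date, end_date):
        totals[t.customer_id] += t.amount
    return dict(totals)


repo = InMemoryTransactionRepository([
    Transaction(1, 100.0, date(2025, 1, 1), "purchase"),    # on the start boundary
    Transaction(1, 50.0, date(2025, 12, 31), "purchase"),   # on the end boundary
    Transaction(2, 30.0, date(2026, 1, 1), "purchase"),     # outside the range
])

totals = total_amount_by_customer(repo, date(2025, 1, 1), date(2025, 12, 31))
```

The test data lives next to the assertion, so a failing test shows exactly which transactions produced the unexpected result.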

Repositories answer “WHAT data?”

Keep those questions separate — and your pipelines stay sane.
