Repository Pattern in a Data Analytics / ML Pipeline
Source: Dev.to
1. The Core Problem in Most ML Projects
Let’s start with something very real.
df = pd.read_sql("""
    SELECT customer_id,
           SUM(amount) AS total_amount
    FROM transactions
    WHERE transaction_date >= '2025-01-01'
    GROUP BY customer_id
""", conn)
At first, this feels efficient.
But over time:
- The SQL grows
- Business logic sneaks into queries
- Feature logic is duplicated
- Nobody knows which query feeds which model
- Testing becomes painful
Eventually, the ML pipeline becomes a tightly‑coupled mess.
👉 The Repository Pattern exists to stop this from happening.
2. The Big Idea (In Plain English)
“My ML pipeline should not know where the data comes from — only what data it needs.”
So instead of saying:
“Give me data from MySQL using this SQL query”
Your pipeline says:
“Give me all transactions between these dates”
That’s it.
3. Mental Model: Repositories as Data Translators
Think of a repository as a translator between:
| 🧠 Business / ML logic | 🗄️ Physical data storage (MySQL) |
|---|---|
| ML pipeline speaks: “transactions” | Database speaks: “tables, joins, SQL” |

The repository speaks both.
The repository hides:
- SQL
- Connection handling
- Database quirks
- Performance tuning
4. ML Pipeline with Repository Pattern (Conceptual View)
MySQL Database
      |
      | (SQL, connections, credentials)
      v
Repository Layer
      |
      | (clean Python objects)
      v
Feature Engineering
      |
      v
Model Training
Important rule:
The pipeline should only depend on the repository’s contract, not on the underlying storage.
5. Step 1: Define What the Pipeline Cares About (Domain Model)
In analytics, we don’t need heavy ORM entities – we just need meaningful data structures.
from dataclasses import dataclass
from datetime import date

@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str
Why this matters
- No SQL
- No MySQL
- No Pandas
- Pure Python
This is domain language: something analysts and ML engineers understand.
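As a small illustration of what "pure Python" buys you, the sketch below builds two `Transaction` objects, compares them, and converts one to a plain dictionary with `dataclasses.asdict`. The sample values are made up for the example.

```python
from dataclasses import dataclass, asdict
from datetime import date


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


# Keyword arguments keep the call site readable.
txn = Transaction(
    customer_id=42,
    amount=19.99,
    transaction_date=date(2025, 3, 14),
    transaction_type="purchase",
)

# Dataclasses generate __eq__, so two objects with equal fields compare equal.
same_txn = Transaction(42, 19.99, date(2025, 3, 14), "purchase")

# asdict() gives a plain dict, handy when building a DataFrame later.
row = asdict(txn)
```

Because the class carries no database or Pandas dependency, it can be imported anywhere in the pipeline, including tests.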
6. Step 2: Define the Repository Contract (The Promise)
Now we ask: What data does the ML pipeline need?
Not how to get it, just what.
from abc import ABC, abstractmethod
from typing import List
from datetime import date

class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(
        self,
        start_date: date,
        end_date: date
    ) -> List[Transaction]:
        pass
Key idea
This is a promise: any data source, database, or storage engine can sit behind the pipeline, as long as it fulfills this contract.
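One way to see the contract being enforced: Python refuses to instantiate an abstract class, or a subclass that leaves the abstract method unimplemented. In the sketch below, `IncompleteRepository`, `EmptyRepository`, and `can_instantiate` are hypothetical names used only for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from typing import List


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(self, start_date: date, end_date: date) -> List[Transaction]:
        """Return all transactions in the requested date range."""


class IncompleteRepository(TransactionRepository):
    """Hypothetical subclass that forgets to implement get_transactions."""


class EmptyRepository(TransactionRepository):
    """Hypothetical subclass that fulfills the contract with no data."""

    def get_transactions(self, start_date, end_date):
        return []


def can_instantiate(cls) -> bool:
    """Return True if cls() succeeds, False if the ABC machinery rejects it."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

Here `can_instantiate(IncompleteRepository)` is `False`, while `can_instantiate(EmptyRepository)` is `True`: a forgotten method surfaces at construction time rather than deep inside a training run.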
7. Why This Is Powerful
At this point:
- The ML pipeline depends on an interface
- It does NOT depend on MySQL
- It does NOT depend on PyMySQL
This gives you:
- Testability
- Flexibility
- Clean design
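To make "flexibility" concrete, here is a minimal sketch of a second implementation of the same contract, backed by a CSV file instead of a database. `CsvTransactionRepository` is a hypothetical class, and the file layout (a header row with the four field names, ISO-formatted dates) is an assumption made for this example.

```python
import csv
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import List


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(self, start_date: date, end_date: date) -> List[Transaction]:
        """Return all transactions in the requested date range."""


class CsvTransactionRepository(TransactionRepository):
    """Hypothetical CSV-backed repository (assumed columns and date format)."""

    def __init__(self, csv_path):
        self.csv_path = Path(csv_path)

    def get_transactions(self, start_date, end_date):
        transactions = []
        with self.csv_path.open(newline="") as handle:
            for row in csv.DictReader(handle):
                txn_date = date.fromisoformat(row["transaction_date"])
                # Inclusive range on both ends.
                if start_date <= txn_date <= end_date:
                    transactions.append(
                        Transaction(
                            customer_id=int(row["customer_id"]),
                            amount=float(row["amount"]),
                            transaction_date=txn_date,
                            transaction_type=row["transaction_type"],
                        )
                    )
        return transactions


# Usage: write a tiny sample file, then query it through the contract.
sample = (
    "customer_id,amount,transaction_date,transaction_type\n"
    "1,120.50,2025-01-15,purchase\n"
    "2,80.00,2024-12-31,purchase\n"
)
with tempfile.TemporaryDirectory() as tmp_dir:
    path = Path(tmp_dir) / "transactions.csv"
    path.write_text(sample)
    result = CsvTransactionRepository(path).get_transactions(
        date(2025, 1, 1), date(2025, 12, 31)
    )
```

Code that calls `get_transactions` does not need to change when the backing store does; only the object handed to it changes.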
8. Step 3: Implement the Repository Using PyMySQL
Now — and only now — we touch MySQL.
Connection helper
import pymysql

def get_connection():
    return pymysql.connect(
        host="localhost",
        user="analytics_user",
        password="analytics_pwd",
        database="analytics_db",
        cursorclass=pymysql.cursors.DictCursor
    )
MySQL‑backed Repository
class MySQLTransactionRepository(TransactionRepository):
    def get_transactions(self, start_date, end_date):
        query = """
            SELECT customer_id,
                   amount,
                   transaction_date,
                   transaction_type
            FROM transactions
            WHERE transaction_date BETWEEN %s AND %s
        """
        conn = get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, (start_date, end_date))
                rows = cursor.fetchall()
            return [
                Transaction(
                    customer_id=row["customer_id"],
                    amount=float(row["amount"]),
                    transaction_date=row["transaction_date"],
                    transaction_type=row["transaction_type"]
                )
                for row in rows
            ]
        finally:
            conn.close()
What just happened?
- SQL is isolated
- Connection lifecycle is controlled
- Raw rows are converted into domain objects
Everything else in the system stays clean.
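The row-to-object conversion can also be checked without a database. The sketch below pulls it into a standalone function, `row_to_transaction` (a hypothetical helper, not part of the repository above), and feeds it a hand-built dictionary shaped like a `DictCursor` row. It assumes the `amount` column is a DECIMAL type that arrives as `decimal.Decimal`, which is why the conversion to `float` matters.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


def row_to_transaction(row):
    """Convert one dict-style row into a Transaction (hypothetical helper)."""
    return Transaction(
        customer_id=row["customer_id"],
        # Assumption: the driver hands back Decimal for DECIMAL columns,
        # so normalize to float for downstream numeric code.
        amount=float(row["amount"]),
        transaction_date=row["transaction_date"],
        transaction_type=row["transaction_type"],
    )


# A fake row with the same keys the SELECT statement returns.
fake_row = {
    "customer_id": 7,
    "amount": Decimal("19.90"),
    "transaction_date": date(2025, 2, 1),
    "transaction_type": "purchase",
}
txn = row_to_transaction(fake_row)
```

Keeping the mapping in one small function means a schema change (a renamed column, a new type) touches one place.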
9. Step 4: Feature Engineering (Pure Data Logic)
This layer doesn’t know:
- Where data came from
- How it was queried
- Whether it was MySQL, a CSV file, or an API
import pandas as pd

class TransactionFeatureEngineer:
    def build_customer_features(self, transactions):
        df = pd.DataFrame([t.__dict__ for t in transactions])
        features = (
            df.groupby("customer_id")
            .agg(
                total_amount=("amount", "sum"),
                avg_amount=("amount", "mean"),
                txn_count=("amount", "count")
            )
            .reset_index()
        )
        return features
Why this is clean
- Deterministic
- Easy to unit test
- No side effects
- Reusable across models
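Because the feature step is deterministic, a unit test can compare its output against values computed independently. The sketch below is one possible pandas-free reference for the same three aggregates; `expected_customer_features` is a hypothetical helper written for this example, not part of the pipeline.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import date
from statistics import mean


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


def expected_customer_features(transactions):
    """Plain-Python reference for total_amount, avg_amount, and txn_count."""
    amounts = defaultdict(list)
    for t in transactions:
        amounts[t.customer_id].append(t.amount)
    return {
        customer_id: {
            "total_amount": sum(values),
            "avg_amount": mean(values),
            "txn_count": len(values),
        }
        for customer_id, values in amounts.items()
    }


# Three made-up transactions across two customers.
sample = [
    Transaction(1, 100.0, date(2025, 1, 1), "purchase"),
    Transaction(1, 300.0, date(2025, 1, 2), "purchase"),
    Transaction(2, 50.0, date(2025, 1, 3), "purchase"),
]
expected = expected_customer_features(sample)
```

A test could then pass the same `sample` list to `build_customer_features` and check each row of the resulting DataFrame against `expected`.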
10. Step 5: Model Training Layer
Again — no database awareness.
from sklearn.ensemble import RandomForestClassifier

class ModelTrainingService:
    def train(self, X, y):
        model = RandomForestClassifier(
            n_estimators=100,
            random_state=42
        )
        model.fit(X, y)
        return model
The model only cares about features, not data sources.
11. Step 6: Orchestrating the Pipeline
This is the only place everything comes together.
from datetime import date
repo = MySQLTransactionRepository()
feature_engineer = TransactionFeatureEngineer()
trainer = ModelTrainingService()
# Fetch
transactions = repo.get_transactions(
    date(2025, 1, 1),
    date(2025, 12, 31)
)
# Features
features_df = feature_engineer.build_customer_features(transactions)
# Example target
features_df["target"] = (
    features_df["total_amount"] > 100000
).astype(int)
X = features_df[["total_amount", "avg_amount", "txn_count"]]
y = features_df["target"]
# Train
model = trainer.train(X, y)
This code reads like a story, not plumbing.
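A possible next step is to wrap the fetch and feature steps in a function that receives its collaborators as arguments, so the caller picks the repository. The sketch below uses hypothetical names (`build_features_for_range`, `FakeRepository`, `FakeFeatureEngineer`) and placeholder data purely to show the wiring.

```python
from datetime import date


def build_features_for_range(repo, feature_engineer, start_date, end_date):
    """Fetch transactions from `repo` and hand them to `feature_engineer`.

    Both collaborators are passed in, so the caller decides whether the
    data comes from MySQL, a CSV file, or an in-memory list.
    """
    transactions = repo.get_transactions(start_date, end_date)
    return feature_engineer.build_customer_features(transactions)


class FakeRepository:
    """Hypothetical stand-in exposing the same get_transactions method."""

    def __init__(self):
        self.requested_range = None

    def get_transactions(self, start_date, end_date):
        # Record the arguments so a test can check what was requested.
        self.requested_range = (start_date, end_date)
        return ["txn-a", "txn-b"]  # placeholder items, not real Transaction objects


class FakeFeatureEngineer:
    """Hypothetical stand-in that only counts what it receives."""

    def build_customer_features(self, transactions):
        return {"txn_count": len(transactions)}


repo = FakeRepository()
features = build_features_for_range(
    repo, FakeFeatureEngineer(), date(2025, 1, 1), date(2025, 12, 31)
)
```

In production the same function would be called with `MySQLTransactionRepository()` and `TransactionFeatureEngineer()`; nothing inside it changes.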
12. The Real Superpower: Testing Without MySQL
Now comes the magic: an in‑memory repository.
class InMemoryTransactionRepository(TransactionRepository):
    def __init__(self, transactions):
        self.transactions = transactions

    def get_transactions(self, start_date, end_date):
        # Inclusive range, matching the SQL BETWEEN in the MySQL repository
        return [
            t for t in self.transactions
            if start_date <= t.transaction_date <= end_date
        ]
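A short test against an in-memory repository might look like the sketch below. It assumes the date filter is inclusive on both ends, mirroring the `BETWEEN` clause used by the MySQL implementation; the sample transactions are invented for the test.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date


@dataclass
class Transaction:
    customer_id: int
    amount: float
    transaction_date: date
    transaction_type: str


class TransactionRepository(ABC):
    @abstractmethod
    def get_transactions(self, start_date, end_date):
        """Return all transactions in the requested date range."""


class InMemoryTransactionRepository(TransactionRepository):
    def __init__(self, transactions):
        self.transactions = transactions

    def get_transactions(self, start_date, end_date):
        # Assumption: inclusive on both ends, like SQL BETWEEN.
        return [
            t for t in self.transactions
            if start_date <= t.transaction_date <= end_date
        ]


def test_date_range_is_inclusive():
    before = Transaction(1, 10.0, date(2024, 12, 31), "purchase")
    first_day = Transaction(1, 20.0, date(2025, 1, 1), "purchase")
    last_day = Transaction(2, 30.0, date(2025, 1, 31), "purchase")
    after = Transaction(2, 40.0, date(2025, 2, 1), "purchase")
    repo = InMemoryTransactionRepository([before, first_day, last_day, after])

    result = repo.get_transactions(date(2025, 1, 1), date(2025, 1, 31))

    # Boundary dates are kept; dates outside the range are dropped.
    assert result == [first_day, last_day]


test_date_range_is_inclusive()
```

The test runs in milliseconds and needs no MySQL server, credentials, or fixtures beyond a Python list.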
Repositories answer “WHAT data?”
Keep those questions separate — and your pipelines stay sane.