Quantified Self: Building a Production-Grade ETL Pipeline for 10+ Wearables
Source: Dev.to
In the era of the Quantified Self, we are drowning in data but starving for insights.
Between your Oura Ring’s sleep scores, Garmin’s recovery metrics, and Apple Health’s step counts, personal health data is scattered across a dozen proprietary silos. If you’ve ever tried to answer a simple question like “How does my deep sleep correlate with my workout intensity across different apps?”, you know the struggle of fragmented APIs and inconsistent schemas.
Building a robust ETL pipeline and a centralized personal health data lake is the only way to reclaim ownership of your metrics. In this guide we’ll walk through a professional‑grade data‑engineering architecture designed to handle rate limits, schema drift, and the dreaded “Timezone Hell” using industry‑standard tools.
For more advanced data‑engineering patterns and production‑ready infrastructure templates, check out the deep‑dive articles at WellAlly Blog.
The Architecture: From API to Insight
Handling 10+ different wearable APIs requires a decoupled architecture. We don’t want a failing Fitbit API to crash our entire ingestion engine.
```mermaid
graph TD
    subgraph "Data Sources (SaaS APIs)"
        A[Oura API] --> E
        B[Garmin Connect] --> E
        C[Apple Health / HealthKit] --> E
        D[Whoop / Strava] --> E
    end

    subgraph "Orchestration & Storage"
        E[Apache Airflow] -- "Extract & Load" --> F[(PostgreSQL – Bronze Layer)]
        G[Terraform] -- "Provisions" --> E
        G -- "Provisions" --> F
    end

    subgraph "Transformation & Modeling"
        F -- "Refine" --> H[dbt – Silver/Gold Layer]
        H -- "Unified Schema" --> I[(Final Health Data Lake)]
    end

    subgraph "Visualization"
        I --> J[Grafana / Metabase]
    end
```
The Tech Stack
| Layer | Tool | Why |
|---|---|---|
| Infrastructure as Code | Terraform | Manages RDS (PostgreSQL) and compute instances |
| Orchestration | Apache Airflow | Handles retries, backfills, and API rate‑limiting |
| Transformation | dbt (data build tool) | Turns messy JSON blobs into clean, relational tables |
| Database | PostgreSQL | A lightweight "lakehouse" that is plenty for personal-scale data |
Step 1 – Provisioning the Foundation with Terraform
We don’t do manual setups. Our environment must be reproducible. Below is a snippet to spin up a PostgreSQL instance.
```hcl
# main.tf
resource "aws_db_instance" "health_db" {
  allocated_storage   = 20
  engine              = "postgres"
  engine_version      = "15.3"
  instance_class      = "db.t3.micro"
  db_name             = "quantified_self"
  username            = var.db_user
  password            = var.db_password
  skip_final_snapshot = true
  publicly_accessible = true # Restricted via Security Group
}
```
Step 2 – The Ingestion Logic (Airflow)
The biggest challenge with wearables is OAuth2 and rate limiting. We use Airflow’s HttpOperator or a custom PythonOperator to fetch data incrementally.
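Airflow can retry a whole task, but wearable APIs that return HTTP 429 often reward finer-grained, in-task backoff. A minimal sketch of exponential backoff with jitter around any fetch callable (the helper name and retry counts are illustrative, not an Airflow API):

```python
import random
import time


def fetch_with_backoff(fetch_fn, max_retries=5, base_delay=1.0):
    """Call fetch_fn, retrying with exponential backoff on failure.

    Suitable for wrapping a requests call that raises on HTTP 429/5xx
    (e.g. via response.raise_for_status()).
    """
    for attempt in range(max_retries):
        try:
            return fetch_fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries, surface the error to Airflow
            # Exponential backoff with jitter: 1s, 2s, 4s, ... plus noise
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```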
Handling the “Bronze” Layer (Raw Data)
Raw JSON responses are stored directly in a JSONB column. This prevents the pipeline from breaking if an API adds new fields.
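A plausible shape for that bronze table (the table and column names are illustrative; the DAG below assumes something similar):

```sql
CREATE TABLE IF NOT EXISTS raw_sleep_data (
    source      TEXT NOT NULL,          -- e.g. 'oura', 'whoop'
    record_id   TEXT NOT NULL,          -- the API's own id for the record
    ingested_at TIMESTAMPTZ DEFAULT now(),
    data        JSONB NOT NULL,         -- raw API payload, schema-drift safe
    PRIMARY KEY (source, record_id)     -- enables ON CONFLICT upserts
);
```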
```python
# airflow/dags/wearable_ingestion.py
from datetime import datetime

import requests
from airflow.decorators import dag, task
from airflow.models import Variable


@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def wearable_ingestion():
    @task
    def fetch_oura_data():
        # Using the Oura API as an example; token stored as an Airflow Variable
        headers = {"Authorization": f"Bearer {Variable.get('OURA_TOKEN')}"}
        response = requests.get(
            "https://api.ouraring.com/v2/usercollection/sleep",
            headers=headers,
        )
        response.raise_for_status()
        return response.json()["data"]

    @task
    def load_to_postgres(raw_data):
        # Insert into the `raw_sleep_data` table (JSONB column).
        # Use `ON CONFLICT ... DO UPDATE` to handle re-delivered records.
        pass

    load_to_postgres(fetch_oura_data())


wearable_ingestion_dag = wearable_ingestion()
```
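The `load_to_postgres` task is left as a stub above. One way to flesh it out is a parameterized upsert; the helper below only builds the SQL and parameters (execute them with any Postgres driver, e.g. psycopg2), and the `raw_sleep_data` table with its `(source, record_id)` key is an assumption, not a fixed schema:

```python
import json


def build_upsert(table="raw_sleep_data"):
    """Return a parameterized upsert statement for raw JSON records."""
    return (
        f"INSERT INTO {table} (source, record_id, data) "
        f"VALUES (%s, %s, %s) "
        f"ON CONFLICT (source, record_id) DO UPDATE SET data = EXCLUDED.data"
    )


def to_params(source, record):
    """Map one API record to the statement's parameter tuple."""
    return (source, record["id"], json.dumps(record))
```

Because the conflict target updates `data` in place, re-running a backfill is idempotent: re-delivered records overwrite rather than duplicate.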
Step 3 – Normalization with dbt (The Magic Sauce)
Every wearable has its own idea of what “sleep” looks like. One returns seconds, another minutes, and timezones are usually a mess. dbt lets us write modular SQL to clean this up.
Unified Sleep Model (models/gold/fct_sleep.sql)
```sql
{{ config(materialized='table') }}

WITH unified_sleep AS (
    SELECT
        'oura' AS source,
        user_id,
        (data->>'total_sleep_duration')::int / 3600.0 AS duration_hours,
        (data->>'bedtime_start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
    FROM {{ ref('stg_oura_sleep') }}

    UNION ALL

    SELECT
        'whoop' AS source,
        user_id,
        -- Note: `->` keeps JSONB for intermediate keys; only the final `->>` extracts text
        (data->'score'->'stage_summary'->>'total_in_bed_milli')::bigint / 3600000.0 AS duration_hours,
        (data->>'start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
    FROM {{ ref('stg_whoop_sleep') }}
)

SELECT
    *,
    date_trunc('day', bedtime_start) AS report_date
FROM unified_sleep
```
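The unit gymnastics above (Oura's seconds vs. Whoop's milliseconds) can also be expressed, and unit-tested, outside SQL. A small Python sketch of the same normalization, with conversion factors mirroring the model:

```python
# Per-source conversion factors into hours (mirrors the dbt model above)
SECONDS_PER_HOUR = 3600
MILLIS_PER_HOUR = 3_600_000


def duration_hours(source: str, value: int) -> float:
    """Normalize a raw sleep-duration value from a given wearable to hours."""
    if source == "oura":   # Oura reports total_sleep_duration in seconds
        return value / SECONDS_PER_HOUR
    if source == "whoop":  # Whoop reports total_in_bed_milli in milliseconds
        return value / MILLIS_PER_HOUR
    raise ValueError(f"unknown source: {source}")
```

Keeping a table like this in one place means adding an eleventh wearable is a two-line change.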
Pro‑Tip: Solving the Timezone Conflict
Health data is sensitive to “user local time.” If you fly from NYC to London, your sleep record might double‑count or skip hours.
- Always store raw timestamps in UTC.
- Capture the offset from the API when it’s available.
- Use a `dim_dates` table in dbt to map UTC timestamps back to the user's "body-clock" day.
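A sketch of that body-clock mapping in Python, assuming the API supplies a UTC timestamp plus a UTC offset in minutes (stdlib only; a `dim_dates` table in dbt would encode the same logic in SQL):

```python
from datetime import datetime, timedelta, timezone


def body_clock_day(utc_ts: datetime, offset_minutes: int) -> str:
    """Map a UTC timestamp to the user's local calendar day.

    offset_minutes is the UTC offset reported by the wearable's API,
    e.g. -300 for New York in winter, +60 for Paris in winter.
    """
    # Shift the wall-clock time by the offset; we only need the local date
    local = utc_ts + timedelta(minutes=offset_minutes)
    return local.date().isoformat()
```

Going to bed at 23:30 UTC is still "today" for a user in New York but already "tomorrow" for one in Paris, which is exactly the double-count/skip hazard described above.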
For a deeper dive into handling complex temporal data transformations, the team at WellAlly Tech has published an extensive guide on “Production Data Modeling for Time‑Series Events.”
Conclusion – Data Sovereignty
By building your own ETL pipeline you aren’t just making a dashboard; you’re building an asset you control. You can now:
- Query cross‑device metrics without worrying about API outages.
- Extend the model to new wearables or health‑tech services with minimal code changes.
- Keep your personal health data private, reproducible, and ready for advanced analytics (e.g., predictive sleep‑quality models, workout‑recovery correlations).
You can also:

- **Predict Burnout**: Correlate heart rate variability (HRV) with calendar meetings.
- **Cross-Pollinate**: See how your Strava runs affect your Oura readiness.
- **Future-Proof**: If you switch from Garmin to an Apple Watch, your historical data remains intact.

Happy building, and may your data always be clean, timely, and yours!
**What’s next?**
- [ ] Set up an **Alerting** system in Airflow if an API token expires.
- [ ] Connect **Metabase** to your Gold layer for beautiful mobile‑friendly charts.
- [ ] Explore more patterns for scaling your data infrastructure!
**Are you tracking your life with code? Drop your favorite API in the comments!** 👇