Quantified Self: 10개 이상의 웨어러블을 위한 프로덕션급 ETL 파이프라인 구축
Source: Dev.to
위에 제공된 링크의 본문을 번역하려면 실제 텍스트가 필요합니다. 번역하고 싶은 전체 내용(마크다운 형식 포함)을 알려주시면 한국어로 번역해 드리겠습니다.
Quantified Self 시대에 우리는 데이터에 빠져 있지만 인사이트는 갈증을 느낍니다
Oura Ring의 수면 점수, Garmin의 회복 지표, Apple Health의 걸음 수 등 개인 건강 데이터가 수십 개의 독점적인 사일로에 흩어져 있습니다. “깊은 수면이 다양한 앱에서의 운동 강도와 어떻게 연관되는가?” 같은 간단한 질문에 답하려고 시도해 본 적이 있다면, 파편화된 API와 일관되지 않은 스키마 때문에 겪는 어려움을 잘 아실 겁니다.
견고한 ETL 파이프라인과 중앙 집중식 개인 건강 데이터 레이크를 구축하는 것이 메트릭에 대한 소유권을 되찾는 유일한 방법입니다. 이 가이드에서는 레이트 리밋, 스키마 드리프트, 그리고 악명 높은 “Timezone Hell”을 산업 표준 도구로 처리하도록 설계된 전문 수준 데이터 엔지니어링 아키텍처를 단계별로 살펴보겠습니다.
보다 고급 데이터 엔지니어링 패턴 및 프로덕션 준비 인프라 템플릿을 확인하려면 WellAlly Blog에서 심층 기사들을 참고하세요.
아키텍처: API에서 인사이트까지
10개 이상의 다양한 웨어러블 API를 처리하려면 분리된 아키텍처가 필요합니다. 실패한 Fitbit API가 전체 인제션 엔진을 중단시키는 것을 원하지 않습니다.
graph TD
subgraph "Data Sources (SaaS APIs)"
A[Oura API] --> E
B[Garmin Connect] --> E
C[Apple Health / HealthKit] --> E
D[Whoop / Strava] --> E
end
subgraph "Orchestration & Storage"
E[Apache Airflow] -- "Extract & Load" --> F[(PostgreSQL – Bronze Layer)]
G[Terraform] -- "Provisions" --> E
G -- "Provisions" --> F
end
subgraph "Transformation & Modeling"
F -- "Refine" --> H[dbt – Silver/Gold Layer]
H -- "Unified Schema" --> I[(Final Health Data Lake)]
end
subgraph "Visualization"
I --> J[Grafana / Metabase]
end
기술 스택
| 레이어 | 도구 | 이유 |
|---|---|---|
| 코드형 인프라스트럭처 | Terraform | RDS (PostgreSQL)와 컴퓨트 인스턴스를 관리합니다 |
| 오케스트레이션 | Apache Airflow | 재시도, 백필, API 속도 제한을 처리합니다 |
| 변환 | dbt (data build tool) | 복잡한 JSON 블롭을 깔끔한 관계형 테이블로 변환합니다 |
| 데이터베이스 | PostgreSQL | 개인용 “레이크하우스” |
Step 1 – Terraform으로 기반 구축
우리는 수동 설정을 하지 않습니다. 환경은 재현 가능해야 합니다. 아래는 PostgreSQL 인스턴스를 띄우는 코드 스니펫입니다.
# main.tf
resource "aws_db_instance" "health_db" {
allocated_storage = 20
engine = "postgres"
engine_version = "15.3"
instance_class = "db.t3.micro"
db_name = "quantified_self"
username = var.db_user
password = var.db_password
skip_final_snapshot = true
publicly_accessible = true # Restricted via Security Group
}
Step 2 – 인제션 로직 (Airflow)
웨어러블에서 가장 큰 도전 과제는 OAuth2와 rate limiting입니다. 우리는 Airflow의 HttpOperator 또는 사용자 정의 PythonOperator를 사용하여 데이터를 점진적으로 가져옵니다.
“Bronze” 레이어 처리 (원시 데이터)
원시 JSON 응답은 JSONB 컬럼에 직접 저장됩니다. 이렇게 하면 API가 새로운 필드를 추가해도 파이프라인이 중단되지 않습니다.
# airflow/dags/wearable_ingestion.py
from airflow.decorators import dag, task
from datetime import datetime
import requests
@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def wearable_ingestion():
@task
def fetch_oura_data():
# Using Oura API as an example
headers = {"Authorization": f"Bearer {OURA_TOKEN}"}
response = requests.get(
"https://api.ouraring.com/v2/usercollection/sleep",
headers=headers,
)
response.raise_for_status()
return response.json()["data"]
@task
def load_to_postgres(raw_data):
# Insert into `raw_sleep_data` table (JSONB column)
# Use `ON CONFLICT DO UPDATE` to handle data updates
pass
load_to_postgres(fetch_oura_data())
wearable_ingestion_dag = wearable_ingestion()
Step 3 – dbt를 이용한 정규화 (마법 소스)
모든 웨어러블은 “수면”이 어떤 모습인지에 대한 자체적인 기준을 가지고 있습니다. 어떤 기기는 초 단위, 다른 기기는 분 단위로 반환하고, 타임존은 보통 엉망입니다. dbt를 사용하면 모듈식 SQL을 작성해 이를 정리할 수 있습니다.
통합 수면 모델 (models/gold/fct_sleep.sql)
{{ config(materialized='table') }}
WITH unified_sleep AS (
SELECT
'oura' AS source,
user_id,
(data->>'total_sleep_duration')::int / 3600.0 AS duration_hours,
(data->>'bedtime_start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
FROM {{ ref('stg_oura_sleep') }}
UNION ALL
SELECT
'whoop' AS source,
user_id,
(data->>'score'->>'stage_summary'->>'total_in_bed_milli')::bigint / 3600000.0 AS duration_hours,
(data->>'start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
FROM {{ ref('stg_whoop_sleep') }}
)
SELECT
*,
date_trunc('day', bedtime_start) AS report_date
FROM unified_sleep;
프로 팁: 타임존 충돌 해결
헬스 데이터는 “사용자 현지 시간”에 민감합니다. 뉴욕에서 런던으로 비행하면 수면 기록이 시간이 겹치거나 건너뛰는 현상이 발생할 수 있습니다.
- 항상 원시 타임스탬프를 UTC로 저장하세요.
- 가능할 경우 API에서 오프셋을 캡처하세요.
- dbt의
dim_dates테이블을 사용해 UTC 타임스탬프를 사용자의 “생체 시계” 날짜로 매핑하세요.
복잡한 시계열 데이터 변환을 깊이 있게 다루고 싶다면, WellAlly Tech 팀이 *“Production Data Modeling for Time‑Series Events”*에 관한 포괄적인 가이드를 공개했습니다.
결론 – 데이터 주권
자신만의 ETL 파이프라인을 구축하면 단순히 대시보드를 만드는 것이 아니라, 직접 제어할 수 있는 자산을 만드는 것입니다. 이제 다음을 할 수 있습니다:
- API 장애를 걱정하지 않고 기기 간 메트릭을 조회합니다.
- 최소한의 코드 변경으로 새로운 웨어러블이나 헬스‑테크 서비스에 모델을 확장합니다.
- 개인 건강 데이터를 프라이버시를 유지하면서 재현 가능하고, 고급 분석(예: 예측 수면‑품질 모델, 운동‑회복 상관관계)에 바로 활용할 수 있습니다.
즐거운 개발 되시고, 여러분의 데이터가 언제나 깨끗하고, 시기적절하며, 여러분 것임을 기원합니다!
**Predict Burnout**: Correlate heart rate variability (HRV) with calendar meetings.
- **Cross‑Pollinate**: See how your Strava runs affect your Oura readiness.
- **Future Proof**: If you switch from Garmin to Apple Watch, your historical data remains intact.
**What’s next?**
- [ ] Set up an **Alerting** system in Airflow if an API token expires.
- [ ] Connect **Metabase** to your Gold layer for beautiful mobile‑friendly charts.
- [ ] Explore for more insights on scaling your data infrastructure!
**Are you tracking your life with code? Drop your favorite API in the comments!** 👇