Quantified Self:构建面向10+可穿戴设备的生产级ETL管道
Source: Dev.to
请提供您希望翻译的完整文本内容,我将按照要求保留源链接、格式和代码块,仅翻译正文部分。
在 Quantified Self 时代,我们被数据淹没,却渴求洞察
在你的 Oura Ring 睡眠评分、Garmin 恢复指标以及 Apple Health 步数之间,个人健康数据散落在十几个专有的孤岛中。如果你曾尝试回答一个简单的问题,例如 “我的深度睡眠与不同应用中的运动强度之间有什么关联?”,你就会体会到碎片化 API 和不一致模式的痛苦。
构建一个强大的 ETL 流水线 和集中式 个人健康数据湖 是重新掌控你的指标的唯一途径。在本指南中,我们将逐步讲解一种专业级 数据工程 架构,旨在处理速率限制、模式漂移以及令人头疼的 “时区地狱”,并使用业界标准工具实现。
想了解更高级的数据工程模式和可投入生产的基础设施模板,请查看 WellAlly Blog 的深度文章。
架构概览:从 API 到洞察
处理 10 多种不同的可穿戴设备 API 需要一种解耦的架构。我们不希望某个 Fitbit API 失效导致整个摄取引擎崩溃。
graph TD
subgraph "Data Sources (SaaS APIs)"
A[Oura API] --> E
B[Garmin Connect] --> E
C[Apple Health / HealthKit] --> E
D[Whoop / Strava] --> E
end
subgraph "Orchestration & Storage"
E[Apache Airflow] -- "Extract & Load" --> F[(PostgreSQL – Bronze Layer)]
G[Terraform] -- "Provisions" --> E
G -- "Provisions" --> F
end
subgraph "Transformation & Modeling"
F -- "Refine" --> H[dbt – Silver/Gold Layer]
H -- "Unified Schema" --> I[(Final Health Data Lake)]
end
subgraph "Visualization"
I --> J[Grafana / Metabase]
end
技术栈
| 层 | 工具 | 原因 |
|---|---|---|
| 基础设施即代码 | Terraform | 管理 RDS(PostgreSQL)和计算实例 |
| 编排 | Apache Airflow | 处理重试、回填以及 API 限流 |
| 转换 | dbt (data build tool) | 将混乱的 JSON 数据块转换为干净的关系表 |
| 数据库 | PostgreSQL | 个人需求的 “lakehouse” |
第一步 – 使用 Terraform 进行基础设施配置
我们不进行手动设置。我们的环境必须可复现。下面是一个用于启动 PostgreSQL 实例的代码片段。
# main.tf
resource "aws_db_instance" "health_db" {
allocated_storage = 20
engine = "postgres"
engine_version = "15.3"
instance_class = "db.t3.micro"
db_name = "quantified_self"
username = var.db_user
password = var.db_password
skip_final_snapshot = true
publicly_accessible = true # Restricted via Security Group
}
第2步 – 摄取逻辑(Airflow)
可穿戴设备最大的挑战是 OAuth2 和 速率限制。我们使用 Airflow 的 HttpOperator 或自定义的 PythonOperator 来增量获取数据。
处理 “Bronze” 层(原始数据)
原始 JSON 响应直接存入 JSONB 列。这可以防止在 API 添加新字段时导致管道中断。
# airflow/dags/wearable_ingestion.py
from airflow.decorators import dag, task
from datetime import datetime
import requests
@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def wearable_ingestion():
@task
def fetch_oura_data():
# Using Oura API as an example
headers = {"Authorization": f"Bearer {OURA_TOKEN}"}
response = requests.get(
"https://api.ouraring.com/v2/usercollection/sleep",
headers=headers,
)
response.raise_for_status()
return response.json()["data"]
@task
def load_to_postgres(raw_data):
# Insert into `raw_sleep_data` table (JSONB column)
# Use `ON CONFLICT DO UPDATE` to handle data updates
pass
load_to_postgres(fetch_oura_data())
wearable_ingestion_dag = wearable_ingestion()
第 3 步 – 使用 dbt 进行标准化(魔法酱)
每款可穿戴设备对“睡眠”的定义都不一样。有的返回秒,有的返回分钟,时区通常也很混乱。dbt 让我们能够编写模块化的 SQL 来统一处理这些数据。
统一睡眠模型 (models/gold/fct_sleep.sql)
{{ config(materialized='table') }}
WITH unified_sleep AS (
SELECT
'oura' AS source,
user_id,
(data->>'total_sleep_duration')::int / 3600.0 AS duration_hours,
(data->>'bedtime_start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
FROM {{ ref('stg_oura_sleep') }}
UNION ALL
SELECT
'whoop' AS source,
user_id,
(data->>'score'->>'stage_summary'->>'total_in_bed_milli')::bigint / 3600000.0 AS duration_hours,
(data->>'start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
FROM {{ ref('stg_whoop_sleep') }}
)
SELECT
*,
date_trunc('day', bedtime_start) AS report_date
FROM unified_sleep;
专业提示:解决时区冲突
健康数据对“用户本地时间”非常敏感。如果你从纽约飞往伦敦,你的睡眠记录可能会出现重复计数或漏记小时。
- 始终以 UTC 存储原始时间戳。
- 在可用时从 API 捕获时区偏移。
- 使用
dim_dates表在 dbt 中将 UTC 时间戳映射回用户的“生物钟”日期。
如需深入了解复杂时间数据转换的处理方法,WellAlly Tech 团队已发布了一篇关于 “时间序列事件的生产数据建模” 的详尽指南。
结论 – 数据主权
通过构建自己的 ETL 流水线,你不仅仅是在制作仪表盘;你在构建一个可自行控制的资产。现在你可以:
- 在不担心 API 中断的情况下查询跨设备指标。
- 只需少量代码更改,即可将模型扩展到新的可穿戴设备或健康技术服务。
- 将个人健康数据保持私密、可复现,并为高级分析做好准备(例如,预测睡眠质量模型、锻炼恢复相关性)。
祝构建愉快,愿你的数据始终干净、及时、归属你自己!
**Predict Burnout**: Correlate heart rate variability (HRV) with calendar meetings.
- **Cross‑Pollinate**: See how your Strava runs affect your Oura readiness.
- **Future Proof**: If you switch from Garmin to Apple Watch, your historical data remains intact.
**What’s next?**
- [ ] Set up an **Alerting** system in Airflow if an API token expires.
- [ ] Connect **Metabase** to your Gold layer for beautiful mobile‑friendly charts.
- [ ] Explore for more insights on scaling your data infrastructure!
**Are you tracking your life with code? Drop your favorite API in the comments!** 👇