Quantified Self:构建面向10+可穿戴设备的生产级ETL管道

发布: (2026年3月9日 GMT+8 08:30)
7 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的完整文本内容,我将按照要求保留源链接、格式和代码块,仅翻译正文部分。

Quantified Self 时代,我们被数据淹没,却渴求洞察

在你的 Oura Ring 睡眠评分、Garmin 恢复指标以及 Apple Health 步数之间,个人健康数据散落在十几个专有的孤岛中。如果你曾尝试回答一个简单的问题,例如 “我的深度睡眠与不同应用中的运动强度之间有什么关联?”,你就会体会到碎片化 API 和不一致模式的痛苦。

构建一个强大的 ETL 流水线 和集中式 个人健康数据湖 是重新掌控你的指标的唯一途径。在本指南中,我们将逐步讲解一种专业级 数据工程 架构,旨在处理速率限制、模式漂移以及令人头疼的 “时区地狱”,并使用业界标准工具实现。

想了解更高级的数据工程模式和可投入生产的基础设施模板,请查看 WellAlly Blog 的深度文章。

架构概览:从 API 到洞察

处理 10 多种不同的可穿戴设备 API 需要一种解耦的架构。我们不希望某个 Fitbit API 失效导致整个摄取引擎崩溃。

graph TD
    subgraph "Data Sources (SaaS APIs)"
        A[Oura API] --> E
        B[Garmin Connect] --> E
        C[Apple Health / HealthKit] --> E
        D[Whoop / Strava] --> E
    end

    subgraph "Orchestration & Storage"
        E[Apache Airflow] -- "Extract & Load" --> F[(PostgreSQL – Bronze Layer)]
        G[Terraform] -- "Provisions" --> E
        G -- "Provisions" --> F
    end

    subgraph "Transformation & Modeling"
        F -- "Refine" --> H[dbt – Silver/Gold Layer]
        H -- "Unified Schema" --> I[(Final Health Data Lake)]
    end

    subgraph "Visualization"
        I --> J[Grafana / Metabase]
    end

技术栈

工具原因
基础设施即代码Terraform管理 RDS(PostgreSQL)和计算实例
编排Apache Airflow处理重试、回填以及 API 限流
转换dbt (data build tool)将混乱的 JSON 数据块转换为干净的关系表
数据库PostgreSQL个人需求的 “lakehouse”

第一步 – 使用 Terraform 进行基础设施配置

我们不进行手动设置。我们的环境必须可复现。下面是一个用于启动 PostgreSQL 实例的代码片段。

# main.tf
resource "aws_db_instance" "health_db" {
  allocated_storage   = 20
  engine              = "postgres"
  engine_version      = "15.3"
  instance_class      = "db.t3.micro"
  db_name             = "quantified_self"
  username            = var.db_user
  password            = var.db_password
  skip_final_snapshot = true
  publicly_accessible = true   # Restricted via Security Group
}

第2步 – 摄取逻辑(Airflow)

可穿戴设备最大的挑战是 OAuth2速率限制。我们使用 Airflow 的 HttpOperator 或自定义的 PythonOperator 来增量获取数据。

处理 “Bronze” 层(原始数据)

原始 JSON 响应直接存入 JSONB 列。这可以防止在 API 添加新字段时导致管道中断。

# airflow/dags/wearable_ingestion.py
from airflow.decorators import dag, task
from datetime import datetime
import requests

@dag(start_date=datetime(2023, 1, 1), schedule="@daily", catchup=False)
def wearable_ingestion():

    @task
    def fetch_oura_data():
        # Using Oura API as an example
        headers = {"Authorization": f"Bearer {OURA_TOKEN}"}
        response = requests.get(
            "https://api.ouraring.com/v2/usercollection/sleep",
            headers=headers,
        )
        response.raise_for_status()
        return response.json()["data"]

    @task
    def load_to_postgres(raw_data):
        # Insert into `raw_sleep_data` table (JSONB column)
        # Use `ON CONFLICT DO UPDATE` to handle data updates
        pass

    load_to_postgres(fetch_oura_data())

wearable_ingestion_dag = wearable_ingestion()

第 3 步 – 使用 dbt 进行标准化(魔法酱)

每款可穿戴设备对“睡眠”的定义都不一样。有的返回秒,有的返回分钟,时区通常也很混乱。dbt 让我们能够编写模块化的 SQL 来统一处理这些数据。

统一睡眠模型 (models/gold/fct_sleep.sql)

{{ config(materialized='table') }}

WITH unified_sleep AS (
    SELECT
        'oura' AS source,
        user_id,
        (data->>'total_sleep_duration')::int / 3600.0 AS duration_hours,
        (data->>'bedtime_start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
    FROM {{ ref('stg_oura_sleep') }}

    UNION ALL

    SELECT
        'whoop' AS source,
        user_id,
        (data->>'score'->>'stage_summary'->>'total_in_bed_milli')::bigint / 3600000.0 AS duration_hours,
        (data->>'start')::timestamp AT TIME ZONE 'UTC' AS bedtime_start
    FROM {{ ref('stg_whoop_sleep') }}
)

SELECT
    *,
    date_trunc('day', bedtime_start) AS report_date
FROM unified_sleep;

专业提示:解决时区冲突

健康数据对“用户本地时间”非常敏感。如果你从纽约飞往伦敦,你的睡眠记录可能会出现重复计数或漏记小时。

  1. 始终以 UTC 存储原始时间戳。
  2. 在可用时从 API 捕获时区偏移。
  3. 使用 dim_dates 表在 dbt 中将 UTC 时间戳映射回用户的“生物钟”日期。

如需深入了解复杂时间数据转换的处理方法,WellAlly Tech 团队已发布了一篇关于 “时间序列事件的生产数据建模” 的详尽指南。

结论 – 数据主权

通过构建自己的 ETL 流水线,你不仅仅是在制作仪表盘;你在构建一个可自行控制的资产。现在你可以:

  • 在不担心 API 中断的情况下查询跨设备指标。
  • 只需少量代码更改,即可将模型扩展到新的可穿戴设备或健康技术服务。
  • 将个人健康数据保持私密、可复现,并为高级分析做好准备(例如,预测睡眠质量模型、锻炼恢复相关性)。

祝构建愉快,愿你的数据始终干净、及时、归属你自己!

**Predict Burnout**: Correlate heart rate variability (HRV) with calendar meetings.

- **Cross‑Pollinate**: See how your Strava runs affect your Oura readiness.  
- **Future Proof**: If you switch from Garmin to Apple Watch, your historical data remains intact.

**What’s next?**

- [ ] Set up an **Alerting** system in Airflow if an API token expires.  
- [ ] Connect **Metabase** to your Gold layer for beautiful mobile‑friendly charts.  
- [ ] Explore  for more insights on scaling your data infrastructure!

**Are you tracking your life with code? Drop your favorite API in the comments!** 👇
0 浏览
Back to Blog

相关文章

阅读更多 »