从混乱的 JSON 到健康洞察:使用 DBT 和 BigQuery 构建现代 ETL Pipeline

发布: (2026年2月10日 GMT+8 08:15)
4 分钟阅读
原文: Dev.to

Source: Dev.to

请提供您希望翻译的正文内容,我将为您翻译成简体中文并保留原始的 Markdown 格式。

架构:从原始字节到仪表盘

我们将使用 Medallion Architecture(青铜、白银、黄金)来确保数据完整性。

graph TD
    A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions)
    B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)]
    C -->|dbt run| D[(BigQuery: Silver - Staging)]
    D -->|dbt test/build| E[(BigQuery: Gold - Metrics)]
    E -->|Visualize| F[Looker Studio]
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px

前置条件

  • Google Cloud Platform (GCP) 账户并已启用 BigQuery。
  • 已设置 DBT CoreDBT Cloud
  • 技术栈:Python(用于 Cloud Functions)、SQL(用于 DBT)以及 Looker Studio 用于可视化。

步骤 1:使用 Cloud Functions 处理“混乱”

Google Health Connect 导出的数据通常会被写入 Cloud Storage 存储桶。一个无服务器触发器读取这些文件,并将其作为原始 JSON 字符串导入 BigQuery。

import base64
import json
from google.cloud import bigquery

# Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest'

def ingest_health_data(event, context):
    """Triggered by a change to a Cloud Storage bucket."""
    file_name = event['name']
    print(f"Processing file: {file_name}")

    # Store the entire record as a single JSON column initially
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)

    rows_to_insert = [
        {"raw_content": json.dumps(event), "ingested_at": "AUTO"}
    ]

    errors = client.insert_rows_json(table, rows_to_insert)
    if not errors:
        print("New rows have been added.")
    else:
        print(f"Encountered errors while inserting rows: {errors}")

第2步:使用 DBT 进行规范化(神奇调味料)

Bronze 层存储的 JSON,DBT 将其展开为可用的列,如 heart_ratestep_countsleep_duration

暂存模型(stg_heart_rate.sql

-- models/staging/stg_heart_rate.sql

WITH raw_data AS (
    SELECT 
        JSON_EXTRACT_SCALAR(raw_content, '$.device_id') AS device_id,
        JSON_EXTRACT(raw_content, '$.metrics.heart_rate') AS hr_array,
        CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) AS event_timestamp
    FROM {{ source('health_raw', 'health_connect_ingest') }}
)

SELECT
    device_id,
    event_timestamp,
    CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) AS bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) AS hr_item
WHERE bpm IS NOT NULL

第3步:生产级模式

  • 数据质量测试
  • 文档编写
  • 对缓慢变化维度进行快照

如需深入了解处理迟到数据、多租户模式以及其他“Day 2”操作,请参阅 WellAlly Tech Blog 上的技术博客。

第4步:在 Looker Studio 中可视化趋势

使用 Gold 模型(例如 fct_daily_health_summary)将 BigQuery 连接到 Looker Studio。

  1. Select Project – 您的 GCP 项目。
  2. Tabledbt_metrics.fct_daily_health_summary
  3. Metrics – 创建包含 Avg(bpm)Sum(steps) 的时间序列图表。

现在,混乱的 JSON 显示为一条清晰的折线图,展示了您开始去健身房后静息心率的下降情况。

结论:公开学习

构建 ETL 流水线不仅仅是搬运数据;更是让数据 有用。通过将 DBTBigQuery 结合,你将得到一个如下的系统:

  • 版本控制 – 所有逻辑都保存在 SQL 文件中。
  • 可扩展 – BigQuery 负责繁重的计算。
  • 可扩展 – 通过创建新的 staging 模型,可添加 Oura Ring、Apple Watch 或其他来源。

你正在用健康数据构建什么?在评论中分享你的 DBT 小技巧和经验吧!

0 浏览
Back to Blog

相关文章

阅读更多 »

新文章

您确定要隐藏此 comment 吗?它将在您的 post 中被隐藏,但仍可通过 comment 的 permalink 查看。Hide child comments 如我们……

设置 Ollama、NGROK 和 LangChain

!Breno A.https://media2.dev.to/dynamic/image/width=50,height=50,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fu...