从混乱的 JSON 到健康洞察:使用 DBT 和 BigQuery 构建现代 ETL Pipeline
Source: Dev.to
请提供您希望翻译的正文内容,我将为您翻译成简体中文并保留原始的 Markdown 格式。
架构:从原始字节到仪表盘
我们将使用 Medallion Architecture(青铜、白银、黄金)来确保数据完整性。
graph TD
A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions)
B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)]
C -->|dbt run| D[(BigQuery: Silver - Staging)]
D -->|dbt test/build| E[(BigQuery: Gold - Metrics)]
E -->|Visualize| F[Looker Studio]
style B fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
前置条件
- Google Cloud Platform (GCP) 账户并已启用 BigQuery。
- 已设置 DBT Core 或 DBT Cloud。
- 技术栈:Python(用于 Cloud Functions)、SQL(用于 DBT)以及 Looker Studio 用于可视化。
步骤 1:使用 Cloud Functions 处理“混乱”
Google Health Connect 导出的数据通常会被写入 Cloud Storage 存储桶。一个无服务器触发器读取这些文件,并将其作为原始 JSON 字符串导入 BigQuery。
import base64
import json
from google.cloud import bigquery
# Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest'
def ingest_health_data(event, context):
"""Triggered by a change to a Cloud Storage bucket."""
file_name = event['name']
print(f"Processing file: {file_name}")
# Store the entire record as a single JSON column initially
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
rows_to_insert = [
{"raw_content": json.dumps(event), "ingested_at": "AUTO"}
]
errors = client.insert_rows_json(table, rows_to_insert)
if not errors:
print("New rows have been added.")
else:
print(f"Encountered errors while inserting rows: {errors}")
第2步:使用 DBT 进行规范化(神奇调味料)
在 Bronze 层存储的 JSON,DBT 将其展开为可用的列,如 heart_rate、step_count 和 sleep_duration。
暂存模型(stg_heart_rate.sql)
-- models/staging/stg_heart_rate.sql
WITH raw_data AS (
SELECT
JSON_EXTRACT_SCALAR(raw_content, '$.device_id') AS device_id,
JSON_EXTRACT(raw_content, '$.metrics.heart_rate') AS hr_array,
CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) AS event_timestamp
FROM {{ source('health_raw', 'health_connect_ingest') }}
)
SELECT
device_id,
event_timestamp,
CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) AS bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) AS hr_item
WHERE bpm IS NOT NULL
第3步:生产级模式
- 数据质量测试
- 文档编写
- 对缓慢变化维度进行快照
如需深入了解处理迟到数据、多租户模式以及其他“Day 2”操作,请参阅 WellAlly Tech Blog 上的技术博客。
第4步:在 Looker Studio 中可视化趋势
使用 Gold 模型(例如 fct_daily_health_summary)将 BigQuery 连接到 Looker Studio。
- Select Project – 您的 GCP 项目。
- Table –
dbt_metrics.fct_daily_health_summary。 - Metrics – 创建包含
Avg(bpm)和Sum(steps)的时间序列图表。
现在,混乱的 JSON 显示为一条清晰的折线图,展示了您开始去健身房后静息心率的下降情况。
结论:公开学习
构建 ETL 流水线不仅仅是搬运数据;更是让数据 有用。通过将 DBT 与 BigQuery 结合,你将得到一个如下的系统:
- 版本控制 – 所有逻辑都保存在 SQL 文件中。
- 可扩展 – BigQuery 负责繁重的计算。
- 可扩展 – 通过创建新的 staging 模型,可添加 Oura Ring、Apple Watch 或其他来源。
你正在用健康数据构建什么?在评论中分享你的 DBT 小技巧和经验吧!