From Messy JSON to Health Insights: Building a Modern ETL Pipeline with DBT and BigQuery
Source: Dev.to
The Architecture: From Raw Bytes to Dashboard
We’ll use a Medallion Architecture (Bronze, Silver, Gold) to ensure data integrity.
```mermaid
graph TD
    A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions)
    B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)]
    C -->|dbt run| D[(BigQuery: Silver - Staging)]
    D -->|dbt test/build| E[(BigQuery: Gold - Metrics)]
    E -->|Visualize| F[Looker Studio]
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
```
Prerequisites
- Google Cloud Platform (GCP) account with BigQuery enabled.
- DBT Core or DBT Cloud set up.
- Tech stack: Python (for Cloud Functions), SQL (for DBT), and Looker Studio for visualization.
Step 1: Ingesting the “Mess” with Cloud Functions
Google Health Connect exports are often dumped into a Cloud Storage bucket. A serverless trigger reads these files and pipes them into BigQuery as raw JSON strings.
```python
import json
from datetime import datetime, timezone

from google.cloud import bigquery, storage

# Initialize clients once per function instance
bq_client = bigquery.Client()
storage_client = storage.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest'

def ingest_health_data(event, context):
    """Triggered by a change to a Cloud Storage bucket."""
    bucket_name = event['bucket']
    file_name = event['name']
    print(f"Processing file: {file_name}")

    # Read the exported JSON file from Cloud Storage
    blob = storage_client.bucket(bucket_name).blob(file_name)
    raw_content = blob.download_as_text()

    # Store the entire record as a single JSON string column initially
    table = bq_client.get_table(f"{dataset_id}.{table_id}")
    rows_to_insert = [
        {
            "raw_content": raw_content,
            "ingested_at": datetime.now(timezone.utc).isoformat(),
        }
    ]

    errors = bq_client.insert_rows_json(table, rows_to_insert)
    if not errors:
        print("New rows have been added.")
    else:
        print(f"Encountered errors while inserting rows: {errors}")
```
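Before wiring up the full pipeline, it helps to sanity-check the payload shape locally. Here is a minimal sketch of a hypothetical Health Connect record (field names are assumptions that mirror the JSON paths used by the staging model in the next step), flattened in plain Python the same way dbt will do it in SQL:

```python
import json

# Hypothetical export record; the field names are assumptions chosen to
# match the JSON paths the staging model extracts in Step 2.
sample_record = {
    "device_id": "pixel-watch-01",
    "timestamp": "2024-03-01T07:30:00Z",
    "metrics": {
        "heart_rate": [
            {"bpm": 62},
            {"bpm": 64},
        ]
    },
}

# This string is what lands in the Bronze-layer raw_content column
raw_content = json.dumps(sample_record)

# The same flattening the dbt model performs, done in Python as a quick check
parsed = json.loads(raw_content)
bpms = [float(item["bpm"]) for item in parsed["metrics"]["heart_rate"]]
print(bpms)  # [62.0, 64.0]
```

If this round-trip works on a real export file, the JSON paths in your staging SQL are correct.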
Step 2: Normalization with DBT (The Magic Sauce)
With JSON stored in the Bronze layer, DBT flattens it into usable columns such as heart_rate, step_count, and sleep_duration.
Staging Model (stg_heart_rate.sql)
```sql
-- models/staging/stg_heart_rate.sql
WITH raw_data AS (
    SELECT
        JSON_EXTRACT_SCALAR(raw_content, '$.device_id') AS device_id,
        JSON_EXTRACT(raw_content, '$.metrics.heart_rate') AS hr_array,
        CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) AS event_timestamp
    FROM {{ source('health_raw', 'health_connect_ingest') }}
)

SELECT
    device_id,
    event_timestamp,
    CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) AS bpm
FROM raw_data,
    UNNEST(JSON_QUERY_ARRAY(hr_array)) AS hr_item
-- BigQuery does not allow SELECT aliases in WHERE, so filter on the raw expression
WHERE JSON_EXTRACT_SCALAR(hr_item, '$.bpm') IS NOT NULL
```
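Downstream, a Gold model rolls the staging output up into daily metrics. A minimal sketch of `fct_daily_health_summary` (the model and column names here are assumptions consistent with the staging model above):

```sql
-- models/marts/fct_daily_health_summary.sql (sketch)
SELECT
    device_id,
    DATE(event_timestamp) AS metric_date,
    AVG(bpm) AS avg_bpm,
    MIN(bpm) AS min_bpm,
    MAX(bpm) AS max_bpm
FROM {{ ref('stg_heart_rate') }}
GROUP BY device_id, metric_date
```

Using `ref()` rather than a hard-coded table name lets dbt build the dependency graph and run models in the right order.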
Step 3: Production‑Grade Patterns
For production environments you’ll want:
- Data quality tests
- Documentation
- Snapshotting for slowly changing dimensions
For deeper dives on handling late‑arriving data, multi‑tenant schemas, and other “Day 2” operations, see the technical blog at WellAlly Tech Blog.
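As a starting point for data quality, dbt's built-in generic tests can be declared in a YAML file alongside the models. A minimal sketch (model and column names follow the staging model above):

```yaml
# models/staging/schema.yml
version: 2

models:
  - name: stg_heart_rate
    description: "Flattened heart-rate readings from Health Connect exports"
    columns:
      - name: device_id
        tests:
          - not_null
      - name: bpm
        tests:
          - not_null
```

Running `dbt test` then fails the build if any reading arrives without a device ID or bpm value.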
Step 4: Visualizing Trends in Looker Studio
Connect BigQuery to Looker Studio using the Gold models (e.g., fct_daily_health_summary).
- Select Project – your GCP project.
- Table – `dbt_metrics.fct_daily_health_summary`.
- Metrics – create a time-series chart with `Avg(bpm)` and `Sum(steps)`.
The messy JSON now appears as a clean line graph showing how your resting heart rate drops after you start going to the gym.
Conclusion: Learning in Public
Building ETL pipelines isn’t just about moving data; it’s about making data useful. By combining DBT with BigQuery, you get a system that is:
- Version controlled – all logic lives in SQL files.
- Scalable – BigQuery handles the heavy lifting.
- Extensible – add Oura Ring, Apple Watch, or other sources by creating new staging models.
What are you building with your health data? Share your DBT tips and experiences in the comments!