지저분한 JSON에서 건강 인사이트로: DBT와 BigQuery를 활용한 현대적인 ETL 파이프라인 구축
Source: Dev.to
아키텍처: 원시 바이트에서 대시보드까지
우리는 데이터 무결성을 보장하기 위해 메달리온 아키텍처 (Bronze, Silver, Gold)를 사용할 것입니다.
graph TD
A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions)
B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)]
C -->|dbt run| D[(BigQuery: Silver - Staging)]
D -->|dbt test/build| E[(BigQuery: Gold - Metrics)]
E -->|Visualize| F[Looker Studio]
style B fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
사전 요구 사항
- Google Cloud Platform (GCP) 계정에 BigQuery가 활성화되어 있어야 합니다.
- DBT Core 또는 DBT Cloud가 설정되어 있어야 합니다.
- 기술 스택: Cloud Functions용 Python, DBT용 SQL, 그리고 시각화를 위한 Looker Studio.
Step 1: Cloud Functions로 “혼란”을 수집하기
Google Health Connect 내보내기 파일은 종종 Cloud Storage 버킷에 덤프됩니다. 서버리스 트리거가 이 파일들을 읽어 raw JSON 문자열로 BigQuery에 파이프합니다.
import base64
import json
from google.cloud import bigquery
# Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest'
def ingest_health_data(event, context):
"""Triggered by a change to a Cloud Storage bucket."""
file_name = event['name']
print(f"Processing file: {file_name}")
# Store the entire record as a single JSON column initially
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
rows_to_insert = [
{"raw_content": json.dumps(event), "ingested_at": "AUTO"}
]
errors = client.insert_rows_json(table, rows_to_insert)
if not errors:
print("New rows have been added.")
else:
print(f"Encountered errors while inserting rows: {errors}")
단계 2: DBT를 사용한 정규화 (마법 소스)
JSON이 Bronze 레이어에 저장되면, DBT는 이를 heart_rate, step_count, sleep_duration와 같은 사용 가능한 컬럼으로 평탄화합니다.
스테이징 모델 (stg_heart_rate.sql)
-- models/staging/stg_heart_rate.sql
WITH raw_data AS (
SELECT
JSON_EXTRACT_SCALAR(raw_content, '$.device_id') AS device_id,
JSON_EXTRACT(raw_content, '$.metrics.heart_rate') AS hr_array,
CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) AS event_timestamp
FROM {{ source('health_raw', 'health_connect_ingest') }}
)
SELECT
device_id,
event_timestamp,
CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) AS bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) AS hr_item
WHERE bpm IS NOT NULL
Step 3: 프로덕션 등급 패턴
프로덕션 환경에서는 다음이 필요합니다:
- 데이터 품질 테스트
- 문서화
- 천천히 변하는 차원을 위한 스냅샷 생성
늦게 도착하는 데이터 처리, 멀티 테넌트 스키마 및 기타 “Day 2” 작업에 대한 심층 정보를 원하시면 WellAlly Tech Blog 기술 블로그를 참고하세요.
Source: …
Step 4: Looker Studio에서 트렌드 시각화
Gold 모델(예: fct_daily_health_summary)을 사용하여 BigQuery를 Looker Studio에 연결합니다.
- 프로젝트 선택 – 사용 중인 GCP 프로젝트.
- 테이블 –
dbt_metrics.fct_daily_health_summary. - 측정항목 –
Avg(bpm)와Sum(steps)를 사용해 시계열 차트를 생성합니다.
지저분했던 JSON이 이제는 깔끔한 선 그래프로 표시되어, 체육관에 다니기 시작한 후 휴식 심박수가 어떻게 감소하는지 보여줍니다.
결론: 공개적으로 배우기
ETL 파이프라인을 구축하는 것은 단순히 데이터를 이동시키는 것이 아니라 데이터를 유용하게 만드는 것입니다. DBT와 BigQuery를 결합하면 다음과 같은 시스템을 얻을 수 있습니다:
- 버전 관리 – 모든 로직이 SQL 파일에 존재합니다.
- 확장성 – BigQuery가 무거운 작업을 처리합니다.
- 확장 가능 – Oura Ring, Apple Watch 또는 기타 소스를 새로운 스테이징 모델을 만들어 추가할 수 있습니다.
건강 데이터로 무엇을 만들고 있나요? 댓글에 DBT 팁과 경험을 공유해주세요!