지저분한 JSON에서 건강 인사이트로: DBT와 BigQuery를 활용한 현대적인 ETL 파이프라인 구축

발행: (2026년 2월 10일 오전 09:15 GMT+9)
5 분 소요
원문: Dev.to

Source: Dev.to

아키텍처: 원시 바이트에서 대시보드까지

우리는 데이터 무결성을 보장하기 위해 메달리온 아키텍처 (Bronze, Silver, Gold)를 사용할 것입니다.

graph TD
    A[Google Health Connect JSON] -->|Trigger| B(Google Cloud Functions)
    B -->|Streaming Ingest| C[(BigQuery: Bronze - Raw)]
    C -->|dbt run| D[(BigQuery: Silver - Staging)]
    D -->|dbt test/build| E[(BigQuery: Gold - Metrics)]
    E -->|Visualize| F[Looker Studio]
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px

사전 요구 사항

  • Google Cloud Platform (GCP) 계정에 BigQuery가 활성화되어 있어야 합니다.
  • DBT Core 또는 DBT Cloud가 설정되어 있어야 합니다.
  • 기술 스택: Cloud Functions용 Python, DBT용 SQL, 그리고 시각화를 위한 Looker Studio.

Step 1: Cloud Functions로 “혼란”을 수집하기

Google Health Connect 내보내기 파일은 종종 Cloud Storage 버킷에 덤프됩니다. 서버리스 트리거가 이 파일들을 읽어 raw JSON 문자열로 BigQuery에 파이프합니다.

import base64
import json
from google.cloud import bigquery

# Initialize the BigQuery client
client = bigquery.Client()
dataset_id = 'health_data_raw'
table_id = 'health_connect_ingest'

def ingest_health_data(event, context):
    """Triggered by a change to a Cloud Storage bucket."""
    file_name = event['name']
    print(f"Processing file: {file_name}")

    # Store the entire record as a single JSON column initially
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)

    rows_to_insert = [
        {"raw_content": json.dumps(event), "ingested_at": "AUTO"}
    ]

    errors = client.insert_rows_json(table, rows_to_insert)
    if not errors:
        print("New rows have been added.")
    else:
        print(f"Encountered errors while inserting rows: {errors}")

단계 2: DBT를 사용한 정규화 (마법 소스)

JSON이 Bronze 레이어에 저장되면, DBT는 이를 heart_rate, step_count, sleep_duration와 같은 사용 가능한 컬럼으로 평탄화합니다.

스테이징 모델 (stg_heart_rate.sql)

-- models/staging/stg_heart_rate.sql

WITH raw_data AS (
    SELECT 
        JSON_EXTRACT_SCALAR(raw_content, '$.device_id') AS device_id,
        JSON_EXTRACT(raw_content, '$.metrics.heart_rate') AS hr_array,
        CAST(JSON_EXTRACT_SCALAR(raw_content, '$.timestamp') AS TIMESTAMP) AS event_timestamp
    FROM {{ source('health_raw', 'health_connect_ingest') }}
)

SELECT
    device_id,
    event_timestamp,
    CAST(JSON_EXTRACT_SCALAR(hr_item, '$.bpm') AS FLOAT64) AS bpm
FROM raw_data,
UNNEST(JSON_QUERY_ARRAY(hr_array)) AS hr_item
WHERE bpm IS NOT NULL

Step 3: 프로덕션 등급 패턴

프로덕션 환경에서는 다음이 필요합니다:

  • 데이터 품질 테스트
  • 문서화
  • 천천히 변하는 차원을 위한 스냅샷 생성

늦게 도착하는 데이터 처리, 멀티 테넌트 스키마 및 기타 “Day 2” 작업에 대한 심층 정보를 원하시면 WellAlly Tech Blog 기술 블로그를 참고하세요.

Source:

Step 4: Looker Studio에서 트렌드 시각화

Gold 모델(예: fct_daily_health_summary)을 사용하여 BigQuery를 Looker Studio에 연결합니다.

  1. 프로젝트 선택 – 사용 중인 GCP 프로젝트.
  2. 테이블dbt_metrics.fct_daily_health_summary.
  3. 측정항목Avg(bpm)Sum(steps)를 사용해 시계열 차트를 생성합니다.

지저분했던 JSON이 이제는 깔끔한 선 그래프로 표시되어, 체육관에 다니기 시작한 후 휴식 심박수가 어떻게 감소하는지 보여줍니다.

결론: 공개적으로 배우기

ETL 파이프라인을 구축하는 것은 단순히 데이터를 이동시키는 것이 아니라 데이터를 유용하게 만드는 것입니다. DBTBigQuery를 결합하면 다음과 같은 시스템을 얻을 수 있습니다:

  • 버전 관리 – 모든 로직이 SQL 파일에 존재합니다.
  • 확장성 – BigQuery가 무거운 작업을 처리합니다.
  • 확장 가능 – Oura Ring, Apple Watch 또는 기타 소스를 새로운 스테이징 모델을 만들어 추가할 수 있습니다.

건강 데이터로 무엇을 만들고 있나요? 댓글에 DBT 팁과 경험을 공유해주세요!

Back to Blog

관련 글

더 보기 »

스마트파인드.ai

스마트파인드 소개 — AI‑기반 검색 및 채팅 어시스턴트 스마트파인드는 제품 발견을 통합하도록 설계된 AI‑기반 검색 및 대화형 어시스턴트입니다.