파트 5: ZIP 코드 차원 테이블 구축
Source: Dev.to
왜? 필요성!
팩트 테이블(예: 택시 여행)은 이벤트에 최적화되어 있습니다:
- 픽업 시간
- 거리
- 요금
- 픽업 ZIP
하지만 분석 팀은 다음과 같은 질문을 합니다:
- 지역별 여행 수
- 주별 매출
이러한 속성을 팩트 테이블에 반복해서 저장하면:
- 저장 용량이 증가하고
- 조인 속도가 느려집니다
이 속성들을 차원 모델로 분리하는 것이 모범 사례입니다. 우리 프로젝트에서는 픽업/드롭 ZIP 코드의 지역을 알 필요가 있기 때문에 zip_dim 차원 테이블을 만드는 것이 시작점이 됩니다.
실제 프로젝트에서는 ZIP 메타데이터를 다음과 같은 출처에서 가져옵니다:
- 인구 조사 데이터
- API를 통해 제공
- 내부 참조 테이블
이 프로젝트에서는 임의 범위 기반의 하드코딩된 값으로 시뮬레이션합니다.
레이어 책임
ZIP 코드가 Bronze 데이터에 나타나더라도 차원 자체는 정제된 것이므로 Silver 레이어에 속합니다. Bronze가 아닙니다.
Step 1: zip_dim 테이블 스키마 생성
-- Example schema creation (adjust column types as needed)
CREATE TABLE IF NOT EXISTS nyc_taxi.raw.zip_dim (
zip_code STRING,
city STRING,
state STRING,
latitude DOUBLE,
longitude DOUBLE,
region STRING,
PRIMARY KEY (zip_code)
) USING DELTA;
Step 2: Bronze 데이터에서 픽업 및 드롭 ZIP의 고유하고 유효한 목록 읽기
from pyspark.sql.functions import col, lower, trim
zip_stream = (
spark.readStream.format("delta")
.table("nyc_taxi.bronze.taxi_trips")
.select(
trim(lower(col("pickup_zip"))).alias("zip_code")
)
.unionByName(
spark.readStream.format("delta")
.table("nyc_taxi.bronze.taxi_trips")
.select(
trim(lower(col("dropoff_zip"))).alias("zip_code")
)
)
.filter(col("zip_code").rlike("^[0-9]{5}$")) # keep only valid 5‑digit ZIPs
.distinct()
)
Step 3: 실제 메타데이터 시딩을 모방하기 위해 ZIP 값에 무작위 메타데이터 할당
import random
def generate_random_metadata(zip_code):
"""Return a dictionary with simulated metadata for a given ZIP."""
cities = ["New York", "Brooklyn", "Queens", "Bronx", "Staten Island"]
states = ["NY"]
regions = ["Northeast", "Midwest", "South", "West"]
return {
"zip_code": zip_code,
"city": random.choice(cities),
"state": random.choice(states),
"latitude": round(random.uniform(40.5, 40.9), 6),
"longitude": round(random.uniform(-74.3, -73.7), 6),
"region": random.choice(regions)
}
def upsert_zip_dim(batch_df, batch_id):
"""Upsert a batch of ZIP metadata into the zip_dim Delta table."""
enriched_df = batch_df.rdd.map(lambda row: generate_random_metadata(row.zip_code)).toDF()
(
enriched_df.write.format("delta")
.mode("append")
.option("mergeSchema", "true")
.saveAsTable("nyc_taxi.raw.zip_dim")
)
Step 4: 배치 처리로 ZIP 메타데이터를 nyc_taxi.raw.zip_dim Delta 테이블에 채우기
(
zip_stream.writeStream
.foreachBatch(upsert_zip_dim)
.outputMode("update")
.option("checkpointLocation", "/tmp/checkpoints/zip_dim")
.start()
)
ZIP 차원 테이블 nyc_taxi.raw.zip_dim이 이제 준비되었습니다.
행복한 학습 되세요!