파트 6: Silver Layer – 정제, 보강 및 차원

발행: (2026년 1월 2일 오후 07:50 GMT+9)
3 분 소요
원문: Dev.to

Source: Dev.to

Part 6: Silver Layer – Cleansing, Enrichment, and Dimensions 표지 이미지

Silver 레이어는 원시 이벤트를 분석에 바로 사용할 수 있는 레코드로 변환합니다:

  • 잘못된 데이터 정리
  • 스키마 적용
  • 비즈니스 컨텍스트 추가
  • 차원 모델링 적용

이 단계에서 가치가 창출됩니다.

데이터 정리 및 타입 강제 적용

  • Bronze 레이어는 그대로 유지되어야 합니다.
  • Silver 레이어는 정확성을 보장하고, 오류를 수집 단계와 분리합니다.
silver_stream = (
    spark.readStream
        .format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "fare_amount",
            col("fare_amount").cast("double")
        )
        .filter(col("fare_amount") > 0)
)

Broadcast Join 사용

필요한 차원 모델링을 수행하려면 비교적 작은 테이블인 zip_dim과 조인해야 합니다. 큰 테이블을 executor 간에 셔플하는 것은 비용이 많이 들기 때문에, 성능 향상을 위해 broadcast join을 사용합니다.

Broadcast join 없이

Broadcast join 없이

Broadcast join 사용

Broadcast join 사용

워터마크 추가

실시간 데이터는 신속히 처리되어야 하지만, 지연 도착 데이터도 처리해야 합니다. 워터마크는 Spark에게 결과를 최종 확정하기 전에 최대 30분까지 늦은 데이터를 기다리도록 알려줍니다.

from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast

bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")

silver_df = (
    bronze_stream
        .withColumn(
            "pickup_zip",
            regexp_replace("pickup_zip", "\.0$", "").cast("int")
        )
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "tpep_dropoff_datetime",
            to_timestamp("tpep_dropoff_datetime")
        )
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        .join(
            broadcast(zip_dim),
            bronze_stream.pickup_zip == zip_dim.zip_code,
            "left"
        )
        .select(
            "tpep_pickup_datetime",
            "tpep_dropoff_datetime",
            "trip_distance",
            "fare_amount",
            "pickup_zip",
            "region",
            "state"
        )
)

# Silver Delta 테이블로 스트리밍
(
    silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
        .trigger(availableNow=True)
        .toTable("nyc_taxi.silver.taxi_trips_enriched")
)

필요한 정리와 정규화가 완료되었으며, 이제 데이터는 추가 성숙 단계와 비즈니스 인사이트 생성에 사용할 준비가 되었습니다.

학습에 도움이 되길 바랍니다!

Back to Blog

관련 글

더 보기 »

RGB LED 사이드퀘스트 💡

markdown !Jennifer Davis https://media2.dev.to/dynamic/image/width=50,height=50,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%...

Mendex: 내가 만드는 이유

소개 안녕하세요 여러분. 오늘은 제가 누구인지, 무엇을 만들고 있는지, 그리고 그 이유를 공유하고 싶습니다. 초기 경력과 번아웃 저는 개발자로서 17년 동안 경력을 시작했습니다.