Part 6: Silver Layer – Cleansing, Enrichment, and Dimensions

Published: January 2, 2026 at 05:50 AM EST
2 min read
Source: Dev.to

The Silver layer converts raw events into analytics‑ready records by:

  • Cleaning bad data
  • Enforcing schema
  • Adding business context
  • Applying dimensional modeling

This is where value is created.

Data Cleansing and Type Enforcement

  • The Bronze layer must remain untouched.
  • The Silver layer enforces correctness and isolates errors from ingestion.
from pyspark.sql.functions import col, to_timestamp

silver_stream = (
    spark.readStream
        .format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "fare_amount",
            col("fare_amount").cast("double")
        )
        .filter(col("fare_amount") > 0)
)
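The cleansing rules above boil down to "cast, then drop anything that fails validation". As a plain-Python sketch of the same per-record logic (using only the fare_amount rule from the stream above):

```python
def cleanse(record):
    """Apply the Silver-layer rules to one raw trip record.

    Returns the cleaned record, or None if the record fails validation
    (mirroring the cast to double and the fare_amount > 0 filter above).
    """
    try:
        fare = float(record["fare_amount"])   # cast("double")
    except (TypeError, ValueError):
        return None                           # uncastable -> rejected
    if fare <= 0:
        return None                           # non-positive fares are dropped
    return {**record, "fare_amount": fare}

raw = [
    {"fare_amount": "12.5"},   # valid
    {"fare_amount": "-3"},     # negative fare -> dropped
    {"fare_amount": "oops"},   # uncastable -> dropped
]
clean = [r for r in (cleanse(r) for r in raw) if r is not None]
# clean == [{"fare_amount": 12.5}]
```

In Spark the rejected rows simply never reach Silver; Bronze keeps the untouched originals, which is exactly the error isolation described above.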

Using Broadcast Joins

To add the required dimensional context, we join the stream with zip_dim, a relatively small lookup table. Shuffling a large table across executors is costly, so a broadcast join is used instead.
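Conceptually, broadcasting ships the small dimension table to every executor, which then probes it as an in-memory hash map instead of shuffling both sides of the join. A rough plain-Python sketch of the per-partition work (sample zip codes are made up for illustration):

```python
# The small side (zip_dim) is built into a hash map once and shipped
# ("broadcast") to every executor.
zip_dim = {10001: ("Manhattan", "NY"), 11201: ("Brooklyn", "NY")}

def broadcast_hash_join(trips, dim):
    """Left-join each trip in a partition against the broadcast dimension map."""
    joined = []
    for trip in trips:
        region, state = dim.get(trip["pickup_zip"], (None, None))
        joined.append({**trip, "region": region, "state": state})
    return joined

trips = [{"pickup_zip": 10001}, {"pickup_zip": 99999}]
result = broadcast_hash_join(trips, zip_dim)
# 99999 has no match, so region/state stay None (left-join semantics)
```

The large side never moves between executors, which is where the performance win comes from.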

Without Broadcast join

With Broadcast join

Adding Watermarks

Real‑time data must be processed promptly, but we also need to handle late arrivals. A watermark tells Spark to wait for up to 30 minutes of late data before finalizing results.
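Under the hood, Spark tracks the maximum event time seen so far and treats anything older than that maximum minus the delay as too late. A simplified plain-Python sketch of the bookkeeping (real Spark advances the watermark between micro-batches, not per row):

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=30)  # the "30 minutes" passed to withWatermark

def filter_late_events(events):
    """Drop events older than (max event time seen so far) - 30 minutes."""
    max_seen = datetime.min
    kept = []
    for ts, payload in events:
        max_seen = max(max_seen, ts)
        watermark = max_seen - DELAY
        if ts >= watermark:
            kept.append((ts, payload))  # on time or tolerably late
        # else: dropped as too late
    return kept

events = [
    (datetime(2026, 1, 2, 10, 0), "a"),
    (datetime(2026, 1, 2, 9, 45), "b"),   # 15 min late -> kept
    (datetime(2026, 1, 2, 9, 0),  "c"),   # 60 min late -> dropped
]
kept = filter_late_events(events)
# kept contains "a" and "b"; "c" is more than 30 minutes behind the max seen
```

The watermark also lets Spark discard old state, bounding memory use in long-running streams.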

from pyspark.sql.functions import broadcast, col, regexp_replace, to_timestamp

bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")

silver_df = (
    bronze_stream
        .withColumn(
            "pickup_zip",
            regexp_replace("pickup_zip", r"\.0$", "").cast("int")
        )
        .withColumn(
            "tpep_pickup_datetime",
            to_timestamp("tpep_pickup_datetime")
        )
        .withColumn(
            "tpep_dropoff_datetime",
            to_timestamp("tpep_dropoff_datetime")
        )
        .withWatermark("tpep_pickup_datetime", "30 minutes")
        .join(
            broadcast(zip_dim),
            # Reference the transformed (int) column, not the original
            # bronze_stream column, so the join uses the normalized zip.
            col("pickup_zip") == zip_dim.zip_code,
            "left"
        )
        .select(
            "tpep_pickup_datetime",
            "tpep_dropoff_datetime",
            "trip_distance",
            "fare_amount",
            "pickup_zip",
            "region",
            "state"
        )
)

# Stream to Silver Delta table
(
    silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
        .trigger(availableNow=True)
        .toTable("nyc_taxi.silver.taxi_trips_enriched")
)
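The regexp_replace step above exists because zip codes that pass through a float column arrive as strings like "10001.0". The same normalization in plain Python:

```python
import re

def normalize_zip(raw):
    """Strip a trailing '.0' float artifact and cast to int,
    mirroring the regexp_replace(...).cast("int") step above."""
    return int(re.sub(r"\.0$", "", raw))

assert normalize_zip("10001.0") == 10001  # float artifact stripped
assert normalize_zip("11201") == 11201    # already-clean values pass through
```

Without this step the string "10001.0" would fail the int cast (becoming null) and the join against zip_dim.zip_code would never match.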

With cleansing, enrichment, and dimensional modeling in place, the data is ready for downstream aggregation and business insight generation.

Happy learning!
