Part 6: Silver Layer – Cleansing, Enrichment, and Dimensions
Source: Dev.to

The Silver layer converts raw events into analytics‑ready records by:
- Cleaning bad data
- Enforcing schema
- Adding business context
- Applying dimensional modeling
This is where raw data becomes valuable for analysis.
Data Cleansing and Type Enforcement
- The Bronze layer must remain untouched, preserving the raw data exactly as ingested.
- The Silver layer enforces correctness, keeping data-quality errors out of the ingestion path.
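from pyspark.sql.functions import col, to_timestamp

# `spark` is the SparkSession provided by the notebook environment.
# Read the Bronze Delta table as a stream and enforce types on the way in.
silver_stream = (
    spark.readStream
    .format("delta")
    .table("nyc_taxi.bronze.taxi_trips")
    # Parse the pickup time into a proper timestamp column.
    .withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime"))
    # Cast the fare to double so numeric comparisons are reliable.
    .withColumn("fare_amount", col("fare_amount").cast("double"))
    # Drop trips with zero, negative, or unparseable (null) fares.
    .filter(col("fare_amount") > 0)
)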
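Each of the Spark transformations above has a simple per-record meaning. The plain-Python sketch below is for illustration only and is not Spark code. It applies the same fare rules, and instead of silently dropping failures it routes them to a quarantine list. Quarantining is one common way to keep errors isolated while leaving the original records intact. The function names, the quarantine list, and the timestamp format `%Y-%m-%d %H:%M:%S` are assumptions, not part of the original pipeline. The sketch is also stricter than the Spark block in one respect: it rejects records whose timestamp cannot be parsed, whereas the Spark filter only checks the fare.

```python
from datetime import datetime
from typing import Optional


def parse_trip(raw: dict) -> Optional[dict]:
    """Apply Silver-style rules to one raw record; return None if it fails.

    Parses the pickup timestamp (assumed format), casts the fare to float,
    and keeps only strictly positive fares.
    """
    try:
        pickup = datetime.strptime(raw["tpep_pickup_datetime"], "%Y-%m-%d %H:%M:%S")
        fare = float(raw["fare_amount"])
    except (KeyError, TypeError, ValueError):
        return None  # missing or unparseable field
    if fare <= 0:
        return None  # same rule as .filter(col("fare_amount") > 0)
    return {"tpep_pickup_datetime": pickup, "fare_amount": fare}


def split_valid_and_rejected(records):
    """Route each raw record to the clean list or the quarantine list."""
    clean, quarantine = [], []
    for raw in records:
        parsed = parse_trip(raw)
        if parsed is None:
            quarantine.append(raw)  # keep the original record untouched
        else:
            clean.append(parsed)
    return clean, quarantine


# Hypothetical raw records, for illustration only.
raw_trips = [
    {"tpep_pickup_datetime": "2016-02-14 16:52:13", "fare_amount": "12.5"},
    {"tpep_pickup_datetime": "2016-02-14 17:03:00", "fare_amount": "-3.0"},
    {"tpep_pickup_datetime": "not a date", "fare_amount": "8.0"},
]
clean, quarantine = split_valid_and_rejected(raw_trips)
print(len(clean), len(quarantine))  # 1 2
```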
Using Broadcast Joins
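Dimensional modeling requires joining the trips with zip_dim, a relatively small lookup table. A regular join would shuffle the large trip stream across executors by join key, which is costly. A broadcast join instead sends a full copy of zip_dim to every executor, so each executor can join its share of the trips locally without a shuffle.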
[Figure: Without broadcast join]

[Figure: With broadcast join]

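Conceptually, a broadcast hash join builds a hash table from the small side once and then probes it for every row of the large side. The plain-Python sketch below shows that idea. It is not Spark code, and the function name and sample zip rows are hypothetical. It uses the same left-join semantics as the pipeline: trips whose zip is not in the dimension are kept, with `None` in the dimension columns. `None` here plays the role of null in Spark.

```python
from typing import Dict, Iterable, Iterator, List


def broadcast_left_join(
    large_rows: Iterable[Dict],
    small_rows: List[Dict],
    left_key: str,
    right_key: str,
) -> Iterator[Dict]:
    """Left-join a large row source against a small lookup table.

    The small side is turned into a hash map once (the part Spark would ship
    to every executor). Each large-side row is then matched by a dictionary
    lookup, so the large side is never regrouped by key. Assumes right_key is
    unique in small_rows; Spark would emit one row per duplicate match.
    """
    lookup = {row[right_key]: row for row in small_rows}
    dim_columns = [c for c in (small_rows[0] if small_rows else {}) if c != right_key]
    for row in large_rows:
        match = lookup.get(row[left_key], {})
        # Left join: unmatched rows are kept, with None for every dimension column.
        yield {**row, **{c: match.get(c) for c in dim_columns}}


# Hypothetical sample data, for illustration only.
zip_dim = [
    {"zip_code": 10001, "region": "Manhattan", "state": "NY"},
    {"zip_code": 11201, "region": "Brooklyn", "state": "NY"},
]
trips = [
    {"pickup_zip": 10001, "fare_amount": 12.5},
    {"pickup_zip": 99999, "fare_amount": 7.0},
]
enriched = list(broadcast_left_join(trips, zip_dim, "pickup_zip", "zip_code"))
for row in enriched:
    print(row)
```

In Spark, building the hash table only pays off because the copy is small. For two large tables, broadcasting would exhaust executor memory, which is why the shuffle-based join remains the default.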
Adding Watermarks
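Real‑time data must be processed promptly, but late arrivals must be handled too. A 30-minute watermark tells Spark to accept records whose event time is up to 30 minutes behind the latest event time seen so far. Stateful operations such as windowed aggregations wait that long before finalizing results and may drop records that arrive later. The stream-static join in this pipeline is not stateful, so the watermark only takes effect once such an operation is added.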
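The watermark rule described above can be simulated in a few lines of plain Python. This is a simplified model, not Spark's implementation, and the function name and sample times are hypothetical. The watermark is the maximum event time seen so far minus the delay. For simplicity it advances after every event here, whereas Spark advances it between micro-batches. Spark's guarantee is also one-sided: data within the delay is never dropped, but data beyond it may or may not be dropped.

```python
from datetime import datetime, timedelta


def apply_watermark(event_times, delay):
    """Split event times (given in arrival order) into accepted and too-late lists.

    The watermark is the latest event time seen so far minus `delay`; an
    event older than the current watermark counts as too late.
    Simplification: the watermark advances after every event here.
    """
    max_seen = None
    accepted, too_late = [], []
    for t in event_times:
        watermark = None if max_seen is None else max_seen - delay
        if watermark is not None and t < watermark:
            too_late.append(t)
        else:
            accepted.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return accepted, too_late


t0 = datetime(2024, 1, 1, 12, 0)
arrivals = [t0 + timedelta(minutes=m) for m in (0, 10, 60, 40, 20)]
accepted, too_late = apply_watermark(arrivals, timedelta(minutes=30))
# 12:40 arrives 20 minutes behind 13:00 and is accepted;
# 12:20 arrives 40 minutes behind 13:00 and is flagged as too late.
print(len(accepted), [t.strftime("%H:%M") for t in too_late])  # 4 ['12:20']
```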
from pyspark.sql.functions import broadcast, regexp_replace, to_timestamp

# Streaming source: raw trips from Bronze. Static dimension: zip codes.
bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")

# Normalize and type the stream before joining.
cleaned = (
    bronze_stream
    # Zip codes such as "10001.0" become the integer 10001 (raw-string regex).
    .withColumn("pickup_zip", regexp_replace("pickup_zip", r"\.0$", "").cast("int"))
    .withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime"))
    .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))
    # Tolerate events up to 30 minutes late (applies to stateful operators).
    .withWatermark("tpep_pickup_datetime", "30 minutes")
)

silver_df = (
    cleaned
    # Ship the small zip table to every executor instead of shuffling the stream.
    # Join on the cleaned int pickup_zip, not on the raw Bronze column.
    .join(broadcast(zip_dim), cleaned["pickup_zip"] == zip_dim["zip_code"], "left")
    .select(
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "trip_distance",
        "fare_amount",
        "pickup_zip",
        "region",
        "state",
    )
)

# Stream to the Silver Delta table.
(
    silver_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
    # Process all data available now, then stop.
    .trigger(availableNow=True)
    .toTable("nyc_taxi.silver.taxi_trips_enriched")
)
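The pipeline strips a trailing `.0` from `pickup_zip` before casting it to int. This suggests the zip codes reach Bronze as text like `10001.0`, but that is an assumption about the source data. The plain-Python sketch below is a rough analogue of that step and not an exact model of Spark's cast rules. The function and constant names are hypothetical. In Python source, the pattern is best written as a raw string, `r"\.0$"`. The plain literal `"\.0$"` contains an invalid escape sequence, and recent Python versions warn about it.

```python
import re
from typing import Optional

# Raw string literal: the backslash reaches the regex engine unchanged.
TRAILING_DOT_ZERO = re.compile(r"\.0$")


def normalize_zip(raw: Optional[str]) -> Optional[int]:
    """Rough analogue of the pipeline's regexp_replace + cast("int") step.

    Returns None when the cleaned text is not an integer; edge cases of
    Spark's cast rules are not modeled.
    """
    if raw is None:
        return None
    cleaned = TRAILING_DOT_ZERO.sub("", raw)
    try:
        return int(cleaned)
    except ValueError:
        return None


for value in ["10001.0", "11201", "N/A", None]:
    print(repr(value), "->", normalize_zip(value))
```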
At this point the trips have been cleansed, typed, and enriched with zip-code dimensions. The Silver table is now ready for further aggregation and business insight generation.
Happy learning!