Part 5: Building a ZIP Code Dimension Table
Source: Dev.to
Why Do We Need It?
Fact tables (like taxi trips) are optimized for events:
- Pickup time
- Distance
- Fare
- Pickup ZIP
Analytics teams, however, ask questions such as:
- Trips by region
- Revenue by state
Storing these attributes repeatedly in the fact table:
- Increases storage
- Slows joins
Breaking these attributes out into a dimensional model is best practice. In our project, the need to know the region of the pickup/drop-off ZIP codes motivates creating the zip_dim dimension table.
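To make the payoff concrete, here is a minimal pure-Python sketch (illustrative data only, not the project's actual tables) of how a slim fact table plus a ZIP dimension answers "trips by region" without repeating region on every trip row:

```python
from collections import Counter

# Fact rows carry only event attributes plus the ZIP key.
fact_trips = [
    {"pickup_zip": "10001", "fare": 12.5},
    {"pickup_zip": "11201", "fare": 8.0},
    {"pickup_zip": "10001", "fare": 20.0},
]

# The dimension stores each ZIP's descriptive attributes exactly once.
zip_dim = {
    "10001": {"city": "New York", "region": "Northeast"},
    "11201": {"city": "Brooklyn", "region": "Northeast"},
}

# "Trips by region" becomes a join + group-by at query time.
trips_by_region = Counter(
    zip_dim[trip["pickup_zip"]]["region"] for trip in fact_trips
)
print(trips_by_region)  # Counter({'Northeast': 3})
```

The fact table stays narrow no matter how many descriptive attributes the dimension grows.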
In real projects, ZIP metadata comes from:
- Census data
- Data exposed via APIs
- Internal reference tables
For this project, we simulate it with random values drawn from hard-coded ranges.
Layer Responsibility
Even though ZIPs appear in Bronze data, the dimension itself is curated, so it belongs in Silver, not Bronze.
Step 1: Create the schema for the zip_dim table
-- Example schema creation (adjust column types as needed)
CREATE TABLE IF NOT EXISTS nyc_taxi.raw.zip_dim (
    zip_code  STRING NOT NULL,  -- primary key columns must be NOT NULL
    city      STRING,
    state     STRING,
    latitude  DOUBLE,
    longitude DOUBLE,
    region    STRING,
    PRIMARY KEY (zip_code)
) USING DELTA;
Step 2: Read the unique, valid list of ZIPs – both pickup and drop-off – from the Bronze data
from pyspark.sql.functions import col, lower, trim

zip_stream = (
    spark.readStream.format("delta")
    .table("nyc_taxi.bronze.taxi_trips")
    .select(trim(lower(col("pickup_zip"))).alias("zip_code"))
    .unionByName(
        spark.readStream.format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        .select(trim(lower(col("dropoff_zip"))).alias("zip_code"))
    )
    .filter(col("zip_code").rlike("^[0-9]{5}$"))  # keep only valid 5-digit ZIPs
    .dropDuplicates(["zip_code"])  # distinct() is not supported on streaming DataFrames
)
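The `rlike("^[0-9]{5}$")` filter above is an ordinary five-digit check. The same rule in plain Python, with a few illustrative candidate values, looks like this:

```python
import re

ZIP_RE = re.compile(r"^[0-9]{5}$")

def is_valid_zip(value: str) -> bool:
    """True only for exactly five ASCII digits."""
    return ZIP_RE.fullmatch(value) is not None

# Mirror the Spark pipeline: trim/lower first, then validate.
candidates = ["10001", " 10001 ", "1000", "ABCDE", "100016"]
valid = [z for z in (c.strip().lower() for c in candidates) if is_valid_zip(z)]
print(valid)  # ['10001', '10001']
```

Note that the trim happens before validation, so padded-but-valid ZIPs survive while short, alphabetic, and six-digit values are dropped.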
Step 3: Assign random metadata to the ZIP values to simulate actual metadata seeding
import random
from delta.tables import DeltaTable
from pyspark.sql import Row

def generate_random_metadata(zip_code):
    """Return a dictionary with simulated metadata for a given ZIP."""
    cities = ["New York", "Brooklyn", "Queens", "Bronx", "Staten Island"]
    states = ["NY"]
    regions = ["Northeast", "Midwest", "South", "West"]
    return {
        "zip_code": zip_code,
        "city": random.choice(cities),
        "state": random.choice(states),
        "latitude": round(random.uniform(40.5, 40.9), 6),
        "longitude": round(random.uniform(-74.3, -73.7), 6),
        "region": random.choice(regions),
    }

def upsert_zip_dim(batch_df, batch_id):
    """Upsert a batch of ZIP metadata into the zip_dim Delta table."""
    enriched_df = batch_df.rdd.map(
        lambda row: Row(**generate_random_metadata(row.zip_code))
    ).toDF()
    # MERGE keeps zip_code unique even if a batch is replayed after a
    # failure; a plain append would duplicate rows on retries.
    (
        DeltaTable.forName(spark, "nyc_taxi.raw.zip_dim")
        .alias("t")
        .merge(enriched_df.alias("s"), "t.zip_code = s.zip_code")
        .whenNotMatchedInsertAll()
        .execute()
    )
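Because Structured Streaming can re-deliver a batch after a failure, the write into the dimension needs upsert (merge-on-key) semantics rather than a blind append. A minimal pure-Python sketch of that idea, keyed on `zip_code` (the dict stands in for the Delta table):

```python
def upsert(table: dict, rows: list) -> dict:
    """Merge rows into table keyed by zip_code: overwrite if the key
    exists, insert otherwise - so replaying a batch is a no-op."""
    for row in rows:
        table[row["zip_code"]] = row  # insert or overwrite in place
    return table

table = {}
batch = [{"zip_code": "10001", "region": "Northeast"}]
upsert(table, batch)
upsert(table, batch)  # replayed batch does not duplicate the row
print(len(table))  # 1
```

This is exactly the guarantee a Delta MERGE on `zip_code` provides, where an append would instead grow the table on every retry.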
Step 4: Populate the nyc_taxi.raw.zip_dim Delta table with ZIP metadata by batch processing
(
    zip_stream.writeStream
    .foreachBatch(upsert_zip_dim)
    .outputMode("update")
    .option("checkpointLocation", "/tmp/checkpoints/zip_dim")
    .start()
)
The ZIP dimension table nyc_taxi.raw.zip_dim is now ready.
Happy learning!