第5部分:构建 ZIP Code 维度表
发布: (2026年1月2日 GMT+8 18:49)
3 分钟阅读
原文: Dev.to
Source: Dev.to
为什么?需求来源!
事实表(如出租车行程)针对事件进行了优化:
- 上车时间
- 行驶距离
- 费用
- 上车邮编
然而,分析团队会提出诸如:
- 按地区统计行程
- 按州统计收入
如果把这些属性一次又一次地存放在事实表中:
- 会增加存储空间
- 会降低连接速度
将这些属性拆分到维度模型中是最佳实践。在我们的项目中,了解上车/下车邮编所属地区的需求为创建 zip_dim 维度表奠定了基础。
在实际项目中,邮编元数据来源于:
- 人口普查数据
- 通过 API 暴露的数据
- 内部参考表
本项目中,我们使用一些基于随机范围的硬编码值来模拟。
层级职责
虽然邮编出现在 Bronze 层数据中,但维度本身是经过策划的,因此应放在 Silver 层,而不是 Bronze。
步骤 1:为 zip_dim 表创建模式
-- Example schema creation (adjust column types as needed)
CREATE TABLE IF NOT EXISTS nyc_taxi.raw.zip_dim (
zip_code STRING,
city STRING,
state STRING,
latitude DOUBLE,
longitude DOUBLE,
region STRING,
PRIMARY KEY (zip_code)
) USING DELTA;
步骤 2:从 Bronze 数据中读取唯一且有效的邮编列表——包括上车和下车邮编
from pyspark.sql.functions import col, lower, trim
zip_stream = (
spark.readStream.format("delta")
.table("nyc_taxi.bronze.taxi_trips")
.select(
trim(lower(col("pickup_zip"))).alias("zip_code")
)
.unionByName(
spark.readStream.format("delta")
.table("nyc_taxi.bronze.taxi_trips")
.select(
trim(lower(col("dropoff_zip"))).alias("zip_code")
)
)
.filter(col("zip_code").rlike("^[0-9]{5}$")) # keep only valid 5‑digit ZIPs
.distinct()
)
步骤 3:为 ZIP 值分配随机元数据,以模拟实际的元数据填充
import random
def generate_random_metadata(zip_code):
"""Return a dictionary with simulated metadata for a given ZIP."""
cities = ["New York", "Brooklyn", "Queens", "Bronx", "Staten Island"]
states = ["NY"]
regions = ["Northeast", "Midwest", "South", "West"]
return {
"zip_code": zip_code,
"city": random.choice(cities),
"state": random.choice(states),
"latitude": round(random.uniform(40.5, 40.9), 6),
"longitude": round(random.uniform(-74.3, -73.7), 6),
"region": random.choice(regions)
}
def upsert_zip_dim(batch_df, batch_id):
"""Upsert a batch of ZIP metadata into the zip_dim Delta table."""
enriched_df = batch_df.rdd.map(lambda row: generate_random_metadata(row.zip_code)).toDF()
(
enriched_df.write.format("delta")
.mode("append")
.option("mergeSchema", "true")
.saveAsTable("nyc_taxi.raw.zip_dim")
)
步骤 4:通过批处理将 ZIP 元数据写入 nyc_taxi.raw.zip_dim Delta 表
(
zip_stream.writeStream
.foreachBatch(upsert_zip_dim)
.outputMode("update")
.option("checkpointLocation", "/tmp/checkpoints/zip_dim")
.start()
)
ZIP 维度表 nyc_taxi.raw.zip_dim 已经准备就绪。
祝学习愉快!