第5部分:构建 ZIP Code 维度表

发布: (2026年1月2日 GMT+8 18:49)
3 分钟阅读
原文: Dev.to

Source: Dev.to

为什么?需求来源!

事实表(如出租车行程)针对事件进行了优化:

  • 上车时间
  • 行驶距离
  • 费用
  • 上车邮编

然而,分析团队会提出诸如:

  • 按地区统计行程
  • 按州统计收入

如果把这些属性一次又一次地存放在事实表中:

  • 会增加存储空间
  • 会降低连接速度

将这些属性拆分到维度模型中是最佳实践。在我们的项目中,了解上车/下车邮编所属地区的需求为创建 zip_dim 维度表奠定了基础。

在实际项目中,邮编元数据来源于:

  • 人口普查数据
  • 通过 API 暴露的数据
  • 内部参考表

本项目中,我们使用一些基于随机范围的硬编码值来模拟。

层级职责

虽然邮编出现在 Bronze 层数据中,但维度本身是经过策划的,因此应放在 Silver 层,而不是 Bronze。

步骤 1:为 zip_dim 表创建模式

-- Example schema creation (adjust column types as needed)
CREATE TABLE IF NOT EXISTS nyc_taxi.raw.zip_dim (
    zip_code STRING,
    city STRING,
    state STRING,
    latitude DOUBLE,
    longitude DOUBLE,
    region STRING,
    PRIMARY KEY (zip_code)
) USING DELTA;

步骤 2:从 Bronze 数据中读取唯一且有效的邮编列表——包括上车和下车邮编

from pyspark.sql.functions import col, lower, trim

zip_stream = (
    spark.readStream.format("delta")
    .table("nyc_taxi.bronze.taxi_trips")
    .select(
        trim(lower(col("pickup_zip"))).alias("zip_code")
    )
    .unionByName(
        spark.readStream.format("delta")
        .table("nyc_taxi.bronze.taxi_trips")
        .select(
            trim(lower(col("dropoff_zip"))).alias("zip_code")
        )
    )
    .filter(col("zip_code").rlike("^[0-9]{5}$"))  # keep only valid 5‑digit ZIPs
    .distinct()
)

步骤 3:为 ZIP 值分配随机元数据,以模拟实际的元数据填充

import random

def generate_random_metadata(zip_code):
    """Return a dictionary with simulated metadata for a given ZIP."""
    cities = ["New York", "Brooklyn", "Queens", "Bronx", "Staten Island"]
    states = ["NY"]
    regions = ["Northeast", "Midwest", "South", "West"]
    return {
        "zip_code": zip_code,
        "city": random.choice(cities),
        "state": random.choice(states),
        "latitude": round(random.uniform(40.5, 40.9), 6),
        "longitude": round(random.uniform(-74.3, -73.7), 6),
        "region": random.choice(regions)
    }

def upsert_zip_dim(batch_df, batch_id):
    """Upsert a batch of ZIP metadata into the zip_dim Delta table."""
    enriched_df = batch_df.rdd.map(lambda row: generate_random_metadata(row.zip_code)).toDF()
    (
        enriched_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable("nyc_taxi.raw.zip_dim")
    )

步骤 4:通过批处理将 ZIP 元数据写入 nyc_taxi.raw.zip_dim Delta 表

(
    zip_stream.writeStream
    .foreachBatch(upsert_zip_dim)
    .outputMode("update")
    .option("checkpointLocation", "/tmp/checkpoints/zip_dim")
    .start()
)

ZIP 维度表 nyc_taxi.raw.zip_dim 已经准备就绪。

祝学习愉快!

Back to Blog

相关文章

阅读更多 »

RGB LED 支线任务 💡

markdown !Jennifer Davishttps://media2.dev.to/dynamic/image/width=50,height=50,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%...

Mendex:我为何构建

介绍 大家好。今天我想分享一下我是谁、我在构建什么以及为什么。 早期职业生涯与倦怠 我在 17 年前开始我的 developer 生涯……