第6部分:Silver Layer – 清洗、丰富和维度
发布: (2026年1月2日 GMT+8 18:50)
2 min read
原文: Dev.to
Source: Dev.to

银层通过以下方式将原始事件转换为可用于分析的记录:
- 清理错误数据
- 强制执行模式
- 添加业务上下文
- 应用维度建模
价值就在此产生。
数据清洗与类型强制
- Bronze 层必须保持原样。
- Silver 层确保数据正确性,并将错误与摄取过程隔离。
silver_stream = (
spark.readStream
.format("delta")
.table("nyc_taxi.bronze.taxi_trips")
.withColumn(
"tpep_pickup_datetime",
to_timestamp("tpep_pickup_datetime")
)
.withColumn(
"fare_amount",
col("fare_amount").cast("double")
)
.filter(col("fare_amount") > 0)
)
使用广播连接
为了实现所需的维度建模,需要与 zip_dim(一个相对较小的表)进行连接。将大表在执行器之间进行 shuffle 成本高昂,因此使用广播连接来提升性能。
未使用广播连接

使用广播连接

添加水印
实时数据必须及时处理,但也需要处理迟到的数据。水印告诉 Spark 在最终确定结果之前,最多等待 30 分钟的迟到数据。
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
bronze_stream = spark.readStream.table("nyc_taxi.bronze.taxi_trips")
zip_dim = spark.read.table("nyc_taxi.raw.zip_dim")
silver_df = (
bronze_stream
.withColumn(
"pickup_zip",
regexp_replace("pickup_zip", "\.0$", "").cast("int")
)
.withColumn(
"tpep_pickup_datetime",
to_timestamp("tpep_pickup_datetime")
)
.withColumn(
"tpep_dropoff_datetime",
to_timestamp("tpep_dropoff_datetime")
)
.withWatermark("tpep_pickup_datetime", "30 minutes")
.join(
broadcast(zip_dim),
bronze_stream.pickup_zip == zip_dim.zip_code,
"left"
)
.select(
"tpep_pickup_datetime",
"tpep_dropoff_datetime",
"trip_distance",
"fare_amount",
"pickup_zip",
"region",
"state"
)
)
# 将流写入 Silver Delta 表
(
silver_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/Volumes/nyc_taxi/infra/checkpoints/silver")
.trigger(availableNow=True)
.toTable("nyc_taxi.silver.taxi_trips_enriched")
)
所需的清洗和标准化已经完成,数据现在已准备好进行进一步成熟和业务洞察生成。
祝学习愉快!