第12天:UDF vs Pandas UDF
Source: Dev.to
欢迎来到 Spark Mastery 系列的第 12 天!
UDF(用户自定义函数)会显著拖慢 Spark 作业——仅添加一个 UDF,运行时间就可能增加至 10 倍。了解其原因并避免这种陷阱至关重要。
UDF(用户自定义函数)
UDF 是应用于 Spark DataFrame 的 Python 函数。
from pyspark.sql.functions import udf
@udf("string")
def reverse_name(name):
return name[::-1]
使用普通 UDF 时,Spark 必须:
- 将每条记录发送到 Python
- 执行 Python 代码
- 将结果转换回 JVM
- 将结果合并回 DataFrame
每条记录都要跨越 Python ↔ JVM 边界,这非常慢。
内置函数 — 始终首选
Spark 的原生函数是用 Scala 实现的,具备向量化并经过 Catalyst 优化。它们还支持谓词下推和列裁剪。
df.withColumn("upper_name", upper(col("name")))
**规则:**如果 Spark 已提供内置函数,绝不要自行编写 UDF。
Pandas UDF — 正规 UDF 的最佳替代方案
普通 UDF 逐行在 Python 中处理。
Pandas UDF 使用 Apache Arrow 对整批数据(向量化)进行操作,带来巨大的速度提升。
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def multiply_by_two(col):
return col * 2
Spark 以批次而非逐行的方式发送数据,从而实现显著的性能提升。
Pandas UDF 的类型
标量 Pandas UDF
@pandas_udf("double")
def add_one(col):
return col + 1
分组映射 UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def my_grouped_map(pdf):
# 对每个分组的 pandas DataFrame 进行自定义转换
return pdf
典型使用场景:
- 时间序列转换
- 按用户模型训练
- 按分组清洗
分组聚合 UDF
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def my_grouped_agg(pdf):
# 为每个分组返回单个聚合值
return pdf.mean()
**适用场景:**统计聚合、机器学习指标等。
何时应使用普通 UDF?
仅在以下情况考虑使用普通 UDF:
- 没有合适的内置函数可用
- 操作无法向量化
- 需要大量自定义 Python 逻辑
在典型的 ETL 流程中,这类情况很少见。
实际案例:性能差异
| 方法 | 运行时间 |
|---|---|
| 普通 UDF | 50 秒 |
| Pandas UDF | 8 秒 |
| 内置函数 | 1 秒 |
如此鲜明的对比说明了高级工程师为何除非万不得已才会使用普通 UDF。
总结指南
- 尽可能使用内置函数。
- 对可向量化的自定义逻辑使用 Pandas UDF。
- 将 普通 UDF 仅保留给真正例外的情况。
遵循这些实践,你的 Spark 作业性能将会大幅提升。