Concept: User-defined functions(概念:用户自定义函数(User-defined functions))¶
User Defined Functions let you use your own arbitrary Python in PySpark. For example, you could use a UDF to parse information from a complicated text format in each row of your dataset.
After declaration, a UDF works similarly to built in PySpark functions such as concat, date_diff, trim, etc.
Motivation¶
Unintuitively, under normal circumstances data is never actually brought into your Python code. When you manipulate DataFrames using PySpark, you are describing the steps that the Spark cluster should take in a distributed, parallel fashion to get your final DataFrame. This allows Spark and Foundry to scale almost ad infinitum, but introduces the minor setup of UDFs for injecting code to run within the cluster on actual data. PySpark sends your UDF code to each server running your query.
Consider not using UDFs¶
The overhead of Python as opposed to Spark's optimized built in functionality makes UDFs relatively slow. Consider expressing your logic with PySpark's built-ins.
Example¶
"Weather report: rain 55-62"
Suppose we want to get the low temperature from the following weather format, in this case 55. We can write the following ordinary Python function,
def extract_low_temperature(weather_report):
return int(weather_report.split(' ')[-1].split('-')[0])
We'll create a UDF around our function extract_low_temperature to integrate it into our PySpark query. Creating a UDF involves providing our function and its expected return type in PySpark's type system.
# Import the necessary type
from pyspark.sql.types import IntegerType
# Wrap our function as a UDF
low_temp_udf = F.udf(extract_low_temperature, IntegerType())
Now the UDF can be used on a DataFrame, taking a whole column as an argument.
df = df.withColumn('low', low_temp_udf(F.col('weather_report')))
| id | weather_report | low |
|---|---|---|
| 1 | Weather report: rain 55-62 | 55 |
| 2 | Weather report: sun 69-74 | 69 |
| 3 | Weather report: clouds 31-34 | 31 |
Reading from Multiple Columns¶
A UDF can take arbitrary column arguments. The column arguments correspond to the function arguments.
from pyspark.sql.types import StringType
def weather_quality(temperature, windy):
if temperature > 70 and windy == False:
return "good"
else:
return "bad"
weather_udf = F.udf(weather_quality, StringType())
df = df.withColumn('quality', weather_udf(F.col('temp'), F.col('wind')))
| id | temp | wind | quality |
|---|---|---|---|
| 1 | 73 | false | good |
| 2 | 36 | false | bad |
| 3 | 90 | true | bad |
中文翻译¶
概念:用户自定义函数(User-defined functions)¶
用户自定义函数(User Defined Functions, UDFs)允许您在PySpark中使用任意自定义Python代码。例如,您可以使用UDF从数据集中每行的复杂文本格式中解析信息。
声明后,UDF的工作方式与内置的PySpark函数(如concat、date_diff、trim等)类似。
动机¶
与直觉相反,在正常情况下,数据实际上永远不会被带入您的Python代码中。当您使用PySpark操作DataFrame时,您是在描述Spark集群应以分布式、并行方式执行的步骤,以获取最终的DataFrame。这使得Spark和Foundry几乎可以无限扩展,但同时也引入了UDF的少量配置工作,以便将代码注入到集群中实际数据上运行。PySpark会将您的UDF代码发送到运行查询的每台服务器。
考虑不使用UDF¶
与Spark优化的内置功能相比,Python的开销使得UDF相对较慢。请考虑使用PySpark的内置函数来表达您的逻辑。
示例¶
"Weather report: rain 55-62"
假设我们想从以下天气格式中获取低温,本例中为55。我们可以编写以下普通的Python函数:
def extract_low_temperature(weather_report):
return int(weather_report.split(' ')[-1].split('-')[0])
我们将围绕函数extract_low_temperature创建一个UDF,以将其集成到PySpark查询中。创建UDF需要提供我们的函数及其在PySpark类型系统中的预期返回类型。
# 导入必要的类型
from pyspark.sql.types import IntegerType
# 将函数包装为UDF
low_temp_udf = F.udf(extract_low_temperature, IntegerType())
现在,该UDF可以在DataFrame上使用,并将整个列作为参数。
df = df.withColumn('low', low_temp_udf(F.col('weather_report')))
| id | weather_report | low |
|---|---|---|
| 1 | Weather report: rain 55-62 | 55 |
| 2 | Weather report: sun 69-74 | 69 |
| 3 | Weather report: clouds 31-34 | 31 |
从多列读取¶
UDF可以接受任意列参数。列参数对应函数的参数。
from pyspark.sql.types import StringType
def weather_quality(temperature, windy):
if temperature > 70 and windy == False:
return "good"
else:
return "bad"
weather_udf = F.udf(weather_quality, StringType())
df = df.withColumn('quality', weather_udf(F.col('temp'), F.col('wind')))
| id | temp | wind | quality |
|---|---|---|---|
| 1 | 73 | false | good |
| 2 | 36 | false | bad |
| 3 | 90 | true | bad |