Window¶
A window function allows a user to append aggregates and other values to rows in a dataframe without losing columns that aren't involved in the aggregates.
Motivating Example¶
Suppose you have a transactions dataset with each transaction a customer has made, and you want to compute the average_spend for each customer and append it to each transaction. To do this using joins, you would need to perform a group by to get the averages and then join back to the original table:
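from pyspark.sql import functions as F

# Compute one average per customer, then join it back onto every transaction.
averages = transactions\
    .groupBy('customer')\
    .agg(
        F.avg('spend').alias('average_spend')
    )
transactions = transactions.join(averages, ['customer'], 'left_outer')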
If you also wanted the maximum spend, the logic grows further: you now have to compute the maximum in a second aggregation and then join that result back onto the table as well:
# A second aggregation and a second join, this time for the maximum.
maximums = transactions\
    .groupBy('customer')\
    .agg(
        F.max('spend').alias('max_spend')
    )
# Joining on the column name keeps a single 'customer' column in the result.
transactions = transactions\
    .join(
        maximums,
        ['customer'],
        'left_outer'
    )
Window functions, however, allow you to simplify this code by first defining a window and then computing aggregates "over" the window:
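from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Partition only. Adding an orderBy would default to a running frame
# (start of partition up to the current row) and yield running aggregates.
window = Window.partitionBy('customer')
transactions = transactions\
    .withColumn('average_spend', F.avg('spend').over(window))\
    .withColumn('max_spend', F.max('spend').over(window))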
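To make concrete what a window partitioned by customer computes, here is a minimal plain-Python sketch of the semantics. It is not PySpark and not how Spark executes the query; the function name `append_customer_aggregates` and the sample rows are hypothetical, chosen only for illustration.

```python
from collections import defaultdict


def append_customer_aggregates(rows):
    """Append per-customer average and maximum spend to every row.

    Hypothetical helper that models partitioning by 'customer';
    it is an illustration of the semantics, not a Spark API.
    """
    # Collect the spend values of each partition (one partition per customer).
    spend_by_customer = defaultdict(list)
    for row in rows:
        spend_by_customer[row["customer"]].append(row["spend"])

    result = []
    for row in rows:
        spends = spend_by_customer[row["customer"]]
        # Copy the row so every original column survives, then add the aggregates.
        result.append({
            **row,
            "average_spend": sum(spends) / len(spends),
            "max_spend": max(spends),
        })
    return result


# Made-up sample data; 'item' stands in for any column not involved in the aggregates.
transactions = [
    {"customer": "a", "item": "book", "spend": 10.0},
    {"customer": "a", "item": "pen", "spend": 30.0},
    {"customer": "b", "item": "lamp", "spend": 5.0},
]
enriched = append_customer_aggregates(transactions)
```

Every input row appears exactly once in `enriched`, with all of its original columns plus the two aggregates. That is the property that separates a window aggregate from a plain group by, which would collapse each customer to a single row.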
In addition, there are several functions that may only be used with windows. These are known as Window Functions and are described in the next section.
Window Functions¶
dense_rank()¶
lag(col, count=1, default=None)¶
lead(col, count=1, default=None)¶
ntile(n)¶
percent_rank()¶
rank()¶
row_number()¶
window(timeColumn, windowDuration, slideDuration=None, startTime=None)¶
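The ranking and offset functions listed above differ mainly in how they treat ties and partition boundaries. The following plain-Python sketch models the usual SQL semantics of `row_number`, `rank`, `dense_rank`, `percent_rank`, `ntile`, `lag`, and `lead` over a single partition ordered ascending by value. It is an illustration under those assumptions, not PySpark code; the helper name `model_window_functions` and the sample values are hypothetical, and `lag`/`lead` are modelled with an offset of 1 and a default of `None` only.

```python
def model_window_functions(values, buckets=2):
    """Model several window functions over one partition ordered by value.

    Hypothetical helper for illustration; it mirrors common SQL semantics
    and is not part of any Spark API.
    """
    ordered = sorted(values)
    n = len(ordered)
    # ntile: the first `extra` buckets hold one more row than the others.
    base, extra = divmod(n, buckets)
    threshold = extra * (base + 1)

    rows = []
    rank = dense_rank = 0
    for i, value in enumerate(ordered):
        if i == 0 or value != ordered[i - 1]:
            rank = i + 1       # rank skips ahead after ties, leaving gaps
            dense_rank += 1    # dense_rank grows by one per distinct value
        if i < threshold:
            ntile = i // (base + 1) + 1
        else:
            ntile = extra + (i - threshold) // base + 1
        rows.append({
            "value": value,
            "row_number": i + 1,  # always unique, ties broken by position
            "rank": rank,
            "dense_rank": dense_rank,
            "percent_rank": (rank - 1) / (n - 1) if n > 1 else 0.0,
            "ntile": ntile,
            "lag": ordered[i - 1] if i >= 1 else None,      # previous row, else default
            "lead": ordered[i + 1] if i + 1 < n else None,  # next row, else default
        })
    return rows


# Made-up spend values for one customer, with a tie at 20.
ranked = model_window_functions([10, 20, 20, 30, 50], buckets=2)
```

For the tied value 20, both rows share `rank` 2 and `dense_rank` 2 but get distinct `row_number` values; the next value, 30, receives `rank` 4 but `dense_rank` 3. In PySpark, these functions need a window with an ordering (for example, one built with `orderBy`), since ranks and offsets are undefined without one.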