Joins(连接(Joins))¶
A DataFrame in PySpark can be joined to another dataframe or to itself just as tables can be joined in SQL. Dataframes are joined to other dataframes with the .join() method. It takes a DataFrame, a join constraint such as the name of a column to join on, and a method (left, right, inner, etc.)
Simple left join¶
df_joined = df_left.join(df_right, 'key', 'left')
df_joined is now the result of a left join on df_left.key == df_right.key. PySpark automatically drops one of the copies of the key column so that df_joined only contains one column named key.
:::callout{theme="neutral"}
If the key(s) to be joined on in df_left and df_right do not have the same name, it is recommended to rename them first before performing the join.
:::
Colliding names¶
Be sure to rename or drop any fields you are not explicitly joining on that have the same name as these will collide once the join is complete. All columns in a DataFrame can be renamed to have a certain prefix in a loop like so,
for column in df.columns:
df = df.withColumnRenamed(column, 'some_prefix_' + column)
Joining on multiple fields¶
The .join() method can take a list of fields to join on instead of a single field.
df_joined = df_left.join(df_right, ['column1', 'column2', 'column3'], 'left')
df_joined is now a join on column1, column2 and column3. Again, this assumes the column names are consistent between df_left and df_right.
Advanced arbitrary join constraints¶
PySpark supports using an arbitrary expression to join using logical operators. Suppose we want to join on a column ID, a date start in our left DataFrame being before a date end in our right DataFrame, and depending on the contents of a certain field X, possibly require or not require that Y in our right DataFrame contains yet another value.
key_constraint = df_left.ID == df_right.ID
date_constraint = df_left.start < df_right.end
case_constraint = F.when(df_left.X == 'some_value', df_right.Y == 'some_other_value')\
.otherwise(True)
combined_constraints = key_constraint & date_constraint & case_constraint
df_joined = df_left.join(df_right, combined_constraints, 'left')
Cross join (Cartesian product)¶
Use a cross join to generate all combinations of rows between two dataframes, also known as the Cartesian product, without any matching by key or other constraint. Cross joins should be avoided if possible due to their risk of introducing memory and performance problems.
:::callout{theme="warning" title="Warning"} Don't use a cross join if you intend to immediately filter down the results. Instead, embed your filter criteria into the join constraint for a more efficient solution (see Advanced arbitrary join constraints above). :::
You must explicitly import the profile CROSS_JOIN_ENABLED in your Code Repository to use cross joins.
from transforms.api import configure
@configure(profile=["CROSS_JOIN_ENABLED"])
@transform_df(
...
)
def my_compute_function(input_a, input_b):
return input_a.crossJoin(input_b)
中文翻译¶
连接(Joins)¶
PySpark中的DataFrame可以像SQL中连接表一样,与另一个DataFrame或自身进行连接。DataFrame通过.join()方法与其他DataFrame连接。该方法接受一个DataFrame、一个连接约束条件(如要连接的列名)以及连接方式(left左连接、right右连接、inner内连接等)。
简单左连接(Simple left join)¶
df_joined = df_left.join(df_right, 'key', 'left')
df_joined现在是对df_left.key == df_right.key执行left左连接的结果。PySpark会自动删除key列的重复副本,因此df_joined中仅包含一个名为key的列。
:::callout{theme="neutral"}
如果df_left和df_right中要连接的键(key)名称不同,建议在执行连接前先重命名这些列。
:::
列名冲突(Colliding names)¶
请务必重命名或删除未显式用于连接但名称相同的字段,否则连接完成后会产生冲突。可以通过循环为DataFrame中的所有列添加统一前缀来重命名,示例如下:
for column in df.columns:
df = df.withColumnRenamed(column, 'some_prefix_' + column)
多字段连接(Joining on multiple fields)¶
.join()方法可以接受一个字段列表(而非单个字段)作为连接条件。
df_joined = df_left.join(df_right, ['column1', 'column2', 'column3'], 'left')
df_joined现在是对column1、column2和column3进行连接的结果。同样,这要求df_left和df_right中的列名保持一致。
高级任意连接约束(Advanced arbitrary join constraints)¶
PySpark支持使用任意表达式通过逻辑运算符进行连接。假设我们需要根据列ID进行连接,同时要求左侧DataFrame中的日期start早于右侧DataFrame中的日期end,并且根据字段X的内容,可能要求右侧DataFrame中的Y包含另一个特定值。
key_constraint = df_left.ID == df_right.ID
date_constraint = df_left.start < df_right.end
case_constraint = F.when(df_left.X == 'some_value', df_right.Y == 'some_other_value')\
.otherwise(True)
combined_constraints = key_constraint & date_constraint & case_constraint
df_joined = df_left.join(df_right, combined_constraints, 'left')
交叉连接(Cross join)(笛卡尔积)¶
交叉连接用于生成两个DataFrame之间所有行的组合,即笛卡尔积(Cartesian product),无需通过键或其他约束进行匹配。由于交叉连接可能引发内存和性能问题,应尽量避免使用。
:::callout{theme="warning" title="警告"} 如果计划在交叉连接后立即过滤结果,请不要使用交叉连接。相反,应将过滤条件嵌入到连接约束中,以获得更高效的解决方案(请参阅上文高级任意连接约束)。 :::
要使用交叉连接,必须在代码仓库(Code Repository)中显式导入配置文件 CROSS_JOIN_ENABLED。
from transforms.api import configure
@configure(profile=["CROSS_JOIN_ENABLED"])
@transform_df(
...
)
def my_compute_function(input_a, input_b):
return input_a.crossJoin(input_b)