
Concept: Columns

To follow the examples in this document, add this import:

```python
from pyspark.sql import functions as F
```

Columns are represented by the PySpark class pyspark.sql.Column. A Column instance is created whenever you reference an existing column or derive an expression from one. You can reference a column in any of the following ways:

  • F.col("column_name")
  • F.column("column_name")
  • df["column_name"] or df.column_name, which refer to a column of a specific DataFrame df

:::callout{theme="neutral"} Referencing columns is not equivalent to performing a select, since "selecting" columns refers to subsetting (and reordering) the columns that you want to appear in the resulting dataset. :::


Getting the schema

DataFrame.columns

Returns all column names as a Python list:

```python
columns = df.columns  # ['age', 'name']
```

DataFrame.dtypes

Returns all column names and their data types as a list of (name, type) tuples:

```python
dtypes = df.dtypes  # [('age', 'int'), ('name', 'string')]
```

Select

DataFrame.select(*cols)

Returns a new DataFrame with a subset of columns from the originating DataFrame.

For example, say we have a DataFrame with six named columns: id, first_name, last_name, phone_number, zip_code, is_active_member.

| id | first_name | last_name | phone_number | zip_code | is_active_member |
|---|---|---|---|---|---|
| 1 | John | Doe | (123) 456-7890 | 10014 | true |
| 2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
| ... | ... | ... | ... | ... | ... |

You may want to transform the DataFrame so that it contains only the columns you care about (a subset of what is available). Say you want a table with just the phone_number column:

```python
df = df.select("phone_number")
```

| phone_number |
|---|
| (123) 456-7890 |
| (213) 555-1234 |
| ... |

Or perhaps you want just id, first_name, and last_name. There are at least three ways to accomplish this:

  1. Passing in column names directly:

```python
df = df.select("id", "first_name", "last_name")
```

Or pass in Column instances:

```python
df = df.select(F.col("id"), F.col("first_name"), F.col("last_name"))
```

  2. Passing in a list of column names:

```python
select_columns = ["id", "first_name", "last_name"]
df = df.select(select_columns)
```

  3. Passing in an "unpacked" list:

```python
select_columns = ["id", "first_name", "last_name"]
df = df.select(*select_columns)
# same as: df = df.select("id", "first_name", "last_name")
```

Each of these produces:

| id | first_name | last_name |
|---|---|---|
| 1 | John | Doe |
| 2 | Jane | Eyre |
| ... | ... | ... |

The * before select_columns unpacks the list into separate arguments, so the call behaves exactly like option 1 (see the comment). Unpacking also lets you combine a list with additional column names:

```python
select_columns = ["id", "first_name", "last_name"]
df = df.select(*select_columns, "phone_number")
# same as: df = df.select("id", "first_name", "last_name", "phone_number")
```

| id | first_name | last_name | phone_number |
|---|---|---|---|
| 1 | John | Doe | (123) 456-7890 |
| 2 | Jane | Eyre | (213) 555-1234 |
| ... | ... | ... | ... |

Keep in mind that the output DataFrame contains only the columns you selected, in the order you selected them, rather than in the original column order. Each name must refer to exactly one existing column in the DataFrame you are selecting from; otherwise Spark raises an AnalysisException. By default, names are matched case-insensitively (this is controlled by the spark.sql.caseSensitive setting).

An exception to that rule is that you can derive a new column and select it immediately. Give the derived column a name with alias; otherwise Spark names it after the expression itself, which is awkward to reference later:

| string1 | string2 | string3 | string4 |
|---|---|---|---|
| first | second | third | Fourth |
| one | two | three | four |

```python
derived_column = F.concat_ws(":", F.col("string1"), F.col("string2"))
df = df.select("string3", derived_column.alias("derived"))
```

| string3 | derived |
|---|---|
| third | first:second |
| three | one:two |

Create, update

DataFrame.withColumn(name, column)

```python
new_df = old_df.withColumn("column_name", derived_column)
```

  • new_df: the resulting DataFrame, which contains all columns from old_df plus column_name (added if it is new, replaced if it already existed).
  • old_df: the DataFrame to which we want to apply a new column.
  • column_name: the name of the column you want to either create (if it doesn't exist in old_df) or update (if it already exists in old_df).
  • derived_column: the Column expression that computes a value for every row; the result is stored under column_name.

Given an existing DataFrame, you can either create new columns or update existing columns with new or modified values using the withColumn method. This is particularly useful for the following goals:

  1. Deriving a new value based on an existing value:

```python
df = df.withColumn("times_two", F.col("number") * 2)  # times_two = number * 2
df = df.withColumn("concat", F.concat(F.col("string1"), F.col("string2")))
```

  2. Casting values from one type to another:

```python
# cast `start_timestamp` to DateType and store the new value in `start_date`
df = df.withColumn("start_date", F.col("start_timestamp").cast("date"))
```

  3. Updating an existing column:

```python
# update column `string` with an all-lowercased version of itself
df = df.withColumn("string", F.lower(F.col("string")))
```

Rename, alias

DataFrame.withColumnRenamed(name, rename)

Use .withColumnRenamed() to rename a column:

```python
df = df.withColumnRenamed("old_name", "new_name")
```

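Renaming can also be viewed as copying the column under a new name and then dropping the original:

```python
df = df.withColumn("new_name", F.col("old_name")).drop("old_name")
```

Both versions produce the same data, and Spark's optimizer typically collapses the consecutive projections into one, so neither should be meaningfully more expensive. The visible difference is column order: withColumnRenamed keeps the column in its original position, while this version appends new_name at the end of the schema.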

But there are also several cases where you derive a new column without withColumn and must still name it. This is where alias (or its equivalent, name) comes in handy. Here are a few usage examples:

```python
df = df.select(derived_column.alias("new_name"))
df = df.select(derived_column.name("new_name"))  # same as .alias("new_name")
df = df.groupBy("group").agg(
    F.sum("number").alias("sum_of_numbers"),
    F.count("*").alias("count"),
)
```

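You can also rename multiple columns at once by looping over a dictionary that maps old names to new names:

```python
renames = {
    "column": "column_renamed",
    "data": "data_renamed",
}

# withColumnRenamed returns a new DataFrame, so reassign df on every iteration
for colname, rename in renames.items():
    df = df.withColumnRenamed(colname, rename)
```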

Drop

DataFrame.drop(*cols)

Returns a new DataFrame with the specified columns removed. This is a no-op for column names that don't exist in the schema.

There are two ways to drop columns: the direct way and the indirect way. The indirect way is to use select and list the subset of columns you want to keep. The direct way is to use drop and list the columns you want to discard. Both have similar syntax, except that with drop the order in which you list the columns doesn't matter. A few examples:

| id | first_name | last_name | phone_number | zip_code | is_active_member |
|---|---|---|---|---|---|
| 1 | John | Doe | (123) 456-7890 | 10014 | true |
| 2 | Jane | Eyre | (213) 555-1234 | 90007 | true |
| ... | ... | ... | ... | ... | ... |

Let's say you want to drop only one column, phone_number:

```python
df = df.drop("phone_number")
```

| id | first_name | last_name | zip_code | is_active_member |
|---|---|---|---|---|
| 1 | John | Doe | 10014 | true |
| 2 | Jane | Eyre | 90007 | true |
| ... | ... | ... | ... | ... |

Or perhaps you want to drop id, first_name, and last_name. Unlike select, drop does not accept a list as a single argument: df.drop(drop_columns) raises a TypeError. That leaves two ways to do this:

  1. Passing in column names directly:

```python
df = df.drop("id", "first_name", "last_name")
```

Passing Column instances instead, as in df.drop(F.col("id"), F.col("first_name"), F.col("last_name")), works in recent PySpark versions. Older versions accept only one Column instance per drop call, so plain names are the more portable choice.

  2. Passing in an "unpacked" list:

```python
drop_columns = ["id", "first_name", "last_name"]
df = df.drop(*drop_columns)
# same as: df = df.drop("id", "first_name", "last_name")
```

Either way, the result is:

| phone_number | zip_code | is_active_member |
|---|---|---|
| (123) 456-7890 | 10014 | true |
| (213) 555-1234 | 90007 | true |
| ... | ... | ... |

The * before drop_columns unpacks the list into separate arguments, so the call behaves exactly like option 1 (see the comment). Unpacking also lets you combine a list with additional column names:

```python
drop_columns = ["id", "first_name", "last_name"]
df = df.drop(*drop_columns, "phone_number")
# same as: df = df.drop("id", "first_name", "last_name", "phone_number")
```

| zip_code | is_active_member |
|---|---|
| 10014 | true |
| 90007 | true |
| ... | ... |

Cast

Column.cast(type)

Commonly used data types, all defined in pyspark.sql.types, include NullType, StringType, BinaryType, BooleanType, DateType, TimestampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, and StructType. (StructField is not a type itself: it describes one named field inside a StructType.)

In general, you can convert most datatypes from one to another using the cast method on columns:

```python
from pyspark.sql.types import StringType

# assume df.age is of IntegerType()
df.select(df.age.cast(StringType()).alias("age"))
```

or

```python
# effectively the same as using StringType()
df.select(df.age.cast("string").alias("age"))
```

| age |
|---|
| "2" |
| "5" |

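Casting creates a newly derived column, on which you can directly perform select, withColumn, filter, and so on. Narrowing casts ("downcasting") can lose information; for example, casting a double to an integer drops the fractional part. Widening casts ("upcasting") can fill in values that were never in the data; for example, casting a date to a timestamp sets the time to midnight. Casts that cannot succeed, such as turning the string "abc" into an integer, return null when ANSI mode (spark.sql.ansi.enabled) is off, which is the default before Spark 4.0. With ANSI mode on, they raise an error instead.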

When, otherwise

F.when(condition, value).otherwise(value2)

Parameters:

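  • condition - a boolean Column expression
  • value - a literal value or Column expression, returned where condition is true
  • value2 - a literal value or Column expression, returned where no condition matches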

For each row, this evaluates to value where condition is true and to value2 otherwise. If otherwise() is not called, rows that match no condition get null (None).

The when and otherwise functions work like an if/else statement, or a SQL CASE WHEN expression, that computes a new column value. The basic usage is:

```python
# CASE WHEN (age >= 21) THEN true ELSE false END
at_least_21 = F.when(F.col("age") >= 21, True).otherwise(False)
# CASE WHEN (last_name != "") THEN last_name ELSE null END
last_name = F.when(F.col("last_name") != "", F.col("last_name")).otherwise(None)
df = df.select(at_least_21.alias("at_least_21"), last_name.alias("last_name"))
```

You can chain as many when calls as you need. The conditions are checked in order, and the first one that is true determines the value:

```python
switch = F.when(F.col("age") >= 35, "A").when(F.col("age") >= 21, "B").otherwise("C")
```

These evaluations can be assigned to columns, or used in a filter:

```python
df = df.withColumn("switch", switch)  # switch = "A", "B", or "C"
# keep rows where last_name (with empty strings turned into nulls) is not null
df = df.where(~F.isnull(last_name))
```
