Style guide

PySpark is a wrapper language that allows you to interface with an Apache Spark backend to quickly process data. Spark can operate on very large datasets across a distributed network of servers, which provides major performance and reliability benefits when used correctly. It presents challenges, even for experienced Python developers, as the PySpark syntax draws on the JVM heritage of Spark and therefore implements code patterns that might be unfamiliar.

This opinionated guide to PySpark code style presents common situations and the associated best practices based on the most frequent recurring topics across the PySpark repositories we've encountered.

To enforce consistent code style, each main repository should have Pylint enabled with the same configuration. We provide some PySpark-specific checkers that you can add to your Pylint configuration to match the rules listed in this document. See the documentation on enabling style checks for details on our built-in Pylint plugin for Python repositories.

Beyond PySpark specifics, the general practices of clean code are important in PySpark repositories - the Google PyGuide is a good starting point.

Prefer implicit column selection to direct access, except for disambiguation

# bad
df = df.select(F.lower(df1.colA), F.upper(df2.colB))

# good
df = df.select(F.lower(F.col("colA")), F.upper(F.col("colB")))

The preferred option is longer and noisier, and ideally you would avoid F.col() altogether. In some cases, however, either F.col() or the alternative explicit selection is unavoidable, and there is a very good reason to prefer the second example over the first one.

When using explicit columns as in the first case, both the dataframe name and schema are explicitly bound to the dataframe variable. This means that if df1 is deleted or renamed the reference df1.colA will break.

By contrast, F.col("colA") will always reference a column called "colA" in the dataframe being operated on, which in this case happens to be named df. It does not require keeping track of other dataframes' states at all, so the code becomes more local and less prone to "spooky interaction at a distance", which is often challenging to debug.

Other good reasons to avoid the first case:

  • If the dataframe variable name is long, expressions involving it quickly become unwieldy;
  • If the column name has a space or some other unsupported character that requires access by the bracket operator, then df1["colA"] is just as difficult to write as F.col("colA");
  • Assigning an abstract expression like F.col("prod_status") == 'Delivered' to a variable makes it reusable for multiple dataframes, while df.prod_status == 'Delivered' is always bound to df.

Fortunately, a convoluted expression with F.col() is usually not required. The only exceptions are F.lower, F.upper, and a few similar functions.

Caveats

In some contexts, there is access to columns from more than one dataframe, and there may be an overlap in names. A common example is in matching expressions like df.join(df2, on=(df.key == df2.key), how='left'). In such cases it is fine to reference columns by their dataframe directly. You can also disambiguate joins using dataframe aliases (see more in the Joins section in this guide).

Avoid iterating over columns in favor of list comprehension

In general, for loops are inefficient in Spark. At a high level, this is because Spark is lazily evaluated, so each iteration only adds another step to the query plan instead of running immediately. With many iterations the plan grows large, which may cause slower run times when the whole change could be expressed as a single operation, and may result in Driver out of memory errors (OOMs). To rename all columns in a dataset from uppercase to lowercase, instead of the first example below (labeled # bad), we suggest using a list comprehension (as in the second example labeled # good):

# bad
for colm in df.columns:
    df = df.withColumnRenamed(colm, colm.lower())

# good
df = df.select(
    *[F.col(colm).alias(colm.lower()) for colm in df.columns]
)

Using list comprehension as in the # good example will avoid the slow performance and query plan issues discussed above while still getting the same desired result.

Refactor complex logical operations

Logical operations, which often reside inside .filter() or F.when(), need to be readable. We apply the same rule as with chaining functions, keeping logic expressions inside the same code block to 3 expressions at most. If they grow longer, it is often a sign that the code can be simplified or extracted out. Extracting out complex logical operations into variables or functions makes the code easier to read and reason about, which also reduces bugs.

# bad
F.when( (df.prod_status == 'Delivered') | (((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & 
((df.currentRegistration.rlike('.+')) | ((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & 
(df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')))))), 'In Service')

We can simplify the code above in different ways. To start, let's focus on grouping the logic steps in a few named variables. PySpark requires each comparison combined with & or | to be wrapped in parentheses. This, mixed with the actual parentheses that group logical operations, can hurt readability. For example, the code above has a redundant (F.datediff(df.deliveryDate_actual, df.current_date) < 0) that the original author didn't notice because it's very hard to spot.

# better
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')

F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')

The above example is easier to read, and also drops the redundant expression. We can improve it further by reducing the number of operations.

# good
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')
is_active = (has_registration | has_operator)

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')

Note how the F.when expression is now succinct and readable and the desired behavior is clear to anyone reviewing this code. The reader only needs to visit the individual expressions if they suspect there is an error there. It also makes each chunk of logic easy to test if you have unit tests in your code, and want to abstract them as functions.

There is still some duplication of code in the final example: we leave how to remove that duplication as an exercise for the reader.

Use select statements to specify a schema contract

Doing a select at the beginning of a PySpark transform, or before returning, is considered good practice. This select statement specifies the contract with both the reader and the code about the expected dataframe schema for inputs and outputs.

Any select should be seen as a cleaning operation that is preparing the dataframe for consumption by the next step in the transform.

Always aim to keep select statements as simple as possible. Due to common SQL idioms, allow for up to one function from pyspark.sql.functions to be used per selected column, plus an optional .alias() to give it a meaningful name. Keep in mind that this should be used sparingly, and if there are more than three such uses in the same select, refactor it into a separate function like clean_<dataframe name>() to encapsulate the operation.

Never allow expressions involving more than one dataframe, or conditional operations like .when() to be used in a select.

# bad
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    F.col('aircraft_registration').alias('registration'),
    'aircraft_type',
    F.avg('staleness').alias('avg_staleness'),
    F.col('number_of_economy_seats').cast('long'),
    F.avg('flight_hours').alias('avg_flight_hours'),
    'operator_code',
    F.col('number_of_business_seats').cast('long'),
)

Cluster operations of the same type together. All individual columns should be listed upfront, while calls to functions from pyspark.sql.functions should go on separate lines.

# good
aircraft = aircraft.select(
    'aircraft_id', 'aircraft_msn', 'aircraft_type', 'operator_code',
    F.col('aircraft_registration').alias('registration'),
    F.col('number_of_economy_seats').cast('long'),
    F.col('number_of_business_seats').cast('long'),
    F.avg('staleness').alias('avg_staleness'),
    F.avg('flight_hours').alias('avg_flight_hours'),
)

The select() statement, by its nature, redefines the schema of a dataframe, so it naturally supports the inclusion or exclusion of columns, old and new, as well as the redefinition of pre-existing ones. By centralizing all such operations in a single statement, it becomes much easier to identify the final schema, which makes code more readable. It also makes code slightly more concise.

Instead of calling withColumnRenamed(), use aliases:

# bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# good
df.select('key', F.col('comments').alias('num_comments'))

Instead of using withColumn() to redefine type, cast in the select:

# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

# good
df.select(F.col('comments').cast('double'))

But keep it simple:

# bad
df.select(
    ((F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp())
    - F.unix_timestamp('created_at')) / 86400).alias('days_open')
)

# good
df.withColumn(
    'days_open',
    (F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400
)

Avoid including columns in the select statement if they are going to remain unused; choose an explicit set of columns instead. This is a preferred alternative to using .drop() since it guarantees that schema mutations won't cause unexpected columns to bloat your dataframe. That said, dropping columns isn't inherently discouraged in all cases; for instance, it is often appropriate after joins, since joins commonly introduce redundant columns.

Finally, instead of adding new columns by means of the select statement, we recommend using .withColumn().

Empty columns

If you need to add an empty column to satisfy a schema, always use F.lit(None) for populating that column. Never use an empty string or some other string signalling an empty value (such as NA).

Beyond being semantically correct, one practical reason for this is that it preserves the ability to use utilities like isNull, instead of having to check for empty strings, and nulls, and 'NA', etc.

# bad
df = df.withColumn('foo', F.lit(''))

# bad
df = df.withColumn('foo', F.lit('NA'))

# good - note necessary cast since `None` is typeless. Choose the appropriate type based on expected use.
df = df.withColumn('foo', F.lit(None).cast('string'))

Using comments

While comments can provide useful insight into code, it is often more valuable to refactor the code to improve its readability. The code should be readable by itself. If you are using comments to explain the logic step by step, you should refactor it.

# bad

# Cast the timestamp columns
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

In the example above, we can see that those columns are getting cast to Timestamp. The comment doesn't add much value. Moreover, a more verbose comment might still be unhelpful if it only provides information that already exists in the code. For example:

# bad

# Go through each column, remove 1000 because millis and cast to timestamp
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

Instead of leaving comments that just describe the logic you wrote, you should aim at leaving comments that give context, explaining the "why" of decisions you made when writing the code. This is particularly important for PySpark, since the reader can understand your code, but often doesn't have context on the data that feeds into your PySpark transform. Small pieces of logic might have involved hours of digging through data to understand the correct behavior, in which case comments explaining the rationale are especially valuable.

# good

# The consumer of this dataset expects a timestamp instead of a date, and we need
# to adjust the time by 1000 because the original datasource is storing these as millis
# even though the documentation says it's actually a date.
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

UDFs (user defined functions)

It is highly recommended to avoid UDFs in all situations, as they are dramatically less performant than native PySpark. In most situations, logic that seems to necessitate a UDF can, in fact, be refactored to use only native PySpark functions.

Collect and related functions

Always avoid using functions that collect data to the Spark driver, such as:

  • DataFrame.collect()
  • DataFrame.first()
  • DataFrame.head(...)
  • DataFrame.take(...)
  • DataFrame.show(...)

Using these functions eliminates the benefits of a distributed framework like Spark, resulting in lower performance or out-of-memory errors. Instead of these functions, we strongly recommend using:

  • The Preview & Debug feature in Code Repository to inspect the state of variables (including Spark DataFrames) when developing transforms.
  • Native PySpark functions (if possible) or UDFs (in rare cases when the logic is impossible to encode using native functions) to process map/aggregate values in DataFrames.

Joins

Be careful with joins. If you perform a left join, and the right side has multiple matches for a key, that row will be duplicated as many times as there were matches. This is called a "join explosion" and can dramatically bloat the size of your dataset. Always double check your assumptions to see that the key you are joining on is unique, unless you are expecting the multiplication.

Bad joins are the source of many issues which can be tricky to debug. Some habits help, such as specifying the how parameter explicitly, even if you are using the default value (inner):

# bad
flights = flights.join(aircraft, 'aircraft_id')

# also bad
flights = flights.join(aircraft, 'aircraft_id', 'inner')

# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')

Also avoid right joins. If you are about to use a right join, switch the order of your dataframes and use a left join instead. It is more intuitive, since the dataframe you are doing the operation on is the one that you are centering your join around.

# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')

# good
flights = flights.join(aircraft, 'aircraft_id', how='left')

When joining dataframes, avoid using expressions that duplicate columns in the output:

# bad - column aircraft_id will be duplicated in the output
output = flights.join(aircraft, flights.aircraft_id == aircraft.aircraft_id, how='inner')

# good
output = flights.join(aircraft, 'aircraft_id', how='inner')

Avoid renaming all columns to avoid collisions. You can instead just give an alias to the whole dataframe, and use that alias to select which columns you want in the end.

# bad
columns = ["start_time", "end_time", "idle_time", "total_time"]
for col in columns:
    flights = flights.withColumnRenamed(col, 'flights_' + col)
    parking = parking.withColumnRenamed(col, 'parking_' + col)

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights_start_time").alias("flight_start_time"),
    F.col("flights_end_time").alias("flight_end_time"),
    F.col("parking_total_time").alias("client_parking_total_time")
)

# good
flights = flights.alias("flights")
parking = parking.alias("parking")

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights.start_time").alias("flight_start_time"),
    F.col("flights.end_time").alias("flight_end_time"),
    F.col("parking.total_time").alias("client_parking_total_time")
)

In such cases, however, keep in mind that:

  • It's probably best to drop overlapping columns prior to joining if you don't need both;
  • In case you do need both, it might be best to rename one of them prior to joining;
  • You should always resolve ambiguous columns before outputting a dataset, since after the transform is finished running you can no longer distinguish them.

As a last word about joins, don't use .dropDuplicates() or .distinct() as a crutch. If unexpected duplicate rows are observed, there's almost always an underlying reason for why those duplicate rows appear. Adding .dropDuplicates() only masks this problem and adds overhead to the runtime.

Chaining of expressions

Chaining expressions is a contentious topic, but we do recommend some limits on the usage of chaining. See the conclusion of this section for a discussion of the rationale behind this recommendation.

Avoid chaining expressions of different types into a single multi-line expression, particularly if they have different behaviors or contexts: for example, mixing column creation or joining with selecting and filtering.

# bad
df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
    .withColumn("boverc", df.b / df.c)
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)

# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes

df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
)

df = df.withColumn("boverc", df.b / df.c)

df = (
    df
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)

Having each group of expressions isolated into its own logical code block improves legibility and makes it easier to find relevant logic.

For example, a reader of the code below will likely jump to where they see dataframes being assigned (df = df...) and can easily miss the join hidden at the end of the chain.

# bad
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(df.abc == 123)
    .join(another_table, 'some_field')
)

# better
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(F.col('abc') == 123)
)

df = df.join(another_table, 'some_field', how='inner')

There are legitimate reasons to chain expressions together. These commonly represent atomic logic steps, and are acceptable. Apply a rule that caps the number of chained expressions in the same block to keep the code readable. We recommend chains of no longer than 3-5 statements.

If you find you are making longer chains, or running into trouble because of the length of your variable names, consider extracting the logic into a separate function:

# bad
customers_with_shipping_address = (
    customers_with_shipping_address
    .select("a", "b", "c", "key")
    .filter(F.col("a") == "truthiness")
    .withColumn("boverc", F.col("b") / F.col("c"))
    .join(df2, "key", how="inner")
)

# also bad
customers_with_shipping_address = customers_with_shipping_address.select("a", "b", "c", "key")
customers_with_shipping_address = customers_with_shipping_address.filter(F.col("a") == "truthiness")

customers_with_shipping_address = customers_with_shipping_address.withColumn("boverc", F.col("b") / F.col("c"))

customers_with_shipping_address = customers_with_shipping_address.join(df2, "key", how="inner")

# better
def join_customers_with_shipping_address(customers, df_to_join):

    customers = (
        customers
        .select("a", "b", "c", "key")
        .filter(F.col("a") == "truthiness")
    )

    customers = customers.withColumn("boverc", F.col("b") / F.col("c"))
    customers = customers.join(df_to_join, "key", how="inner")
    return customers

In fact, chains of more than three statements are prime candidates to factor into separate, well-named functions since they are already encapsulated, isolated blocks of logic.

Rationale

There are several reasons behind these limits on chaining:

  • Differentiation between PySpark code and SQL code. Chaining is something that goes against most (if not all) other Python styling. You don’t chain in Python, you assign.
  • Discourage the creation of large single code blocks. These often would make more sense extracted as a named function.
  • This doesn't need to be all or nothing; a maximum of three to five lines of chaining balances practicality with legibility.
  • If you are using an IDE, this makes it easier to use automatic extractions or do code movements (for example, Ctrl/Cmd+Shift+Up in PyCharm).
  • Large chains are hard to read and maintain, particularly if chains are nested.

Multi-line expressions

The reason you can chain expressions is because PySpark was developed from Spark, which comes from JVM languages. This meant some design patterns were transported, namely chainability.

However, Python doesn't support multiline expressions gracefully and the only alternatives are to either provide explicit line breaks, or wrap the expression in parentheses. You only need to provide explicit line breaks if the chain happens at the root node. For example:

# needs `\`
df = df.filter(F.col('event') == 'executing')\
       .filter(F.col('has_tests') == True)\
       .drop('has_tests')

# chain not in root node so it doesn't need the `\`
df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe')
                              .when(F.col('has_executed') == True, 'no tests but runs')
                              .otherwise('not safe'))

Thus to keep things consistent, wrap the entire expression into a single parenthesis block, and avoid using \:

# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
    df
    .filter(F.col('event') == 'executing')
    .filter(F.col('has_tests') == True)
    .drop('has_tests')
)

Other Considerations and Recommendations

  • Be wary of functions that grow too large. As a general rule, a file should not be over 250 lines, and a function should not be over 70 lines.
  • Try to keep your code in logical blocks. For example, if you have multiple lines that reference the same things, try to keep them together. Separating them would make it harder for the reader to have context.
  • Test your code. If you can run the local tests, do that and make sure that your new code is covered by the tests. If you can't run the local tests, build the datasets on your branch and manually verify that the data looks as expected.
  • Avoid .otherwise(value) as a general fallback. If you are mapping a list of keys to a list of values and a number of unknown keys appear, using otherwise will mask all of these into one value.
  • Do not keep commented-out code checked into the repository. This applies to single lines of code, functions, classes or modules. Rely on git and its capabilities of branching or looking at history instead.
  • When encountering a large single transformation composed of integrating multiple different source tables, split it into the natural sub-steps and extract the logic to functions. This allows for easier higher level readability and allows for code re-usability and consistency between transforms.
  • Try to be as explicit and descriptive as possible when naming functions or variables. Except for top level transforms wrapping object transformations, strive to capture what the function is actually doing as opposed to naming it just by the objects used inside of it.
  • Think twice about introducing new import aliases, unless there is a good reason behind it. For example, this may be reasonable if an imported module is called many times in different files and the module is common and familiar to almost all developers. Some of the established ones are types and functions from PySpark: from pyspark.sql import types as T, functions as F.
  • Avoid using literal strings or integers in filtering conditions, new values of columns, and so on. Instead, extract them into variables, constants, dicts or classes as suitable, to capture their meaning. This makes the code more readable and enforces consistency across the repository.

中文翻译


风格指南

PySpark是一种封装语言,可对接Apache Spark后端实现数据快速处理。Spark能够跨服务器分布式网络(distributed network) 处理超大型数据集,使用得当的话可显著提升性能与可靠性。但即便对于经验丰富的Python开发者来说,PySpark也存在一定使用门槛——它的语法继承了Spark的JVM特性,因此采用的代码模式可能会让开发者感到陌生。

这份带有实践倾向性的PySpark代码风格指南,基于我们在众多PySpark代码仓库中遇到的高频共性问题,整理了常见场景及对应的最佳实践。

为了保证代码风格统一,所有核心仓库都应启用相同配置的Pylint ↗。我们还提供了PySpark专属检查器 ↗,你可以将其添加到Pylint中,以匹配本文列出的所有规则。如需了解Python仓库内置Pylint插件的相关信息,可查看启用风格检查的说明文档

除了PySpark专属规则之外,整洁代码的通用实践在PySpark仓库中也十分重要,谷歌的PyGuide ↗是不错的入门参考。

除非需要消除歧义,否则优先使用隐式列选择而非直接访问

# bad
df = df.select(F.lower(df1.colA), F.upper(df2.colB))

# good
df = df.select(F.lower(F.col("colA")), F.upper(F.col("colB")))

上面推荐的写法看起来可能更复杂、更长、不够简洁,这一点确实没错;实际上最好能完全避免使用F.col(),但在部分场景下,使用它或者其他显式选择方案是不可避免的。不过相较于第一个示例,优先选择第二种写法有非常充分的理由。

如果像第一个示例那样使用显式列,DataFrame名称和模式(schema) 都会与DataFrame变量显式绑定。这意味着如果df1被删除或重命名,df1.colA的引用就会失效。

与之相反,F.col("colA")始终会引用当前操作的DataFrame(本例中就是df)中名为colA的列,完全不需要追踪其他DataFrame的状态,因此代码的局部性更强,更不容易出现“远距离幽灵交互”的问题——这类问题通常调试难度很高。

不推荐第一种写法的其他理由: * 如果DataFrame变量名较长,涉及该变量的表达式会很快变得臃肿难读; * 如果列名包含空格或其他不支持的字符,需要用方括号运算符访问,此时df1["colA"]的写法和F.col("colA")复杂度相当; * 将F.col("prod_status") == 'Delivered'这类抽象表达式赋值给变量后,可以在多个DataFrame中复用,而df.prod_status == 'Delivered'始终与df绑定。

幸运的是,通常不需要编写包含F.col()的复杂表达式,唯一的例外是F.lowerF.upper这些函数 ↗

注意事项

在部分场景下,你需要访问多个DataFrame的列,可能存在列名重叠的情况。常见的例子是df.join(df2, on=(df.key == df2.key), how='left'这类匹配表达式,这种情况下直接通过DataFrame引用列是没问题的。你也可以通过DataFrame别名消除join时的歧义(详见本指南的连接(Joins) 章节)。

优先使用列表推导式,避免遍历列

通常来说,Spark中的for循环效率极低。从高层来看,这是因为Spark采用惰性求值(lazy evaluation) 机制,每次只能处理一个for循环。如果循环中的所有操作本可以并行处理,使用for循环会大幅降低运行速度,还可能导致驱动程序(Driver) 内存不足错误(OOM)。如果要将数据集中的所有列名从大写改为小写,不要使用下方标注# bad的第一个示例,我们建议使用列表推导式实现,也就是标注# good的第二个示例:

# bad
for colm in df.columns:
    df = df.withColumnRenamed(colm, colm.lower())
# good
df = df.select(
    *[F.col(colm).alias(colm.lower()) for colm in df.columns]
)

使用# good示例中的列表推导式,既能得到相同的预期结果,还能避免上文提到的性能低下、查询计划异常等问题。

重构复杂逻辑操作

通常位于.filter()F.when()中的逻辑操作需要保证可读性。我们采用与函数链式调用相同的规则:同一代码块内的逻辑表达式最多不超过3条。如果表达式更长,通常说明代码可以简化或抽离。将复杂逻辑操作抽离为变量或函数可以提升代码的可读性与可推导性,同时减少bug。

# bad
F.when( (df.prod_status == 'Delivered') | (((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & 
((df.currentRegistration.rlike('.+')) | ((F.datediff(df.deliveryDate_actual, df.current_date) < 0) & 
(df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+')))))), 'In Service')

我们可以通过多种方式简化上方的代码。首先,我们可以将逻辑步骤分组,赋值给几个命名变量。PySpark要求表达式用括号包裹,这与用于逻辑分组的括号混在一起时会严重影响可读性。比如上面的代码中就有一处重复的(F.datediff(df.deliveryDate_actual, df.current_date) < 0),因为很难被发现,原作者完全没有注意到。

# better
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')

F.when(is_delivered | (delivery_date_passed & (has_registration | has_operator)), 'In Service')

上面的示例可读性更高,也删掉了重复的表达式。我们可以进一步减少操作数量,优化代码。

# good
has_operator = (df.originalOperator.rlike('.+') | df.currentOperator.rlike('.+'))
delivery_date_passed = (F.datediff(df.deliveryDate_actual, df.current_date) < 0)
has_registration = (df.currentRegistration.rlike('.+'))
is_delivered = (df.prod_status == 'Delivered')
is_active = (has_registration | has_operator)

F.when(is_delivered | (delivery_date_passed & is_active), 'In Service')

可以看到现在的F.when表达式简洁易读,任何代码评审者都能一眼看懂预期逻辑。只有当读者怀疑某部分逻辑出错时,才需要去查看对应的独立表达式。如果你的代码有单元测试,想要把逻辑抽象为函数,这种写法也让每块逻辑都更容易测试。

最终的示例中仍然存在部分重复代码,如何消除这些重复就留作读者的练习。

使用select语句指定模式契约(schema contract)

在PySpark转换(transform) 开始时或返回结果前执行select操作是公认的最佳实践。select语句会向读者和代码明确约定输入和输出的DataFrame模式预期。

所有select操作都应被视为清洗操作,用于为转换的下一步处理准备好可用的DataFrame。

要始终尽量保持select语句简单。根据通用SQL习惯,每个选中的列最多可以使用1个spark.sql.function中的函数,还可以可选加.alias()给它一个有意义的名称。注意不要滥用这个规则,如果同一个select中这类用法超过3个,就应该把它重构为clean_<dataframe name>()这类独立函数来封装相关操作。

绝对不允许在select中使用涉及多个DataFrame的表达式,或者.when()这类条件操作。

# bad
aircraft = aircraft.select(
    'aircraft_id',
    'aircraft_msn',
    F.col('aircraft_registration').alias('registration'),
    'aircraft_type',
    F.avg('staleness').alias('avg_staleness'),
    F.col('number_of_economy_seats').cast('long'),
    F.avg('flight_hours').alias('avg_flight_hours'),
    'operator_code',
    F.col('number_of_business_seats').cast('long'),
)

把同类型的操作集中放在一起。所有独立列都应该放在最前面,spark.sql.function的调用单独占一行。

# good
aircraft = aircraft.select(
    'aircraft_id', 'aircraft_msn', 'aircraft_type', 'operator_code',
    F.col('aircraft_registration').alias('registration'),
    F.col('number_of_economy_seats').cast('long'),
    F.col('number_of_business_seats').cast('long'),
    F.avg('staleness').alias('avg_staleness'),
    F.avg('flight_hours').alias('avg_flight_hours'),
)

select()语句的本质就是重定义DataFrame的模式,因此它天然支持新增、删除新旧列,以及重定义已有列。把所有这类操作集中在一个语句中,可以更轻松地确认最终模式,提升代码可读性,同时也会让代码更简洁。

不要调用withColumnRenamed(),而是使用别名:

# bad
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# good
df.select('key', F.col('comments').alias('num_comments'))

不要使用withColumn()重定义类型,在select中进行类型转换:

# bad
df.select('comments').withColumn('comments', F.col('comments').cast('double'))

# good
df.select(F.col('comments').cast('double'))

但要保证简洁:

# bad
df.select(
    ((F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp())
    - F.unix_timestamp('created_at')) / 86400).alias('days_open')
)

# good
df.withColumn(
    'days_open',
    (F.coalesce(F.unix_timestamp('closed_at'), F.unix_timestamp()) - F.unix_timestamp('created_at')) / 86400
)

select语句中不要包含后续不会用到的列,应该显式指定需要的列集合——这是比使用.drop()更优的方案,因为它能保证模式变更不会引入意外列导致DataFrame臃肿。不过这并不是说所有场景下都不建议删除列:比如在join操作之后删除列通常是合适的,因为join经常会引入冗余列。

最后,不要通过select语句新增列,建议使用.withColumn()实现。

空列

如果你需要新增空列来满足模式要求,始终使用F.lit(None)填充该列。不要使用空字符串或者其他表示空值的字符串(比如NA)。

除了语义上更准确之外,一个非常实用的原因是这种写法可以保留使用isNull等工具的能力,不需要额外校验空字符串、null、'NA'等多种情况。

# bad
df = df.withColumn('foo', F.lit(''))

# bad
df = df.withColumn('foo', F.lit('NA'))

# good - note necessary cast since `None` is typeless. Choose the appropriate type based on expected use.
df = df.withColumn('foo', F.lit(None).cast('string'))

注释使用规范

虽然注释可以为代码提供有用的背景信息,但重构代码提升其可读性通常更有价值。代码本身就应该是可读的。如果你需要用注释一步步解释逻辑,说明你应该重构代码了。

# bad

# Cast the timestamp columns
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

在上面的示例中,我们可以看到这些列正在被转换为Timestamp类型,注释并没有提供额外价值。此外,如果注释只提供代码中已经存在的信息,就算写得更长也没用,比如:

# bad

# Go through each column, remove 1000 because millis and cast to timestamp
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

不要写只描述你写的逻辑是什么的注释,注释应该提供背景信息,解释你写代码时做相关决策的“原因”。这一点对于PySpark来说尤其重要,因为读者可以看懂你的代码,但通常不了解输入到PySpark转换中的数据的上下文。可能一小段逻辑是你花了好几个小时梳理数据才得出的正确处理方式,这种情况下解释背后逻辑的注释价值极高。

# good

# The consumer of this dataset expects a timestamp instead of a date, and we need
# to adjust the time by 1000 because the original datasource is storing these as millis
# even though the documentation says it's actually a date.
cols = ["start_date", "delivery_date"]
for c in cols:
    df = df.withColumn(c, F.from_unixtime(df[c] / 1000).cast(TimestampType()))

用户自定义函数(UDF, User Defined Functions)

我们强烈建议在所有场景下都尽量避免使用UDF,因为它们的性能远低于原生PySpark函数。大多数情况下,看似必须使用UDF的逻辑,实际上都可以重构为仅使用原生PySpark函数实现。

Collect及相关函数

始终避免使用会把数据收集到Spark驱动端的函数,比如: * DataFrame.collect() * DataFrame.first() * DataFrame.head(...) * DataFrame.take(...) * DataFrame.show(...)

使用这些函数会丧失Spark这类分布式框架的优势,导致性能降低或内存不足错误。我们强烈推荐使用以下方案替代这些函数: * 代码仓库中的预览与调试 ↗功能,可在开发转换时查看变量(包括Spark DataFrame)的状态。 * 优先使用原生PySpark函数处理DataFrame中的映射/聚合值,只有当逻辑无法用原生函数实现的极少数场景下才使用UDF。

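Joins

Be careful with joins. If you perform a left join and the right side has multiple matches for a key, that row will be duplicated as many times as there are matches. This is called a "join explosion" and can dramatically bloat the size of your dataset. Unless you expect the multiplication, always double-check your assumption that the key you are joining on is unique.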

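Bad joins are the source of many tricky-to-debug issues. Some habits help avoid them, such as specifying the how argument explicitly, even if you are using the default value (inner):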

# bad
flights = flights.join(aircraft, 'aircraft_id')

# also bad
flights = flights.join(aircraft, 'aircraft_id', 'inner')

# good
flights = flights.join(aircraft, 'aircraft_id', how='inner')

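Avoid right joins as well. If you are about to use a right join, switch the order of your dataframes and use a left join instead. This is more intuitive, since the dataframe you are operating on is the one your join is centered around.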

# bad
flights = aircraft.join(flights, 'aircraft_id', how='right')

# good
flights = flights.join(aircraft, 'aircraft_id', how='left')

When joining dataframes, avoid using expressions that duplicate columns in the output:

# bad - column aircraft_id will be duplicated in the output
output = flights.join(aircraft, flights.aircraft_id == aircraft.aircraft_id, how='inner')

# good
output = flights.join(aircraft, 'aircraft_id', how='inner')

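Avoid renaming all columns just to prevent name collisions. Instead, give an alias to the whole dataframe, and use that alias to select the columns you want at the end.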

# bad
columns = ["start_time", "end_time", "idle_time", "total_time"]
for col in columns:
    flights = flights.withColumnRenamed(col, 'flights_' + col)
    parking = parking.withColumnRenamed(col, 'parking_' + col)

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights_start_time").alias("flight_start_time"),
    F.col("flights_end_time").alias("flight_end_time"),
    F.col("parking_total_time").alias("client_parking_total_time")
)

# good
flights = flights.alias("flights")
parking = parking.alias("parking")

flights = flights.join(parking, on="flight_code", how="left")

flights = flights.select(
    F.col("flights.start_time").alias("flight_start_time"),
    F.col("flights.end_time").alias("flight_end_time"),
    F.col("parking.total_time").alias("client_parking_total_time")
)

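In such cases, however, keep in mind:

  • If you do not need the same-named columns from both sides, it is best to drop the unneeded columns before joining.
  • If you do need both, it is best to rename one of them before joining.
  • Always resolve ambiguous columns before outputting a dataset, because after the transform has finished running you can no longer distinguish them.

As a last word about joins, do not use .dropDuplicates() or .distinct() as a crutch. If unexpected duplicate rows appear, there is almost always an underlying reason for them. Adding .dropDuplicates() only masks the problem and adds overhead to the runtime.

Chaining of expressions

Chaining expressions is a contentious topic, but we do recommend some limits on the use of chaining. See the rationale at the end of this section for the reasoning behind this recommendation.

Avoid chaining expressions of different types into multi-line expressions, particularly if they have different behaviors or contexts. For example, do not mix column creation or joining with selecting and filtering.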

# bad
df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
    .withColumn("boverc", df.b / df.c)
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)

# better (separating into steps)
# first: we select and trim down the data that we need
# second: we create the columns that we need to have
# third: joining with other dataframes

df = (
    df
    .select("a", "b", "c", "key")
    .filter(df.a == "truthiness")
)

df = df.withColumn("boverc", df.b / df.c)

df = (
    df
    .join(df2, "key", how="inner")
    .join(df3, "key", how="left")
    .drop('c')
)

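Isolating each group of expressions in its own logical code block improves legibility and makes it easier to find the relevant logic.

For example, a reader of the code below will probably jump to wherever they see a dataframe being assigned, as in df = df....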

# bad
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(df.abc == 123)
    .join(another_table, 'some_field')
)

# better
df = (
    df
    .select('foo', 'bar', 'foobar', 'abc')
    .filter(F.col('abc') == 123)
)

df = df.join(another_table, 'some_field', how='inner')

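There are legitimate reasons to chain expressions together. Chains commonly represent atomic logic steps, and in that case they are acceptable. To keep the code readable, apply a rule with a maximum number of chained expressions in the same block. We recommend chains of no more than 3-5 statements.

If you find yourself writing longer chains, or running into trouble because of long variable names, consider extracting the logic into a separate function: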

# bad
customers_with_shipping_address = (
    customers_with_shipping_address
    .select("a", "b", "c", "key")
    .filter(F.col("a") == "truthiness")
    .withColumn("boverc", F.col("b") / F.col("c"))
    .join(df2, "key", how="inner")
)

# also bad
customers_with_shipping_address = customers_with_shipping_address.select("a", "b", "c", "key")
customers_with_shipping_address = customers_with_shipping_address.filter(F.col("a") == "truthiness")

customers_with_shipping_address = customers_with_shipping_address.withColumn("boverc", F.col("b") / F.col("c"))

customers_with_shipping_address = customers_with_shipping_address.join(df2, "key", how="inner")

# better
def join_customers_with_shipping_address(customers, df_to_join):
    customers = (
        customers
        .select("a", "b", "c", "key")
        .filter(F.col("a") == "truthiness")
    )

    customers = customers.withColumn("boverc", F.col("b") / F.col("c"))
    customers = customers.join(df_to_join, "key", how="inner")
    return customers

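In fact, chains of more than 3 statements are prime candidates for extraction into separate, well-named functions, since they are already encapsulated, isolated blocks of logic.

Rationale

There are several reasons behind these limits on chaining:

  • Differentiation between PySpark code and SQL code. Chaining goes against most, if not all, other Python styling. In Python you typically assign rather than chain.
  • Discouraging the creation of large single code blocks. These usually make more sense extracted as a named function.
  • It does not need to be all or nothing. A maximum of 3-5 lines of chaining balances practicality with legibility.
  • If you are using an IDE, it is easier to use automatic extraction or code movement features (for example, Ctrl/Cmd+Shift+Up in PyCharm).
  • Large chains are hard to read and maintain, particularly when chains are nested.

Multi-line expressions

The reason you can chain expressions at all is that PySpark was developed from Spark, which comes from JVM languages. This means some design patterns were carried over, chainability among them.

However, Python does not support multi-line expressions gracefully. The only options are to provide explicit line breaks or to wrap the expression in parentheses. Explicit line breaks are only needed if the chain happens at the root node. For example: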

# needs `\`
df = df.filter(F.col('event') == 'executing')\
       .filter(F.col('has_tests') == True)\
       .drop('has_tests')

# chain not in root node so it doesn't need the `\`
df = df.withColumn('safety', F.when(F.col('has_tests') == True, 'is safe')
                              .when(F.col('has_executed') == True, 'no tests but runs')
                              .otherwise('not safe'))

Therefore, to keep things consistent, wrap the entire expression in a single parenthesis block and avoid using \:

# bad
df = df.filter(F.col('event') == 'executing')\
    .filter(F.col('has_tests') == True)\
    .drop('has_tests')

# good
df = (
    df
    .filter(F.col('event') == 'executing')
    .filter(F.col('has_tests') == True)
    .drop('has_tests')
)

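Other considerations and recommendations

  • Be wary of functions that grow too large. As a general rule, a file should not exceed 250 lines and a function should not exceed 70 lines.
  • Try to keep your code in logical blocks. For example, if you have multiple lines referencing the same things, keep them together. Separating them makes it harder for the reader to follow the context.
  • Test your code. If you can run local tests, do so, and make sure your new code is covered by them. If you cannot run local tests, build the datasets on your branch and manually verify that the data looks as expected.
  • Try to avoid .otherwise(value) as a general fallback. If you are mapping a set of keys to a set of values and a number of unknown keys appear, using otherwise maps all of them to the same value and masks the problem.
  • Do not check commented-out code into the repository. This applies to single lines of code, functions, classes, and modules. Rely on git branching and history to look at old code instead.
  • When you encounter a large single transformation that integrates multiple different source tables, split it into its natural sub-steps and extract the logic into functions. This improves higher-level readability, makes the code more reusable, and keeps different transforms consistent.
  • Be as explicit and descriptive as possible when naming functions or variables. Except for top-level transforms that wrap object transformations, try to capture what the function actually does rather than naming it after the objects used inside it.
  • Think twice before introducing a new import alias, unless there is a good reason to do so. For example, an alias is reasonable if an imported module is called frequently across files and almost all developers are familiar with it. Established aliases include the types and functions modules from PySpark: from pyspark.sql import types as T, functions as F
  • Avoid using literal strings or integers in filtering conditions, new column values, and similar places. Instead, to capture their meaning, extract them into variables, constants, dicts, or classes as appropriate. This makes the code more readable and enforces consistency across the repository.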