
PySpark transforms

PySpark is the Python API for Apache Spark: it lets you interface with a Spark backend to process data quickly. Spark can operate on very large datasets across a distributed network of servers, which provides major performance and reliability benefits when used correctly. However, it also comes with some limitations, especially if you are more used to SQL-based relational database systems. For example, Spark does not track which server holds a particular row, so there is no way to directly select a specific row to update or drop. If you are used to thinking about your database this way, you will have to adjust your conceptual model to think about the dataset as a whole and process the data based on columns, not rows.

  • DataFrame: a collection of rows under named columns.
      • Structurally similar to a table in a SQL database, but non-relational.
      • Immutable: a DataFrame cannot be changed after it is created, but it can be transformed into a new DataFrame (resulting in two DataFrames: the original and the transformed). Datasets can be overwritten, but Foundry keeps track of version history so that you can explore and jump back to older builds at any time.
      • Lazily evaluated: a series of transformation tasks is evaluated as a single (combined) action, which is performed when a build is triggered.
      • Resilient Distributed Dataset (RDD): the underlying data structure of a DataFrame. By partitioning the DataFrame into multiple non-intersecting subsets, transformations can be evaluated in parallel on multiple computers (nodes) in a cluster (network) of computers. This all happens under the hood, but is important to keep in mind when writing PySpark.
  • Shared variables: by default, and for efficiency's sake, Spark sends a separately managed copy of each variable used in transformation tasks to each parallel computer (node). If you must share a variable across tasks, Spark supports two types of shared variables:
      • Broadcast variables: cache (save) a value in memory (RAM) that is broadcast to all computers (nodes) in the cluster.
      • Accumulators: variables that can be added to or aggregated, including (but not limited to) counters and sums. This concept is related to GroupedData, and is useful for statistical calculations.
  • Why use DataFrames? Spark DataFrames are designed and optimized to process large collections (petabytes+) of structured data.
  • Why should I write PySpark code? PySpark enables you to customize how you transform your datasets in Code Repositories and Code Workbook, in more complex and flexible ways than you could in Contour or Blacksmith alone.
  • What isn't PySpark for? PySpark is designed for transforming datasets, not for accessing individual values. You might be able to calculate sums and averages, but you can't and shouldn't reference the data directly.

Unlike SQL, where queries result in "views" (virtual table result-sets), processing datasets with PySpark results in entirely new datasets. This means that not only can you build new datasets from derived datasets, but other members of your organization can also reuse the intermediate datasets for their own data processing tasks. In Palantir Foundry, which is a data operating system, datasets are automatically linked via parent-child (or source-result) directed tree relationships. This makes it easy for anyone to trace the data lineage of Spark transformations: you can explore how your dataset's dependencies are built and where those datasets come from. You can also discover how other members of your organization have used a dataset, so that you can learn from their examples and avoid duplicating work.

Understanding PySpark Code

Starter Code Basics

In Code Workbook, your function may look something like this:

def new_frame(old_frame):
  df = old_frame
  # df = transformations on df
  return df
  • old_frame: references a DataFrame that represents a Dataset stored within Foundry. old_frame is immutable, meaning that it cannot be modified within the new_frame function. In a sense, each intermediate transformation step produces a new, immutable DataFrame, which we may want to transform again or return as-is. This isn't entirely true, but as a cognitive model it will help you organize your code better.
  • new_frame: this function is where you define the series of transformations you want applied to old_frame. Your return statement should return a DataFrame (which we've called df in this example). Under the hood, every transformation you've applied to that DataFrame is combined and optimized before it is applied to the input dataset. Once you trigger a build with your code, the results are saved into a new Dataset file in Foundry, which you can explore once the build completes.

The data within a DataFrame cannot be referenced directly, because a DataFrame is neither an array nor a dictionary. Practically speaking, it is impossible to determine where any of the data is located at a given moment anyway, because of all the partitioning and shuffling happening under the hood. Unless you are filtering or aggregating the dataset, the code you write should be relatively agnostic to the contents of the dataset. Sorting is generally expensive and slow, so the rule of thumb is to assume every row is randomly ordered and to constrain your toolset to columns, filters, aggregates, and your own creative problem-solving.

:::callout{theme="warning" title="Warning"} It's very important to keep track of the schema of the incoming columns, because PySpark is not type-safe: it will attempt every transform operation and stop when any operation fails at runtime.

Do not perform math functions on strings or dates, or string operations on numbers, or date manipulations on integers, because the behavior of conflicting types is hard to predict.

Be sure to cast values to the correct types before operating on them. :::

Named Columns

Each column of the DataFrame is named (and re-nameable). Column names are unique and case-sensitive. Stick to these guidelines for Foundry Datasets:

  • Always use lowercase letters and numbers.
  • Separate words with _ (underscores) instead of spaces (because spaces are not allowed).
  • Avoid camelCasedColumnNames by convention.
  • Never use special characters, such as (, ), or &.
  • Aggregation functions sometimes automatically name a column with special characters. You will need to provide an alias or rename the column before returning the dataframe in your transformation.
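One way to apply these guidelines in bulk is a small name-cleaning helper. The function below, `to_snake_case`, is a hypothetical helper written for this example and is not part of PySpark or Foundry; it is plain Python that operates on column-name strings.

```python
import re


def to_snake_case(name):
    """Convert a column name to lowercase snake_case using only a-z, 0-9, and _."""
    # Insert an underscore at lower-to-upper boundaries: firstName -> first_Name
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Replace each run of characters other than letters, digits, or _ with one _
    name = re.sub(r"[^0-9a-zA-Z_]+", "_", name)
    # Collapse repeated underscores and trim them from both ends
    name = re.sub(r"_+", "_", name).strip("_")
    return name.lower()


# Hypothetical column names, including one of the kind an aggregation may produce.
cleaned = [to_snake_case(c) for c in ["firstName", "sum(age)", "Order Total & Tax"]]
```

In a transform, you could apply such a helper to every column with something like `df.toDF(*[to_snake_case(c) for c in df.columns])`, or name an aggregate up front with `F.sum("age").alias("total_age")`. Note that this sketch does not guard against two different names collapsing into the same cleaned name.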

Chaining Transforms

When you jump into existing code, you'll notice there's no hard-and-fast rule for naming the variables that reference DataFrames. In this cheatsheet, DataFrames are referred to as df, but in other examples the name could be raw, out, input, table, or something more specific. Anything goes, as long as it gets the job done.

You'll also notice this pattern:

df = df.select("firstName", "age")
df = df.withColumn("age", df.age.cast("integer"))
df = df.filter(df.age > 21)
df = df.withColumnRenamed("firstName", "first_name")
return df

Or (the same thing, written differently):

return df.select("firstName", "age") \
       .withColumn("age", df.age.cast("integer")) \
       .filter(df.age > 21) \
       .withColumnRenamed("firstName", "first_name")

If you're not familiar with coding: the df on the left side of the = is where the result of the transformations applied to the df on the right side is stored, before moving on to the next line of code. In this example, we store the result in a variable of the same name, essentially overwriting what df contains after each step. You could use a different name to hold the result of each DataFrame transformation, but in most cases it's fine to reuse the variable name and move on. At the end of each transformation function, we must return the new DataFrame, either as a variable (in the first example) or as the result of the last transformation (in the second example).

Both examples accomplish the same thing:

  1. select only 2 columns of df that we want to include in our transformed dataset
  2. cast the age column to ensure it's an integer and not a string.
  3. filter the rows of our dataset to only include entries of age > 21
  4. rename the column firstName to first_name

The resulting dataset has only two columns, first_name and age, and excludes people aged 21 or under. That's what df contains at the end, and you can return it or apply more transformations to it. We'll explore these transforms in more detail in the following sections.

Writing PySpark in Foundry

There are two tools for writing PySpark in Foundry: Code Repositories and Code Workbook.

In Code Repositories, you must include the following import statement at the top of your .py file in order to use most functions:

from pyspark.sql import functions as F

In Code Workbook, this is a global import that has already been included, so you can use most functions without additional configurations.

This reference is not exhaustive; it focuses on guidance for common patterns and best practices. For a full list of the PySpark SQL functions, refer to the official Apache Spark documentation.

