Databricks compute pushdown(Databricks 计算下推(Databricks compute pushdown))¶
Foundry offers the ability to push down compute to Databricks when using virtual tables. When using Databricks virtual tables registered to the same source as inputs and outputs to a pipeline, it is possible to fully federate compute to Databricks.
This documentation walks you through the process of authoring a Python transform in Code Repositories that can be executed entirely in your Databricks environment. This capability leverages Databricks Connect. Refer to the official Databricks documentation ↗ for more information on the features and limitations of Databricks Connect.
Quick start¶
This example shows how to use a Databricks transform in a Python transform pipeline. Suppose we have the following Spark pipeline using PySpark via @transform:
from pyspark.sql.functions import col
from transforms.api import transform, Input, Output
@transform.spark.using(
source_table=Input('/Project/folder/input'),
output_table=Output('/Project/folder/output'),
)
def compute(source_table: TransformInput, output_table: TransformOutput):
df = source_table.dataframe()
df = df.filter(col('id') > 1)
output_table.write_dataframe(df)
To turn this into a Databricks transform, you must:
- Review the prerequisites to using tables in Python transforms.
- Install
databricks-connectfrom the Libraries tab. - Import and apply the
@databricksdecorator to your transform.
For more details, consult the setup section of the documentation below.
from pyspark.sql.functions import col
from transforms.api import transform
from transforms.tables import (
databricks,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
output_table=TableOutput(
'/Project/folder/output', # Where to register the Databricks output as a virtual table in Foundry
"ri.magritte..source.1234", # Your Databricks source connection
"CATALOG.SCHEMA.TABLE", # The location to which the transform output will be written in Databricks
),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
df = source_table.dataframe()
df = df.filter(col('id') > 1)
output_table.write_dataframe(df)
Given Databricks Connect uses the pyspark.sql.DataFrame ↗ API, you will see the logic of the code itself is largely unchanged. The primary difference is the transform uses Spark compute running in Databricks.
TableInput and TableOutput parameters are used instead of Input and Output to reference the input and output Databricks virtual tables of the transform. The transform function is passed DatabricksInput and DatabricksOutput parameters that can be used to read from and write to tables in Databricks.
:::callout{theme="neutral"}
The @databricks decorator, as shown above, is only compatible with TableInput and TableOutput parameters. The tables referenced as inputs and outputs to the transform must be registered on the same Databricks source.
Incremental computation using the @incremental decorator is not currently supported when using compute pushdown to Databricks.
:::
Setup¶
To use compute pushdown with Databricks:
- Create a Python code repository.
- Review the prerequisites to using tables in Python transforms.
- Install the
databricks-connectlibrary. We recommend using version 16.4, as this is fully compatible with serverless compute. If using classic compute, you must use a version that is compatible with your Databricks cluster version. Refer to the official Databricks documentation ↗ for more information.
The databricks-connect library can be installed using the Libraries tab of your Code Repository environment. Note that databricks-connect is only available via PyPI, and is not available through Conda. Alternatively, you can manually add the library under the pip requirements block in the conda_recipe/meta.yaml file. For example, to install version 16.4 of databricks-connect, add:
requirements:
pip:
- databricks-connect >=16.4.0,<17.0.0
API highlights¶
The following sections highlight the differences between the Databricks transform API and a regular Python transform. Before using @databricks, ensure you have performed the setup steps above.
A Databricks transform uses transforms.tables.TableInput and transforms.tables.TableOutput parameters to reference the input and output Databricks virtual tables. A TableInput can reference a virtual table by file path or RID. A TableOutput requires:
- The file path in Foundry where the virtual table will be created.
- The source where the virtual table will be registered.
- The location in Databricks where the table will be written. This can be specified using
DatabricksTable("<catalog>", "<schema>", "<table>")or"<catalog>.<schema>.<table>"syntax.catalog,schemaandtablecorrespond to the three-level namespace structure of a table identifier in Unity Catalog. Refer to the official Databricks documentation ↗ for more information.
The API of DatabricksInput and DatabricksOutput are similar to a regular Python transform. .dataframe() loads the input table as a pyspark.sql.DataFrame. .write_dataframe() writes a pyspark.sql.DataFrame to the output table. The transform logic itself can be expressed using PySpark transformation.
:::callout{theme="warning"} Not all features of PySpark are supported in Databricks Connect. Refer to the official Databricks documentation ↗ for details on feature availability and limitations. :::
Output table types¶
You can write output tables in Databricks using one of the following table types:
- Managed Delta: Table stored in Delta Lake format in Databricks-managed storage.
- External Delta: Table stored in Delta Lake format in an external storage location.
- Managed Iceberg: Table stored in Iceberg format in Databricks-managed storage.
You can use the format and location parameters on DatabricksTable or write_dataframe to configure the output table type. If format and location are not specified the table will be written as a managed Delta table (the default table type in Databricks). Use format="iceberg" to write the output as a managed Iceberg table. Use location="<STORAGE_LOCATION>" to write the output as an external Delta table, where <STORAGE_LOCATION> refers to an external storage path in cloud object storage where the table will be stored.
The example below shows a transform writing to three output tables of different types.
from transforms.tables import (
databricks,
DatabricksTable,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
managed_delta_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.MANAGED_DELTA_TABLE",
),
managed_iceberg_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.MANAGED_ICEBERG_TABLE",
),
external_delta_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
DatabricksTable(
catalog="CATALOG",
schema="SCHEMA",
table="EXTERNAL_DELTA_TABLE",
location="s3://some-bucket/path/to/table"
),
),
)
def compute(
source_table: DatabricksInput,
managed_delta_output_table: DatabricksOutput,
managed_iceberg_output_table: DatabricksOutput,
external_delta_output_table: DatabricksOutput,
):
df = source_table.dataframe()
# write to a managed Delta table
managed_delta_output_table.write_dataframe(df)
# write to a managed Iceberg table by specifying the format option inline
managed_iceberg_output_table.write_dataframe(df, format="iceberg")
# write to an external table where the location has been specified in the DatabricksTable
external_delta_output_table.write_dataframe(df)
Refer to the official Databricks documentation ↗ for more information on tables types.
Compute configuration¶
By default, a Databricks Connect session will be established using serverless compute. This is equivalent to using @databricks(serverless=True).
Alternatively, you can use with_compute(cluster_id="<cluster-id>") to configure a connection to a specific compute cluster.
For information on compute configuration for Databricks Connect, review the official Databricks documentation ↗.
User-defined functions (UDFs)¶
User-defined functions (UDFs) are supported in Databricks Connect. When you execute a PySpark DataFrame operation that includes UDFs, Databricks Connect serializes the UDF and sends it to the server as part of the request. For full details on UDF features and limitations, refer to the official Databricks documentation ↗.
The example below defines a simple UDF that squares the values in a column:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from transforms.api import transform
from transforms.tables import (
databricks,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@udf(returnType=IntegerType())
def double(x):
return x * x
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.TABLE",
),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
df = source_table.dataframe()
df = df.withColumn("doubled", double(col("id")))
output_table.write_dataframe(df)
Specify Python dependencies for UDFs¶
Databricks Connect supports specifying Python dependencies required for UDFs. These dependencies are installed on Databricks compute as part of the UDF's Python environment. You can use with_dependencies(dependencies=["<dependency>"]) to configure additional dependencies to be installed in the UDF environment.
The example below defines a UDF that depends on the pyjokes package:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from transforms.api import transform
from transforms.tables import databricks, DatabricksOutput, TableOutput
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@transform.databricks.using(
output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.TABLE",
),
).with_dependencies(dependencies=["pyjokes<1"])
def compute(output_table: DatabricksOutput):
df = output_table.spark_session.range(1, 10)
df = df.withColumn("jokes", get_joke())
output_table.write_dataframe(df)
:::callout{theme="neutral"}
If you encounter an error such as ModuleNotFoundError: No module named 'myproject', it usually means your UDF is referencing code that exists only in your local environment and is not available to the Databricks Spark workers executing the UDF. To avoid this error, ensure that your UDF does not directly depend on functions or classes defined in local modules. You can resolve this in one of the following ways:
- Inline the necessary logic directly within the UDF definition.
- Define both the UDF and its supporting code within the body of the
@transformfunction. - Wrap the UDF and its supporting code in a function that returns a reference to the UDF, and call this function within the transform to create the UDF. :::
中文翻译¶
Databricks 计算下推(Databricks compute pushdown)¶
Foundry 支持在使用虚拟表(Virtual tables)时将计算下推到 Databricks。当使用注册到同一数据源的 Databricks 虚拟表作为管道的输入和输出时,可以将计算完全联邦到 Databricks。
本文档将引导您在代码仓库(Code Repositories)中编写一个Python 转换(Python transform),该转换可以完全在您的 Databricks 环境中执行。此功能利用了 Databricks Connect。有关 Databricks Connect 的功能和限制的更多信息,请参阅官方 Databricks 文档 ↗。
快速入门(Quick start)¶
以下示例展示了如何在 Python 转换管道(Python transform pipeline)中使用 Databricks 转换。假设我们有以下使用 PySpark 通过 @transform 的 Spark 管道:
from pyspark.sql.functions import col
from transforms.api import transform, Input, Output
@transform.spark.using(
source_table=Input('/Project/folder/input'),
output_table=Output('/Project/folder/output'),
)
def compute(source_table: TransformInput, output_table: TransformOutput):
df = source_table.dataframe()
df = df.filter(col('id') > 1)
output_table.write_dataframe(df)
要将其转换为 Databricks 转换,您必须:
- 查看在 Python 转换中使用表的先决条件。
- 从库(Libraries) 选项卡安装
databricks-connect。 - 导入并应用
@databricks装饰器到您的转换。
更多详情,请参考本文档下方的设置部分。
from pyspark.sql.functions import col
from transforms.api import transform
from transforms.tables import (
databricks,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
output_table=TableOutput(
'/Project/folder/output', # 在 Foundry 中注册 Databricks 输出作为虚拟表的位置
"ri.magritte..source.1234", # 您的 Databricks 源连接
"CATALOG.SCHEMA.TABLE", # 转换输出将在 Databricks 中写入的位置
),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
df = source_table.dataframe()
df = df.filter(col('id') > 1)
output_table.write_dataframe(df)
由于 Databricks Connect 使用 pyspark.sql.DataFrame ↗ API,您会看到代码本身的逻辑基本保持不变。主要区别在于转换使用的是在 Databricks 中运行的 Spark 计算。
使用 TableInput 和 TableOutput 参数代替 Input 和 Output 来引用转换的输入和输出 Databricks 虚拟表。转换函数接收 DatabricksInput 和 DatabricksOutput 参数,用于从 Databricks 中的表读取数据和写入数据。
:::callout{theme="neutral"}
如上所示的 @databricks 装饰器仅与 TableInput 和 TableOutput 参数兼容。作为转换输入和输出引用的表必须注册在同一个 Databricks 源(Databricks source)上。
使用 @incremental 装饰器的增量计算(Incremental computation)目前在使用 Databricks 计算下推时不受支持。
:::
设置(Setup)¶
要使用 Databricks 计算下推:
- 创建一个 Python 代码仓库(Python code repository)。
- 查看在 Python 转换中使用表的先决条件。
- 安装
databricks-connect库。我们建议使用版本 16.4,因为它与无服务器计算(serverless compute)完全兼容。如果使用经典计算(classic compute),您必须使用与您的 Databricks 集群版本兼容的版本。更多信息请参考官方 Databricks 文档 ↗。
databricks-connect 库可以使用代码仓库环境的库(Libraries) 选项卡进行安装。请注意,databricks-connect 仅通过 PyPI 提供,不通过 Conda 提供。或者,您也可以手动将库添加到 conda_recipe/meta.yaml 文件的 pip 依赖块中。例如,要安装版本 16.4 的 databricks-connect,请添加:
requirements:
pip:
- databricks-connect >=16.4.0,<17.0.0
API 要点(API highlights)¶
以下部分重点介绍了 Databricks 转换 API 与常规 Python 转换之间的区别。在使用 @databricks 之前,请确保您已完成上述设置步骤。
Databricks 转换使用 transforms.tables.TableInput 和 transforms.tables.TableOutput 参数来引用输入和输出 Databricks 虚拟表。TableInput 可以通过文件路径或 RID 引用虚拟表。TableOutput 需要:
- Foundry 中虚拟表将被创建的文件路径。
- 虚拟表将被注册的源(source)。
- Databricks 中表将被写入的位置。可以使用
DatabricksTable("<catalog>", "<schema>", "<table>")或"<catalog>.<schema>.<table>"语法指定。catalog、schema和table对应于 Unity Catalog 中表标识符的三级命名空间结构。更多信息请参考官方 Databricks 文档 ↗。
DatabricksInput 和 DatabricksOutput 的 API 与常规 Python 转换类似。.dataframe() 将输入表加载为 pyspark.sql.DataFrame。.write_dataframe() 将 pyspark.sql.DataFrame 写入输出表。转换逻辑本身可以使用 PySpark 转换来表达。
:::callout{theme="warning"} 并非所有 PySpark 功能都在 Databricks Connect 中受支持。有关功能可用性和限制的详细信息,请参考官方 Databricks 文档 ↗。 :::
输出表类型(Output table types)¶
您可以使用以下表类型之一在 Databricks 中写入输出表:
- 托管 Delta(Managed Delta): 以 Delta Lake 格式存储在 Databricks 托管存储中的表。
- 外部 Delta(External Delta): 以 Delta Lake 格式存储在外部存储位置中的表。
- 托管 Iceberg(Managed Iceberg): 以 Iceberg 格式存储在 Databricks 托管存储中的表。
您可以在 DatabricksTable 或 write_dataframe 上使用 format 和 location 参数来配置输出表类型。如果未指定 format 和 location,表将作为托管 Delta 表(Databricks 中的默认表类型)写入。使用 format="iceberg" 将输出写入为托管 Iceberg 表。使用 location="<STORAGE_LOCATION>" 将输出写入为外部 Delta 表,其中 <STORAGE_LOCATION> 指的是云对象存储中表将存储的外部存储路径。
以下示例展示了一个写入三种不同类型输出表的转换。
from transforms.tables import (
databricks,
DatabricksTable,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
managed_delta_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.MANAGED_DELTA_TABLE",
),
managed_iceberg_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.MANAGED_ICEBERG_TABLE",
),
external_delta_output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
DatabricksTable(
catalog="CATALOG",
schema="SCHEMA",
table="EXTERNAL_DELTA_TABLE",
location="s3://some-bucket/path/to/table"
),
),
)
def compute(
source_table: DatabricksInput,
managed_delta_output_table: DatabricksOutput,
managed_iceberg_output_table: DatabricksOutput,
external_delta_output_table: DatabricksOutput,
):
df = source_table.dataframe()
# 写入托管 Delta 表
managed_delta_output_table.write_dataframe(df)
# 通过内联指定格式选项写入托管 Iceberg 表
managed_iceberg_output_table.write_dataframe(df, format="iceberg")
# 写入已在 DatabricksTable 中指定位置的外部表
external_delta_output_table.write_dataframe(df)
有关表类型的更多信息,请参考官方 Databricks 文档 ↗。
计算配置(Compute configuration)¶
默认情况下,将使用无服务器计算(serverless compute)建立 Databricks Connect 会话。这等同于使用 @databricks(serverless=True)。
或者,您可以使用 with_compute(cluster_id="<cluster-id>") 来配置连接到特定的计算集群。
有关 Databricks Connect 计算配置的信息,请查看官方 Databricks 文档 ↗。
用户自定义函数(User-defined functions, UDFs)¶
Databricks Connect 支持用户自定义函数(UDFs)。当您执行包含 UDF 的 PySpark DataFrame 操作时,Databricks Connect 会序列化 UDF 并将其作为请求的一部分发送到服务器。有关 UDF 功能和限制的完整详细信息,请参考官方 Databricks 文档 ↗。
以下示例定义了一个简单的 UDF,用于对列中的值进行平方运算:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from transforms.api import transform
from transforms.tables import (
databricks,
DatabricksInput,
DatabricksOutput,
TableInput,
TableOutput,
)
@udf(returnType=IntegerType())
def double(x):
return x * x
@transform.databricks.using(
source_table=TableInput('/Project/folder/input'),
output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.TABLE",
),
)
def compute(source_table: DatabricksInput, output_table: DatabricksOutput):
df = source_table.dataframe()
df = df.withColumn("doubled", double(col("id")))
output_table.write_dataframe(df)
为 UDF 指定 Python 依赖(Specify Python dependencies for UDFs)¶
Databricks Connect 支持指定 UDF 所需的 Python 依赖。这些依赖作为 UDF Python 环境的一部分安装在 Databricks 计算上。您可以使用 with_dependencies(dependencies=["<dependency>"]) 来配置要在 UDF 环境中安装的额外依赖。
以下示例定义了一个依赖于 pyjokes 包的 UDF:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from transforms.api import transform
from transforms.tables import databricks, DatabricksOutput, TableOutput
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@transform.databricks.using(
output_table=TableOutput(
'/Project/folder/output',
"ri.magritte..source.1234",
"CATALOG.SCHEMA.TABLE",
),
).with_dependencies(dependencies=["pyjokes<1"])
def compute(output_table: DatabricksOutput):
df = output_table.spark_session.range(1, 10)
df = df.withColumn("jokes", get_joke())
output_table.write_dataframe(df)
:::callout{theme="neutral"}
如果您遇到类似 ModuleNotFoundError: No module named 'myproject' 的错误,这通常意味着您的 UDF 引用了仅存在于本地环境中的代码,而执行 UDF 的 Databricks Spark 工作节点无法访问这些代码。为避免此错误,请确保您的 UDF 不直接依赖于在本地模块中定义的函数或类。您可以通过以下方式之一解决此问题:
- 将必要的逻辑内联 直接定义在 UDF 定义中。
- 将 UDF 及其支持代码都定义 在
@transform函数体内。 - 将 UDF 及其支持代码包装在一个函数中,该函数返回对 UDF 的引用,然后在转换中调用此函数来创建 UDF。 :::