Pipelines¶
Each Python transforms sub-project in a repository exposes a single transforms.api.Pipeline object. This Pipeline object is used to do the following:
- Register datasets in Foundry with instructions for how to build them.
- Locate and execute the `@transform` logic responsible for building a given dataset during a Foundry build.
In most cases, the default repository setup will register transforms automatically, and the Pipeline object will not require special configuration.
Add transforms to a pipeline¶
When a transform that is associated with your project’s pipeline declares a dataset as an Output, you can build this dataset in Foundry. The two recommended ways to add transforms to a Pipeline object are automatic registration and manual registration.
:::callout{theme="neutral"}
If you have a more advanced workflow and/or want to explicitly add each transform to your project’s pipeline, you can use manual registration. Otherwise, it is highly recommended that you use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the discover_transforms method recursively discovers any transforms defined at the module level. Refer to the sections below for more information.
:::
Automatic registration¶
:::callout{theme="warning"}
The discover_transforms method imports every module it finds. As a result, any code in your imported modules will be executed.
:::
As the complexity of a project grows, manually adding transforms to a Pipeline object can become unwieldy. To remedy this, the Pipeline object provides the discover_transforms() method to recursively discover all transforms in a Python module or package.
```python
from transforms.api import Pipeline

import my_module  # This is where your transform definition lives

my_pipeline = Pipeline()
my_pipeline.discover_transforms(my_module)
```
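The warning above follows from how Python imports work: importing a module runs all of its top-level code. The sketch below illustrates this with the standard library only. It is not the implementation of discover_transforms; the helper `import_all_submodules` and the package `demo_pkg` are hypothetical names used for illustration.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path


def import_all_submodules(package):
    """Import every submodule under `package`, recursively, and return their names."""
    imported = []
    prefix = package.__name__ + "."
    for module_info in pkgutil.walk_packages(package.__path__, prefix=prefix):
        # Importing a module runs all of its top-level code.
        importlib.import_module(module_info.name)
        imported.append(module_info.name)
    return imported


# Create a throwaway package on disk; `demo_pkg` and `noisy` are hypothetical names.
root = Path(tempfile.mkdtemp())
package_dir = root / "demo_pkg"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("")
(package_dir / "noisy.py").write_text(
    "EXECUTED_AT_IMPORT = True  # top-level code: runs on import\n"
    "GUARDED_RAN = False\n"
    "if __name__ == '__main__':\n"
    "    GUARDED_RAN = True  # only runs when executed as a script\n"
)

sys.path.insert(0, str(root))
importlib.invalidate_caches()
demo_pkg = importlib.import_module("demo_pkg")

imported_names = import_all_submodules(demo_pkg)
noisy = sys.modules["demo_pkg.noisy"]
print(imported_names)
print(noisy.EXECUTED_AT_IMPORT, noisy.GUARDED_RAN)
```

A practical consequence is that modules under a discovered package should keep side effects (network calls, file writes, long computations) out of top-level code, or place them behind an `if __name__ == '__main__':` guard.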
Manual registration¶
Transforms can be manually added to a Pipeline object using the add_transforms() function. This function takes any number of transforms and adds them to the pipeline. It also checks whether any two transforms declare the same output dataset.
```python
from transforms.api import transform, Pipeline, Input, Output


@transform.using(
    my_output=Output('/path/to/output/dataset'),
    my_input=Input('/path/to/input/dataset')
)
def my_compute_function(my_output, my_input):
    my_output.write_table(my_input.polars())


my_pipeline = Pipeline()
my_pipeline.add_transforms(my_compute_function)
```
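To make the duplicate-output check concrete, here is a minimal sketch of how such a check could work. `ToyTransform` and `ToyPipeline` are hypothetical stand-ins written for this illustration; they are not part of transforms.api, and the real add_transforms() may behave differently in its details (for example, in the exception type it raises).

```python
class ToyTransform:
    """Hypothetical stand-in for a transform: a name plus its declared output paths."""

    def __init__(self, name, outputs):
        self.name = name
        self.outputs = list(outputs)


class ToyPipeline:
    """Hypothetical registry that rejects two transforms declaring the same output."""

    def __init__(self):
        self._owner_by_output = {}

    def add_transforms(self, *transforms):
        # Validate against a copy first so a failure leaves the registry unchanged.
        pending = dict(self._owner_by_output)
        for transform in transforms:
            for path in transform.outputs:
                owner = pending.get(path)
                if owner is not None and owner is not transform:
                    raise ValueError(
                        f"{transform.name} and {owner.name} both declare output {path}"
                    )
                pending[path] = transform
        self._owner_by_output = pending

    @property
    def outputs(self):
        return sorted(self._owner_by_output)


pipeline = ToyPipeline()
first = ToyTransform("first", ["/path/to/output/dataset"])
clash = ToyTransform("clash", ["/path/to/output/dataset"])

pipeline.add_transforms(first)
try:
    pipeline.add_transforms(clash)
    conflict_detected = False
except ValueError as error:
    conflict_detected = True
    print(error)
```

Keying the registry by output path makes the conflict check a single dictionary lookup per declared output.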
Transform generation¶
:::callout{theme="warning"}
If you want to define a data transformation that creates multiple outputs, you can either use transform generation or define a multiple-output transform. With transform generation, it may be necessary to read and process the same input once for each output. With a multiple-output transform, it is possible to read and process the input just once. For more information, review the documentation on optimizing multi-output transforms.
:::
You may want to re-use the same data transformation logic across multiple transforms. For instance, consider the following scenarios:
- You have an input dataset with information about various states. You have code that filters the input by state and then calculates various statistics.
- You have multiple input datasets that may contain null values. You have code that removes any null values.
In both cases, it would be useful to use the same data transformation code across multiple transforms. Instead of separately defining a transform object for each of your outputs, you can generate transform objects using a for-loop and register them in bulk to your project’s pipeline.
Below is an example of a transform generator:
```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple outputs
    # from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a keyword argument with a default value.
            df = my_input.polars()
            filtered = df.filter(pl.col('source') == source)
            output.write_table(filtered)
        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple outputs
    # from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(ctx, my_input, output, source=source):
            # To capture the source variable in the function, pass it as a keyword argument with a default value.
            conn = ctx.duckdb().conn
            query = conn.sql(f"""SELECT * FROM my_input WHERE source = '{source}'""")
            output.write_table(query)
        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple outputs
    # from a single input dataset.
    for source in sources:
        @transform.using(
            my_input=Input('/sources/{source}/input'.format(source=source)),
            output=Output('/sources/{source}/output'.format(source=source))
        )
        def compute_function(my_input, output, source=source):
            # To capture the source variable in the function, pass it as a keyword argument with a default value.
            df = my_input.pandas()
            filtered = df[df['source'] == source]
            output.write_table(filtered)
        transforms.append(compute_function)
    return transforms


TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
```
If using manual registration, you can then add the generated transforms to the pipeline. If you are unfamiliar with the `*` syntax, refer to the [Python documentation ↗](https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists).
```python
from transforms.api import Pipeline

import my_module  # The module that defines TRANSFORMS

my_pipeline = Pipeline()
my_pipeline.add_transforms(*my_module.TRANSFORMS)
```
:::callout{theme="warning" title="Important considerations"}
Read the following considerations carefully for information on how to avoid errors and failures.
:::
- Capturing the source value: To capture the `source` variable in the function, you must pass the `source` keyword argument with a default value in your compute function.
- Using a for-loop to generate transforms: The loop for generating your transform objects must be within a function, since Python for-loops do not create new scopes. If a function is not used, automatic registration will mistakenly only discover the final transform object defined in your for-loop. This function should return a list of the generated transform objects, and the return value should be set equal to a variable. Following these criteria in a module that is configured to be discovered with automatic registration will allow you to use automatic registration with generated transforms. Alternatively, you can use manual registration.
- Changes to the list of input datasets: If the list of input datasets changes between builds, for example, if the list of input datasets is read from a file that is modified between builds, the build will fail. This is because the new dataset references will not be found in the job specification for the build.
- Dynamic input/output naming: Dynamic input/output naming is not possible in transforms. When the CI job runs, all relations between inputs and outputs are determined, including the links between unique identifiers and dataset names. Output datasets that do not exist are created, and a JobSpec is added to them.
- When a dataset is built, the reference to the repository, source file, and the entry point of the function that creates the dataset is obtained from the JobSpec. Following this, the build process is initiated and your function is called to generate the final result. If there are changes in your inputs or outputs and the build process is launched, it will lead to an error because the JobSpecs are no longer valid. This disrupts the connection between the unique identifier and the dataset name.
- Manual registration in Code Repositories: The Build button in Code Repositories may not work for manual registration, and will present a "No transforms discovered in the pipeline from the requested file" error. You can still build these datasets with Data Lineage or Dataset Preview.
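The first consideration above comes from standard Python closure behavior and can be reproduced without any Foundry dependency. The sketch below uses plain functions in place of transforms; `make_late_bound` and `make_default_bound` are hypothetical names chosen for this illustration.

```python
def make_late_bound(sources):
    """Each function looks up `source` when it is called, after the loop has finished."""
    functions = []
    for source in sources:
        def compute():
            return source  # resolved at call time: always the loop's final value
        functions.append(compute)
    return functions


def make_default_bound(sources):
    """Each function stores the current `source` as a default argument value."""
    functions = []
    for source in sources:
        def compute(source=source):
            return source  # default was evaluated when the function was defined
        functions.append(compute)
    return functions


names = ['src1', 'src2', 'src3']
late_results = [f() for f in make_late_bound(names)]
bound_results = [f() for f in make_default_bound(names)]

print(late_results)   # ['src3', 'src3', 'src3']
print(bound_results)  # ['src1', 'src2', 'src3']
```

Default argument values are evaluated once, when the `def` statement runs, which is why the `source=source` pattern gives each generated function its own value.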
Entry points¶
:::callout{theme="neutral"}
The default entry point and Pipeline object setup is sufficient for most use cases. Configuring the entry point is only recommended for multi-pipeline repositories or differing directory structures.
:::
The runtime responsible for executing a Python transformation needs to be able to find the project’s Pipeline object. To export a Pipeline object, add it to the entry_points argument in the setup.py file in a Python transforms sub-project. For more information about entry points, refer to the setuptools library documentation ↗.
By default, it is required that each Python sub-project exports a transforms.pipelines entry point named root. This entry point references the module name and the Pipeline attribute.
For example, if you have a Pipeline called `my_pipeline` defined in `myproject/pipeline.py` as shown below:
```python
from transforms.api import Pipeline

my_pipeline = Pipeline()
```
You can register this Pipeline in setup.py as follows:
```python
from setuptools import find_packages, setup

setup(
    entry_points={
        'transforms.pipelines': [
            'root = myproject.pipeline:my_pipeline'
        ]
    }
)
```
In the code above, root refers to the name of the Pipeline object you are exporting. myproject.pipeline refers to the module containing your Pipeline, and my_pipeline refers to the Pipeline attribute defined in that module.
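As a rough illustration of how a `name = module:attribute` entry point string can be resolved to an object, consider the sketch below. It is an assumption-laden stand-in, not how the Foundry runtime locates pipelines: `parse_entry_point` and `resolve_reference` are hypothetical helpers, and the standard library target `json:dumps` substitutes for `myproject.pipeline:my_pipeline` so that the example runs anywhere.

```python
import importlib
import json


def parse_entry_point(line):
    """Split 'name = module:attribute' into (name, 'module:attribute')."""
    name, separator, reference = line.partition("=")
    if not separator:
        raise ValueError(f"expected 'name = module:attribute', got {line!r}")
    return name.strip(), reference.strip()


def resolve_reference(reference):
    """Import the module part of 'module:attribute' and return the named attribute."""
    module_name, separator, attribute = reference.partition(":")
    if not separator or not module_name or not attribute:
        raise ValueError(f"expected 'module:attribute', got {reference!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# 'json:dumps' is a stand-in target; a real project would reference its Pipeline object.
entry_name, entry_reference = parse_entry_point("root = json:dumps")
resolved = resolve_reference(entry_reference)

print(entry_name, entry_reference)
print(resolved is json.dumps)
```

For installed packages, the standard library's `importlib.metadata` provides entry point lookup and loading, so hand-written parsing like this is rarely needed in practice.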