Transforms

Foundry Transforms are the core building blocks of data processing pipelines. They define how data flows from inputs to outputs, encapsulating both the computational logic and resource requirements in a single, declarative unit.

Transforms are registered in Pipelines that can be made up of multiple transform definitions.

Define transforms

Every transform consists of three key components:

  1. Decorators: Define inputs, outputs, and configuration
  2. Function signature: Matches the inputs and outputs declared in the decorator
  3. Compute logic: The actual data processing code

Below is the basic pattern used to define transforms:

```python
from transforms.api import transform, Input, Output


@transform.using(  # decorator
    hair_eye_color=Input('/examples/students_hair_eye_color'),  # inputs
    processed=Output('/examples/hair_eye_color_processed')  # outputs
)
def filter_hair_color(hair_eye_color, processed):  # function signature matches input/output
    ...  # compute logic
```
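To make the "function signature matches the decorator" rule concrete, here is a minimal standard-library sketch. The helper `check_signature` is hypothetical and is not part of `transforms.api`; it only illustrates the kind of name comparison the rule implies.

```python
import inspect


def check_signature(func, declared_names):
    """Compare a function's parameter names with the declared input/output names.

    Returns (missing, undeclared): declared names the function does not accept,
    and parameters that were never declared.
    """
    params = set(inspect.signature(func).parameters)
    declared = set(declared_names)
    return sorted(declared - params), sorted(params - declared)


def filter_hair_color(hair_eye_color, processed):
    ...


# Both lists are empty when the signature matches the declared names.
missing, undeclared = check_signature(
    filter_hair_color, ["hair_eye_color", "processed"]
)
```

A mismatch, such as declaring `processed` but naming the parameter `result`, would show up as one entry in each list.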

Python transforms can be configured with either single-node (lightweight) or multi-node (Spark) compute engines. By default, transforms run on a single node, and you can load the data as a Polars or pandas DataFrame. Polars is a DataFrame library for transforming tabular data, known for its performance, stability, and ease of use. pandas is a widely adopted, easy-to-use data analysis tool.

To retrieve a DataFrame, call the appropriate method on your input as shown below:

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Load data as Polars DataFrame
    df = hair_eye_color.polars()

    # Apply filtering logic
    filtered = df.filter(pl.col('hair') == 'Brown')

    # Write the result
    processed.write_table(filtered)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, hair_eye_color, processed):
    # Get DuckDB connection
    conn = ctx.duckdb().conn

    # Apply filtering logic using SQL
    filtered = conn.sql("SELECT * FROM hair_eye_color WHERE hair = 'Brown'")

    # Write the result
    processed.write_table(filtered)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Load data as Pandas DataFrame
    df = hair_eye_color.pandas()

    # Apply filtering logic
    filtered = df[df['hair'] == 'Brown']

    # Write the result
    processed.write_table(filtered)
```

The example below demonstrates that transforms can take multiple inputs and outputs.

```python
from transforms.api import transform, Input, Output


@transform.using(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    ...
```

Work with varied data formats

The lightweight transforms API supports loading data in a variety of formats, as demonstrated in the example below:

```python
from transforms.api import transform, Input, Output


@transform.using(output=Output('/Project/folder/output'), dataset=Input('/Project/folder/input'))
def compute(ctx, output, dataset):
    polars_df = dataset.polars()         # polars_df is a polars.DataFrame object
    lazy_df = dataset.polars(lazy=True)  # Activate streaming mode, lazy_df is a polars.LazyFrame
    pandas_df = dataset.pandas()         # pandas_df is a pandas.DataFrame
    arrow_table = dataset.arrow()        # arrow_table is a pyarrow.Table

    # DuckDB connection with pre-registered input datasets
    conn = ctx.duckdb().conn
    df_from_duckdb = conn.sql("SELECT * FROM dataset").fetchdf()  # dataset is automatically registered

    output.write_table(lazy_df)          # Any of the formats above can be passed to write_table
```

Note that calling dataset.pandas() expects pandas to be installed in your environment. Likewise, dataset.polars(...) requires Polars to be available.
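Because each loader depends on its library being installed, it can help to check the environment before choosing a format. The sketch below uses only the standard library. The helper name `available_formats` is hypothetical, and the module names are the usual import names of these libraries, not something the transforms API exposes.

```python
import importlib.util


def available_formats():
    """Report which optional data libraries can be imported in this environment."""
    candidates = ("polars", "pandas", "pyarrow", "duckdb")
    return {
        name: importlib.util.find_spec(name) is not None
        for name in candidates
    }


formats = available_formats()
```

A value of `False` for a library suggests that the corresponding loader would fail until that package is added to the environment.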

Configure compute resources

You can control the compute resources allocated to your transforms using the .with_resources() method.

```python
from transforms.api import transform, Input, Output


@transform.using(
    large_dataset=Input('/datasets/large_file'),
    processed=Output('/datasets/processed')
).with_resources(
    cpu_cores=8,          # Use 8 CPU cores
    memory_gb=16,         # Allocate 16 GB of memory
)
def process_large_dataset(large_dataset, processed):
    ...
```

  • cpu_cores: The number of CPU cores. This can be fractional (for example, 0.5) and defaults to 2.
  • memory_mb or memory_gb: The memory allocation. Specify only one of the two, not both.
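The two rules above (a default of 2 cores, and only one memory unit at a time) can be illustrated with a small validation sketch. `resolve_resources` is a hypothetical helper, not the actual `.with_resources()` implementation, and the 1 GB = 1024 MB conversion is an assumption made for the example.

```python
def resolve_resources(cpu_cores=2, memory_mb=None, memory_gb=None):
    """Normalize a resource request to a (cpu_cores, memory_mb) pair."""
    if memory_mb is not None and memory_gb is not None:
        raise ValueError("Specify memory_mb or memory_gb, not both")
    if cpu_cores <= 0:
        raise ValueError("cpu_cores must be positive")
    if memory_gb is not None:
        memory_mb = memory_gb * 1024  # assumption: 1 GB = 1024 MB
    return cpu_cores, memory_mb


request = resolve_resources(cpu_cores=8, memory_gb=16)
fractional = resolve_resources(cpu_cores=0.5, memory_mb=512)
```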

Transform context

There may be cases when a data transformation depends on things other than its input datasets. For instance, a transformation may be required to access the current Spark session or to contact an external service. In such cases, you can inject a TransformContext object into the transformation.

To inject a TransformContext object, your compute function must accept a parameter called ctx. Note that this also means that no inputs or outputs may be named ctx. Refer to the API reference for the full set of TransformContext capabilities.

```python
from transforms.api import transform, Output


@transform.using(
    out=Output('/examples/context')
)
def generate_dataframe(ctx, out):
    ...
```
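The following sketch shows why the name `ctx` has to be reserved: a runner that injects by parameter name cannot tell a context apart from a dataset with the same name. `call_compute` is a hypothetical stand-in written with the standard library; it is not how Foundry itself invokes transforms.

```python
import inspect


def call_compute(func, bindings, context):
    """Call func with its declared bindings, adding `ctx` only if it asks for one."""
    if "ctx" in bindings:
        raise ValueError("'ctx' is reserved and cannot name an input or output")
    kwargs = dict(bindings)
    if "ctx" in inspect.signature(func).parameters:
        kwargs["ctx"] = context
    return func(**kwargs)


def generate_dataframe(ctx, out):
    return ctx, out


# Placeholder strings stand in for the real context and output objects.
result = call_compute(generate_dataframe, {"out": "output-handle"}, context="fake-context")
```

A compute function that does not declare `ctx` is called with its bindings only, so existing transforms are unaffected.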

PySpark transforms

By default, transforms run on a single compute node and use data processing libraries like pandas or Polars. For big data compute workflows, you can use PySpark in Python transforms.

PySpark is the Python API for Apache Spark; it lets you interface with a Spark backend to process data quickly. Spark can operate on very large datasets across a distributed network of servers, which provides scaling and reliability benefits when used correctly. However, it also comes with higher overhead and resource usage, making it a poor choice for small- to medium-scale data.

While a high-level overview of the PySpark transforms APIs is given here, refer to the Python transforms (Spark) documentation for more details.

Define PySpark transforms

PySpark transforms are defined with similar syntax to standard transforms.

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    ...  # transform logic goes here
```

Similarly to Polars or pandas transforms, a DataFrame object can be retrieved from the transform input.

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(hair_eye_color, processed):
    # Load the input once as a PySpark DataFrame
    df = hair_eye_color.dataframe()

    # Keep only the rows with brown hair
    filtered_df = df.filter(df.hair == 'Brown')

    # Write the result
    processed.write_dataframe(filtered_df)
```

:::callout{theme="neutral"} The DataFrame objects returned by transform inputs are PySpark DataFrames. For more information about working with PySpark, refer to the Spark Python API documentation. :::

PySpark transforms can also take multiple inputs and outputs:

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    hair_color=Input('/examples/students_hair_color'),
    eye_color=Input('/examples/students_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def filter_hair_color(hair_color, eye_color, males, females):
    ...
```

Since PySpark transforms only support one data processing library, the `transform_df` decorator can be used to directly access the [`DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html) for convenience.

```python tab="PySpark"
from transforms.api import transform_df, Input, Output


@transform_df(
    Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color')
)
def filter_hair_color(hair_eye_color):
    return hair_eye_color.filter(hair_eye_color.hair == 'Brown')
```

Configure PySpark transforms

In PySpark transforms, resources are configured with Spark profiles, which allow strict access controls over large-scale compute usage.

Transform logic level versioning (TLLV)

:::callout{theme="warning" title="Warning"} For TLLV to function correctly, your code must declare all imports at the module level and should not attempt to patch or otherwise modify objects in another module. If this is not the case in your project, you must disable TLLV. Refer to the code example below for more information.

TLLV is enabled by default. To disable TLLV, set tllv to false in the transformsPython configuration. This configuration is inside the build.gradle file in your Python transforms subproject.

```groovy
transformsPython {
    tllv false
}
```
:::

A transform’s version is a string that is used to compare two versions of a transform when considering logic staleness. A transform’s output is up to date if its inputs are unchanged and if the transform’s version is unchanged. If the version changes, the transform’s output will be invalidated and recomputed.
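The staleness rule can be written as a short predicate. This is a conceptual sketch only: `is_up_to_date` and the shape of the `last_build` record are hypothetical, and the actual build system tracks this state internally.

```python
def is_up_to_date(last_build, input_states, version):
    """Return True when neither the inputs nor the transform version changed.

    last_build is None if the output was never built; otherwise it is a dict
    holding the 'inputs' and 'version' recorded at the time of the last build.
    """
    if last_build is None:
        return False
    return last_build["inputs"] == input_states and last_build["version"] == version


# Hypothetical record of the previous build.
previous = {"inputs": {"hair_eye_color": "txn-1"}, "version": "abc123"}

unchanged = is_up_to_date(previous, {"hair_eye_color": "txn-1"}, "abc123")
new_logic = is_up_to_date(previous, {"hair_eye_color": "txn-1"}, "def456")
```

Here `unchanged` is `True`, while `new_logic` is `False` because only the version differs, which is the case that triggers a recompute.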

By default, a transform’s version includes the following:

  • The module where the transform is defined
  • All modules that the transform depends on
  • Any project dependencies

If any of these change, the version string changes. If you want to invalidate outputs when a change happens in a file that is not covered by the items listed above, set tllvFiles in the transformsPython configuration. An example use case is reading configuration from a file and wanting to invalidate outputs when that configuration changes.
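One way to picture a version string is as a hash over everything that should invalidate the output. The sketch below is an illustration under that assumption, not the actual TLLV algorithm; `version_string` and its parameters are hypothetical, with `extra_files` playing the role of the files listed in tllvFiles.

```python
import hashlib


def version_string(module_source, dependency_sources=(), project_dependencies=(), extra_files=()):
    """Hash the transform's module, its dependencies, and any extra tracked files."""
    digest = hashlib.sha256()
    groups = [
        ("module", [module_source]),
        ("modules", sorted(dependency_sources)),
        ("dependencies", sorted(project_dependencies)),
        ("tllvFiles", sorted(extra_files)),
    ]
    for label, items in groups:
        for item in items:
            # Label and separate each item so that groups cannot be confused.
            digest.update(f"{label}\x00{item}\x00".encode("utf-8"))
    return digest.hexdigest()


source = "def filter_hair_color(hair_eye_color, processed): ..."
before = version_string(source, ["helpers v1"], ["polars"], extra_files=["threshold=5"])
after = version_string(source, ["helpers v1"], ["polars"], extra_files=["threshold=6"])
```

Changing only the tracked configuration content yields a different string, so `before != after` and the output would be treated as stale.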

Transactionality

Transforms execute in the context of a transaction that is opened on the outputs and committed when the job succeeds. If nothing is written to an output, or if errors during the write are caught and swallowed, a successful transform results in an empty snapshot on that output.
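The behavior can be modeled with a toy transaction, shown below as a sketch. `FakeOutput` and `transaction` are hypothetical stand-ins built with the standard library; they mimic the commit-on-success behavior described above and are not Foundry classes.

```python
from contextlib import contextmanager


class FakeOutput:
    """Stand-in for a dataset output; not the Foundry Output class."""

    def __init__(self):
        self.pending = None   # rows written inside the open transaction
        self.snapshots = []   # committed snapshots

    def write_table(self, rows):
        self.pending = list(rows)


@contextmanager
def transaction(*outputs):
    for out in outputs:
        out.pending = None                 # open: nothing written yet
    try:
        yield
    except Exception:
        for out in outputs:
            out.pending = None             # abort: discard pending writes
        raise
    else:
        for out in outputs:
            out.snapshots.append(out.pending or [])  # commit, possibly empty
            out.pending = None


written, skipped = FakeOutput(), FakeOutput()
with transaction(written, skipped):
    written.write_table([{"hair": "Brown"}])
    try:
        raise IOError("write to 'skipped' failed")
    except IOError:
        pass                               # the error is swallowed, so the job still succeeds
```

After the block exits, `written` holds one snapshot with the row and `skipped` holds one empty snapshot, which is why swallowing write errors can silently produce empty outputs.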

