Incremental transforms
Incremental computation is an efficient way to transform an input dataset into an output dataset. By using the build history of a transform, it avoids recomputing the entire output dataset every time the transform runs. The input and output datasets must be different; using the same dataset for both would create a cyclic dependency.
For end-to-end guidance on how to create and manage incremental pipelines, see the building pipelines section.
Example: Non-incremental and incremental transforms
In this section, we examine the benefits of incremental transforms by first considering a code example that does not use incremental transforms:
```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars()
    # This is inefficient because the filter function is performed over the entire input rather than on new data only
    processed.write_table(students_df.filter(pl.col('hair') == 'Brown'))
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    # Read the input as a pandas DataFrame so that boolean-mask filtering works
    students_df = students.pandas()
    # This is inefficient because the filter function is performed over the entire input rather than on new data only
    processed.write_table(students_df[students_df['hair'] == 'Brown'])
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output


@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    # This is inefficient because the query scans the entire input rather than new data only
    query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(query)
```

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    # This is inefficient because the filter function is performed over the entire input rather than on new data only
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```
If any new data is added to the /examples/students_hair_eye_color input dataset, filter() runs over the entire input rather than just the newly added data. This wastes both compute resources and time.
If a transform can become aware of its build history, it can be smarter about how to compute its output. More specifically, it can use the changes made to the inputs to modify the output dataset. This process of using already materialized data when re-materializing tables is called incremental computation. Without incremental computation, the output dataset is always replaced by the latest transform output.
Let’s go back to the example transform shown above. The transform performs a filter() over the students dataset to write out students with brown hair. With incremental computation, if data about two new students is appended to students, the transform can use information about its build history to append only the new brown-haired students to the output:
```
+---+-----+-----+------+            +---+-----+-----+------+
| id| hair|  eye|   sex|            | id| hair|  eye|   sex|
+---+-----+-----+------+  Build 1   +---+-----+-----+------+
| 17|Black|Green|Female| ---------> | 18|Brown|Green|Female|
| 18|Brown|Green|Female|            +---+-----+-----+------+
| 19|  Red|Black|Female|
+---+-----+-----+------+
          ...                                  ...
+---+-----+-----+------+  Build 2   +---+-----+-----+------+
| 20|Brown|Amber|Female| ---------> | 20|Brown|Amber|Female|
| 21|Black|Blue |Male  |            +---+-----+-----+------+
+---+-----+-----+------+
```
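The two builds pictured above can be mimicked in plain Python to make the bookkeeping concrete. This is only a toy model under stated assumptions: the `ToyIncrementalFilter` class and its `rows_already_processed` counter are hypothetical stand-ins for the build history and are not part of the transforms API.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIncrementalFilter:
    """Toy model of an incremental build (hypothetical, not the transforms API)."""
    rows_already_processed: int = 0              # stands in for the build history
    output: list = field(default_factory=list)   # stands in for the output dataset

    def build(self, input_rows):
        # Read only the rows appended since the previous build.
        added = input_rows[self.rows_already_processed:]
        # Filter the new rows only and append the matches to the existing output.
        self.output.extend(row for row in added if row["hair"] == "Brown")
        self.rows_already_processed = len(input_rows)
        return len(added)  # number of rows this build had to read


students = [
    {"id": 17, "hair": "Black", "eye": "Green", "sex": "Female"},
    {"id": 18, "hair": "Brown", "eye": "Green", "sex": "Female"},
    {"id": 19, "hair": "Red", "eye": "Black", "sex": "Female"},
]
toy = ToyIncrementalFilter()
rows_read_build_1 = toy.build(students)

# Two new students are appended to the input before the second build.
students += [
    {"id": 20, "hair": "Brown", "eye": "Amber", "sex": "Female"},
    {"id": 21, "hair": "Black", "eye": "Blue", "sex": "Male"},
]
rows_read_build_2 = toy.build(students)

output_ids = [row["id"] for row in toy.output]
print(rows_read_build_1, rows_read_build_2, output_ids)
```

The second build reads only the two appended rows, yet the output still contains the brown-haired student from the first build because new matches are appended rather than replacing the output.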
The example transform above can therefore be rewritten using incremental logic with the following syntax:
```python tab="Polars"
import polars as pl
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    # Read only the rows added since the last build
    students_df = students.polars('added')
    processed.write_table(students_df.filter(pl.col('hair') == 'Brown'))
```

```python tab="Pandas"
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    # Read only the rows added since the last build
    students_df = students.pandas('added')
    processed.write_table(students_df[students_df['hair'] == 'Brown'])
```

```python tab="DuckDB"
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    # Register the input in "added" mode so the query sees only new rows
    conn = ctx.duckdb(read_modes={students: "added"}).conn
    query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(query)
```

```python tab="PySpark"
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    # Read only the rows added since the last build
    students_df = students.dataframe('added')
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```
For more information on incremental transforms and the @incremental decorator, refer to the incremental transforms reference.
:::callout{theme="warning" title="Empty inputs in incremental transforms for streaming datasets"}
When running an incremental lightweight transform on a streaming dataset, the incremental window may contain no new files to process. In this case, calling .polars(lazy=True) on the empty input can trigger a BinderException because the underlying integration expects a non-empty list of files.
To avoid this error, add a check for empty input before reading. You can use the is_incremental property on the context and check the input for data before calling .polars(lazy=True):
```python
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.using(
    my_output=Output('/path/to/output'),
    my_input=Input('/path/to/input'),
)
def compute(ctx, my_output, my_input):
    # Skip the build when the incremental window contains no data
    if ctx.is_incremental and my_input.polars().is_empty():
        return
    df = my_input.polars(lazy=True)
    my_output.write_table(df)
```
Alternatively, you can ensure that the incremental window always contains at least one file before running the transform.
:::
:::callout{theme="neutral"}
It is safe to compute a join when one dataset is a reference table that is read in full and the other is a dataset that is read incrementally. Reading both datasets in a join incrementally, however, requires special handling. Refer to the example leveraging incremental transforms to join large datasets for more information.
:::
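The difference between the two join patterns can be sketched in plain Python. This is an illustration only: the `eye_color_codes` reference table, its values, and the `join_added_rows` helper are hypothetical and do not come from the transforms API.

```python
# Hypothetical reference table, read in full on every build.
eye_color_codes = {"Green": "GRN", "Amber": "AMB", "Blue": "BLU"}


def join_added_rows(added_students, reference):
    """Left-join newly added student rows against a reference mapping."""
    return [
        {**row, "eye_code": reference.get(row["eye"])}
        for row in added_students
    ]


# Rows appended to the incremental input since the previous build.
added_students = [
    {"id": 20, "hair": "Brown", "eye": "Amber"},
    {"id": 21, "hair": "Black", "eye": "Blue"},
]

# Safe pattern: new rows joined against the complete reference table.
joined = join_added_rows(added_students, eye_color_codes)

# Risky pattern: if the reference were also read incrementally, only the
# entries added in this build would be visible, so older keys fail to match.
reference_added_this_build = {"Blue": "BLU"}
joined_both_incremental = join_added_rows(added_students, reference_added_this_build)

print([row["eye_code"] for row in joined])
print([row["eye_code"] for row in joined_both_incremental])
```

In the second case the student with amber eyes loses its match because the corresponding reference entry arrived in an earlier build, which is why joining two incrementally read datasets needs extra care.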
Use with media sets
Incremental computation is now supported for media sets. See the incremental media set documentation for details.
Limit the batch size of an incremental input
You can configure transactional dataset batching for incremental transforms. Review the documentation on limiting batch sizes of incremental inputs for more details.
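As a conceptual illustration of batching, the sketch below splits a backlog of unprocessed input transactions into per-build batches. It is a plain-Python model, not the actual configuration: the `plan_builds` function, the `max_transactions_per_build` parameter, and the transaction names are all hypothetical, and the real setting is described in the linked documentation.

```python
def plan_builds(unprocessed_transactions, max_transactions_per_build):
    """Split a backlog of input transactions into batches, one batch per build."""
    if max_transactions_per_build < 1:
        raise ValueError("max_transactions_per_build must be at least 1")
    return [
        unprocessed_transactions[i:i + max_transactions_per_build]
        for i in range(0, len(unprocessed_transactions), max_transactions_per_build)
    ]


# Hypothetical backlog of five input transactions awaiting processing.
backlog = ["txn-1", "txn-2", "txn-3", "txn-4", "txn-5"]
batches = plan_builds(backlog, max_transactions_per_build=2)
print(batches)
```

Capping the batch keeps each build small and predictable, at the cost of needing several builds to work through a large backlog.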