Incremental usage guide

Incremental decorator

:::callout{theme="success"} This guide refers to incremental vs non-incremental builds. It is assumed that in all cases, the incremental() decorator is being used. Thus, this terminology just refers to whether the transform is actually run incrementally. :::

The incremental() decorator can be used to wrap a transform's compute function with logic for enabling incremental computation:

```python tab="Polars"
import polars as pl
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars(mode='added')  # Or just students.polars() since 'added' is default
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    # DuckDB automatically supports incremental read modes
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas(mode='added')  # Or just students.pandas() since 'added' is default
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe('added')
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

The incremental() decorator can be used to wrap any existing transform that uses the transform(), transform.using(), transform_df(), or transform_pandas() decorator. Note that the compute function for your transform must support being run both incrementally and non-incrementally. The incremental() decorator does two key things:

  • It allows the transform to look up information about its previous build. Using this information, the incremental() decorator then decides whether or not the transform can be run incrementally according to the requirements described below.
  • It converts the input, output, and context objects into the incremental subclasses that provide additional functionality. Specifically, TransformInput becomes IncrementalTransformInput, TransformOutput becomes IncrementalTransformOutput, and TransformContext becomes IncrementalTransformContext. These incremental objects are then passed into the transform wrapped by the decorator.

The incremental decorator takes six arguments:

```python
transforms.api.incremental(
    require_incremental=False,
    semantic_version=1,
    snapshot_inputs=None,
    allow_retention=False,
    strict_append=False,
    v2_semantics=False
)
```

Setting the require_incremental argument to True makes the transform fail if it cannot run incrementally. There are two cases where the transform is allowed to run as a snapshot, even if require_incremental=True:

  1. One of the outputs has never been built before.
  2. The semantic version has changed, meaning a snapshot was explicitly requested.

To debug why a transform fails to run incrementally, look in the driver logs for the warning transforms.api._incremental: Not running incrementally.
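The interaction between require_incremental and the two exceptions listed above can be summarized as a small decision function. This is a plain-Python sketch of the described behavior, not part of transforms.api; the function name, its parameters, and the RuntimeError are hypothetical and chosen only for illustration.

```python
def resolve_run_mode(can_run_incrementally, require_incremental=False,
                     output_never_built=False, semantic_version_changed=False):
    """Return "incremental" or "snapshot", or raise when neither is permitted."""
    # The two documented exceptions: a snapshot is allowed in these cases
    # even when require_incremental=True.
    if output_never_built or semantic_version_changed:
        return "snapshot"
    if can_run_incrementally:
        return "incremental"
    if require_incremental:
        # Hypothetical error type; the real failure surfaces in the build.
        raise RuntimeError("require_incremental=True but the transform cannot run incrementally")
    return "snapshot"
```

With require_incremental left at its default of False, the last branch is what produces the silent fallback to a snapshot that the driver log warning helps you diagnose.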

The semantic_version argument on the transforms.api.incremental decorator allows you to force the next run of the transform to be non-incremental.

  • If the semantic version of the current run is different from the semantic version of the previous run, the transform will run non-incrementally.
  • If not specified, the semantic version is set to 1.
  • If the semantic version of the previous run does not exist (for example, when converting an existing transform to an incremental transform), a value of 1 is assumed. This allows the transform to start running incrementally without requiring a new snapshot.
  • To force a subsequent run of the transform to be non-incremental, you can bump the semantic_version argument on the @incremental() decorator.
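The semantic version rules above reduce to a comparison with a default. The following is a minimal plain-Python sketch of that comparison; the function and constant names are hypothetical and do not exist in transforms.api.

```python
DEFAULT_SEMANTIC_VERSION = 1  # value assumed when no version was recorded or specified


def semantic_version_forces_snapshot(previous_version, current_version=DEFAULT_SEMANTIC_VERSION):
    """Return True when the version change alone forces a non-incremental run."""
    if previous_version is None:
        # No recorded version, e.g. the transform was only just made incremental.
        previous_version = DEFAULT_SEMANTIC_VERSION
    return previous_version != current_version
```

Under this model, converting an existing transform to incremental without setting semantic_version compares 1 against 1, so no snapshot is forced, while bumping the argument from 1 to 2 forces one.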

The snapshot_inputs argument allows you to define some inputs as snapshot inputs which, unlike non-snapshot inputs, support update and delete modifications. See snapshot inputs for more information.

Setting the allow_retention argument to True allows deletion of files in input and output datasets by retention policies while maintaining incrementality of your transform.

If the strict_append parameter is set to True and the input datasets are incremental, the underlying Foundry transaction type is set to APPEND, and an APPEND transaction is used for incremental writes. In this case, trying to overwrite an existing file will lead to an exception. Note that the write operation may not overwrite any files, even auxiliary ones such as Parquet summary metadata or Hadoop SUCCESS files. If the input datasets are not incremental, a transform with strict_append=True runs as a SNAPSHOT, and trying to overwrite an existing file will succeed. Use require_incremental=True to ensure the code runs incrementally as an APPEND. Incremental writes for all Foundry formats should support strict_append mode.

If the v2_semantics parameter is set to True, v2 incremental semantics will be used. There should be no difference in behavior between v2 and v1 incremental semantics, and we recommend all users set this to True. Non-Catalog input and output resources may only be read from/written to incrementally if using v2 semantics.

Important information

As mentioned above, the compute function for your transform wrapped with the incremental() decorator must support being run both incrementally and non-incrementally. Default read and write modes (explained in more detail throughout the rest of this guide) can assist with this dual-logic requirement, but it may still be necessary to branch based on the is_incremental property of the compute context.

Another key point is that using the incremental() decorator with transform_df() or transform_pandas() only gives you access to the default read and write modes. This is sufficient if you have a transform where the added output rows are a function only of the added input rows (refer to the append example). If, however, your transform performs more complex logic (such as joins, aggregations, or distinct) that requires you to set the input read mode or the output write mode, then you should use the incremental() decorator with transform(). Using the incremental decorator with transform() allows you to set the read and write modes.

:::callout{theme="warning" title="Warning"} Note that the Code Repositories preview feature will always run transforms in non-incremental mode. This is true even when require_incremental=True is passed into the incremental() decorator. This is unlike the VS Code preview feature which respects incrementality. :::

Incremental modes of inputs and outputs

Incremental transforms are a powerful feature. However, the API can be unintuitive when used for the first time. Incremental transforms behave the same way in VS Code preview as they would during a build. We recommend using VS Code preview when experimenting with the API and understanding it in practice.

IncrementalTransformInput

The transforms.api.IncrementalTransformInput object extends the data reading methods to take an optional read mode parameter.

If you define a transform using the incremental decorator, the read modes behave differently depending on whether your transform is run incrementally or non-incrementally:

| Read mode | Incremental behavior | Non-incremental behavior |
| --- | --- | --- |
| added (default) | Returns a DataFrame containing any new rows appended to the input since the last time the transform ran. | Returns a DataFrame containing the entire dataset since all rows are considered unseen. |
| previous | Returns a DataFrame containing the entire input given to the transform the last time it ran. | Returns an empty DataFrame. |
| current | Returns a DataFrame containing the entire input dataset for the current run. | Returns a DataFrame containing the entire input dataset for the current run. This will be the same as added. |

The default read mode is added.
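The input read modes can be modeled with plain Python lists. This sketch only mirrors the documented behavior; read_input and its parameters are hypothetical names, and rows are represented as list items rather than DataFrames.

```python
def read_input(mode, previously_seen_rows, appended_rows, is_incremental):
    """Model which rows each input read mode returns."""
    current = list(previously_seen_rows) + list(appended_rows)
    if mode == "current":
        # Always the full input for the current run.
        return current
    if mode == "added":
        # Non-incremental runs treat every row as unseen.
        return list(appended_rows) if is_incremental else current
    if mode == "previous":
        # Non-incremental runs have no previous input to show.
        return list(previously_seen_rows) if is_incremental else []
    raise ValueError(f"unknown read mode: {mode}")
```

For example, with two previously seen rows and one appended row, added yields one row in an incremental run and all three rows in a non-incremental run.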

There are instances where it is undesirable for an input to be treated in an incremental fashion despite the transform being marked as incremental(). See the snapshot inputs section for more information and how the read mode behavior differs for these types of inputs.

Note that the default output read mode is current, and the available output read modes are added, current, and previous. For more information about output read modes, refer to the section below.

The nature of incremental transforms means that we load all of the past transactions on the input datasets from the last SNAPSHOT transaction to build the input view. If you begin to see progressive slowness in your incremental transform, we recommend running a SNAPSHOT build on your incremental input datasets.

IncrementalTransformOutput

The transforms.api.IncrementalTransformOutput object provides access to read and write modes for the output dataset. The key to writing logic compatible with both incremental and non-incremental builds is the default write mode of modify. There are two write modes:

  • modify: This mode modifies the existing output with data written during the build. For example, calling write_dataframe() or write_table() when the output is in modify mode will append the written DataFrame to the existing output.
  • replace: This mode fully replaces the output with data written during the build.

When a transform is run incrementally, the default write mode for the output is set to modify. When a transform is run non-incrementally, the default write mode for the output is set to replace.
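The effect of the two write modes and their defaults can be illustrated with lists standing in for dataset rows. This is a plain-Python model under that assumption; default_write_mode and apply_write are hypothetical helpers, not transforms.api functions.

```python
def default_write_mode(is_incremental):
    """Default described above: modify when incremental, replace otherwise."""
    return "modify" if is_incremental else "replace"


def apply_write(existing_rows, written_rows, mode):
    """Return the output content after a single write in the given mode."""
    if mode == "modify":
        # Append the written rows to what is already in the output.
        return list(existing_rows) + list(written_rows)
    if mode == "replace":
        # Discard the existing output and keep only the written rows.
        return list(written_rows)
    raise ValueError(f"unknown write mode: {mode}")
```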

Recall that the default read mode for input DataFrames is added. Because of the default input read mode of added and the default output write mode of modify, writing logic compatible with incremental and non-incremental builds becomes much easier:

```python tab="Polars"
import polars as pl

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we haven't seen before.
    new_students_df = students.polars()  # this is equivalent to students.polars(mode='added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only new rows, and append them to the output.
    processed.write_table(
        new_students_df.filter(pl.col("hair") == 'Brown')
    )
```

```python tab="DuckDB"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(ctx, students, processed):
    # Read only the rows we haven't seen before.
    conn = ctx.duckdb().conn
    new_students_df_filtered = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only new rows, and append them to the output.
    processed.write_table(
        new_students_df_filtered
    )
```

```python tab="Pandas"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we haven't seen before.
    new_students_df = students.pandas()  # this is equivalent to students.pandas(mode='added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only new rows, and append them to the output.
    processed.write_table(
        new_students_df[new_students_df.hair == 'Brown']
    )
```

```python tab="PySpark"
@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we haven't seen before.
    new_students_df = students.dataframe()  # this is equivalent to students.dataframe('added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only new rows, and append them to the output.
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
```

There are more complex use cases for incremental computation when it might be required to compute the correct write mode and set it manually. This can be done using the set_mode() method on the incremental output.

:::callout{theme="neutral"} The output write mode can be set manually when using the transform() or transform.using() decorators. With these decorators, you can use set_mode() before you explicitly call the write method to save the output. The transform_df() and transform_pandas() decorators, however, call write_dataframe() and write_pandas() to save the DataFrame output. This means the write mode used will be determined by the incremental() decorator. :::

:::callout{theme="warning" title="Warning"} When using set_mode(), ensure that this is valid behavior both when the transform is run incrementally or non-incrementally. If this is not the case, you should use the is_incremental property. :::
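One pattern that stays valid in both cases is to merge new rows with the previous output and then write in replace mode. The sketch below demonstrates the idea with a hypothetical in-memory StubOutput class rather than the real IncrementalTransformOutput; its read_previous() and write() methods are stand-ins for reading with the previous mode and calling a write method, and rows are plain dictionaries with an assumed id key.

```python
class StubOutput:
    """Hypothetical in-memory stand-in for an incremental output dataset."""

    def __init__(self, existing_rows, is_incremental):
        self.is_incremental = is_incremental
        self.rows = list(existing_rows)
        # Defaults described in this guide: modify when incremental, replace otherwise.
        self.mode = "modify" if is_incremental else "replace"

    def read_previous(self):
        # The previous output is only visible when running incrementally.
        return list(self.rows) if self.is_incremental else []

    def set_mode(self, mode):
        if mode not in ("modify", "replace"):
            raise ValueError(f"unknown write mode: {mode}")
        self.mode = mode

    def write(self, new_rows):
        if self.mode == "modify":
            self.rows = self.rows + list(new_rows)
        else:
            self.rows = list(new_rows)


def keep_latest_per_id(new_rows, output):
    """Merge new rows into the previous output, keeping the newest row per id."""
    latest = {row["id"]: row for row in output.read_previous()}
    latest.update({row["id"]: row for row in new_rows})
    # Replace is safe in both cases: when non-incremental, read_previous() is
    # empty and new_rows is the full input, so this is a full recompute.
    output.set_mode("replace")
    output.write([latest[key] for key in sorted(latest)])
```

Because the function never relies on the default write mode, it does not need to branch on is_incremental. Logic that appends in one case and rewrites in the other would need that branch.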

In addition to the write mode, the transforms.api.IncrementalTransformOutput makes it possible to read DataFrames from the output dataset. This can be done using the data reading methods such as pandas(), polars(), or dataframe(), which take an optional read mode parameter. When using transform.using(), specify the mode as a keyword argument: output.pandas(mode='previous'). The default read mode is current; the other available output read modes are added and previous. The read mode behaves differently depending on the dataset's write mode.

:::callout{theme="success"} Although the default read mode is current, in most cases you should use previous. The other read modes should be used to read the dataset after writing to it. :::

Valid combination for reading data from the previous run

The transform must be running incrementally (ctx.is_incremental is True) to access the previous output of the transform. If the transform is running non-incrementally, read modes that would ordinarily allow access to the previous output will return no rows for the previous output data. The behavior described in the table below is motivated by the semantics of incrementality and the fact that the transactions of the current and previous read modes are resolved at the beginning of the transform run.

| Output read mode | Output write mode | Was new data written? | Behavior |
| --- | --- | --- | --- |
| current | modify | No | There is no use case for these settings. Use previous mode instead. |
| current | modify | Yes | dataframe() returns the full content of the output of the transform (as it was at the beginning of the build), plus data written to output in the currently running build. |
| current | replace | No | There is no use case for these settings. Use previous mode instead. |
| current | replace | Yes | dataframe() returns data written to output in the currently running build. |
| added | modify/replace | No | There is no use case for these settings. Use previous mode instead. |
| added | modify/replace | Yes | dataframe() returns data written to output in the currently running build. |
| previous | modify | Yes/No | dataframe() returns the full content of the output of the transform (as it was at the beginning of the build). Schema is a required field when reading with previous if the transform is running non-incrementally. |
| previous | replace | Yes/No | dataframe() returns the full content of the output of the transform (as it was at the beginning of the build). Schema is a required field when reading with previous if the transform is running non-incrementally. |

The schema you provide when reading the previous DataFrame is used to generate an empty DataFrame in the case where the transform is running non-incrementally. If the transform is running incrementally, this schema will be compared against the actual schema of the last output. An exception will be raised if the column types, column nullability, or order of the columns do not match. To make sure the order of the columns stays the same, you can use the following construct:

```python
previous = out.dataframe('previous', schema)  # schema is a pyspark.sql.types.StructType

out.write_dataframe(df.select(schema.fieldNames()))
```

:::callout{theme="neutral"} Foundry saves all columns as nullable, regardless of the schema used in your transform. As a result, your build will fail with a SchemaMismatchError when reading from the output in previous mode if you supply a schema with some fields set to non-nullable. :::
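The schema comparison and the nullability pitfall can be modeled without Spark. In this plain-Python sketch a schema is a list of hypothetical (name, type, nullable) tuples, and a ValueError stands in for the SchemaMismatchError mentioned above; none of these helpers are part of transforms.api or PySpark.

```python
def normalize_nullability(schema):
    """Mark every field nullable, matching how saved columns are described above."""
    return [(name, dtype, True) for name, dtype, _nullable in schema]


def check_previous_schema(supplied, actual):
    """Raise if column names, order, types, or nullability differ."""
    if list(supplied) != list(actual):
        raise ValueError(f"schema mismatch: supplied {supplied}, actual {actual}")


def select_in_schema_order(row, schema):
    """Reorder a row's columns to follow the schema, like df.select(schema.fieldNames())."""
    return {name: row[name] for name, _dtype, _nullable in schema}
```

Supplying a schema with a non-nullable field fails the check against the saved, all-nullable schema, while the normalized version of the same schema passes.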

Review the merge and replace example for more information.

:::callout{theme="neutral"} The same read modes are used for both the dataframe() and filesystem() methods to allow working with both structured and unstructured datasets. :::

Note that there is no built-in way to access only the data written by the previous invocation of a transform. If that data is needed, you can add an auxiliary output dataset to which each transform run writes a copy (or related metadata) of the written outputs, using the replace write mode. A later run can then read that auxiliary dataset to reference only what the previous run wrote.

Valid combinations for reading data written in the current run

| Output read mode | Output write mode | Was new data written? |
| --- | --- | --- |
| current or added | modify / replace | Yes |

Prefer added as it makes your intentions clearer. A scenario that would benefit from reading the data written by the current transformation is when validating the entire content of the dataset so that the build can be failed on validation failure. This way, recomputing or caching data in Spark to run the checks is not necessary.

IncrementalTransformContext

Compared to the TransformContext object, the IncrementalTransformContext object provides an additional property: is_incremental. This property is set to True if the transform is run incrementally, which means that:

  • the default output write mode is set to modify, and
  • the default input read mode is set to added.

Summary of incremental modes

The incremental decorator lets you access the previous inputs and outputs of the transform by specifying read mode "previous". This way you can base the current build on historical context. If the transform is run in snapshot mode, the "previous" dataframes will be empty because this is the first run, or because the logic or data changed significantly necessitating a recompute.

However, the most common case is to use "added" mode for inputs and "modify" mode for outputs. These modes are used by default. They let you retrieve the newly added rows from an input dataset, process them, and append them to an output dataset.

Instead of appending rows to the output, you may want to modify some existing rows already present in an output dataset. For that, use the "replace" mode as demonstrated in the examples for common scenarios.

Requirements for incremental computation

Below, we will analyze an incremental transform that filters students to only include those with brown hair:

```python tab="Polars"
import polars as pl

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars()
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas()
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

Suppose the /examples/students_hair_eye_color input dataset is fully replaced with a new set of students. As we can see, appending the new set of students to the previous output results in an incorrect output dataset. This is an example of a situation where the incremental() decorator would decide to not run the transform incrementally.

For a transform to be run incrementally, the following requirements must be met:

All of the transform's non-snapshot input datasets had only files added to them since the last run or had deletions coming from retention.

If a transform has the incremental() decorator but any of the above requirements are not met, the transform will automatically be run non-incrementally. This means the default output write mode will be set to replace instead of modify, and inputs will be presented non-incrementally. It also means that reading the previous output in the transform will return an empty DataFrame, because the previous history is not accessible. If we set require_incremental=True, the transform will fail rather than running non-incrementally.

:::callout{theme="neutral"} It is often desirable to allow certain inputs to be fully rewritten without affecting the ability to run the transform incrementally. See Snapshot Inputs for more information. :::

:::callout{theme="success"} It is possible to force a transform to only run incrementally (unless it has never been run before or the semantic version was bumped) with the require_incremental=True argument passed into the incremental decorator. If the transform cannot run incrementally it will deliberately fail rather than attempt to run non-incrementally. :::

Append-only input changes

A transform can be run incrementally if all its incremental inputs had only files added to them (with APPEND or UPDATE transactions) since the last run.

Conversely, a transform cannot be run incrementally if any of the following is true of its incremental inputs:

  • The incremental inputs were fully rewritten (for example, had SNAPSHOT transactions).
  • Existing files in the incremental inputs were modified or deleted through UPDATE or DELETE transactions.

For instance, if the list of students in students_hair_eye_color completely changes, the previous output of filtered students is invalid and must be replaced.
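The append-only rule can be expressed as a check over the transactions committed to an input since the last run. This is a simplified plain-Python model that ignores retention; the function name and the (transaction_type, changed_existing_files) pair representation are assumptions made for illustration only.

```python
def inputs_allow_incremental(transactions):
    """Check one incremental input's transactions since the last run.

    transactions: iterable of (transaction_type, changed_existing_files) pairs.
    """
    for tx_type, changed_existing_files in transactions:
        if tx_type in ("SNAPSHOT", "DELETE"):
            # Full rewrites and deletions break incrementality.
            return False
        if tx_type == "UPDATE" and changed_existing_files:
            # An UPDATE is only acceptable when it purely adds files.
            return False
    return True
```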

Inputs with deletions coming from retention

If an upstream dataset grows indefinitely and you want to be able to delete old rows (using retention in Foundry) without affecting incrementality of downstream computations, the incremental transform depending on that dataset must be explicitly set to allow retained input. This can be done by using the allow_retention argument of the transforms.api.incremental decorator.

  • If this field is set to True, all deletions coming from retention policies will be ignored when evaluating whether the inputs preserve incrementality. This means that files removed by retention will not compromise incrementality: if the only non-added changes to the inputs are retention deletions, the transform will still run incrementally.
  • If the field is False (default), any removed-type changes in the input dataset will cause the transform to run a snapshot.

```python tab="Polars"
import polars as pl

@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars()
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas()
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
@incremental(allow_retention=True)
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

In the above example, if the transform is run after a set of changes in the dataset /examples/students_hair_eye_color that includes only added changes and removed changes made using retention policies, the transform will run incrementally. If any removed changes made in other ways or any modified changes are present, a snapshot will be triggered.

:::callout{theme="warning" title="Warning"} Specifying allow_retention=True only prevents effects on incrementality from removed changes that come from retention policies. Any other delete in the input dataset would still cause the transform to run a snapshot instead of incremental computation. :::
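The effect of allow_retention on the incrementality decision can be sketched as a filter over the kinds of change seen on an input. This is a plain-Python model of the rules described in this section; the function name and the (kind, from_retention) representation are hypothetical.

```python
def preserves_incrementality(changes, allow_retention=False):
    """Decide whether a set of input changes keeps the transform incremental.

    changes: iterable of (kind, from_retention) pairs, where kind is one of
    "added", "removed", or "modified".
    """
    for kind, from_retention in changes:
        if kind == "added":
            continue
        if kind == "removed" and from_retention and allow_retention:
            # Retention deletions are ignored only when explicitly allowed.
            continue
        # Any other removal, or any modification, triggers a snapshot.
        return False
    return True
```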

Snapshot inputs

In some cases, inputs can be fully rewritten without invalidating the incrementality of the transform. For example, suppose you have a simple reference dataset that maps phone number country codes to countries and is periodically rewritten. Changes to this dataset do not necessarily invalidate the results of any previous computation and therefore should not prevent the transform from being run incrementally.

By default, as described above, a transform cannot be run incrementally if any input has been fully rewritten since the transform was last run. Snapshot inputs are excluded from this check, and their start transactions are allowed to differ between runs.

Snapshot inputs can be configured by using the snapshot_inputs argument on the incremental() decorator.

If your incremental transform uses Cipher resources and bellaso_python_lib, these encrypters, decrypters, and hashers need to be listed as snapshot inputs.

```python tab="Polars"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # this will be all unseen phone numbers since the previous run
    phone_numbers_df = phone_numbers.polars()
    # this will be all country codes, regardless of what has been seen previously
    country_codes_df = country_codes.polars()

    output.write_table(
        phone_numbers_df.join(country_codes_df, left_on='country_code', right_on='code', how='left')
    )
```

```python tab="DuckDB"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(ctx, phone_numbers, country_codes, output):
    conn = ctx.duckdb().conn
    joined_query = conn.sql("""
        SELECT * FROM phone_numbers
        LEFT JOIN country_codes
        ON phone_numbers.country_code = country_codes.code
    """)
    output.write_table(joined_query)
```

```python tab="Pandas"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # this will be all unseen phone numbers since the previous run
    phone_numbers_df = phone_numbers.pandas()
    # this will be all country codes, regardless of what has been seen previously
    country_codes_df = country_codes.pandas()

    output.write_table(
        phone_numbers_df.merge(country_codes_df, left_on='country_code', right_on='code', how='left')
    )
```

```python tab="PySpark"
@incremental(snapshot_inputs=['country_codes'])
@transform.spark.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # this will be all unseen phone numbers since the previous run
    phone_numbers = phone_numbers.dataframe()
    # this will be all country codes, regardless of what has been seen previously
    country_codes = country_codes.dataframe()

    cond = [phone_numbers.country_code == country_codes.code]
    output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
```

The behavior of snapshot inputs is identical whether a transform runs incrementally or non-incrementally. As such, the added and current read modes will always return the entire dataset. All other read modes will return an empty dataset.
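To see why a snapshot input pairs naturally with an incremental input, consider the phone number example in plain Python. In this sketch, lists of dictionaries stand in for DataFrames and the function name is hypothetical; only the newly added phone numbers are processed, while the reference table is always read in full.

```python
def map_new_numbers(added_phone_numbers, all_country_codes):
    """Left-join newly added phone numbers against the full reference table."""
    code_to_country = {row["code"]: row["country"] for row in all_country_codes}
    return [
        # Unknown codes map to None, as unmatched rows do in a left join.
        {**number, "country": code_to_country.get(number["country_code"])}
        for number in added_phone_numbers
    ]
```

Rows written by earlier runs are not revisited here, so a rewritten reference table only affects phone numbers processed from that point on. If older rows must reflect the new mapping, a snapshot (for example through a semantic_version bump) is needed.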

:::callout{theme="neutral"} Given that there are no constraints around previously seen versions of snapshot inputs, it is possible to add or remove snapshot inputs while retaining the ability to run the transform incrementally. Remember that if the modification of the inputs fundamentally changes the semantics of the transform, it is worth reviewing whether the semantic_version argument on the incremental() decorator should be updated. :::

Changes to inputs

The list of existing inputs can be modified. Incrementality will be preserved in the case where either of the following is true:

  • New inputs or new snapshot inputs are added.
  • Existing inputs or existing snapshot inputs are removed.

Note that an incremental transform must have at least one input.

The start transactions for each of the non-snapshot input datasets must be consistent with those used for the previous run.

Outputs last built by same transform

For multiple-output incremental transforms, the last committed transaction of every previously built output must be generated by the same transform.

Outputs with no prior build history will be exempt from the above condition and will not prevent the build from running incrementally.

Input and output datasets for transforms must be different

Data transformations take an input dataset, perform operations, and generate an output dataset. The input dataset and output dataset for a transform must be different. Having the same dataset as both input and output will result in a cyclic (circular) dependency, making the transform impossible to execute.

Summary of requirements for incremental computation

A transform can be run incrementally if and only if all of its incremental inputs have only had files appended to them since the last run; any files that were deleted must have been removed by retention policies with allow_retention=True set. Snapshot inputs are excluded from this check.


中文翻译

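Incremental usage guide

Incremental decorator

:::callout{theme="success"} This guide refers to incremental and non-incremental builds. It assumes that the incremental() decorator is used in all cases, so this terminology only describes whether the transform actually runs incrementally. :::

The incremental() decorator can be used to wrap a transform's compute function with logic that enables incremental computation: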

```python tab="Polars"
import polars as pl
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars(mode='added')  # Or just students.polars(), since 'added' is the default
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    # DuckDB automatically supports incremental read modes
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas(mode='added')  # Or just students.pandas(), since 'added' is the default
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe('added')
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

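The incremental() decorator can wrap any existing transform that uses the transform(), transform.using(), transform_df(), or transform_pandas() decorator. Note that the transform's compute function must support being run both incrementally and non-incrementally. The incremental() decorator does two key things:

  • It lets the transform look up information about its previous build. Using this information, the incremental() decorator decides whether the transform can run incrementally, based on the requirements described below.
  • It converts the input, output, and context objects into incremental subclasses that provide additional functionality. Specifically, TransformInput becomes IncrementalTransformInput, TransformOutput becomes IncrementalTransformOutput, and TransformContext becomes IncrementalTransformContext. These incremental objects are then passed to the transform wrapped by the decorator.

The incremental decorator takes six arguments: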

transforms.api.incremental(
    require_incremental=False,
    semantic_version=1,
    snapshot_inputs=None,
    allow_retention=False,
    strict_append=False,
    v2_semantics=False
)

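Setting the require_incremental argument to True causes the transform to fail if it cannot run incrementally. There are two cases in which the transform is still allowed to run as a snapshot even when require_incremental=True is set:

  1. An output has never been built before.
  2. The semantic version has changed, meaning a snapshot was explicitly requested.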
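The interaction between require_incremental and the two snapshot exceptions can be sketched as a small decision function. This is an illustrative model under stated assumptions, not the decorator's code: `resolve_run_mode`, its parameters, and the `RuntimeError` it raises are all hypothetical stand-ins.

```python
def resolve_run_mode(can_run_incrementally, require_incremental=False,
                     output_never_built=False, semantic_version_changed=False):
    """Hypothetical model: return 'incremental' or 'snapshot', or raise on failure."""
    # The two cases in which a snapshot is allowed even with require_incremental=True.
    if output_never_built or semantic_version_changed:
        return 'snapshot'
    if can_run_incrementally:
        return 'incremental'
    if require_incremental:
        # The real build fails here; the exception type is an assumption.
        raise RuntimeError('transform cannot run incrementally')
    return 'snapshot'


print(resolve_run_mode(True))                                                  # incremental
print(resolve_run_mode(False))                                                 # snapshot
print(resolve_run_mode(False, require_incremental=True, output_never_built=True))  # snapshot
```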

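To debug why a transform cannot run incrementally, look for the warning transforms.api._incremental: Not running incrementally in the driver logs.

The semantic_version argument on the incremental() decorator lets you force the next run of the transform to be non-incremental.

  • If the semantic version of the current run differs from that of the previous run, the transform runs non-incrementally.
  • If not specified, the semantic version defaults to 1.
  • If the previous run has no semantic version (for example, when an existing transform is being converted to an incremental one), the value is assumed to be 1. This lets the transform start running incrementally without requiring a new snapshot.
  • To force a subsequent run of the transform to be non-incremental, increase the semantic_version argument on the @incremental() decorator.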
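The semantic version rules above reduce to a single comparison. The helper below is a hypothetical sketch of that comparison (the name `version_forces_snapshot` is not part of any API), with a missing previous version treated as 1 as the guide describes.

```python
def version_forces_snapshot(previous_version, current_version=1):
    """Hypothetical helper: True if the semantic version forces a non-incremental run."""
    # A missing previous version (for example, a transform that was just
    # converted to incremental) is assumed to be 1.
    effective_previous = 1 if previous_version is None else previous_version
    return effective_previous != current_version


print(version_forces_snapshot(None))   # False: defaults match, no new snapshot needed
print(version_forces_snapshot(1, 2))   # True: the version was bumped
```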

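The snapshot_inputs argument lets you define some inputs as snapshot inputs, which, unlike non-snapshot inputs, support update and delete modifications. See Snapshot inputs for more information.

Setting the allow_retention argument to True allows files in the input and output datasets to be deleted by retention policies while keeping the transform incremental.

If the strict_append argument is set to True and the input datasets are incremental, the underlying Foundry transaction type is set to APPEND and incremental writes use an APPEND transaction. Attempting to overwrite an existing file results in an exception. If the input datasets are not incremental, strict_append runs as a SNAPSHOT, and attempting to overwrite an existing file succeeds. Use require_incremental=True to ensure the code runs incrementally as an APPEND. Note that write operations must not overwrite any files, including auxiliary files such as Parquet summary metadata or Hadoop SUCCESS files. Incremental writes for all Foundry formats should support strict_append mode.

If the v2_semantics argument is set to True, v2 incremental semantics are used. There should be no behavioral difference between v2 and v1 incremental semantics, and we recommend that all users set this to True. Non-Catalog input and output resources can only be read or written incrementally when v2 semantics are used.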

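Important information

As mentioned above, the compute function of a transform wrapped with the incremental() decorator must support being run both incrementally and non-incrementally. The default read and write modes (explained in more detail in the rest of this guide) help satisfy this dual-logic requirement, but it may still be necessary to branch on the is_incremental property of the compute context.

Another key point is that using the incremental() decorator with transform_df() or transform_pandas() only gives you access to the default read and write modes. This is sufficient if the output rows added by your transform depend only on the added input rows (see Append-only input changes). However, if your transform performs more complex logic (such as joins, aggregations, or deduplication) that requires you to set the input read mode or the output write mode, use the incremental() decorator with transform() instead, which lets you set the read and write modes.

:::callout{theme="warning" title="Warning"} Note that the Code Repositories preview feature always runs transforms in non-incremental mode, even when require_incremental=True is passed to the incremental() decorator. This differs from the VS Code preview feature, which respects incrementality. :::

Incremental modes for inputs and outputs

Incremental transforms are a powerful feature, but the API can be unintuitive at first. Incremental transforms behave the same way in VS Code preview as they do during a build, so we recommend using VS Code preview to experiment with the API and understand it in practice.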

IncrementalTransformInput

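The transforms.api.IncrementalTransformInput object extends the data-reading methods so that they accept an optional read mode argument.

If you define a transform with the incremental decorator, the behavior of the read mode depends on whether the transform runs incrementally or non-incrementally:

| Read mode | Incremental behavior | Non-incremental behavior |
| --- | --- | --- |
| added (default) | Returns a DataFrame containing any new rows appended to the input since the last time the transform ran. | Returns a DataFrame containing the entire dataset, since all rows are considered unseen. |
| previous | Returns a DataFrame containing the entire input as given to the transform the last time it ran. | Returns an empty DataFrame. |
| current | Returns a DataFrame containing the entire input dataset for the current run. | Returns a DataFrame containing the entire input dataset for the current run. This is the same as added. |

The default read mode is added.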
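The read mode table can be made concrete with a toy model on plain Python lists. This is only a sketch: `read_input` is a hypothetical function, rows stand in for DataFrames, and the input is assumed to be append-only, so the current view always begins with the rows seen on the previous run.

```python
def read_input(mode, previous_rows, current_rows, is_incremental):
    """Hypothetical model of the input read modes on plain lists of rows."""
    if mode == 'current':
        return list(current_rows)
    if mode == 'previous':
        # The previous view is only available on incremental runs.
        return list(previous_rows) if is_incremental else []
    if mode == 'added':
        if not is_incremental:
            return list(current_rows)  # every row counts as unseen
        # Append-only assumption: new rows follow the previously seen prefix.
        return list(current_rows[len(previous_rows):])
    raise ValueError(f'unknown read mode: {mode}')


seen = ['amy', 'bob']
now = ['amy', 'bob', 'cal']
print(read_input('added', seen, now, is_incremental=True))      # ['cal']
print(read_input('added', seen, now, is_incremental=False))     # ['amy', 'bob', 'cal']
print(read_input('previous', seen, now, is_incremental=False))  # []
```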

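In some cases, it is desirable not to process an input incrementally even though the transform is marked as incremental(). See the Snapshot inputs section for more information and for how read mode behavior differs for these inputs.

Note that the default output read mode is current, and the available output read modes are added, current, and previous. See the section below for more information on output read modes.

By their nature, incremental transforms load all past transactions on the input dataset since the last SNAPSHOT transaction in order to construct the input view. If your incremental transform gets progressively slower, we recommend running a SNAPSHOT build on the incremental input dataset.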

IncrementalTransformOutput

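The transforms.api.IncrementalTransformOutput object provides access to read and write modes for the output dataset. The key to writing logic that is compatible with both incremental and non-incremental builds is the default write mode of modify. There are two write modes:

  • modify: This mode modifies the existing output with the data written during the build. For example, calling write_dataframe() or write_table() while the output is in modify mode appends the written DataFrame to the existing output.
  • replace: This mode fully replaces the output with the data written during the build.

When a transform runs incrementally, the default write mode of the output is set to modify. When a transform runs non-incrementally, the default write mode is set to replace.

Recall that the default read mode for input DataFrames is added. Because the default input read mode is added and the default output write mode is modify, it becomes much easier to write logic that is compatible with both incremental and non-incremental builds: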

```python tab="Polars"
import polars as pl

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we have not seen before.
    new_students_df = students.polars()  # Equivalent to students.polars('added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only the new rows and append them to the output.
    processed.write_table(
        new_students_df.filter(pl.col("hair") == 'Brown')
    )
```

```python tab="DuckDB"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(ctx, students, processed):
    # Read only the rows we have not seen before.
    conn = ctx.duckdb().conn
    new_students_df_filtered = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only the new rows and append them to the output.
    processed.write_table(
        new_students_df_filtered
    )
```

```python tab="Pandas"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we have not seen before.
    new_students_df = students.pandas()  # Equivalent to students.pandas('added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only the new rows and append them to the output.
    processed.write_table(
        new_students_df[new_students_df.hair == 'Brown']
    )
```

```python tab="PySpark"
@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # Read only the rows we have not seen before.
    new_students_df = students.dataframe()  # Equivalent to students.dataframe('added')

    # When non-incremental, we read all rows and replace the output.
    # When incremental, we read only the new rows and append them to the output.
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
```
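Why the default modes make the same compute logic valid in both cases can be checked with a small simulation. The sketch below uses plain lists of dicts in place of datasets, and `run_filter` is a hypothetical stand-in for one build of the brown-hair filter: a snapshot build followed by an incremental build produces the same output as one full rebuild.

```python
def run_filter(existing_output, input_rows, is_incremental):
    """Simulate one build with the default modes (assumed model, not the real API)."""
    # Default write mode: modify when incremental, replace otherwise.
    mode = 'modify' if is_incremental else 'replace'
    filtered = [row for row in input_rows if row['hair'] == 'Brown']
    return existing_output + filtered if mode == 'modify' else filtered


batch_1 = [{'id': 1, 'hair': 'Brown'}, {'id': 2, 'hair': 'Black'}]
batch_2 = [{'id': 3, 'hair': 'Brown'}]

# First build is a snapshot; the second build only sees the added rows (batch_2).
after_first = run_filter([], batch_1, is_incremental=False)
after_second = run_filter(after_first, batch_2, is_incremental=True)

# A full non-incremental rebuild reads everything and replaces the output.
full_rebuild = run_filter(after_first, batch_1 + batch_2, is_incremental=False)

print(after_second == full_rebuild)  # True
```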

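For more complex incremental use cases, you may need to compute the correct write mode and set it manually. This can be done with the set_mode() method on the incremental output.

:::callout{theme="neutral"} The output write mode can be set manually when using the transform() or transform.using() decorators. With these decorators, you can call set_mode() before explicitly calling a write method to save the output. The transform_df() and transform_pandas() decorators, however, call write_dataframe() and write_pandas() themselves to save the DataFrame output, which means the write mode used is determined by the incremental() decorator. :::

:::callout{theme="warning" title="Warning"} When using set_mode(), make sure the resulting behavior is valid regardless of whether the transform runs incrementally or non-incrementally. If it is not, use the is_incremental property. :::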
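One way to reason about this warning is with a toy deduplication that always sets replace mode. Everything in this sketch is a plain-Python stand-in: `FakeOutput`, `read_previous`, and `write` are hypothetical and only mimic the behavior described in this guide; they are not the transforms.api classes or method names.

```python
class FakeOutput:
    """Plain-Python stand-in for an incremental output (not the transforms.api class)."""

    def __init__(self, existing_rows, is_incremental):
        self._existing = list(existing_rows)
        self._is_incremental = is_incremental
        # Default write mode mirrors the guide: modify when incremental, replace otherwise.
        self._mode = 'modify' if is_incremental else 'replace'
        self.rows = list(existing_rows)

    def set_mode(self, mode):
        self._mode = mode

    def read_previous(self):
        # The previous output is only visible on incremental runs.
        return list(self._existing) if self._is_incremental else []

    def write(self, rows):
        if self._mode == 'modify':
            self.rows = self._existing + list(rows)
        else:
            self.rows = list(rows)


def dedupe_students(new_rows, output):
    """Keep one row per student id, with new rows overriding previous ones."""
    merged = {row['id']: row for row in output.read_previous()}
    merged.update({row['id']: row for row in new_rows})
    # 'replace' is valid in both cases here: on a non-incremental run the
    # previous output reads as empty, so only the new rows are written.
    output.set_mode('replace')
    output.write(list(merged.values()))


out = FakeOutput([{'id': 1, 'hair': 'Brown'}], is_incremental=True)
dedupe_students([{'id': 1, 'hair': 'Black'}, {'id': 2, 'hair': 'Red'}], out)
print(out.rows)  # [{'id': 1, 'hair': 'Black'}, {'id': 2, 'hair': 'Red'}]
```

Because the merged result is written in full, the function does not need to branch on whether the run is incremental; logic that appends in one case and rewrites in the other would need such a branch.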

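In addition to write modes, transforms.api.IncrementalTransformOutput makes it possible to read DataFrames from the output dataset. This is done with the data-reading methods, such as pandas(), polars(), or dataframe(), which accept an optional read mode argument. With transform.using(), specify the mode as a keyword argument: output.pandas(mode='previous'). The default read mode is current, and the other available output read modes are added and previous. The behavior of the read mode depends on the write mode set on the dataset.

:::callout{theme="success"} Although the default read mode is current, in most cases you should use previous. The other read modes are intended for reading the dataset after writing to it. :::

Valid combinations for reading data from the previous run

The transform must run incrementally (ctx.is_incremental is True) to access its previous output. If the transform runs non-incrementally, the read modes that would normally give access to the previous output return an empty result with no rows. The behavior described in the table below follows from the semantics of incrementality and from the fact that the transactions for the current and previous read modes are resolved at the start of the transform run.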

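| Output read mode | Output write mode | New data written? | Behavior |
| --- | --- | --- | --- |
| current | modify | No | There is no use case for these settings. Use previous mode instead. |
| current | modify | Yes | dataframe() returns the full contents of the transform's output (as of the start of the build), plus the data written to the output in the currently running build. |
| current | replace | No | There is no use case for these settings. Use previous mode instead. |
| current | replace | Yes | dataframe() returns the data written to the output in the currently running build. |
| added | modify/replace | No | There is no use case for these settings. Use previous mode instead. |
| added | modify/replace | Yes | dataframe() returns the data written to the output in the currently running build. |
| previous | modify | Yes/No | dataframe() returns the full contents of the transform's output (as of the start of the build). If the transform runs non-incrementally, a schema is required when reading with previous. |
| previous | replace | Yes/No | dataframe() returns the full contents of the transform's output (as of the start of the build). If the transform runs non-incrementally, a schema is required when reading with previous. |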

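The schema you provide for reading the previous DataFrame is used to produce an empty DataFrame when the transform runs non-incrementally. If the transform runs incrementally, this schema is compared with the actual schema of the previous output, and an exception is raised if the column types, column nullability, or column order do not match. To make sure the column order stays the same, you can use the following construct:

previous = out.dataframe('previous', schema)  # schema is a pyspark.sql.types.StructType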

out.write_dataframe(df.select(schema.fieldNames()))
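The three properties that are compared (column order, type, and nullability) can be illustrated without Spark. In this sketch a schema is just an ordered list of (name, type, nullable) tuples and `schema_mismatches` is a hypothetical helper; the real comparison is performed by the platform and may report differences in another form.

```python
def schema_mismatches(expected, actual):
    """Hypothetical check: list the ways two ordered schemas differ."""
    problems = []
    if [field[0] for field in expected] != [field[0] for field in actual]:
        problems.append('column names or order differ')
        return problems
    for (name, exp_type, exp_null), (_, act_type, act_null) in zip(expected, actual):
        if exp_type != act_type:
            problems.append(f'{name}: type {exp_type} != {act_type}')
        if exp_null != act_null:
            problems.append(f'{name}: nullable {exp_null} != {act_null}')
    return problems


# The stored output has every column nullable; the provided schema marks 'id' as non-nullable.
provided = [('id', 'long', False), ('hair', 'string', True)]
stored = [('id', 'long', True), ('hair', 'string', True)]
print(schema_mismatches(provided, stored))  # ['id: nullable False != True']
```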

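:::callout{theme="neutral"} Regardless of the schema used in your transform, Foundry saves all columns as nullable. As a result, if the schema you provide marks some fields as non-nullable, your build will fail with a SchemaMismatchError when reading from the output in previous mode. :::

See the Merge and replace example for more information.

:::callout{theme="neutral"} The dataframe() and filesystem() methods use the same read modes, so that both structured and unstructured datasets can be handled. :::

Note that there is no built-in way to access the data written by the previous invocation of the transform. If you need this data, you can add an auxiliary output dataset to which each transform run writes a copy of what it wrote to the output (or the relevant metadata), using the replace write mode. This lets you later refer to only what the previous run of the transform wrote.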

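Valid combinations for reading data written in the current run

| Output read mode | Output write mode | New data written? |
| --- | --- | --- |
| current or added | modify / replace | Yes |

Prefer added, since it makes your intent clearer. A scenario that benefits from reading the data written by the current transform is validating the full contents of the dataset so that the build fails if validation fails. This avoids having to recompute or cache the data in Spark in order to run the checks.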

IncrementalTransformContext

TransformContext 对象相比,IncrementalTransformContext 对象提供了一个额外的属性:is_incremental。如果转换以增量方式运行,此属性设置为 True,这意味着:

  • 默认输出写入模式设置为 modify,并且
  • 输入的默认读取模式设置为 added

增量模式总结

增量装饰器允许您通过指定读取模式 "previous" 来访问转换的上次输入和输出。这样,您就可以基于历史上下文进行当前构建。如果转换以快照模式运行,则 "previous" 数据帧将为空,因为这是第一次运行,或者因为逻辑或数据发生了重大变化,需要重新计算。

然而,最常见的情况是对输入使用 "added" 模式,对输出使用 "modify" 模式。这些模式是默认使用的。它们允许您从输入数据集中检索新添加的行,处理它们,并将它们追加到输出数据集中。

您可能希望修改输出数据集中已存在的某些行,而不是将行追加到输出中。为此,请使用 "replace" 模式,如常见场景示例所示。

增量计算的要求

下面,我们将分析一个增量转换,该转换过滤学生,只包括那些棕色头发的学生:

```python tab="Polars"
import polars as pl

@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars()
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
@incremental()
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas()
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
@incremental()
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

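Suppose the /examples/students_hair_eye_color input dataset is fully replaced with a new set of students. Appending the new set of students to the previous output would produce an incorrect output dataset. This is an example of a situation in which the incremental() decorator decides not to run the transform incrementally.

For a transform to run incrementally, the following requirements must be met:

All non-snapshot input datasets of the transform have only had files added since the last run, or have had deletions that come from retention.

If a transform has the incremental() decorator but any of the above requirements is not met, the transform automatically runs non-incrementally. This means that the default output write mode is set to replace instead of modify, and that inputs are presented non-incrementally. It also means that reading from the output within the transform returns an empty DataFrame, because the previous history cannot be accessed. If require_incremental=True is set, the transform fails instead of running non-incrementally.

:::callout{theme="neutral"} It is often desirable to allow certain inputs to be fully rewritten without affecting the ability to run the transform incrementally. See Snapshot inputs for more information. :::

:::callout{theme="success"} A transform can be forced to run only incrementally (unless it has never run before or its semantic version has been increased) by passing require_incremental=True to the incremental decorator. If the transform cannot run incrementally, it deliberately fails rather than attempting to run non-incrementally. :::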

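Append-only input changes

A transform can run incrementally if all of its incremental inputs have only had files added since the last run (through APPEND or UPDATE transactions).

Conversely, a transform cannot run incrementally if either of the following applies to any of its incremental inputs:

  • The input was fully rewritten (for example, it has a SNAPSHOT transaction).
  • The input had files updated or deleted through UPDATE or DELETE transactions.

For example, if the list of students in students_hair_eye_color changes completely, the previously filtered output of students is invalid and must be replaced.

Input deletions from retention

If an upstream dataset grows indefinitely and you want to be able to delete old rows (using Retention in Foundry) without affecting the incrementality of downstream computation, the incremental transforms that depend on that dataset must be explicitly set up to allow retention on their inputs. This is done with the allow_retention argument of the transforms.api.incremental decorator.

  • If this field is set to True, all deletions that come from retention policies are ignored when evaluating whether the inputs preserve incrementality. This means that removed inputs resulting from retention do not compromise incrementality, and the transform still runs incrementally if the only non-added inputs are ones with rows removed by retention.
  • If this field is False (the default), any removed type of change in the input datasets causes the transform to run as a snapshot.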

```python tab="Polars"
import polars as pl

@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.polars()
    processed.write_table(students_df.filter(pl.col("hair") == 'Brown'))
```

```python tab="DuckDB"
@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(ctx, students, processed):
    conn = ctx.duckdb().conn
    filtered_query = conn.sql("SELECT * FROM students WHERE hair = 'Brown'")
    processed.write_table(filtered_query)
```

```python tab="Pandas"
@incremental(allow_retention=True)
@transform.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.pandas()
    processed.write_table(students_df[students_df.hair == 'Brown'])
```

```python tab="PySpark"
@incremental(allow_retention=True)
@transform.spark.using(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_hair_color(students, processed):
    students_df = students.dataframe()
    processed.write_dataframe(students_df.filter(students_df.hair == 'Brown'))
```

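In the example above, if the transform runs after a set of changes to the dataset /examples/students_hair_eye_color that includes only added changes and removed changes made by retention policies, the transform runs incrementally. If there are any removed changes made in other ways, or any modified changes, a snapshot is triggered.

:::callout{theme="warning" title="Warning"} Specifying allow_retention=True only prevents removed changes that come from retention policies from affecting incrementality. Any other deletion in the input datasets still causes the transform to run a snapshot instead of an incremental computation. :::

Snapshot inputs

In some cases, an input can be fully rewritten without invalidating the incrementality of the transform. For example, suppose you have a simple reference dataset that maps phone number country codes to countries and is periodically rewritten. Changes to this dataset do not necessarily invalidate the results of any previous computation, and therefore should not prevent the transform from running incrementally.

By default, as described above, a transform cannot run incrementally if any input has been fully rewritten since the transform last ran. Snapshot inputs are excluded from this check, and their start transactions are allowed to differ between runs.

Snapshot inputs are configured with the snapshot_inputs argument on the incremental() decorator.

If your incremental transform uses Cipher resources (bellaso_python_lib), those encrypters, decrypters, and hashers need to be listed as snapshot inputs.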

```python tab="Polars"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # All phone numbers not seen since the last run
    phone_numbers_df = phone_numbers.polars()
    # All country codes, regardless of what was seen before
    country_codes_df = country_codes.polars()

    output.write_table(
        phone_numbers_df.join(country_codes_df, left_on='country_code', right_on='code', how='left')
    )
```

```python tab="DuckDB"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(ctx, phone_numbers, country_codes, output):
    conn = ctx.duckdb().conn
    joined_query = conn.sql("""
        SELECT *
        FROM phone_numbers
        LEFT JOIN country_codes
        ON phone_numbers.country_code = country_codes.code
    """)
    output.write_table(joined_query)
```

```python tab="Pandas"
@incremental(snapshot_inputs=['country_codes'])
@transform.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # All phone numbers not seen since the last run
    phone_numbers_df = phone_numbers.pandas()
    # All country codes, regardless of what was seen before
    country_codes_df = country_codes.pandas()

    output.write_table(
        phone_numbers_df.merge(country_codes_df, left_on='country_code', right_on='code', how='left')
    )
```

```python tab="PySpark"
@incremental(snapshot_inputs=['country_codes'])
@transform.spark.using(
    phone_numbers=Input('/examples/phone_numbers'),
    country_codes=Input('/examples/country_codes'),
    output=Output('/examples/phone_numbers_to_country')
)
def map_phone_number_to_country(phone_numbers, country_codes, output):
    # All phone numbers not seen since the last run
    phone_numbers = phone_numbers.dataframe()
    # All country codes, regardless of what was seen before
    country_codes = country_codes.dataframe()

    cond = [phone_numbers.country_code == country_codes.code]
    output.write_dataframe(phone_numbers.join(country_codes, on=cond, how='left_outer'))
```

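Snapshot inputs behave the same way whether the transform runs incrementally or non-incrementally. As a result, the added and current read modes always return the entire dataset, and all other read modes return an empty dataset.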
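The effect of a snapshot input on an incremental run can be sketched with plain lists of dicts: only the newly added phone numbers are processed, but they are matched against the complete country code table each time. `map_new_numbers` and the sample rows are hypothetical and only mirror the left join used in the phone number example.

```python
def map_new_numbers(new_phone_rows, all_country_codes):
    """Left-join the added phone rows against the full snapshot input."""
    lookup = {row['code']: row['country'] for row in all_country_codes}
    return [
        # Unmatched country codes keep the row with country set to None.
        {**phone, 'country': lookup.get(phone['country_code'])}
        for phone in new_phone_rows
    ]


# The snapshot input is always read in full, even on an incremental run.
country_codes = [
    {'code': '44', 'country': 'United Kingdom'},
    {'code': '81', 'country': 'Japan'},
]
# Only the rows added since the last run are read from the incremental input.
new_numbers = [
    {'number': '5550100', 'country_code': '81'},
    {'number': '5550101', 'country_code': '99'},
]
for row in map_new_numbers(new_numbers, country_codes):
    print(row)
```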

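:::callout{theme="neutral"} Given that there are no constraints around previously seen versions of snapshot inputs, it is possible to add or remove snapshot inputs while retaining the ability to run the transform incrementally. Remember that if the modification of the inputs fundamentally changes the semantics of the transform, it is worth reviewing whether the semantic_version argument on the incremental() decorator should be updated. :::

Changes to inputs

The list of existing inputs can be modified. Incrementality is preserved when either of the following is true:

  • New inputs or new snapshot inputs are added.
  • Existing inputs or existing snapshot inputs are removed.

Note that an incremental transform must have at least one input.

The start transactions for each of the non-snapshot input datasets must be consistent with those used for the previous run.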

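Outputs last built by same transform

For multiple-output incremental transforms, the last committed transaction of every previously built output must have been generated by the same transform.

Outputs with no prior build history are exempt from the above condition and do not prevent the build from running incrementally.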
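The output condition can be expressed as a one-line check. This is a hypothetical model only: `outputs_allow_incremental` and the transform identifiers are illustrative, with `None` standing for an output that has no prior build history.

```python
def outputs_allow_incremental(last_builder_by_output, this_transform):
    """Hypothetical check: every previously built output was last committed by
    this transform; outputs with no build history (None) are exempt."""
    return all(
        builder is None or builder == this_transform
        for builder in last_builder_by_output.values()
    )


print(outputs_allow_incremental({'out_a': 'transform_x', 'out_b': None}, 'transform_x'))           # True
print(outputs_allow_incremental({'out_a': 'transform_x', 'out_b': 'transform_y'}, 'transform_x'))  # False
```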

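Input and output datasets for transforms must be different

Data transformations take an input dataset, perform operations, and generate an output dataset. The input dataset and output dataset for a transform must be different. Having the same dataset as both input and output results in a cyclic (circular) dependency, making the transform impossible to execute.

Summary of requirements for incremental computation

A transform can be run incrementally if and only if all its incremental inputs only had files appended to them; where files were deleted, those files were only deleted using retention with allow_retention=True. Snapshot inputs are excluded from this check.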