Incremental transforms
:::callout{theme="warning" title="Warning"} Incremental computation is an advanced feature. Ensure that you understand the rest of the user guide before making use of this feature. :::
The transforms shown so far in the user guide recompute their entire output datasets every time they are run. This can lead to a lot of unnecessary work. Consider the following example:
package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class FilterTransform {

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        // Reads the entire input on every run.
        Dataset<Row> inputDf = myInput.asDataFrame().read();
        // Rewrites the entire output on every run.
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
    }
}
If new data is added to the /examples/students_hair_eye_color
input dataset, filter() runs over the entire input rather than over
just the new data. This wastes compute resources and time.
If a transform can become aware of its build history, it can be smarter about how to compute its output. More specifically, it can use the changes made to the inputs to modify the output dataset. This process of using already materialized data when re-materializing tables is called incremental computation. Without incremental computation, the output dataset is always replaced by the latest output of the transform.
Let's go back to the example transform shown above. The transform
performs a filter() over the students dataset to write out students
with brown eyes. With incremental computation, if data about two new
students is appended to students, the transform can use information
about its build history to append only the new brown-eyed students to
the output:
      RAW                          DERIVED
+---+-----+-----+              +---+-----+-----+
| id| hair|  eye|              | id| hair|  eye|
+---+-----+-----+   Build 1    +---+-----+-----+
| 17|Black|Brown|  --------->  | 17|Black|Brown|
| 18|Brown|Brown|              | 18|Brown|Brown|
| 19|  Red|Brown|              | 19|  Red|Brown|
+---+-----+-----+              +---+-----+-----+
      ...                            ...
+---+-----+-----+   Build 2    +---+-----+-----+
| 20|Brown|Brown|  --------->  | 20|Brown|Brown|
| 21|Black|Blue |              +---+-----+-----+
+---+-----+-----+
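To make the idea concrete, here is a minimal plain-Java sketch of incremental computation. It is not transforms-java code: `IncrementalFilterSketch`, `Student`, and `processedCount` are hypothetical names introduced for illustration, the input is assumed to be append-only, and "build history" is modeled as nothing more than a count of rows that earlier builds already examined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of incremental computation; not the transforms-java API. */
final class IncrementalFilterSketch {

    /** Hypothetical stand-in for one row of the students dataset. */
    static final class Student {
        final int id;
        final String hair;
        final String eye;

        Student(int id, String hair, String eye) {
            this.id = id;
            this.hair = hair;
            this.eye = eye;
        }
    }

    // Rows materialized by earlier builds; an incremental build only appends to it.
    private final List<Student> output = new ArrayList<>();
    // Number of input rows that earlier builds have already examined.
    private int processedCount = 0;

    /** Runs one build over an append-only input and returns how many rows it examined. */
    int build(List<Student> input) {
        List<Student> unprocessed = input.subList(processedCount, input.size());
        for (Student student : unprocessed) {
            if ("Brown".equals(student.eye)) {
                output.add(student);
            }
        }
        processedCount = input.size();
        return unprocessed.size();
    }

    List<Student> output() {
        return output;
    }

    public static void main(String[] args) {
        List<Student> raw = new ArrayList<>(Arrays.asList(
                new Student(17, "Black", "Brown"),
                new Student(18, "Brown", "Brown"),
                new Student(19, "Red", "Brown")));
        IncrementalFilterSketch transform = new IncrementalFilterSketch();
        System.out.println("Build 1 examined " + transform.build(raw) + " rows");

        // Two new students are appended before the second build.
        raw.add(new Student(20, "Brown", "Brown"));
        raw.add(new Student(21, "Black", "Blue"));
        System.out.println("Build 2 examined " + transform.build(raw) + " rows");
        System.out.println("Output rows: " + transform.output().size());
    }
}
```

The second build examines only the two appended rows, which is the saving that incremental computation aims for. The rest of this guide shows how the same decision is expressed with the transforms-java read ranges and write modes.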
Write an incremental transform
This section walks through writing an incremental transform with transforms-java, step by step. Unlike transforms-python, transforms-java does not use annotations to automatically verify incrementality and apply transformations incrementally. Instead, the user controls the process more directly and decides explicitly when a transformation should act incrementally and when it should not. By interpreting how the input dataset was modified, the user can decide whether to update the output dataset incrementally or in a snapshot-like fashion.
Interpret your input
The first step is to interpret your input. The input dataset can be modified in several ways, and incremental transformations can be applied only in specific circumstances. DataFrameModificationType (or FilesModificationType) expresses the different ways a dataset can be modified. The modes are:
- APPENDED
- UPDATED
- NEW_VIEW
- UNCHANGED
Based on how the input has changed, we can decide what to read from the input dataset and what to write to the output dataset.
Read the input
Knowing how the input was modified allows us to read it accordingly. If
a transaction only appended data, we can safely act incrementally and
read only what was added. If, instead, the change to the input dataset
modified existing rows, we may want to re-read the whole view. The
transforms-java API supports different read modes for an input dataset
through the readForRange() method.
ReadRange exposes the possible reading ranges. The different modes are:
- UNPROCESSED
- PROCESSED
- ENTIRE_VIEW
By interpreting the input modification type we can then decide how to read our data, as shown in the example below.
private ReadRange getReadRange(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, read only unprocessed");
            return ReadRange.UNPROCESSED;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        case NEW_VIEW:
            LOG.info("New view in input dataset, read entire view");
            return ReadRange.ENTIRE_VIEW;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}
We can then modify our compute method accordingly.
@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write();
}
:::callout{theme="warning" title="Warning"} At this point we are only reading different portions of the input dataset but not acting differently on the output dataset. Running the code in this example up to this point will always result in a snapshot transaction on the output, no matter which portion of the input you are reading. Proceed until the end of the tutorial before applying your incremental transforms in order to understand how to correctly modify the output dataset. :::
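The following plain-Java sketch illustrates why the combination described in this warning is a problem. It is a simplified model, not transforms-java code: `OutputDatasetSketch` and its `Mode` enum are hypothetical stand-ins, and the model assumes that a snapshot write replaces everything written so far while an update write appends to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of an output dataset; Mode is a local stand-in, not the transforms-java WriteMode. */
final class OutputDatasetSketch {

    enum Mode { SNAPSHOT, UPDATE }

    private final List<String> rows = new ArrayList<>();

    /** SNAPSHOT replaces everything written so far; UPDATE appends to it. */
    void write(List<String> newRows, Mode mode) {
        if (mode == Mode.SNAPSHOT) {
            rows.clear();
        }
        rows.addAll(newRows);
    }

    /** Returns a copy of the rows currently held by the modeled output. */
    List<String> rows() {
        return new ArrayList<>(rows);
    }

    public static void main(String[] args) {
        OutputDatasetSketch output = new OutputDatasetSketch();
        // Build 1: the whole input is unprocessed, so a snapshot write is harmless.
        output.write(Arrays.asList("student-17", "student-18"), Mode.SNAPSHOT);
        // Build 2: only the newly appended row was read, but the write is still a snapshot.
        output.write(Arrays.asList("student-20"), Mode.SNAPSHOT);
        System.out.println("Rows after build 2: " + output.rows());
    }
}
```

In this model, the second build leaves only the newly read row in the output because the earlier rows were replaced. Pairing the read range with a matching write mode, as the following steps do, avoids that loss.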
Transform the data
In this step, it is up to the user to apply whatever transformation the data needs. Remember that the data read will differ depending on the input modification type. In our case, the transformation is a simple filter for brown eyes, which we can isolate as:
inputDf = inputDf.filter("eye = 'Brown'");
Write the output
Once we have interpreted the modifications to the input dataset, read the desired portion of the input, and transformed the data, we can write the output accordingly. WriteMode provides the different writing modes. The modes are:
- SNAPSHOT
- UPDATE
For example, in our case, we can choose the output type based on the input modification type.
private WriteMode getWriteMode(FoundryInput input) {
    switch (input.asDataFrame().modificationType()) {
        case UNCHANGED:
            LOG.info("No changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case APPENDED:
            LOG.info("Append-only changes in input dataset, writing in update mode");
            return WriteMode.UPDATE;
        case UPDATED:
            LOG.info("Update-type changes in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        case NEW_VIEW:
            LOG.info("New view in input dataset, writing in snapshot mode");
            return WriteMode.SNAPSHOT;
        default:
            throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                    + input.asDataFrame().modificationType());
    }
}
:::callout{theme="warning" title="Warning"}
Do not confuse WriteMode.UPDATE and
DataFrameModificationType.UPDATED. The former is an incremental
modification of the output dataset that downstream datasets see as
DataFrameModificationType.APPENDED. The latter is a modification of
the input dataset that includes both appends and modifications to
existing rows.
:::
Finally, the write() call can be modified to include a write mode:
@Compute
public void myComputeFunction(
        @Input("/examples/students_hair_eye_color") FoundryInput myInput,
        @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
    Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
    myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
}
Putting it all together
We can build a simple incremental filtering transform by putting the pieces together.
package myproject.datasets;

import com.palantir.transforms.lang.java.api.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FilterTransform {
    private static final Logger LOG = LoggerFactory.getLogger(FilterTransform.class);

    @Compute
    public void myComputeFunction(
            @Input("/examples/students_hair_eye_color") FoundryInput myInput,
            @Output("/examples/students_hair_eye_color_filtered") FoundryOutput myOutput) {
        // Read only the portion of the input that the modification type calls for.
        Dataset<Row> inputDf = myInput.asDataFrame().readForRange(getReadRange(myInput));
        // Append to or replace the output, based on the same modification type.
        myOutput.getDataFrameWriter(inputDf.filter("eye = 'Brown'")).write(getWriteMode(myInput));
    }

    private ReadRange getReadRange(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, read only unprocessed");
                return ReadRange.UNPROCESSED;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            case NEW_VIEW:
                LOG.info("New view in input dataset, read entire view");
                return ReadRange.ENTIRE_VIEW;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }

    private WriteMode getWriteMode(FoundryInput input) {
        switch (input.asDataFrame().modificationType()) {
            case UNCHANGED:
                LOG.info("No changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case APPENDED:
                LOG.info("Append-only changes in input dataset, writing in update mode");
                return WriteMode.UPDATE;
            case UPDATED:
                LOG.info("Update-type changes in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            case NEW_VIEW:
                LOG.info("New view in input dataset, writing in snapshot mode");
                return WriteMode.SNAPSHOT;
            default:
                throw new IllegalArgumentException("Unknown ModificationType for input dataset "
                        + input.asDataFrame().modificationType());
        }
    }
}
As introduced above, we evaluate the input modification type and read the input accordingly. We then decide whether to incrementally update the output dataset or start a new snapshot transaction.
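Because the read range and the write mode are derived from the same modification type, one optional design choice is to compute them together so they cannot drift apart. The sketch below shows that idea in plain Java. `BuildPlanSketch` is a hypothetical helper, and its nested enums are local stand-ins that only mirror the names used in this guide; they are not the transforms-java types.

```java
/** Pairs a read range with a write mode; the enums are local stand-ins, not the transforms-java types. */
final class BuildPlanSketch {

    enum ModificationType { APPENDED, UPDATED, NEW_VIEW, UNCHANGED }

    enum ReadRange { UNPROCESSED, PROCESSED, ENTIRE_VIEW }

    enum WriteMode { SNAPSHOT, UPDATE }

    final ReadRange readRange;
    final WriteMode writeMode;

    private BuildPlanSketch(ReadRange readRange, WriteMode writeMode) {
        this.readRange = readRange;
        this.writeMode = writeMode;
    }

    /** Applies the same mapping as getReadRange and getWriteMode in the example above. */
    static BuildPlanSketch forModification(ModificationType type) {
        switch (type) {
            case UNCHANGED:
            case APPENDED:
                // Only new rows are read, so they must be appended to the existing output.
                return new BuildPlanSketch(ReadRange.UNPROCESSED, WriteMode.UPDATE);
            case UPDATED:
            case NEW_VIEW:
                // The whole view is read, so the output is replaced.
                return new BuildPlanSketch(ReadRange.ENTIRE_VIEW, WriteMode.SNAPSHOT);
            default:
                throw new IllegalArgumentException("Unknown modification type " + type);
        }
    }

    public static void main(String[] args) {
        for (ModificationType type : ModificationType.values()) {
            BuildPlanSketch plan = forModification(type);
            System.out.println(type + " -> read " + plan.readRange + ", write " + plan.writeMode);
        }
    }
}
```

In a real transform, the same pairing could be applied to the values returned by `modificationType()`, so that a single switch statement drives both the `readForRange()` and `write()` calls.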
Best practices
Switch between snapshot and incremental
Let's say you want to mostly run incremental transforms but sometimes need to rerun a snapshot of your dataset.
To avoid hardcoding the write mode, you can add a new input and have the transform write its output in SNAPSHOT mode whenever that input is modified. This new input essentially acts as a snapshot trigger dataset. Note that you will have to adapt the read range of the transform's other inputs based on the modification type of the snapshot trigger dataset.
It is also possible to externally force a snapshot by creating an empty append transaction without provenance. However, transforms-java does not expose such functionality, and it is therefore out of the scope of this guide.
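The snapshot trigger approach described above can be sketched as a small decision function. This is a plain-Java illustration under stated assumptions, not transforms-java code: `SnapshotTriggerSketch` is a hypothetical helper, its enums are local stand-ins that mirror the names in this guide, and it assumes that any modification type other than UNCHANGED on the trigger dataset should force a snapshot.

```java
/** Decision logic for a snapshot trigger input; the enums are local stand-ins, not the transforms-java types. */
final class SnapshotTriggerSketch {

    enum ModificationType { APPENDED, UPDATED, NEW_VIEW, UNCHANGED }

    enum ReadRange { UNPROCESSED, PROCESSED, ENTIRE_VIEW }

    enum WriteMode { SNAPSHOT, UPDATE }

    /** True when the trigger changed or the data input can no longer be handled incrementally. */
    static boolean requiresSnapshot(ModificationType data, ModificationType trigger) {
        // Assumption: any change to the trigger dataset counts as a request for a snapshot.
        boolean triggerFired = trigger != ModificationType.UNCHANGED;
        boolean dataRewritten = data == ModificationType.UPDATED || data == ModificationType.NEW_VIEW;
        return triggerFired || dataRewritten;
    }

    /** A snapshot must re-read the whole data input, even if that input was only appended to. */
    static ReadRange readRangeForData(ModificationType data, ModificationType trigger) {
        return requiresSnapshot(data, trigger) ? ReadRange.ENTIRE_VIEW : ReadRange.UNPROCESSED;
    }

    static WriteMode writeMode(ModificationType data, ModificationType trigger) {
        return requiresSnapshot(data, trigger) ? WriteMode.SNAPSHOT : WriteMode.UPDATE;
    }

    public static void main(String[] args) {
        ModificationType data = ModificationType.APPENDED;
        for (ModificationType trigger : ModificationType.values()) {
            System.out.println("data=" + data + ", trigger=" + trigger
                    + " -> read " + readRangeForData(data, trigger)
                    + ", write " + writeMode(data, trigger));
        }
    }
}
```

The key point is that the data input's read range depends on the trigger as well as on the data input itself: when the trigger forces a snapshot, reading only the unprocessed rows would leave the rewritten output incomplete.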
Advanced features
:::callout{theme="warning" title="Warning"} The advanced features in this section can have serious negative effects if used incorrectly. Do not use these features if you are not absolutely certain of the implications. If run without appropriate care and caution, there is a high risk of unwanted consequences. Contact your Palantir representative if you have any questions. :::
:::callout{theme="warning" title="Warning"}
Advanced features are usually enabled with annotations on your @Compute function. However, if your transform is manually registered, you need to add the properties to the Transform Builder instead.
:::
Ignore incremental deletes
If an incremental build depends on an append-only dataset growing indefinitely and there is insufficient disk space for that growth, it may become necessary to delete parts of the upstream dataset.
However, this may break incrementality, because the modification of the original dataset will not result in an APPENDED modification type.
IncrementalOptions.IGNORE_INCREMENTAL_DELETES avoids this by not treating deletions in the upstream dataset as breaking changes.
:::callout{theme="warning" title="Warning"} It is only possible to ignore incremental deletes in low-level transforms. :::
@Compute
@UseIncrementalOptions(IncrementalOptions.IGNORE_INCREMENTAL_DELETES)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...
}
If your transform is manually registered, add the property to the builder as in the following code block.
LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .ignoreIncrementalDeletes(true)
        .build();
Ignore schema change
:::callout{theme="warning" title="Warning"} Note that a schema modification in the input dataset may have unexpected consequences when combined with incremental transforms.
Read all of the documentation below and ensure that you understand all potential implications before using this feature. :::
:::callout{theme="warning" title="Warning"} It is only possible to ignore schema changes in low-level transforms. :::
If the schema of a dataset that an incremental build depends on changes, the change results in a DataFrameModificationType.NEW_VIEW, possibly breaking incrementality.
However, if the IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE option is set, a schema change won't result in a new view.
Instead, the schema change in the input dataset will be interpreted as DataFrameModificationType.UNCHANGED and a SchemaModificationType flag SchemaModificationType.NEW_SCHEMA will be set, allowing the user to explicitly treat this special case.
@Compute
@UseIncrementalOptions(IncrementalOptions.USE_SCHEMA_MODIFICATION_TYPE)
public void myComputeFunction(
        @Input("/Users/admin/students_data") FoundryInput myInput,
        @Output("/Users/admin/students_data_filtered") FoundryOutput myOutput) {
    ...
}
If your transform is manually registered, add the property to the builder as in the following code block.
LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
        .computeFunctionInstance(new MyLowLevelManualFunction())
        .putParameterToInputAlias("myInput", "/path/to/input/dataset")
        .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
        .useSchemaModificationType(true)
        .build();
Whether the build for the transformation succeeds or fails depends on how the transformation uses the input dataset. More precisely, if the transformation depends on columns involved in the schema change, modifications to those columns will make the incremental transformation fail. In these cases, a new snapshot is required before incremental transforms can be used again.
The transformation depends on a certain column if:
- It contains logic that depends explicitly on that column (for example, if we have filter("eye = 'Brown'") and the column eye is renamed or deleted in the RAW dataset, then retriggering our FilterTransform will make the incremental update fail).
- The modified column appears in the output dataset (for example, if we remove the column hair from our example RAW dataset, our FilterTransform will fail).
If the transformation doesn't depend on the schema changes, the incremental build will succeed.
For example, if we first add a select statement for id and eye in our transformation and trigger an initial snapshot build from the RAW dataset, and then remove the column hair in the RAW dataset, the incremental build will succeed and the schema change will not have any effect on the incremental transformation.
The build will also always succeed in case of additive changes to the schema (e.g. adding a new column).
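The dependency rules in this section can be modeled as simple set membership. The sketch below is a plain-Java illustration, not transforms-java code: `SchemaDependencySketch` and `incrementalBuildSurvives` are hypothetical names, and the model covers only the rules stated above, so a real build may still succeed or fail for other reasons.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Models the documented rule: a renamed or removed column breaks the build only if the transform depends on it. */
final class SchemaDependencySketch {

    /**
     * @param referencedColumns columns the transformation logic mentions explicitly
     * @param outputColumns     columns written to the output dataset
     * @param changedColumns    columns renamed or removed upstream (added columns are not listed)
     * @return true if no changed column is referenced by the logic or present in the output
     */
    static boolean incrementalBuildSurvives(
            Set<String> referencedColumns, Set<String> outputColumns, Set<String> changedColumns) {
        for (String column : changedColumns) {
            if (referencedColumns.contains(column) || outputColumns.contains(column)) {
                return false;
            }
        }
        return true;
    }

    private static Set<String> setOf(String... columns) {
        return new HashSet<>(Arrays.asList(columns));
    }

    public static void main(String[] args) {
        Set<String> removed = setOf("hair");

        // FilterTransform as written: filters on eye and writes every column.
        System.out.println("Without select: "
                + incrementalBuildSurvives(setOf("eye"), setOf("id", "hair", "eye"), removed));

        // FilterTransform with a select on id and eye: hair is neither used nor written.
        System.out.println("With select(id, eye): "
                + incrementalBuildSurvives(setOf("id", "eye"), setOf("id", "eye"), removed));
    }
}
```

An additive change, such as a new upstream column, corresponds to an empty `changedColumns` set in this model, so the check passes, which matches the statement that additive changes always succeed.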