Examples
This section contains a wide range of examples of incrementally computable transforms:
- Append
- Merge and append
- Merge and replace
- Leveraging incremental transforms to join large datasets
- Handling schema or logic changes
- Developing incremental code on a branch
- Summary of examples
The examples make use of two inputs to demonstrate incremental computation: students and students_updated. The students input contains 3 students and is not incremental. This means it has no history:
```python
>>> students.dataframe('previous').sort('id').show()
+---+----+---+---+
| id|hair|eye|sex|
+---+----+---+---+
+---+----+---+---+
>>>
>>> students.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students.dataframe('added') is students.dataframe()
True
```
The students_updated input is the same as students but with an additional update that contains three extra students. This update makes the input incremental. Therefore, it has a non-empty previous DataFrame.
```python
>>> students_updated.dataframe('previous').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('current').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  1|Brown|Green|Female|
|  2|  Red| Blue|  Male|
|  3|Blond|Hazel|Female|
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> students_updated.dataframe('added').sort('id').show()
+---+-----+-----+------+
| id| hair|  eye|   sex|
+---+-----+-----+------+
|  4|Brown|Green|Female|
|  5|Brown| Blue|  Male|
|  6|Blond|Hazel|Female|
+---+-----+-----+------+
>>>
>>> # Recall that the default read mode for inputs is 'added'
>>> students_updated.dataframe('added') is students_updated.dataframe()
True
```
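The following pure-Python sketch makes the three read modes concrete without using Spark or Foundry. For illustration only, it assumes that a dataset is a list of append-only transactions (each a list of rows) and that the number of transactions seen by the last build is known. The read_mode helper is hypothetical and is not part of the transforms API.

```python
from typing import Dict, List

Row = Dict[str, object]


def read_mode(transactions: List[List[Row]], processed_count: int, mode: str) -> List[Row]:
    """Return the rows a transform would see in the given read mode.

    Hypothetical model: `transactions` holds append-only transactions in commit
    order, and the first `processed_count` of them were seen by the last build.
    """
    previous = [row for txn in transactions[:processed_count] for row in txn]
    added = [row for txn in transactions[processed_count:] for row in txn]
    if mode == "previous":
        return previous
    if mode == "added":
        return added
    if mode == "current":
        return previous + added
    raise ValueError(f"unknown read mode: {mode}")


snapshot = [{"id": 1, "hair": "Brown"}, {"id": 2, "hair": "Red"}, {"id": 3, "hair": "Blond"}]
update = [{"id": 4, "hair": "Brown"}, {"id": 5, "hair": "Brown"}, {"id": 6, "hair": "Blond"}]

students = [snapshot]                  # no history: nothing has been processed yet
students_updated = [snapshot, update]  # the snapshot was processed, the update is new

print(read_mode(students, 0, "previous"))                             # []
print([r["id"] for r in read_mode(students_updated, 1, "previous")])  # [1, 2, 3]
print([r["id"] for r in read_mode(students_updated, 1, "added")])     # [4, 5, 6]
print([r["id"] for r in read_mode(students_updated, 1, "current")])   # [1, 2, 3, 4, 5, 6]
```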
Append
An append-only incremental computation is one where the added output rows are a function only of the added input rows. This means that to compute its output, the transform does the following:
- Looks at any newly added input data,
- Computes any new output rows, which are a function only of these added input rows, and
- Appends the new output to the existing output.
:::callout{theme="neutral"} Changing column types, formatting dates as strings, and filtering are all examples of append-only computations. In these examples, each added input row is either transformed into output rows or filtered out. :::
Notice that the only change needed to make an append-only transform incremental is the incremental() decorator.
When running incrementally, the default read mode of added means the transform reads only the new students, and the default write mode of modify means the transform appends only the filtered new students to the output.
When running non-incrementally, the default read mode of added means the transform reads the full input, and the default write mode of replace means the transform replaces the output with the full set of filtered students.
```python
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    # In an incremental run, the default 'added' read mode returns only new rows.
    new_students_df = students.dataframe()
    # In an incremental run, the default 'modify' write mode appends these rows.
    processed.write_dataframe(
        new_students_df.filter(new_students_df.hair == 'Brown')
    )
```
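This pattern is safe because a row-wise filter commutes with appending: filtering only the added rows and appending the result gives the same output as filtering the full input from scratch. The pure-Python sketch below checks that property on the example students. Lists of dicts stand in for DataFrames, and brown_hair and run_incrementally are hypothetical helpers.

```python
from typing import Dict, List

Row = Dict[str, object]


def brown_hair(rows: List[Row]) -> List[Row]:
    """Row-wise filter: whether a row is kept depends only on that row."""
    return [row for row in rows if row["hair"] == "Brown"]


def run_incrementally(existing_output: List[Row], added_rows: List[Row]) -> List[Row]:
    """Mimic the 'modify' write mode: append the filtered added rows."""
    return existing_output + brown_hair(added_rows)


first_batch = [{"id": 1, "hair": "Brown"}, {"id": 2, "hair": "Red"}, {"id": 3, "hair": "Blond"}]
second_batch = [{"id": 4, "hair": "Brown"}, {"id": 5, "hair": "Brown"}, {"id": 6, "hair": "Blond"}]

output = brown_hair(first_batch)                  # first build: full input, 'replace'
output = run_incrementally(output, second_batch)  # second build: added rows only

assert output == brown_hair(first_batch + second_batch)  # same as a full recompute
print([row["id"] for row in output])  # [1, 4, 5]
```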
Merge and append
Sometimes a transform needs to refer to its previous output in order to incrementally compute an update. An example of this is the distinct() method.
To remove duplicate rows in a transform (assuming the current output is correct), the transform must de-duplicate any new rows in the input, and then check that those rows do not already exist in the output.
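```python
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_distinct(students, processed):
    new_students_df = students.dataframe()
    # Keep only de-duplicated new rows that are not already in the previous output.
    # The schema lets an empty DataFrame be built when there is no previous output.
    processed.write_dataframe(
        new_students_df.distinct().subtract(
            processed.dataframe('previous', schema=new_students_df.schema)
        )
    )
```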
Here we make use of the previous read mode on the output dataset. This returns the DataFrame that was output during the last build. Since there may be no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.
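The same merge-and-append logic can be sketched in plain Python, with lists of tuples standing in for DataFrames. As an assumption made only for this illustration, the id column is dropped so that students 4 and 6, who share hair, eye, and sex values with students 1 and 3, count as duplicates. The rows_to_append helper is hypothetical.

```python
from typing import List, Tuple

# (hair, eye, sex); the id column is deliberately dropped in this sketch.
Row = Tuple[str, str, str]


def rows_to_append(previous_output: List[Row], added_rows: List[Row]) -> List[Row]:
    """Mirror of added.distinct().subtract(previous): unique new rows not yet written."""
    already_written = set(previous_output)
    result: List[Row] = []
    for row in added_rows:
        if row not in already_written and row not in result:
            result.append(row)
    return result


previous_output = [("Brown", "Green", "Female"), ("Red", "Blue", "Male"), ("Blond", "Hazel", "Female")]
added_rows = [("Brown", "Green", "Female"), ("Brown", "Blue", "Male"), ("Blond", "Hazel", "Female")]

new_rows = rows_to_append(previous_output, added_rows)
print(new_rows)  # [('Brown', 'Blue', 'Male')]

output = previous_output + new_rows  # the 'modify' write mode appends
```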
Merge and replace
Some transforms always replace their entire output, yet they can often still benefit from incremental computation. Aggregate statistics are one such example: for instance, counting the number of times each distinct value occurs in a column.
```python
from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_group_by(students, processed):
    # Compute the hair color counts for only the new students.
    new_hair_counts_df = students.dataframe().groupBy('hair').count()

    # Union with the old counts (empty on the first run, hence the explicit schema).
    out_schema = new_hair_counts_df.schema
    all_counts_df = new_hair_counts_df.union(
        processed.dataframe('previous', schema=out_schema)
    )

    # Group by hair color, summing the two sets of counts.
    totals_df = all_counts_df.groupBy('hair').agg(F.sum('count').alias('count'))

    # To fully replace the output, we always set the output mode to 'replace'.
    processed.set_mode('replace')
    processed.write_dataframe(totals_df.select(out_schema.fieldNames()))
```
Again, since it is possible that there is no previous output, we have to provide a schema to the dataframe('previous') call so that an empty DataFrame can be correctly constructed.
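Without Spark, the merge-and-replace pattern reduces to three steps: count the added rows, add those counts to the previous totals, and write the full result. The following is a minimal pure-Python sketch using collections.Counter, where plain dictionaries stand in for the previous and new output. The name merge_counts is hypothetical.

```python
from collections import Counter
from typing import Dict, List


def merge_counts(previous_counts: Dict[str, int], added_values: List[str]) -> Dict[str, int]:
    """Add counts for the newly added values to the previous totals."""
    totals = Counter(previous_counts)  # previous output; empty on the first run
    totals.update(added_values)        # count only the added rows
    return dict(totals)                # the full table is written back ('replace')


first = merge_counts({}, ["Brown", "Red", "Blond"])        # students 1-3
second = merge_counts(first, ["Brown", "Brown", "Blond"])  # students 4-6
print(second)  # {'Brown': 3, 'Red': 1, 'Blond': 2}
```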
Merge and append with varying schemas
Sometimes, an incremental transform needs to create a Spark DataFrame from the files added to a schemaless input dataset and then append the contents of that DataFrame to the output. For such transforms, there are two implementation patterns.
- Statically specify the expected schema and enforce that the DataFrame generated has that schema (by ignoring extra fields in the input data, filling in nulls for fields missing in the input data, and so on).
- Dynamically capture whatever fields are present in the input data.
For the dynamic capture implementation pattern, it is necessary to ensure that the DataFrame that is appended to the output has a schema that is compatible with the existing output. For a schema to be compatible with the existing output, the following conditions must be met:
- Columns in the new data that have the same name as columns in the existing output must also have the same type.
- All columns in the existing output must be present in the new data.
- There are no columns in the new data with names that differ only in capitalization (case) from columns in the existing data; for example, there cannot be a column Value in the new data if there is already a column value in the existing data.
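The three conditions can be expressed as a small check. The sketch below is only an illustration. It assumes that schemas are plain dictionaries mapping column names to type names, and compatibility_problems is a hypothetical helper, not a Foundry or Spark API.

```python
from typing import Dict, List

# Hypothetical schema representation: column name -> type name.
Schema = Dict[str, str]


def compatibility_problems(existing: Schema, new: Schema) -> List[str]:
    """List violations of the three conditions for appending `new` to `existing`."""
    problems: List[str] = []
    existing_by_lower = {name.lower(): name for name in existing}
    for name, dtype in new.items():
        # Condition 1: same-named columns must have the same type.
        if name in existing and existing[name] != dtype:
            problems.append(f"type mismatch for {name}: {existing[name]} vs {dtype}")
        # Condition 3: no new column may differ from an existing one only by case.
        match = existing_by_lower.get(name.lower())
        if match is not None and match != name:
            problems.append(f"{name} differs only in case from existing column {match}")
    # Condition 2: every existing column must be present in the new data.
    for name in existing:
        if name not in new:
            problems.append(f"missing existing column {name}")
    return problems


print(compatibility_problems({"value": "string"}, {"value": "string", "extra": "string"}))  # []
print(compatibility_problems({"value": "string"}, {"Value": "string"}))
```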
To ensure that these conditions are met, it is necessary to dynamically inspect the schema of the existing data, which means calling dataframe('previous') without specifying a schema. This is supported as long as the transform is being run incrementally.
The code below puts all of these principles together.
```python
from functools import reduce

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet


@incremental()
@transform(
    csvs=Input('/examples/dataset_of_csvs'),
    parsed=Output('/examples/parsed_csvs')
)
def incremental_read_csv(ctx, csvs, parsed):
    input_fs = csvs.filesystem()

    def process_file(f):
        df = (
            ctx.spark_session.read
            # Set inferSchema to False so that every column is a string.
            # This prevents issues due to inconsistent inference results between files,
            # both within and across incremental builds.
            .option("inferSchema", False)
            .option("header", True)
            .csv(input_fs.hadoop_path + "/" + f)
        )
        sanitized = sanitize_schema_for_parquet(df)
        # Make all columns lowercase to prevent issues due to inconsistent casing
        # between files, both within and across incremental builds.
        # Note that this logic does not handle the situation
        # where a single file contains columns differing only in case.
        return sanitized.select(*(F.col(a).alias(a.lower()) for a in sanitized.columns))

    dfs = [process_file(f.path) for f in input_fs.ls()]
    if len(dfs) == 0:
        # No files to process: abort the output transaction.
        parsed.abort()
        return

    # Combine all files; columns missing from a given file are filled with nulls.
    df = reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True),
        dfs[1:],
        dfs[0],
    )

    if not ctx.is_incremental:
        parsed.write_dataframe(df)
    else:
        # In an incremental run, the previous schema can be read without passing one.
        existing_columns = parsed.dataframe("previous").columns
        columns_in_new_data = set(df.columns)
        # Add null string columns for existing output columns absent from the new data.
        resolved_schema_df = df.select(
            *df.columns,
            *(
                F.lit(None).cast("string").alias(col)
                for col in existing_columns
                if col not in columns_in_new_data
            )
        )
        parsed.write_dataframe(resolved_schema_df)
```
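The column-resolution steps in this transform can be modeled in plain Python: lowercasing column names, taking a union by name with missing columns filled with nulls, and padding with the existing output columns. In this hypothetical sketch, a frame is a pair of column names and row dictionaries, and union_by_name only mimics the behavior of unionByName(..., allowMissingColumns=True). It does not reproduce it exactly.

```python
from functools import reduce
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for a DataFrame: (column names, rows as dicts).
Frame = Tuple[List[str], List[Dict[str, Optional[str]]]]


def lowercase(frame: Frame) -> Frame:
    """Lowercase every column name, as process_file does."""
    columns, rows = frame
    return [c.lower() for c in columns], [{k.lower(): v for k, v in r.items()} for r in rows]


def union_by_name(a: Frame, b: Frame) -> Frame:
    """Mimic unionByName(b, allowMissingColumns=True): absent columns become None."""
    columns = a[0] + [c for c in b[0] if c not in a[0]]
    return columns, [{c: r.get(c) for c in columns} for r in a[1] + b[1]]


def pad_existing_columns(frame: Frame, existing_columns: List[str]) -> Frame:
    """Add null columns for existing output columns missing from the new data."""
    columns, rows = frame
    missing = [c for c in existing_columns if c not in columns]
    return columns + missing, [{**r, **{c: None for c in missing}} for r in rows]


file_a = (["Name", "City"], [{"Name": "Ada", "City": "London"}])
file_b = (["name", "Age"], [{"name": "Bob", "Age": "42"}])

frames = [lowercase(f) for f in (file_a, file_b)]
combined = reduce(union_by_name, frames[1:], frames[0])
resolved = pad_existing_columns(combined, existing_columns=["name", "country"])
print(resolved[0])  # ['name', 'city', 'age', 'country']
```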
Leverage incremental transforms to join large datasets
Let's assume you have two tables, Orders (orders submitted by customers) and Deliveries (deliveries that have been completed), and we would like to compute a table DeliveryDuration with the time it takes for items to be delivered. Even though both the Orders and Deliveries tables only ever get new rows appended and no rows are ever modified, a simple join between the two incremental datasets will not work. For example, the Orders table might contain orderIds that are not yet present in the Deliveries table. If each build joined only the newly added rows of both inputs, an order added in one build would never be matched with a delivery that arrives in a later build.
```
Orders:                                Deliveries:                        DeliveryDuration:
+---------+---------------+            +---------+--------------+         +---------+------------------+
| orderId | submittedDate |            | orderId | deliveryDate |         | orderId | deliveryDuration |
+---------+---------------+            +---------+--------------+  ---->  +---------+------------------+
| 1001    | 2019-08-21    |  join on   | 1001    | 2019-08-23   |         | 1001    | 2                |
+---------+---------------+  orderId   +---------+--------------+         +---------+------------------+
| 1002    | 2019-08-22    |
+---------+---------------+
| 1003    | 2019-08-23    |
+---------+---------------+
```
Assuming orderId is strictly increasing in both the Orders and Deliveries tables, we can look up the last orderId for which we computed deliveryDuration (maxComputedOrderId) and read only the rows from the Orders and Deliveries tables with an orderId greater than maxComputedOrderId:
```python
from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform, Input, Output, incremental


@incremental(snapshot_inputs=['orders', 'deliveries'])
@transform(
    orders=Input('/example/Orders'),
    deliveries=Input('/example/Deliveries'),
    delivery_duration=Output('/example/New_Delivery_Date')
)
def compute_delivery_duration(orders, deliveries, delivery_duration):
    def to_fields(datatype, names, nullable=True):
        return [T.StructField(n, datatype, nullable) for n in names]

    # Define the output schema explicitly: on the first run there is no
    # previous output to take the schema from.
    fields = to_fields(T.IntegerType(), ['orderId', 'deliveryDuration'])
    maxComputedOrderId = (
        delivery_duration
        .dataframe('previous', schema=T.StructType(fields))
        .groupby()
        .max('orderId')
        .collect()[0][0]
    )

    # On the first run the previous output is empty, so max() returns None.
    if maxComputedOrderId is None:
        maxComputedOrderId = 0

    ordersNotProcessed = orders.dataframe().filter(F.col('orderId') > maxComputedOrderId)
    deliveriesNotProcessed = deliveries.dataframe().filter(F.col('orderId') > maxComputedOrderId)

    # Inner join: orders without a delivery yet are not written. Writing them
    # with a null duration (as a left join would) would raise maxComputedOrderId
    # past them, and their deliveries would never be joined in later runs.
    newDurations = (
        ordersNotProcessed
        .join(deliveriesNotProcessed, 'orderId', how='inner')
        .withColumn('deliveryDuration', F.datediff(F.col('deliveryDate'), F.col('submittedDate')))
        .drop('submittedDate', 'deliveryDate')
    )
    delivery_duration.write_dataframe(newDurations)
```
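The high-water-mark logic can be checked without Spark. In this pure-Python sketch, dictionaries keyed by orderId stand in for the three tables, max(..., default=0) plays the role of the None-to-0 fallback, and compute_new_durations is a hypothetical helper. The sketch emits only orders that already have a delivery, so orders still awaiting delivery stay above the high-water mark for a later run.

```python
from datetime import date
from typing import Dict


def compute_new_durations(
    orders: Dict[int, date],          # orderId -> submittedDate (full snapshot)
    deliveries: Dict[int, date],      # orderId -> deliveryDate (full snapshot)
    previous_output: Dict[int, int],  # orderId -> deliveryDuration already written
) -> Dict[int, int]:
    # High-water mark: the largest orderId already written (0 on the first run).
    max_computed = max(previous_output, default=0)
    new_orders = {k: v for k, v in orders.items() if k > max_computed}
    new_deliveries = {k: v for k, v in deliveries.items() if k > max_computed}
    # Inner join on orderId: only orders that have a delivery are emitted.
    return {
        order_id: (new_deliveries[order_id] - submitted).days
        for order_id, submitted in new_orders.items()
        if order_id in new_deliveries
    }


orders = {1001: date(2019, 8, 21), 1002: date(2019, 8, 22), 1003: date(2019, 8, 23)}
deliveries = {1001: date(2019, 8, 23)}

first = compute_new_durations(orders, deliveries, {})
print(first)  # {1001: 2}

deliveries[1002] = date(2019, 8, 25)  # a later delivery for order 1002
second = compute_new_durations(orders, deliveries, first)
print(second)  # {1002: 3}
```

If orders without a delivery were written with a null duration, they would raise the high-water mark, and their later deliveries would be skipped.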
Handle schema or logic changes
Let’s say we would like to add another column to our incremental dataset from now on. Adding a column to the output does not invalidate the is_incremental flag, so the next run computes the new rows and writes them with the new column. The new column is null in all previously written rows.
However, we might want to populate the column for previous rows as well. Increasing the semantic_version of the transform makes it run non-incrementally once. If you are using the “added” read mode, the input then contains all of the data, so you can recompute the whole output, including the new column.
If your transform has been creating a historical dataset from snapshot input, then it becomes slightly more complicated, as the previous data is in a stack of snapshot transactions on your input. In this case, contact your Palantir representative.
In this example, we discussed adding a new column, but the above reasoning applies to all sorts of logic changes.
Develop incremental code on a branch
Creating a new branch and running a build on it runs the build incrementally. The last transaction committed on the branch you created the new branch from is treated as the previous transaction for the first build on the new branch.
Summary of examples
We saw how to process data incrementally by:
- getting newly added rows, processing them, and appending them to the output,
- getting newly added rows, filtering them based on rows already present in the output, and appending them to the output,
- getting newly added rows, computing an aggregation based on the new rows and the rows already present in the output, and replacing the output with the new aggregated statistics,
- leveraging incremental transforms to join large datasets.
We also explored how to:
- handle schema or logic changes of an incremental transform,
- develop incremental code on a branch without rebuilding from the full content of the inputs.
Incremental Python errors
To understand incremental errors, it helps (and is sometimes necessary) to be familiar with the concepts of transactions and dataset views.
Catalog transaction errors
Useful context
When a job runs incrementally, its incremental input datasets consist only of the unprocessed transaction range, not the full dataset view.
Imagine the following transaction history for a dataset:
```
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
                                     |
                        Last processed transaction
```
The last time the incremental job ran, the latest transaction on this dataset was (3). Since then, transactions (4) and (5) have been committed, so the unprocessed transaction range is (4) to (5).
The dataset view is the transaction range (1) to (5). The transaction “on top” of the view (the most recent one) is sometimes referred to as the branch’s HEAD, by analogy with Git. As in Git, a branch is a pointer to a transaction, so we say that the branch points at transaction (5). Different branches can point at different transactions, and branches might share a transaction history:
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5) [develop]
|
└─> UPDATE [feature-branch]
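The relationship between a branch HEAD, the dataset view, and the unprocessed range can be modeled by following parent pointers. This is a simplified sketch that assumes each transaction records a single parent. The view and unprocessed_range helpers are hypothetical and are not Catalog endpoints.

```python
from typing import Dict, List, Optional

# Hypothetical model: each transaction id maps to its parent (None for the first).
Parents = Dict[int, Optional[int]]


def view(parents: Parents, head: int) -> List[int]:
    """Transactions visible from a branch HEAD, oldest first."""
    chain: List[int] = []
    txn: Optional[int] = head
    while txn is not None:
        chain.append(txn)
        txn = parents[txn]
    return chain[::-1]


def unprocessed_range(parents: Parents, head: int, last_processed: int) -> List[int]:
    """Transactions in the current view committed after the last processed one."""
    current_view = view(parents, head)
    # list.index raises ValueError if last_processed is not in the view.
    return current_view[current_view.index(last_processed) + 1:]


parents = {1: None, 2: 1, 3: 2, 4: 3, 5: 4}  # SNAPSHOT (1) --> ... --> UPDATE (5)
print(view(parents, head=5))                                 # [1, 2, 3, 4, 5]
print(unprocessed_range(parents, head=5, last_processed=3))  # [4, 5]
```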
Error: Catalog:TransactionsNotInView
In order for the job to run incrementally, a series of checks is run at the beginning of the job. One of these checks verifies that the unprocessed transaction range is strictly incremental (that is, it contains only append-only file changes; see requirements for incremental computation). It does so by comparing the files in the unprocessed transaction range with those in the processed transaction range.
However, if the branch’s HEAD has been moved, the incremental job is in an inconsistent state: it no longer makes sense to compare the two ranges, so a Catalog:TransactionNotInView error is thrown.
The diagram below shows how this error can occur:
```
SNAPSHOT (1) ─> UPDATE (2) ─> UPDATE (3) ─> UPDATE (4) ─> UPDATE (5)
                     |        (last processed              (branch's previous
                     |         transaction)                 HEAD, now orphan)
                     |
                     └─> UPDATE (6) --> UPDATE (7, branch's current HEAD)
```
Here the processed transaction range is (1) to (3), the branch's current HEAD points at (7), and the current view consists of transactions (1), (2), (6), and (7).
This is an inconsistent state because not all processed transactions are upstream of the branch’s HEAD: (3) is not. In other words, the last processed transaction (3) is no longer part of the current view, which is why Catalog:TransactionNotInView is thrown.
Error: Catalog:InconsistentBranchesOrder
The other Catalog error that can be thrown is Catalog:InconsistentBranchesOrder. It occurs when the last processed transaction (prevTransaction) is more recent than the branch’s HEAD (endTransaction). This can happen if the dataset’s HEAD is moved back to a transaction that precedes the last processed transaction.
The diagram below shows how this error can occur:
SNAPSHOT (1) --> UPDATE (2) --> UPDATE (3) --> UPDATE (4) --> UPDATE (5)
| |
Current HEAD Last processed transaction
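Both error conditions can be expressed in terms of which transactions are upstream of others. The sketch below is an illustrative model of the two situations described above, not the actual Catalog check. It assumes each transaction has a single parent, and check_incremental_state is a hypothetical helper.

```python
from typing import Dict, Optional, Set

# Hypothetical model: each transaction id maps to its parent (None for the first).
Parents = Dict[int, Optional[int]]


def ancestors(parents: Parents, txn: int) -> Set[int]:
    """The view ending at `txn`: `txn` and every transaction upstream of it."""
    result: Set[int] = set()
    current: Optional[int] = txn
    while current is not None:
        result.add(current)
        current = parents[current]
    return result


def check_incremental_state(parents: Parents, head: int, last_processed: int) -> str:
    """Classify a branch state using the two conditions described above."""
    if head != last_processed and head in ancestors(parents, last_processed):
        # HEAD was moved back to a transaction before the last processed one.
        return "inconsistent branches order"
    if last_processed not in ancestors(parents, head):
        # The last processed transaction is no longer in the current view.
        return "transaction not in view"
    return "ok"


# Transactions (6) and (7) branch off (2), as in the TransactionNotInView diagram.
parents = {1: None, 2: 1, 3: 2, 4: 3, 5: 4, 6: 2, 7: 6}
print(check_incremental_state(parents, head=5, last_processed=3))  # ok
print(check_incremental_state(parents, head=7, last_processed=3))  # transaction not in view
print(check_incremental_state(parents, head=2, last_processed=4))  # inconsistent branches order
```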
Remediation of errors
A branch’s HEAD can change for two reasons:
- A user knowingly updated the branch’s HEAD using Catalog endpoints.
- Some transactions were not committed through a transform job. For example, when you merge branches in Code Workbook, the dataset is also “merged”.
  - However, transactions on Code Workbook datasets are always SNAPSHOT, so they cannot lead to an inconsistent state.
To remediate these errors, do one of the following:
- Run the transform as a snapshot; for example, by bumping the semantic version. This starts a new dataset view, thereby resetting the incremental check mentioned above.
- Manually update the branch’s HEAD to point at a transaction that is downstream of the already-processed range. This must be done using the updateBranch2 endpoint with the latest processed transaction as the parentRef. We recommend this endpoint only for experienced users.