Technical primer: Iceberg changelogs in Python transforms

This guide explains how to use changelogs in Python transforms with Iceberg tables. The guide covers the two main changelog modes, their mechanics, and best practices for avoiding common pitfalls.

What is a changelog?

A changelog is a view of the changes (inserts, updates, and deletes) that have occurred in a table between two points in time (as represented by Iceberg snapshots). Changelogs enable efficient incremental processing, allowing you to process only what has changed since the last run.

There are two primary ways to generate a changelog in Foundry:

  • Identifier changelog (recommended): One or more identifier columns are provided; suitable for most use cases.
  • Net changes changelog: No identifier columns are provided; use when the input data does not have a reliable set of primary keys.

Identifier changelog (has identifier columns)

Use the "identifier changelog" mode when you have one or more columns in your input data that uniquely identify each row, together constituting a primary key. This mode is more performant than the "net changes changelog" mode and provides richer semantics, including update-before and update-after records.

Identifier changelog syntax

To generate an identifier changelog, call source.changelog with a list of the identifier (primary key) columns:

changelog_df = source.changelog(["id"])

Identifier changelog implementation details

Foundry's "identifier changelog" mode is implemented using the compute_updates option in Iceberg's create_changelog_view procedure.

With compute_updates:

  • Row insertions and deletions are represented as INSERT and DELETE changes.
  • Row updates are based on the identifier columns and are represented as a pair of UPDATE_BEFORE and UPDATE_AFTER rows, also known as "pre/post update images".
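
The pairing described above can be illustrated with a small plain-Python sketch. This is a simplified model and not Iceberg's or Foundry's implementation: the function name compute_updates_model and the dictionary row layout are hypothetical, and the sketch assumes that a DELETE and an INSERT sharing the same identifier and the same change ordinal form one update.

```python
from collections import defaultdict


def compute_updates_model(changes, identifier="id"):
    """Relabel DELETE/INSERT pairs that share an identifier and ordinal.

    Hypothetical helper for illustration only: `changes` is a list of dicts
    holding the data columns plus `_change_type` and `_change_ordinal`.
    """
    # Group change rows by (identifier value, change ordinal).
    groups = defaultdict(list)
    for change in changes:
        groups[(change[identifier], change["_change_ordinal"])].append(change)

    result = []
    for group in groups.values():
        types = sorted(change["_change_type"] for change in group)
        if types == ["DELETE", "INSERT"]:
            # One deletion plus one insertion for the same identifier is
            # treated as an update: old image first, new image second.
            for change in group:
                is_delete = change["_change_type"] == "DELETE"
                new_type = "UPDATE_BEFORE" if is_delete else "UPDATE_AFTER"
                result.append({**change, "_change_type": new_type})
        else:
            # Lone insertions and deletions are passed through unchanged.
            result.extend(group)
    return result


raw_changes = [
    {"id": "id1", "value": "val1", "_change_type": "DELETE", "_change_ordinal": 1},
    {"id": "id1", "value": "val2", "_change_type": "INSERT", "_change_ordinal": 1},
    {"id": "id2", "value": "val2", "_change_type": "INSERT", "_change_ordinal": 2},
]
identifier_changelog = compute_updates_model(raw_changes)
```

Under these assumptions, the id1 pair becomes UPDATE_BEFORE and UPDATE_AFTER, while the id2 row stays an INSERT.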

Foundry's API implements an additional step on top of Iceberg's create_changelog_view to get the latest update across a transaction range. For example, if a row has its value updated from 1 to 2 in one snapshot, and then from 2 to 3 in a subsequent snapshot, the resulting identifier changelog will only show an UPDATE_BEFORE of 2 and an UPDATE_AFTER of 3.

:::callout{theme="neutral"} The Iceberg table specification supports identifier fields in metadata but does not enforce uniqueness. Because of that, Transforms changelogs do not currently rely on identifier fields in metadata. :::

Identifier changelog schema

The schema for the identifier changelog:

| Column name | Type | Description |
| --- | --- | --- |
| All data columns | various | All columns from the source table |
| _change_type | string | One of INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER |
| _change_ordinal | int | Monotonically increasing number for ordering changes |

Identifier changelog change types

The change types for the identifier changelog:

| Change type | Description |
| --- | --- |
| UPDATE_BEFORE | The old version of the row for the identifier. |
| UPDATE_AFTER | The new version of the row for the identifier. |
| INSERT | New row for this identifier. |
| DELETE | Row removed for this identifier. |

Identifier changelog example

An example identifier changelog:

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| id1 | val1 | UPDATE_BEFORE | 1 |
| id1 | val2 | UPDATE_AFTER | 1 |
| id2 | val2 | INSERT | 2 |

Net changes changelog (no identifier columns)

Use the "net changes changelog" mode when there is no set of identifier columns that uniquely identify entries in your input data. This approach provides more flexibility than the "identifier changelog" mode, but comes at the cost of performance.

Net changes changelog syntax

To generate a net changes changelog:

changelog_df = source.changelog()

:::callout{theme="neutral"} Use the "net changes changelog" mode when you cannot trust your primary keys or have duplicate rows in the source. :::

Net changes changelog implementation details

Foundry's "net changes changelog" mode is implemented using the net_changes option in Iceberg's create_changelog_view procedure.

In Iceberg, updating a row is implemented as a deletion and insertion of a row. There is no inherent notion of a row update in that nothing connects the deletion and insertion as semantically referring to the same row.

The "net changes changelog" is the simplest type of changelog: it represents a row update as a pair of rows, one with a _change_type of DELETE and the other with a _change_type of INSERT. (A plain deletion or insertion is represented by a single DELETE or INSERT row, respectively.)

The "net" in "net changes" refers to row changes (insertions, deletions, and updates) being combined across a range of snapshots. For example, if a row has its value updated from 1 to 2 in one snapshot, and from 2 to 3 in a subsequent snapshot, then the "net changes changelog" shows a DELETE of 1 and an INSERT of 3. The intermediate value 2 is omitted because its INSERT and its later DELETE cancel out.
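
The cancellation of intermediate changes can be sketched in plain Python. This is a toy model rather than Iceberg's actual algorithm: the function net_changes_model and the tuple layout (row, change type, ordinal) are hypothetical, and the sketch assumes the input is ordered by ordinal with deletions listed before insertions within each ordinal.

```python
from collections import defaultdict


def net_changes_model(changes):
    """Drop INSERT/DELETE pairs of an identical row within the range.

    Hypothetical helper for illustration only: `changes` is a list of
    (row, change_type, ordinal) tuples in changelog order.
    """
    pending_inserts = defaultdict(list)  # row -> positions of live INSERTs
    cancelled = set()
    for position, (row, change_type, _ordinal) in enumerate(changes):
        if change_type == "INSERT":
            pending_inserts[row].append(position)
        elif change_type == "DELETE" and pending_inserts[row]:
            # The row was inserted earlier in the range and is now deleted
            # again, so neither change is visible in the net result.
            cancelled.add(pending_inserts[row].pop())
            cancelled.add(position)
    return [c for i, c in enumerate(changes) if i not in cancelled]


# A row whose value goes 1 -> 2 in one snapshot and 2 -> 3 in the next.
raw_changes = [
    (("id1", 1), "DELETE", 1),
    (("id1", 2), "INSERT", 1),
    (("id1", 2), "DELETE", 2),
    (("id1", 3), "INSERT", 2),
]
net = net_changes_model(raw_changes)
```

Here net keeps only the DELETE of value 1 and the INSERT of value 3, matching the description above.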

Net changes changelog schema

The schema for the net changes changelog:

| Column name | Type | Description |
| --- | --- | --- |
| All data columns | various | All columns from the source table |
| _change_type | string | INSERT or DELETE |
| _change_ordinal | int | Monotonically increasing number for ordering changes |

Net changes changelog change types

The change types for the net changes changelog:

| Change type | Description |
| --- | --- |
| INSERT | Row insertion |
| DELETE | Row deletion |

Net changes changelog example

An example net changes changelog:

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| id1 | val1 | DELETE | 1 |
| id1 | val2 | INSERT | 1 |

Example changelog evolution

This example demonstrates how an Iceberg table evolves across snapshots V0 to V3 and how changelogs are produced.

Example table evolution across snapshots

| V0 | V1 | V2 | V3 |
| --- | --- | --- | --- |
| [id1, val1] | [id1, val2] | [id1, val3] | [id1, val3] |
| | | [id2, val2] | [id1, val3] |

Identifier changelog example evolution

The following tables show how the identifier changelog evolves across snapshots for the example Iceberg table.

Identifier changelog example: V0 -> V1

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | UPDATE_BEFORE | 1 |
| ID1 | VAL2 | UPDATE_AFTER | 1 |

Identifier changelog example: V1 -> V2

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL2 | UPDATE_BEFORE | 2 |
| ID1 | VAL3 | UPDATE_AFTER | 2 |
| ID2 | VAL2 | INSERT | 2 |

Identifier changelog example: V0 -> V2

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | UPDATE_BEFORE | 1 |
| ID1 | VAL3 | UPDATE_AFTER | 2 |
| ID2 | VAL2 | INSERT | 2 |

Identifier changelog example: V0 -> V3 (failure case)

In this example, the identifier changelog from V0 -> V3 fails because V3 contains duplicate rows for the primary key value ID1. This highlights the importance of unique identifier columns for the identifier changelog mode.
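
The condition behind this failure can be checked with a short plain-Python sketch. The helper duplicate_identifiers is hypothetical and is not part of the Foundry API; it only counts how often each identifier value occurs in a snapshot, using the V2 and V3 rows from the example table.

```python
from collections import Counter


def duplicate_identifiers(rows, identifier_cols):
    """Return identifier values that occur more than once in `rows`.

    Hypothetical helper for illustration only: `rows` is a list of dicts,
    and each identifier is reported as a tuple of its column values.
    """
    counts = Counter(
        tuple(row[col] for col in identifier_cols) for row in rows
    )
    return [key for key, count in counts.items() if count > 1]


snapshot_v2 = [
    {"id": "id1", "value": "val3"},
    {"id": "id2", "value": "val2"},
]
snapshot_v3 = [
    {"id": "id1", "value": "val3"},
    {"id": "id1", "value": "val3"},
]

duplicates_v2 = duplicate_identifiers(snapshot_v2, ["id"])
duplicates_v3 = duplicate_identifiers(snapshot_v3, ["id"])
```

In this sketch, duplicates_v2 is empty, while duplicates_v3 reports id1, which is the situation in which the identifier changelog cannot pair rows unambiguously.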

Net changes changelog example evolution

The following tables show how the net changes changelog evolves across snapshots for the example Iceberg table.

Net changes changelog example: V0 -> V1

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | DELETE | 1 |
| ID1 | VAL2 | INSERT | 1 |

Net changes changelog example: V1 -> V2

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL2 | DELETE | 2 |
| ID1 | VAL3 | INSERT | 2 |
| ID2 | VAL2 | INSERT | 2 |

Net changes changelog example: V0 -> V2

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | DELETE | 1 |
| ~~ID1~~ | ~~VAL2~~ | ~~INSERT~~ | ~~1~~ |
| ~~ID1~~ | ~~VAL2~~ | ~~DELETE~~ | ~~2~~ |
| ID1 | VAL3 | INSERT | 2 |
| ID2 | VAL2 | INSERT | 2 |

Rows with strikethrough are deleted to build the net changes changelog:

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | DELETE | 1 |
| ID1 | VAL3 | INSERT | 2 |
| ID2 | VAL2 | INSERT | 2 |

Net changes changelog example: V0 -> V3

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | DELETE | 1 |
| ~~ID1~~ | ~~VAL2~~ | ~~INSERT~~ | ~~1~~ |
| ~~ID1~~ | ~~VAL2~~ | ~~DELETE~~ | ~~2~~ |
| ID1 | VAL3 | INSERT | 2 |
| ~~ID2~~ | ~~VAL2~~ | ~~INSERT~~ | ~~2~~ |
| ~~ID2~~ | ~~VAL2~~ | ~~DELETE~~ | ~~3~~ |
| ID1 | VAL3 | INSERT | 3 |

The net changes changelog after the strikethrough rows are removed:

| id | value | _change_type | _change_ordinal |
| --- | --- | --- | --- |
| ID1 | VAL1 | DELETE | 1 |
| ID1 | VAL3 | INSERT | 2 |
| ID1 | VAL3 | INSERT | 3 |

Uniqueness requirements and common pitfalls

When working with changelogs, you must ensure uniqueness of identifier columns at two stages: the source table and the destination table.

Uniqueness in the source table (source.changelog([identifier_cols]))

When you call:

changelog_df = source.changelog(["id"])

the API expects that the identifier columns (here, id) uniquely identify each row in your source table.

If there are duplicate rows for the same identifier in the source, changelog creation will fail:

ValueError: Duplicate rows found for identifier columns ['id'] in changelog view.

If you cannot guarantee uniqueness in your source, use the net changes changelog instead:

changelog_df = source.changelog()  # No identifier columns

Uniqueness in the destination table (output.apply_changelog(changelog_df, keys))

When you call:

output.apply_changelog(changelog_df, ["id"])

the keys argument specifies the identifier columns for the destination table. These columns must match the identifier columns in your changelog dataframe. The destination table must also be uniquely keyed by these columns.
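
To see why the destination must be uniquely keyed, it can help to model the destination table as a dictionary keyed by the identifier. The sketch below is a toy model and not how Foundry implements apply_changelog: the function apply_changelog_model is hypothetical, and it assumes INSERT and UPDATE_AFTER rows act as upserts, DELETE rows remove the key, and UPDATE_BEFORE rows need no action.

```python
def apply_changelog_model(destination, changelog, key="id"):
    """Apply change rows to a dict that maps identifier -> value.

    Hypothetical helper for illustration only; it returns a new dict and
    leaves `destination` untouched.
    """
    result = dict(destination)
    # sorted() is stable, so rows sharing an ordinal keep their input order.
    for change in sorted(changelog, key=lambda c: c["_change_ordinal"]):
        change_type = change["_change_type"]
        if change_type in ("INSERT", "UPDATE_AFTER"):
            result[change[key]] = change["value"]
        elif change_type == "DELETE":
            result.pop(change[key], None)
        # UPDATE_BEFORE carries the old image, so there is nothing to write.
    return result


# Destination in the state of V0, and the identifier changelog for V0 -> V2.
destination_v0 = {"ID1": "VAL1"}
changelog_v0_to_v2 = [
    {"id": "ID1", "value": "VAL1", "_change_type": "UPDATE_BEFORE", "_change_ordinal": 1},
    {"id": "ID1", "value": "VAL3", "_change_type": "UPDATE_AFTER", "_change_ordinal": 2},
    {"id": "ID2", "value": "VAL2", "_change_type": "INSERT", "_change_ordinal": 2},
]
destination_v2 = apply_changelog_model(destination_v0, changelog_v0_to_v2)
```

Because each key maps to exactly one row in this model, every change has a single unambiguous target. With duplicate keys in either the changelog or the destination, that would no longer hold.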

If there are multiple rows in the destination table that match the same key, or if the changelog dataframe contains duplicates for the key, apply_changelog will fail:

Exception: apply_changelog failed: duplicate keys detected in changelog dataframe for identifier columns ['id']

or

Exception: apply_changelog failed: multiple rows in destination table match key ['id']

:::callout{theme="warning"} Always ensure both your changelog dataframe and your destination table are deduplicated on the identifier columns before calling apply_changelog. :::

Deduplicate before applying changelog

If you want to use the identifier changelog for its performance and semantics, you must deduplicate your data before calling apply_changelog. For example:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Group rows by identifiers and order by change ordinal such that the latest
# row comes first (`_row_number == 1`). This allows us to filter a net changes
# changelog to only keep the latest row per identifier.
window = Window.partitionBy("id").orderBy(F.col("_change_ordinal").desc())
deduped_df = (
        changelog_df
        .withColumn("_row_number", F.row_number().over(window))
        .filter("_row_number = 1")
        .drop("_row_number")
)

output.apply_changelog(deduped_df, ["id"])

If you skip this step and there are duplicates, apply_changelog will fail as shown above.

Summary and best practices

  • Prefer identifier changelog if your identifier columns are unique in the source.
  • Use net changes changelog if you have duplicates or cannot trust your primary key in the source.
  • Before calling apply_changelog, always deduplicate the changelog dataframe and ensure the destination table is uniquely keyed, to avoid errors.
  • Understand the semantics and performance tradeoffs of each changelog mode.

For further reading, see the Iceberg documentation on changelog views.

