Example: Changelog views in Python transforms¶
This page provides code examples for working with CDC (Change Data Capture) and Iceberg changelogs in Python transforms.
In particular, note the following patterns and best practices:
- Always use `@incremental(v2_semantics=True)` when working with Iceberg tables.
- Use `source.changelog([identifier_columns])` to get changes for each unique row (as defined by the identifier columns) since the last run.
- Apply logic (such as cleaning, deduplication, or enrichment) to the changelog before merging.
- Use `output.apply_changelog(changelog_view_df, [identifier_columns])` to merge changes (including upserts and deletes).
- Fall back to a full snapshot overwrite only if incremental context is unavailable (for example, on the first run).
Example: Incremental replication¶
This example is a simple CDC workflow that replicates changes from a source Iceberg table to a target, using the changelog API after applying any necessary filtering.
from transforms.api import (
    IncrementalTableTransformInput,
    IncrementalTransformContext,
    incremental,
    transform,
)
from transforms.tables import TableInput, TableOutput, TableTransformOutput


@incremental(v2_semantics=True)
@transform(
    source=TableInput("<PATH>/input"),
    output=TableOutput("<PATH>/output"),
)
def incremental_cleanup(
    ctx: IncrementalTransformContext,
    source: IncrementalTableTransformInput,
    output: TableTransformOutput,
):
    # Create a changelog view from the last seen Iceberg snapshot ID.
    changelog_view_df = source.changelog(["id"])

    # Read the changelog view, then merge into the target Iceberg table.
    output.apply_changelog(
        changelog_view_df.filter(...),  # Changelog view with any transformation logic applied
        ["id"],  # Identifier column(s)
    )
Key Points:
- `source.changelog([identifier_cols])` yields only the new, changed, or deleted rows since the last run.
- The changelog dataframe can be modified, but the `_change_type` column must be preserved if you intend to use `apply_changelog`.
- `output.apply_changelog(df, identifier_columns)` merges changes into the target. Each row is processed according to its `_change_type` value and its identifier-based match in the output table:
  - `INSERT` / `UPDATE_AFTER`: Updates any existing row with matching identifier columns or inserts a new row if there is no match.
  - `UPDATE_BEFORE`: Ignored.
  - `DELETE`: Deletes any existing row in the output table with the matching identifier columns.
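The merge rules above can be made concrete with a small plain-Python model. This is an illustration only, not the library's implementation: `apply_changelog_model` and the dict-based table are hypothetical stand-ins, and the sketch assumes that changelog rows are processed in the order given and that `_change_type` is not stored in the output.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


def apply_changelog_model(
    table: Dict[Key, Row],
    changelog: Iterable[Row],
    identifier_columns: List[str],
) -> Dict[Key, Row]:
    """Hypothetical in-memory model of the merge rules described above."""
    result = dict(table)  # Copy so the caller's table is left untouched.
    for change in changelog:
        key = tuple(change[col] for col in identifier_columns)
        change_type = change["_change_type"]
        if change_type in ("INSERT", "UPDATE_AFTER"):
            # Upsert: replace the matching row, or insert if there is no match.
            # Assumption: the _change_type column is dropped from the stored row.
            result[key] = {k: v for k, v in change.items() if k != "_change_type"}
        elif change_type == "DELETE":
            # Delete the matching row if it exists.
            result.pop(key, None)
        elif change_type == "UPDATE_BEFORE":
            # Pre-update images carry no new state, so they are skipped.
            continue
        else:
            raise ValueError(f"Unexpected _change_type: {change_type!r}")
    return result


# Example data (made up for illustration).
target = {
    (1,): {"id": 1, "name": "a"},
    (2,): {"id": 2, "name": "b"},
}
changes = [
    {"id": 1, "name": "a", "_change_type": "UPDATE_BEFORE"},
    {"id": 1, "name": "a2", "_change_type": "UPDATE_AFTER"},
    {"id": 2, "name": "b", "_change_type": "DELETE"},
    {"id": 3, "name": "c", "_change_type": "INSERT"},
]
merged = apply_changelog_model(target, changes, ["id"])
```

In this model, row 1 is updated, row 2 is removed, and row 3 is added. It also shows why `_change_type` must survive any transformation of the changelog: without it, the merge cannot tell an upsert from a delete.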
Changelog performance optimization¶
- Changelog reads and updates perform best with incremental changes. Reading and applying an entire input table as a changelog can be slow because row changes need to be correlated across a large snapshot range. For improved performance, branch your code using `ctx.is_incremental` and fall back to a full snapshot overwrite when the transform is not running incrementally (such as on the first run).

    def changelog_with_fallback(ctx, source, output):
        if ctx.is_incremental:
            # Incremental run: merge only the changes since the last run.
            changelog_df = source.changelog(["id"])
            output.apply_changelog(changelog_df.filter(...), ["id"])
        else:
            # Full snapshot overwrite
            output.write_dataframe(source.dataframe().filter(...))

- Changelog view creation is more performant when identifier columns are provided, as this allows for simpler internal partitioning. If identifier columns are not passed, the system generates a net changes changelog, which can be less efficient. See the changelog technical primer for more details on working with net changes changelogs.
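As a rough intuition for what "net changes" means, the plain-Python sketch below collapses a sequence of row-level changes so that intermediate states cancel out. It is a hypothetical model, not the system's algorithm: `net_changes_model`, the whole-row matching (used here because no identifier columns are given), and the example rows are all assumptions made for illustration. The changelog technical primer remains the reference for actual behavior.

```python
from collections import Counter
from typing import Any, Dict, List

Row = Dict[str, Any]


def net_changes_model(changes: List[Row]) -> List[Row]:
    """Hypothetical model: cancel out inserts and deletes of identical rows."""
    balance: Counter = Counter()
    for change in changes:
        # Without identifier columns, rows are matched on their full content.
        data = tuple(sorted((k, v) for k, v in change.items() if k != "_change_type"))
        if change["_change_type"] == "INSERT":
            balance[data] += 1
        elif change["_change_type"] == "DELETE":
            balance[data] -= 1
        else:
            raise ValueError(f"Unexpected _change_type: {change['_change_type']!r}")

    net: List[Row] = []
    for data, count in balance.items():
        if count == 0:
            continue  # Inserted and deleted within the range: no net change.
        change_type = "INSERT" if count > 0 else "DELETE"
        for _ in range(abs(count)):
            net.append({**dict(data), "_change_type": change_type})
    return net


# Example data (made up for illustration).
raw_changes = [
    {"id": 1, "name": "a", "_change_type": "INSERT"},
    {"id": 1, "name": "a", "_change_type": "DELETE"},  # Cancels the insert above.
    {"id": 1, "name": "a2", "_change_type": "INSERT"},
    {"id": 2, "name": "b", "_change_type": "DELETE"},
]
net = net_changes_model(raw_changes)
```

Here the short-lived row `{"id": 1, "name": "a"}` disappears from the result, leaving one insert and one delete. Matching on full row content across the whole snapshot range is one plausible reason this path can cost more than matching on a small set of identifier columns.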