Example: Changelog views in Python transforms

This page provides code examples for working with CDC (Change Data Capture) and Iceberg changelogs in Python transforms.

In particular, note the following patterns and best practices:

  • Always use @incremental(v2_semantics=True) when working with Iceberg tables.
  • Use source.changelog([identifier_columns]) to get changes for each unique row (as defined by the identifier columns) since the last run.
  • Apply logic (such as cleaning, deduplication, or enrichment) to the changelog before merging.
  • Use output.apply_changelog(changelog_view_df, [identifier_columns]) to merge changes (including upserts and deletes).
  • Fall back to a full snapshot overwrite only if incremental context is unavailable (for example, on the first run).

Example: Incremental replication

This example shows a simple CDC workflow. It reads the changes from a source Iceberg table, applies any necessary filtering, and then uses the changelog API to replicate those changes to a target table.

from transforms.api import (
    IncrementalTableTransformInput,
    IncrementalTransformContext,
    incremental,
    transform,
)
from transforms.tables import TableInput, TableOutput, TableTransformOutput

@incremental(v2_semantics=True)
@transform(
    source=TableInput("<PATH>/input"),
    output=TableOutput("<PATH>/output"),
)
def incremental_cleanup(
    ctx: IncrementalTransformContext,
    source: IncrementalTableTransformInput,
    output: TableTransformOutput,
):
    # Create a changelog view of the changes since the last processed Iceberg snapshot ID.
    changelog_view_df = source.changelog(["id"])

    # Apply any transformation logic to the changelog view, then merge it into the target Iceberg table.
    output.apply_changelog(
        changelog_view_df.filter(...),  # Changelog view with any transformation logic applied
        ["id"]  # Identifier column(s)
    )
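The filter(...) call above is a placeholder for your own logic. As a framework-free sketch of what such a cleaning step might do, the code below models the changelog as a list of Python dicts rather than a Spark DataFrame. The clean_changelog function and the email column are hypothetical. The sketch normalizes a value and drops rows that have no identifier, and it copies each row whole so that the _change_type column survives. Be careful with row-dropping filters: a DELETE row removed at this stage never reaches apply_changelog, so that deletion is never propagated to the output.

```python
# Hypothetical, framework-free model of a changelog cleaning step. In a real
# transform this logic would be written with DataFrame operations such as
# changelog_view_df.filter(...); the "email" column is an assumption.
from typing import Any, Dict, List

Row = Dict[str, Any]


def clean_changelog(rows: List[Row]) -> List[Row]:
    """Normalize emails and drop rows without an id, keeping _change_type."""
    cleaned: List[Row] = []
    for row in rows:
        # Treat rows without an identifier as invalid input and drop them.
        if row.get("id") is None:
            continue
        out = dict(row)  # copy every column, including _change_type
        if isinstance(out.get("email"), str):
            out["email"] = out["email"].strip().lower()  # normalize the value
        cleaned.append(out)
    return cleaned


changes = [
    {"id": 1, "email": "  Ada@Example.com ", "_change_type": "INSERT"},
    {"id": None, "email": "x@example.com", "_change_type": "INSERT"},
    {"id": 2, "email": None, "_change_type": "DELETE"},
]
cleaned = clean_changelog(changes)
```

Here the row with a missing id is dropped, the email is normalized, and the DELETE row passes through unchanged with its _change_type intact.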

Key points:

  • source.changelog([identifier_cols]) yields only the new, changed, or deleted rows since the last run.
  • The changelog dataframe can be modified, but the _change_type column must be preserved if you intend to use apply_changelog.
  • output.apply_changelog(df, identifier_columns) merges changes into the target table. Each row is processed according to its _change_type value and whether its identifier columns match a row in the output table:
      • INSERT/UPDATE_AFTER: Updates the existing row with matching identifier columns, or inserts a new row if there is no match. UPDATE_BEFORE rows are ignored.
      • DELETE: Deletes any existing row in the output table with matching identifier columns.
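To make the merge rules above concrete, here is a small in-memory model in plain Python. It is not the Foundry implementation (apply_changelog operates on Iceberg tables), and apply_changelog_model is a hypothetical name. The model also assumes that output rows are stored without the _change_type column.

```python
# Hypothetical pure-Python model of the merge rules described above. It is NOT
# the Foundry apply_changelog implementation, which runs on Iceberg tables.
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def apply_changelog_model(
    target: List[Row], changelog: Iterable[Row], id_cols: List[str]
) -> List[Row]:
    """Merge changelog rows into target rows keyed by the identifier columns."""

    def key(row: Row) -> Tuple[Any, ...]:
        return tuple(row[c] for c in id_cols)

    # Index the existing target rows by their identifier tuple.
    merged: Dict[Tuple[Any, ...], Row] = {key(r): dict(r) for r in target}

    for change in changelog:
        change_type = change["_change_type"]
        # Assumption: stored output rows do not carry the _change_type column.
        data = {k: v for k, v in change.items() if k != "_change_type"}
        if change_type in ("INSERT", "UPDATE_AFTER"):
            merged[key(data)] = data  # upsert: replace a match or insert a new row
        elif change_type == "DELETE":
            merged.pop(key(data), None)  # delete the matching row, if any
        elif change_type == "UPDATE_BEFORE":
            continue  # ignored by the merge
        else:
            raise ValueError(f"unknown _change_type: {change_type!r}")

    return list(merged.values())


target = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
changes = [
    {"id": 1, "name": "a", "_change_type": "UPDATE_BEFORE"},
    {"id": 1, "name": "a2", "_change_type": "UPDATE_AFTER"},
    {"id": 2, "name": "b", "_change_type": "DELETE"},
    {"id": 3, "name": "c", "_change_type": "INSERT"},
]
result = apply_changelog_model(target, changes, ["id"])
# result == [{"id": 1, "name": "a2"}, {"id": 3, "name": "c"}]
```

Row 1 is updated from its UPDATE_AFTER image, row 2 is deleted, and row 3 is inserted. The UPDATE_BEFORE image has no effect on the result.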

Changelog performance optimization

  • Changelog reads and updates perform best when they cover only incremental changes. Reading and applying an entire input table as a changelog can be slow, because row changes must be correlated across a large range of snapshots. For better performance, branch your code on ctx.is_incremental and fall back to a full snapshot overwrite when the transform is not running incrementally (for example, on the first run):
    def changelog_with_fallback(ctx, source, output):
        if ctx.is_incremental:
            # Incremental run: read only the rows changed since the last run
            # and merge them into the output by the "id" identifier column.
            changelog_df = source.changelog(["id"])
            output.apply_changelog(changelog_df.filter(...), ["id"])
        else:
            # Non-incremental run (for example, the first run): skip the
            # changelog and overwrite the output with a full snapshot.
            output.write_dataframe(source.dataframe().filter(...))
  • Changelog view creation is more performant when identifier columns are provided, as this allows for simpler internal partitioning. If identifier columns are not passed, the system generates a net changes changelog, which can be less efficient. See the changelog technical primer for more details on working with net changes changelogs.
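Because the fallback decision is plain control flow, one way to check it locally is to pass stub objects in place of ctx, source, and output. The sketch below does exactly that. FakeContext, FakeSource, FakeOutput, run_with_fallback, and the keep predicate are all hypothetical. Python lists stand in for DataFrames, and the stubs only mimic the method names used in the fallback snippet, not the real Foundry behavior.

```python
# Hypothetical stand-ins for ctx, source, and output that exercise the
# ctx.is_incremental branch without Foundry. Lists replace DataFrames, and the
# `keep` predicate replaces the elided filter(...) logic.
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


class FakeContext:
    def __init__(self, is_incremental: bool) -> None:
        self.is_incremental = is_incremental


class FakeSource:
    def __init__(self, snapshot: List[Row], changes: List[Row]) -> None:
        self._snapshot = snapshot
        self._changes = changes

    def changelog(self, identifier_columns: List[str]) -> List[Row]:
        return list(self._changes)

    def dataframe(self) -> List[Row]:
        return list(self._snapshot)


class FakeOutput:
    def __init__(self) -> None:
        # Records (method name, rows) for every write the transform makes.
        self.calls: List[Tuple[str, List[Row]]] = []

    def apply_changelog(self, rows: List[Row], identifier_columns: List[str]) -> None:
        self.calls.append(("apply_changelog", rows))

    def write_dataframe(self, rows: List[Row]) -> None:
        self.calls.append(("write_dataframe", rows))


def run_with_fallback(ctx, source, output, keep: Callable[[Row], bool]) -> None:
    """Same branching as changelog_with_fallback, on lists instead of DataFrames."""
    if ctx.is_incremental:
        changes = [r for r in source.changelog(["id"]) if keep(r)]
        output.apply_changelog(changes, ["id"])
    else:
        output.write_dataframe([r for r in source.dataframe() if keep(r)])


def keep_active(row: Row) -> bool:
    return bool(row["active"])


snapshot = [{"id": 1, "active": True}, {"id": 2, "active": False}]
changes = [{"id": 3, "active": True, "_change_type": "INSERT"}]

first_run = FakeOutput()
run_with_fallback(FakeContext(False), FakeSource(snapshot, changes), first_run, keep_active)

later_run = FakeOutput()
run_with_fallback(FakeContext(True), FakeSource(snapshot, changes), later_run, keep_active)
```

On the non-incremental run only write_dataframe is called, with the filtered snapshot. On the incremental run only apply_changelog is called, with the filtered changes.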
