
Iceberg changelogs and CDC pipelines

Foundry transforms make it easy to build robust, scalable Change Data Capture (CDC) pipelines that leverage Apache Iceberg's changelog and snapshot features. With CDC in transforms, each pipeline run processes only the records that were inserted, updated, or deleted since the previous run, enabling incremental, low-latency data movement and processing.

In addition to the existing support for append-only incremental transforms on datasets, Foundry now offers full CDC processing support for Iceberg tables through the transforms-tables library. This capability uses Iceberg's changelog views to retrieve the inserts, updates, and deletes between Iceberg table snapshots.
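To make the three kinds of change concrete, here is a simplified, plain-Python model of what a changelog between two snapshots contains. It assumes each snapshot is a dict mapping a primary key to a row, and it tags rows with the `_change_type` values that Iceberg changelog views report (`INSERT`, `DELETE`, `UPDATE_BEFORE`, `UPDATE_AFTER`). The function name `compute_changelog` is hypothetical, and this is not how Iceberg or Foundry computes changelogs internally.

```python
"""Simplified model of an Iceberg-style changelog between two table snapshots.

Assumption: each snapshot is a dict mapping primary key -> row dict.
This only illustrates the change types; it is not Iceberg's or Foundry's
internal implementation.
"""
from typing import Any, Dict, List

Row = Dict[str, Any]
Snapshot = Dict[Any, Row]


def compute_changelog(old: Snapshot, new: Snapshot) -> List[Row]:
    """Return change rows tagged with a `_change_type` column."""
    changes: List[Row] = []
    # Keys present only in the new snapshot were inserted.
    for key in sorted(new.keys() - old.keys()):
        changes.append({**new[key], "_change_type": "INSERT"})
    # Keys present only in the old snapshot were deleted.
    for key in sorted(old.keys() - new.keys()):
        changes.append({**old[key], "_change_type": "DELETE"})
    # Keys present in both snapshots with different values were updated.
    # Emit the pre-image and the post-image, as Iceberg changelog views do
    # when identifier (primary key) columns are supplied.
    for key in sorted(old.keys() & new.keys()):
        if old[key] != new[key]:
            changes.append({**old[key], "_change_type": "UPDATE_BEFORE"})
            changes.append({**new[key], "_change_type": "UPDATE_AFTER"})
    return changes


if __name__ == "__main__":
    snap1 = {1: {"id": 1, "status": "open"}, 2: {"id": 2, "status": "open"}}
    snap2 = {1: {"id": 1, "status": "closed"}, 3: {"id": 3, "status": "open"}}
    # Prints one INSERT (id 3), one DELETE (id 2), and an
    # UPDATE_BEFORE/UPDATE_AFTER pair (id 1).
    for change in compute_changelog(snap1, snap2):
        print(change)
```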

Benefits of CDC processing

Using CDC with Iceberg tables offers several benefits, including:

  • Efficient incremental processing: CDC avoids reprocessing the entire dataset on every run, improving performance and reducing costs.
  • Streaming and real-time pipelines: CDC enables low-latency data movement by processing only new and changed records.
  • Audit and slowly changing dimensions (SCD): CDC exposes the before and after state of each changed row, supporting full audit trails or SCD Type 2 implementations.
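As a rough illustration of the SCD Type 2 use case, the sketch below turns changelog rows into versioned history. A `DELETE` or `UPDATE_BEFORE` row closes the key's current version, and an `INSERT` or `UPDATE_AFTER` row opens a new one. It assumes the `_change_type` values used by Iceberg changelog views, a primary key column named `id`, and a single timestamp per batch. `apply_scd2` and the `valid_from`, `valid_to`, and `is_current` columns are hypothetical names chosen for illustration.

```python
"""Minimal SCD Type 2 sketch driven by changelog rows.

Assumptions: change rows carry a `_change_type` of INSERT, DELETE,
UPDATE_BEFORE, or UPDATE_AFTER; the key column defaults to "id"; each batch
is applied with one timestamp. The function and the valid_from / valid_to /
is_current column names are hypothetical.
"""
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_scd2(history: List[Row], changes: List[Row], ts: str, key: str = "id") -> List[Row]:
    """Close current versions for changed keys, then append new versions."""
    # Keys whose current version ends in this batch.
    ended = {c[key] for c in changes if c["_change_type"] in ("DELETE", "UPDATE_BEFORE")}
    updated: List[Row] = []
    for row in history:
        if row["is_current"] and row[key] in ended:
            # Close the open version instead of overwriting it (keeps the audit trail).
            row = {**row, "valid_to": ts, "is_current": False}
        updated.append(row)
    # Every inserted row or post-update image becomes a new current version.
    for c in changes:
        if c["_change_type"] in ("INSERT", "UPDATE_AFTER"):
            data = {k: v for k, v in c.items() if k != "_change_type"}
            updated.append({**data, "valid_from": ts, "valid_to": None, "is_current": True})
    return updated


if __name__ == "__main__":
    hist: List[Row] = []
    hist = apply_scd2(hist, [{"id": 1, "status": "open", "_change_type": "INSERT"}], "2024-01-01")
    hist = apply_scd2(
        hist,
        [
            {"id": 1, "status": "open", "_change_type": "UPDATE_BEFORE"},
            {"id": 1, "status": "closed", "_change_type": "UPDATE_AFTER"},
        ],
        "2024-01-02",
    )
    for version in hist:
        print(version)
```

Because closed versions are kept rather than overwritten, the resulting history also serves as an audit trail of every change to a key.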

Quick start: using changelog views in Python transforms

You can use the Palantir transforms API to read changelogs from Iceberg tables and apply them to Iceberg outputs:

from transforms.api import incremental, transform
from transforms.tables import TableInput, TableOutput

@incremental(v2_semantics=True)
@transform(
    source=TableInput("<PATH>/your_iceberg_input_table"),
    output=TableOutput("<PATH>/your_iceberg_output_table"),
)
def cdc_transform(ctx, source, output):
    # Read only the changes since the last run
    changelog_df = source.changelog(["your_primary_key"])
    # Apply any business logic to changelog_df here, then write the changes
    # to the output table using the same primary key column(s)
    output.apply_changelog(changelog_df, ["your_primary_key"])
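In the transform above, `apply_changelog` writes the changes to the output table. To show the kind of semantics involved, here is a plain-Python model of merging change rows into a table keyed by primary key, with a hook for business logic. It is not the `transforms.tables` implementation: the `apply_changes` function, the dict-based table, and the choice to ignore `UPDATE_BEFORE` rows are assumptions made for illustration.

```python
"""Conceptual model of merging changelog rows into a keyed output table.

NOT the transforms.tables implementation of apply_changelog. Assumptions:
the output is a dict keyed by primary key, change rows carry an
Iceberg-style `_change_type` column, and business logic never changes the key.
"""
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def apply_changes(
    table: Dict[Any, Row],
    changes: List[Row],
    key: str,
    business_logic: Callable[[Row], Row] = lambda r: r,
) -> Dict[Any, Row]:
    result = dict(table)  # leave the input table untouched
    for change in changes:  # apply in order so later changes win
        change_type = change["_change_type"]
        row = {k: v for k, v in change.items() if k != "_change_type"}
        if change_type in ("INSERT", "UPDATE_AFTER"):
            # Upsert the new image after running the (hypothetical) business logic.
            result[row[key]] = business_logic(row)
        elif change_type == "DELETE":
            result.pop(row[key], None)
        # UPDATE_BEFORE only carries the old image, so there is nothing to write.
    return result


if __name__ == "__main__":
    table = {1: {"id": 1, "status": "open"}, 2: {"id": 2, "status": "open"}}
    changes = [
        {"id": 1, "status": "open", "_change_type": "UPDATE_BEFORE"},
        {"id": 1, "status": "closed", "_change_type": "UPDATE_AFTER"},
        {"id": 2, "status": "open", "_change_type": "DELETE"},
        {"id": 3, "status": "new", "_change_type": "INSERT"},
    ]
    upper = lambda r: {**r, "status": r["status"].upper()}
    print(apply_changes(table, changes, "id", upper))
    # → {1: {'id': 1, 'status': 'CLOSED'}, 3: {'id': 3, 'status': 'NEW'}}
```

Only the changed rows pass through the business logic, which is where the incremental savings come from: unchanged rows are never re-read or recomputed.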

For more detailed guides and examples, see the following sections, which provide changelog code examples and a technical primer, including a walkthrough of an example whose input has no primary keys.
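When the input has no primary key, rows can only be compared whole, so a generic diff reports an updated row as a `DELETE` of its old version plus an `INSERT` of its new version, and duplicate rows must be tracked by count. The sketch below illustrates that idea with `collections.Counter`. It assumes rows are dicts with hashable values; `keyless_changelog` is a hypothetical name, and this is not necessarily how the transforms-tables library handles keyless inputs.

```python
"""Sketch: changelog between two snapshots when there is no primary key.

Assumption: snapshots are lists of row dicts with hashable values. Without a
key, an updated row shows up as DELETE (old row) + INSERT (new row), and
duplicate rows are tracked by multiplicity.
"""
from collections import Counter
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def _freeze(row: Row) -> Tuple[Tuple[str, Any], ...]:
    # Turn a row into a hashable, column-order-independent tuple.
    return tuple(sorted(row.items()))


def keyless_changelog(old: List[Row], new: List[Row]) -> List[Row]:
    old_counts = Counter(_freeze(r) for r in old)
    new_counts = Counter(_freeze(r) for r in new)
    changes: List[Row] = []
    # Counter subtraction keeps only positive counts: rows lost since `old`.
    for frozen, n in (old_counts - new_counts).items():
        changes.extend({**dict(frozen), "_change_type": "DELETE"} for _ in range(n))
    # Rows gained in `new`.
    for frozen, n in (new_counts - old_counts).items():
        changes.extend({**dict(frozen), "_change_type": "INSERT"} for _ in range(n))
    return changes


if __name__ == "__main__":
    before = [{"sku": "a", "qty": 1}, {"sku": "b", "qty": 2}]
    after = [{"sku": "a", "qty": 1}, {"sku": "b", "qty": 5}]
    # The qty change for sku "b" appears as a DELETE followed by an INSERT.
    for change in keyless_changelog(before, after):
        print(change)
```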

