Time bounded drop duplicates

Supported in: Streaming

Drops duplicate rows from the input for the given column subset. Rows that have been seen expire after the configured amount of event time. Rows that arrive late by more than the configured amount of event time are always dropped. The input is partitioned by the specified keys, and deduplication is computed separately for each distinct key column value.
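As a rough mental model, the behavior described above can be simulated in a single process. The following is a sketch under stated assumptions, not Foundry's implementation: the watermark is taken to be the maximum event time seen so far (shared by all keys, with no extra delay), a row counts as late when its event time plus the expiry is behind the watermark, the expiry is measured from the most recent occurrence of a key, and eviction windows are aligned to the Unix epoch. The function `time_bounded_drop_duplicates` and its parameters are hypothetical names chosen for illustration.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional, Sequence

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for tumbling eviction windows


def time_bounded_drop_duplicates(
    rows: Iterable[dict],
    time_column: str,
    expiry: timedelta,
    eviction_slide: timedelta = timedelta(minutes=1),
    column_subset: Optional[Sequence[str]] = None,
    key_by: Sequence[str] = (),
) -> list:
    """Simulate time-bounded deduplication over rows given in arrival order."""
    watermark: Optional[datetime] = None  # assumed: max event time seen so far
    evict_at: dict = {}  # state key -> event time at which its state is evicted
    output = []
    for row in rows:
        ts = row[time_column]
        # Rows later than the expiry relative to the watermark are always dropped.
        if watermark is not None and ts + expiry < watermark:
            continue
        watermark = ts if watermark is None else max(watermark, ts)
        # Evict state whose timer the watermark has now reached.
        for key in [k for k, t in evict_at.items() if t <= watermark]:
            del evict_at[key]
        # Partition by the key-by columns; within a partition, uniqueness uses the
        # column subset if given, otherwise the partition key alone.
        partition = tuple(row[c] for c in key_by)
        subset = tuple(row[c] for c in column_subset) if column_subset else ()
        state_key = (partition, subset)
        if state_key not in evict_at:
            output.append(row)  # no live state for this key: emit
        # (Re)schedule eviction at the end of the tumbling window containing
        # ts + expiry, i.e. between expiry and expiry + slide after ts.
        window_index = (ts + expiry - EPOCH) // eviction_slide
        boundary = EPOCH + (window_index + 1) * eviction_slide
        evict_at[state_key] = max(evict_at.get(state_key, boundary), boundary)
    return output
```

Under these assumptions the function reproduces both examples below, and with a 10-second expiry a duplicate arriving 50 seconds after the first row is still dropped, because the state is only evicted at the next 1-minute boundary.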

Transform categories: Other

Declared arguments

  • Dataset: Dataset whose rows are deduplicated.
    Table
  • Key expiration time unit: Unit for the amount of time to wait for data to deduplicate over.
    Enum<Days, Hours, Milliseconds, Minutes, Seconds>
  • Key expiration time value: Value for the amount of time to wait for data to deduplicate over.
    Literal<Long>
  • optional Column subset: If any columns are specified, only those are used to determine uniqueness; otherwise, the key subset that the stream is keyed by is used implicitly.
    Set<Column<Any>>
  • optional Eviction window slide: Length of the tumbling eviction window, which sets the cadence at which stale state is evicted (default 1 minute). State is considered stale once more than the specified timeout has elapsed in event time. Because eviction only happens at window boundaries, duplicates continue to be dropped for a period in the range (key_expiry, key_expiry + eviction_slide] after the last duplicate was seen. Changing this value is considered a state break and requires a replay.
    Tuple<Literal<Long>, Enum<Days, Hours, Milliseconds, Minutes, Seconds, Weeks>>
  • optional Key by columns: Columns by which to partition the input. Deduplication is computed separately, in parallel, for each distinct key value.
    Set<Column<Binary | Boolean | Byte | Double | Float | Integer | Long | Short | String | Timestamp>>
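The time-unit arguments and the eviction interval can be illustrated with two small helpers. Assumptions: enum values are spelled in upper case like the `SECONDS` value used in the examples, tumbling windows are aligned to the Unix epoch, and the 1-minute default slide is the one mentioned in the Example 1 description. `to_timedelta`, `eviction_time` and `UNIT_KEYWORDS` are hypothetical helpers, not part of any Foundry API.

```python
from datetime import datetime, timedelta

# Hypothetical mapping from the documented enum values to timedelta keywords.
# WEEKS only appears in the Eviction window slide type, not the key expiration unit.
UNIT_KEYWORDS = {
    "MILLISECONDS": "milliseconds",
    "SECONDS": "seconds",
    "MINUTES": "minutes",
    "HOURS": "hours",
    "DAYS": "days",
    "WEEKS": "weeks",
}
EPOCH = datetime(1970, 1, 1)  # assumed alignment point for tumbling windows


def to_timedelta(value: int, unit: str) -> timedelta:
    """Convert a (value, unit) argument pair into a timedelta."""
    return timedelta(**{UNIT_KEYWORDS[unit]: value})


def eviction_time(last_seen: datetime, expiry: timedelta, slide: timedelta) -> datetime:
    """End of the tumbling eviction window that contains last_seen + expiry.

    The result always lies in (last_seen + expiry, last_seen + expiry + slide],
    mirroring the (key_expiry, key_expiry + eviction_slide] interval.
    """
    window_index = (last_seen + expiry - EPOCH) // slide
    return EPOCH + (window_index + 1) * slide


if __name__ == "__main__":
    expiry = to_timedelta(10, "SECONDS")
    slide = to_timedelta(1, "MINUTES")
    print(eviction_time(datetime(2024, 9, 30, 0, 0, 0), expiry, slide))  # 2024-09-30 00:01:00
```

With a 10-second expiry and a 1-minute slide, state for a row seen at 00:00:00 is evicted at 00:01:00, while state for a row seen at 00:00:50 is evicted at 00:02:00, exactly 10 seconds plus 1 minute later.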

Examples

Example 1: Base case

Description: The first record at 00:00:00 is emitted, and its state is scheduled for eviction at the next tumbling window boundary determined by the eviction window slide (default 1 minute). Although the configured timeout is only 10 seconds, the subsequent records at 00:00:09, 00:00:18, and 00:00:28 are all dropped as duplicates, because the watermark does not advance far enough for the eviction timer to fire. In general, a key's state is evicted somewhere between the key expiry and the key expiry plus the eviction window slide, and duplicates are dropped until that happens.

Argument values:

  • Dataset: ri.foundry.main.dataset.test
  • Key expiration time unit: SECONDS
  • Key expiration time value: 10
  • Column subset: null
  • Eviction window slide: null
  • Key by columns: null

Input:

row_order  day     temperature  measurement_timestamp
4          Monday  10.4         2024-09-30T00:00:28
3          Monday  10.3         2024-09-30T00:00:18
2          Monday  10.2         2024-09-30T00:00:09
1          Monday  10.1         2024-09-30T00:00:00

Output:

day     temperature  measurement_timestamp
Monday  10.1         2024-09-30T00:00:00

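The timeline in Example 1 can be replayed step by step to show why the three later records are dropped. This sketch assumes the watermark is the maximum event time seen so far, the expiry is measured from the most recent row for the key, and 1-minute eviction windows are aligned to the Unix epoch, which puts the boundary at 00:01:00. `window_end`, `trace` and `ARRIVALS` are hypothetical names.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for tumbling eviction windows
EXPIRY = timedelta(seconds=10)  # Key expiration time: 10 SECONDS
SLIDE = timedelta(minutes=1)  # Eviction window slide left null (1-minute default)
# Example 1 arrivals in row_order 1..4; no key-by columns and no column subset,
# so every row shares a single implicit key.
ARRIVALS = ["00:00:00", "00:00:09", "00:00:18", "00:00:28"]


def window_end(t: datetime) -> datetime:
    """End of the tumbling eviction window that contains t."""
    return EPOCH + ((t - EPOCH) // SLIDE + 1) * SLIDE


def trace(arrivals):
    """Return (event_time, watermark, evict_at, decision) for each arrival."""
    watermark = None
    evict_at = None  # None means no live state for the implicit key
    steps = []
    for clock in arrivals:
        t = datetime.fromisoformat("2024-09-30T" + clock)
        if watermark is not None and t + EXPIRY < watermark:
            steps.append((t, watermark, evict_at, "late"))
            continue
        watermark = t if watermark is None else max(watermark, t)
        if evict_at is not None and evict_at <= watermark:
            evict_at = None  # the eviction timer fired before this row
        decision = "emit" if evict_at is None else "drop"
        # Schedule or push back eviction (expiry measured from the latest row).
        candidate = window_end(t + EXPIRY)
        evict_at = candidate if evict_at is None else max(evict_at, candidate)
        steps.append((t, watermark, evict_at, decision))
    return steps


if __name__ == "__main__":
    for step in trace(ARRIVALS):
        print(*step)
```

Under these assumptions the decisions come out as emit, drop, drop, drop, matching the Output table: the scheduled eviction stays at 00:01:00 while the watermark never passes 00:00:28.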
Example 2: Base case

Description: With deduplication partitioned by the day column, each key maintains independent state. The first record for Monday at 00:00:20 is emitted and advances the watermark. The record for Tuesday at 00:00:05 is dropped because it arrives too late: its event time plus the 10-second timeout (00:00:15) is behind the watermark (approximately 00:00:20). This occurs even though Tuesday has no prior records. The record for Wednesday at 00:00:25 is not late and is emitted as the first record for its key.

Argument values:

  • Dataset: ri.foundry.main.dataset.test
  • Key expiration time unit: SECONDS
  • Key expiration time value: 10
  • Column subset: null
  • Eviction window slide: null
  • Key by columns: {day}

Input:

row_order  day        temperature  measurement_timestamp
3          Wednesday  22.1         2024-09-30T00:00:25
2          Tuesday    18.3         2024-09-30T00:00:05
1          Monday     20.5         2024-09-30T00:00:20

Output:

day        temperature  measurement_timestamp
Monday     20.5         2024-09-30T00:00:20
Wednesday  22.1         2024-09-30T00:00:25
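The late-row rule that drops Tuesday can be traced in isolation. This sketch assumes, as the description implies, a single watermark shared by all keys and equal to the maximum event time seen so far, and it treats a row as late when its event time plus the expiry is strictly behind that watermark. Eviction is left out because, under the assumed 1-minute default slide, no eviction timer fires within this input. `trace` and `ARRIVALS` are hypothetical names.

```python
from datetime import datetime, timedelta

EXPIRY = timedelta(seconds=10)  # Key expiration time: 10 SECONDS
# Example 2 arrivals in row_order 1..3, partitioned by the day column.
ARRIVALS = [("Monday", "00:00:20"), ("Tuesday", "00:00:05"), ("Wednesday", "00:00:25")]


def trace(arrivals):
    """Return (day, decision) for each arrival under a watermark shared by all keys."""
    watermark = None  # assumed: maximum event time seen so far, across every key
    live_keys = set()  # keys with state; eviction omitted (no timer fires this early)
    steps = []
    for day, clock in arrivals:
        t = datetime.fromisoformat("2024-09-30T" + clock)
        if watermark is not None and t + EXPIRY < watermark:
            # e.g. Tuesday: 00:00:05 + 10 s = 00:00:15 is behind the 00:00:20 watermark.
            steps.append((day, "drop (late)"))
            continue
        watermark = t if watermark is None else max(watermark, t)
        steps.append((day, "drop (duplicate)" if day in live_keys else "emit"))
        live_keys.add(day)
    return steps


if __name__ == "__main__":
    for day, decision in trace(ARRIVALS):
        print(day, decision)
```

The trace yields emit for Monday, drop (late) for Tuesday and emit for Wednesday, matching the Output table. Tuesday is rejected by the lateness check alone, before any per-key state is consulted.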

