
Joins in streaming Pipeline Builder pipelines

With Pipeline Builder for streaming, you can join your streams against both batch datasets and other streams. Because streaming is low latency by design, joins are implemented differently than in standard batch pipelines. This page explains how joins work and how to use them effectively in your pipelines.

Join streams with batch datasets

Foundry allows you to combine low latency streams with batch datasets in a manner similar to how you can join two batch datasets in a batch Pipeline Builder pipeline.

Complete the following steps to join a stream with a batch dataset in Pipeline Builder:

  1. Add the stream and the batch dataset to your Pipeline Builder graph.

  2. Under the batch dataset, select the dropdown menu and change the type to Snapshot.

    Image of stream type selection.

  3. Select the stream against which you want to join.

  4. Select Join, and select the batch dataset for the right side of the join.

  5. Under Join Type, select Left Lookup Join.

  6. Enter the match conditions.

Architecture

Streaming joins against batch datasets work by initially downloading the batch dataset and indexing it in the streaming cluster to allow for low-latency lookups. To keep the join low latency, transforms on the batch dataset are not permitted in the same Pipeline Builder pipeline before the join with the stream.

The batch dataset is updated when new transactions are written to the dataset. When a new transaction is added to the batch dataset, a background process will download the new view of the data and convert it into a queryable format. Once that process is complete, the stream will start joining against that new view of the batch dataset.
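The lookup behavior described above can be sketched in a few lines of Python. This is only an illustration of the idea (index one view of the batch dataset, look up each streaming record, swap in a new view once it is fully built), not Foundry's implementation; the class name `SnapshotLookup`, its methods, and the `left`/`right` output shape are all hypothetical.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Any]


class SnapshotLookup:
    """Hypothetical in-memory index over one view of the batch dataset."""

    def __init__(self, key: str) -> None:
        self.key = key
        self._index: Dict[Any, Row] = {}

    def load_view(self, rows: Iterable[Row]) -> None:
        # Build the new index completely, then swap it in with a single
        # assignment so lookups never see a half-loaded view.
        new_index = {row[self.key]: row for row in rows}
        self._index = new_index

    def left_lookup_join(self, record: Row) -> Dict[str, Optional[Row]]:
        # Every streaming record is emitted; the right side is None
        # when the current view has no row for the record's key.
        match = self._index.get(record[self.key])
        return {"left": record, "right": match}


lookup = SnapshotLookup(key="id")
lookup.load_view([{"id": 1, "name": "a"}])
first = lookup.left_lookup_join({"id": 1, "v": 10})
missing = lookup.left_lookup_join({"id": 2, "v": 5})

# A later transaction produces a new view; records arriving after the
# swap are joined against it.
lookup.load_view([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
after_swap = lookup.left_lookup_join({"id": 2, "v": 5})
```

Records that arrived before the swap are not re-joined in this sketch; only records processed after the new view is in place see the updated data.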

Limitations

:::callout{theme="warning"} You cannot transform the batch dataset before joining it against a stream. If you need to transform the batch dataset, you can do so in an upstream Pipeline Builder pipeline. :::

Consider the following limitations for streaming joins:

  • The left side of the join must be either a stream or a batch dataset with "Stream" read mode when joining against a batch dataset.
  • Performance may degrade if you join against batch datasets with more than 8-10GB of data.
  • The batch dataset will update at most once every five minutes if a new append transaction is detected.
  • Joining against large static datasets can slow down cluster startup time.

Join streams with other streams

Foundry allows you to combine multiple low latency streams, similar to how you can join multiple batch datasets in a batch Pipeline Builder pipeline.

Complete the following steps to join two streams in Pipeline Builder:

  1. Add the two streams to your Pipeline Builder graph.
  2. Select Join, and select the two streams.
  3. Under Join Type, select Outer Caching Join.
  4. Enter the match conditions.
  5. Specify the cache time values and units. The cache time values and units control how long data is stored in the cache we use to join the two streams.

:::callout{theme="neutral"} If you want a left or right join instead of an outer join, you can filter out records that have null values downstream of the join. For a right join, filter out records where the right side values are null; for a left join, filter out records where the left side values are null. :::
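To make the filtering rule concrete, here is a minimal Python sketch of what the downstream filter does to the output of an outer join. The column names (`left_id`, `right_id`) and helper functions are hypothetical; in Pipeline Builder you would express the same condition with a filter on a column that is always populated on the side you want to keep.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def keep_left_join(rows: List[Row], left_column: str) -> List[Row]:
    # A left join keeps every left record, so drop rows that have
    # no left side (rows produced only by the right stream).
    return [row for row in rows if row[left_column] is not None]


def keep_right_join(rows: List[Row], right_column: str) -> List[Row]:
    # A right join keeps every right record, so drop rows that have
    # no right side (rows produced only by the left stream).
    return [row for row in rows if row[right_column] is not None]


outer_rows = [
    {"left_id": 1, "right_id": 1},        # matched on both sides
    {"left_id": 2, "right_id": None},     # left record, no right match
    {"left_id": None, "right_id": 3},     # right record, no left match
]

left_rows = keep_left_join(outer_rows, "left_id")
right_rows = keep_right_join(outer_rows, "right_id")
```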

Architecture

Since streams run indefinitely and new records are constantly flowing into both sides of the join, joins between two streams operate on caches of data from each side of the join instead of joining against the entire stream.

Without a bound, the state held for a join between streams would grow indefinitely, and the streaming cluster would eventually run out of memory and crash. Setting expiration times for the caches on the left and right side of the join bounds the state required to store the records for the join.

Data is stored on a per-key basis and distributed across task managers to allow for larger joins. This means that to join against larger streams, you can increase the memory per task manager or increase the number of task managers the cluster is running with.
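As a rough illustration of why adding task managers helps, the sketch below assigns each join key to one of several workers by hashing it. Hash partitioning is an assumption made for this example; the page does not state which scheme the streaming cluster actually uses, and `task_manager_for` is a hypothetical helper.

```python
import zlib
from collections import Counter


def task_manager_for(key: str, num_task_managers: int) -> int:
    # Assumed scheme: a stable hash of the key, modulo the worker count.
    # The same key always lands on the same worker, so both sides of
    # the join for that key share one cache.
    return zlib.crc32(key.encode("utf-8")) % num_task_managers


keys = [f"key-{i}" for i in range(1000)]
load = Counter(task_manager_for(k, 4) for k in keys)
# `load` maps each worker index to the number of keys it holds; with
# more workers, each one stores a smaller share of the cached state.
```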

Records from the left side of the join will always be joined against the most recent record from the right side that has the same value in the key column specified in the join. Only the most recent record for a particular key will be joined.
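The following Python sketch models the behavior described in this section: each side keeps only the latest record per key, entries expire after a configured cache time, and an arriving record is emitted immediately with the other side's latest cached record or with nulls. It is a simplified model under stated assumptions, not the actual implementation; `OuterCachingJoin`, its method names, and the explicit `now` timestamps (in seconds) are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

Row = Dict[str, Any]
Cache = Dict[Any, Tuple[float, Row]]  # key -> (stored_at, latest row)


@dataclass
class OuterCachingJoin:
    key: str
    left_ttl: float   # cache time for the left side, in seconds
    right_ttl: float  # cache time for the right side, in seconds
    _left: Cache = field(default_factory=dict, init=False)
    _right: Cache = field(default_factory=dict, init=False)

    @staticmethod
    def _latest(cache: Cache, ttl: float, k: Any, now: float) -> Optional[Row]:
        entry = cache.get(k)
        if entry is None:
            return None
        stored_at, row = entry
        if now - stored_at > ttl:
            # Expired entries are dropped lazily here; a real engine
            # would more likely evict them with timers.
            del cache[k]
            return None
        return row

    def on_left(self, row: Row, now: float) -> Dict[str, Optional[Row]]:
        k = row[self.key]
        self._left[k] = (now, row)  # overwrite: only the latest is kept
        return {"left": row, "right": self._latest(self._right, self.right_ttl, k, now)}

    def on_right(self, row: Row, now: float) -> Dict[str, Optional[Row]]:
        k = row[self.key]
        self._right[k] = (now, row)
        return {"left": self._latest(self._left, self.left_ttl, k, now), "right": row}


join = OuterCachingJoin(key="id", left_ttl=60, right_ttl=60)
a = join.on_left({"id": 1, "v": "a"}, now=0)     # no right record yet
b = join.on_right({"id": 1, "w": "x"}, now=10)   # joins cached left "a"
join.on_right({"id": 1, "w": "y"}, now=20)       # replaces "x" for key 1
c = join.on_left({"id": 1, "v": "b"}, now=30)    # joins latest right "y"
d = join.on_left({"id": 1, "v": "c"}, now=100)   # right "y" has expired
```

Because each key holds at most one record per side and every entry expires, the total state is bounded by the number of distinct keys seen within the cache time.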

Limitations

Consider the following limitations when joining streams with other streams:

  • A cache expiration time is required for both the left and right side of the join to prevent unbounded state growth.
  • Only the most recent value per join key is stored for each side of the join. This means the join behaves like an "outer" join.
  • If a record arrives in either the left or right stream before the other side of the join has a match, a record will be emitted with null values for the other side of the join.
