Streaming keys¶
Foundry streams allow you to specify one or more columns as partition key columns, as well as a primary key to identify a resolved record. The sections below explain how to set up and maintain both partition keys and primary keys for your streaming pipeline.
Partition keys¶
Generally, primary keys in the Palantir platform are used to uniquely identify a database record. However, partition keys in Foundry streams group records with the same key, such as all readings for a particular device, or all transactions for a particular customer. In contrast to primary keys in batch pipelines, partition keys in streams do not deduplicate records since they do not uniquely identify records.
Keys in Foundry allow users to ensure all records for a particular key will maintain order. When the record enters Foundry, it is stored internally in Kafka. Kafka guarantees that records written to the same partition will maintain order on read and write. If a record is sent to Foundry with no key, the record will be written to any partition in the internal Kafka topic in a round-robin style. However, if the user specifies a key for the record, the record will be written to the dedicated partition for that key, thus maintaining order when consumed downstream.
Similarly, Flink streaming jobs automatically maintain ordering based on the partition key column(s) set on input sources. Flink transform jobs may run with one or more parallel partitions per transform operation. For partition keyed input streams, Flink jobs will automatically send all records with the same values for all key columns to the same parallel operator instance. Unless specifically re-keyed (as with the Key by transform), partitioning and ordering for the entire length of the pipeline is determined by the key columns and row values as they were on the source, even if transform logic drops columns or overwrites values.
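The routing behavior described above, for both keyed and unkeyed records, can be illustrated with a minimal sketch. This is not Foundry's or Kafka's implementation: the partition count, the function names, and the use of SHA-256 as a stable hash are all assumptions chosen for illustration. The point is only that a deterministic function of the key sends every record with the same key to the same partition, so arrival order is preserved per key.

```python
import hashlib
from itertools import count

NUM_PARTITIONS = 4      # hypothetical partition count
_round_robin = count()  # counter used only for records that have no key


def choose_partition(key, num_partitions=NUM_PARTITIONS):
    """Return a partition index for a record key (None means the record is unkeyed)."""
    if key is None:
        # Unkeyed records are spread across partitions in turn.
        return next(_round_robin) % num_partitions
    # Keyed records: a stable hash maps equal keys to the same partition.
    # SHA-256 is a stand-in chosen for determinism, not the hash Kafka uses.
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def write(records, num_partitions=NUM_PARTITIONS):
    """Append (key, value) records to per-partition lists in arrival order."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[choose_partition(key, num_partitions)].append((key, value))
    return partitions


if __name__ == "__main__":
    readings = [("device-1", 1), ("device-2", 10), ("device-1", 2), (None, 99), ("device-1", 3)]
    for index, partition in enumerate(write(readings)):
        print(index, partition)
```

In this sketch, all `device-1` readings land in one list in the order they were written, while the unkeyed record may land anywhere.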
Primary keys: Change data capture (CDC) mode¶
Primary keys in Foundry streams work similarly to those used in relational databases and in batch datasets. However, streaming primary keys consist of a set of deduplication columns that uniquely identify a resolved record and do not specify ordering columns.
The primary key is a piece of metadata that lives in the schema of a stream in Foundry. The key does not impact the contents of the stored data or how streaming data pipelines and transforms are applied. Primary keys control how some Foundry consumers read the data; the full data can be thought of as a changelog, while the primary key tells consumers how to compute a deduplicated current view of the data after applying all changes.
The current view is a filtered view of the data that only contains the most recently streamed record for each key. The full most recent record for a key will always be retained, even if it contains nulls. If a deletion column is specified and the most recently streamed record for a key has that column set to true, the record will be filtered out of the current view.
The following example shows a primary key and data stream, where the most recently streamed rows are higher in the table.
Primary key: {Deduplication column(s): [Key], isDeletedColumn: Optional[isDeleted]}
| Key | Value | isDeleted |
|------|-----------|-----------|
| Key2 | null | false |
| Key1 | thirdVal | true |
| Key2 | secondVal | false |
| Key1 | firstVal | false |
This stream of changelogs, when read by a CDC-aware consumer, will be read as the following current view:
| Key | Value | isDeleted |
|------|-----------|-----------|
| Key2 | null | false |
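The resolution from changelog to current view can be sketched in a few lines. This is an illustrative model only, not how any Foundry consumer is implemented; the function name `current_view` and the representation of records as dictionaries are assumptions. The changelog is listed oldest first here, which is the reverse of the table above.

```python
def current_view(changelog, key_columns, deleted_column=None):
    """Resolve a changelog (oldest record first) into a deduplicated current view."""
    latest = {}
    for record in changelog:
        key = tuple(record[column] for column in key_columns)
        # A later record for the same key replaces the earlier one in full,
        # even if some of its values are None.
        latest[key] = record
    # Drop keys whose most recent record is flagged as deleted.
    return [
        record
        for record in latest.values()
        if not (deleted_column and record.get(deleted_column))
    ]


if __name__ == "__main__":
    changelog = [
        {"Key": "Key1", "Value": "firstVal", "isDeleted": False},
        {"Key": "Key2", "Value": "secondVal", "isDeleted": False},
        {"Key": "Key1", "Value": "thirdVal", "isDeleted": True},
        {"Key": "Key2", "Value": None, "isDeleted": False},
    ]
    print(current_view(changelog, ["Key"], "isDeleted"))
```

Running the sketch on the example data leaves a single record for `Key2` with a null value, matching the current view shown above.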
If you choose to set a primary key for your streaming data, you must also set the same columns as partition keys to maintain ordering and correctly resolve deduplicated data views. Once you set a primary key in the streaming setup interface, the same columns will be automatically added as partition key columns.
The following two CDC-aware jobs will automatically read primary keyed streaming data as a current view:
- Hydrate the Ontology with a primary keyed stream: Each new update record will cause an object to be created, updated, or deleted as necessary.
- A stream's archive dataset is read by any Spark transformation job: Before any transformations occur, the source will be deduplicated. Notably, almost all interactions with the archive dataset run a Spark job, including Dataset Previews and Contour analyses. When viewing the archive dataset, it will always appear as a deduplicated current view, even though the data itself contains the full changelog (if you download the data as a file). The full changelog will be processed by any streaming job that does not run Spark, including replays.
Key propagation¶
Pipeline Builder tracks the evolution of both partition key and primary key columns through pipeline transforms and writes any valid keys to the schema of the output stream. If your transforms do not invalidate any key columns, the same partitioning and deduplication instructions will be automatically maintained in a sequence of any number of downstream pipelines.
If you rename a key column, the key will update to contain the new column name. Similarly, if you apply a transform that removes or overwrites a key column, that column will be removed from the key. Since any overwriting of key column contents may represent a new ordering guarantee or a new deduplication strategy, Pipeline Builder drops the column from the key entirely. You must apply the Key by transform again if you want to keep a key column that was previously overwritten. Even when partition key columns are dropped due to removals or overwrites, the same ordering guarantee will persist for the remainder of the pipeline unless you re-key.
:::callout{theme="warning"} If all partition keys or deduplication CDC keys are removed or overwritten, the key will be dropped entirely. Be mindful when deleting or overwriting the contents of key columns; doing so may lead to unexpected results, including losing ordering guarantees or deduplication strategies. :::
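The propagation rules above (renames carry over, removed or overwritten columns leave the key, and an empty key is dropped) can be modeled as simple bookkeeping. The function below is a hypothetical sketch for reasoning about a pipeline, not Pipeline Builder's implementation, and it models only the key recorded in the output schema, not the runtime ordering behavior.

```python
def propagate_key(key_columns, renamed=None, removed_or_overwritten=()):
    """Return the key columns that survive a transform, or None if the key is dropped.

    renamed maps old column names to new ones; removed_or_overwritten lists
    columns whose contents no longer match the input (hypothetical parameters).
    """
    renamed = renamed or {}
    invalidated = set(removed_or_overwritten)
    surviving = [
        renamed.get(column, column)  # a renamed column stays in the key under its new name
        for column in key_columns
        if column not in invalidated  # removed or overwritten columns leave the key
    ]
    # If no key columns survive, the key is dropped entirely.
    return surviving or None


if __name__ == "__main__":
    print(propagate_key(["customer_id", "region"], renamed={"customer_id": "cust_id"}))
    print(propagate_key(["customer_id", "region"], removed_or_overwritten=["region"]))
    print(propagate_key(["customer_id"], removed_or_overwritten=["customer_id"]))
```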
Currently, keys never propagate for user-defined functions (UDFs). Since the function is user-defined, there is no way to infer which key propagation strategy, if any, the user intends. If you intend to propagate keys, be sure to apply the Key by transform after your UDF.
Additionally, primary keys do not propagate for stateful transforms (most transforms are not stateful).
Use a streaming key¶
In a streaming pipeline in the Pipeline Builder application, add the Key by transform to your graph. Notably, any keys you set here will override and replace any keys that were on the input data.
If you only want to set a partition key, toggle off the CDC mode option and only supply the Key by columns list. The other parameters are not required or allowed unless you are in CDC mode.
To set both a primary and partition key, toggle on the CDC mode option. If you have an isDeleted column, optionally specify it in the Primary key is deleted field. For streaming use cases, we highly recommend leaving the optional Primary key ordering columns parameter blank. The transform will set both a partition and primary key, where the partition key columns are the same as the primary key deduplication columns. The Primary key ordering columns parameter matters only when consuming the archive dataset in a batch job and never affects how streaming jobs deduplicate records. The option to specify ordering columns is available for backwards compatibility, for users of batch transforms, and for users who intend to consume the stream archive in a specific batch-only way.
Check current keys¶
The following sections describe how to find and verify streaming key logic in your Foundry stream.
Check partition keys¶
Open your streaming dataset, navigate to the Details tab, then open the Schema tab to view the data schema in JSON.
Search for includeInPartitioning, which will appear in an element of the fieldSchemaList for each column that is part of the partition key:
"customMetadata": {
  "includeInPartitioning": true
}
If you do not see any schema fields with includeInPartitioning, your stream is not keyed and ordering is not guaranteed for how your data will be stored or processed. To manually add keys, edit the schema as JSON text and insert the custom metadata (as described above) into each schema field (column) that you want to set as a partition key column.
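If you prefer to prepare the edited schema text programmatically before pasting it back, a sketch along the following lines may help. It relies only on the `fieldSchemaList` and `customMetadata.includeInPartitioning` properties named above; the assumption that each field element carries its column name under a `name` property, and both function names, are hypothetical and should be checked against your own schema JSON.

```python
import copy
import json


def set_partition_keys(schema, partition_columns):
    """Return a copy of the schema with includeInPartitioning set on the named columns."""
    updated = copy.deepcopy(schema)  # leave the caller's schema untouched
    wanted = set(partition_columns)
    for field in updated["fieldSchemaList"]:
        if field.get("name") in wanted:  # "name" is an assumed property
            metadata = field.get("customMetadata") or {}
            metadata["includeInPartitioning"] = True
            field["customMetadata"] = metadata
            wanted.discard(field["name"])
    if wanted:
        raise KeyError(f"columns not found in schema: {sorted(wanted)}")
    return updated


def partition_key_columns(schema):
    """List the columns currently flagged as partition key columns."""
    return [
        field.get("name")
        for field in schema["fieldSchemaList"]
        if (field.get("customMetadata") or {}).get("includeInPartitioning")
    ]


if __name__ == "__main__":
    schema = {"fieldSchemaList": [{"name": "device_id"}, {"name": "reading"}]}
    print(json.dumps(set_partition_keys(schema, ["device_id"]), indent=2))
```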
:::callout{theme="neutral"} If a stream already has one or more partition keys, adding a new partition key column will cause a weaker ordering guarantee since there will be more partitions; order is only guaranteed to be maintained for rows that share the same value for all partition key columns. :::
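To see why an extra partition key column weakens the guarantee, consider the following sketch. It groups records by the values of all key columns, which is the only scope within which order is maintained. The column names and the `seq` field are made up for illustration.

```python
from collections import defaultdict


def ordered_groups(records, key_columns):
    """Group record sequence numbers by the values of all key columns, in arrival order."""
    groups = defaultdict(list)
    for record in records:
        group_key = tuple(record[column] for column in key_columns)
        groups[group_key].append(record["seq"])
    return dict(groups)


if __name__ == "__main__":
    records = [
        {"customer": "c1", "device": "d1", "seq": 1},
        {"customer": "c1", "device": "d2", "seq": 2},
        {"customer": "c1", "device": "d1", "seq": 3},
    ]
    # One key column: all three records share a group, so their relative order is kept.
    print(ordered_groups(records, ["customer"]))
    # Two key columns: record 2 is in its own group, so its position relative
    # to records 1 and 3 is no longer guaranteed.
    print(ordered_groups(records, ["customer", "device"]))
```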
Before deployment, if partition key columns are set on an input stream to a pipeline, ordering will be guaranteed for that source and all its transforms for the entire pipeline unless you intentionally re-key. Partition key columns may appear in a data preview with a key symbol.
Check primary keys¶
Open your streaming dataset, navigate to the Details tab, then open the Schema tab to view the data schema in JSON.
You will see a JSON property named primaryKey. If your stream has deduplication columns called uniqueCol1 and uniqueCol2, and the isDeleted column is isDeletedCol, the schema should appear as follows:
"primaryKey": {
  "columns": [
    "uniqueCol1",
    "uniqueCol2"
  ],
  "resolution": {
    "type": "duplicate",
    "duplicate": {
      "resolutionStrategy": {
        "type": "customStrategy",
        "customStrategy": {}
      },
      "deletionColumn": "isDeletedCol"
    }
  }
}
If no primary key is set, the schema will show null:
"primaryKey": null
To manually set or remove the primary key, you may edit the schema JSON to specify a key in the above format, or use null to remove the key. If you manually set primary keys, we strongly recommend setting the same columns as partition key columns.
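As a convenience when editing by hand, the primaryKey structure shown above can be generated, and the recommendation to mirror primary key columns as partition key columns can be checked, with a sketch like the one below. Both function names are hypothetical. Two details are assumptions rather than documented behavior: the sketch omits `deletionColumn` when no deletion column is supplied, and it reads each column name from a `name` property on the `fieldSchemaList` elements.

```python
import json


def build_primary_key(dedup_columns, deletion_column=None):
    """Build a primaryKey value in the format shown above, or None for no key."""
    if not dedup_columns:
        return None
    duplicate = {"resolutionStrategy": {"type": "customStrategy", "customStrategy": {}}}
    if deletion_column is not None:
        # Assumption: the property is simply left out when there is no deletion column.
        duplicate["deletionColumn"] = deletion_column
    return {
        "columns": list(dedup_columns),
        "resolution": {"type": "duplicate", "duplicate": duplicate},
    }


def primary_key_columns_missing_partitioning(schema):
    """Return primary key columns that are not also flagged as partition key columns."""
    primary_key = schema.get("primaryKey") or {}
    partitioned = {
        field.get("name")  # "name" is an assumed property
        for field in schema.get("fieldSchemaList", [])
        if (field.get("customMetadata") or {}).get("includeInPartitioning")
    }
    return [column for column in primary_key.get("columns", []) if column not in partitioned]


if __name__ == "__main__":
    key = build_primary_key(["uniqueCol1", "uniqueCol2"], "isDeletedCol")
    print(json.dumps({"primaryKey": key}, indent=2))
```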
:::callout{theme="neutral"} The only way to guarantee ordering is to set partition key columns on your entire streaming lineage. Once set, the partition key columns will automatically propagate downstream. Even if a stream is configured to have only one partition, ordering is not necessarily guaranteed due to the way Flink applications scale and process records non-deterministically. :::
Streaming key best practices¶
Choose partition keys carefully, as keys that result in inefficiently distributed records can artificially increase load and limit throughput. If ordering is important to your use case, then set a partition key to a generic grouping identifier on which you want to maintain order, such as email ID, customer ID, or organization ID. If ordering is not important for your use case, you can choose to use a unique ID as a key or to not use a key at all for your stream.
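One rough way to compare candidate key columns before committing to one is to measure how concentrated a sample of records is on the most common key value. The sketch below is a heuristic under stated assumptions: the function name, the sample data, and the choice of "share held by the most common value" as the metric are illustrative, and no threshold is implied.

```python
from collections import Counter


def key_skew(records, key_column):
    """Return (share of records held by the most common key value, number of distinct values)."""
    counts = Counter(record[key_column] for record in records)
    if not counts:
        return 0.0, 0
    total = sum(counts.values())
    most_common_count = counts.most_common(1)[0][1]
    return most_common_count / total, len(counts)


if __name__ == "__main__":
    sample = [{"org_id": "a" if i < 8 else "b", "customer_id": f"c{i}"} for i in range(10)]
    # A high share suggests that one partition would receive most of the load.
    print("org_id:", key_skew(sample, "org_id"))
    print("customer_id:", key_skew(sample, "customer_id"))
```

A column with few distinct values and a dominant value concentrates records on one partition, while a column with many evenly spread values distributes load but only preserves order within each narrower group.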
The ordering guarantee of your final streaming output will be only as strong as the weakest guarantee among the streams (backed by Kafka topics) and transforms (Flink jobs) that lead to your output. Therefore, make sure your desired ordering is maintained and that correct partition keys are set for your entire data lineage, starting from the initial streaming extract that pulls records into Foundry. The ordering guarantee will be no stronger than that of the system from which you are extracting to Foundry. For example, if you are using a Kafka connector to extract from Kafka, set partition key columns equal to the Kafka key column so that Foundry maintains an ordering guarantee equivalent to that of your source system.
Additionally, problems can arise if you completely change the partition columns (and the ordering guarantee) during a series of data transforms. If a different ordering was guaranteed before applying a new Key by transform, the transform will receive records that are already out of order with respect to the newly added key columns; these records will remain out of order for the rest of the transform series.