Dataset projections

Dataset projections can improve performance for a large class of queries. If you have a dataset you want to optimize for multiple query patterns (for example, filtering on two separate columns), you should consider adding projections to the dataset. Specific examples of use cases for projections are listed below.

Each projection typically supports a single query pattern, focused on improving either filters on a set of columns or joins on a set of columns. Multiple projections can be added to a dataset, and all column types are supported in projections. Additionally, projections can be added to both snapshot and incrementally built datasets. The Noho service is used to manage projections and is referenced in the dataset schema when setting up a projection.

Projections have some limitations:

  • Projections can only be added to datasets that are append-only. In an append-only dataset, rows can only be added (that is, appended) to the dataset; files or transactions cannot be manually deleted from an append-only dataset. Specifically, the only transactions that can be performed on an append-only dataset are APPEND, SNAPSHOT, and UPDATE transactions that add rows to the dataset. The projection system will automatically identify datasets that violate this requirement and disable projections on them, but as a user, you should still avoid this scenario.
  • Projections do not support schema evolution, even for incrementally consistent operations like adding columns.

Review the example use cases below to learn about whether projections are a good fit for your use case. Get started by learning how to set up a projection.

Use cases

Many types of queries will benefit from a projection. The examples below cover the most common cases.

Filter on a list of columns

Given an ordered list of columns, a projection can optimize filters on any prefix of the list. Projections only speed up filters that compare a column to a constant value, and any filter on a string column must be case-sensitive.

For example, say that there is a projection optimized for filters on the ordered list of columns ["x", "y", "z"]. It will speed up the following types of queries:

  • SELECT * FROM dataset WHERE x = 5 AND y = 10 AND z = '15'
  • SELECT * FROM dataset WHERE x = 5 AND y = 10
  • SELECT * FROM dataset WHERE x = 5 AND q = 3
      • If there are additional filters on other columns that are not in the configured list, as in this case with q = 3, Spark will automatically attempt to unpack the filter conditions it can "push" into the data source (in this case, x = 5) and apply the other conditions afterwards.

But the following types of queries will not be optimized:

  • SELECT * FROM dataset WHERE abs(x) == 10
      • abs(x) == 10 does not compare a column with a constant value.
  • SELECT * FROM dataset WHERE x % 100 == 10
      • x % 100 == 10 does not compare a column with a constant value.
  • SELECT * FROM dataset WHERE y = 10
      • ["y"] is not a prefix of ["x", "y", "z"].
  • SELECT * FROM dataset WHERE z = '15'
      • ["z"] is not a prefix of ["x", "y", "z"].
  • SELECT * FROM dataset WHERE x = 5 OR q = 3
      • Spark won't be able to "push" the x = 5 filter into the data source because that would miss rows where x = 5 is false but q = 3 is true.
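
The prefix rule above can be sketched in a few lines of Python. This is only an illustration of the rule as described here, not how the projection system is implemented; the function name `usable_filter_prefix` is hypothetical, and the input is assumed to be the columns that appear in column-to-constant filters combined with AND.

```python
from typing import Iterable, List


def usable_filter_prefix(projection_columns: List[str],
                         constant_filter_columns: Iterable[str]) -> List[str]:
    """Return the longest prefix of projection_columns in which every
    column has a column-to-constant filter (filters combined with AND)."""
    filtered = set(constant_filter_columns)
    prefix = []
    for column in projection_columns:
        if column not in filtered:
            break  # the prefix ends at the first unfiltered column
        prefix.append(column)
    return prefix


projection = ["x", "y", "z"]
print(usable_filter_prefix(projection, ["x", "y", "z"]))  # ['x', 'y', 'z']
print(usable_filter_prefix(projection, ["x", "q"]))       # ['x']
print(usable_filter_prefix(projection, ["y"]))            # []
```

An empty result corresponds to a query that the projection cannot speed up.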

Range queries

Projections can optionally speed up range queries on the filter columns, for example:

  • SELECT * FROM dataset WHERE x > 5 AND x < 10
  • SELECT * FROM dataset WHERE s LIKE 'SOME_PREFIX%'
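
To see why a prefix match such as LIKE 'SOME_PREFIX%' counts as a range query, the following Python sketch rewrites a literal prefix as a half-open range and looks it up in sorted data. It assumes the values are sorted and treats the prefix as a plain string (LIKE wildcards inside the prefix are ignored); the helper names are hypothetical, and nothing here describes the projection's actual storage layout.

```python
from bisect import bisect_left
from typing import List, Tuple


def prefix_bounds(prefix: str) -> Tuple[str, str]:
    """Rewrite "starts with prefix" as the half-open range [low, high).

    Assumes a non-empty prefix whose last character is not the
    largest possible code point.
    """
    high = prefix[:-1] + chr(ord(prefix[-1]) + 1)
    return prefix, high


def rows_with_prefix(sorted_values: List[str], prefix: str) -> List[str]:
    """Find all values starting with prefix using two binary searches."""
    low, high = prefix_bounds(prefix)
    start = bisect_left(sorted_values, low)
    end = bisect_left(sorted_values, high)
    return sorted_values[start:end]


values = sorted(["banana", "apricot", "apex", "apple"])
print(rows_with_prefix(values, "ap"))  # ['apex', 'apple', 'apricot']
```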

Limitations

  • It is not possible to create a projection that both includes a non-primitive column (for example, an array) and supports range queries on the filter columns.

Join on a set of columns

Given an unordered set of columns and a bucket count, a projection optimizes joins only on that exact set of columns and that bucket count.

For example, a projection optimized for joins on {"x", "y"} will optimize the following types of queries:

  • SELECT * FROM dataset1 INNER JOIN dataset2 ON dataset1.x = dataset2.x AND dataset1.y = dataset2.y

But the following queries will not be optimized:

  • SELECT * FROM dataset1 INNER JOIN dataset2 ON dataset1.x = dataset2.x
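
The difference from the filter case is that the match is on the whole set rather than a prefix. A minimal Python sketch of that check, with the hypothetical helper name `join_matches_projection`:

```python
from typing import Iterable


def join_matches_projection(projection_columns: Iterable[str],
                            join_columns: Iterable[str]) -> bool:
    """True only when the join keys are exactly the projection's column set.

    Column order does not matter, but a subset or superset does not match.
    """
    return frozenset(projection_columns) == frozenset(join_columns)


print(join_matches_projection({"x", "y"}, ["y", "x"]))  # True
print(join_matches_projection({"x", "y"}, ["x"]))       # False
```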

Join projected and non-projected datasets

In Foundry, joins of large datasets typically perform a sort-merge join. This involves partitioning each of the datasets according to the join key, sorting each partition on that key, and then merging the (sorted) partitions with the same key.

  • In the general case, this involves shuffling and sorting both of the datasets.
  • If one of the datasets has a projection that is optimized for joining on the join columns, it will not be shuffled or sorted, though the other dataset will be.
  • If both of the datasets have projections that are optimized for joining on the join columns, and use the same number of buckets, then neither dataset will be shuffled or sorted. This can lead to dramatic performance improvements. The same goes for joining a dataset that has a join-optimized projection with an explicitly bucketed dataset (with no projections), though again the number of buckets and join columns must match exactly.

:::callout{theme="neutral"} Most projection consumers in Foundry do not by default take advantage of the fact that the projection is already sorted when doing sort-merge joins, so you may still see a sort in your Spark query plan. In transforms, you can use the Spark profile BUCKET_SORTED_SCAN_ENABLED to modify Spark's behavior based on the fact that the projection is sorted, but this does not always improve performance and can actually make performance worse. :::
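
The three cases above can be summarized as a small decision function. The sketch below is illustrative only: `Bucketing` and `sides_to_shuffle` are hypothetical names, and the behavior when both sides match the join columns but use different bucket counts is an assumption, since this page does not describe that case.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Optional, Set


@dataclass(frozen=True)
class Bucketing:
    """Join-optimized layout of a dataset (hypothetical model)."""
    columns: FrozenSet[str]
    bucket_count: int


def sides_to_shuffle(join_columns: Iterable[str],
                     left: Optional[Bucketing],
                     right: Optional[Bucketing]) -> Set[str]:
    """Return which sides of a sort-merge join still need a shuffle."""
    keys = frozenset(join_columns)
    left_ok = left is not None and left.columns == keys
    right_ok = right is not None and right.columns == keys
    if left_ok and right_ok:
        if left.bucket_count == right.bucket_count:
            return set()  # neither side is shuffled
        # Assumption: with mismatched bucket counts only one layout can
        # be reused; this sketch arbitrarily keeps the left one.
        return {"right"}
    if left_ok:
        return {"right"}
    if right_ok:
        return {"left"}
    return {"left", "right"}  # general case: both sides are shuffled


xy_200 = Bucketing(frozenset({"x", "y"}), 200)
print(sides_to_shuffle(["x", "y"], xy_200, xy_200))  # set()
print(sides_to_shuffle(["x", "y"], xy_200, None))    # {'right'}
```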

Aggregate on a pre-specified set of columns

In Foundry, aggregations usually involve a shuffle exchange on the dataset (that is, partitioning the dataset) according to the aggregation key. When reading from a hash-bucketed projection (all join-optimized projections are hash-bucketed, but a filter-optimized projection may not be), consumers can avoid this shuffle, which can lead to dramatic performance improvements during aggregations. A projection on the primary key columns is especially beneficial for a primary key expectation check on a dataset that is updated incrementally in an append-only fashion, both because the projection compacts the dataset and because the aggregation used to compute the expectation can leverage the projection's hash-bucketing.
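
The reason hash-bucketing removes the shuffle is that all rows with the same key land in the same bucket, so each bucket can be aggregated on its own. The Python sketch below illustrates this with a primary key uniqueness check. It is a toy model under stated assumptions: the CRC32-based `bucket_of` function and the helper names are hypothetical and do not reflect the hash function Spark or the projection service uses.

```python
import zlib
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Set


def bucket_of(key: str, bucket_count: int) -> int:
    """Deterministically assign a key to a bucket (illustrative hash)."""
    return zlib.crc32(key.encode("utf-8")) % bucket_count


def bucketize(keys: Iterable[str], bucket_count: int) -> Dict[int, List[str]]:
    """Group keys into hash buckets, as a bucketed layout would."""
    buckets: Dict[int, List[str]] = defaultdict(list)
    for key in keys:
        buckets[bucket_of(key, bucket_count)].append(key)
    return buckets


def duplicate_keys(buckets: Dict[int, List[str]]) -> Set[str]:
    """Find duplicated keys by inspecting each bucket independently."""
    duplicates: Set[str] = set()
    for rows in buckets.values():
        counts = Counter(rows)
        duplicates.update(key for key, n in counts.items() if n > 1)
    return duplicates


keys = ["a", "b", "c", "a", "d", "c"]
buckets = bucketize(keys, bucket_count=4)
print(sorted(duplicate_keys(buckets)))  # ['a', 'c']
```

Because equal keys always hash to the same bucket, the per-bucket result matches what a global count over all rows would find, without moving rows between buckets.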

Read from incremental pipelines

Incremental pipelines can result in very high file counts and correspondingly degraded read performance. For example, a pipeline that writes ten partitions every five minutes will write over a million files per year. Reading that many files is difficult for many reasons, starting with simply producing the list of input partitions.
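
The arithmetic behind that figure is easy to check. The short Python sketch below assumes one file per written partition and a 365-day year; the function name is hypothetical.

```python
MINUTES_PER_YEAR = 60 * 24 * 365  # 525,600 minutes in a 365-day year


def files_per_year(files_per_write: int, minutes_between_writes: int) -> int:
    """Estimate files written per year, assuming one file per partition."""
    writes_per_year = MINUTES_PER_YEAR // minutes_between_writes
    return files_per_write * writes_per_year


print(files_per_year(10, 5))  # 1051200
```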

Projections provide a way to compact incremental pipelines such as these transparently. Just set up a projection, optimized for either filters or joins, and reads will use the projection. Learn more about using projections for incremental pipelines.

Note that consumers can still take advantage of the filter or join optimizations of the projection even if the projection is not fully up to date with the canonical dataset. This holds as long as no SNAPSHOT or DELETE transactions (or UPDATE transactions that modified existing files) were committed to the canonical dataset between the latest transaction at the time the projection was last built and the current latest transaction.
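
That condition can be expressed as a simple check over the transactions committed since the projection was last built. The following Python sketch is a model of the rule as stated here, not an API of the projection service; the `Transaction` class and `stale_projection_usable` function are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Transaction:
    """A dataset transaction (hypothetical model)."""
    kind: str  # "APPEND", "UPDATE", "SNAPSHOT", or "DELETE"
    modifies_existing_files: bool = False


def stale_projection_usable(since_last_build: Iterable[Transaction]) -> bool:
    """True if a projection that lags behind the dataset can still be used."""
    for txn in since_last_build:
        if txn.kind in ("SNAPSHOT", "DELETE"):
            return False
        if txn.kind == "UPDATE" and txn.modifies_existing_files:
            return False
    return True


print(stale_projection_usable([Transaction("APPEND"), Transaction("UPDATE")]))  # True
print(stale_projection_usable([Transaction("APPEND"), Transaction("SNAPSHOT")]))  # False
```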

Query uploaded data

CSV is an inefficient file format for querying data, but it is common for data that is uploaded to Foundry (via Data Connection from a file system source, manual uploads, etc.) to initially be in CSV format. Creating a transform job is one way to convert these CSVs to a more efficient format such as Parquet. Alternatively, you can add a projection; read operations will use the optimized projection for better performance, while the dataset still contains the original CSV files.

