Hive-style partitioning(Hive风格分区(Hive-style partitioning))¶
Hive-style partitioning is a method for optimizing the layout of data in a dataset in order to dramatically improve the performance of queries that filter on particular columns. In the context of Foundry Spark-based transforms, hive-style partitioning is performed in the following fashion:
- When writing data to the output dataset, for each partition in the Spark dataframe and each combination of unique values for the specified partition columns, write a separate file to the output dataset.
- For each file written in this way, include the partition column values in the file path.
- In the transaction metadata, record the fact that the dataset is partitioned by the partition columns.
Dataset readers that use compute engines such as Spark or Polars and that filter on these columns can automatically leverage the metadata in the transaction metadata and file paths in order to narrow down the files to read.
Because at least one file is written for each unique combination of partition column values in the data, and writing an excessive amount of files results in poor write and subsequent read performance, hive-style partitioning is not suited for columns with very high cardinality (many unique values and only a few rows for each value).
Configuration of hive-style partitioning¶
The below minimal examples show how to configure hive-style partitioning when writing data to the output in Python and Java.
In these examples, we repartition the dataframe using repartitionByRange ↗ on the partition columns before writing to the output. Repartitioning ensures that the output contains only one file per unique combination of partition column values, rather than one file per unique combination of partition column values in each input dataframe partition. Skipping this repartition step can result in an excessive amount of files in the output dataset, causing poor write and read performance.
:::callout{theme="neutral"}
repartitionByRange is generally preferred over repartition ↗ in the context of hive-style partitioning because repartitionByRange uses sampling to estimate partition ranges that will distribute data as evenly as possible. Conversely, repartition uses a hash function modulo by the number of partitions to assign values to dataframe partitions; for columns with low cardinality, this hash-and-modulo operation has a high likelihood of distributing data unevenly, even if the original data is relatively evenly distributed across values. Uneven data distribution (skew) can cause Spark executor out-of-memory errors and job failures.
:::
Python transforms example of hive-style partitioning¶
from transforms.api import transform, Input, Output
@transform(
transform_output=Output("/path/to/output"),
transform_input=Input("/path/to/input"),
)
def compute(transform_output, transform_input):
transform_output.write_dataframe(
transform_input.dataframe().repartitionByRange("record_date", "department"),
partition_cols=["record_date", "department"],
)
Java transforms example of hive-style partitioning¶
package myproject.datasets;
import com.palantir.foundry.spark.api.DatasetFormatSettings;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import static org.apache.spark.sql.functions.col;
public final class HivePartitioningInJava {
@Compute
public void myComputeFunction(
@Input("ri.foundry.main.dataset.e2dd4bcf-7985-461c-9d08-ee0edd734a1a") FoundryInput myInput,
@Output("ri.foundry.main.dataset.4b62bf9b-3700-40f6-9e85-505eaf87e57d") FoundryOutput myOutput) {
myOutput.getDataFrameWriter(
myInput.asDataFrame().read().repartitionByRange(col("record_date"), col("department")))
.setFormatSettings(DatasetFormatSettings.builder()
.addPartitionColumns("record_date", "department")
.build())
.write();
}
}
Advanced repartitionByRange usage¶
In the above code examples, we invoke repartitionByRange without specifying a partition count, and we specify the same partition columns as we do in the hive-style partitioning settings. This simple implementation is generally fine, but there are two situations involving very large-scale data in which it can lead to issues.
- If the data for a single value combination is too large to fit into a single Spark executor's memory, we will encounter out-of-memory errors, because
repartitionByRange, likerepartition, assigns each unique value combination to exactly one partition. - Because we do not specify a partition count, Spark will use the default number of partitions as configured by the
spark.sql.shuffle.partitionssetting. If the unique number of value combinations of the partition columns is greater than that value, then at least one partition will contain the data for multiple value combinations. This increases the likelihood of Spark out-of-memory errors, even if the data for a single value combination is small enough to fit into a single Spark executor's memory.
The below Python code sample represents a typical implementation that avoids both of these issues, at the cost of increased complexity. Per the sample's logic, each unique value combination of department and record_date will be spread across an average of eight Spark partitions, meaning that each value combination will have roughly eight files in the output dataset instead of one.
from transforms.api import transform, Input, Output
@transform(
transform_output=Output("/path/to/output"),
transform_input=Input("/path/to/input"),
)
def compute(transform_output, transform_input):
input_df = transform_input.dataframe()
unique_date_department_combinations = input_df.select("department", "record_date").distinct().count()
partition_count = unique_date_department_combinations * 8
transform_output.write_dataframe(
input_df.repartitionByRange(partition_count, "department", "record_date", "record_timestamp"),
partition_cols=["department", "record_date"],
)
中文翻译¶
Hive风格分区(Hive-style partitioning)¶
Hive风格分区是一种优化数据集中数据布局的方法,旨在显著提升对特定列进行过滤的查询性能。在Foundry基于Spark的转换(transform)上下文中,Hive风格分区按以下方式执行:
- 将数据写入输出数据集时,对于Spark数据框(dataframe)中的每个分区以及指定分区列的唯一值组合,分别向输出数据集写入一个独立文件。
- 对于以这种方式写入的每个文件,在文件路径中包含分区列的值。
- 在事务元数据(transaction metadata)中记录该数据集已按分区列进行分区的事实。
使用Spark或Polars等计算引擎并对这些列进行过滤的数据集读取器,可以自动利用事务元数据和文件路径中的元数据来缩小需要读取的文件范围。
由于数据中每个唯一的分区列值组合至少会写入一个文件,而写入过多文件会导致写入性能下降并影响后续读取性能,因此Hive风格分区不适用于基数(cardinality)非常高的列(即唯一值很多但每个值对应的行数很少)。
Hive风格分区的配置¶
以下最小示例展示了如何在Python和Java中配置向输出写入数据时的Hive风格分区。
在这些示例中,我们在写入输出之前,使用repartitionByRange ↗对分区列进行重新分区(repartition)。重新分区确保输出中每个唯一的分区列值组合只生成一个文件,而不是在每个输入数据框分区中为每个唯一的分区列值组合生成一个文件。跳过此重新分区步骤可能导致输出数据集中文件数量过多,从而造成写入和读取性能下降。
:::callout{theme="neutral"}
在Hive风格分区的上下文中,通常优先使用repartitionByRange而非repartition ↗,因为repartitionByRange通过采样来估算分区范围,从而尽可能均匀地分布数据。相比之下,repartition使用哈希函数对分区数取模来将值分配到数据框分区;对于低基数的列,这种哈希取模操作极有可能导致数据分布不均,即使原始数据在各值之间相对均匀分布。数据分布不均(倾斜)可能导致Spark执行器(executor)内存溢出错误和作业失败。
:::
Python转换示例:Hive风格分区¶
from transforms.api import transform, Input, Output
@transform(
transform_output=Output("/path/to/output"),
transform_input=Input("/path/to/input"),
)
def compute(transform_output, transform_input):
transform_output.write_dataframe(
transform_input.dataframe().repartitionByRange("record_date", "department"),
partition_cols=["record_date", "department"],
)
Java转换示例:Hive风格分区¶
package myproject.datasets;
import com.palantir.foundry.spark.api.DatasetFormatSettings;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import static org.apache.spark.sql.functions.col;
public final class HivePartitioningInJava {
@Compute
public void myComputeFunction(
@Input("ri.foundry.main.dataset.e2dd4bcf-7985-461c-9d08-ee0edd734a1a") FoundryInput myInput,
@Output("ri.foundry.main.dataset.4b62bf9b-3700-40f6-9e85-505eaf87e57d") FoundryOutput myOutput) {
myOutput.getDataFrameWriter(
myInput.asDataFrame().read().repartitionByRange(col("record_date"), col("department")))
.setFormatSettings(DatasetFormatSettings.builder()
.addPartitionColumns("record_date", "department")
.build())
.write();
}
}
高级repartitionByRange用法¶
在上述代码示例中,我们调用repartitionByRange时未指定分区数,并且指定的分区列与Hive风格分区设置中的分区列相同。这种简单实现通常没有问题,但在处理超大规模数据时,有两种情况可能导致问题。
- 如果单个值组合的数据量过大,无法容纳在单个Spark执行器的内存中,则会遇到内存溢出错误,因为
repartitionByRange与repartition一样,会将每个唯一值组合分配到恰好一个分区。 - 由于我们没有指定分区数,Spark将使用
spark.sql.shuffle.partitions设置配置的默认分区数。如果分区列的唯一值组合数量大于该值,则至少有一个分区将包含多个值组合的数据。即使单个值组合的数据量小到足以容纳在单个Spark执行器的内存中,这也会增加Spark内存溢出错误的可能性。
以下Python代码示例展示了一种典型的实现方式,虽然增加了复杂性,但可以避免上述两个问题。根据示例的逻辑,department和record_date的每个唯一值组合将平均分布在八个Spark分区中,这意味着每个值组合在输出数据集中大约有八个文件,而不是一个。
from transforms.api import transform, Input, Output
@transform(
transform_output=Output("/path/to/output"),
transform_input=Input("/path/to/input"),
)
def compute(transform_output, transform_input):
input_df = transform_input.dataframe()
unique_date_department_combinations = input_df.select("department", "record_date").distinct().count()
partition_count = unique_date_department_combinations * 8
transform_output.write_dataframe(
input_df.repartitionByRange(partition_count, "department", "record_date", "record_timestamp"),
partition_cols=["department", "record_date"],
)