Native acceleration(原生加速(Native acceleration))¶
You can improve Spark's performance by enabling native acceleration with Velox ↗.
Native acceleration is a technique that leverages low-level hardware optimizations to improve the performance of batch jobs. These performance gains are achieved by shifting compute from Java Virtual Machine (JVM) languages to native languages, such as C++, which are compiled down to machine code and run directly on the hardware of the machine. By using platform-specific features, native acceleration aims to significantly reduce the time needed to process large-scale data workloads in order to speed up job execution and improve resource utilization.
Native acceleration is available for Python transforms and Pipeline Builder.
Build analysis¶
You can conduct basic analysis of a natively accelerated build in the Spark Details page. Under the Query Plan tab, select Physical Plan; you will see something like the following:
== Physical Plan ==
AdaptiveSparkPlan
+- == Final Plan ==
Execute InsertIntoHadoopFsRelationCommand
+- WriteFiles
+- CollectMetrics
+- VeloxColumnarToRowExec
+- ^ ProjectExecTransformer
+- ^ InputIteratorTransformer
+- ^ InputAdapter
+- ^ RowToVeloxColumnar
+- ^ HashAggregate
+- ^ VeloxColumnarToRowExec
+- ^ AQEShuffleRead
+- ^ ShuffleQueryStage
+- ColumnarExchange
+- ^ ProjectExecTransformer
+- ^ RowToVeloxColumnar
+- * ColumnarToRow
+- BatchScan parquet
While broadly similar to a conventional Spark query plan, you will notice a few key differences. Instead of the ProjectExec node, there is a ProjectExecTransformer. This means that the operation will be executed natively in the Velox query engine. All offloaded nodes of the query plan will be marked with ^ symbol in the tree. Blocks of native execution are sandwiched by RowToVeloxColumnar and VeloxColumnarToRowExec. These nodes are responsible for converting Spark datasets to Arrow DataFrames and vice-versa. This serialization/deserialization has a significant cost.
There are generally two patterns which indicate poor native acceleration performance:
- A small percentage of nodes executed natively, as indicated by the
^symbol. - A large number of
RowToVeloxColumnarandVeloxColumnarToRowExecnodes resulting in high serialization overheads.
This analysis can be helpful if performance is not as expected. Small changes to pipelines can have a large impact on the amount of compute that is offloaded. Features like checkpoints can be used to manually group together chunks of a build that can all be executed natively.
Implementation and architecture of native acceleration¶
Foundry’s implementation of native acceleration is built upon the Apache Gluten ↗ project. Foundry native acceleration leverages the Velox ↗ query engine to accelerate Spark jobs at runtime. Velox is written in C++ and is designed explicitly with database acceleration in mind ↗, providing a developer API to run operator-level operations on Arrow DataFrames ↗. Gluten provides the necessary "glue" to bind the Spark runtime with Velox.
In this setup, a pipeline first generates a Spark query plan as in a normal build (one without native acceleration). Additional optimization rules are then applied to the plan in order to identify whether parts of the query can be run with Velox. This decision is based on whether Velox has an equivalent implementation and whether a mapping for the implementation exists in Gluten. The query can be offloaded at the operator-level: this corresponds roughly to SQL statements like SELECT, FILTER, or JOIN. Any part of the query plan that can be offloaded is marked at this stage.
Once the planning step is complete, the query is executed through the normal Spark engine. This means all task scheduling, executor orchestration, and lifecycle management proceed as normal. The difference comes when an executor reaches part of the query plan that has been marked for native execution. If this occurs, instead of calling the default implementations in Spark, the Velox implementations are invoked.
This architecture is particularly advantageous because it supports queries where not all computations can be done with Velox. This is because the offload decision is made at the operator level rather than for the entire plan. The number of supported operators is constantly growing, but user-authored code like UDFs can never be offloaded as a native implementation does not exist.
View the full list of supported operators and expressions ↗
Why is native acceleration faster?¶
Spark is written in Scala, a JVM language, and contains many optimizations such as code generation ↗ to improve its performance. Further, the JVM itself contains optimizations such as the C2 Compiler ↗ that aim to take advantage of as many platform-specific features as possible. However, native languages such as C++ continue to offer better performance for three basic reasons:
- Compile-time optimizations: While Java and Scala are compiled into bytecode, which is then executed by the JVM, native languages like C++ are compiled directly into machine code. This allows the C++ compiler to perform extensive optimizations at compile-time that significantly reduce runtime overhead. In contrast, JVM languages rely on Just-In-Time (JIT) compilation, which occurs during execution and may not achieve the same level of optimization because it has to balance the time spent on compilation with the need to start running quickly.
- No garbage collection (GC): In C++, memory management is handled manually, which eliminates the overhead associated with garbage collection (GC). In JVM languages, the GC process can introduce unpredictable pauses and overhead that can impact performance, especially in memory-intensive applications.
- Direct hardware access and availability of vectorization APIs: C++ provides direct access to hardware features and low-level system resources, enabling developers to leverage platform-specific optimizations and vectorization APIs such as SSE, AVX, and other SIMD (Single Instruction Multiple Data) instructions. This direct access allows for fine-tuned performance optimizations that are not as easily achievable in JVM languages, where the abstraction layer may prevent the same level of hardware interaction.
Memory configuration considerations for native acceleration¶
Running Spark with native acceleration in Foundry requires a slightly different configuration from normal batch pipelines. Spark supports performing some operations with off-heap memory ↗. Off-heap memory is memory that is not managed by the JVM, cutting out GC overhead and leading to better performance. By default, we do not enable off-heap memory in Foundry, as doing so can introduce additional maintenance costs for pipelines. Enabling off-heap memory is necessary for native acceleration since DataFrames modified by Velox must be off-heap to be accessible by the native process. Foundry still requires sufficient on-heap memory for everything except Velox data transformations (for instance, orchestration, scheduling, and build management code still run in the JVM), but ideally most work will now be performed off-heap. Configuring a pipeline to use native acceleration introduces additional maintenance costs in balancing on-heap and off-heap memory.
中文翻译¶
原生加速(Native acceleration)¶
您可以通过启用Velox ↗的原生加速(Native acceleration)来提升Spark的性能。
原生加速是一种利用底层硬件优化来提升批处理作业性能的技术。这种性能提升是通过将计算任务从Java虚拟机(JVM)语言迁移到C++等原生语言来实现的,这些原生语言被编译为机器码并直接在机器硬件上运行。通过使用平台特定功能,原生加速旨在显著减少处理大规模数据工作负载所需的时间,从而加快作业执行速度并提高资源利用率。
原生加速适用于Python转换(transforms)和流水线构建器(Pipeline Builder)。
构建分析(Build analysis)¶
您可以在Spark详情(Spark Details)页面中对原生加速构建进行基本分析。在查询计划(Query Plan)选项卡下,选择物理计划(Physical Plan),您将看到类似以下内容:
== Physical Plan ==
AdaptiveSparkPlan
+- == Final Plan ==
Execute InsertIntoHadoopFsRelationCommand
+- WriteFiles
+- CollectMetrics
+- VeloxColumnarToRowExec
+- ^ ProjectExecTransformer
+- ^ InputIteratorTransformer
+- ^ InputAdapter
+- ^ RowToVeloxColumnar
+- ^ HashAggregate
+- ^ VeloxColumnarToRowExec
+- ^ AQEShuffleRead
+- ^ ShuffleQueryStage
+- ColumnarExchange
+- ^ ProjectExecTransformer
+- ^ RowToVeloxColumnar
+- * ColumnarToRow
+- BatchScan parquet
虽然与传统的Spark查询计划大体相似,但您会注意到一些关键差异。这里没有ProjectExec节点,而是出现了ProjectExecTransformer。这意味着该操作将在Velox查询引擎中原生执行。查询计划中所有被卸载的节点都会在树中用^符号标记。原生执行块被夹在RowToVeloxColumnar和VeloxColumnarToRowExec之间。这些节点负责将Spark数据集转换为Arrow DataFrame,反之亦然。这种序列化/反序列化操作具有显著的成本。
通常有两种模式表明原生加速性能不佳:
- 只有少量节点被原生执行(由
^符号指示)。 - 存在大量
RowToVeloxColumnar和VeloxColumnarToRowExec节点,导致序列化开销过高。
当性能未达到预期时,这种分析会很有帮助。对流水线的微小改动可能会对卸载的计算量产生重大影响。可以使用检查点(checkpoints)等功能手动将构建块分组,以便全部原生执行。
原生加速的实现与架构(Implementation and architecture of native acceleration)¶
Foundry的原生加速实现基于Apache Gluten ↗项目。Foundry原生加速利用Velox ↗查询引擎在运行时加速Spark作业。Velox使用C++编写,明确针对数据库加速进行设计 ↗,提供了在Arrow DataFrame ↗上执行算子级操作的开发者API。Gluten则提供了必要的"粘合剂",将Spark运行时与Velox绑定在一起。
在这种架构中,流水线首先像正常构建(未启用原生加速)一样生成Spark查询计划。然后,对计划应用额外的优化规则,以确定查询的哪些部分可以使用Velox运行。这一决策基于Velox是否具有等效的实现,以及Gluten中是否存在该实现的映射。查询可以在算子级别进行卸载:这大致对应于SELECT、FILTER或JOIN等SQL语句。查询计划中任何可以卸载的部分都会在此阶段被标记。
规划步骤完成后,查询通过正常的Spark引擎执行。这意味着所有任务调度、执行器编排和生命周期管理都按正常流程进行。区别在于,当执行器到达查询计划中已标记为原生执行的部分时,不会调用Spark的默认实现,而是调用Velox的实现。
这种架构特别有利,因为它支持并非所有计算都能用Velox完成的查询。这是因为卸载决策是在算子级别做出的,而不是针对整个计划。支持的算子数量在不断增加,但用户编写的代码(如UDF)永远无法被卸载,因为不存在对应的原生实现。
为什么原生加速更快?(Why is native acceleration faster?)¶
Spark使用Scala(一种JVM语言)编写,并包含许多优化措施(如代码生成(code generation) ↗)来提升性能。此外,JVM本身也包含诸如C2编译器(C2 Compiler) ↗等优化,旨在尽可能利用平台特定功能。然而,C++等原生语言仍然在三个基本方面提供更好的性能:
- 编译时优化(Compile-time optimizations): Java和Scala被编译为字节码,然后由JVM执行,而C++等原生语言则直接编译为机器码。这使得C++编译器能够在编译时执行大量优化,显著减少运行时开销。相比之下,JVM语言依赖即时编译(JIT),这种编译发生在执行过程中,可能无法达到相同的优化水平,因为它需要在编译时间和快速启动需求之间取得平衡。
- 无垃圾回收(No garbage collection): 在C++中,内存管理是手动处理的,从而消除了与垃圾回收(GC)相关的开销。在JVM语言中,GC过程可能会引入不可预测的暂停和开销,影响性能,尤其是在内存密集型应用中。
- 直接硬件访问和向量化API的可用性(Direct hardware access and availability of vectorization APIs): C++提供对硬件特性和底层系统资源的直接访问,使开发者能够利用平台特定的优化和向量化API,如SSE、AVX以及其他SIMD(单指令多数据流)指令。这种直接访问允许进行精细调整的性能优化,而在JVM语言中,抽象层可能会阻碍达到同样的硬件交互水平。
原生加速的内存配置考量(Memory configuration considerations for native acceleration)¶
在Foundry中使用原生加速运行Spark需要与普通批处理流水线略有不同的配置。Spark支持使用堆外内存(off-heap memory) ↗执行某些操作。堆外内存是不由JVM管理的内存,可以消除GC开销并带来更好的性能。默认情况下,我们在Foundry中不启用堆外内存,因为这样做可能会给流水线带来额外的维护成本。启用堆外内存对于原生加速是必要的,因为由Velox修改的DataFrame必须位于堆外,才能被原生进程访问。Foundry仍然需要足够的堆内内存来处理除Velox数据转换之外的所有操作(例如,编排、调度和构建管理代码仍在JVM中运行),但理想情况下,大部分工作现在将在堆外执行。配置流水线使用原生加速会在平衡堆内和堆外内存方面引入额外的维护成本。