Spark distributed model training(Spark 分布式模型训练)¶
In distributed training, the training of a model takes place across multiple computing resources working in parallel. Foundry supports distributed model training in Spark environments. Distributed training enables you to:
- Train models faster and more efficiently: Distributed training leverages multiple nodes concurrently to accelerate model training.
- Scale up training datasets: By splitting training tasks and data across nodes, you can train models on much larger datasets than what single-node approaches can fit in memory.
:::callout{theme="neutral"} Currently, only PySpark XGBoost estimators are directly supported for distributed model training in Foundry. Additional distributed training libraries may be supported in the future. :::
Follow the steps below to learn how to perform distributed model training in Foundry.
1. Configure your Spark environment¶
Before using distributed training libraries, you must configure your Spark environment properly using a Spark profile.
Required Spark profile¶
To perform distributed model training, you must import and apply the KUBERNETES_OPEN_PORTS_ALL profile in your repository, as shown in the example code below:
from transforms.api import configure
@configure(profile=[
"KUBERNETES_OPEN_PORTS_ALL",
])
@transform_df(...)
def compute(...):
...
:::callout{theme="neutral"}
To reiterate: applying the KUBERNETES_OPEN_PORTS_ALL profile is mandatory for distributed training.
:::
2. Set up distributed training libraries¶
Set up an XGBoost Spark estimator¶
XGBoost provides a seamless PySpark integration for distributed training via a SparkXGBClassifier ↗ or SparkXGBRegressor ↗ estimator.
An example of basic setup for SparkXGBClassifier is as follows:
from xgboost.spark import SparkXGBClassifier
@configure(profile=["KUBERNETES_OPEN_PORTS_ALL"])
@transform(...)
def compute(...):
xgb = SparkXGBClassifier(
features_col=<your_feature_col_name>,
# other parameters as needed
)
model = xgb.fit(<your_training_dataframe>)
For additional configuration details, refer to the XGBoost Spark Documentation ↗.
Set up GPU support for distributed model training¶
To leverage GPUs for distributed model training, follow the steps below:
- Add your project to a resource queue with GPU support.
- Enable the GPU profile by adding the
EXECUTOR_GPU_ENABLEDSpark profile to your transform. - Configure the
deviceparameter to'gpu'or'cuda', depending on your GPU setup.
Example:
from transforms.api import configure
@configure(profile=[
"KUBERNETES_OPEN_PORTS_ALL",
"EXECUTOR_GPU_ENABLED",
])
@transform_df(...)
def compute(...):
xgb = SparkXGBClassifier(
...,
device='gpu' # options: 'cpu', 'gpu', 'cuda'
)
model = xgb.fit(...)
Refer to the SparkXGBClassifier documentation ↗ for more information on GPU configuration.
3. Distributed hyperparameter optimization¶
Optionally, you may want to perform hyperparameter optimization. We recommend using Optuna ↗ for hyperparameter optimization in Transforms.
- Optuna integrates well with Spark and distributed training workflows without additional setup.
- For more details, refer to the Optuna documentation ↗.
4. Use SparkXGBClassifier or SparkXGBRegressor with a model adapter¶
The built-in XGBoostSerializer from palantir_models.serializers does not directly support XGBoost’s Spark estimators (SparkXGBClassifier / SparkXGBRegressor).
Instead, you should:
- Train your Spark XGBoost model as usual.
- Extract the underlying
Boosterobject via.get_booster(). - Use the
Boosterin your model adapter.
:::callout{theme="neutral"}
Note: If you need to run inference in a distributed fashion, you can still register your ModelAdapter inside a PySpark UDF. See PySpark UDFs for details.
:::
Example model adapter¶
from palantir_models.serializers import XGBoostSerializer
import palantir_models as pm
class BoosterModelAdapter(pm.ModelAdapter):
@pm.auto_serialize(model=XGBoostSerializer())
def __init__(self, booster):
self.booster = booster
@classmethod
def api(cls):
<...>
def predict(self, input_df):
return self.booster.predict(input_df)
Training and adapter initialization¶
from transforms.api import configure, transform
from transforms.api.schema import Input, ModelOutput
from xgboost.spark import SparkXGBClassifier # or SparkXGBRegressor
from your_project.adapters import BoosterModelAdapter
@configure(profile=[
"DYNAMIC_ALLOCATION_DISABLED",
"NUM_EXECUTORS_2",
"KUBERNETES_OPEN_PORTS_ALL"
])
@transform(
training_df=Input(path="..."),
model_output=ModelOutput(path="...")
)
def compute(ctx, training_df, model_output):
# 1. Train the SparkXGBClassifier
xgb = SparkXGBClassifier(...)
spark_model = xgb.fit(training_df)
# 2. Extract the underlying Booster
booster = spark_model.get_booster()
# 3. Create your model adapter
adapter = BoosterModelAdapter(booster)
# 4. Publish the model to Foundry
model_output.publish(adapter)
For more on configuring adapters, see the model adapters overview.
中文翻译¶
Spark 分布式模型训练¶
在分布式训练中,模型的训练在多个并行工作的计算资源上进行。Foundry 支持在 Spark 环境中进行分布式模型训练。分布式训练使您能够:
- 更快、更高效地训练模型: 分布式训练利用多个节点并发执行,加速模型训练过程。
- 扩展训练数据集规模: 通过将训练任务和数据分散到多个节点,您可以训练比单节点方法所能容纳的内存大得多的数据集。
:::callout{theme="neutral"} 目前,Foundry 仅直接支持 PySpark XGBoost 估计器(estimator) 进行分布式模型训练。未来可能会支持更多的分布式训练库。 :::
请按照以下步骤了解如何在 Foundry 中执行分布式模型训练。
1. 配置 Spark 环境¶
在使用分布式训练库之前,您必须使用 Spark 配置文件(Spark profile) 正确配置 Spark 环境。
必需的 Spark 配置文件¶
要执行分布式模型训练,您必须在代码仓库中导入并应用 KUBERNETES_OPEN_PORTS_ALL 配置文件,如下面的示例代码所示:
from transforms.api import configure
@configure(profile=[
"KUBERNETES_OPEN_PORTS_ALL",
])
@transform_df(...)
def compute(...):
...
:::callout{theme="neutral"}
再次强调:应用 KUBERNETES_OPEN_PORTS_ALL 配置文件是分布式训练的强制要求。
:::
2. 设置分布式训练库¶
设置 XGBoost Spark 估计器¶
XGBoost 通过 SparkXGBClassifier ↗ 或 SparkXGBRegressor ↗ 估计器提供了无缝的 PySpark 分布式训练集成。
SparkXGBClassifier 的基本设置示例如下:
from xgboost.spark import SparkXGBClassifier
@configure(profile=["KUBERNETES_OPEN_PORTS_ALL"])
@transform(...)
def compute(...):
xgb = SparkXGBClassifier(
features_col=<您的特征列名称>,
# 根据需要添加其他参数
)
model = xgb.fit(<您的训练数据框>)
有关更多配置详情,请参阅 XGBoost Spark 文档 ↗。
为分布式模型训练设置 GPU 支持¶
要利用 GPU 进行分布式模型训练,请按照以下步骤操作:
- 将您的项目添加到支持 GPU 的 资源队列(resource queue)。
- 通过向转换(transform)添加
EXECUTOR_GPU_ENABLEDSpark 配置文件来启用 GPU 配置文件。 - 根据您的 GPU 设置,将
device参数配置为'gpu'或'cuda'。
示例:
from transforms.api import configure
@configure(profile=[
"KUBERNETES_OPEN_PORTS_ALL",
"EXECUTOR_GPU_ENABLED",
])
@transform_df(...)
def compute(...):
xgb = SparkXGBClassifier(
...,
device='gpu' # 选项:'cpu', 'gpu', 'cuda'
)
model = xgb.fit(...)
有关 GPU 配置的更多信息,请参阅 SparkXGBClassifier 文档 ↗。
3. 分布式超参数优化¶
您可能还需要进行超参数优化。我们建议在 Transforms 中使用 Optuna ↗ 进行超参数优化。
- Optuna 与 Spark 和分布式训练工作流集成良好,无需额外设置。
- 更多详情,请参阅 Optuna 文档 ↗。
4. 将 SparkXGBClassifier 或 SparkXGBRegressor 与模型适配器(model adapter)结合使用¶
来自 palantir_models.serializers 的内置 XGBoostSerializer 不直接支持 XGBoost 的 Spark 估计器(SparkXGBClassifier / SparkXGBRegressor)。
相反,您应该:
- 照常训练您的 Spark XGBoost 模型。
- 通过
.get_booster()提取底层的Booster对象。 - 在模型适配器中使用该
Booster。
:::callout{theme="neutral"}
注意: 如果您需要以分布式方式运行推理,您仍然可以在 PySpark UDF 内注册您的 ModelAdapter。详情请参阅 PySpark UDFs。
:::
模型适配器示例¶
from palantir_models.serializers import XGBoostSerializer
import palantir_models as pm
class BoosterModelAdapter(pm.ModelAdapter):
@pm.auto_serialize(model=XGBoostSerializer())
def __init__(self, booster):
self.booster = booster
@classmethod
def api(cls):
<...>
def predict(self, input_df):
return self.booster.predict(input_df)
训练和适配器初始化¶
from transforms.api import configure, transform
from transforms.api.schema import Input, ModelOutput
from xgboost.spark import SparkXGBClassifier # 或 SparkXGBRegressor
from your_project.adapters import BoosterModelAdapter
@configure(profile=[
"DYNAMIC_ALLOCATION_DISABLED",
"NUM_EXECUTORS_2",
"KUBERNETES_OPEN_PORTS_ALL"
])
@transform(
training_df=Input(path="..."),
model_output=ModelOutput(path="...")
)
def compute(ctx, training_df, model_output):
# 1. 训练 SparkXGBClassifier
xgb = SparkXGBClassifier(...)
spark_model = xgb.fit(training_df)
# 2. 提取底层的 Booster
booster = spark_model.get_booster()
# 3. 创建您的模型适配器
adapter = BoosterModelAdapter(booster)
# 4. 将模型发布到 Foundry
model_output.publish(adapter)
有关配置适配器的更多信息,请参阅 模型适配器概述。