Support for Spark ML Models in palantir_models(palantir_models 对 Spark ML 模型的支持)¶
Apache Spark™ is one of the main engines backing compute in Foundry and offers extensive Machine Learning capabilities ↗. While Foundry supports Spark MLlib, this comes with some caveats due to the specificities of Spark as an inherently distributed machine learning framework.
For example, some Foundry features rely on encapsulating the model in a container image, leveraging underlying infrastructure such as Rubix ↗ to provision and manage multiple containers running the model. Foundry does not yet support building such a container image for Spark models.
Thus, we recommend that users favor frameworks that natively support single-node training and inference, such as scikit-learn, xgboost or keras. Spark ML is particularly discouraged if the model is to be consumed as a REST API.
Spark Model training & batch inference¶
In Foundry, we recommend training Spark models in code repositories, since code workspaces run on a single node and code workbooks are considered legacy. However, authoring a Spark ML model in Foundry does not fundamentally differ from authoring in other frameworks:
- The Model API supports Spark DataFrames as inputs.
- MLlib pipeline models ↗ can be auto-serialized using the
SparkMLAutoSerializerclass introduced in version 0.1599.0 ofpalantir_models.
Here is example code to train a multi-class classification model on the open-source Iris dataset ↗ with Spark ML:
# src/main/model_training/model_training.py
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from transforms.api import transform, Input, Output
from palantir_models.transforms import ModelOutput
from main.model_adapters.adapter import SparkModelAdapter
@transform(
iris_data_in=Input("<PATH_TO_FOLDER>/iris_data"),
model_output=ModelOutput("<PATH_TO_FOLDER>/spark_model"),
inference_data_out=Output("<PATH_TO_FOLDER>/inference_data_out")
)
def compute(iris_data_in, model_output, inference_data_out):
iris_data = iris_data_in.dataframe()
# assuming columns sepallength, sepalwidth, petallength, petalwidth, variety
feature_cols = iris_data.columns[:-1]
train_data, test_data = iris_data.randomSplit([0.7, 0.3], seed=42)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
label_indexer = StringIndexer(inputCol="variety", outputCol="label", handleInvalid="keep")
rf = RandomForestClassifier(
labelCol="label",
featuresCol="features",
numTrees=10,
maxDepth=5,
seed=42
)
pipeline = Pipeline(stages=[assembler, label_indexer, rf])
model = pipeline.fit(train_data)
# Wrap the trained model in a ModelAdapter
foundry_model = SparkModelAdapter(model)
predictions = foundry_model.transform(test_data).df_out
# Publish and write the trained model to Foundry with the experiment
# Once the model and experiment are published, they are immediately visible in the model page
inference_data_out.write_dataframe(
predictions
)
model_output.publish(
model_adapter=foundry_model,
)
And the corresponding adapter code:
# src/main/model_adapters/adapter.py
import palantir_models as pm
from palantir_models.serializers import SparkMLAutoSerializer
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F
class SparkModelAdapter(pm.ModelAdapter):
NUM_CLASSES = 3
@pm.auto_serialize(model=SparkMLAutoSerializer())
def __init__(self, model):
self.model = model
@classmethod
def api(cls):
input_cols = [
("sepallength", float),
("sepalwidth", float),
("petallength", float),
("petalwidth", float),
# The input training dataset also has this column, but it doesn't need to be
# added to the adapter since it will not be present at actual inference time.
# ("variety", str),
]
output_cols = input_cols + [
("label", int)
]
inputs = {
"df_in": pm.Spark(columns=input_cols),
}
outputs = {"df_out": pm.Spark(columns=output_cols)}
return inputs, outputs
def predict(self, df_in):
predictions = self.model.transform(df_in)
# For simplicity, do not return the probability and label vectors, and simply return
# the label as specified in the API.
return predictions.drop("features", "rawPrediction", "probability", "prob_array")
Live inference¶
While technically supported as an experimental feature in Modeling Objectives, usage of Spark models for live inference is generally discouraged. We suggest preferring models built with other, single-node libraries when designing a model for Live Inference.
中文翻译¶
palantir_models 对 Spark ML 模型的支持¶
Apache Spark™ 是 Foundry 中支撑计算的主要引擎之一,并提供了广泛的机器学习能力 ↗。虽然 Foundry 支持 Spark MLlib,但由于 Spark 本质上是一个分布式机器学习框架,其特殊性带来了一些注意事项。
例如,某些 Foundry 功能依赖于将模型封装在容器镜像中,并利用 Rubix ↗ 等底层基础设施来配置和管理运行模型的多个容器。Foundry 目前尚不支持为 Spark 模型构建此类容器镜像。
因此,我们建议用户优先使用原生支持单节点训练和推理的框架,例如 scikit-learn、xgboost 或 keras。如果模型需要作为 REST API 使用,则尤其不推荐使用 Spark ML。
Spark 模型训练与批量推理¶
在 Foundry 中,我们建议在代码仓库中训练 Spark 模型,因为代码工作台在单节点上运行,而代码工作簿已被视为遗留功能。不过,在 Foundry 中编写 Spark ML 模型与其他框架并无本质区别:
- 模型 API 支持将 Spark DataFrame 作为输入。
- MLlib 流水线模型 ↗ 可以使用
palantir_models0.1599.0 版本中引入的SparkMLAutoSerializer类进行自动序列化。
以下是在 Spark ML 中使用开源鸢尾花数据集 ↗训练多分类模型的示例代码:
# src/main/model_training/model_training.py
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from transforms.api import transform, Input, Output
from palantir_models.transforms import ModelOutput
from main.model_adapters.adapter import SparkModelAdapter
@transform(
iris_data_in=Input("<PATH_TO_FOLDER>/iris_data"),
model_output=ModelOutput("<PATH_TO_FOLDER>/spark_model"),
inference_data_out=Output("<PATH_TO_FOLDER>/inference_data_out")
)
def compute(iris_data_in, model_output, inference_data_out):
iris_data = iris_data_in.dataframe()
# 假设列名为 sepallength, sepalwidth, petallength, petalwidth, variety
feature_cols = iris_data.columns[:-1]
train_data, test_data = iris_data.randomSplit([0.7, 0.3], seed=42)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
label_indexer = StringIndexer(inputCol="variety", outputCol="label", handleInvalid="keep")
rf = RandomForestClassifier(
labelCol="label",
featuresCol="features",
numTrees=10,
maxDepth=5,
seed=42
)
pipeline = Pipeline(stages=[assembler, label_indexer, rf])
model = pipeline.fit(train_data)
# 将训练好的模型封装在 ModelAdapter 中
foundry_model = SparkModelAdapter(model)
predictions = foundry_model.transform(test_data).df_out
# 将训练好的模型发布并写入 Foundry,同时记录实验
# 模型和实验发布后,会立即在模型页面中显示
inference_data_out.write_dataframe(
predictions
)
model_output.publish(
model_adapter=foundry_model,
)
以及对应的适配器代码:
# src/main/model_adapters/adapter.py
import palantir_models as pm
from palantir_models.serializers import SparkMLAutoSerializer
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F
class SparkModelAdapter(pm.ModelAdapter):
NUM_CLASSES = 3
@pm.auto_serialize(model=SparkMLAutoSerializer())
def __init__(self, model):
self.model = model
@classmethod
def api(cls):
input_cols = [
("sepallength", float),
("sepalwidth", float),
("petallength", float),
("petalwidth", float),
# 输入训练数据集也包含此列,但无需添加到适配器中,
# 因为实际推理时该列不会出现。
# ("variety", str),
]
output_cols = input_cols + [
("label", int)
]
inputs = {
"df_in": pm.Spark(columns=input_cols),
}
outputs = {"df_out": pm.Spark(columns=output_cols)}
return inputs, outputs
def predict(self, df_in):
predictions = self.model.transform(df_in)
# 为简化起见,不返回概率和标签向量,仅按 API 指定返回标签
return predictions.drop("features", "rawPrediction", "probability", "prob_array")
实时推理¶
虽然在技术上将 Spark 模型作为建模目标中的实验性功能提供支持,但通常不推荐将 Spark 模型用于实时推理。在设计用于实时推理的模型时,我们建议优先使用其他单节点库构建的模型。