API: ModelAdapter reference

A model adapter is a published Python library that provides the communication layer between Foundry and stored model artifacts to enable Foundry to load, initialize, and run inference on any model.

To implement a ModelAdapter, you must understand the classes listed below:

ModelAdapter implementation

The ModelAdapter class is an abstract base class that all model adapter implementations must extend. There are four abstract methods that all model adapters must implement:

  1. load()
  2. save()
  3. api()
  4. predict() for single-output, and run_inference() for multi-output

Note that load() and save() are not required for model adapters defined with default serializers.

from typing import Optional, Union

import palantir_models as pm
import models_api.models_api_executable as executable_api

class ExampleModelAdapter(pm.ModelAdapter):

    @classmethod
    def load(
        cls,
        state_reader: pm.ModelStateReader,
        container_context: Optional[executable_api.ContainerizedApplicationContext] = None,
        external_model_context: Optional[executable_api.ExternalModelExecutionContext] = None
    ) -> "pm.ModelAdapter":
        """
        Python or binary models:
            This is the method that Foundry will call to deserialize your ModelAdapter. The author of this ModelAdapter is expected to write logic to load the state of their trained model(s) from the same location that the model was saved/serialized to in the save method, and also initialize the model.

        Container models:
            This is the method that Foundry will call after your container has been launched as a sidecar to this ModelAdapter. The author of this ModelAdapter is expected to use the contents of the container_context to initialize any class variables the adapter might need. For example, users will often extract the relevant service URIs to send POST requests to the container within run_inference().

        Externally hosted models:
            This is the method that Foundry will call when your model adapter is initialized. The author of this ModelAdapter is expected to write logic to initialize and persist a connection to their externally hosted model as well as other required model configuration.


        :param state_reader: A ModelStateReader object that can be used to read model files.
        :param container_context: This is only provided for container backed models, and defaults to None. The container context
          includes a mapping from container name to service URIs and the shared directory mount path.
        :param external_model_context: This is only provided for externally hosted models, and defaults to None. The external_model_context includes references to the configuration a user defines when creating an externally hosted model that uses this Model Adapter.

        :return: An instance of a ModelAdapter.
        """

    def save(self, state_writer: pm.ModelStateWriter) -> None:
        """
        This is the method that Foundry will call to serialize your model adapter. This method is only required if this ModelAdapter is being used to wrap a newly trained or refit model in Foundry.

        The author of this ModelAdapter is expected to write logic to save the state of their trained model(s) and relevant metadata to a ModelStateWriter.

        :param state_writer: The ModelStateWriter object to which the model is serialized and saved.
        """

    @classmethod
    def api(cls) -> pm.ModelApi:
        """
        This defines the expected input and output data structures of this model.

        :return: The ModelApi object for the model
        """

    def run_inference(self, inputs, outputs) -> None:
        """
        This method will be called with the relevant input and outputs defined in the ModelAdapter.api method.

        Runs inference on the associated model.

        :param inputs: A namedtuple of the inputs defined in the ModelAdapter.api method.
        :param outputs: A namedtuple of the outputs defined in the ModelAdapter.api method.
          Outputs should be written to in the run_inference method.
        """

    def predict(self, *args, **kwargs) -> Union[Tabular, Parameter]:
        """
        This method is used to perform inference on a multi (>=1) input to single output (tabular or parameter) model.
        The inputs are expected to be written into the signature of this method by the name that defines them in the api() method. The resulting output is returned by this method.

        Note that the run_inference() method is not defined by the user if predict() is used.
        """

Model save() and load()

The palantir_models.serializers library provides many default serializers that can be used for model serialization (save) and deserialization (load) for common modeling frameworks.

In some cases, users may want to implement custom logic for model serialization or deserialization. This may be necessary when, for example, there is no default serializer available for a modeling framework you are using or when you require manual control over which models are loaded into memory at any given time.

In these more complex cases, please see the implementation of save and load below.

Save

ModelStateWriter

A ModelStateWriter is provided to the ModelAdapter.save method so a ModelAdapter can save/serialize model artifacts to Foundry storage.

# ModelStateWriter can be imported from palantir_models.models

from io import IOBase
from typing import ContextManager

class ModelStateWriter:
    def open(self, asset_file_path: str, mode: str = "wb") -> ContextManager[IOBase]:
        """
        Open a file-like object to serialize model artifacts and parameters.
        """

    def put_file(self, local_file_path, asset_file_path=None):
        """
        Put a local file in this model's repository.
        :param asset_file_path: If provided, the file will be placed at this path in the repository.
            Otherwise the file will be placed in the root directory of the repository.
        """

    def put_directory(self, local_root_path, asset_root_path = "/"):
        """
        Put a local directory and its contents into this model's repository.
        :param asset_root_path: The path relative to the root path of the repository to place this directory.
        """

The save() method is called whenever a model adapter is published in a transform via model.publish(), upon which the ModelStateWriter's contents are packaged into a zip file that is persisted in an artifacts repository referenced by the newly-created model version.

Example: ModelStateWriter

The following example saves a model as a model.pkl file.

import pickle

from palantir_models.models import ModelAdapter, ModelStateWriter

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, model):
        self.model = model

    ...

    def save(self, state_writer: ModelStateWriter):
        with state_writer.open("model.pkl", "wb") as model_outfile:
            pickle.dump(self.model, model_outfile)

    ...

Load

ModelStateReader

A ModelStateReader is provided to the ModelAdapter.load method so a ModelAdapter can read/deserialize saved model artifacts and initialize a model.

# ModelStateReader can be imported from palantir_models.models

from io import IOBase
from tempfile import TemporaryDirectory
from typing import AnyStr, ContextManager

class ModelStateReader:
    def open(self, asset_file_path: str, mode: str = "rb") -> ContextManager[IOBase]:
        """
        Open a file-like object to deserialize model artifacts and parameters.
        """

    def extract_to_temp_dir(self, root_dir: str = None) -> AnyStr:
        """
        Returns a TempDirectory containing the model artifacts associated with this model.
        :param root_dir: If specified, the root directory to extract
        """

    def extract(self, destination_path: str = None) -> None:
        """
        Extracts the repository to the provided local directory path.
        :param destination_path: If specified, the directory where the repository will be extracted
        """

The load() method is called whenever a model adapter is instantiated (via ModelInput in a transform, or launching a live or batch deployment that is backed by a model). This method then accesses the same artifacts repository that ModelStateWriter writes to, and provides access to its contents via ModelStateReader. load() is called before any transforms or inference logic is executed.

Example: ModelStateReader

The following example loads a model file and returns an instance of the model adapter initialized with it.

import pickle

from palantir_models.models import ModelAdapter, ModelStateReader

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, model):
        self.model = model

    @classmethod
    def load(cls, state_reader: ModelStateReader):
        with state_reader.open("model.pkl", "rb") as model_infile:
            model = pickle.load(model_infile)
        return cls(model)

    ...

When used in conjunction with the save() method shown above, this method would retrieve the same model.pkl object that was persisted in the save() method. A ModelStateReader object is also provided to containerized models, and is backed by the user's uploaded zip file.
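The save/load round trip described above can be tried out locally with a minimal sketch. The `InMemoryStateStore` class below is a hypothetical stand-in for both ModelStateWriter and ModelStateReader (it only mimics the `open()` method and keeps files in a dictionary), and the adapter here does not extend the real `ModelAdapter` base class, so treat it as an illustration of the pattern rather than of Foundry's storage behavior.

```python
import io
import pickle
from contextlib import contextmanager


class InMemoryStateStore:
    """Hypothetical stand-in for ModelStateWriter/ModelStateReader, backed by a dict."""

    def __init__(self):
        self.files = {}

    @contextmanager
    def open(self, asset_file_path, mode="rb"):
        if "w" in mode:
            # Collect the written bytes and store them once the block exits.
            buffer = io.BytesIO()
            yield buffer
            self.files[asset_file_path] = buffer.getvalue()
        else:
            # Serve back the bytes that were stored under the same path.
            yield io.BytesIO(self.files[asset_file_path])


class ExampleModelAdapter:
    """Plain class for illustration; a real adapter extends ModelAdapter."""

    def __init__(self, model):
        self.model = model

    def save(self, state_writer):
        with state_writer.open("model.pkl", "wb") as model_outfile:
            pickle.dump(self.model, model_outfile)

    @classmethod
    def load(cls, state_reader):
        with state_reader.open("model.pkl", "rb") as model_infile:
            model = pickle.load(model_infile)
        return cls(model)


# Round trip: what save() writes under "model.pkl" is what load() reads back.
store = InMemoryStateStore()
ExampleModelAdapter({"weights": [0.1, 0.2]}).save(store)
restored = ExampleModelAdapter.load(store)
```

The key point is that save() and load() must agree on the asset path ("model.pkl" here) and on the serialization format.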

ContainerizedApplicationContext

The ContainerizedApplicationContext is optional and will be provided to the ModelAdapter.load method if, and only if, the model is backed by a container image or images. The context object includes a shared directory mount path and a mapping from each container name to its service URIs. Each container can have multiple service URIs, as it is valid to have multiple open ports.

# Note that this class type does not need to be imported within an authored adapter
class ContainerizedApplicationContext:
    def services(self) -> Dict[str, List[str]]:
        """
        Mapping from individual container name to list of service URIs the container provides.
        """

    def shared_empty_dir_mount_path(self) -> str:
        """
        The mount path of a shared empty directory that is available inside all containers and to the model adapter.
        The directory is readable and writable by containers and by the model entrypoint.
        """

An example populated services variable might look like the following:

{
    "container1": ["localhost:8080"],
    "container2": ["localhost:8080", "localhost:8081"],
}

Example: ContainerizedApplicationContext

The following example initializes a model adapter with a specific volume path, host, and port from the provided ContainerizedApplicationContext.

from palantir_models.models import ModelAdapter, ModelStateReader
import models_api.models_api_executable as executable_api

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, shared_volume_path, model_host_and_port):
        self.shared_volume_path = shared_volume_path
        self.model_host_and_port = model_host_and_port

    @classmethod
    def load(cls, state_reader, container_context: executable_api.ContainerizedApplicationContext):
        shared_volume_path = container_context.shared_empty_dir_mount_path
        model_host_and_port = container_context.services["container1"][0]
        return cls(shared_volume_path, model_host_and_port)

    ...
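Because each container can expose several service URIs, it can help to centralize how an adapter picks one and turns it into a request URL. The following is a minimal sketch that works on a plain dictionary shaped like the services example above. The helper name `first_service_url`, the `http://` scheme, and the `/predict` path are assumptions made for illustration and depend entirely on what your container actually serves.

```python
from typing import Dict, List


def first_service_url(
    services: Dict[str, List[str]],
    container_name: str,
    path: str = "/predict",  # hypothetical endpoint path
) -> str:
    """Build a URL from the first service URI registered for a container."""
    uris = services.get(container_name)
    if not uris:
        raise KeyError(f"No service URIs found for container {container_name!r}")
    # Assumes the container speaks plain HTTP on the first listed host:port.
    return f"http://{uris[0]}{path}"


services = {
    "container1": ["localhost:8080"],
    "container2": ["localhost:8080", "localhost:8081"],
}
endpoint = first_service_url(services, "container2")
```

Failing early in load() when the expected container name is missing tends to produce clearer errors than failing later inside the inference logic.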

ExternalModelContext

The ExternalModelContext is optional and will be provided to the ModelAdapter.load method if, and only if, the model is an externally hosted model. The context object contains an object representing the externally hosted model along with the user-defined map of decrypted secrets needed to connect to this externally hosted model.

# Note that this class type does not need to be imported within an authored adapter
class ExternalModelContext:
    def external_model(self) -> models_api_external_ExternalModel:
        """
        Object representing the externally hosted model, which notably contains the base_url and connection_configuration.
        """

    def resolved_credentials(self) -> Dict[str, str]:
        """
        Mapping of user-defined decrypted secret values needed to connect with this externally hosted model.
        """

# Note that this class type does not need to be imported within an authored adapter
class models_api_external_ExternalModel:
    def base_url(self) -> str:
        """
        User-defined url representing where this externally hosted model is hosted.
        """

    def connection_configuration(self) -> Dict[str, str]:
        """
        User-defined dictionary of unencrypted configuration fields.
        This is intended to store specific configuration details such as the model name, inference parameters, or prediction thresholds.
        """

Example: ExternalModelContext

The following example initializes a model adapter that executes requests to an externally hosted model. For more information on working with externally hosted models, read the documentation.

from typing import Optional

from palantir_models.models import ModelAdapter, ModelStateReader
import models_api.models_api_executable as executable_api

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, url, credentials_map, configuration_map):
        # Extract model configuration from "Connection configuration" map
        model_name = configuration_map['model_name']
        model_parameter = configuration_map['model_parameter']

        # Extract model credentials from "Credentials configuration" map
        secret_key = credentials_map['secret_key']

        # Initiate HTTP client at model load time (ExampleClient is a placeholder for your own client class)
        self.client = ExampleClient(url, model_name, model_parameter, secret_key)

    @classmethod
    def load(
            cls,
            state_reader: ModelStateReader,
            container_context: Optional[executable_api.ContainerizedApplicationContext] = None,
            external_model_context: Optional[executable_api.ExternalModelExecutionContext] = None,
            ) -> "ModelAdapter":
        return cls(
            url=external_model_context.external_model.base_url,
            credentials_map=external_model_context.resolved_credentials,
            configuration_map=external_model_context.external_model.connection_configuration,
        )

    ...
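Since external_model_context defaults to None, an adapter written for externally hosted models may want to fail with a clear message when the context is absent. The sketch below mirrors the example above using stand-ins: `SimpleNamespace` objects imitate the context's attribute access, `ExampleClient` is a hypothetical client that only stores its arguments, the URL and secret values are made up, and the adapter is a plain class rather than a real `ModelAdapter` subclass.

```python
from types import SimpleNamespace


class ExampleClient:
    """Hypothetical client; a real one would set up an HTTP session here."""

    def __init__(self, url, model_name, model_parameter, secret_key):
        self.url = url
        self.model_name = model_name
        self.model_parameter = model_parameter
        self.secret_key = secret_key


class ExampleModelAdapter:
    def __init__(self, url, credentials_map, configuration_map):
        self.client = ExampleClient(
            url,
            configuration_map["model_name"],
            configuration_map["model_parameter"],
            credentials_map["secret_key"],
        )

    @classmethod
    def load(cls, state_reader, container_context=None, external_model_context=None):
        # Guard against being loaded without an external model context.
        if external_model_context is None:
            raise ValueError("This adapter requires an external_model_context")
        return cls(
            url=external_model_context.external_model.base_url,
            credentials_map=external_model_context.resolved_credentials,
            configuration_map=external_model_context.external_model.connection_configuration,
        )


# Stand-in context object shaped like the attributes used in load().
fake_context = SimpleNamespace(
    external_model=SimpleNamespace(
        base_url="https://models.example.com",
        connection_configuration={"model_name": "demo", "model_parameter": "0.5"},
    ),
    resolved_credentials={"secret_key": "not-a-real-secret"},
)
adapter = ExampleModelAdapter.load(state_reader=None, external_model_context=fake_context)
```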

Model API

The model adapter's api() method specifies the expected inputs and outputs in order to execute this model adapter's inference logic. Inputs and outputs are specified separately.

Example api()

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "df_in": pm.Pandas([('input_feature', float)])
        }
        outputs = {
            "df_out": pm.Pandas([('output_feature', int)])
        }
        return inputs, outputs

    ...

Model inputs

The ModelInput type contains input types that can be defined in the ModelAdapter.api method. Model adapters support the following input types:

  1. Tabular
  2. Parameter
  3. FileSystem
  4. MediaReference [Beta]
  5. Object
  6. ObjectSet

:::callout{theme="neutral" title="Beta"} MediaReference support is in the beta phase of development and may not be available on your enrollment. It currently only supports inference in a Python transform. MediaReferences with models do not support automatic evaluation, batch deployment, or live deployments in the Modeling Objectives application. Functionality may change during active development. :::

# DFType and ModelInput can be imported from palantir_models.models.api

class ModelInput:
    Tabular = TabularInput
    FileSystem = FileSystemInput
    Parameter = ParameterInput
    MediaReference = MediaReferenceInput

class TabularInput:
    def __init__(self, name: str, df_type: DFType = DFType.SPARK, columns: List[ModelApiColumn]):
        """
        Used to specify that a ModelAdapter expects a tabular input.
        This input type will then convert the tabular input to the type specified in `df_type` if applicable.
        Pandas dataframes, Spark dataframes, and TransformInputs are accepted as tabular input types.
        """

class ParameterInput:
    def __init__(self, name: str, type: type, default = None):
        """
        Used to specify that a ModelAdapter expects a constant value parameter of type 'type'.
        The available types for parameter inputs are: str, int, float, bool, list, dict.
        If not passed directly in the args to .transform(), the provided default value will be used.
        """

class FileSystemInput:
    def __init__(self, name: str):
        """
        Used to specify that a ModelAdapter expects a filesystem input.
        """

class MediaReferenceInput:
    def __init__(self, name: str):
        """
        Used to specify that a ModelAdapter expects a Media Reference as an input.
        """

class ObjectInput:
    def __init__(self, object_type: OntologyObject):
        """
        Used to specify that a ModelAdapter expects a Python OSDK Object as an input.
        """

class ObjectSetInput:
    def __init__(self, object_type: OntologyObject):
        """
        Used to specify that a ModelAdapter expects a Python OSDK ObjectSet as an input.
        """

class DFType(Enum):
    SPARK = "spark"
    PANDAS = "pandas"

class ModelApiColumn(NamedTuple):
    """
    Used to specify the name and type of columns of a tabular input.
    """
    name: str
    type: type
    required: bool = True

Tabular inputs

A TabularInput is used to specify that an input provided to model.transform() is expected to be of a tabular type. In the context of this model adapter's inference logic, the type of this input will be the df_type parameter as specified in the api() method. Appropriate type conversions, if necessary, will be performed. The following tabular types are permissible:

  1. TransformInput
  2. Pandas DataFrame
  3. Spark DataFrame

Note: Spark DataFrames are only supported for tabular inputs in the adapter API which specify df_type=DFType.SPARK. Conversion for tabular inputs specifying DFType.PANDAS is not supported for Spark DataFrames.

Column types

TabularInputs also specify a list of ModelApiColumns which describe the expected column schema of the tabular input. The type parameter can be one of the following:

  1. str
  2. int
  3. float
  4. bool
  5. list
  6. dict
  7. datetime.date
  8. datetime.time
  9. datetime.datetime
  10. typing.Any
  11. MediaReference [Beta]

The List and Dict type aliases from the typing library are also accepted. Some examples are as follows: List[str]; Dict[str, float]. Review the specific requirements on using these types when publishing the model as a function.

Parameter inputs

A ParameterInput is used to specify that an input provided to model.transform() is expected to be of a parameter type. Parameters are constant-valued inputs of a type specified with the type parameter. The following types are accepted for parameter inputs:

  1. str
  2. int
  3. float
  4. bool
  5. list (*)
  6. dict (*)
  7. typing.Any (*)
  8. palantir_models.NDArray for NumPy n-dimensional arrays

Parameter inputs can also specify a default value. The default is used when model.transform() is called without a value for the corresponding parameter input defined in the model adapter's api() method.

:::callout{theme="neutral"} Review the specific requirements on using these types when publishing the model as a function. :::

FileSystem inputs

A FileSystemInput is used to specify that an input provided to model.transform() is expected to be of a filesystem type. Only TransformInputs are supported as input types for FileSystemInput.

MediaReference inputs

A MediaReferenceInput is used to specify that an input provided to model.transform() is expected to be a media reference. Media references are expected to be of str type and contain the full media reference object definition. The model adapter will convert this media reference string to a MediaReference object which contains methods to interact with the media item being referenced.

Object and ObjectSet inputs

For Object or ObjectSet inputs, the object type is specified when defining the input in the model adapter API. This object type will be imported from an Ontology SDK generated for a chosen Ontology.

When performing model inference, the object or object set can be passed to the model in the following ways:

For Object inputs

  1. An instance of the OntologyObject type specified in the model adapter API's Object input.
  2. A primary key of the specified object type.

For ObjectSet inputs

  1. An instance of an ObjectSet for the OntologyObject type specified in the model adapter API's ObjectSet input.
  2. An object set rid for the specified object type.

API enforcement

As a general principle, the model adapter API is not strictly enforced at inference time, although enforcement is generally stricter for live inference and particularly for Functions, which are strongly typed.

Parameter types are always enforced, and any parameter input to model.transform() that does not correspond to the designated type will throw an error.

The model adapter allows additional columns that are not specified in the API to be passed within a tabular input at inference time. Foundry will not drop additional columns from the input before passing it to the predict method. This is important for ensuring that model adapters work with evaluation, as described in the Model Adapter API guide. In the batch inference case, the adapter will not throw an error if some columns specified in the API are missing. The MediaReference type (currently in Beta) is an exception: it expects each element in the column to be a media reference string, and each element is converted to a MediaReference object before it is passed to this model adapter's inference logic, for both batch and live inference.

In the live inference case, however, model inputs will be cast to their declared type (if defined) to ensure type safety. This is true of any columns in tabular inputs. A side-effect of this behavior is that an error will be thrown if a column with a specified type is not found in the input data, or if the data cannot be cast to the specified type.
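The live-inference behavior described under API enforcement (declared columns are cast, missing declared columns raise an error, and undeclared columns pass through) can be illustrated with a small pure-Python sketch. This is not Foundry's implementation: the function name `cast_row_for_live_inference` and the row-as-dictionary representation are assumptions chosen only to make the rules concrete.

```python
from typing import Any, Dict


def cast_row_for_live_inference(
    row: Dict[str, Any], declared_columns: Dict[str, type]
) -> Dict[str, Any]:
    """Cast declared columns to their declared type; keep undeclared columns as-is."""
    cast_row = dict(row)  # extra columns are passed through untouched
    for name, declared_type in declared_columns.items():
        if name not in row:
            # Mirrors the documented error for a missing typed column.
            raise KeyError(f"Missing declared column {name!r}")
        # Raises ValueError/TypeError if the value cannot be cast.
        cast_row[name] = declared_type(row[name])
    return cast_row


declared = {"input_feature": float}
result = cast_row_for_live_inference({"input_feature": "1.5", "extra": "kept"}, declared)
```

By contrast, the batch case described above would skip the missing-column check entirely, so an adapter that must work in both modes should validate its own required columns inside the inference logic.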

Model outputs

A ModelOutput contains the output types defined in the ModelAdapter.api method.

# ModelOutput can be imported from palantir_models.models.api

class ModelOutput:
    FileSystem = FileSystemOutput
    Parameter = ParameterOutput

class TabularOutput:
    def __init__(self, name: str):
        """
        Used to specify that a ModelAdapter will produce a tabular output.
        Only Pandas or Spark dataframes are supported as tabular output types.
        """

class ParameterOutput:
    def __init__(self, name: str, type: type, default = None):
        """
        Used to specify that a ModelAdapter will produce a constant value parameter of type 'type'.
        The available types for parameter outputs are: str, int, float, bool, list, dict.
        If not written to via `run_inference()`, the provided default value will be used.
        """

Both of the available model outputs act similarly to their input counterparts. One primary difference is that TabularOutput does not have a df_type parameter. It is able to accept both Pandas and Spark dataframes.

Model inference

The predict() method

For models with a single output of tabular or parameter type, the predict() method can be used instead of the run_inference() method to define the inference logic of the model adapter. The arguments of this method will be the names of the input objects defined in the model adapter's api() method. The arguments do not need to appear in the same order as they are defined in api(); however, the names must match.

predict() method example

The following example defines a predict() method for a multi-tabular-input, single-tabular-output model adapter.

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {"input_1": pm.Pandas(), "input_2": pm.Pandas()}
        outputs = {"output_dataframe": pm.Pandas()}
        return inputs, outputs

    def predict(self, input_1, input_2):
        resulting_dataframe = ... # Some inference logic using input_1 and input_2
        return resulting_dataframe

In the above example, the two inputs (input_1 and input_2) are referenced by name in the signature of the predict() method. The dataframe that the function returns, resulting_dataframe, will be written to the single output named output_dataframe.
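The name-based matching can be illustrated without Foundry at all. In the sketch below, keyword-argument unpacking stands in for however the inputs are actually dispatched (that mechanism is an assumption here), plain lists stand in for dataframes, and `call_predict_by_name` is a hypothetical helper. Note that predict() declares its parameters in the opposite order from the inputs mapping and still receives the right values.

```python
class ExampleModelAdapter:
    """Plain class for illustration; a real adapter extends pm.ModelAdapter."""

    # Parameters are deliberately declared in a different order than the inputs.
    def predict(self, input_2, input_1):
        return [a - b for a, b in zip(input_1, input_2)]


def call_predict_by_name(adapter, named_inputs):
    # Bind each input to the predict() parameter with the same name.
    return adapter.predict(**named_inputs)


result = call_predict_by_name(
    ExampleModelAdapter(),
    {"input_1": [1, 2], "input_2": [10, 20]},
)
```

If a name in the inputs mapping has no matching parameter, Python raises a TypeError in this sketch, which is a useful reminder to keep api() and predict() in sync.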

The run_inference() method

In the case of multi-output models, or models that write to filesystems, custom inference logic must be defined via the run_inference() method. This method takes two arguments: inputs and outputs. Both of these arguments are NamedTuples whose names correspond to the name parameters of the inputs and outputs defined in the api() method.

Inputs

Referencing an input by name will access the object that was passed in to model.transform() corresponding to the api() input of the same name.

Example: Input

Given the following ModelAdapter definition:

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "df_in": pm.Pandas([("input_feature", float)])
        }
        outputs = {
            "df_out_one": pm.Pandas([("feature_one", int)]),
            "df_out_two": pm.Pandas([("output_feature_1", int),
                                     ("output_feature_2", float)])
        }
        return inputs, outputs

    def run_inference(self, inputs, outputs):
        my_input_df = inputs.df_in

        my_output_one = outputs.df_out_one
        my_output_two = outputs.df_out_two

    ...

And the following call to .transform():

@transform(
    my_input_data=Input(...),
    my_output_data_one=Output(...),
    my_output_data_two=Output(...),
    my_model=ModelInput(...)
)
def compute(my_input_data, my_output_data_one, my_output_data_two, my_model):
    my_model_outputs = my_model.transform(my_input_data)

The my_input_df object in the model adapter's run_inference() method, being a reference to the input named "df_in", which is a tabular input of Pandas type, will be equal to a pandas representation of the my_input_data TransformInput that is passed in from the transform.

Outputs

Referencing an output by name will provide a writable object corresponding to the api() output of the same name. Each of these objects has a .write() method to specify which data will be written to each output.

Example: Output

Given the following ModelAdapter definition:

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "input_dataframe": pm.Pandas([('input_feature', float)])
        }
        outputs = {
            "output_dataframe_one": pm.Pandas([('output_feature', int)]),
            "output_dataframe_two": pm.Pandas([('output_feature', int)]),
        }
        return inputs, outputs

    def run_inference(self, inputs, outputs):
        my_input_df = inputs.input_dataframe
        my_output_dataframe_one = do_something_to_input_and_return_a_new_dataframe(my_input_df)
        my_output_dataframe_two = do_something_else_to_input_and_return_a_new_dataframe(my_input_df)
        outputs.output_dataframe_one.write(my_output_dataframe_one)
        outputs.output_dataframe_two.write(my_output_dataframe_two)

    ...

And the following call to .transform():

@transform(
    my_input_data=Input(...),
    my_output_data_one=Output(...),
    my_output_data_two=Output(...),
    my_model=ModelInput(...)
)
def compute(my_input_data, my_output_data_one, my_output_data_two, my_model):
    my_model_outputs = my_model.transform(my_input_data)
    my_output_dataframe_one = my_model_outputs.output_dataframe_one
    my_output_dataframe_two = my_model_outputs.output_dataframe_two
    my_output_data_one.write_pandas(my_output_dataframe_one)
    my_output_data_two.write_pandas(my_output_dataframe_two)

The my_output_dataframe_one and my_output_dataframe_two objects in the transform will be equal to the object that was written to the output_dataframe_one and output_dataframe_two outputs in the model adapter's run_inference() method (in this case, my_output_dataframe_one and my_output_dataframe_two).
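To exercise run_inference() logic outside Foundry, for example in a unit test, the inputs and outputs NamedTuples can be approximated with stand-ins. In the sketch below, `WritableOutput` is a hypothetical class that only records what was passed to `.write()`, lists of dictionaries stand in for dataframes, and the doubling logic is invented purely for illustration.

```python
from collections import namedtuple


class WritableOutput:
    """Hypothetical stand-in for a writable output; records the written data."""

    def __init__(self):
        self.value = None

    def write(self, data):
        self.value = data


# Field names match the input and output names used in the api() example.
Inputs = namedtuple("Inputs", ["input_dataframe"])
Outputs = namedtuple("Outputs", ["output_dataframe_one", "output_dataframe_two"])


class ExampleModelAdapter:
    """Plain class for illustration; a real adapter extends pm.ModelAdapter."""

    def run_inference(self, inputs, outputs):
        rows = inputs.input_dataframe
        # Each output is referenced by name and written to exactly once.
        outputs.output_dataframe_one.write(
            [{"output_feature": int(row["input_feature"])} for row in rows]
        )
        outputs.output_dataframe_two.write(
            [{"output_feature": int(row["input_feature"]) * 2} for row in rows]
        )


inputs = Inputs(input_dataframe=[{"input_feature": 1.0}, {"input_feature": 2.5}])
outputs = Outputs(WritableOutput(), WritableOutput())
ExampleModelAdapter().run_inference(inputs, outputs)
```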

Media references

Information on media references in Foundry can be found here.

In the case that parameter inputs or tabular input columns of type MediaReference are specified in the model adapter's api(), media reference strings that are provided via model.transform() will be converted to MediaReference objects. This object type provides methods to interact with the media reference.

class MediaReference:
    @property
    def media_reference(self):
        """
        The raw media reference string.
        """

    @property
    def media_item_rid(self):
        """
        The media item rid extracted from the media reference.
        """

    def get_media_item(self):
        """
        Returns the media item as a file-like object.
        """

    def get_media_item_via_access_pattern(self, access_pattern_name, access_pattern_path):
        """
        Returns the access pattern of the media item as a file-like object.
        Depending on the media set's persistence policy, this may cache the access pattern once calculated.
        """

    def transform_media_item(self, output_path, transformation):
        """
        Applies the transform to the media item and returns it as a file-like object.
        The output_path will be provided to the transformation.
        The transformation computation will be done by Mio, not by this Spark module.
        """

    def get_media_item_metadata(self):
        """
        Returns the media item metadata (width, height, etc.)
        """

中文翻译

API:ModelAdapter 参考

模型适配器(ModelAdapter) 是一个已发布的 Python 库,提供 Foundry 与存储的模型工件之间的通信层,使 Foundry 能够加载、初始化并对任何模型执行推理。

要实现 ModelAdapter,您必须了解以下列出的类:

ModelAdapter 实现

ModelAdapter 类是一个抽象基类,所有模型适配器实现都必须继承它。所有模型适配器必须实现四个抽象方法:

  1. load()
  2. save()
  3. 注意:对于使用默认序列化器(Default Serializers)定义的模型适配器,load()save() 不是必需的。
  4. api()
  5. 单输出使用 predict(),多输出使用 run_inference()
import palantir_models as pm
import models_api.models_api_executable as executable_api

class ExampleModelAdapter(pm.ModelAdapter):

    @classmethod
    def load(
        cls,
        state_reader: pm.ModelStateReader,
        container_context: Optional[executable_api.ContainerizedApplicationContext] = None,
        external_model_context: Optional[executable_api.ExternalModelExecutionContext] = None
    ) -> "pm.ModelAdapter":
        """
        Python 或二进制模型:
            这是 Foundry 将调用的方法来反序列化您的 ModelAdapter。ModelAdapter 的作者需要编写逻辑,从 save 方法中保存/序列化模型的相同位置加载已训练模型的状态,并初始化模型。

        容器模型:
            这是 Foundry 在您的容器作为 sidecar 启动到此 ModelAdapter 后将调用的方法。ModelAdapter 的作者需要使用 container_context 的内容来初始化适配器可能需要的任何类变量。例如,用户通常会提取相关的服务 URI,以便在 #run_inference 中向容器发送 POST 请求。

        外部托管模型:
            这是 Foundry 在初始化模型适配器时将调用的方法。ModelAdapter 的作者需要编写逻辑来初始化并保持与外部托管模型的连接以及其他所需的模型配置。


        :param state_reader: 可用于读取模型文件的 ModelStateReader 对象。
        :param container_context: 仅提供给容器支持的模型,默认为 None。容器上下文
          包含从容器名称到服务 URI 的映射以及共享目录挂载路径。
        :param external_model_context: 仅提供给外部托管模型,默认为 None。external_model_context 包含用户在使用此模型适配器创建外部托管模型时定义的配置的引用。

        :return: ModelAdapter 的实例。
        """

    def save(self, state_writer: pm.ModelStateWriter) -> None:
        """
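        Foundry calls this method to serialize your model adapter. This method is required only if the ModelAdapter wraps a model that is newly trained or refit in Foundry.

        The adapter author is expected to write logic that saves the state of the trained model and any relevant metadata to the ModelStateWriter.

        :param state_writer: The ModelStateWriter object that the model is serialized and saved to.
        """

    @classmethod
    def api(cls) -> pm.ModelApi:
        """
        Defines the expected input and output data structures for this model.

        :return: The ModelApi object for the model.
        """

    def run_inference(self, inputs, outputs) -> None:
        """
        This method is called with the inputs and outputs defined in the ModelAdapter.api method.

        Runs inference on the associated model.

        :param inputs: A namedtuple of the inputs defined in the ModelAdapter.api method.
        :param outputs: A namedtuple of the outputs defined in the ModelAdapter.api method.
          Outputs should be written to within the run_inference method.
        """

    def predict(self, *args, **kwargs) -> Union[Tabular, Parameter]:
        """
        This method runs inference for models with one or more inputs and a single output (tabular or parameter).
        Inputs should be declared in this method's signature using the names they are given in the api() method. The resulting output is returned by this method.

        Note: If predict() is used, the user does not need to define a run_inference() method.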
        """

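Model save() and load()

The palantir_models.serializers library provides a number of default serializers that handle model serialization (saving) and deserialization (loading) for common modeling frameworks.

In some cases, users may want to implement custom logic for model serialization or deserialization. This may be necessary, for example, when no default serializer is available for the modeling framework you are using, or when you need manual control over which models are loaded into memory at any given time.

In these more complex cases, refer to the save and load implementations below.

Save

ModelStateWriter

A ModelStateWriter is provided to the ModelAdapter.save method so that the ModelAdapter can save/serialize model artifacts to Foundry storage.

# ModelStateWriter can be imported from palantir_models.models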

from io import IOBase
from typing import ContextManager

class ModelStateWriter:
    def open(self, asset_file_path: str, mode: str = "wb") -> ContextManager[IOBase]:
        """
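        Opens a file-like object for serializing model artifacts and parameters.
        """

    def put_file(self, local_file_path, asset_file_path=None):
        """
        Puts a local file into this model's repository.
        :param asset_file_path: If provided, the file is placed at this path in the repository.
            Otherwise, the file is placed at the root of the repository.
        """

    def put_directory(self, local_root_path, asset_root_path = "/"):
        """
        Puts a local directory and its contents into this model's repository.
        :param asset_root_path: The path, relative to the repository root, at which to place this directory.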
        """

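The save() method is called whenever a model adapter is published in a transform through model.publish(). At that point, the contents of the ModelStateWriter are packaged into a zip file, which is persisted in the artifacts repository referenced by the newly created model version.

Example: ModelStateWriter

The following example saves a model as a model.pkl file.

import pickle

from palantir_models.models import ModelAdapter, ModelStateWriter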

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, model):
        self.model = model

    ...

    def save(self, state_writer: ModelStateWriter):
        with state_writer.open("model.pkl", "wb") as model_outfile:
            pickle.dump(self.model, model_outfile)

    ...

Load

ModelStateReader

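A ModelStateReader is provided to the ModelAdapter.load method so that the ModelAdapter can read/deserialize saved model artifacts and initialize the model.

# ModelStateReader can be imported from palantir_models.models
from io import IOBase
from tempfile import TemporaryDirectory
from typing import AnyStr, ContextManager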

class ModelStateReader:
    def open(self, asset_file_path: str, mode: str = "rb") -> ContextManager[IOBase]:
        """
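        Opens a file-like object for deserializing model artifacts and parameters.
        """

    def extract_to_temp_dir(self, root_dir: str = None) -> AnyStr:
        """
        Returns a temporary directory containing the model artifacts associated with this model.
        :param root_dir: If specified, the root directory to extract
        """

    def extract(self, destination_path: str = None) -> None:
        """
        Extracts the repository to the provided local directory path.
        :param destination_path: If specified, the directory the repository is extracted to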
        """

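The load() method is called whenever a model adapter is instantiated, either through a ModelInput in a transform or when a model-backed live or batch deployment starts. The method accesses the same artifacts repository that the ModelStateWriter wrote to and exposes its contents through the ModelStateReader. load() is called before any transform or inference logic runs.

Example: ModelStateReader

The following example loads a model file and returns an instance of the model adapter initialized with that model.

import pickle

from palantir_models.models import ModelAdapter, ModelStateReader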

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, model):
        self.model = model

    @classmethod
    def load(cls, state_reader: ModelStateReader):
        with state_reader.open("model.pkl", "rb") as model_infile:
            model = pickle.load(model_infile)
        return cls(model)

    ...

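When used together with the save() method shown above, this method retrieves the same model.pkl object that save() persisted. The ModelStateReader object is also provided to containerized models, where it is backed by a zip file uploaded by the user.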
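
The save()/load() round trip can be exercised outside Foundry with a small stand-in. The sketch below is only an illustration: InMemoryState is a hypothetical class that imitates the open() method of ModelStateWriter and ModelStateReader with an in-memory dictionary, and ExampleAdapter omits the real ModelAdapter base class so that the snippet runs without palantir_models installed.

```python
import io
import pickle
from contextlib import contextmanager


class InMemoryState:
    """Hypothetical stand-in for ModelStateWriter/ModelStateReader (not a Foundry class)."""

    def __init__(self):
        self._files = {}  # asset path -> bytes

    @contextmanager
    def open(self, asset_file_path, mode="rb"):
        if "w" in mode:
            buffer = io.BytesIO()
            yield buffer
            # Persist whatever was written once the with-block exits cleanly.
            self._files[asset_file_path] = buffer.getvalue()
        else:
            yield io.BytesIO(self._files[asset_file_path])


class ExampleAdapter:
    """Mirrors the save()/load() pair from the examples above, minus the base class."""

    def __init__(self, model):
        self.model = model

    def save(self, state_writer):
        with state_writer.open("model.pkl", "wb") as model_outfile:
            pickle.dump(self.model, model_outfile)

    @classmethod
    def load(cls, state_reader):
        with state_reader.open("model.pkl", "rb") as model_infile:
            return cls(pickle.load(model_infile))


state = InMemoryState()
ExampleAdapter({"weights": [0.1, 0.2]}).save(state)
restored = ExampleAdapter.load(state)
print(restored.model)
```

Because both methods agree on the asset path ("model.pkl") and the serialization format (pickle), the adapter returned by load() carries the same model object that save() was given.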

ContainerizedApplicationContext

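ContainerizedApplicationContext is optional and is provided to the ModelAdapter.load method only if the model is backed by a container image. The context object includes a shared directory mount path and a mapping from container name to its service URIs. Each container can have multiple service URIs, since having multiple open ports is valid.

# Note: this class type does not need to be imported in authored adapters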
class ContainerizedApplicationContext:
    def services(self) -> Dict[str, List[str]]:
        """
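        A mapping from an individual container name to the list of service URIs that container provides.
        """

    def shared_empty_dir_mount_path(self) -> str:
        """
        The mount path of a shared empty directory that is available inside all containers and in the model adapter.
        The directory is readable and writable by both the containers and the model entry point.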
        """

A populated services variable might look like the following:

{
    "container1": ["localhost:8080"],
    "container2": ["localhost:8080", "localhost:8081"],
}
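
Given a mapping shaped like the one above, an adapter typically needs to turn a service URI into a full request URL. The following is a minimal sketch of that step; the service_url helper, the "http://" scheme, and the "/predict" path are all assumptions made for illustration and depend on what your container actually serves.

```python
from typing import Dict, List


def service_url(services: Dict[str, List[str]], container: str,
                index: int = 0, path: str = "/predict") -> str:
    """Build an HTTP URL for one of a container's service URIs.

    The scheme and the default path are illustrative assumptions.
    """
    uris = services.get(container)
    if not uris:
        raise KeyError(f"no service URIs for container {container!r}")
    host_and_port = uris[index]
    return f"http://{host_and_port}{path}"


services = {
    "container1": ["localhost:8080"],
    "container2": ["localhost:8080", "localhost:8081"],
}
print(service_url(services, "container2", index=1))
```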

Example: ContainerizedApplicationContext

The following example initializes a model adapter with a specific volume path, host, and port from the provided ContainerizedApplicationContext.

from palantir_models.models import ModelAdapter, ModelStateReader
import models_api.models_api_executable as executable_api

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, shared_volume_path, model_host_and_port):
        self.shared_volume_path = shared_volume_path
        self.model_host_and_port = model_host_and_port

    @classmethod
    def load(cls, state_reader, container_context: executable_api.ContainerizedApplicationContext):
        shared_volume_path = container_context.shared_empty_dir_mount_path
        model_host_and_port = container_context.services["container1"][0]
        return cls(shared_volume_path, model_host_and_port)

    ...

ExternalModelContext

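ExternalModelContext is optional and is provided to the ModelAdapter.load method only if the model is an externally hosted model. The context object includes an object representing the externally hosted model, as well as a map of the decrypted secrets that the user defined for connecting to this externally hosted model.

# Note: this class type does not need to be imported in authored adapters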
class ExternalModelContext:
    def external_model(self) -> models_api_external_ExternalModel:
        """
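        An object representing the externally hosted model; notably, it contains base_url and connection_configuration.
        """

    def resolved_credentials(self) -> Dict[str, str]:
        """
        A map of user-defined decrypted secret values used to connect to this externally hosted model.
        """

# Note: this class type does not need to be imported in authored adapters
class models_api_external_ExternalModel:
    def base_url(self) -> str:
        """
        The user-defined URL representing where this externally hosted model is hosted.
        """

    def connection_configuration(self) -> Dict[str, str]:
        """
        A dictionary of user-defined, unencrypted configuration fields.
        This is intended to store specific configuration details such as the model name, inference parameters, or prediction thresholds.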
        """

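Example: ExternalModelContext

The following example initializes a model adapter that sends requests to an externally hosted model. For more information on using externally hosted models, refer to the documentation.

from typing import Optional

from palantir_models.models import ModelAdapter, ModelStateReader
import models_api.models_api_executable as executable_api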

class ExampleModelAdapter(ModelAdapter):
    def __init__(self, url, credentials_map, configuration_map):
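        # Extract the model configuration from the "connection configuration" map
        model_name = configuration_map['model_name']
        model_parameter = configuration_map['model_parameter']

        # Extract the model credentials from the "credentials configuration" map
        secret_key = credentials_map['secret_key']

        # Initialize the HTTP client at model load time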
        self.client = ExampleClient(url, model_name, model_parameter, secret_key)

    @classmethod
    def load(
            cls,
            state_reader: ModelStateReader,
            container_context: Optional[executable_api.ContainerizedApplicationContext] = None,
            external_model_context: Optional[executable_api.ExternalModelExecutionContext] = None,
            ) -> "ModelAdapter":
        return cls(
            url=external_model_context.external_model.base_url,
            credentials_map=external_model_context.resolved_credentials,
            configuration_map=external_model_context.external_model.connection_configuration,
        )

    ...
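
The example above leaves ExampleClient undefined. One possible shape for such a client, using only the Python standard library, is sketched below. Everything about it is hypothetical: the endpoint path, the JSON payload layout, and the bearer-token header are assumptions, and a real externally hosted model will define its own request format.

```python
import json
import urllib.request


class ExampleClient:
    """Hypothetical HTTP client; endpoint path, payload shape, and auth scheme are assumptions."""

    def __init__(self, url, model_name, model_parameter, secret_key):
        self.url = url.rstrip("/")
        self.model_name = model_name
        self.model_parameter = model_parameter
        self._secret_key = secret_key

    def build_request(self, rows):
        """Build (but do not send) a POST request carrying the rows to score."""
        body = json.dumps(
            {"parameter": self.model_parameter, "instances": rows}
        ).encode("utf-8")
        return urllib.request.Request(
            f"{self.url}/models/{self.model_name}/predict",
            data=body,
            headers={
                "Authorization": f"Bearer {self._secret_key}",
                "Content-Type": "application/json",
            },
            method="POST",
        )

    def predict(self, rows):
        """Send the request and decode a JSON response body."""
        with urllib.request.urlopen(self.build_request(rows)) as response:
            return json.loads(response.read())


client = ExampleClient("https://example.invalid/", "demo-model", "0.5", "not-a-real-secret")
request = client.build_request([{"input_feature": 1.0}])
print(request.get_method(), request.full_url)
```

Creating the client once in the adapter's constructor, as the example above does, means the connection details are resolved at load time rather than on every inference call.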

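Model API

The api() method of a model adapter specifies the expected inputs and outputs required to execute this model adapter's inference logic. Inputs and outputs are specified separately.

Example api()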

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "df_in": pm.Pandas([('input_feature', float)])
        }
        outputs = {
            "df_out": pm.Pandas([('output_feature', int)])
        }
        return inputs, outputs

    ...

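Model inputs

The ModelInput type contains the input types that can be defined in the ModelAdapter.api method. Model adapters support the following input types:

  1. Tabular
  2. Parameter
  3. FileSystem
  4. MediaReference [Beta]
  5. Object
  6. ObjectSet

:::callout{theme="neutral" title="Beta"} MediaReference support is in the beta phase of development and is only supported for inference in Python transforms; it may not be available on your enrollment. MediaReferences in models do not support automatic evaluation, batch deployments, or live deployments in the Modeling Objectives application. Functionality may change during active development. :::

# DFType and ModelInput can be imported from palantir_models.models.api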

class ModelInput:
    Tabular = TabularInput
    FileSystem = FileSystemInput
    Parameter = ParameterInput
    MediaReference = MediaReferenceInput

class TabularInput:
    def __init__(self, name: str, df_type: DFType = DFType.SPARK, columns: List[ModelApiColumn]):
        """
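        Used to specify that the ModelAdapter expects a tabular input.
        If applicable, this input type converts the tabular input to the type specified in `df_type`.
        Pandas dataframes, Spark dataframes, and TransformInputs are accepted as tabular input types.
        """

class ParameterInput:
    def __init__(self, name: str, type: type, default = None):
        """
        Used to specify that the ModelAdapter expects a constant-value parameter of type 'type'.
        The types available for parameter inputs are: str, int, float, bool, list, dict.
        If the parameter is not passed directly to .transform(), the provided default value is used.
        """

class FileSystemInput:
    def __init__(self, name: str):
        """
        Used to specify that the ModelAdapter expects a filesystem input.
        """

class MediaReferenceInput:
    def __init__(self, name: str):
        """
        Used to specify that the ModelAdapter expects a media reference as input.
        """

class ObjectInput:
    def __init__(self, object_type: OntologyObject):
        """
        Used to specify that the ModelAdapter expects a Python OSDK object as input.
        """

class ObjectSetInput:
    def __init__(self, object_type: OntologyObject):
        """
        Used to specify that the ModelAdapter expects a Python OSDK object set as input.
        """

class DFType(Enum):
    SPARK = "spark"
    PANDAS = "pandas"

class ModelApiColumn(NamedTuple):
    """
    Used to specify the name and type of a tabular input column.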
    """
    name: str
    type: type
    required: bool = True

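Tabular inputs

TabularInput specifies that the input provided to model.transform() is expected to be tabular. Within this model adapter's inference logic, the input has the type given by the df_type argument in the api() method; type conversion is performed where necessary. The following tabular types are allowed:

  1. TransformInput
  2. Pandas DataFrame
  3. Spark DataFrame
     Note: Spark DataFrames are only supported for tabular inputs that specify df_type=DFType.SPARK in the adapter API. Converting a Spark DataFrame is not supported for tabular inputs that specify DFType.PANDAS.

Column types

TabularInput also specifies a list of ModelApiColumns that describes the expected column schema of the tabular input. The type parameter can be one of the following: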

  1. str
  2. int
  3. float
  4. bool
  5. list
  6. dict
  7. datetime.date
  8. datetime.time
  9. datetime.datetime
  10. typing.Any
  11. MediaReference [Beta]

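The List and Dict type aliases from the typing library are also accepted, for example List[str] and Dict[str, float]. Review the specific requirements for using these types when publishing a model as a function.

Parameter inputs

ParameterInput specifies that the input provided to model.transform() is expected to be a parameter. Parameters are constant-value inputs whose type is specified with the type argument. Parameter inputs accept the following types: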

  1. str
  2. int
  3. float
  4. bool
  5. list (*)
  6. dict (*)
  7. typing.Any (*)
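  8. palantir_models.NDArray, for NumPy n-dimensional arrays

Parameter inputs can also specify a default value, which is used when no model.transform() input is provided for the parameter input defined in the model adapter's api() method.

:::callout{theme="neutral"} Review the specific requirements for using these types when publishing a model as a function. :::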
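
The default-value behavior of parameter inputs, together with the type check that this reference describes for parameters, can be pictured with the sketch below. It is not Foundry's implementation: resolve_parameter is a hypothetical helper, and details such as rejecting bool values for non-bool parameters are this sketch's own choices.

```python
_MISSING = object()  # sentinel so that None can still be passed explicitly


def resolve_parameter(name, declared_type, provided=_MISSING, default=None):
    """Return the value a parameter input would take, or raise on a type mismatch."""
    value = default if provided is _MISSING else provided
    # bool is a subclass of int in Python; this sketch treats it as a mismatch
    # unless the parameter was declared as bool.
    if isinstance(value, bool) and declared_type is not bool:
        raise TypeError(f"parameter {name!r} expected {declared_type.__name__}, got bool")
    if not isinstance(value, declared_type):
        raise TypeError(
            f"parameter {name!r} expected {declared_type.__name__}, "
            f"got {type(value).__name__}"
        )
    return value


# No value supplied: the declared default is used.
print(resolve_parameter("threshold", float, default=0.5))
# A supplied value of the right type overrides the default.
print(resolve_parameter("threshold", float, provided=0.9, default=0.5))
```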

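FileSystem inputs

FileSystemInput specifies that the input provided to model.transform() is expected to be a filesystem. Only TransformInputs are supported as the input type for a FileSystemInput.

Media reference inputs

MediaReferenceInput specifies that the input provided to model.transform() is expected to be a media reference. The media reference is expected to be of type str and to contain the full media reference object definition. The model adapter converts this media reference string into a MediaReference object, which provides methods for interacting with the referenced media item.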

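Object and ObjectSet inputs

For Object and ObjectSet inputs, specify the object type when defining the input in the model adapter API. This object type is imported from the Ontology SDK generated for the selected ontology.

When performing inference with the model, objects or object sets can be passed to the model in the following ways:

For object inputs

  1. An instance of the OntologyObject type specified in the object input of the model adapter API.
  2. A primary key for the specified object type.

For object set inputs

  1. An ObjectSet instance of the OntologyObject type specified in the object set input of the model adapter API.
  2. An object set RID for the specified object type.

API enforcement

As a general principle, the model adapter API is not strictly enforced at inference time, although enforcement is generally stricter for live inference and, in particular, for strongly typed functions.

Parameter types are always enforced: any parameter input to model.transform() that does not match the specified type throws an error.

Model adapters allow extra columns that are not specified in the API to be passed in tabular inputs at inference time. Foundry does not remove extra columns from the input before passing it to the predict method. This is important for ensuring that model adapters work with evaluation, as described in the model adapter API guide. In batch inference, the adapter does not throw an error if some of the columns specified in the API are missing. The MediaReference type (currently in beta) is an exception: it expects every element in the column to be a media reference string, and it converts each element to a MediaReference object before passing it to this model adapter's inference logic, in both batch and live inference.

In live inference, however, model inputs are cast to the declared types (if defined) to ensure type safety. This applies to every column in a tabular input. As a side effect, an error is thrown if a column with the specified type cannot be found in the input data, or if the data cannot be cast to the specified type.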
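
The difference between batch and live handling of tabular columns, as described under API enforcement, can be illustrated with a small sketch. This is not how Foundry implements enforcement: check_rows is a hypothetical helper, rows are plain dictionaries rather than dataframes, and the specific exception types are assumptions.

```python
def check_rows(rows, columns, live):
    """Apply declared column types to a list of row dicts.

    columns maps column name -> declared Python type.
    live=True casts declared columns and fails on missing or uncastable data.
    live=False passes rows through unchanged.
    Extra, undeclared columns are kept in both modes.
    """
    if not live:
        return [dict(row) for row in rows]
    checked = []
    for row in rows:
        new_row = dict(row)
        for name, declared_type in columns.items():
            if name not in row:
                raise KeyError(f"missing column {name!r}")
            try:
                new_row[name] = declared_type(row[name])
            except (TypeError, ValueError) as error:
                raise TypeError(
                    f"cannot cast column {name!r} to {declared_type.__name__}"
                ) from error
        checked.append(new_row)
    return checked


columns = {"input_feature": float}
rows = [{"input_feature": "1.5", "extra": "kept"}]
print(check_rows(rows, columns, live=True))
```

In this sketch the "extra" column survives in both modes, while a missing or uncastable "input_feature" only causes an error when live is True.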

Model outputs

ModelOutput contains the output types that can be defined in the ModelAdapter.api method.

# ModelOutput can be imported from palantir_models.models.api

class ModelOutput:
    FileSystem = FileSystemOutput
    Parameter = ParameterOutput

class TabularOutput:
    def __init__(self, name: str):
        """
        Used to specify that the ModelAdapter will produce a tabular output.
        Only Pandas or Spark dataframes are supported as tabular output types.
        """

class ParameterOutput:
    def __init__(self, name: str, type: type, default = None):
        """
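        Used to specify that the ModelAdapter will produce a constant-value parameter of type 'type'.
        The types available for parameter outputs are: str, int, float, bool, list, dict.
        If no value is written through `run_inference()`, the provided default value is used.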
        """

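The two available model outputs are similar to their input counterparts. One main difference is that TabularOutput has no df_type parameter; it accepts both Pandas and Spark dataframes.

Model inference

predict() method

For models with a single output of tabular or parameter type, the predict() method can be used in place of run_inference() to define the model adapter's inference logic. The arguments of this method are the names of the input objects defined in the model adapter's api() method. The arguments do not need to appear in the order in which they were defined, but the names must match.

Example predict() method

The following example defines a predict() method for a model adapter with multiple tabular inputs and a single tabular output.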

import palantir_models as pm

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {"input_1": pm.Pandas(), "input_2": pm.Pandas()}
        outputs = {"output_dataframe": pm.Pandas()}
        return inputs, outputs

    def predict(self, input_1, input_2):
        resulting_dataframe = ... # Some inference logic using input_1 and input_2
        return resulting_dataframe

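In the example above, the two inputs (input_1 and input_2) are referenced by name in the signature of the predict() method. The dataframe returned by the function, resulting_dataframe, is written to the single output, named output_dataframe.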
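
Name-based matching of predict() arguments can be pictured with the sketch below. It is an illustration only, not Foundry's dispatch code: call_predict is a hypothetical helper that compares the method signature with the declared input names and then passes the inputs as keyword arguments, and plain lists stand in for dataframes.

```python
import inspect


def call_predict(adapter, api_input_names, provided):
    """Call adapter.predict with inputs matched by name, not by position."""
    signature_names = set(inspect.signature(adapter.predict).parameters)
    if signature_names != set(api_input_names):
        raise TypeError(
            f"predict() arguments {sorted(signature_names)} do not match "
            f"api() inputs {sorted(api_input_names)}"
        )
    return adapter.predict(**{name: provided[name] for name in api_input_names})


class ReorderedAdapter:
    # The arguments are declared in the opposite order to the input names below.
    def predict(self, input_2, input_1):
        return [a + b for a, b in zip(input_1, input_2)]


result = call_predict(
    ReorderedAdapter(),
    api_input_names=["input_1", "input_2"],
    provided={"input_1": [1, 2], "input_2": [10, 20]},
)
print(result)
```

Because the inputs are passed by keyword, the order of the arguments in the predict() signature does not matter, but a misspelled argument name is caught before inference runs.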

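run_inference() method

For multi-output models, or models that write to a filesystem, custom inference logic must be defined through the run_inference() method. This method takes two arguments: inputs and outputs. Both arguments are NamedTuples whose names correspond to the name parameter of the inputs and outputs defined in the api() method.

Inputs

Referencing an input by name accesses the object passed to model.transform() that corresponds to the api() input of the same name.

Example: Inputs

Given the following ModelAdapter definition:

import palantir_models as pm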

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "df_in": pm.Pandas([("input_feature", float)])
        }
        outputs = {
            "df_out_one": pm.Pandas([("feature_one", int)]),
            "df_out_two": pm.Pandas([("output_feature_1", int),
                                     ("output_feature_2", float)])
        }
        return inputs, outputs

    def run_inference(self, inputs, outputs):
        my_input_df = inputs.df_in

        my_output_one = outputs.df_out_one
        my_output_two = outputs.df_out_two

    ...

And the following call to .transform():

@transform(
    my_input_data=Input(...),
    my_output_data_one=Output(...),
    my_output_data_two=Output(...),
    my_model=ModelInput(...)
)
def compute(my_input_data, my_output_data_one, my_output_data_two, my_model):
    my_model_outputs = my_model.transform(my_input_data)

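The my_input_df object in the model adapter's run_inference() method is a reference to the input named "df_in", a tabular input of Pandas type. It is equal to the pandas representation of the my_input_data TransformInput passed in from the transform.

Outputs

Referencing an output by name provides a writable object that corresponds to the api() output of the same name. Each of these objects has a .write() method that specifies which data is written to that output.

Example: Outputs

Given the following ModelAdapter definition:

import palantir_models as pm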

class ExampleModelAdapter(pm.ModelAdapter):
    ...

    @classmethod
    def api(cls):
        inputs = {
            "input_dataframe": pm.Pandas([('input_feature', float)])
        }
        outputs = {
            "output_dataframe_one": pm.Pandas([('output_feature', int)]),
            "output_dataframe_two": pm.Pandas([('output_feature', int)]),
        }
        return inputs, outputs

    def run_inference(self, inputs, outputs):
        my_input_df = inputs.input_dataframe
        my_output_dataframe_one = do_something_to_input_and_return_a_new_dataframe(my_input_df)
        my_output_dataframe_two = do_something_else_to_input_and_return_a_new_dataframe(my_input_df)
        outputs.output_dataframe_one.write(my_output_dataframe_one)
        outputs.output_dataframe_two.write(my_output_dataframe_two)

    ...

And the following call to .transform():

@transform(
    my_input_data=Input(...),
    my_output_data_one=Output(...),
    my_output_data_two=Output(...),
    my_model=ModelInput(...)
)
def compute(my_input_data, my_output_data_one, my_output_data_two, my_model):
    my_model_outputs = my_model.transform(my_input_data)
    my_output_dataframe_one = my_model_outputs.output_dataframe_one
    my_output_dataframe_two = my_model_outputs.output_dataframe_two
    my_output_data_one.write_pandas(my_output_dataframe_one)
    my_output_data_two.write_pandas(my_output_dataframe_two)

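The my_output_dataframe_one and my_output_dataframe_two objects in the transform are equal to the objects written to the output_dataframe_one and output_dataframe_two outputs in the model adapter's run_inference() method (in this case, my_output_dataframe_one and my_output_dataframe_two).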
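
To make the inputs/outputs contract of run_inference() concrete without a Foundry environment, the sketch below imitates it with the standard library. WritableOutput and the transform function are hypothetical stand-ins, the input_names and output_names attributes replace a real api() definition, and plain lists stand in for dataframes.

```python
from collections import namedtuple


class WritableOutput:
    """Hypothetical stand-in for a writable output; it records what was written."""

    def __init__(self):
        self.value = None

    def write(self, data):
        self.value = data


class ExampleAdapter:
    # Stand-ins for the names that api() would declare.
    input_names = ["input_dataframe"]
    output_names = ["output_dataframe_one", "output_dataframe_two"]

    def run_inference(self, inputs, outputs):
        rows = inputs.input_dataframe
        outputs.output_dataframe_one.write([value * 2 for value in rows])
        outputs.output_dataframe_two.write([value + 1 for value in rows])


def transform(adapter, **provided):
    """Build the inputs/outputs namedtuples, run inference, and return what was written."""
    Inputs = namedtuple("Inputs", adapter.input_names)
    Outputs = namedtuple("Outputs", adapter.output_names)
    inputs = Inputs(**provided)
    outputs = Outputs(*(WritableOutput() for _ in adapter.output_names))
    adapter.run_inference(inputs, outputs)
    Results = namedtuple("Results", adapter.output_names)
    return Results(*(output.value for output in outputs))


results = transform(ExampleAdapter(), input_dataframe=[1, 2, 3])
print(results.output_dataframe_one, results.output_dataframe_two)
```

As in the transform example above, each result is read back by the same name that run_inference() wrote to.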

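Media references

For information about media references in Foundry, refer to the Foundry documentation on media references.

If a parameter input or tabular input column of type MediaReference is specified in the model adapter's api(), media reference strings provided through model.transform() are converted to MediaReference objects. This object type provides methods for interacting with the media reference.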

class MediaReference:
    @property
    def media_reference(self):
        """
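        The raw media reference string.
        """

    @property
    def media_item_rid(self):
        """
        The media item RID extracted from the media reference.
        """

    def get_media_item(self):
        """
        Returns the media item as a file-like object.
        """

    def get_media_item_via_access_pattern(self, access_pattern_name, access_pattern_path):
        """
        Returns the access pattern of the media item as a file-like object.
        Depending on the media set's persistence policy, this may cache the access pattern once it has been computed.
        """

    def transform_media_item(self, output_path, transformation):
        """
        Applies a transformation to the media item and returns it as a file-like object.
        The output_path is provided to the transformation.
        The transformation computation is performed by Mio, not by this Spark module.
        """

    def get_media_item_metadata(self):
        """
        Returns the media item metadata (width, height, and so on).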
        """