
Python SDK

The Python SDK for compute modules enables you to build deployed functions that integrate with Foundry. The SDK provides decorators for function registration, authentication helpers, structured logging, and utilities for working with Foundry resources.

The Python SDK provides these key modules:

| Module | Purpose |
| --- | --- |
| compute_modules.annotations | @function decorator for registering functions |
| compute_modules.auth | RefreshingOauthToken and retrieve_third_party_id_and_creds() for authentication |
| compute_modules.logging | get_logger() and set_internal_log_level() for structured logging |
| compute_modules.sources_v2 | get_source() for accessing configured sources |

Defining functions

Basic function

Every function annotated with @function is automatically registered as an endpoint in your compute module:

from compute_modules.annotations import function

@function
def my_function(context, event):
    """
    context: Dict with metadata (tokens, credentials, job ID, etc.)
    event: Dict with function parameters
    """
    return str(event['value'])

Function signature requirements:

  1. context: Metadata about the invocation (job ID, credentials, etc.)
  2. event: The input payload (can be typed with TypedDict or dataclass)

The return value must be JSON serializable.
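
One way to catch serialization problems before deployment is to round-trip a sample return value through the standard json module. The sketch below does not use the SDK. `to_json_safe` is a hypothetical helper, and it assumes that ISO 8601 strings are an acceptable representation for timestamps in your function's output.

```python
import datetime
import json


def to_json_safe(value):
    """Recursively convert common non-JSON types into JSON-friendly ones."""
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_json_safe(v) for v in value]
    return value


# A sample return value containing a datetime, which json.dumps rejects.
raw = {"name": "sensor-1", "seen_at": datetime.datetime(2024, 1, 2, 3, 4, 5)}

try:
    json.dumps(raw)
    raw_is_serializable = True
except TypeError:
    raw_is_serializable = False

# After conversion the value serializes cleanly.
encoded = json.dumps(to_json_safe(raw))
```

Running a check like this in a unit test surfaces a `TypeError` locally rather than at invocation time.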

Typed function (recommended)

Using typed inputs and outputs enables automatic schema inference and provides better IDE support.

Using TypedDict:

from typing import TypedDict
from compute_modules.annotations import function

class CalculateInput(TypedDict):
    x: float
    y: float
    operation: str

class CalculateOutput(TypedDict):
    result: float
    operation: str

@function
def calculate(context, event: CalculateInput) -> CalculateOutput:
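    ops = {
        'add': lambda a, b: a + b,
        'subtract': lambda a, b: a - b,
        'multiply': lambda a, b: a * b,
        'divide': lambda a, b: a / b,
    }
    operation = event['operation']
    if operation not in ops:
        # Fail loudly instead of silently returning 0 for an unknown operation
        raise ValueError(f"Unsupported operation: {operation}")
    if operation == 'divide' and event['y'] == 0:
        # float('inf') is not valid JSON, so reject division by zero explicitly
        raise ValueError("Cannot divide by zero")
    result = ops[operation](event['x'], event['y'])
    return {'result': result, 'operation': operation}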

Using dataclass:

from dataclasses import dataclass
from compute_modules.annotations import function
import datetime

@dataclass
class EventInput:
    name: str
    timestamp: datetime.datetime
    value: float

@function
def process_event(context, event: EventInput) -> str:
    return f"Processed {event.name} at {event.timestamp} with value {event.value}"

Learn more about type systems and automatic schema inference.
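
To give a rough sense of what type annotations make available for schema inference, the sketch below reads field names and types from a TypedDict and a dataclass using only the standard library. It illustrates the general idea and is not the SDK's actual inference mechanism. `describe_fields` is a hypothetical helper.

```python
import dataclasses
from typing import TypedDict, get_type_hints


class CalculateInput(TypedDict):
    x: float
    y: float
    operation: str


@dataclasses.dataclass
class EventInput:
    name: str
    value: float


def describe_fields(cls) -> dict:
    """Map each annotated field name to the name of its declared type."""
    hints = get_type_hints(cls)
    # Fall back to str() for types that have no __name__ attribute.
    return {name: getattr(tp, "__name__", str(tp)) for name, tp in hints.items()}


typed_dict_fields = describe_fields(CalculateInput)
dataclass_fields = describe_fields(EventInput)
```

An untyped `event` parameter exposes none of this information, which is why the typed form is recommended.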

Streaming output

For large result sets, use streaming to avoid memory issues and provide progressive results:

from compute_modules.annotations import function
from typing import Iterable

@function(streaming=True)
def generate_items(context, event) -> Iterable[str]:
    for i in range(event['count']):
        yield f"Item {i}"

:::callout{theme="warning"} Important: Generators are NOT JSON-serializable. You MUST use streaming=True when returning a generator. :::
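
The warning above can be reproduced with the standard json module alone. The sketch below shows that passing a generator to `json.dumps` raises a `TypeError`, while serializing items one at a time works without holding the full result in memory. How the SDK transmits streamed items is not described here, so `serialize_each` is only a hypothetical stand-in for that step.

```python
import json
from typing import Iterable, Iterator


def generate_items(count: int) -> Iterator[str]:
    """Yield items lazily, mirroring the streaming function above."""
    for i in range(count):
        yield f"Item {i}"


def serialize_each(items: Iterable[str]) -> Iterator[str]:
    """Serialize items one at a time instead of building the full list."""
    for item in items:
        yield json.dumps(item)


# A generator object itself cannot be serialized as a single JSON value.
try:
    json.dumps(generate_items(3))
    generator_is_serializable = False if False else True
except TypeError:
    generator_is_serializable = False

# Each yielded item, however, serializes on its own.
chunks = list(serialize_each(generate_items(3)))
```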

Authentication

Application permissions (3PA)

Use application permissions (third-party application authorization) to access Foundry resources on behalf of users. The RefreshingOauthToken class automatically refreshes tokens every 30 minutes:

from compute_modules.auth import RefreshingOauthToken, retrieve_third_party_id_and_creds

HOSTNAME = "yourenrollment.palantirfoundry.com"

# Get client ID and secret
client_id, client_secret = retrieve_third_party_id_and_creds()

# Create refreshing token (refreshes every 30 minutes by default)
refreshing_token = RefreshingOauthToken(
    hostname=HOSTNAME,
    scope=["api:datasets-read", "api:datasets-write"]
)

# Get token (always returns valid token)
access_token = refreshing_token.get_token()
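
The refresh behavior can be pictured as a cache with a time-to-live. The sketch below is an illustration of that idea only and is not how RefreshingOauthToken is implemented (the real class may, for example, refresh in the background). `CachedToken`, `fake_fetch`, and the injected clock are all hypothetical names used to make the behavior observable without network access.

```python
import time
from typing import Callable, Optional


class CachedToken:
    """Cache a token and fetch a new one once it is older than ttl_seconds."""

    def __init__(
        self,
        fetch: Callable[[], str],
        ttl_seconds: float = 30 * 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._fetched_at = 0.0

    def get_token(self) -> str:
        now = self._clock()
        # Fetch on first use, or when the cached token has reached its TTL.
        if self._token is None or now - self._fetched_at >= self._ttl:
            self._token = self._fetch()
            self._fetched_at = now
        return self._token


# Demonstration with a fake clock and a fake token endpoint.
fake_now = [0.0]
fetch_calls = []


def fake_fetch() -> str:
    fetch_calls.append(1)
    return f"token-{len(fetch_calls)}"


cached = CachedToken(fake_fetch, ttl_seconds=1800, clock=lambda: fake_now[0])
first = cached.get_token()   # first call fetches a token
fake_now[0] = 600.0
second = cached.get_token()  # 10 minutes later: cached token is reused
fake_now[0] = 1800.0
third = cached.get_token()   # 30 minutes later: a new token is fetched
```

Because callers always go through `get_token()`, they never need to track expiry themselves, which is the same calling pattern the SDK class offers.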

Learn more about authentication modes.

Working with Foundry resources

Using the Ontology SDK

The Python SDK integrates with the Ontology SDK (OSDK) for working with Ontology objects:

import os
from compute_modules.annotations import function
from compute_modules.auth import retrieve_third_party_id_and_creds
from your_osdk_package import FoundryClient, ConfidentialClientAuth

foundry_url = os.environ["FOUNDRY_URL"]
CLIENT_ID, CLIENT_CREDS = retrieve_third_party_id_and_creds()

@function
def get_object(context, event):
    auth = ConfidentialClientAuth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_CREDS,
        hostname=foundry_url,
        should_refresh=True,
        scopes=[
            "api:ontologies-read",
            "api:ontologies-write",
        ],
    )
    client = FoundryClient(auth=auth, hostname=foundry_url)
    employee = client.ontology.objects.Employee
    return str(employee.take(1))

Learn more about OSDK integration.

Reading and writing datasets

Use the Foundry APIs to read and write dataset files:

import logging
import os
import requests
from dataclasses import dataclass
from compute_modules.annotations import function
from compute_modules.auth import RefreshingOauthToken
from compute_modules.logging import get_logger

logger = get_logger(__name__)
logger.setLevel(logging.INFO)

FOUNDRY_URL = os.environ["FOUNDRY_URL"]
BASE_URL = f"https://{FOUNDRY_URL}"
refreshing_token = RefreshingOauthToken(
    hostname=FOUNDRY_URL,
    scope=["api:datasets-read", "api:datasets-write"]
)
CA_PATH = os.environ["DEFAULT_CA_PATH"]

@dataclass
class UploadFileRequest:
    dataset_rid: str
    file_path: str
    file_content: str

@dataclass
class UploadFileResponse:
    status: int

@function
def upload_file(context, event: UploadFileRequest) -> UploadFileResponse:
    """Write a file to a dataset."""
    logger.info(f"Uploading file to path: {event.file_path}")
    url = f"{BASE_URL}/api/v2/datasets/{event.dataset_rid}/files/{event.file_path}/upload"
    response = requests.post(
        url,
        params={"transactionType": "APPEND", "branchName": "master"},
        headers={
            "Authorization": f"Bearer {refreshing_token.get_token()}",
            "Content-Type": "application/octet-stream",
        },
        data=event.file_content,
        verify=CA_PATH,
    )
    return UploadFileResponse(status=response.status_code)

@dataclass
class GetFileRequest:
    dataset_rid: str
    file_path: str

@dataclass
class GetFileResponse:
    status: int
    file_content: str

@function
def get_file(context, event: GetFileRequest) -> GetFileResponse:
    """Read contents of a file from a dataset."""
    logger.info(f"Getting file from {event.file_path}")
    url = f"{BASE_URL}/api/v2/datasets/{event.dataset_rid}/files/{event.file_path}/content"
    response = requests.get(
        url,
        headers={"Authorization": f"Bearer {refreshing_token.get_token()}"},
        verify=CA_PATH,
    )
    return GetFileResponse(
        status=response.status_code,
        file_content=response.content.decode('utf-8'),
    )
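
The examples above interpolate `file_path` directly into the URL. If a path contains slashes, spaces, or other reserved characters, it likely needs to be percent-encoded first. The sketch below assumes the API expects the file path as a single encoded path segment; confirm this against the dataset API reference. `dataset_file_url`, the hostname, and the dataset RID are hypothetical placeholders.

```python
from urllib.parse import quote


def dataset_file_url(base_url: str, dataset_rid: str, file_path: str, action: str) -> str:
    """Build a dataset file URL with the file path percent-encoded."""
    # safe="" also encodes "/" so the path stays a single URL segment.
    encoded_path = quote(file_path, safe="")
    return f"{base_url}/api/v2/datasets/{dataset_rid}/files/{encoded_path}/{action}"


url = dataset_file_url(
    "https://example.palantirfoundry.com",  # placeholder hostname
    "ri.foundry.main.dataset.0000",         # placeholder RID
    "reports/2024 summary.csv",
    "content",
)
```

It is also worth checking `response.status_code` (or calling `response.raise_for_status()`) before decoding the body, so that an error response is not returned to the caller as file content.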

Logging

The SDK provides structured logging that automatically includes session IDs, job IDs, and process IDs:

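import logging
from compute_modules.annotations import function
from compute_modules.logging import get_logger

logger = get_logger(__name__)
logger.setLevel(logging.INFO)

def process(event) -> str:
    # Placeholder for your own business logic (hypothetical helper)
    return str(event)

@function
def my_function(context, event) -> str:
    # Not emitted while the level is INFO; lower the level to DEBUG to see it
    logger.debug(f"Input: {event}")
    try:
        result = process(event)
        logger.info(f"Success: {result}")
        return result
    except Exception as e:
        # exc_info=True attaches the stack trace to the log entry
        logger.error(f"Failed: {e}", exc_info=True)
        raise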

Log levels:

logger.debug("Debug logs")     # severity = 0
logger.info("Info logs")       # severity = 1
logger.warning("Warning logs") # severity = 2
logger.error("Error logs")     # severity = 3
logger.critical("Critical logs") # severity = 4

Set the minimum log level using logger.setLevel(logging.INFO). Only logs with severity greater than or equal to the set level will appear.
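
The filtering rule can be checked with the standard logging module. The sketch below assumes the logger returned by get_logger() follows standard `logging.Logger` level semantics, which the `setLevel(logging.INFO)` call above suggests. It uses a plain standard-library logger as a stand-in, and `ListHandler` is a hypothetical helper that collects messages in memory.

```python
import logging


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so the filtering is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.levelname}: {record.getMessage()}")


demo_logger = logging.getLogger("level_demo")
demo_logger.propagate = False  # keep the demo output out of the root logger
handler = ListHandler()
demo_logger.addHandler(handler)

demo_logger.setLevel(logging.INFO)
demo_logger.debug("dropped")   # below INFO, so it is filtered out
demo_logger.info("kept")       # at INFO, so it is recorded
demo_logger.error("kept too")  # above INFO, so it is recorded

recorded = handler.messages
```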

Learn more about debugging and viewing logs.

Using sources

Compute modules can access external systems through configured sources. The SDK provides utilities for using sources:

from compute_modules.sources_v2 import get_source

# Access source credentials
source = get_source("my-source-identifier")

:::callout{theme="warning"} Additional dependencies required: To use sources, you must run:

pip install foundry-compute-modules[sources] :::

Learn more about configuring and using sources.

GitHub repository

The Python SDK is open source and available on GitHub:

