Python SDK
The Python SDK for compute modules enables you to build deployed functions that integrate with Foundry. The SDK provides decorators for function registration, authentication helpers, structured logging, and utilities for working with Foundry resources.
The Python SDK provides these key modules:
| Module | Purpose |
|---|---|
| compute_modules.annotations | @function decorator for registering functions |
| compute_modules.auth | RefreshingOauthToken, retrieve_third_party_id_and_creds() for authentication |
| compute_modules.logging | get_logger(), set_internal_log_level() for structured logging |
| compute_modules.sources_v2 | get_source() for accessing configured sources |
Defining functions
Basic function
Every function annotated with @function is automatically registered as an endpoint in your compute module:
```python
from compute_modules.annotations import function


@function
def my_function(context, event):
    """
    context: Dict with metadata (tokens, credentials, job ID, etc.)
    event: Dict with function parameters
    """
    return str(event['value'])
```
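To make the idea of decorator-based registration concrete, here is a minimal sketch of how a decorator can record functions in a lookup table so they can later be called by name. The names `REGISTRY`, `register`, and `dispatch` are hypothetical and exist only for this illustration; this is not the SDK's internal implementation.

```python
from typing import Any, Callable, Dict

# Hypothetical registry, for illustration only; not the SDK's internals.
REGISTRY: Dict[str, Callable[[Dict[str, Any], Dict[str, Any]], Any]] = {}


def register(fn):
    """Record fn under its own name and return it unchanged."""
    REGISTRY[fn.__name__] = fn
    return fn


@register
def echo_value(context, event):
    # Same (context, event) signature as the functions in this guide
    return str(event["value"])


def dispatch(name, context, event):
    """Look up a registered function by name and call it."""
    return REGISTRY[name](context, event)
```

Calling `dispatch("echo_value", {}, {"value": 42})` runs the decorated function without the caller importing it directly, which is the general pattern a registration decorator enables.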
Function signature requirements:

- context: Metadata about the invocation (job ID, credentials, etc.)
- event: The input payload (can be typed with TypedDict or dataclass)

The return value must be JSON serializable.
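One way to check a candidate return value before deploying is to try encoding it with the standard library's `json` module. The helper name `is_json_serializable` is hypothetical, and the sketch assumes the standard `json` encoder is a reasonable proxy for what the compute module accepts.

```python
import json
from datetime import datetime


def is_json_serializable(value) -> bool:
    """Return True if the standard json encoder accepts the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        # TypeError: unsupported type; ValueError: e.g. circular reference
        return False


plain = {"name": "example", "values": [1, 2.5, None]}
timestamp = datetime(2024, 1, 1, 12, 0, 0)
```

Here `plain` passes the check, while `timestamp` does not until it is converted to a string, for example with `timestamp.isoformat()`.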
Typed functions (recommended)
Using typed inputs and outputs enables automatic schema inference and provides better IDE support.
Using TypedDict:
```python
from typing import TypedDict

from compute_modules.annotations import function


class CalculateInput(TypedDict):
    x: float
    y: float
    operation: str


class CalculateOutput(TypedDict):
    result: float
    operation: str


@function
def calculate(context, event: CalculateInput) -> CalculateOutput:
    ops = {
        'add': lambda a, b: a + b,
        'subtract': lambda a, b: a - b,
        'multiply': lambda a, b: a * b,
        # Division by zero yields infinity instead of raising
        'divide': lambda a, b: a / b if b != 0 else float('inf'),
    }
    # Unknown operations fall back to a result of 0
    result = ops.get(event['operation'], lambda a, b: 0)(event['x'], event['y'])
    return {'result': result, 'operation': event['operation']}
```
Using dataclass:
```python
import datetime
from dataclasses import dataclass

from compute_modules.annotations import function


@dataclass
class EventInput:
    name: str
    timestamp: datetime.datetime
    value: float


@function
def process_event(context, event: EventInput) -> str:
    return f"Processed {event.name} at {event.timestamp} with value {event.value}"
```
Learn more about type systems and automatic schema inference.
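As a rough illustration of why type annotations make schema inference possible, the sketch below reads field names and types from a TypedDict and a dataclass using `typing.get_type_hints`. The helper `describe_fields` is hypothetical, and this is not a description of how the SDK actually infers schemas.

```python
import datetime
from dataclasses import dataclass
from typing import TypedDict, get_type_hints


class CalculateInput(TypedDict):
    x: float
    y: float
    operation: str


@dataclass
class EventInput:
    name: str
    timestamp: datetime.datetime
    value: float


def describe_fields(cls) -> dict:
    """Map each annotated field name to the name of its declared type."""
    return {name: tp.__name__ for name, tp in get_type_hints(cls).items()}
```

Both `describe_fields(CalculateInput)` and `describe_fields(EventInput)` return a plain dictionary of field names to type names, which is the kind of information an untyped `event` parameter cannot provide.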
Streaming output
For large result sets, use streaming to avoid memory issues and provide progressive results:
```python
from typing import Iterable

from compute_modules.annotations import function


@function(streaming=True)
def generate_items(context, event) -> Iterable[str]:
    for i in range(event['count']):
        yield f"Item {i}"
```
:::callout{theme="warning"}
Important: Generators are NOT JSON-serializable. You MUST use streaming=True when returning a generator.
:::
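The warning above can be seen with the standard library alone: `json.dumps` rejects a generator object, but each yielded item can be encoded on its own. The sketch below uses a hypothetical `encode_stream` helper to show the item-at-a-time idea; it does not represent the SDK's actual wire format.

```python
import json
from typing import Iterable, Iterator


def generate_items(count: int) -> Iterable[str]:
    for i in range(count):
        yield f"Item {i}"


def encode_stream(items: Iterable) -> Iterator[str]:
    """Encode each item separately, so only one item is held in memory at a time."""
    for item in items:
        yield json.dumps(item)


encoded = list(encode_stream(generate_items(3)))
```

Passing `generate_items(3)` directly to `json.dumps` raises `TypeError`, which is why a generator-returning function needs the streaming mode rather than the default single-response mode.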
Authentication
Application permissions (3PA)
Use application permissions (third-party application authorization) to access Foundry resources on behalf of users. The RefreshingOauthToken class automatically refreshes tokens every 30 minutes:
```python
from compute_modules.auth import RefreshingOauthToken, retrieve_third_party_id_and_creds

HOSTNAME = "yourenrollment.palantirfoundry.com"

# Get client ID and secret (shown for reference; not passed to RefreshingOauthToken below)
client_id, client_secret = retrieve_third_party_id_and_creds()

# Create refreshing token (refreshes every 30 minutes by default)
refreshing_token = RefreshingOauthToken(
    hostname=HOSTNAME,
    scope=["api:datasets-read", "api:datasets-write"],
)

# Get token (always returns valid token)
access_token = refreshing_token.get_token()
```
Learn more about authentication modes.
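For readers unfamiliar with the refreshing-token pattern, the following is a minimal sketch of a token holder that re-fetches once the cached token reaches a maximum age. `CachedToken`, its `fetch` callback, and the injectable `clock` are hypothetical names for this illustration; the real RefreshingOauthToken may work differently (for example, by refreshing in the background).

```python
import time
from typing import Callable, Optional


class CachedToken:
    """Hypothetical helper: re-fetches a token once it is older than max_age_seconds."""

    def __init__(
        self,
        fetch: Callable[[], str],
        max_age_seconds: float = 30 * 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._max_age = max_age_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._fetched_at = 0.0

    def get_token(self) -> str:
        now = self._clock()
        # Fetch on first use, or when the cached token has reached its maximum age
        if self._token is None or now - self._fetched_at >= self._max_age:
            self._token = self._fetch()
            self._fetched_at = now
        return self._token
```

The design point is that callers always go through `get_token()` at the moment of use instead of storing the token string, so a long-running module never sends a stale token.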
Working with Foundry resources
Using the Ontology SDK
The Python SDK integrates with the Ontology SDK (OSDK) for working with Ontology objects:
```python
import os

from compute_modules.annotations import function
from compute_modules.auth import retrieve_third_party_id_and_creds
from your_osdk_package import FoundryClient, ConfidentialClientAuth

foundry_url = os.environ["FOUNDRY_URL"]
CLIENT_ID, CLIENT_CREDS = retrieve_third_party_id_and_creds()


@function
def get_object(context, event):
    auth = ConfidentialClientAuth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_CREDS,
        hostname=foundry_url,
        should_refresh=True,
        scopes=[
            "api:ontologies-read",
            "api:ontologies-write",
        ],
    )
    client = FoundryClient(auth=auth, hostname=foundry_url)
    employee = client.ontology.objects.Employee
    return str(employee.take(1))
```
Learn more about OSDK integration.
Reading and writing datasets
Use the Foundry APIs to read and write dataset files:
```python
import logging
import os
from dataclasses import dataclass

import requests

from compute_modules.annotations import function
from compute_modules.auth import RefreshingOauthToken
from compute_modules.logging import get_logger

logger = get_logger(__name__)
logger.setLevel(logging.INFO)

FOUNDRY_URL = os.environ["FOUNDRY_URL"]
BASE_URL = f"https://{FOUNDRY_URL}"
refreshing_token = RefreshingOauthToken(
    hostname=FOUNDRY_URL,
    scope=["api:datasets-read", "api:datasets-write"],
)
CA_PATH = os.environ["DEFAULT_CA_PATH"]


@dataclass
class UploadFileRequest:
    dataset_rid: str
    file_path: str
    file_content: str


@dataclass
class UploadFileResponse:
    status: int


@function
def upload_file(context, event: UploadFileRequest) -> UploadFileResponse:
    """Write a file to a dataset."""
    logger.info(f"Uploading file to path: {event.file_path}")
    url = f"{BASE_URL}/api/v2/datasets/{event.dataset_rid}/files/{event.file_path}/upload"
    response = requests.post(
        url,
        params={"transactionType": "APPEND", "branchName": "master"},
        headers={
            "Authorization": f"Bearer {refreshing_token.get_token()}",
            "Content-Type": "application/octet-stream",
        },
        data=event.file_content,
        verify=CA_PATH,
    )
    return UploadFileResponse(status=response.status_code)


@dataclass
class GetFileRequest:
    dataset_rid: str
    file_path: str


@dataclass
class GetFileResponse:
    status: int
    file_content: str


@function
def get_file(context, event: GetFileRequest) -> GetFileResponse:
    """Read contents of a file from a dataset."""
    logger.info(f"Getting file from {event.file_path}")
    url = f"{BASE_URL}/api/v2/datasets/{event.dataset_rid}/files/{event.file_path}/content"
    response = requests.get(
        url,
        headers={"Authorization": f"Bearer {refreshing_token.get_token()}"},
        verify=CA_PATH,
    )
    return GetFileResponse(
        status=response.status_code,
        file_content=response.content.decode('utf-8'),
    )
```
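The examples above interpolate `file_path` directly into the URL. If a path contains slashes or spaces, it may need percent-encoding so that it is treated as a single URL segment. The sketch below shows one way to build such a URL with the standard library; the helper `dataset_file_url` and the sample RID are hypothetical, and the assumption that the endpoint expects an encoded path should be confirmed against the Foundry API documentation.

```python
from urllib.parse import quote


def dataset_file_url(base_url: str, dataset_rid: str, file_path: str, action: str) -> str:
    """Build a dataset file URL, percent-encoding the path as one segment."""
    # safe="" also encodes "/" so nested paths stay within a single URL segment
    encoded_path = quote(file_path, safe="")
    return f"{base_url}/api/v2/datasets/{dataset_rid}/files/{encoded_path}/{action}"


example_url = dataset_file_url(
    "https://example.com",
    "ri.foundry.main.dataset.abc",  # hypothetical dataset RID
    "reports/q3 data.csv",
    "upload",
)
```

With these inputs, the path segment becomes `reports%2Fq3%20data.csv`, leaving the surrounding `/files/` and `/upload` segments unambiguous.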
Logging
The SDK provides structured logging that automatically includes session IDs, job IDs, and process IDs:
```python
import logging

from compute_modules.annotations import function
from compute_modules.logging import get_logger

logger = get_logger(__name__)
logger.setLevel(logging.INFO)


def process(event) -> str:
    # Hypothetical placeholder; replace with your own logic
    return str(event)


@function
def my_function(context, event) -> str:
    logger.debug(f"Input: {event}")
    try:
        result = process(event)
        logger.info(f"Success: {result}")
        return result
    except Exception as e:
        logger.error(f"Failed: {e}", exc_info=True)
        raise
```
Log levels:
```python
logger.debug("Debug logs")        # severity = 0
logger.info("Info logs")          # severity = 1
logger.warning("Warning logs")    # severity = 2
logger.error("Error logs")        # severity = 3
logger.critical("Critical logs")  # severity = 4
```
Set the minimum log level using logger.setLevel(logging.INFO). Only logs with severity greater than or equal to the set level will appear.
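The filtering rule can be demonstrated with Python's standard `logging` module, whose built-in levels are ordered DEBUG (10), INFO (20), WARNING (30), ERROR (40), and CRITICAL (50). The sketch below assumes the SDK logger follows the same `setLevel` semantics; `ListHandler` and the logger name `level_demo` are hypothetical and used only to capture output for inspection.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted records in a list instead of writing them out."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


demo_logger = logging.getLogger("level_demo")
demo_logger.propagate = False  # keep the demo isolated from the root logger
handler = ListHandler()
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.WARNING)

demo_logger.debug("dropped")
demo_logger.info("dropped")
demo_logger.warning("kept")
demo_logger.error("kept")
```

After this runs, `handler.messages` holds only the WARNING and ERROR entries, because the two lower-severity calls fall below the configured level.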
Learn more about debugging and viewing logs.
Using sources
Compute modules can access external systems through configured sources. The SDK provides utilities for using sources:
```python
from compute_modules.sources_v2 import get_source

# Access source credentials
source = get_source("my-source-identifier")
```
:::callout{theme="warning"}
Additional dependencies required: To use sources, you must run:

```shell
pip install foundry-compute-modules[sources]
```
:::
Learn more about configuring and using sources.
GitHub repository
The Python SDK is open source and available on GitHub: