
REST

Foundry can integrate with external systems that expose a REST (representational state transfer) API. You may need to use a different approach depending on whether you are syncing, exporting, or interactively calling REST APIs. On this page you can find several connection options for secure and efficient integration with REST APIs.

REST API source

The REST API source may be used for workflows requiring interactive HTTP requests to external systems directly from Foundry applications via Actions. For example, you can create a Workshop application with a button that uses a webhook to call a REST endpoint when clicked, connecting that application to existing workflows and source systems.

Webhooks to HTTP endpoints should use the REST API source type in Data Connection. You will need to configure the base URL, authentication, and an optional port.

| Option | Required | Description |
| --- | --- | --- |
| Domain | Yes | At least one domain must be specified. |
| Authentication | Yes | For each domain, the authentication must be specified. Options include None, Basic, Bearer Token, and API Key. |
| Port | No | A port may be optionally specified. By default, all REST webhooks will use HTTPS on port 443. Non-default ports are only supported on legacy agent worker sources. |
| Request Options | No | When selecting API Key authentication, you may choose whether you want to pass the API Key as a query param or header in the webhook requests. |
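
To illustrate what the Request Options choice means for the outgoing request, here is a minimal sketch using only the Python standard library. In Foundry this is configured on the source rather than written in code; the function name `apply_api_key`, the header name `X-API-Key`, and the query parameter name `api_key` are hypothetical and should be replaced with whatever the target API documents.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def apply_api_key(url, headers, api_key, placement="header",
                  header_name="X-API-Key", param_name="api_key"):
    """Return (url, headers) with the API key attached as a header or query param.

    header_name and param_name are hypothetical defaults, not Foundry settings.
    """
    headers = dict(headers)  # copy so the caller's dict is not mutated
    if placement == "header":
        headers[header_name] = api_key
        return url, headers
    if placement == "query":
        parts = urlsplit(url)
        query = parse_qsl(parts.query, keep_blank_values=True)
        query.append((param_name, api_key))
        return urlunsplit(parts._replace(query=urlencode(query))), headers
    raise ValueError(f"Unknown placement: {placement!r}")
```

Passing the key as a header keeps it out of URLs, which tend to be written to access logs; the query parameter form is mainly useful for APIs that accept nothing else.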

The example configuration below shows how to configure a connection to https://my-domain.com using bearer token authentication.

[Image: New webhook]

:::callout{theme="warning"} The REST API source type does not support other capabilities such as syncs or exports. The legacy magritte-rest-v2 source type is no longer recommended for Webhooks workflows. Syncs and exports to REST APIs should use external transforms. :::

Learn more about Webhooks in Foundry.

External transforms in Code Repositories

Use external transforms to configure syncs and exports that require you to call REST APIs. Simply import a source in a Python Code Repository and write custom logic to query the API.

:::callout{theme="success"} You can use external transforms to access REST API sources inaccessible over the internet when using a Foundry worker with agent egress policies. :::

Learn more about calling APIs from code repositories.

Examples

The examples below show common patterns of complex external transforms.

OAuth Client Credentials grant

The OAuth Client Credentials grant is a common authentication pattern for non-interactive, service-to-service REST API calls. The client exchanges a client_id and client_secret at the OAuth2 server's token endpoint for a short-lived access token, then includes that token as a bearer token in the Authorization header of every subsequent request to the resource API.

Before writing the transform, configure the REST API source with the OAuth2 token endpoint domain and the resource API domain, and store the client_id and client_secret as additional secrets on the source. Do not hard-code secrets in your transform.

The request to the token endpoint must use the application/x-www-form-urlencoded content type with at least the following parameters:

| Parameter | Value |
| --- | --- |
| grant_type | client_credentials |
| client_id | The application client ID |
| client_secret | The application client secret |
| scope | Space-separated list of required scopes |

Once you have an access token, include it in the Authorization header of every request to the resource API:

Authorization: Bearer <access_token>
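
As a rough illustration of the two wire formats described above, the sketch below encodes the token request parameters and builds the bearer header with the standard library only. The helper names `build_token_request_body` and `bearer_headers` are hypothetical; an HTTP client such as the one provided in external transforms typically performs the form encoding for you when given a dictionary.

```python
from urllib.parse import urlencode


def build_token_request_body(client_id, client_secret, scopes):
    """Encode the client-credentials parameters as application/x-www-form-urlencoded."""
    return urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": " ".join(scopes),  # scopes travel as one space-separated string
        }
    )


def bearer_headers(access_token):
    """Build the Authorization header sent with every resource API request."""
    return {"Authorization": f"Bearer {access_token}"}
```

In the encoded body, the space between scopes becomes `+`, which is the expected form encoding for a space.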

Many APIs return results in pages. A common pattern uses a nextPageToken field in the response to indicate there are more results; your transform should loop until no nextPageToken is returned.
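
The loop can be exercised without a network connection by separating it from the HTTP call. The following is a minimal sketch under a few assumptions: `fetch_page` is a hypothetical callable that takes a page token (None for the first page) and returns the decoded response body, items are assumed to live under a `data` field, and `max_pages` is an added safeguard against an API that never stops returning a token.

```python
def iter_items(fetch_page, max_pages=1000):
    """Yield items from every page until no nextPageToken is returned.

    fetch_page and the "data" field name are assumptions for illustration.
    """
    page_token = None
    for _ in range(max_pages):
        body = fetch_page(page_token)
        yield from body.get("data", [])
        page_token = body.get("nextPageToken")
        if not page_token:
            return
    raise RuntimeError(f"Stopped after {max_pages} pages; nextPageToken never cleared")


# Fake three-page API keyed by page token, used only to demonstrate the loop.
FAKE_PAGES = {
    None: {"data": [1, 2], "nextPageToken": "p2"},
    "p2": {"data": [3], "nextPageToken": "p3"},
    "p3": {"data": [4, 5]},
}

items = list(iter_items(FAKE_PAGES.get))
```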

:::callout{theme="warning"} Access tokens expire. If your transform runs for a long time, you may need to request a new token partway through execution. Check the expires_in field from the token response to determine when the token will expire. :::
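
One way to handle expiry is to cache the token and request a new one shortly before it lapses. The class below is a sketch, not part of any Foundry library: `TokenCache`, `request_token`, and the 60-second safety margin are all assumptions. `request_token` stands for any callable that performs the token request and returns the decoded token response.

```python
import time


class TokenCache:
    """Cache an access token and refresh it shortly before it expires."""

    def __init__(self, request_token, clock=time.monotonic, safety_margin_seconds=60):
        self._request_token = request_token  # callable returning the token response dict
        self._clock = clock                  # injectable so the logic can be tested
        self._safety_margin = safety_margin_seconds
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return a valid access token, requesting a new one when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            response = self._request_token()
            self._token = response["access_token"]
            # expires_in is the lifetime in seconds; if the server omits it,
            # the default of 0 makes every call request a fresh token.
            lifetime = response.get("expires_in", 0)
            self._expires_at = now + lifetime - self._safety_margin
        return self._token
```

Inside a pagination loop, calling `cache.get()` before each request keeps the Authorization header current without requesting a token on every call.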

Basic client_id/client_secret example

The following example requests an access token and calls a paginated resource API. It uses generic placeholders so that it can be adapted to any OAuth2-protected REST API.

import logging

import pandas as pd
from transforms.api import Output, transform_pandas
from transforms.external.systems import external_systems, Source, ResolvedSource

logger = logging.getLogger(__name__)


@external_systems(
    api_source=Source("<source_rid>")
)
@transform_pandas(
    Output("<output_dataset_rid>"),
)
def compute(api_source: ResolvedSource) -> pd.DataFrame:
    base_url = api_source.get_https_connection().url
    client = api_source.get_https_connection().get_client()

    client_id = api_source.get_secret("additionalSecretClientId")
    client_secret = api_source.get_secret("additionalSecretClientSecret")

    token_response = client.post(
        base_url + "/oauth/token",
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": "<space-separated-scopes>",
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    token_response.raise_for_status()
    access_token = token_response.json()["access_token"]

    auth_headers = {"Authorization": f"Bearer {access_token}"}

    results = []
    page_token = None
    while True:
        params = {"pageSize": 100}
        if page_token:
            params["pageToken"] = page_token

        response = client.get(
            base_url + "/api/v1/<resource>",
            headers=auth_headers,
            params=params,
        )
        response.raise_for_status()
        body = response.json()

        results.extend(body.get("data", []))

        page_token = body.get("nextPageToken")
        if not page_token:
            break

        logger.info(f"Fetched {len(results)} items so far, continuing to next page.")

    return pd.DataFrame(results)

To adapt this pattern, replace the token endpoint path, the resource URL, and the scope values with those required by the target API. Some OAuth2 servers require additional parameters, such as audience or resource; add them to the data dictionary of the token request.

If the target API is the Foundry API itself (a Foundry-to-Foundry call from a transform), see Call the Foundry API from code for the Foundry-specific token endpoint, scopes, and setup steps.

JWT client-assertion variant (NetSuite)

Some OAuth2 servers, including NetSuite, require the client to authenticate with a signed JWT client assertion instead of a plain client_secret. The following example updates account names in NetSuite from an input dataset of accounts by sending a PATCH request to the /services/rest/record/v1/account/{account_id} endpoint for each row. To enable this grant type, the client_id, certificate_id, and certificate_private_key are added to the source as additional secrets.

from transforms.api import (
    transform,
    Output,
    Input,
    TransformInput,
    TransformOutput,
    TransformContext,
)
from transforms.external.systems import external_systems, Source, ResolvedSource
import datetime
import jwt
from urllib.parse import urljoin
import logging

logger = logging.getLogger(__name__)

@external_systems(
    netsuite_rest_source=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
    account_updates=Input("<dataset_rid>"),  # Dataset with schema [account_id: String, account_name: String]
)
def update_account_names(
    netsuite_rest_source: ResolvedSource,
    account_updates: TransformInput,
    output: TransformOutput,
    ctx: TransformContext,
):
    # --- Set up connections and secrets ---
    base_url = netsuite_rest_source.get_https_connection().url
    client = netsuite_rest_source.get_https_connection().get_client()
    client_id = netsuite_rest_source.get_secret("additionalSecretClientId")
    certificate_id = netsuite_rest_source.get_secret("additionalSecretCertificateId")
    certificate_private_key = netsuite_rest_source.get_secret("additionalSecretPrivateCertificate")

    # --- Helper: Make JWT token ---
    def make_jwt_token(
        url, client_id, certificate_id, certificate_private_key, lifetime_in_minutes=59
    ):
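        # Use an aware UTC timestamp so the iat/exp claims do not depend on the local time zone.
        current_timestamp = datetime.datetime.now(datetime.timezone.utc)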
        expiration = current_timestamp + datetime.timedelta(minutes=lifetime_in_minutes)

        payload = {
            "iss": client_id,
            "scope": "rest_webservices",
            "aud": url,
            "iat": current_timestamp,
            "exp": expiration,
        }

        additional_headers = {
            "kid": certificate_id,
        }

        return jwt.encode(
            payload,
            certificate_private_key,
            algorithm="ES256",
            headers=additional_headers,
        )

    # --- Helper: Get OAuth2 access token ---
    def get_oauth2_access_token():
        url = urljoin(base_url, "/services/rest/auth/oauth2/v1/token")
        payload = {
            "grant_type": "client_credentials",
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": make_jwt_token(
                url,
                client_id,
                certificate_id,
                certificate_private_key,
            ),
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = client.post(url, data=payload, headers=headers)
        response.raise_for_status()  # fail fast if the token request is rejected
        return response.json()["access_token"]

    # --- Prepare data for update ---
    # Keep the payload as a dict so the HTTP client serializes it to JSON
    # and escapes quotes or other special characters in account names.
    account_update_data = [
        {
            "account_id": row.account_id,
            "payload": {"acctName": row.account_name},
        }
        for row in account_updates.dataframe().collect()
    ]

    # --- Update accounts ---
    token = get_oauth2_access_token()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    responses = []
    for account in account_update_data:
        account_id = account["account_id"]
        payload = account["payload"]
        logger.info(f"Updating account: {account_id} with payload {payload}")
        url = urljoin(base_url, f"/services/rest/record/v1/account/{account_id}")
        response = client.patch(url, json=payload, headers=headers)
        responses.append(
            {
                "account_id": account_id,
                "response_status": response.status_code,
                "response": response.text,
            }
        )

    output.write_dataframe(ctx.spark_session.createDataFrame(responses))
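
The time-based claims in the assertion are easy to get wrong, so it can help to build and inspect them separately from signing. The sketch below is an illustration with a hypothetical helper name, `build_assertion_claims`; it produces the same claim names as the example above but expresses iat and exp as integer epoch seconds derived from an explicit UTC timestamp. Signing the claims is left to a JWT library.

```python
import datetime


def build_assertion_claims(client_id, audience, lifetime_in_minutes=59, now=None):
    """Build JWT claims with issued-at and expiry as integer UTC epoch seconds."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    issued_at = int(now.timestamp())
    return {
        "iss": client_id,
        "scope": "rest_webservices",
        "aud": audience,  # the token endpoint URL, as in the example above
        "iat": issued_at,
        "exp": issued_at + lifetime_in_minutes * 60,
    }


claims = build_assertion_claims(
    "<client_id>",
    "<token_endpoint_url>",
    now=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
)
```

Accepting `now` as a parameter makes the function deterministic in tests and keeps the expiry exactly `lifetime_in_minutes` after the issue time.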

Use self-signed server certificates

On-premise systems sometimes use self-signed server certificates that must be added to the source for the connection to be trusted. These certificates are typically added automatically to the built-in HTTPS client provided in external transforms. However, some Python clients might rely on the REQUESTS_CA_BUNDLE environment variable. In these cases, you will need to override the variable.

The example below demonstrates how to override REQUESTS_CA_BUNDLE to read data from an on-premise SharePoint source using Office365-REST-Python-Client, the Python client for SharePoint. Overriding the variable is required for this client to trust the source's self-signed certificate.

from transforms.api import Output, transform, lightweight
from transforms.external.systems import external_systems, Source
import pandas as pd
import polars as pl
import tempfile
import os
from office365.sharepoint.client_context import ClientContext

@lightweight
@external_systems(
    sharepoint_rest=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
)
def compute(ctx, output, sharepoint_rest):

    # 1. Add custom certificates to default certificates environment variable
    cert_file = tempfile.NamedTemporaryFile(delete=False)
    with open(cert_file.name, 'w') as tmp_f:
        with open(os.environ.get("REQUESTS_CA_BUNDLE"), 'r') as ca_f:
            with open(sharepoint_rest.server_certificates_bundle_path, 'r') as source_ca_f:
                tmp_f.write(ca_f.read())
                tmp_f.write(source_ca_f.read())
    cert_file.close()

    os.environ["REQUESTS_CA_BUNDLE"] = cert_file.name # the REQUESTS_CA_BUNDLE now contains the source self-signed certificate


    # 2. Connect to SharePoint using client certificate authentication.
    client = ClientContext("<sharepoint_url>").with_client_certificate(
        tenant="<tenant_id>",
        client_id="<client_id>",
        thumbprint="<thumbprint>",
        private_key=sharepoint_rest.get_secret("additionalSecretPrivateKey"),
        passphrase=sharepoint_rest.get_secret("additionalSecretPrivateKeyPassphrase"), # optional, if the private key is password encrypted
    )

    # 3. Fetch the web title and write it to the output dataset
    current_web = client.web
    client.load(current_web)
    client.execute_query()

    data = [{"web_title": current_web.properties['Title']}]
    output.write_table(pl.from_pandas(pd.DataFrame.from_records(data)))
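
The certificate-merging step can be factored into a small reusable helper. The following is a sketch using only the standard library; the function names `merge_ca_bundles` and `trust_source_certificates` are hypothetical, and the helper additionally tolerates an unset REQUESTS_CA_BUNDLE, which the example above does not.

```python
import os
import tempfile


def merge_ca_bundles(bundle_paths):
    """Concatenate PEM bundles into a new temporary file and return its path."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".pem", delete=False) as merged:
        for path in bundle_paths:
            if not path:
                continue  # for example, REQUESTS_CA_BUNDLE was not set
            with open(path, "r") as bundle:
                content = bundle.read()
            merged.write(content)
            if content and not content.endswith("\n"):
                merged.write("\n")  # keep PEM blocks on separate lines
    return merged.name


def trust_source_certificates(source_bundle_path):
    """Point REQUESTS_CA_BUNDLE at the default bundle plus the source's certificates."""
    merged_path = merge_ca_bundles(
        [os.environ.get("REQUESTS_CA_BUNDLE"), source_bundle_path]
    )
    os.environ["REQUESTS_CA_BUNDLE"] = merged_path
    return merged_path
```

In a transform, `trust_source_certificates(sharepoint_rest.server_certificates_bundle_path)` would be called before constructing any client that reads the environment variable.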

Foundry REST API

For cases where you want to build applications on top of the Foundry platform, use the Foundry REST API. The Foundry API uses the OAuth 2.0 protocol for authentication, primarily uses JSON requests and responses, and provides support for Ontology and Modeling resources.

Learn more about the Foundry API.

