SharePoint Online

Connect to SharePoint Online to import files from specified SharePoint libraries into Foundry.

Supported capabilities

| Capability | Status |
| --- | --- |
| Exploration | 🟢 Generally available |
| Bulk import | 🟢 Generally available |
| Incremental | 🟢 Generally available |
| Export tasks | 🟡 Sunset |
| File exports | 🟢 Generally available |

Data model

The connector can transfer files of any type into Foundry datasets. File formats are preserved, and no schemas are applied during or after the transfer. Apply any necessary schema to the output dataset, or write a downstream transformation to access the data.

Performance and limitations

There is no limit to the size of transferable files. However, network issues can result in failures of large-scale transfers. In particular, Foundry syncs that take more than two days to run will be interrupted. To avoid network issues, we recommend using smaller file sizes and limiting the number of files that are ingested in every execution of the sync. Syncs can be scheduled to run frequently.

:::callout{theme="warning"} Connections to on-premise SharePoint servers are not supported. Use a REST API source type to connect to on-premise SharePoint. :::

Setup

  1. Open the Data Connection application and select + New Source in the upper right corner of the screen.
  2. Select SharePoint Online from the available connector types.
  3. Follow the additional configuration prompts to continue the setup of your connector using the information in the sections below.

Learn more about setting up a connector in Foundry.

Authentication

:::callout{theme="warning"} Authentication for the SharePoint Online source requires an application in Microsoft Entra ID (formerly known as Azure Active Directory). If you are not an Entra ID administrator, contact your IT department to request access. :::

Follow the initial steps below to access Azure application credentials:

  1. Create an application registration in Azure by following the instructions in the Microsoft documentation ↗.
  2. At Step 5, select Accounts in this organizational directory only and skip Redirect URI (optional).
  3. Note the client ID and tenant ID once registration is complete.

Then, choose between the two available authentication methods:

  • Client credentials: Recommended when a wide range of access is required for every SharePoint site.
  • Username/password: Recommended for limiting access to one or a few SharePoint sites.

Client credentials

In your Microsoft Entra admin center, complete the following steps:

  1. Go to API Permissions in the left sidebar.
  2. Select Add a Permission.
  3. Select Microsoft Graph.
  4. Select Application Permissions.
  5. If you would like your application to read all SharePoint sites, add Sites.Read.All.
    • If you plan to configure export tasks, use Sites.ReadWrite.All instead.
  6. If you would like your application to read only selected SharePoint sites, add Sites.Selected.
  7. If you are an Entra administrator, select Grant admin consent for [tenant].
  8. If you added Sites.Selected above, add your application to specific sites ↗.
    • The available options for the "roles" array parameter are "write" and/or "read". The "read" option is sufficient to ingest files from the SharePoint site.
    • To easily send a POST with proper authentication, use the Graph Explorer ↗.
    • You can receive metadata about a site by sending a GET to https://graph.microsoft.com/v1.0/sites/[tenantName]:/sites/[siteName] (for example: https://graph.microsoft.com/v1.0/sites/contoso.sharepoint.com:/sites/mySite). This request returns an ID that is a composite of three values: site collection hostname, site collection unique ID, and site unique ID. The middle value is the siteId needed to run the permissions POST.
  9. Generate a client secret ↗.
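As a rough illustration of the Sites.Selected step above, the sketch below splits the composite site ID and assembles the URL and JSON body for the permissions POST. The function names and the example GUIDs are hypothetical. The body shape follows the Microsoft Graph site permissions endpoint as we understand it, and you would still send the request with an authenticated client such as Graph Explorer.

```python
import json
from typing import Dict, Iterable, Tuple


def split_site_id(composite_id: str) -> Tuple[str, str, str]:
    """Split 'hostname,siteCollectionId,siteId' into its three parts."""
    parts = composite_id.split(",")
    if len(parts) != 3:
        raise ValueError(f"Expected three comma-separated values, got: {composite_id!r}")
    return parts[0], parts[1], parts[2]


def build_permission_request(
    composite_id: str,
    client_id: str,
    display_name: str,
    roles: Iterable[str] = ("read",),
) -> Tuple[str, Dict]:
    """Return the (url, body) pair for the permissions POST (hypothetical helper)."""
    role_list = list(roles)
    if not role_list or any(role not in {"read", "write"} for role in role_list):
        raise ValueError('roles must contain "read" and/or "write"')

    # Per the guidance above, the middle value of the composite ID is the siteId.
    _, site_id, _ = split_site_id(composite_id)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/permissions"
    body = {
        "roles": role_list,
        "grantedToIdentities": [
            {"application": {"id": client_id, "displayName": display_name}}
        ],
    }
    return url, body


if __name__ == "__main__":
    # Placeholder values for illustration only.
    example_url, example_body = build_permission_request(
        "contoso.sharepoint.com,11111111-1111-1111-1111-111111111111,"
        "22222222-2222-2222-2222-222222222222",
        client_id="<client_id>",
        display_name="Foundry SharePoint connector",
    )
    print(example_url)
    print(json.dumps(example_body, indent=2))
```

Passing `roles=["read"]` is enough for ingestion; add `"write"` only if the application also needs to export files.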

Set the following source configurations in Data Connection:

| Option | Required? | Description |
| --- | --- | --- |
| Azure Client ID | Yes | The ID of the app registration; also called Application ID. |
| Azure Tenant ID | Yes | The unique identifier of the Microsoft Entra ID instance. |
| Client secret | Yes | The secret generated in the app registration. |

Username/password

The username/password flow involves creating a user account that can sign in to Microsoft 365. The Graph API does not support two-factor authentication for the username/password authentication method. Because of this, we strongly recommend creating a randomly generated password of at least 32 characters in length.

In your Entra admin center, complete the following steps:

  1. Go to API Permissions in the left sidebar.
  2. Select Add a Permission.
  3. Select Microsoft Graph.
  4. Select Delegated Permissions.
  5. Add the Sites.Read.All permission.
  6. If you plan to configure export tasks, use Sites.ReadWrite.All instead.
  7. If you are an Entra administrator, select Grant admin consent for [tenant].
  8. Go to Authentication in the left sidebar.
  9. Change Allow public client flows to Yes.
  10. Create a user in Microsoft Entra ID with a randomly generated password of at least 32 characters.
  11. Add that user to any SharePoint sites that you would like it to read or write.
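One way to produce the randomly generated password recommended above is Python's `secrets` module. This is a minimal sketch: the symbol set and the requirement of one character from each class are assumptions, so adjust them to your tenant's password policy.

```python
import secrets
import string

# Assumed symbol set; your tenant's password policy may allow or require others.
SYMBOLS = "!@#$%^&*-_=+"
ALPHABET = string.ascii_letters + string.digits + SYMBOLS


def generate_password(length: int = 32) -> str:
    """Generate a cryptographically random password of at least 32 characters."""
    if length < 32:
        raise ValueError("Use at least 32 characters for this account")

    while True:
        password = "".join(secrets.choice(ALPHABET) for _ in range(length))
        # Retry until every character class is present at least once.
        if (
            any(c.islower() for c in password)
            and any(c.isupper() for c in password)
            and any(c.isdigit() for c in password)
            and any(c in SYMBOLS for c in password)
        ):
            return password


if __name__ == "__main__":
    print(generate_password())
```

Store the generated value directly in the source configuration rather than in code or notebooks.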

Set the following source configurations in Data Connection:

| Option | Required? | Description |
| --- | --- | --- |
| Azure Client ID | Yes | The ID of the app registration; also called Application ID. |
| Username | Yes | The user's email address. |
| Password | Yes | The generated password. |

XML-based permissioning for SharePoint Add-ins

If you are using SharePoint Add-ins for authorization and authentication ↗, and your SharePoint Add-in uses XML for permission management, you must ensure that the correct scope is set in the scope URI to avoid access issues when connecting to SharePoint.

Follow the steps below to verify and configure the correct scope:

  1. Locate the AppManifest.xml file containing the permission settings for your SharePoint Add-in.
  2. In the AppManifest.xml file, identify the scope URI, which should look similar to this:

<AppPermissionRequests AllowAppOnlyPolicy="true">
  <AppPermissionRequest Scope="http://sharepoint/content/sitecollection/web" Right="FullControl" />
</AppPermissionRequests>

  3. Verify that the scope value (in this example, http://sharepoint/content/sitecollection/web) matches the SharePoint site to which you are connecting; if the scope value does not match, adjust the scope value accordingly.
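To check the scope without reading the manifest by eye, a small script can list every permission request it contains. The sketch below uses only the standard library. The helper name `read_permission_requests` is hypothetical, and it assumes the manifest may or may not declare an XML namespace.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple


def read_permission_requests(manifest_xml: str) -> List[Tuple[str, str]]:
    """Return (Scope, Right) pairs for every AppPermissionRequest element."""
    root = ET.fromstring(manifest_xml)
    found = []
    for element in root.iter():
        # Drop any namespace prefix such as '{http://...}AppPermissionRequest'.
        tag = element.tag.rsplit("}", 1)[-1]
        if tag == "AppPermissionRequest":
            found.append((element.get("Scope", ""), element.get("Right", "")))
    return found


if __name__ == "__main__":
    sample = """
    <AppPermissionRequests AllowAppOnlyPolicy="true">
      <AppPermissionRequest Scope="http://sharepoint/content/sitecollection/web" Right="FullControl" />
    </AppPermissionRequests>
    """
    for scope, right in read_permission_requests(sample):
        print(f"{scope} -> {right}")
```

For a real add-in, read the contents of AppManifest.xml from disk and pass them to the same function.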

Networking

The SharePoint Online connector requires network access to the following domains on port 443:

  • login.microsoftonline.com
  • graph.microsoft.com
  • Your SharePoint URL; for example, contoso.sharepoint.com

If you are using a GovCloud SharePoint instance, use the following domains on port 443 instead:

  • login.microsoftonline.us
  • graph.microsoft.us
  • Your SharePoint URL; for example, contoso.sharepoint.us
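When debugging egress rules, it can help to list the hosts a given SharePoint URL needs and probe them on port 443. The sketch below is a hypothetical helper and not part of the connector. The probe runs from whichever machine executes the script, so it only reflects Foundry's network path if it runs in the same network environment.

```python
import socket
from typing import List
from urllib.parse import urlparse


def required_hosts(sharepoint_url: str, gov_cloud: bool = False) -> List[str]:
    """Return the domains listed above for a commercial or GovCloud instance."""
    host = urlparse(sharepoint_url).hostname
    if host is None:
        raise ValueError("Pass a full URL, for example https://contoso.sharepoint.com")
    if gov_cloud:
        return ["login.microsoftonline.us", "graph.microsoft.us", host]
    return ["login.microsoftonline.com", "graph.microsoft.com", host]


def can_connect(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Attempt a TCP connection; True means the port is reachable from this machine."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name in required_hosts("https://contoso.sharepoint.com/sites/mySite"):
        print(name, "reachable" if can_connect(name) else "blocked or unreachable")
```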

Configuration options

The following configuration options are available for the SharePoint Online connector:

| Option | Required? | Description |
| --- | --- | --- |
| SharePoint Library URL | Yes | A single SharePoint site may have several document libraries; your URL must point to a specific library. Must be in the format https://[tenant].sharepoint.com/sites/[site]/[library]. |
| Credentials settings | Yes | Configure using the Authentication guidance shown above. |
| Proxy settings | No | Enable to use a proxy while connecting to SharePoint Online. |
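A library URL that points at a site rather than a specific library is an easy mistake to make. The sketch below checks a URL against the documented format and extracts its parts. The helper name is hypothetical, and accepting the `.us` suffix is an assumption based on the GovCloud domains listed under Networking.

```python
import re
from typing import Dict
from urllib.parse import unquote

# https://[tenant].sharepoint.com/sites/[site]/[library], optionally with a trailing slash.
_LIBRARY_URL = re.compile(
    r"^https://(?P<tenant>[A-Za-z0-9-]+)\.sharepoint\.(?:com|us)"
    r"/sites/(?P<site>[^/]+)/(?P<library>[^/]+)/?$"
)


def parse_library_url(url: str) -> Dict[str, str]:
    """Return tenant, site, and library, or raise ValueError if the format is wrong."""
    match = _LIBRARY_URL.match(url.strip())
    if match is None:
        raise ValueError(
            "Expected https://[tenant].sharepoint.com/sites/[site]/[library], "
            f"got: {url!r}"
        )
    # Decode percent-encoding such as %20 so the names are human-readable.
    return {name: unquote(value) for name, value in match.groupdict().items()}


if __name__ == "__main__":
    print(parse_library_url("https://contoso.sharepoint.com/sites/mySite/Shared%20Documents"))
```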

Sync data from SharePoint Online

The SharePoint Online connector uses the file-based sync interface.

Export data to SharePoint Online

To export to a SharePoint site, first enable exports for your SharePoint Online connector. Then, create a new export.

Export configuration options

| Option | Required? | Default | Description |
| --- | --- | --- | --- |
| Directory path | Yes | / | The path to the folder in the SharePoint library where files should be exported. The full path for an exported file is calculated as <SharePoint Library URL>/<Directory Path>/<Exported File Path> |
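To preview where an exported file will land, the path rule above can be sketched as a small function. This is an illustration only: how the connector itself normalizes leading or trailing slashes is not documented here, so the slash handling below is an assumption.

```python
def exported_file_url(library_url: str, directory_path: str, exported_file_path: str) -> str:
    """Join <SharePoint Library URL>/<Directory Path>/<Exported File Path>."""
    parts = [library_url.rstrip("/")]
    for segment in (directory_path, exported_file_path):
        cleaned = segment.strip("/")
        # The default directory path "/" contributes no extra folder.
        if cleaned:
            parts.append(cleaned)
    return "/".join(parts)


if __name__ == "__main__":
    print(
        exported_file_url(
            "https://contoso.sharepoint.com/sites/mySite/Documents",
            "/exports/daily",
            "reports/q1.csv",
        )
    )
```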

Use SharePoint sources in code

The example below demonstrates how to upload files to a SharePoint source from an external transform using Office365-REST-Python-Client ↗, a Python client for SharePoint. Note that this example uses client certificate authentication.

Review more examples from SharePoint ↗.

from transforms.api import Input, Output, transform, lightweight
from transforms.external.systems import external_systems, Source
import pandas as pd
import polars as pl
from office365.sharepoint.client_context import ClientContext

@lightweight
@external_systems(
    sharepoint_source=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
    input_df=Input("<dataset_rid>"),  # Dataset containing the files to export to SharePoint
)
def compute(ctx, input_df, output, sharepoint_source):

    # 1. Connect to SharePoint using client certificate authentication.
    #    The private key is read from the secret stored on the source.
    client = ClientContext("<sharepoint_url>").with_client_certificate(
        tenant="<tenant_id>",
        client_id="<client_id>",
        thumbprint="<thumbprint>",
        private_key=sharepoint_source.get_secret("clientSecret"),
    )

    # Load the site (web) once to confirm that the connection and credentials work.
    current_web = client.web
    client.load(current_web)
    client.execute_query()

    target_folder = client.web.lists.get_by_title("<document_library_name>").root_folder

    # 2. Upload files from input_df and record the URL of each uploaded file.
    upload_urls = []
    fs = input_df.filesystem()
    input_files = fs.ls()
    for f in input_files:
        with fs.open(f.path) as fileobj:
            uploaded_file = target_folder.upload_file(f.path, fileobj).execute_query()
            upload_urls.append({'file_name': f.path, 'upload_url': uploaded_file.serverRelativeUrl})

    # 3. Write the dataset of uploaded URLs.
    output.write_table(pl.from_pandas(pd.DataFrame.from_records(upload_urls)))

Ingest SharePoint Lists

The SharePoint Online connector only supports file-based ingestion. To ingest data from SharePoint Lists, use an external transform with the Microsoft Graph API. The following helper class handles OAuth2 authentication and provides methods to retrieve lists and list items with automatic pagination:

import requests
import logging
from typing import Optional, Dict, List
from urllib.parse import urlparse


class SharePointListReader:
    """
    Client for reading SharePoint lists via Microsoft Graph API.

    This class handles OAuth2 authentication and provides methods to:
    - Retrieve all lists from a SharePoint site
    - Fetch items from specific lists with automatic pagination

    Args:
        tenant_id: Azure AD tenant ID
        client_id: Azure AD application (client) ID
        client_secret: Azure AD application client secret
        site_url: Full SharePoint site URL (e.g., https://contoso.sharepoint.com/sites/mysite)
        logger: Optional custom logger instance
    """

    def __init__(
        self,
        tenant_id: str,
        client_id: str,
        client_secret: str,
        site_url: str,
        logger: Optional[logging.Logger] = None
    ):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.site_url = site_url.rstrip("/")
        self.base_url = "https://graph.microsoft.com/v1.0"

        self.access_token: Optional[str] = None
        self.site_id: Optional[str] = None
        self.logger = logger or self._setup_default_logger()

    def _setup_default_logger(self) -> logging.Logger:
        """Configure default logger with console output."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(levelname)s: %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def get_access_token(self) -> bool:
        """
        Acquire OAuth2 access token from Azure AD.

        Returns:
            True if token was successfully acquired, False otherwise
        """
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://graph.microsoft.com/.default",
        }

        try:
            response = requests.post(token_url, data=payload)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]

            expires_in = token_data.get("expires_in", 3600)
            self.logger.info(f"Authentication successful (expires in {expires_in}s)")
            return True

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Authentication failed: {e}")
            return False

    def _make_graph_request(self, url: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """
        Execute authenticated GET request to Microsoft Graph API.

        Args:
            url: Full Graph API endpoint URL
            params: Optional query parameters

        Returns:
            JSON response as dictionary, or None on failure
        """
        if not self.access_token and not self.get_access_token():
            return None

        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }

        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.debug(f"Response details: {e.response.text}")
            return None

    def get_site_id(self) -> Optional[str]:
        """
        Retrieve SharePoint site ID from site URL.

        Returns:
            Site ID string, or None if retrieval fails
        """
        if self.site_id:
            return self.site_id

        parsed = urlparse(self.site_url)
        hostname = parsed.hostname
        site_path = parsed.path.strip("/")

        url = f"{self.base_url}/sites/{hostname}:/{site_path}"
        data = self._make_graph_request(url)

        if data and "id" in data:
            self.site_id = data["id"]
            self.logger.debug(f"Site ID retrieved: {self.site_id}")
            return self.site_id

        self.logger.error("Failed to retrieve site ID")
        return None

    def get_all_lists(self) -> Optional[Dict]:
        """
        Retrieve all lists from the SharePoint site.

        Returns:
            Dictionary containing list metadata, or None on failure
        """
        site_id = self.get_site_id()
        if not site_id:
            return None

        url = f"{self.base_url}/sites/{site_id}/lists"
        data = self._make_graph_request(url)

        if data and "value" in data:
            self.logger.info(f"Found {len(data['value'])} lists in site")
            for lst in data["value"]:
                self.logger.info(f"  - {lst['name']} (ID: {lst['id']})")

        return data

    def get_all_list_items(self, list_id: str) -> Optional[List[Dict]]:
        """
        Retrieve all items from a SharePoint list with automatic pagination.

        Args:
            list_id: GUID of the SharePoint list

        Returns:
            List of item dictionaries, or None on failure
        """
        site_id = self.get_site_id()
        if not site_id:
            return None

        all_items = []
        url = f"{self.base_url}/sites/{site_id}/lists/{list_id}/items"
        params: Optional[Dict] = {"$expand": "fields", "$top": 5000}

        page_count = 0
        while url:
            data = self._make_graph_request(url, params)

            if not data or "value" not in data:
                # Fail explicitly instead of silently returning a partial result.
                self.logger.error(f"Failed to retrieve page {page_count + 1} of list items")
                return None

            page_count += 1
            items_in_page = len(data["value"])
            all_items.extend(data["value"])

            self.logger.debug(f"Page {page_count}: retrieved {items_in_page} items")

            # The nextLink URL already carries the query options, so do not resend them.
            url = data.get("@odata.nextLink")
            params = None

        self.logger.info(f"Retrieved {len(all_items)} total items from list")
        return all_items

The following example demonstrates how to use this class in an external transform to ingest SharePoint List data into a Foundry dataset. It assumes that SharePointListReader is defined in, or imported into, the same module:

from transforms.api import Output, transform, lightweight
from transforms.external.systems import external_systems, Source
import polars as pl


@lightweight
@external_systems(
    sharepoint_source=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
)
def compute(ctx, output, sharepoint_source):

    # 1. Initialize the SharePoint List reader with credentials from the source
    reader = SharePointListReader(
        tenant_id="<tenant_id>",
        client_id="<client_id>",
        client_secret=sharepoint_source.get_secret("clientSecret"),
        site_url="https://contoso.sharepoint.com/sites/mysite"
    )

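    # 2. Retrieve all items from a specific list
    items = reader.get_all_list_items(list_id="<list_guid>")
    if items is None:
        # get_all_list_items returns None when authentication or a request fails
        raise RuntimeError("Failed to retrieve SharePoint list items")

    # 3. Extract the fields from each item and write to the output dataset
    records = [item["fields"] for item in items if "fields" in item]
    output.write_table(pl.from_dicts(records))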
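List items returned by the Graph API may not all carry the same set of fields, and they can include OData annotation keys such as `@odata.etag`. As a minimal sketch, assuming you want every record to expose the same columns before building the table, the hypothetical helper below fills missing fields with `None` and drops the annotation keys.

```python
from typing import Any, Dict, List


def normalize_records(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Extract 'fields' from each item and align all records to the same columns."""
    records = [item["fields"] for item in items if "fields" in item]

    # Collect column names in first-seen order, skipping OData annotations.
    columns: List[str] = []
    for record in records:
        for key in record:
            if not key.startswith("@odata") and key not in columns:
                columns.append(key)

    # Missing values become None so every record has an identical key set.
    return [{column: record.get(column) for column in columns} for record in records]


if __name__ == "__main__":
    sample_items = [
        {"id": "1", "fields": {"@odata.etag": "\"abc\"", "Title": "First", "Owner": "ann"}},
        {"id": "2", "fields": {"Title": "Second"}},
    ]
    print(normalize_records(sample_items))
```

In the transform above, the result of `normalize_records(items)` could be passed to `pl.from_dicts` in place of the raw `records` list.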
