SharePoint Online¶
Connect to SharePoint Online to import files from specified SharePoint libraries into Foundry.
Supported capabilities¶
| Capability | Status |
|---|---|
| Exploration | 🟢 Generally available |
| Bulk import | 🟢 Generally available |
| Incremental | 🟢 Generally available |
| Export tasks | 🟡 Sunset |
| File exports | 🟢 Generally available |
Data model¶
The connector can transfer files of any type into Foundry datasets. File formats are preserved, and no schemas are applied during or after the transfer. Apply any necessary schema to the output dataset, or write a downstream transformation to access the data.
Performance and limitations¶
There is no limit to the size of transferable files. However, network issues can result in failures of large-scale transfers. In particular, Foundry syncs that take more than two days to run will be interrupted. To avoid network issues, we recommend using smaller file sizes and limiting the number of files that are ingested in every execution of the sync. Syncs can be scheduled to run frequently.
:::callout{theme="warning"} Connections to on-premise SharePoint servers are not supported. Use a REST API source type to connect to on-premise SharePoint. :::
Setup¶
- Open the Data Connection application and select + New Source in the upper right corner of the screen.
- Select SharePoint Online from the available connector types.
- Follow the additional configuration prompts to continue the setup of your connector using the information in the sections below.
Learn more about setting up a connector in Foundry.
Authentication¶
:::callout{theme="warning"} Authentication for the SharePoint Online source requires an application in Microsoft Entra ID (formerly known as Azure Active Directory). If you are not an Entra ID administrator, contact your IT department to request access. :::
Follow the initial steps below to access Azure application credentials:
- Create an application registration in Azure by following the instructions in the Microsoft documentation ↗.
- At Step 5, select Accounts in this organizational directory only and skip Redirect URL (optional).
- Note the client ID and tenant ID once registration is complete.
Then, choose between the two available authentication methods:
- Client credentials: Recommended when a wide range of access is required for every SharePoint site.
- Username/password: Recommended for limiting access to one or a few SharePoint sites.
Client credentials¶
In your Microsoft Entra admin center, complete the following steps:
- Go to API Permissions in the left sidebar.
- Select Add a Permission.
- Select Microsoft Graph.
- Select Application Permissions.
- If you would like your application to read all SharePoint sites, add `Sites.Read.All`.
  - If you plan to configure export tasks, use `Sites.ReadWrite.All` instead.
- If you would like your application to read selected SharePoint sites, add `Sites.Selected`.
- If you are an Entra Administrator, select Grant admin consent for [tenant].
- If you added `Sites.Selected` above, add your application to specific sites ↗.
  - The available options for the `"roles"` array parameter are `"write"` and/or `"read"`. The `"read"` option is sufficient to ingest files from the SharePoint site.
  - To easily send a POST with proper authentication, use the Graph Explorer ↗.
  - You can receive metadata about a site by sending a GET to `https://graph.microsoft.com/v1.0/sites/[tenantName]:/sites/[siteName]` (for example: `https://graph.microsoft.com/v1.0/sites/contoso.sharepoint.com:/sites/mySite`). This request will return an ID that is a composite of several values: Site collection hostname, Site collection unique ID, and Site unique ID, where the middle value is the siteId needed to run the permissions POST.
- Generate a client secret ↗.
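The site lookup and permission grant for `Sites.Selected` can be sketched as follows. This is a minimal illustration, not the connector's own logic: it assumes the composite ID returned by the GET is comma-separated, and that the permissions POST takes a body with `roles` and `grantedToIdentities` as described in the linked Microsoft documentation. The helper names `split_site_id` and `build_permission_request` are hypothetical, and the function only builds the request so that you can send it with Graph Explorer or another authenticated client.

```python
from typing import Dict, List, Tuple

GRAPH_BASE_URL = "https://graph.microsoft.com/v1.0"


def split_site_id(composite_id: str) -> Tuple[str, str, str]:
    """Split a composite ID into (hostname, site collection ID, site ID)."""
    parts = composite_id.split(",")
    if len(parts) != 3:
        raise ValueError(f"Expected three comma-separated values, got {len(parts)}")
    return parts[0], parts[1], parts[2]


def build_permission_request(
    composite_id: str, client_id: str, display_name: str, roles: List[str]
) -> Tuple[str, Dict]:
    """Build the URL and JSON body for the site permissions POST (nothing is sent)."""
    allowed = {"read", "write"}
    if not roles or not set(roles) <= allowed:
        raise ValueError(f"roles must be a non-empty subset of {sorted(allowed)}")
    # Per the guidance above, the middle value of the composite ID is the siteId.
    _, site_id, _ = split_site_id(composite_id)
    url = f"{GRAPH_BASE_URL}/sites/{site_id}/permissions"
    body = {
        "roles": roles,
        "grantedToIdentities": [
            {"application": {"id": client_id, "displayName": display_name}}
        ],
    }
    return url, body
```

Calling `build_permission_request(composite_id, "<client_id>", "<app_name>", ["read"])` returns the URL and body for a read-only grant, which is sufficient for ingesting files.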
Set the following source configurations in Data Connection:
| Option | Required? | Description |
|---|---|---|
| Azure Client ID | Yes | The ID of the app registration; also called Application ID. |
| Azure Tenant ID | Yes | The unique identifier of the Microsoft Entra ID instance. |
| Client secret | Yes | The secret generated in the app registration. |
Username/password¶
The username/password flow involves creating a user account that can sign in to Microsoft 365. The Graph API does not support two-factor authentication for the username/password authentication method. Because of this, we strongly recommend creating a randomly generated password of at least 32 characters in length.
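One way to produce such a password is with Python's standard `secrets` module. The sketch below is illustrative only: the function name `generate_password` is hypothetical, and the choice of character classes is an assumption that you may need to adjust to your tenant's password policy.

```python
import secrets
import string


def generate_password(length: int = 32) -> str:
    """Return a random password containing lower, upper, digit, and symbol characters."""
    if length < 32:
        raise ValueError("Use a password of at least 32 characters")
    alphabet = string.ascii_letters + string.digits + string.punctuation
    while True:
        candidate = "".join(secrets.choice(alphabet) for _ in range(length))
        # Retry until every character class is represented at least once.
        if (
            any(c.islower() for c in candidate)
            and any(c.isupper() for c in candidate)
            and any(c.isdigit() for c in candidate)
            and any(c in string.punctuation for c in candidate)
        ):
            return candidate
```

`secrets` draws from the operating system's cryptographically secure random source, which makes it a better fit for credentials than the `random` module.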
In your Entra admin center, complete the following steps:
- Go to API Permissions in the left sidebar.
- Select Add a Permission.
- Select Microsoft Graph.
- Select Delegated Permissions.
- Add the `Sites.Read.All` permission.
- If you plan to configure export tasks, use `Sites.ReadWrite.All` instead.
- If you are an Entra Administrator, select Grant admin consent for [tenant].
- Go to Authentication in the left sidebar.
- Change Allow public client flows to `Yes`.
- Create a user in Microsoft Entra ID with a randomly generated password of at least 32 characters.
- Add that user to any SharePoint sites that you would like it to read or write.
Set the following source configurations in Data Connection:
| Option | Required? | Description |
|---|---|---|
| Azure Client ID | Yes | The ID of the app registration; also called Application ID. |
| Username | Yes | The user's email address. |
| Password | Yes | The generated password. |
XML-based permissioning for SharePoint Add-ins¶
If you are using SharePoint Add-ins for authorization and authentication ↗, and your SharePoint Add-in uses XML for permission management, you must ensure that the correct scope is set in the scope URI to avoid access issues when connecting to SharePoint.
Follow the steps below to verify and configure the correct scope:
- Locate the `AppManifest.xml` file containing the permission settings for your SharePoint Add-in.
- In the `AppManifest.xml` file, identify the scope URI, which should look similar to this:

  ```xml
  <AppPermissionRequests AllowAppOnlyPolicy="true">
    <AppPermissionRequest Scope="http://sharepoint/content/sitecollection/web" Right="FullControl" />
  </AppPermissionRequests>
  ```

- Verify that the scope value (in this example, `http://sharepoint/content/sitecollection/web`) matches the SharePoint site to which you are connecting; if the scope value does not match, adjust the scope value accordingly.
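To inspect the scope values without reading the XML by hand, a short script along these lines may help. It is a sketch under assumptions: the helper name `read_permission_scopes` is hypothetical, and the code ignores any XML namespace on the elements so that it works both for the fragment shown above and for a manifest that declares one.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple


def read_permission_scopes(manifest_xml: str) -> List[Tuple[str, str]]:
    """Return (Scope, Right) pairs for every AppPermissionRequest element."""
    root = ET.fromstring(manifest_xml)
    scopes = []
    for element in root.iter():
        # Drop a leading "{namespace}" prefix, if present, before comparing names.
        local_name = element.tag.rsplit("}", 1)[-1]
        if local_name == "AppPermissionRequest":
            scopes.append((element.get("Scope", ""), element.get("Right", "")))
    return scopes
```

Pass the contents of `AppManifest.xml` as a string and compare each returned scope against the site you intend to connect to.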
Networking¶
The SharePoint Online connector requires network access to the following domains on port 443:
- `login.microsoftonline.com`
- `graph.microsoft.com`
- Your SharePoint URL; for example, `contoso.sharepoint.com`
If you are using a GovCloud SharePoint instance, use the following domains on port 443 instead:
- `login.microsoftonline.us`
- `graph.microsoft.us`
- Your SharePoint URL; for example, `contoso.sharepoint.us`
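A quick reachability check for these domains can be sketched as below. The helper names `required_hosts`, `can_connect`, and `report` are hypothetical, and the result is only meaningful if the script runs from a machine that shares the connector's network path (an assumption to verify for your deployment). A successful TCP connection confirms that port 443 is reachable; it does not validate credentials or TLS inspection rules.

```python
import socket
from typing import Dict, List


def required_hosts(sharepoint_host: str, govcloud: bool = False) -> List[str]:
    """List the domains the connector needs, for commercial or GovCloud tenants."""
    if govcloud:
        return ["login.microsoftonline.us", "graph.microsoft.us", sharepoint_host]
    return ["login.microsoftonline.com", "graph.microsoft.com", sharepoint_host]


def can_connect(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report(sharepoint_host: str, govcloud: bool = False) -> Dict[str, bool]:
    """Check every required host and return a host -> reachable mapping."""
    return {host: can_connect(host) for host in required_hosts(sharepoint_host, govcloud)}
```

For example, `report("contoso.sharepoint.com")` returns a dictionary showing which of the three domains accepted a connection.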
Configuration options¶
The following configuration options are available for the SharePoint Online connector:
| Option | Required? | Description |
|---|---|---|
| SharePoint Library URL | Yes | A single SharePoint site may have several document libraries; your URL must point to a specific library. Must be in the format `https://[tenant].sharepoint.com/sites/[site]/[library]`. |
| Credentials settings | Yes | Configure using the Authentication guidance shown above. |
| Proxy settings | No | Enable to use a proxy while connecting to SharePoint Online. |
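Before saving the source, you can sanity-check a library URL against the documented format with a small helper like the one below. This is a sketch, not the connector's validation: the function name `parse_library_url` is hypothetical, and rejecting anything other than exactly `/sites/[site]/[library]` is an assumption based only on the format stated in the table.

```python
from typing import Dict
from urllib.parse import unquote, urlparse


def parse_library_url(url: str) -> Dict[str, str]:
    """Split a library URL into host, site, and library, or raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme != "https" or not parsed.hostname:
        raise ValueError("Library URL must start with https://")
    # Decode percent-escapes such as %20 and ignore empty path segments.
    segments = [unquote(segment) for segment in parsed.path.split("/") if segment]
    if len(segments) != 3 or segments[0] != "sites":
        raise ValueError("Expected a path of the form /sites/[site]/[library]")
    return {"host": parsed.hostname, "site": segments[1], "library": segments[2]}
```

A URL that stops at the site level, such as `https://contoso.sharepoint.com/sites/mySite`, raises `ValueError` because no library is specified.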
Sync data from SharePoint Online¶
The SharePoint Online connector uses the file-based sync interface.
Export data to SharePoint Online¶
To export to a SharePoint site, first enable exports for your SharePoint Online connector. Then, create a new export.
Export configuration options¶
| Option | Required? | Default | Description |
|---|---|---|---|
| Directory path | Yes | `/` | The path to the folder in the SharePoint library where files should be exported. The full path for an exported file is calculated as `<SharePoint Library URL>/<Directory Path>/<Exported File Path>`. |
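The path calculation can be illustrated with a short function. This is a sketch under assumptions: the name `exported_file_url` is hypothetical, and the way leading and trailing slashes are collapsed here is an illustrative choice, not a statement of how the connector normalizes paths.

```python
def exported_file_url(library_url: str, directory_path: str, exported_file_path: str) -> str:
    """Join the library URL, directory path, and file path with single slashes."""
    parts = [library_url.rstrip("/")]
    for piece in (directory_path, exported_file_path):
        cleaned = piece.strip("/")
        # The default directory path "/" contributes no extra segment.
        if cleaned:
            parts.append(cleaned)
    return "/".join(parts)
```

With the default directory path `/`, an exported file lands directly under the library root; with a directory path such as `/exports/daily`, it lands inside that folder.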
Use SharePoint sources in code¶
The example below demonstrates how to upload a file to a SharePoint source from an external transform using Office365-REST-Python-Client, the Python client for SharePoint ↗. Note that this example uses client certificate authentication.
Review more examples from SharePoint ↗.
```python
from pyspark.sql import DataFrame
from transforms.api import Input, Output, transform, lightweight
from transforms.external.systems import external_systems, Source
import pandas as pd
import polars as pl
from office365.sharepoint.client_context import ClientContext


@lightweight
@external_systems(
    sharepoint_source=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
    input_df=Input("<dataset_rid>"),  # Dataset containing a list of files to export to SharePoint
)
def compute(ctx, input_df: DataFrame, output, sharepoint_source) -> DataFrame:
    # 1. Connect to SharePoint using client certificate authentication.
    client = ClientContext("<sharepoint_url>").with_client_certificate(
        tenant="<tenant_id>",
        client_id="<client_id>",
        thumbprint="<thumbprint>",
        private_key=sharepoint_source.get_secret("clientSecret"),
    )
    current_web = client.web
    client.load(current_web)
    client.execute_query()
    target_folder = client.web.lists.get_by_title("<document_library_name>").root_folder

    # 2. Upload each file from input_df and record its server-relative URL.
    upload_urls = []
    fs = input_df.filesystem()
    input_files = fs.ls()
    for f in input_files:
        with fs.open(f.path) as fileobj:
            uploaded_file = target_folder.upload_file(f.path, fileobj).execute_query()
            upload_urls.append({'file_name': f.path, 'upload_url': uploaded_file.serverRelativeUrl})

    # 3. Write the uploaded URLs to the output dataset.
    output.write_table(pl.from_pandas(pd.DataFrame.from_records(upload_urls)))
```
Ingest SharePoint Lists¶
The SharePoint Online connector only supports file-based ingestion. To ingest data from SharePoint Lists, use an external transform with the Microsoft Graph API. The following helper class handles OAuth2 authentication and provides methods to retrieve lists and list items with automatic pagination:
```python
import requests
import logging
from typing import Optional, Dict, List
from urllib.parse import urlparse


class SharePointListReader:
    """
    Client for reading SharePoint lists via Microsoft Graph API.

    This class handles OAuth2 authentication and provides methods to:
    - Retrieve all lists from a SharePoint site
    - Fetch items from specific lists with automatic pagination

    Args:
        tenant_id: Azure AD tenant ID
        client_id: Azure AD application (client) ID
        client_secret: Azure AD application client secret
        site_url: Full SharePoint site URL (e.g., https://contoso.sharepoint.com/sites/mysite)
        logger: Optional custom logger instance
    """

    def __init__(
        self,
        tenant_id: str,
        client_id: str,
        client_secret: str,
        site_url: str,
        logger: Optional[logging.Logger] = None
    ):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.site_url = site_url.rstrip("/")
        self.base_url = "https://graph.microsoft.com/v1.0"
        self.access_token: Optional[str] = None
        self.site_id: Optional[str] = None
        self.logger = logger or self._setup_default_logger()

    def _setup_default_logger(self) -> logging.Logger:
        """Configure default logger with console output."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(levelname)s: %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def get_access_token(self) -> bool:
        """
        Acquire OAuth2 access token from Azure AD.

        Returns:
            True if token was successfully acquired, False otherwise
        """
        token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://graph.microsoft.com/.default",
        }
        try:
            response = requests.post(token_url, data=payload)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data["access_token"]
            expires_in = token_data.get("expires_in", 3600)
            self.logger.info(f"Authentication successful (expires in {expires_in}s)")
            return True
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Authentication failed: {e}")
            return False

    def _make_graph_request(self, url: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """
        Execute authenticated GET request to Microsoft Graph API.

        Args:
            url: Full Graph API endpoint URL
            params: Optional query parameters

        Returns:
            JSON response as dictionary, or None on failure
        """
        # Acquire a token lazily on the first request.
        if not self.access_token and not self.get_access_token():
            return None
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }
        try:
            response = requests.get(url, headers=headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request failed: {e}")
            if hasattr(e, 'response') and e.response is not None:
                self.logger.debug(f"Response details: {e.response.text}")
            return None

    def get_site_id(self) -> Optional[str]:
        """
        Retrieve SharePoint site ID from site URL.

        Returns:
            Site ID string, or None if retrieval fails
        """
        # Reuse the cached value after the first successful lookup.
        if self.site_id:
            return self.site_id
        parsed = urlparse(self.site_url)
        hostname = parsed.hostname
        site_path = parsed.path.strip("/")
        url = f"{self.base_url}/sites/{hostname}:/{site_path}"
        data = self._make_graph_request(url)
        if data and "id" in data:
            self.site_id = data["id"]
            self.logger.debug(f"Site ID retrieved: {self.site_id}")
            return self.site_id
        self.logger.error("Failed to retrieve site ID")
        return None

    def get_all_lists(self) -> Optional[Dict]:
        """
        Retrieve all lists from the SharePoint site.

        Returns:
            Dictionary containing list metadata, or None on failure
        """
        site_id = self.get_site_id()
        if not site_id:
            return None
        url = f"{self.base_url}/sites/{site_id}/lists"
        data = self._make_graph_request(url)
        if data and "value" in data:
            self.logger.info(f"Found {len(data['value'])} lists in site")
            for lst in data["value"]:
                self.logger.info(f"  - {lst['name']} (ID: {lst['id']})")
        return data

    def get_all_list_items(self, list_id: str) -> Optional[List[Dict]]:
        """
        Retrieve all items from a SharePoint list with automatic pagination.

        Args:
            list_id: GUID of the SharePoint list

        Returns:
            List of item dictionaries, or None on failure
        """
        site_id = self.get_site_id()
        if not site_id:
            return None
        all_items = []
        url = f"{self.base_url}/sites/{site_id}/lists/{list_id}/items"
        # Query parameters are sent with the first request only; each
        # @odata.nextLink URL already carries them.
        params = {"$expand": "fields", "$top": 5000}
        page_count = 0
        while url:
            data = self._make_graph_request(url, params)
            if not data or "value" not in data:
                # Return None rather than a silently truncated result.
                self.logger.error("Failed to retrieve a page of list items")
                return None
            page_count += 1
            items_in_page = len(data["value"])
            all_items.extend(data["value"])
            self.logger.debug(f"Page {page_count}: retrieved {items_in_page} items")
            url = data.get("@odata.nextLink")
            params = None
        self.logger.info(f"Retrieved {len(all_items)} total items from list")
        return all_items
```
The following example demonstrates how to use this class in an external transform to ingest SharePoint List data into a Foundry dataset:
```python
from transforms.api import Output, transform, lightweight
from transforms.external.systems import external_systems, Source
import polars as pl


@lightweight
@external_systems(
    sharepoint_source=Source("<source_rid>")
)
@transform(
    output=Output("<dataset_rid>"),
)
def compute(ctx, output, sharepoint_source):
    # 1. Initialize the SharePoint List reader with credentials from the source
    reader = SharePointListReader(
        tenant_id="<tenant_id>",
        client_id="<client_id>",
        client_secret=sharepoint_source.get_secret("clientSecret"),
        site_url="https://contoso.sharepoint.com/sites/mysite"
    )

    # 2. Retrieve all items from a specific list; the reader returns None on failure
    items = reader.get_all_list_items(list_id="<list_guid>")
    if items is None:
        raise RuntimeError("Failed to retrieve SharePoint list items")

    # 3. Extract the fields from each item and write to the output dataset
    records = [item["fields"] for item in items if "fields" in item]
    output.write_table(pl.from_dicts(records))
```
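If the items in your list do not all carry the same set of fields (an assumption worth checking against your own data), you may prefer to make the column set explicit before building the table instead of relying on schema inference. The sketch below uses a hypothetical helper, `normalize_records`, that gives every record the same keys in first-seen order and fills gaps with `None`.

```python
from typing import Any, Dict, List


def normalize_records(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Give every record the same keys, in first-seen order, filling gaps with None."""
    columns: List[str] = []
    seen = set()
    for record in records:
        for key in record:
            if key not in seen:
                seen.add(key)
                columns.append(key)
    return [{column: record.get(column) for column in columns} for record in records]
```

In the transform above, this would be applied to `records` before the call to `pl.from_dicts`.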