Directory¶
:::callout{theme="warning"}
The Directory connector is a sunset connector documented here for historical reference. It only works with the legacy agent worker — there is no Foundry worker equivalent.
We recommend always using alternative file-sharing connectors when available, like SFTP, SMB, or FTP. If the files can only be accessed via the host itself, we recommend using external transforms with a REST API source instead of a Directory source.
:::
The Directory connector allows you to ingest files located directly on the host where a Data Connection agent is running. This connector is useful for scenarios where files are generated or stored locally on the agent machine and need to be synced into Foundry.
Supported capabilities¶
| Capability | Status |
|---|---|
| Exploration | 🟡 Sunset |
| Batch syncs | 🟡 Sunset |
| Incremental | 🟡 Sunset |
Data model¶
The connector can transfer files of any type into Foundry datasets. File formats are preserved, and no schemas are applied during or after the transfer. Apply any necessary schema to the output dataset, or write a downstream transformation to access the data.
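Because files arrive as raw bytes with no schema, a downstream transformation has to parse them itself. The following is a minimal sketch of that parsing step using only the Python standard library. It assumes the ingested files are UTF-8 CSV with a header row; the `parse_raw_csv` helper and the sample bytes are hypothetical and stand in for a file read from the raw dataset.

```python
import csv
import io
from typing import Dict, List


def parse_raw_csv(raw_bytes: bytes, encoding: str = "utf-8") -> List[Dict[str, str]]:
    """Parse the raw bytes of one ingested CSV file into a list of row dicts.

    Hypothetical helper: assumes a header row and the given text encoding.
    """
    text = raw_bytes.decode(encoding)
    return list(csv.DictReader(io.StringIO(text)))


# Stand-in for the bytes of a single file read from the raw dataset.
sample = b"id,name\n1,alpha\n2,beta\n"
rows = parse_raw_csv(sample)
```

Every value comes back as a string, so any typing (integers, dates) still has to be applied downstream.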
Setup¶
- Open the Data Connection application and select + New Source in the upper right corner of the screen.
- Select Directory from the available connector types.
- The source will be configured to run on an agent worker.
- Follow the additional configuration prompts to continue the setup of your connector.
Learn more about setting up a connector in Foundry.
Configuration options¶
| Option | Required? | Description |
|---|---|---|
| Root directory | Yes | The directory on the agent host that will be used as the starting directory for all requests via this connection. |
Sync data from Directory¶
The Directory connector uses the file-based sync interface.
Ingest files from agent hosts using external transforms¶
For more flexibility and control, you can ingest files from an agent host using external transforms. This approach allows you to run the sync logic on a Foundry worker while still accessing files on a remote agent host.
Prerequisites¶
- Create a REST API source: Navigate to the Data Connection application and create a new REST API source.
- Configure the connection details:
  - Set the domain to your agent host address (or to a placeholder domain name if the address is a private IP; see the note below).
  - Set the port to 22 (SSH).
  - Add the SSH username and password as secrets for a user that can SSH to the host.
- Add an agent proxy egress policy: Create an agent proxy egress policy for your agent host address (or for the placeholder domain name if the address is a private IP; see the note below), backed by the agent itself. This allows the Foundry worker to route traffic through the agent to reach the agent host.
:::callout{theme="note"}
If your agent host address is a private IP address (for example, 10.x.x.x, 172.16.x.x, or 192.168.x.x), you must configure a host override on your agent to map a placeholder domain name to that private IP. Use this placeholder domain instead of the private IP address when configuring the domain on your REST source and egress policy.
:::
- Import the source into your code repository: Follow the external transforms setup guide to import the source into your Python transforms repository.
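To decide whether the host override described in the note applies, you can check whether the agent host address is a private IP literal. The sketch below uses Python's standard `ipaddress` module; the `needs_host_override` function name is hypothetical, and the check is only an approximation of the rule in the note, because `is_private` also covers loopback and link-local ranges.

```python
import ipaddress


def needs_host_override(host: str) -> bool:
    """Return True if `host` is a private IP literal.

    Hypothetical helper. DNS names are not resolved; they return False.
    """
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal (for example, a DNS name).
        return False
    # is_private covers 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16,
    # plus other reserved ranges such as loopback and link-local.
    return address.is_private
```

For example, `needs_host_override("10.1.2.3")` is true, while a public address or a hostname such as `agent.example.com` yields false.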
Example: Read files from an agent host via SSH¶
The following example demonstrates how to connect to an agent host via SSH and read files into a Foundry dataset using the Paramiko Python library.

    import posixpath

    from transforms.api import transform, Output, Input, LightweightOutput, LightweightInput, lightweight
    from transforms.external.systems import external_systems, Source, ResolvedSource
    import paramiko


    @lightweight
    @external_systems(
        agent_source=Source("<source_rid>")  # Replace with your REST API source RID
    )
    @transform(
        output_dataset=Output("<output_dataset_rid>"),  # Replace with your output dataset RID
        files_to_read=Input("<input_dataset_rid>"),  # Dataset containing file paths to read
    )
    def compute(
        agent_source: ResolvedSource,
        output_dataset: LightweightOutput,
        files_to_read: LightweightInput,
    ):
        """
        Read files from a remote agent host via SSH and write them to a Foundry dataset.
        Input dataset should contain a column 'remote_file_path' with absolute paths to read.
        """
        # 1. SSH connection setup
        hostname = "<agent_hostname>"  # Replace with your agent hostname
        username = "<ssh_username>"  # Replace with your SSH username
        password = agent_source.get_secret("<password_secret_name>")  # Replace with your secret name

        # 2. Read file paths from input dataset
        remote_file_paths = files_to_read.pandas()["remote_file_path"].tolist()

        # 3. Establish SSH connection
        # Note: AutoAddPolicy accepts unknown host keys without verification.
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname, username=username, password=password)

            # 4. Open SFTP connection
            sftp = client.open_sftp()
            try:
                # 5. Read each file and write to output dataset
                for remote_path in remote_file_paths:
                    with sftp.open(remote_path, "rb") as remote_file:
                        file_binary_data = remote_file.read()

                    # Extract filename from path and write to output
                    filename = posixpath.basename(remote_path)
                    with output_dataset.filesystem().open(filename, "wb") as f:
                        f.write(file_binary_data)
            finally:
                # 6. Close connections, even if a read or write fails
                sftp.close()
        finally:
            client.close()

:::callout{theme="neutral"}
Ensure that the paramiko library is installed in your Python transforms repository. You can add it via the Libraries tab in the left side panel of your code repository.
:::
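The example above writes each file under its base name, so two remote paths that share a filename (for example, `/data/a/report.csv` and `/data/b/report.csv`) would overwrite each other in the output dataset. One possible way to avoid this, sketched here as an assumption rather than as part of the documented approach, is to derive the output name from the full remote path. The `flatten_remote_path` helper and the `__` separator are hypothetical choices.

```python
import posixpath


def flatten_remote_path(remote_path: str) -> str:
    """Turn an absolute POSIX path into a single flat filename.

    Hypothetical helper: directory components are joined with "__" so that
    files with the same base name in different directories stay distinct.
    """
    normalized = posixpath.normpath(remote_path)
    parts = [part for part in normalized.split("/") if part]
    return "__".join(parts)
```

Because the name depends only on the path, the same remote file maps to the same output name on every run. Paths whose components already contain `__` could still collide, so choose a separator that does not occur in your directory names.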
Example: Delete files from an agent host via SSH¶
If you need guaranteed deletion of files from a Directory source after ingestion, use an external transform instead of relying on completion strategies, which provide only best-effort deletion.
The example below demonstrates how to delete files from an agent host after they have been processed. To use this approach:
- Create an upstream sync that ingests files from the directory source and outputs the list of successfully ingested file paths to a dataset.
- Schedule this delete transform to run after the sync completes, using the ingested file paths dataset as input.
- The output dataset will contain the deletion status for each file, allowing you to audit which files were successfully deleted.
:::callout{theme="warning" title="Irreversible file deletion"}
This operation permanently deletes files from the agent host filesystem. Once executed, the deleted files cannot be recovered. Ensure you have confirmed the files are successfully ingested into Foundry before running this transform.
:::

    from transforms.api import transform, Output, Input, LightweightOutput, LightweightInput, lightweight
    from transforms.external.systems import external_systems, Source, ResolvedSource
    import pandas as pd
    import paramiko


    @lightweight
    @external_systems(
        agent_source=Source("<source_rid>")  # Replace with your REST API source RID
    )
    @transform(
        output_dataset=Output("<output_dataset_rid>"),  # Replace with your output dataset RID
        files_to_delete=Input("<input_dataset_rid>"),  # Dataset containing file paths to delete
    )
    def compute(
        agent_source: ResolvedSource,
        output_dataset: LightweightOutput,
        files_to_delete: LightweightInput,
    ):
        """
        Delete files from a remote agent host via SSH.
        Input dataset should contain a column 'remote_file_path' with absolute paths to delete.
        """
        hostname = "<agent_hostname>"  # Replace with your agent hostname
        username = "<ssh_username>"  # Replace with your SSH username
        password = agent_source.get_secret("<password_secret_name>")  # Replace with your secret name

        remote_file_paths = files_to_delete.pandas()["remote_file_path"].tolist()
        deletion_results = []

        # Note: AutoAddPolicy accepts unknown host keys without verification.
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname, username=username, password=password)
            sftp = client.open_sftp()
            try:
                for remote_path in remote_file_paths:
                    try:
                        sftp.remove(remote_path)
                        status = "deleted"
                    except FileNotFoundError:
                        status = "not_found"
                    except PermissionError:
                        status = "permission_denied"
                    except OSError as error:
                        # Record any other SFTP failure so the audit dataset is still written
                        status = f"error: {error}"
                    deletion_results.append({"file": remote_path, "status": status})
            finally:
                # Close connections, even if an unexpected error occurs
                sftp.close()
        finally:
            client.close()

        # Write deletion results to output dataset
        results_df = pd.DataFrame(deletion_results, columns=["file", "status"])
        output_dataset.write_pandas(results_df)

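The warning above asks you to confirm that files were ingested before deleting them. One way to enforce that check in code, sketched here under the assumption that you can obtain the list of successfully ingested paths from the upstream sync, is to delete only the candidates that appear in that list. The `plan_deletions` helper is hypothetical and is not part of any Foundry API.

```python
from typing import Iterable, List, Tuple


def plan_deletions(
    candidates: Iterable[str], ingested: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Split candidate paths into (safe_to_delete, skipped).

    Hypothetical helper: a path is safe to delete only if it also appears
    in the collection of successfully ingested paths.
    """
    ingested_set = set(ingested)
    safe_to_delete: List[str] = []
    skipped: List[str] = []
    for path in candidates:
        if path in ingested_set:
            safe_to_delete.append(path)
        else:
            skipped.append(path)
    return safe_to_delete, skipped
```

Only the first list would then be passed to the deletion loop; the second can be written to the output dataset with a status such as `skipped_not_ingested` so the audit trail stays complete.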