Server Message Block (SMB)¶
Connect to Server Message Block (SMB) shares to sync data between folders and Foundry datasets. Common examples of SMB servers include Windows File Server and Samba File Server.
Supported capabilities¶
| Capability | Status |
|---|---|
| Exploration | 🟢 Generally available |
| Bulk import | 🟢 Generally available |
| Incremental | 🟢 Generally available |
| File exports | 🟢 Generally available |
The SMB connector supports SMB protocol versions 2 and 3.
Data model¶
The connector can transfer files of any type into Foundry datasets. File formats are preserved, and no schemas are applied during or after the transfer. To work with the contents as structured data, apply a schema to the output dataset or write a downstream transformation that parses the files.
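Because files arrive as-is, parsing happens downstream. As a framework-independent sketch (the helper name `parse_csv_bytes` is hypothetical, and it assumes a UTF-8 CSV file with a header row), the function below turns the raw bytes of one synced file into row dictionaries using only the Python standard library:

```python
import csv
import io


def parse_csv_bytes(raw: bytes, encoding: str = "utf-8-sig") -> list[dict[str, str]]:
    """Decode the raw bytes of a CSV file into a list of row dictionaries.

    The header row supplies the keys. Every value stays a string because
    no schema has been applied yet.
    """
    # "utf-8-sig" also strips a leading byte-order mark, which Windows tools often add
    text = raw.decode(encoding)
    # newline="" lets the csv module handle \r\n line endings itself
    reader = csv.DictReader(io.StringIO(text, newline=""))
    return [dict(row) for row in reader]
```

Casting the string values to typed columns is a separate step, which is what applying a schema or writing a downstream transform takes care of.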
Performance and limitations¶
There is no limit to the size of transferable files. However, network issues can cause large transfers to fail. In particular, direct cloud syncs that run for more than two days will be interrupted. To reduce the impact of network issues, we recommend using smaller files and limiting the number of files ingested in each execution of the sync. Syncs can be scheduled to run frequently, so a large backlog can be ingested over several smaller runs.
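The hypothetical helper below illustrates the batching idea: it takes pending files as `(path, size_bytes)` pairs and returns the next batch for one run, capped by both file count and total bytes. It is a sketch of the recommendation, not a description of how the connector selects files; the limits are assumptions you would tune to your network.

```python
from typing import Iterable


def next_batch(
    pending: Iterable[tuple[str, int]], max_files: int, max_bytes: int
) -> list[tuple[str, int]]:
    """Select (path, size_bytes) entries for one sync run, in order.

    Stops at the first file that would exceed max_files or max_bytes.
    A single file larger than max_bytes is still returned on its own,
    so it is never skipped forever.
    """
    batch: list[tuple[str, int]] = []
    total = 0
    for path, size in pending:
        if len(batch) >= max_files:
            break
        # Allow an oversized file only when it is the first in the batch
        if batch and total + size > max_bytes:
            break
        batch.append((path, size))
        total += size
    return batch
```

Files left out of one batch are simply picked up by the next scheduled run.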
Setup¶
- Open the Data Connection application and select + New Source in the upper right corner of the screen.
- Select SMB from the Protocol sources section.
- Follow the additional configuration prompts to continue the setup of your connector using the information in the sections below.
Learn more about setting up a connector in Foundry.
Configuration options¶
The following configuration options are available for the SMB connector:
| Option | Required? | Description |
|---|---|---|
| Hostname | Yes | The domain name pointing to the server or the IP address of the server. |
| Port | No | The port on which the SMB server is running. Defaults to 445. |
| Share | Yes | The name of the SMB share you are connecting to. |
| Username | Yes | The SMB login username. |
| Password | Yes | The SMB login password. |
| Domain | No | The Active Directory domain of the SMB login account. Leave blank if the login account is not an AD user. |
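A common way to build the login name from these options is the down-level form `DOMAIN\username` when Domain is set, and the bare username otherwise. A minimal sketch of that rule, using a hypothetical helper name:

```python
def smb_login_name(username: str, domain: str = "") -> str:
    """Return the login name to use for SMB authentication.

    With an Active Directory domain, use the down-level form DOMAIN\\username;
    without one (a local or non-AD account), use the bare username.
    """
    domain = domain.strip()
    return f"{domain}\\{username}" if domain else username
```

For example, `smb_login_name("svc-foundry", "CORP")` gives `CORP\svc-foundry`, and `smb_login_name("svc-foundry")` gives `svc-foundry`.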
Networking¶
The SMB connector must be able to reach Hostname on Port (445 by default). If you are using a direct connection egress policy, you must use TCP-level allowlisting.
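To confirm that the network path is open, you can attempt a plain TCP connection from the same network location that the connection originates from. The sketch below (hypothetical helper `can_reach`) uses only the Python standard library. A successful result shows that the port accepts connections, not that SMB authentication will succeed.

```python
import socket


def can_reach(host: str, port: int = 445, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        # create_connection resolves the hostname and tries each returned address
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts (socket.timeout is an OSError)
        return False
```

For example, `can_reach("fileserver.example.com")` checks the default port 445 on a placeholder hostname.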
Sync data from SMB¶
The SMB connector uses the file-based sync interface.
Export data to SMB¶
To export to an SMB share, first enable exports for your SMB connector. Then, create a new export.
Export configuration options¶
| Option | Required? | Default | Description |
|---|---|---|---|
| Directory path | Yes | `/` | The path to the folder in the SMB share where files should be exported. The full path of an exported file is `<Share>/<Directory Path>/<Exported File Path>`. |
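The path rule can be expressed as a small function. In this sketch (hypothetical helper `export_target_path`), trimming slashes from each part is an assumption made so that the default Directory path of `/` places files directly under the share root instead of producing doubled separators:

```python
def export_target_path(share: str, directory_path: str, exported_file_path: str) -> str:
    """Combine the parts as <Share>/<Directory Path>/<Exported File Path>."""
    parts = [share, directory_path, exported_file_path]
    # Trim slashes from each part and drop empty ones, such as the default "/" directory
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/".join(cleaned)
```

With the default directory, `export_target_path("data", "/", "out/part-0.csv")` gives `data/out/part-0.csv`.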
Use SMB sources in code¶
You can connect to SMB shares from a Python transforms code repository using external transforms.
Read files and metadata from SMB¶
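The example below shows the minimal code needed to connect to an SMB share, copy the files in one directory into a dataset, and record their metadata. It uses the `smbclient` module provided by the `smbprotocol` package: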
```python
import os

import pandas as pd
import smbclient  # provided by the smbprotocol package
from transforms.api import Output, transform
from transforms.external.systems import external_systems, Source

DOMAIN = "<smb_domain>"  # Leave as "" if the login account is not an AD user
USERNAME = "<smb_username>"
SMB_PATH = "<smb_path>"  # UNC path to the directory, for example \\server\share\folder


def create_safe_filename(filepath: str) -> str:
    """
    Create a safe filename for storage by removing problematic characters.

    Args:
        filepath: Original file path

    Returns:
        Sanitized filename safe for storage
    """
    filename = os.path.basename(filepath)
    # Replace characters that might cause issues in storage
    safe_chars = str.maketrans({"\\": "_", "/": "_", ":": "_", "?": "_", "*": "_", "<": "_", ">": "_", "|": "_"})
    return filename.translate(safe_chars)


@external_systems(
    smb_source=Source("ri.magritte..source.YOUR_SMB_SOURCE_RID")
)
@transform(
    file_metadata=Output("ri.foundry.main.dataset.YOUR_METADATA_OUTPUT_RID"),
    output_files_dataset=Output("ri.foundry.main.dataset.YOUR_FILES_OUTPUT_RID"),
)
def read_smb_files(ctx, smb_source, file_metadata, output_files_dataset):
    """
    Read files from an SMB share and output files and metadata.
    """
    # Configure the SMB client; the password is read from the source's secrets
    username = f"{DOMAIN}\\{USERNAME}" if DOMAIN else USERNAME
    password = smb_source.get_secret("Password")
    smbclient.ClientConfig(username=username, password=password)

    # List the files (not subdirectories) in the directory
    files_info = []
    for item in smbclient.scandir(SMB_PATH):
        if item.is_dir():
            continue
        stat = item.stat()
        file_path = f"{SMB_PATH}\\{item.name}"
        safe_filename = create_safe_filename(item.name)
        files_info.append({
            "filename": item.name,
            "safe_filename": safe_filename,
            "size_bytes": stat.st_size,
            "path": file_path,
        })
        # Copy the file's bytes into the files output dataset (binary mode on both sides)
        with smbclient.open_file(file_path, mode="rb") as f:
            content = f.read()
        with output_files_dataset.filesystem().open(safe_filename, "wb") as fileobj:
            fileobj.write(content)

    # Write metadata to output
    if files_info:
        df = ctx.spark_session.createDataFrame(pd.DataFrame(files_info))
        file_metadata.write_dataframe(df)
```
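`SMB_PATH` is a UNC path of the form `\\server\share\folder`, which is the form `smbclient` functions such as `scandir` and `open_file` accept. The hypothetical helper below assembles one from the Hostname and Share values of the source plus a folder path, accepting either `/` or `\` as the separator in the folder part:

```python
def build_unc_path(hostname: str, share: str, *folders: str) -> str:
    """Return a UNC path: two leading backslashes, then hostname, share, and folders."""
    segments: list[str] = []
    for folder in folders:
        # Normalize "/" to "\" and drop empty pieces from leading, trailing, or doubled separators
        segments.extend(part for part in folder.replace("/", "\\").split("\\") if part)
    return "\\\\" + "\\".join([hostname, share, *segments])
```

For example, `build_unc_path("fileserver.example.com", "finance", "reports/2024")` gives `\\fileserver.example.com\finance\reports\2024`.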
Read files from SMB and upload as media sets¶
The comprehensive example below demonstrates how to connect to an SMB share from a Python transform code repository and create media set outputs, including error handling, recursive directory scanning, and file categorization:
```python
"""
SMB File Processing Transform Template

This template demonstrates how to connect to an SMB share
from a Foundry Python transform to process files and organize them by type.

Key concepts covered:
- External systems integration with SMB sources
- Recursive directory traversal
- File categorization by extension
- Media set outputs for different file types
- Structured metadata collection
- Error handling and logging

Prerequisites:
- SMB source configured in Data Connection
- Input dataset with directory paths
- Output datasets and media sets configured
- smbprotocol library (which provides the smbclient module) available in the repository
"""

import io
import logging
import os
from datetime import datetime

import smbclient
from pyspark.sql import types as T
from transforms.api import Input, Output, transform
from transforms.external.systems import ResolvedSource, Source, external_systems
from transforms.mediasets import MediaSetOutput

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

# Explicit schema for the metadata output. Fields missing from a record (for example,
# the error columns on successful files) are written as null, so Spark never has to
# infer a type from a column that is entirely null or mixes types.
METADATA_SCHEMA = T.StructType([
    T.StructField("filename", T.StringType()),
    T.StructField("full_path", T.StringType()),
    T.StructField("directory_path", T.StringType()),
    T.StructField("file_size_bytes", T.LongType()),
    T.StructField("created_date", T.TimestampType()),
    T.StructField("modified_date", T.TimestampType()),
    T.StructField("file_extension", T.StringType()),
    T.StructField("category", T.StringType()),
    T.StructField("processed_timestamp", T.TimestampType()),
    T.StructField("download_error", T.StringType()),
    T.StructField("processing_error", T.StringType()),
])


@external_systems(
    smb_source=Source("ri.magritte..source.YOUR_SMB_SOURCE_RID")  # Replace with your SMB source RID
)
@transform(
    input_directories=Input("ri.foundry.main.dataset.YOUR_INPUT_DATASET_RID"),  # Dataset containing directory paths
    documents_mediaset=MediaSetOutput("ri.mio.main.media-set.YOUR_DOCUMENTS_MEDIASET_RID"),  # For PDF, DOC, PPT, TXT files
    images_mediaset=MediaSetOutput("ri.mio.main.media-set.YOUR_IMAGES_MEDIASET_RID"),  # For image files
    spreadsheets_mediaset=MediaSetOutput("ri.mio.main.media-set.YOUR_SPREADSHEETS_MEDIASET_RID"),  # For XLSX, XLS, CSV files
    file_metadata=Output("ri.foundry.main.dataset.YOUR_METADATA_OUTPUT_RID"),  # Structured metadata output
)
def smb_file_processor(
    ctx,
    smb_source: ResolvedSource,
    input_directories,
    documents_mediaset,
    images_mediaset,
    spreadsheets_mediaset,
    file_metadata,
):
    """
    Process files from SMB directories and categorize them into different outputs.

    This transform:
    1. Reads directory paths from the input dataset
    2. Connects to the SMB share using the configured source
    3. Recursively scans directories for files
    4. Categorizes files by extension
    5. Downloads and stores files in the appropriate media sets
    6. Creates structured metadata for all processed files

    Args:
        ctx: Transform context
        smb_source: Configured SMB source from Data Connection
        input_directories: Dataset with a 'path' column containing directory paths to process
        documents_mediaset: Media set for document files (PDF, DOC, DOCX, PPT, PPTX, TXT)
        images_mediaset: Media set for image files (JPG, PNG, etc.)
        spreadsheets_mediaset: Media set for spreadsheet files (XLSX, XLS, CSV)
        file_metadata: Structured dataset for file metadata and processing results
    """
    # Define file categories by extension
    DOCUMENT_EXTENSIONS = {".pdf", ".pptx", ".docx", ".txt", ".ppt", ".doc"}
    IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp", ".gif"}
    SPREADSHEET_EXTENSIONS = {".xlsx", ".xls", ".csv"}

    # SMB connection configuration
    # These values should match your SMB source configuration in Data Connection
    smb_config = {
        "hostname": "your-smb-server.domain.com",  # Replace with your SMB server hostname
        "share": "your-share-name",  # Replace with your SMB share name
        "username": "your-service-account",  # Replace with your SMB username
        "password": smb_source.get_secret("Password"),  # Password stored securely in the source
        "domain": "your.domain.com",  # Replace with your domain, or "" if the account is not an AD user
    }
    # Use DOMAIN\username for AD accounts and the bare username otherwise
    smb_username = (
        f"{smb_config['domain']}\\{smb_config['username']}" if smb_config["domain"] else smb_config["username"]
    )

    logger.info("=" * 80)
    logger.info("SMB FILE PROCESSOR - STARTING")
    logger.info("=" * 80)
    logger.info(f"Server: {smb_config['hostname']}")
    logger.info(f"Share: {smb_config['share']}")
    logger.info(f"Username: {smb_username}")
    logger.info("=" * 80)

    def transform_directory_path(input_path: str) -> str:
        """
        Transform an input directory path to SMB UNC format.

        Modify this function based on your specific path transformation needs.
        Example: convert from a local mount path to a UNC path.

        Args:
            input_path: Original directory path from the input dataset

        Returns:
            SMB UNC path (\\\\server\\share\\path)
        """
        # Example transformation: customize for your environment
        # Remove the local mount prefix and convert to UNC format
        local_prefix = "/mnt/your-mount/path/"  # Replace with your local mount prefix
        if input_path.startswith(local_prefix):
            relative_path = input_path[len(local_prefix) :]
        else:
            # Handle alternative path formats
            logger.warning(f"Unexpected path format: {input_path}")
            relative_path = input_path
        # Convert to UNC format
        unc_path = f"\\\\{smb_config['hostname']}\\{smb_config['share']}\\{relative_path}"
        return unc_path.replace("/", "\\")  # Ensure Windows path separators

    def get_file_category(filename: str) -> str:
        """
        Determine the file category based on the file extension.

        Args:
            filename: Name of the file

        Returns:
            Category string: 'document', 'image', 'spreadsheet', or 'other'
        """
        ext = os.path.splitext(filename)[1].lower()
        if ext in DOCUMENT_EXTENSIONS:
            return "document"
        elif ext in IMAGE_EXTENSIONS:
            return "image"
        elif ext in SPREADSHEET_EXTENSIONS:
            return "spreadsheet"
        else:
            return "other"

    def create_safe_filename(filepath: str) -> str:
        """
        Create a safe filename for storage by removing problematic characters.

        Only the base name is kept, so files with the same name in different
        subdirectories map to the same media item path.

        Args:
            filepath: Original file path

        Returns:
            Sanitized filename safe for storage
        """
        filename = os.path.basename(filepath)
        # Replace characters that might cause issues in storage
        safe_chars = str.maketrans({"\\": "_", "/": "_", ":": "_", "?": "_", "*": "_", "<": "_", ">": "_", "|": "_"})
        return filename.translate(safe_chars)

    def get_all_files_recursive(directory_path: str) -> list:
        """
        Recursively scan a directory and return all files with their metadata.

        Args:
            directory_path: SMB directory path to scan

        Returns:
            List of dictionaries containing file information
        """
        all_files = []
        try:
            # Scan directory contents
            items = list(smbclient.scandir(directory_path))
            for item in items:
                if item.is_dir():
                    # Recursively process subdirectories
                    subdirectory_path = f"{directory_path}\\{item.name}"
                    logger.info(f"Scanning subdirectory: {subdirectory_path}")
                    all_files.extend(get_all_files_recursive(subdirectory_path))
                else:
                    # Add file information
                    all_files.append({"item": item, "directory_path": directory_path})
        except Exception as e:
            # Log and skip directories that cannot be listed (for example, permission errors)
            logger.error(f"Error scanning directory {directory_path}: {e}")
        return all_files

    # Map each downloadable category to its media set output
    mediasets_by_category = {
        "document": documents_mediaset,
        "image": images_mediaset,
        "spreadsheet": spreadsheets_mediaset,
    }

    # Initialize tracking variables
    metadata_records = []
    total_files_processed = 0
    files_by_category = {"document": 0, "image": 0, "spreadsheet": 0, "other": 0}

    try:
        # Configure SMB client authentication
        smbclient.ClientConfig(username=smb_username, password=smb_config["password"])

        # Read input directories from the dataset
        input_df = input_directories.dataframe()
        directory_paths = [row.path for row in input_df.select("path").collect()]
        logger.info(f"Processing {len(directory_paths)} directories")

        # Process each directory
        for idx, directory_path in enumerate(directory_paths, 1):
            logger.info(f"[{idx}/{len(directory_paths)}] Processing: {directory_path}")
            try:
                # Transform to SMB path format
                smb_directory_path = transform_directory_path(directory_path)
                logger.info(f"SMB path: {smb_directory_path}")

                # Get all files recursively
                all_files_info = get_all_files_recursive(smb_directory_path)
                logger.info(f"Found {len(all_files_info)} files in directory tree")

                # Process each file
                for file_info in all_files_info:
                    file_item = file_info["item"]
                    file_directory_path = file_info["directory_path"]
                    filename = file_item.name
                    file_path = f"{file_directory_path}\\{filename}"
                    try:
                        # Get file statistics
                        stat_info = file_item.stat()
                        file_size = stat_info.st_size
                        created_time = datetime.fromtimestamp(stat_info.st_ctime)
                        modified_time = datetime.fromtimestamp(stat_info.st_mtime)

                        # Categorize the file
                        category = get_file_category(filename)
                        files_by_category[category] += 1
                        total_files_processed += 1

                        # Create the metadata record
                        metadata_record = {
                            "filename": filename,
                            "full_path": file_path,
                            "directory_path": file_directory_path,
                            "file_size_bytes": file_size,
                            "created_date": created_time,
                            "modified_date": modified_time,
                            "file_extension": os.path.splitext(filename)[1].lower(),
                            "category": category,
                            "processed_timestamp": datetime.now(),
                        }

                        # Download and store the file if its category has a media set
                        media_set = mediasets_by_category.get(category)
                        if media_set is not None:
                            try:
                                with smbclient.open_file(file_path, mode="rb") as smb_file:
                                    file_content = smb_file.read()
                                safe_filename = create_safe_filename(file_path)
                                media_set.put_media_item(io.BytesIO(file_content), safe_filename)
                                logger.info(f"✓ Downloaded {category}: {filename} ({file_size:,} bytes)")
                            except Exception as download_error:
                                logger.error(f"✗ Failed to download {filename}: {download_error}")
                                metadata_record["download_error"] = str(download_error)
                        else:
                            logger.info(f"⚠ Skipped {filename} (category: {category})")
                        metadata_records.append(metadata_record)
                    except Exception as file_error:
                        logger.error(f"✗ Error processing file {filename}: {file_error}")
                        # Add an error record for this file
                        metadata_records.append({
                            "filename": filename,
                            "full_path": file_path,
                            "directory_path": file_directory_path,
                            "file_size_bytes": 0,
                            "file_extension": os.path.splitext(filename)[1].lower(),
                            "category": "error",
                            "processed_timestamp": datetime.now(),
                            "processing_error": str(file_error),
                        })
            except Exception as dir_error:
                logger.error(f"✗ Failed to process directory {directory_path}: {dir_error}")
                # Add an error record for this directory
                metadata_records.append({
                    "directory_path": directory_path,
                    "file_size_bytes": 0,
                    "category": "directory_error",
                    "processed_timestamp": datetime.now(),
                    "processing_error": str(dir_error),
                })

        # Write metadata to the output dataset; missing fields become null under the explicit schema
        if metadata_records:
            rows = [tuple(record.get(field.name) for field in METADATA_SCHEMA.fields) for record in metadata_records]
            metadata_df = ctx.spark_session.createDataFrame(rows, schema=METADATA_SCHEMA)
            file_metadata.write_dataframe(metadata_df)
            logger.info(f"✓ Wrote {len(metadata_records)} metadata records")
        else:
            logger.warning("No metadata records to write")
    except Exception as e:
        logger.error(f"✗ CRITICAL FAILURE: {e}")
        raise

    # Log the final summary
    logger.info("\n" + "=" * 80)
    logger.info("PROCESSING SUMMARY")
    logger.info("=" * 80)
    logger.info(f"Total files processed: {total_files_processed:,}")
    logger.info(f"Documents: {files_by_category['document']:,}")
    logger.info(f"Images: {files_by_category['image']:,}")
    logger.info(f"Spreadsheets: {files_by_category['spreadsheet']:,}")
    logger.info(f"Other/Skipped: {files_by_category['other']:,}")
    logger.info(f"Metadata records created: {len(metadata_records):,}")
    logger.info("=" * 80)
```
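`create_safe_filename` keeps only the base name, so files with the same name in different subdirectories (for example, two `report.pdf` files) are written to the same media item path. If that matters for your share, one option is to name each item after its path relative to the scanned root directory. The helper below is a hypothetical sketch; the `__` separator is an arbitrary choice, and it assumes both arguments are UNC paths built from the same root string, as the recursive scan in the template does.

```python
def media_item_path(root: str, file_path: str) -> str:
    """Name a file after its path relative to root, flattened into one segment.

    Both arguments are UNC paths such as \\\\server\\share\\folder.
    """
    root_norm = root.replace("/", "\\").rstrip("\\")
    path_norm = file_path.replace("/", "\\")
    if path_norm.startswith(root_norm + "\\"):
        # Keep only the part below the scanned root, for example 2024\q1\report.pdf
        relative = path_norm[len(root_norm) + 1 :]
    else:
        # Not under root: fall back to the full path without its leading backslashes
        relative = path_norm.lstrip("\\")
    # Turn directory separators into "__", then replace other characters unsafe in storage
    flattened = relative.replace("\\", "__")
    return flattened.translate(str.maketrans({c: "_" for c in ':?*<>|"'}))
```

In the template, `create_safe_filename(file_path)` could be swapped for `media_item_path(smb_directory_path, file_path)`.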