File processing workflows (文件处理工作流)
Lightweight transforms can process dataset files without schemas. To process dataset files without a schema, list the files with my_input.filesystem().ls().
The .filesystem().ls() method is available on datasets without schemas, whereas the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() methods are only available on datasets with schemas.
Code example: File processing
The following code shows an example lightweight transform that processes files of a dataset without a schema.
from transforms.api import incremental, Input, Output, transform
import polars as pl
from concurrent.futures import ThreadPoolExecutor
@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    """Read the CSV files of a schemaless input dataset and write them as one table.

    Because ``my_input`` has no schema, its files are listed with
    ``my_input.filesystem().ls()`` instead of being read through ``.polars()``
    or ``.pandas()``. Each file is parsed on a worker thread, and the resulting
    Polars DataFrames are concatenated and written to ``my_output``.
    """
    fs = my_input.filesystem()
    # Materialize the listing so it can be fanned out to the thread pool.
    files = list(fs.ls(glob="*.csv"))

    def process_file(dataset_file):
        """Open one dataset file and return its contents as a Polars DataFrame."""
        file_path = dataset_file.path
        # Access the file through the input dataset's filesystem, in binary mode.
        with fs.open(file_path, "rb") as f:
            # <do something with the file>; this example parses it as CSV
            # and returns the data as a DataFrame.
            return pl.read_csv(f)

    # Each worker opens and parses one file, so file reads can overlap.
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    # Union all the DataFrames into one (files are expected to share columns).
    # NOTE: pl.concat raises if `files` is empty, i.e. no *.csv file was listed.
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)
The following example demonstrates how to parse the Excel files of a dataset without a schema and write them into a single tabular output:
from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
@transform.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    """Parse every Excel file of a schemaless dataset into one tabular output.

    The input has no schema, so its files are listed with
    ``my_input.filesystem().ls()``. Each file is converted to a Polars
    DataFrame on a worker thread, and the results are concatenated and written
    to ``my_output`` with ``write_table``.
    """
    fs = my_input.filesystem()

    def read_excel_to_polars(file_path):
        """Open the Excel file at ``file_path`` in ``fs`` and return a Polars DataFrame."""
        with fs.open(file_path, "rb") as f, tempfile.TemporaryFile() as tmp:
            # Copy the file from the source dataset to the local filesystem so
            # the Excel reader gets a seekable file object.
            shutil.copyfileobj(f, tmp)
            tmp.flush()  # shutil.copyfileobj does not flush
            # copyfileobj leaves the position at end-of-file; rewind explicitly
            # before handing the file to the reader.
            tmp.seek(0)
            pandas_df = pd.read_excel(tmp)
        # Cast every column (not only integer ones) to string; missing values
        # become the string 'nan'. Presumably this keeps the per-file schemas
        # identical so pl.concat below can stack them.
        pandas_df = pandas_df.astype(str)
        return pl.from_pandas(pandas_df)

    # List all files in the input dataset. NOTE(review): no glob filter is
    # applied, so every file is assumed to be an Excel workbook.
    file_paths = [f.path for f in fs.ls()]

    # Parse the files concurrently, one file per worker.
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(read_excel_to_polars, file_paths))

    # Union all the DataFrames into one.
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)
中文翻译
文件处理工作流
轻量级转换(lightweight transforms)可以处理没有模式(schema)的数据集中的文件。要处理无模式数据集中的文件,请使用 my_input.filesystem().ls() 列出文件。
.filesystem().ls() 方法适用于无模式的数据集,而 .path()、.pandas()、.polars()、.arrow() 和 .filesystem().files() 方法仅适用于有模式的数据集。
代码示例:文件处理
以下代码展示了一个处理无模式数据集文件的轻量级转换示例。
from transforms.api import incremental, Input, Output, transform
import polars as pl
from concurrent.futures import ThreadPoolExecutor
@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    """Read the CSV files of a schemaless input dataset and write them as one table.

    Because ``my_input`` has no schema, its files are listed with
    ``my_input.filesystem().ls()`` instead of being read through ``.polars()``
    or ``.pandas()``. Each file is parsed on a worker thread, and the resulting
    Polars DataFrames are concatenated and written to ``my_output``.
    """
    fs = my_input.filesystem()
    # Materialize the listing so it can be fanned out to the thread pool.
    files = list(fs.ls(glob="*.csv"))

    def process_file(dataset_file):
        """Open one dataset file and return its contents as a Polars DataFrame."""
        file_path = dataset_file.path
        # Access the file through the input dataset's filesystem, in binary mode.
        with fs.open(file_path, "rb") as f:
            # <do something with the file>; this example parses it as CSV
            # and returns the data as a DataFrame.
            return pl.read_csv(f)

    # Each worker opens and parses one file, so file reads can overlap.
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    # Union all the DataFrames into one (files are expected to share columns).
    # NOTE: pl.concat raises if `files` is empty, i.e. no *.csv file was listed.
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)
以下示例演示了如何解析无模式数据集中的 Excel 文件,并将其写入单个表格输出:
from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
@transform.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    """Parse every Excel file of a schemaless dataset into one tabular output.

    The input has no schema, so its files are listed with
    ``my_input.filesystem().ls()``. Each file is converted to a Polars
    DataFrame on a worker thread, and the results are concatenated and written
    to ``my_output`` with ``write_table``.
    """
    fs = my_input.filesystem()

    def read_excel_to_polars(file_path):
        """Open the Excel file at ``file_path`` in ``fs`` and return a Polars DataFrame."""
        with fs.open(file_path, "rb") as f, tempfile.TemporaryFile() as tmp:
            # Copy the file from the source dataset to the local filesystem so
            # the Excel reader gets a seekable file object.
            shutil.copyfileobj(f, tmp)
            tmp.flush()  # shutil.copyfileobj does not flush
            # copyfileobj leaves the position at end-of-file; rewind explicitly
            # before handing the file to the reader.
            tmp.seek(0)
            pandas_df = pd.read_excel(tmp)
        # Cast every column (not only integer ones) to string; missing values
        # become the string 'nan'. Presumably this keeps the per-file schemas
        # identical so pl.concat below can stack them.
        pandas_df = pandas_df.astype(str)
        return pl.from_pandas(pandas_df)

    # List all files in the input dataset. NOTE(review): no glob filter is
    # applied, so every file is assumed to be an Excel workbook.
    file_paths = [f.path for f in fs.ls()]

    # Parse the files concurrently, one file per worker.
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(read_excel_to_polars, file_paths))

    # Union all the DataFrames into one.
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)