跳转至

File processing workflows(文件处理工作流)

Lightweight transforms can process the files of a dataset that has no schema. To do so, list the files with my_input.filesystem().ls().

The .filesystem().ls() method is available for datasets without schemas, but the .path(), .pandas(), .polars(), .arrow(), and .filesystem().files() methods are only available on datasets with schemas.

Code example: File processing

The following code shows an example lightweight transform that processes files of a dataset without a schema.

from transforms.api import incremental, Input, Output, transform
import polars as pl
from concurrent.futures import ThreadPoolExecutor

@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    """Read the CSV files of a schemaless input and write them as one table.

    The input files are listed through the dataset filesystem, parsed
    concurrently on a thread pool, and the resulting polars DataFrames are
    concatenated and written to ``my_output``.
    """
    fs = my_input.filesystem()
    files = list(fs.ls(glob="*.csv"))

    def process_file(dataset_file):
        """Open one listed file and return its contents as a polars DataFrame."""
        file_path = dataset_file.path
        # Access the file
        with fs.open(file_path, "rb") as f:
            # Placeholder parsing step: replace it with your own logic, but
            # always return a polars DataFrame so the results can be combined.
            return pl.read_csv(f)

    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    # Union all the DFs into one
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)

The following example demonstrates how to parse Excel files:

from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


# Lightweight decorator, the same one the incremental example uses.
@transform.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    """Parse every Excel file of a schemaless input into one output table.

    Each file is copied to a local temporary file, read with pandas,
    converted to a polars DataFrame, and the per-file DataFrames are
    concatenated and written to ``my_output``.
    """

    def read_excel_to_polars(fs, file_path):
        """Open the Excel file at ``file_path`` on ``fs`` as a polars DataFrame."""
        with fs.open(file_path, "rb") as f, tempfile.TemporaryFile() as tmp:
            # Copy the file from the source dataset to the local filesystem
            shutil.copyfileobj(f, tmp)
            tmp.flush()  # shutil.copyfileobj does not flush
            tmp.seek(0)  # the copy left the position at the end; rewind it

            # Read the Excel file (the local copy is seekable)
            pandas_df = pd.read_excel(tmp)
            # Cast every column to string so that all files produce the
            # same column types before they are concatenated
            pandas_df = pandas_df.astype(str)
            # Convert the pandas dataframe to a polars dataframe
            return pl.from_pandas(pandas_df)

    fs = my_input.filesystem()
    # List all files in the input dataset
    file_paths = [f.path for f in fs.ls()]

    def process_file(file_path):
        """Bind the shared filesystem so the executor only passes the path."""
        return read_excel_to_polars(fs, file_path)

    # Parse the files concurrently, then union all the DFs into one
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, file_paths))
    combined_df = pl.concat(polars_dataframes)

    my_output.write_table(combined_df)

中文翻译

文件处理工作流

轻量级转换(lightweight transforms)可以处理没有模式(schema)的数据集文件。要处理无模式的数据集文件,请使用 my_input.filesystem().ls() 列出文件。

.filesystem().ls() 语句适用于无模式的数据集,但 .path()、.pandas()、.polars()、.arrow() 和 .filesystem().files() 语句仅适用于有模式的数据集。

代码示例:文件处理

以下代码展示了一个处理无模式数据集文件的轻量级转换示例。

from transforms.api import incremental, Input, Output, transform
import polars as pl
from concurrent.futures import ThreadPoolExecutor

@incremental()
@transform.using(my_input=Input("my-input"), my_output=Output('my-output'))
def my_incremental_transform(my_input, my_output):
    """Read the CSV files of a schemaless input and write them as one table.

    The input files are listed through the dataset filesystem, parsed
    concurrently on a thread pool, and the resulting polars DataFrames are
    concatenated and written to ``my_output``.
    """
    fs = my_input.filesystem()
    files = list(fs.ls(glob="*.csv"))

    def process_file(dataset_file):
        """Open one listed file and return its contents as a polars DataFrame."""
        file_path = dataset_file.path
        # Access the file
        with fs.open(file_path, "rb") as f:
            # Placeholder parsing step: replace it with your own logic, but
            # always return a polars DataFrame so the results can be combined.
            return pl.read_csv(f)

    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, files))

    # Union all the DFs into one
    combined_df = pl.concat(polars_dataframes)
    my_output.write_table(combined_df)

以下示例演示了如何解析 Excel 文件:

from transforms.api import transform, Input, Output
import tempfile
import shutil
import polars as pl
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


# Lightweight decorator, the same one the incremental example uses.
@transform.using(
    my_output=Output("/path/tabular_output_dataset"),
    my_input=Input("/path/input_dataset_without_schema"),
)
def compute(my_input, my_output):
    """Parse every Excel file of a schemaless input into one output table.

    Each file is copied to a local temporary file, read with pandas,
    converted to a polars DataFrame, and the per-file DataFrames are
    concatenated and written to ``my_output``.
    """

    def read_excel_to_polars(fs, file_path):
        """Open the Excel file at ``file_path`` on ``fs`` as a polars DataFrame."""
        with fs.open(file_path, "rb") as f, tempfile.TemporaryFile() as tmp:
            # Copy the file from the source dataset to the local filesystem
            shutil.copyfileobj(f, tmp)
            tmp.flush()  # shutil.copyfileobj does not flush
            tmp.seek(0)  # the copy left the position at the end; rewind it

            # Read the Excel file (the local copy is seekable)
            pandas_df = pd.read_excel(tmp)
            # Cast every column to string so that all files produce the
            # same column types before they are concatenated
            pandas_df = pandas_df.astype(str)
            # Convert the pandas dataframe to a polars dataframe
            return pl.from_pandas(pandas_df)

    fs = my_input.filesystem()
    # List all files in the input dataset
    file_paths = [f.path for f in fs.ls()]

    def process_file(file_path):
        """Bind the shared filesystem so the executor only passes the path."""
        return read_excel_to_polars(fs, file_path)

    # Parse the files concurrently, then union all the DFs into one
    with ThreadPoolExecutor() as executor:
        polars_dataframes = list(executor.map(process_file, file_paths))
    combined_df = pl.concat(polars_dataframes)

    my_output.write_table(combined_df)