
Read and write unstructured files

:::callout{theme="warning" title="Warning"} Unstructured file access is an advanced topic. Ensure you are familiar with the rest of the content in this user guide before reading this page. :::

You may want to access files in a data transformation for a variety of reasons. File access is particularly useful if you want to process files in non-tabular formats (such as XML or JSON) or compressed formats (such as gz or zip).

The transforms Python library allows users to read and write files in Foundry datasets. transforms.api.TransformInput exposes a read-only FileSystem object while transforms.api.TransformOutput exposes a write-only FileSystem object. These FileSystem objects allow file access based on the path of a file within the Foundry dataset, abstracting away the underlying storage.

:::callout{theme="neutral"} If you want to access files in your data transformation, you must construct your Transform object using the transform() decorator. This is because FileSystem objects are exposed by TransformInput and TransformOutput objects, and transform() is the only decorator that expects the input(s) and output(s) of its compute function to be of type TransformInput and TransformOutput, respectively. :::

Import files

Files can be uploaded into Foundry using manual file imports or synced via a data connection. Structured and unstructured files can be imported into Foundry datasets to be processed in downstream applications. Files can also be uploaded as a raw file without modifying the extension. The examples below refer to files uploaded as Foundry datasets, rather than as raw files.

Foundry can also infer a schema automatically for certain file types uploaded to a dataset. For example, when you import a CSV file, the Apply a schema button is available to apply a schema automatically. Learn more about manually uploading data.

Browse files

Files in a dataset can be listed using the transforms.api.FileSystem.ls() method. This method returns a generator of transforms.api.FileStatus objects. These objects capture the path, size (in bytes), and modified timestamp (milliseconds since Unix epoch) for each file. Consider the following Transform object:

```python tab="Polars"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(ctx, hair_eye_color, processed):
    # your data transformation code
    pass
```

```python tab="Pandas"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

```python tab="PySpark"
from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

In your data transformation code, you can browse your dataset files:

```python
list(hair_eye_color.filesystem().ls())
# Result: [FileStatus(path='students.csv', size=688, modified=...)]
```
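Each FileStatus carries a path, a size in bytes, and a modified timestamp in milliseconds since the Unix epoch. The sketch below uses a hypothetical stand-in namedtuple (not the real transforms.api.FileStatus class) to show one way to total the sizes and turn the millisecond timestamps into timezone-aware datetimes; the second file and its values are made up for illustration.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical stand-in with the same fields as transforms.api.FileStatus;
# in a real transform these objects would come from filesystem().ls().
FileStatus = namedtuple("FileStatus", ["path", "size", "modified"])


def summarize(statuses):
    """Return (total size in bytes, {path: modification time as a UTC datetime})."""
    total_bytes = 0
    modified_at = {}
    for status in statuses:
        total_bytes += status.size
        # modified is in milliseconds; datetime.fromtimestamp expects seconds
        modified_at[status.path] = datetime.fromtimestamp(status.modified / 1000, tz=timezone.utc)
    return total_bytes, modified_at


files = [
    FileStatus(path="students.csv", size=688, modified=1_700_000_000_000),
    FileStatus(path="archive/old.csv", size=312, modified=0),
]
total, times = summarize(files)
print(total)                     # 1000
print(times["archive/old.csv"])  # 1970-01-01 00:00:00+00:00
```

In a transform you would pass hair_eye_color.filesystem().ls() to summarize() instead of the hand-built list.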

It is also possible to filter the results of the ls() call by passing either a glob or a regex pattern:

```python
list(hair_eye_color.filesystem().ls(glob='*.csv'))
# Result: [FileStatus(path='students.csv', size=688, modified=...)]
```

```python
list(hair_eye_color.filesystem().ls(regex=r'[A-Z]*\.csv'))
# Result: []
```
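The empty regex result is worth a closer look: an unanchored search for `[A-Z]*\.csv` would find the `.csv` suffix of students.csv, so the documented result suggests the pattern has to match the path as a whole (or at least from its start). The sketch below reproduces both filters locally with Python's fnmatch and re modules on a plain list of paths. Treating ls() as equivalent to these functions is an assumption for illustration, and ROSTER.csv is a hypothetical file name.

```python
import fnmatch
import re

paths = ["students.csv", "ROSTER.csv", "notes/readme.txt"]

# Glob filter, approximating ls(glob='*.csv')
glob_hits = [p for p in paths if fnmatch.fnmatch(p, "*.csv")]

# Regex filter, assuming the pattern must match the whole path
pattern = re.compile(r"[A-Z]*\.csv")
regex_hits = [p for p in paths if pattern.fullmatch(p)]

# For contrast: an unanchored search also accepts students.csv
unanchored_hits = [p for p in paths if pattern.search(p)]

print(glob_hits)        # ['students.csv', 'ROSTER.csv']
print(regex_hits)       # ['ROSTER.csv']
print(unanchored_hits)  # ['students.csv', 'ROSTER.csv']
```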

Read files

Files can be opened using the transforms.api.FileSystem.open() method, which returns a Python file-like stream object. All options accepted by io.open() are also supported. Because files are read as streams, random access is not supported.

:::callout{theme="neutral"} The file-like stream object returned by the open() method does not support the seek or tell methods. Thus, random access is not supported. :::
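Because the stream only moves forward, the natural pattern is a single pass over the data. For binary content, reading fixed-size chunks keeps memory usage bounded regardless of file size. The sketch below computes a SHA-256 checksum that way; io.BytesIO stands in for the object returned by open(path, 'rb'), and the 1 MiB default chunk size is an arbitrary assumption.

```python
import hashlib
import io

CHUNK_SIZE = 1024 * 1024  # 1 MiB per read; an arbitrary choice


def sha256_of_stream(binary_stream, chunk_size=CHUNK_SIZE):
    """Hash a binary stream in one forward pass, without seek() or loading it whole."""
    digest = hashlib.sha256()
    # iter() with a sentinel keeps calling read() until it returns b"" (end of stream)
    for chunk in iter(lambda: binary_stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# io.BytesIO stands in for hair_eye_color.filesystem().open('students.csv', 'rb')
checksum = sha256_of_stream(io.BytesIO(b"id,hair,eye,sex\n"), chunk_size=4)
print(checksum)
```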

Consider the following Transform object:

```python tab="Polars"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(ctx, hair_eye_color, processed):
    # your data transformation code
    pass
```

```python tab="Pandas"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

```python tab="PySpark"
from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

In your data transformation code, you can read your dataset files as shown below:

```python
with hair_eye_color.filesystem().open('students.csv') as f:
    f.readline()

# Result: 'id,hair,eye,sex\n'
```

The stream can also be passed to parsing libraries. For example, you can parse a CSV file with Python's csv module:

```python
import csv

with hair_eye_color.filesystem().open('students.csv') as f:
    reader = csv.reader(f, delimiter=',')
    next(reader)

# Result: ['id', 'hair', 'eye', 'sex']
```
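Because csv.reader consumes the stream front to back, you can aggregate a whole file in a single pass without holding it in memory. The sketch below counts rows per eye color with csv.DictReader, which takes its keys from the header row. io.StringIO stands in for the stream returned by open(), and the sample rows are hypothetical.

```python
import csv
import io
from collections import Counter


def count_by_column(text_stream, column):
    """Count the values of `column` in a CSV stream in one forward pass."""
    counts = Counter()
    for row in csv.DictReader(text_stream):  # the header row supplies the dict keys
        counts[row[column]] += 1
    return counts


# Stand-in for hair_eye_color.filesystem().open('students.csv')
sample = io.StringIO(
    "id,hair,eye,sex\n"
    "1,Black,Brown,Male\n"
    "2,Blond,Blue,Female\n"
    "3,Brown,Brown,Female\n"
)
eye_counts = count_by_column(sample, "eye")
print(eye_counts)  # Counter({'Brown': 2, 'Blue': 1})
```

Inside a transform, the call would look like `with hair_eye_color.filesystem().open('students.csv') as f: counts = count_by_column(f, 'eye')`.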

As mentioned earlier, you can also process files in non-tabular formats such as XML or JSON, or in compressed formats such as gz or zip. For instance, the code below reads the CSV files contained in zip archives and writes their combined contents to the output dataset as a single table:

```python tab="Polars"
from transforms.api import Input, Output, transform

import polars as pl
import shutil
import tempfile
import zipfile


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def read_zip(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    def process_zip_file(file_path):
        # ZipFile needs random access, so buffer the stream to a local temporary file first
        with fs.open(file_path, "rb") as f, tempfile.NamedTemporaryFile() as tmp:
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for filename in archive.namelist():
                    # Skip directory entries
                    if filename.endswith("/"):
                        continue
                    with archive.open(filename) as f2:
                        yield pl.read_csv(f2, has_header=True)

    all_dfs = []
    for file_status in fs.ls():
        all_dfs.extend(process_zip_file(file_status.path))

    # Concatenate all DataFrames
    final_df = pl.concat(all_dfs, how="vertical")
    processed.write_table(final_df)
```

```python tab="DuckDB"
from transforms.api import Input, Output, transform
import shutil
import tempfile
import zipfile


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def read_zip(ctx, hair_eye_color, processed):
    fs = hair_eye_color.filesystem()
    conn = ctx.duckdb().conn

    def extract_csv_files(file_path):
        # ZipFile needs random access, so buffer the stream to a local temporary file first
        with fs.open(file_path, "rb") as f, tempfile.NamedTemporaryFile() as zip_tmp:
            shutil.copyfileobj(f, zip_tmp)
            zip_tmp.flush()
            with zipfile.ZipFile(zip_tmp) as archive:
                for filename in archive.namelist():
                    # Skip directory entries
                    if filename.endswith("/"):
                        continue
                    # Write each CSV to a temporary file for DuckDB to read
                    with archive.open(filename) as f2, tempfile.NamedTemporaryFile(
                        mode='wb', delete=False, suffix='.csv'
                    ) as tmp:
                        shutil.copyfileobj(f2, tmp)
                    yield tmp.name

    csv_paths = []
    for file_status in fs.ls():
        csv_paths.extend(extract_csv_files(file_status.path))

    if csv_paths:
        # read_csv accepts a list of files and returns all of their rows as one relation
        final_result = conn.sql(f"SELECT * FROM read_csv({csv_paths!r}, header=true)")
        processed.write_table(final_result)
```

```python tab="Pandas"
from transforms.api import Input, Output, transform

import pandas as pd
import shutil
import tempfile
import zipfile


@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def read_zip(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    def process_zip_file(file_path):
        # ZipFile needs random access, so buffer the stream to a local temporary file first
        with fs.open(file_path, "rb") as f, tempfile.NamedTemporaryFile() as tmp:
            shutil.copyfileobj(f, tmp)
            tmp.flush()
            with zipfile.ZipFile(tmp) as archive:
                for filename in archive.namelist():
                    # Skip directory entries
                    if filename.endswith("/"):
                        continue
                    with archive.open(filename) as f2:
                        yield pd.read_csv(f2, header=0)

    all_dfs = []
    for file_status in fs.ls():
        all_dfs.extend(process_zip_file(file_status.path))

    # Concatenate all DataFrames
    final_df = pd.concat(all_dfs, axis=0, ignore_index=True)
    processed.write_table(final_df)
```

```python tab="PySpark"
from transforms.api import transform, Input, Output
from pyspark.sql import Row
import io
import shutil
import tempfile
import zipfile


@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def read_zip(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    def process_file(file_status):
        with fs.open(file_status.path, 'rb') as f:
            # ZipFile needs random access, so copy the stream to a local temporary file
            with tempfile.NamedTemporaryFile() as tmp:
                shutil.copyfileobj(f, tmp)
                tmp.flush()
                with zipfile.ZipFile(tmp) as archive:
                    for filename in archive.namelist():
                        # Skip directory entries
                        if filename.endswith("/"):
                            continue
                        with archive.open(filename) as f2:
                            tw = io.TextIOWrapper(f2)
                            tw.readline()  # Skip the header line of each CSV
                            for line in tw:
                                # Strip the line ending so it does not end up in the last field
                                yield Row(*line.rstrip("\r\n").split(","))

    rdd = fs.files().rdd
    rdd = rdd.flatMap(process_file)
    # Consider passing an explicit schema to toDF(); see the warning at the end of this page
    df = rdd.toDF()
    processed.write_dataframe(df)
```
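The temporary-file step in the examples above matters because zipfile reads the archive's central directory, which is stored at the end of the file, so ZipFile needs a seekable file object. gzip, by contrast, decompresses sequentially, so in our understanding it can read straight from a forward-only stream. The stdlib-only sketch below shows both; ForwardOnly is a hypothetical stand-in for the non-seekable object returned by open(path, 'rb'), and the archive contents are made up.

```python
import gzip
import io
import shutil
import tempfile
import zipfile


class ForwardOnly(io.RawIOBase):
    """Hypothetical stand-in for a non-seekable stream returned by open(path, 'rb')."""

    def __init__(self, data: bytes):
        self._source = io.BytesIO(data)

    def readable(self):
        return True

    def readinto(self, buffer):
        chunk = self._source.read(len(buffer))
        buffer[: len(chunk)] = chunk
        return len(chunk)
    # seekable() is inherited from io.RawIOBase and returns False


def read_zip_members(stream):
    """Yield (name, text) for each file in a zip archive read from a forward-only stream."""
    with tempfile.TemporaryFile() as tmp:
        shutil.copyfileobj(stream, tmp)  # buffer to local disk so ZipFile can seek
        with zipfile.ZipFile(tmp) as archive:
            for name in archive.namelist():
                if name.endswith("/"):  # skip directory entries
                    continue
                yield name, archive.read(name).decode("utf-8")


def read_gzip_lines(stream):
    """Read the lines of a gzip-compressed text file directly from the stream."""
    with gzip.open(stream, "rt", encoding="utf-8") as text:
        return text.read().splitlines()


# Build sample archives in memory (hypothetical content)
csv_text = "id,hair,eye,sex\n1,Black,Brown,Male\n"
zip_bytes = io.BytesIO()
with zipfile.ZipFile(zip_bytes, "w") as archive:
    archive.writestr("data/", "")
    archive.writestr("data/students.csv", csv_text)

members = list(read_zip_members(ForwardOnly(zip_bytes.getvalue())))
gz_lines = read_gzip_lines(ForwardOnly(gzip.compress(csv_text.encode("utf-8"))))
print(members)
print(gz_lines)  # ['id,hair,eye,sex', '1,Black,Brown,Male']
```

Using tempfile.TemporaryFile rather than reading the whole archive into memory keeps memory usage flat for large archives, at the cost of local disk space.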

Random access

:::callout{theme="warning" title="Warning"} Using random access leads to significant performance degradation. We recommend rewriting your code so that it does not rely on the seek method. If you still want to use random access, refer below for information on how to do so. :::

Since the open() method returns stream objects, random access is not supported. If you need random access, you can buffer the file into memory or onto disk. Assuming hair_eye_color is a TransformInput object, here are some examples:

```python
import io
import shutil

# Buffer the whole file into memory
s = io.StringIO()
with hair_eye_color.filesystem().open('students.csv') as f:
    shutil.copyfileobj(f, s)
s.getvalue()

# Result: 'id,hair,eye,sex\n...'
```

```python
# Read the whole file into memory, then index into its lines
with hair_eye_color.filesystem().open('students.csv') as f:
    lines = f.read().splitlines()
lines[0]

# Result: 'id,hair,eye,sex'
```

```python
import shutil
import tempfile

# Buffer the file onto local disk, then reopen it as a regular (seekable) file
with tempfile.NamedTemporaryFile() as tmp:
    with hair_eye_color.filesystem().open('students.csv', 'rb') as f:
        shutil.copyfileobj(f, tmp)
        tmp.flush()  # shutil.copyfileobj does not flush
    with open(tmp.name) as t:
        t.readline()

# Result: 'id,hair,eye,sex\n'
```
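The in-memory and on-disk approaches can be combined with tempfile.SpooledTemporaryFile, which keeps data in memory until it exceeds max_size bytes and then rolls over to a temporary file on disk. A minimal sketch, assuming the source is opened in binary mode ('rb'); the 64 MiB threshold is an arbitrary choice and io.BytesIO stands in for the stream returned by open().

```python
import io
import shutil
import tempfile

SPOOL_LIMIT = 64 * 1024 * 1024  # assumed threshold: spill to disk beyond 64 MiB


def buffer_for_random_access(binary_stream, max_size=SPOOL_LIMIT):
    """Copy a forward-only binary stream into a seekable buffer and rewind it.

    The caller is responsible for closing the returned buffer.
    """
    spool = tempfile.SpooledTemporaryFile(max_size=max_size, mode="w+b")
    shutil.copyfileobj(binary_stream, spool)
    spool.seek(0)
    return spool


# Stand-in for hair_eye_color.filesystem().open('students.csv', 'rb')
source = io.BytesIO(b"id,hair,eye,sex\n1,Black,Brown,Male\n")
with buffer_for_random_access(source) as buf:
    header = buf.readline()
    buf.seek(0, io.SEEK_END)  # jump to the end, which the raw stream cannot do
    size = buf.tell()
print(header, size)  # b'id,hair,eye,sex\n' 35
```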

Write files

Files are written in a similar way using the open() method, which returns a Python file-like stream object that can only be written to. All keyword arguments accepted by io.open() are also supported. Because files are written as streams, random access is not supported. Consider the following Transform object:

```python
from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # your data transformation code
    pass
```

It is possible to write to an output filesystem in your data transformation code. For example, you can generate and write to CSV files as shown in the example below:

```python tab="Polars"
from transforms.api import transform, Input, Output
import polars as pl


@transform.using(
    processed=Output("/examples/csv_files"),
)
def compute(processed):
    with processed.filesystem().open("csv1.csv", "wb") as f1:
        df1 = pl.DataFrame(
            {
                "student_id": ["001", "002", "003"],
                "hair_color": ["brown", "blonde", "black"],
                "eye_color": ["blue", "green", "brown"],
            }
        )
        df1.write_csv(f1)

    with processed.filesystem().open("csv2.csv", "wb") as f2:
        df2 = pl.DataFrame(
            {
                "student_id": ["004", "005", "006"],
                "hair_color": ["red", "green", "blue"],
                "eye_color": ["purple", "grey", "black"],
            }
        )
        df2.write_csv(f2)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output
import tempfile


@transform.using(
    processed=Output("/examples/csv_files"),
)
def compute(ctx, processed):
    conn = ctx.duckdb().conn

    # Create first CSV file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp1:
        conn.execute(f"""COPY (
            SELECT * FROM (VALUES
                ('001', 'brown', 'blue'),
                ('002', 'blonde', 'green'),
                ('003', 'black', 'brown')
            ) AS t(student_id, hair_color, eye_color)
        ) TO '{tmp1.name}' (HEADER)""")

        with open(tmp1.name, 'r') as src:
            with processed.filesystem().open("csv1.csv", "w") as f1:
                f1.write(src.read())

    # Create second CSV file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp2:
        conn.execute(f"""COPY (
            SELECT * FROM (VALUES
                ('004', 'red', 'purple'),
                ('005', 'green', 'grey'),
                ('006', 'blue', 'black')
            ) AS t(student_id, hair_color, eye_color)
        ) TO '{tmp2.name}' (HEADER)""")

        with open(tmp2.name, 'r') as src:
            with processed.filesystem().open("csv2.csv", "w") as f2:
                f2.write(src.read())
```

```python tab="Pandas"
from transforms.api import transform, Input, Output
import pandas as pd


@transform.using(
    processed=Output("/examples/csv_files"),
)
def compute(processed):
    with processed.filesystem().open("csv1.csv", "w") as f1:
        df1 = pd.DataFrame(
            {
                "student_id": ["001", "002", "003"],
                "hair_color": ["brown", "blonde", "black"],
                "eye_color": ["blue", "green", "brown"],
            }
        )
        # index=False omits the pandas row index, so the file has the same columns as the other tabs
        df1.to_csv(f1, index=False)

    with processed.filesystem().open("csv2.csv", "w") as f2:
        df2 = pd.DataFrame(
            {
                "student_id": ["004", "005", "006"],
                "hair_color": ["red", "green", "blue"],
                "eye_color": ["purple", "grey", "black"],
            }
        )
        df2.to_csv(f2, index=False)
```
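When rows are produced incrementally (for example, while parsing input files one at a time), you can stream them straight into the output file instead of building a DataFrame first. A minimal sketch with csv.DictWriter; io.StringIO stands in for processed.filesystem().open('students.csv', 'w'), the generator is a hypothetical data source, and lineterminator="\n" is a choice made here (the csv module defaults to "\r\n").

```python
import csv
import io

FIELDS = ["student_id", "hair_color", "eye_color"]


def write_rows(text_stream, rows):
    """Write an iterable of dicts as CSV, one row at a time, and return the row count."""
    writer = csv.DictWriter(text_stream, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
    return count


def generate_rows():
    # Hypothetical generator standing in for rows parsed from input files
    yield {"student_id": "001", "hair_color": "brown", "eye_color": "blue"}
    yield {"student_id": "002", "hair_color": "blonde", "eye_color": "green"}


out = io.StringIO()
written = write_rows(out, generate_rows())
print(out.getvalue())
```

Because rows are written as they are produced, memory usage stays constant no matter how many rows the generator yields.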

In the following example, you can persist a model using the `pickle` module, the built-in Python serializer:

```python
import pickle

# `model` is any picklable Python object, for example a trained estimator
with processed.filesystem().open('model.pickle', 'wb') as f:
    pickle.dump(model, f)
```
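To load the model in a downstream transform, open the file in binary mode and call pickle.load. The round trip below uses io.BytesIO in place of the output and input filesystems, and a plain dict as a hypothetical stand-in for a trained model. As the Python documentation warns, only unpickle data you trust, since unpickling can execute arbitrary code.

```python
import io
import pickle

# Hypothetical stand-in for a trained model
model = {"weights": [0.25, -1.5, 3.0], "classes": ["blue", "brown", "green"]}

# Writing: in a transform this would be processed.filesystem().open('model.pickle', 'wb')
buffer = io.BytesIO()
pickle.dump(model, buffer, protocol=pickle.HIGHEST_PROTOCOL)

# Reading: in a downstream transform this would be an input's filesystem().open('model.pickle', 'rb')
buffer.seek(0)  # only needed here because the same BytesIO object is reused
restored = pickle.load(buffer)
print(restored == model)  # True
```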

Parallelized processing

You can parallelize file processing within a transform by processing several files concurrently, for example with a ThreadPoolExecutor from Python's concurrent.futures module.

```python tab="Polars"
from transforms.api import transform, Input, Output
import polars as pl
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function to process a single file
    def process_file(dataset_file):
        path = dataset_file.path
        with fs.open(path) as csv_file:
            # Read CSV as Polars DataFrame
            df = pl.read_csv(csv_file)
            # Optionally, select or rename columns to match schema
            return df.select(["student_id", "hair_color", "eye_color"])

    # Step 3: Process files in parallel
    with ThreadPoolExecutor() as executor:
        dfs = list(executor.map(process_file, csv_files))

    # Step 4: Concatenate all DataFrames
    result_df = pl.concat(dfs)
    processed.write_table(result_df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(ctx, hair_eye_color, processed):
    fs = hair_eye_color.filesystem()
    conn = ctx.duckdb().conn

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function to copy a single file to local disk, since read_csv expects
    # file paths rather than file contents
    def download_file(dataset_file):
        with fs.open(dataset_file.path, "rb") as src, tempfile.NamedTemporaryFile(
            mode="wb", delete=False, suffix=".csv"
        ) as tmp:
            shutil.copyfileobj(src, tmp)
        return tmp.name

    # Step 3: Download files in parallel (the DuckDB connection is not shared across threads)
    with ThreadPoolExecutor() as executor:
        local_paths = list(executor.map(download_file, csv_files))

    # Step 4: Read and concatenate all files in a single DuckDB query
    if local_paths:
        result = conn.sql(
            f"SELECT student_id, hair_color, eye_color FROM read_csv({local_paths!r}, header=true)"
        )
        processed.write_table(result)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function to process a single file
    def process_file(dataset_file):
        path = dataset_file.path
        with fs.open(path) as csv_file:
            # Read CSV as pandas DataFrame
            df = pd.read_csv(csv_file)
            # Optionally, select or rename columns to match schema
            return df[["student_id", "hair_color", "eye_color"]]

    # Step 3: Process files in parallel
    with ThreadPoolExecutor() as executor:
        dfs = list(executor.map(process_file, csv_files))

    # Step 4: Concatenate all DataFrames and write out
    result_df = pd.concat(dfs, ignore_index=True)
    processed.write_table(result_df)
```
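In the examples above, executor.map returns results in the same order as its inputs even when files finish in a different order, so the concatenated output is deterministic. Threads are most likely to help when the time is dominated by I/O rather than CPU-bound parsing, since CPython's GIL limits parallel execution of pure-Python code. The stdlib-only sketch below demonstrates the ordering guarantee; the in-memory FILES dict and the simulated delays are hypothetical stand-ins for fs.ls() and fs.open(), and max_workers=8 is an arbitrary choice.

```python
import csv
import io
import random
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "dataset": path -> CSV contents
FILES = {
    f"part-{i}.csv": f"student_id,hair_color,eye_color\n{i:03d},brown,blue\n"
    for i in range(5)
}


def open_file(path):
    """Stand-in for fs.open(path) that simulates variable I/O latency."""
    time.sleep(random.uniform(0, 0.05))
    return io.StringIO(FILES[path])


def process_file(path):
    with open_file(path) as f:
        return list(csv.DictReader(f))


paths = sorted(FILES)
with ThreadPoolExecutor(max_workers=8) as executor:
    # map() yields results in input order, regardless of completion order
    per_file_rows = list(executor.map(process_file, paths))

all_rows = [row for rows in per_file_rows for row in rows]
print([row["student_id"] for row in all_rows])  # ['000', '001', '002', '003', '004']
```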

Parallelized processing with PySpark

When you work with files in PySpark, you need to understand the difference between driver code and executor code, a distinction you do not have to think about for transformations expressed in terms of DataFrame objects or defined using standard non-Spark transforms. The compute function is executed on the driver, which is a single machine. Spark automatically distributes DataFrame functions to the executors (many machines) as it sees fit.

To benefit from distributed processing with the files API, we have to leverage Spark to distribute the computation. To do so, we create a DataFrame of FileStatus objects and distribute it across our executors. Each task on an executor can then open the files it has been assigned and process them, with the results aggregated by Spark.

The files API exposes the files() function, which accepts the same arguments as the ls() function but returns a DataFrame of FileStatus objects instead. This DataFrame is partitioned by file size to help balance the computation when file sizes vary. The partitioning can be controlled using two Spark configuration options:

* spark.sql.files.maxPartitionBytes is the maximum number of bytes to pack into a single partition when reading files.
* spark.sql.files.openCostInBytes is the estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This cost is added to the file size to calculate the total number of bytes the file uses in the partition.

To modify the values for these properties, you must create a custom Transforms profile and apply it to your Transform using the configure() decorator. For more information, refer to the section on defining Transforms profiles in the Code Repositories documentation.

Now, let's step through an example. Say we have CSV files that we want to parse and concatenate. We use flatMap() to apply a processing function to each FileStatus object. This processing function must yield rows in a form accepted by pyspark.sql.SparkSession.createDataFrame().
```python tab="PySpark"
import csv
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from transforms.api import transform, Input, Output


@transform.spark.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):
    def process_file(file_status):
        with hair_eye_color.filesystem().open(file_status.path) as f:
            r = csv.reader(f)

            # Construct a pyspark.Row from our header row
            header = next(r)
            MyRow = Row(*header)

            for row in r:
                yield MyRow(*row)

    schema = StructType([
        StructField('student_id', StringType(), True),
        StructField('hair_color', StringType(), True),
        StructField('eye_color', StringType(), True),
    ])

    files_df = hair_eye_color.filesystem().files('**/*.csv')
    processed_df = files_df.rdd.flatMap(process_file).toDF(schema)
    processed.write_dataframe(processed_df)
```

:::callout{theme="warning" title="Warning"} Although it is possible to call toDF() without passing a schema, if your file processing returns zero rows, Spark's schema inference will fail with a ValueError: RDD is empty exception. We therefore recommend that you always specify a schema manually. :::
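To build intuition for how spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes interact, here is a simplified plain-Python model of the "next fit decreasing" packing that Spark's file-source scan uses: files are sorted largest first, and a partition is closed when the next file would push it over the limit. It is an assumption that the files() DataFrame is packed the same way, and Spark actually derives its split size from both settings plus the default parallelism rather than using maxPartitionBytes directly. The defaults shown (128 MiB and 4 MiB) are Spark's documented defaults; the file sizes are hypothetical.

```python
MIB = 1024 * 1024


def pack_files(sizes, max_partition_bytes=128 * MIB, open_cost_in_bytes=4 * MIB):
    """Group file sizes into partitions using next fit decreasing.

    Simplified model: Spark derives the real split size from maxPartitionBytes,
    openCostInBytes and the default parallelism; this sketch uses
    max_partition_bytes directly.
    """
    partitions, current, current_size = [], [], 0
    for size in sorted(sizes, reverse=True):  # largest files first
        if current and current_size + size > max_partition_bytes:
            partitions.append(current)  # close the current partition
            current, current_size = [], 0
        current.append(size)
        # Each file counts as its own size plus the estimated cost of opening it
        current_size += size + open_cost_in_bytes
    if current:
        partitions.append(current)
    return partitions


mixed = pack_files([100 * MIB, 60 * MIB, 10 * MIB, 10 * MIB, 1 * MIB, 1 * MIB])
print([[s // MIB for s in p] for p in mixed])  # [[100], [60, 10, 10, 1, 1]]

small_files = [1024] * 1000  # 1,000 files of 1 KiB each
print(len(pack_files(small_files)))                        # 32
print(len(pack_files(small_files, open_cost_in_bytes=0)))  # 1
```

The small-file case shows why the open cost matters: with it, each 1 KiB file is charged roughly 4 MiB, so only 32 files fit per partition and the work is spread over 32 tasks; with an open cost of zero, all 1,000 files would land in a single partition.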


中文翻译

读写非结构化文件

:::callout{theme="warning" title="警告"} 非结构化文件访问是一个高级主题。在阅读本页内容之前,请确保您已熟悉本用户指南中的其他内容。 :::

您可能出于多种原因需要在数据转换中访问文件。当您想要处理非表格格式(如XMLJSON)或压缩格式(如gzzip)的文件时,文件访问尤其有用。

transforms Python库允许用户在Foundry数据集中读写文件。transforms.api.TransformInput暴露了一个只读的FileSystem对象,而transforms.api.TransformOutput暴露了一个只写的FileSystem对象。这些FileSystem对象允许基于Foundry数据集中的文件路径进行文件访问,从而抽象化底层存储。

:::callout{theme="neutral"} 如果您希望在数据转换中访问文件,则必须使用transform()装饰器构建您的Transform对象。这是因为FileSystem对象是由TransformInputTransformOutput对象暴露的。transform()是唯一期望其计算函数的输入和输出分别属于TransformInputTransformOutput类型的装饰器。 :::

导入文件

文件可以通过手动文件导入上传到Foundry,或通过数据连接同步。结构化和非结构化文件都可以导入到Foundry数据集中,以便在下游应用程序中进行处理。文件也可以作为原始文件上传,无需修改扩展名。以下示例指的是作为Foundry数据集上传的文件,而非原始文件。

Foundry还具有自动推断上传到数据集的某些文件类型的模式(Schema)的功能。例如,当导入CSV类型的文件时,可以使用应用模式按钮来自动应用模式。了解更多关于手动上传数据的信息。

浏览文件

数据集中的文件可以使用transforms.api.FileSystem.ls()方法列出。该方法返回一个transforms.api.FileStatus对象的生成器。这些对象捕获每个文件的路径、大小(以字节为单位)和修改时间戳(自Unix纪元以来的毫秒数)。考虑以下Transform对象:

```python tab="Polars" from transforms.api import transform, Input, Output

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 您的数据转换代码 pass

```python tab="DuckDB"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(ctx, hair_eye_color, processed):
    # 您的数据转换代码
    pass

```python tab="Pandas" from transforms.api import transform, Input, Output

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 您的数据转换代码 pass

```python tab="PySpark"
from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # 您的数据转换代码
    pass

在您的数据转换代码中,您可以浏览数据集文件:

list(hair_eye_color.filesystem().ls())
# 结果: [FileStatus(path='students.csv', size=688, modified=...)]

还可以通过传递glob或正则表达式模式来过滤ls()调用的结果:

list(hair_eye_color.filesystem().ls(glob='*.csv'))
# 结果: [FileStatus(path='students.csv', size=688, modified=...)]

list(hair_eye_color.filesystem().ls(regex='[A-Z]*\.csv'))
# 结果: []

读取文件

文件可以使用transforms.api.FileSystem.open()方法打开。该方法返回一个类似Python文件的流对象。所有由io.open()接受的选项也都支持。请注意,文件是以流的形式读取的,这意味着不支持随机访问。

:::callout{theme="neutral"} open()方法返回的类似文件的流对象不支持seektell方法。因此,不支持随机访问。 :::

考虑以下Transform对象:

```python tab="Polars" from transforms.api import transform, Input, Output

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 您的数据转换代码 pass

```python tab="DuckDB"
from transforms.api import transform, Input, Output

@transform.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(ctx, hair_eye_color, processed):
    # 您的数据转换代码
    pass

```python tab="Pandas" from transforms.api import transform, Input, Output

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def filter_eye_color(hair_eye_color, processed): # type: (TransformInput, TransformOutput) -> None # 您的数据转换代码 pass

```python tab="PySpark"
from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # 您的数据转换代码
    pass

在您的数据转换代码中,您可以如下所示读取数据集文件:

with hair_eye_color.filesystem().open('students.csv') as f:
   f.readline()

# 结果: 'id,hair,eye,sex\n'

该流也可以传递给解析库。例如,我们可以解析一个CSV文件。

import csv
with hair_eye_color.filesystem().open('students.csv') as f:
    reader = csv.reader(f, delimiter=',')
    next(reader)

# 结果: ['id', 'hair', 'eye', 'sex']

如前所述,您还可以处理非表格格式的文件,如XMLJSON,或压缩格式,如gzzip。例如,您可以使用以下代码读取压缩文件中的CSV文件并将其内容作为DataFrame返回:

```python tab="Polars" from transforms.api import Input, Output, transform

import polars as pl import zipfile

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def read_zip(hair_eye_color, processed): fs = hair_eye_color.filesystem()

def process_zip_file(file_path):
    with fs.open(file_path, "rb") as f:
        with zipfile.ZipFile(f) as archive:
            for filename in archive.namelist():
                # 跳过目录条目
                if filename.endswith("/"):
                    continue
                with archive.open(filename) as f2:
                    df = pl.read_csv(f2, has_header=True)
                    yield df

all_dfs = []
for file_status in fs.ls():
    for df in process_zip_file(file_status.path):
        all_dfs.append(df)
# 连接所有DataFrame
final_df = pl.concat(all_dfs, how="vertical")
processed.write_table(final_df)

python tab="DuckDB" from transforms.api import Input, Output, transform import zipfile import tempfile @transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def read_zip(ctx, hair_eye_color, processed): fs = hair_eye_color.filesystem() conn = ctx.duckdb().conn def process_zip_file(file_path): with fs.open(file_path, "rb") as f: with zipfile.ZipFile(f) as archive: for filename in archive.namelist(): # 跳过目录条目 if filename.endswith("/"): continue with archive.open(filename) as f2: # 写入临时文件供DuckDB读取 with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv') as tmp: tmp.write(f2.read()) tmp.flush() df = conn.sql(f"SELECT * FROM read_csv('{tmp.name}', header=true)") yield df all_queries = [] for file_status in fs.ls(): for df in process_zip_file(file_status.path): all_queries.append(df) # 合并所有查询 if all_queries: union_query = ' UNION ALL '.join([f'({query.query})' for query in all_queries]) final_result = conn.sql(union_query) processed.write_table(final_result) ```

```python tab="Pandas" from transforms.api import Input, Output, transform

import zipfile import pandas as pd

@transform.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def read_zip(hair_eye_color, processed): fs = hair_eye_color.filesystem()

def process_zip_file(file_path):
    with fs.open(file_path, "rb") as f:
        with zipfile.ZipFile(f) as archive:
            for filename in archive.namelist():
                # 跳过目录条目
                if filename.endswith("/"):
                    continue
                with archive.open(filename) as f2:
                    df = pd.read_csv(f2, header=0)
                    yield df

all_dfs = []
for file_status in fs.ls():
    for df in process_zip_file(file_status.path):
        all_dfs.append(df)

# 连接所有DataFrame
final_df = pd.concat(all_dfs, axis=0, ignore_index=True)
processed.write_table(final_df)

python tab="PySpark" from transforms.api import transform, Input, Output import tempfile import shutil import zipfile import io @transform.spark.using( hair_eye_color=Input('/examples/students_hair_eye_color'), processed=Output('/examples/hair_eye_color_processed') ) def read_zip(hair_eye_color, processed): fs = hair_eye_color.filesystem() def process_file(file_status): with fs.open(file_status.path, 'rb') as f: with tempfile.NamedTemporaryFile() as tmp: shutil.copyfileobj(f, tmp) tmp.flush() with zipfile.ZipFile(tmp) as archive: for filename in archive.namelist(): # 跳过目录条目 if filename.endswith("/"): continue with archive.open(filename) as f2: br = io.BufferedReader(f2) tw = io.TextIOWrapper(br) tw.readline() # 跳过每个CSV的第一行 for line in tw: yield Row(*line.split(",")) rdd = fs.files().rdd rdd = rdd.flatMap(process_file) df = rdd.toDF() processed.write_dataframe(df) ```

随机访问

:::callout{theme="warning" title="警告"} 使用随机访问会导致显著的性能下降。我们建议重写您的代码,使其不依赖于seek方法。如果您仍然想使用随机访问,请参考以下信息了解如何操作。 :::

由于open()方法返回流对象,因此不支持随机访问。如果您需要随机访问,可以将文件缓冲到内存或磁盘中。假设hair_eye_color对应一个TransformInput对象,以下是一些示例:

import io
import shutil
s = io.StringIO()
with hair_eye_color.filesystem().open('students.csv') as f:
    shutil.copyfileobj(f, s)
s.getvalue()

# 结果: 'id,hair,eye,sex\n...'
with hair_eye_color.filesystem().open('students.csv') as f:
    lines = f.read().splitlines()
lines[0]

# 结果: 'id,hair,eye,sex'
import tempfile
with tempfile.NamedTemporaryFile() as tmp:
    with hair_eye_color.filesystem().open('students.csv', 'rb') as f:
        shutil.copyfileobj(f, tmp)
        tmp.flush()  # shutil.copyfileobj不会刷新
    with open(tmp.name) as t:
        t.readline()

# 结果: 'id,hair,eye,sex\n'

写入文件

文件的写入方式类似,使用open()方法。该方法返回一个类似Python文件的流对象,该对象只能写入。所有由io.open()接受的关键字参数也都支持。请注意,文件是以流的形式写入的,这意味着不支持随机访问。考虑以下Transform对象:

from transforms.api import transform, Input, Output

@transform.spark.using(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def filter_eye_color(hair_eye_color, processed):
    # type: (TransformInput, TransformOutput) -> None
    # 您的数据转换代码
    pass

可以在您的数据转换代码中写入输出文件系统。例如,您可以生成并写入CSV文件,如下例所示:

```python tab="Polars" from transforms.api import transform, Input, Output import polars as pl

@transform.using( processed=Output("/examples/csv_files"), ) def compute(processed): with processed.filesystem().open("csv1.csv", "wb") as f1: df1 = pl.DataFrame( { "student_id": ["001", "002", "003"], "hair_color": ["brown", "blonde", "black"], "eye_color": ["blue", "green", "brown"], } ) df1.write_csv(f1) with processed.filesystem().open("csv2.csv", "wb") as f2: df2 = pl.DataFrame( { "student_id": ["004", "005", "006"], "hair_color": ["red", "green", "blue"], "eye_color": ["purple", "grey", "black"], } ) df2.write_csv(f2)

```python tab="DuckDB"
from transforms.api import transform, Input, Output
import tempfile


@transform.using(
    processed=Output("/examples/csv_files"),
)
def compute(ctx, processed):
    conn = ctx.duckdb().conn

    # Create the first CSV file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp1:
        conn.execute(f"""COPY (
            SELECT * FROM (VALUES
            ('001', 'brown', 'blue'),
            ('002', 'blonde', 'green'),
            ('003', 'black', 'brown'))
            AS t(student_id, hair_color, eye_color)
        ) TO '{tmp1.name}' (HEADER)""")

        # Copy the CSV that DuckDB wrote locally into the output dataset
        with open(tmp1.name, 'r') as src:
            with processed.filesystem().open("csv1.csv", "w") as f1:
                f1.write(src.read())

    # Create the second CSV file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp2:
        conn.execute(f"""COPY (
            SELECT * FROM (VALUES
            ('004', 'red', 'purple'),
            ('005', 'green', 'grey'),
            ('006', 'blue', 'black'))
            AS t(student_id, hair_color, eye_color)
        ) TO '{tmp2.name}' (HEADER)""")

        with open(tmp2.name, 'r') as src:
            with processed.filesystem().open("csv2.csv", "w") as f2:
                f2.write(src.read())
```

```python tab="Pandas"
from transforms.api import transform, Input, Output
import pandas as pd


@transform.using(
    processed=Output("/examples/csv_files"),
)
def compute(processed):
    with processed.filesystem().open("csv1.csv", "w") as f1:
        df1 = pd.DataFrame(
            {
                "student_id": ["001", "002", "003"],
                "hair_color": ["brown", "blonde", "black"],
                "eye_color": ["blue", "green", "brown"],
            }
        )
        # index=False avoids writing the pandas row index as an extra column
        df1.to_csv(f1, index=False)
    with processed.filesystem().open("csv2.csv", "w") as f2:
        df2 = pd.DataFrame(
            {
                "student_id": ["004", "005", "006"],
                "hair_color": ["red", "green", "blue"],
                "eye_color": ["purple", "grey", "black"],
            }
        )
        df2.to_csv(f2, index=False)
```
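If you would rather not depend on a DataFrame library, a minimal sketch using Python's standard csv module can stream rows to the same kind of text-mode output file. The helper write_rows_as_csv is hypothetical, and passing newline="" to open() (which the csv module recommends) assumes the output filesystem forwards io.open() keyword arguments as described above. An io.StringIO object stands in for the output file.

```python
import csv
import io
from typing import Iterable, Sequence, TextIO


def write_rows_as_csv(
    f: TextIO, header: Sequence[str], rows: Iterable[Sequence[str]]
) -> int:
    """Stream a header and rows to a text-mode file as CSV; return the row count.

    Hypothetical helper: writes sequentially, so it works on write-only streams.
    """
    writer = csv.writer(f)
    writer.writerow(header)
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
    return count


# In a transform, `f` would be something like (assumption, see above):
#     processed.filesystem().open("csv1.csv", "w", newline="")
out = io.StringIO()
n = write_rows_as_csv(
    out,
    ["student_id", "hair_color", "eye_color"],
    [["001", "brown", "blue"], ["002", "blonde", "green"]],
)
print(n)  # 2
print(out.getvalue())
```

Because the rows are written one at a time, this pattern never needs random access, which matches the streaming behavior of output files.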

The following example persists a model using the `pickle` module, Python's built-in serializer:

```python
import pickle

# `model` is assumed to be an already-trained model object
with processed.filesystem().open('model.pickle', 'wb') as f:
    pickle.dump(model, f)
```
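Reading the model back follows the same pattern with a read-only stream. pickle.load() only needs read(), readinto() and readline() on the file object, so it works without random access. In the minimal sketch below, save_model, load_model and the downstream input name model_input are hypothetical, and io.BytesIO stands in for the dataset files.

```python
import io
import pickle
from typing import Any, BinaryIO


def save_model(f: BinaryIO, model: Any) -> None:
    """Serialize `model` to a binary, write-only stream (hypothetical helper)."""
    pickle.dump(model, f)


def load_model(f: BinaryIO) -> Any:
    """Deserialize a model from a binary, read-only stream (hypothetical helper)."""
    return pickle.load(f)


# In transforms, the write side would use
#     processed.filesystem().open('model.pickle', 'wb')
# and a downstream transform could read it with
#     model_input.filesystem().open('model.pickle', 'rb')
# where `model_input` is a hypothetical TransformInput for the dataset above.
buffer = io.BytesIO()
save_model(buffer, {"weights": [0.1, 0.2], "bias": 0.5})
buffer.seek(0)  # rewind the stand-in buffer before reading it back
restored = load_model(buffer)
print(restored)  # {'weights': [0.1, 0.2], 'bias': 0.5}
```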

Parallel processing

You can parallelize file processing manually, for example by distributing the work across a pool of threads:

```python tab="Polars"
from transforms.api import transform, Input, Output
import polars as pl
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output("/Mint/Transforms/lightweight/abort/out"),
    hair_eye_color=Input("/Mint/Transforms/lightweight/abort/csvs"),
)
def example_computation(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function that processes a single file
    def process_file(dataset_file):
        path = dataset_file.path
        with fs.open(path) as csv_file:
            # Read the CSV into a Polars DataFrame
            df = pl.read_csv(csv_file)
            # Optionally, select or rename columns to match the schema
            return df.select(["student_id", "hair_color", "eye_color"])

    # Step 3: Process the files in parallel
    with ThreadPoolExecutor() as executor:
        dfs = list(executor.map(process_file, csv_files))

    # Step 4: Concatenate all DataFrames
    result_df = pl.concat(dfs)
    processed.write_table(result_df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(ctx, hair_eye_color, processed):
    fs = hair_eye_color.filesystem()
    conn = ctx.duckdb().conn

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function that processes a single file
    def process_file(dataset_file):
        # A DuckDB connection should not be used concurrently from several threads;
        # cursor() gives each worker its own connection to the same database
        cursor = conn.cursor()
        with fs.open(dataset_file.path, 'rb') as csv_file:
            # read_csv_auto() expects a file path, so copy the stream to a local file first
            with tempfile.NamedTemporaryFile(suffix='.csv') as tmp:
                shutil.copyfileobj(csv_file, tmp)
                tmp.flush()
                return cursor.sql(
                    f"SELECT student_id, hair_color, eye_color FROM read_csv_auto('{tmp.name}')"
                ).fetchdf()

    # Step 3: Process the files in parallel
    with ThreadPoolExecutor() as executor:
        dfs = list(executor.map(process_file, csv_files))

    # Step 4: Concatenate all DataFrames using DuckDB
    if dfs:
        for i, df in enumerate(dfs):
            conn.register(f'temp_{i}', df)
        union_sql = " UNION ALL ".join(f"SELECT * FROM temp_{i}" for i in range(len(dfs)))
        processed.write_table(conn.sql(union_sql))
```

```python tab="Pandas"
from transforms.api import transform, Input, Output
import pandas as pd
from concurrent.futures import ThreadPoolExecutor


@transform.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):
    fs = hair_eye_color.filesystem()

    # Step 1: List all CSV files
    csv_files = list(fs.ls(glob="*.csv"))

    # Step 2: Function that processes a single file
    def process_file(dataset_file):
        path = dataset_file.path
        with fs.open(path) as csv_file:
            # Read the CSV into a pandas DataFrame
            df = pd.read_csv(csv_file)
            # Optionally, select or rename columns to match the schema
            return df[["student_id", "hair_color", "eye_color"]]

    # Step 3: Process the files in parallel
    with ThreadPoolExecutor() as executor:
        dfs = list(executor.map(process_file, csv_files))

    # Step 4: Concatenate all DataFrames and write the result
    result_df = pd.concat(dfs, ignore_index=True)
    processed.write_table(result_df)
```

Parallel processing with PySpark

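Unlike transformations expressed in terms of DataFrame objects, or transformations defined with standard non-Spark transforms, file-based transformations in PySpark require you to understand the distinction between driver code and executor code. The compute function runs on the driver, which is a single machine. Spark automatically distributes DataFrame operations to the executors (multiple machines) as it sees fit.

To benefit from distributed processing with the file API, we must use Spark to distribute the computation. To do this, we create a DataFrame of FileStatus objects and distribute it across the executors. Each task on an executor can then open the files assigned to it and process them, and Spark aggregates the results.

The file API exposes a files() function, which accepts the same arguments as ls() but returns a DataFrame of FileStatus objects. This DataFrame is partitioned by file size to help balance the computation when file sizes vary. Partitioning can be controlled with two Spark configuration options:

* spark.sql.files.maxPartitionBytes is the maximum number of bytes to pack into a single partition when reading files.
* spark.sql.files.openCostInBytes is the estimated cost of opening a file, measured as the number of bytes that could be scanned in the same time. It is added to each file's size when computing the total number of bytes the file occupies in a partition.

To modify the values of these properties, you must create a custom Transforms profile and apply it to your Transform with the configure() decorator. For more information, refer to the section on defining Transforms profiles in the Code Repositories documentation.

Now let's walk through an example. Suppose we have CSV files that we want to parse and concatenate. We use flatMap() to apply a processing function to each FileStatus object. This processing function must yield rows in a form accepted by pyspark.sql.SparkSession.createDataFrame().

```python tab="PySpark"
import csv
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from transforms.api import transform, Input, Output


@transform.spark.using(
    processed=Output('/examples/hair_eye_color_processed'),
    hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
)
def example_computation(hair_eye_color, processed):

    def process_file(file_status):
        with hair_eye_color.filesystem().open(file_status.path) as f:
            r = csv.reader(f)

            # Construct a pyspark.Row from our header row
            header = next(r, None)
            if header is None:
                return  # Empty file: yield no rows
            MyRow = Row(*header)

            for row in r:
                yield MyRow(*row)

    schema = StructType([
        StructField('student_id', StringType(), True),
        StructField('hair_color', StringType(), True),
        StructField('eye_color', StringType(), True),
    ])

    files_df = hair_eye_color.filesystem().files('**/*.csv')
    processed_df = files_df.rdd.flatMap(process_file).toDF(schema)
    processed.write_dataframe(processed_df)
```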

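:::callout{theme="warning" title="Warning"} Although you can call toDF() without passing a schema, if your file processing returns zero rows, Spark's schema inference fails with a ValueError: RDD is empty exception. We therefore recommend always specifying the schema explicitly. :::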
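To build intuition for how spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes interact, the sketch below models Spark's file-packing strategy: files are sorted largest first, each file is charged its size plus the open cost, and a new partition starts when the next file's size would push the running total past the maximum. It assumes files() partitions its DataFrame the same way Spark plans file scans, which this page does not confirm. The helper simulate_file_partitions is hypothetical and simplified: it ignores the splitting of files larger than the limit, and the fact that Spark can lower the effective limit based on total input size and available cores.

```python
from typing import List, Tuple


def simulate_file_partitions(
    files: List[Tuple[str, int]],
    max_partition_bytes: int,
    open_cost_in_bytes: int,
) -> List[List[str]]:
    """Simplified model of Spark's next-fit-decreasing file packing (hypothetical)."""
    partitions: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    # Larger files are placed first
    for path, size in sorted(files, key=lambda f: f[1], reverse=True):
        # Close the current partition if this file would overflow it
        if current and current_size + size > max_partition_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(path)
        # Each file costs its size plus the fixed cost of opening it
        current_size += size + open_cost_in_bytes
    if current:
        partitions.append(current)
    return partitions


files = [("a.csv", 60), ("b.csv", 50), ("c.csv", 10), ("d.csv", 5)]
print(simulate_file_partitions(files, 100, 10))
# [['a.csv'], ['b.csv', 'c.csv', 'd.csv']]
print(simulate_file_partitions(files, 100, 40))
# [['a.csv'], ['b.csv', 'c.csv'], ['d.csv']]
```

In this model, raising the open cost makes each small file "heavier", so small files are spread across more partitions instead of being packed together.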