
Infer a schema for CSV or JSON files

It's easiest to work with datasets in Foundry if they have a schema. Foundry lets you manually add a schema to datasets containing CSV or JSON files by selecting the **Apply a schema** button in the dataset. This button automatically infers the schema from a subset of the data. Once a schema is applied, select **Edit schema** in the dataset view to modify column types or apply additional parsing options, such as dropping jagged rows, changing the encoding, or adding extra columns for the file path, row byte offset, import timestamp, or row number.

A schema applied statically, based on the dataset's initial files, can become out of date when the data changes. In such cases, it can be helpful to have Spark infer the schema dynamically as the first step of a transforms pipeline that processes semi-structured data.
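To see how a statically applied schema can go stale, here is a deliberately simplified model of type inference using only the Python standard library. It is not Spark's actual `inferSchema` algorithm (which supports more types, such as dates and timestamps), and the helper names `infer_type` and `infer_schema` are hypothetical. Each column is widened to the narrowest type that fits every value, so a schema inferred from the initial files can disagree with one inferred after new rows arrive.

```python
import csv
import io


def infer_type(value: str) -> str:
    """Return a simplified type name for one CSV field (hypothetical model, not Spark's)."""
    for caster, name in ((int, "int"), (float, "double")):
        try:
            caster(value)
            return name
        except ValueError:
            pass
    return "string"


# Widening order when rows disagree: int -> double -> string
_RANK = {"int": 0, "double": 1, "string": 2}


def infer_schema(text: str) -> dict:
    """Infer a column -> type mapping from CSV text whose first line is a header."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    schema = {name: None for name in header}
    for row in reader:
        for name, value in zip(header, row):
            candidate = infer_type(value)
            current = schema[name]
            if current is None or _RANK[candidate] > _RANK[current]:
                schema[name] = candidate
    # Columns that never received a value fall back to string
    return {name: t or "string" for name, t in schema.items()}


initial_files = "id,amount\n1,10\n2,20\n"
later_files = initial_files + "3,12.5\n"

print(infer_schema(initial_files))  # {'id': 'int', 'amount': 'int'}
print(infer_schema(later_files))    # {'id': 'int', 'amount': 'double'}
```

A schema fixed when only the first two rows existed would declare `amount` as an integer and no longer match the data once the third row lands.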

Note that inferring a schema dynamically on every pipeline build has a performance cost, so use this technique sparingly (for example, when the schema may change).

Below are examples for CSV and JSON inputs.

:::callout{theme="neutral"}
Parquet, the default output file format for Transforms, does not allow certain special characters in column names, and such characters may appear in an automatically inferred schema. We therefore recommend passing the DataFrame through `sanitize_schema_for_parquet` before writing it, as in the examples below, to prevent potential issues.
:::
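As an illustration of the problem, Spark's Parquet writer has historically rejected column names containing any of the characters ` ,;{}()\n\t=`, which CSV headers often include. The sketch below is a hypothetical stand-in using only the standard library; it is not the implementation of `sanitize_schema_for_parquet`, whose exact renaming rules may differ. It simply replaces each disallowed character with an underscore.

```python
import re

# Characters that Spark's Parquet writer has historically rejected in column names
_INVALID = re.compile(r"[ ,;{}()\n\t=]")


def sanitize_column_names(names):
    """Replace Parquet-unsafe characters with underscores (hypothetical helper)."""
    return [_INVALID.sub("_", name) for name in names]


print(sanitize_column_names(["order id", "amount (USD)", "a=b"]))
# ['order_id', 'amount__USD_', 'a_b']
```

On a Spark DataFrame, a helper like this could be applied with `df.toDF(*sanitize_column_names(df.columns))`. Note that two different names can collapse to the same sanitized name, which a production implementation would need to handle.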

:::callout{theme="neutral"}
Dynamic schema inference is only one of many use cases for reading all or a subset of a dataset's files with `SparkSession.read`, as the examples below do. If your use case does not need dynamic schema inference, disable it: for CSV, set `inferSchema` to `False` (which reads every column as a string), or omit that option and pass an explicit schema instead; for JSON, which infers a schema unless one is supplied, pass an explicit schema. Disabling automatic schema inference gives significantly better performance, because Spark no longer has to scan the input to determine column types, and more consistent results, especially in incremental pipelines, where inference can produce different schemas for different incremental batches.
:::
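When the columns are known in advance, the inference step can be replaced with an explicit schema. PySpark's `DataFrameReader.schema()` accepts either a `StructType` or a DDL-formatted string such as `` `id` INT, `amount` DOUBLE ``. The hypothetical helper `to_ddl` below builds such a string from a mapping of column names to Spark SQL types; the column names and types shown are placeholders.

```python
def to_ddl(columns):
    """Build a Spark DDL schema string from a {name: type} mapping (hypothetical helper)."""
    parts = []
    for name, spark_type in columns.items():
        # Backticks allow names with spaces or special characters;
        # a literal backtick inside a name is escaped by doubling it
        quoted = "`" + name.replace("`", "``") + "`"
        parts.append(f"{quoted} {spark_type}")
    return ", ".join(parts)


schema = to_ddl({"id": "INT", "amount": "DOUBLE", "note": "STRING"})
print(schema)  # `id` INT, `amount` DOUBLE, `note` STRING
```

In the CSV transform, the string would then take the place of the inference option, for example `ctx.spark_session.read.option("header", True).schema(schema).csv(files)`, so Spark skips the extra pass over the input that inference requires.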

CSV

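```python
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet


@transform(
    output=Output("/Company/sourceA/parsed/data"),
    raw=Input("/Company/sourceA/raw/data_csv"),
)
def read_csv(ctx, raw, output):
    # Build fully qualified paths to every file in the raw input dataset
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    # Parse the CSV files and let Spark infer column types from the data
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)  # use the first line of each file as column names
        .option("inferSchema", True)  # without this, every column is read as a string
        .csv(files)
    )
    # Make column names Parquet-safe before writing the output
    output.write_dataframe(sanitize_schema_for_parquet(df))
```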
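The list comprehension in this example joins the input dataset's Hadoop root path with each file's path relative to that root. A standard-library sketch of the same step follows. `FakeFileStatus` is a hypothetical stand-in for the entries returned by `filesystem.ls()` (only the `path` attribute is modeled), the root path is a placeholder, and the suffix filter is a hypothetical addition for raw datasets that also contain files Spark should not parse as CSV.

```python
from collections import namedtuple

# Hypothetical stand-in for the file entries returned by filesystem.ls();
# only the relative path attribute is modeled here
FakeFileStatus = namedtuple("FakeFileStatus", ["path"])


def full_paths(hadoop_path, statuses, suffix=".csv"):
    """Prefix each relative file path with the dataset root, keeping only files ending in suffix."""
    return [
        f"{hadoop_path}/{status.path}"
        for status in statuses
        if status.path.lower().endswith(suffix.lower())
    ]


# Placeholder root; in a transform the value comes from filesystem.hadoop_path
root = "hdfs://example/datasets/raw"
listing = [
    FakeFileStatus("2024/part-0.csv"),
    FakeFileStatus("2024/part-1.CSV"),
    FakeFileStatus("notes.txt"),
]
print(full_paths(root, listing))
```

Filtering is case-insensitive here so that `.CSV` files are kept; whether a raw dataset contains non-CSV files at all depends on how it was ingested.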

JSON

```python
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet


@transform(
    output=Output("/Company/sourceA/parsed/data"),
    raw=Input("/Company/sourceA/raw/data_json"),
)
def read_json(ctx, raw, output):
    # Build fully qualified paths to every file in the raw input dataset
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    # The JSON reader infers the schema by scanning the input unless one is supplied
    df = (
        ctx
        .spark_session
        .read
        # False (the default) expects newline-delimited JSON, one record per line;
        # use True if each file contains a single JSON document spanning multiple lines
        .option("multiline", False)
        .json(files)
    )
    # Make column names Parquet-safe before writing the output
    output.write_dataframe(sanitize_schema_for_parquet(df))
```
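The `multiline` option controls how Spark splits each file into records. By default, every line must be a complete JSON value (newline-delimited JSON, also called JSON Lines); with `multiline` set to `True`, each file is parsed as one JSON document that may span many lines, and if that document is a top-level array, each element becomes a row. The standard-library sketch below only illustrates the difference between the two layouts; the helper names are hypothetical and it does not reproduce Spark's parser.

```python
import json

json_lines = '{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n'
single_document = '[\n  {"id": 1, "name": "a"},\n  {"id": 2, "name": "b"}\n]\n'


def parse_json_lines(text):
    """Parse newline-delimited JSON: one complete value per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_single_document(text):
    """Parse a file holding one JSON document that spans several lines."""
    value = json.loads(text)
    # Treat a top-level array as multiple records and a lone object as one record
    return value if isinstance(value, list) else [value]


print(parse_json_lines(json_lines))
print(parse_single_document(single_document))

# Reading the multi-line document line by line fails, because "[" alone is not valid JSON
try:
    parse_json_lines(single_document)
except json.JSONDecodeError as err:
    print("line-by-line parsing failed:", err.msg)
```

Both layouts yield the same two records, but only when each is read in the mode that matches it; this is why the option must match how the source system wrote the files.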
