跳转至

Transforms(转换(Transforms))

Python

Parse excel file with Python

How can I read and process complex Excel files with dynamic schema in a distributed manner using PySpark and Openpyxl?

This code uses PySpark and the Openpyxl library to read multiple Excel files from an input filesystem, parse their content, and convert them into PySpark DataFrames. The DataFrames are then combined into a single DataFrame and written to the output.

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import openpyxl
import functools


@transform(
    processed_excel=Output("example_processed_dataframe"),
    excel_input=Input("example_excel_dataframe"),
)
def compute(ctx, processed_excel, excel_input):
    def parse_file(file_status):
        # Open the Excel file
        with excel_input.filesystem().open(file_status.path, "rb") as in_xlsx:
            # Create a temporary file for processing
            with tempfile.NamedTemporaryFile(suffix=".xlsx") as tmp_xlsx:
                shutil.copyfileobj(in_xlsx, tmp_xlsx)
                tmp_xlsx.flush()

                # Load the Excel workbook and parse its content
                try:
                    workbook = openpyxl.load_workbook(tmp_xlsx.name)
                    return parse_workbook(workbook)
                except:
                    return None

    # Get the list of Excel files from the input filesystem
    files_df = excel_input.filesystem().files()
    # Parse each file using the 'parse_file' function
    parsed_files = files_df.rdd.map(parse_file).collect()

    # Convert the parsed files to PySpark dataframes
    dfs = []
    for parsed_file in parsed_files:
        dfs.append(convert_to_df(ctx, parsed_file))

    # Union the dataframes and write the result to the output
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)
    processed_excel.write_dataframe(df)
  • Date submitted: 2024-08-12
  • Tags: code authoring, code repositories, python, openpyxl

Combine shapefiles and convert to GeoJSON

How do I combine multiple shapefiles and convert them to GeoJSON format?

This code uses the geospatial_tools library to read multiple shapefiles, convert their geometries to GeoJSON format, and combine them into a single PySpark DataFrame. It also computes the centroid of each geometry and converts it to a geohash.

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([T.StructField("geoshape", T.StringType()),
                           T.StructField("name", T.StringType()),
                           T.StructField("centroid", T.StringType())])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem
            # There are multiple files associated with a shapefile, such as .prj and .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            # Create a GeoJSON geometry column
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert everything to EPSG:4326 format expected by Foundry
            crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string()
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
                ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
  • Date submitted: 2024-05-23
  • Tags: geospatial, shapefile, geojson, geohash, pyspark, geopandas

Copy raw files between datasets

How can I copy raw files from an input dataset to an output dataset within a Python authoring transform?

This code defines a PySpark transform function to copy raw files from an input dataset to an output dataset. It uses the 'shutil' library to copy the file bytes and allows for copying all files or only a subset based on provided regex patterns.

from transforms.api import transform, Input, Output
from pyspark.sql import DataFrame
from functools import reduce
import shutil

# Copy raw files from the input dataset to the output dataset in a python transform
@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def copy_my_input(my_output, my_input):
    copy_raw_files(my_output, my_input, [".*\.csv"], False)


def copy_raw_files(my_output, my_input, regexes, copy_full=False):
    # Copies the raw files
    def copy_file(file_status):
        # Open the given file in the input dataframe filesystem
        with my_input.filesystem().open(file_status.path, 'rb') as in_f:
            # Open a file in the output dataframe filesystem
            with my_output.filesystem().open(file_status.path, 'wb') as out_f:
                # Copy the file bytes from intput to output
                shutil.copyfileobj(in_f, out_f)

    # Choose whether to copy all the files or only a subset
    if copy_full:
        files_df = my_input.filesystem().files()
    else:
        files_to_copy = []
        for regex in regexes:
            # Only copy files that match the regex
            files_to_copy.append(my_input.filesystem().files(regex=regex))
        # Create one dataframe with all the files
        files_df = reduce(DataFrame.unionByName, files_to_copy)

    # This will parallelise the copy operation
    files_df.rdd.foreach(copy_file)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python

File processing

How do I process multiple files in a dataset using PySpark?

This code uses PySpark to process multiple files in a dataset, including gzipped files, by reading the first line of each file and creating a dataframe with the file information and the first line content.

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import Row
import gzip
import io

# @incremental decorator or not (compatible with both)
# Changed @transform_df for @transform
# This gives more control over inputs and outputs and is needed to access the "file" version of the input dataset(s)


@transform(
    output_dataset_1=Output(""),
    output_dataset_2=Output(""),
    input_dataset=Input("")
)
def example_transform_file_processing(ctx, input_dataset, output_dataset_1, output_dataset_2):
    # The "files()" method returns a dataframe representing the filesystem of the input dataset
    fs = input_dataset.filesystem()
    files_df = input_dataset.filesystem().files()

    # Here you can extract the paths of each file, and then process them in any way you need or want
    # ==== Example of computation
    # Defines the schema of the rdd flatmap output
    schema = T.StructType([
        T.StructField('hadoop_path', T.StringType()),
        T.StructField('file_name', T.StringType()),
        T.StructField('size', T.LongType()),
        T.StructField('modified', T.LongType()),
        T.StructField('first_row_content', T.StringType())
    ])
    cols = schema.fieldNames()  # equivalent to :["hadoop_path", "file_name", "size", "modified"]
    MyRow = Row(*cols)  # defining of the "MyRow" object to use as a return type of the RDD UDF-like function

    # Inline function to parse one file (idea: like a UDF, but for RDD)
    def process_file(file_status):
        # Example of processing : read the first line of each file
        line = "default value"
        try:
            line = "WARNING: Not supported file type."
            if file_status.path.endswith('.gz'):
                # Handle Gzipped files
                with fs.open(file_status.path, "rb") as f:
                    gz = gzip.GzipFile(fileobj=f)
                    br = io.BufferedReader(gz)
                    tw = io.TextIOWrapper(br)
                    line = tw.readline()
            else:
                with fs.open(file_status.path, "r") as f:
                    line = f.readline()

        except Exception as e:
            line = "ERROR: " + str(e)

        # It creates a row out of the RDD element
        yield MyRow(fs.hadoop_path, file_status.path, file_status.size, file_status.modified, line)

    # Convert the files dataframe to a RDD. See https://spark.apache.org/docs/latest/rdd-programming-guide.html
    rdd = files_df.rdd
    # Apply a function on each element of the RDD
    rdd = rdd.flatMap(process_file)
    # Convert the RDD into a dataframe, to be write it to output easily
    # Specifying the schema allows to handle empty rdd.
    output_df = ctx.spark_session.createDataFrame(rdd, schema)
    # Add a timestamp
    output_df = output_df.withColumn('processed_at', F.current_timestamp())
    # ==== End Example of computation

    # Writes to the output the filesystem's dataframe representation of the input
    output_dataset_1.write_dataframe(files_df)
    # Write the processed dataframe to the output
    output_dataset_2.write_dataframe(output_df)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, gzip, zip

Load ORC file using PySpark

How do I load an ORC file using PySpark?

This code reads a raw ORC file from a the Hadoop path of an input dataset and writes the resulting spark dataframe to an output.

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    hadoop_path = raw.filesystem().hadoop_path
    df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/')
    out.write_dataframe(df)
  • Date submitted: 2024-07-18
  • Tags: pyspark, dataframe, orc, hadoop

Parse SAS files using PySpark

How do I create a PySpark dataframe from SAS datasets?

This code defines a transform function that takes an input dataset containing raw SAS files and creates a PySpark dataframe from them. It uses the spark-sas7bdat package to read the SAS files and load them into a dataframe.

@transform(
    output=Output("xxxxx"), # include foundry RID here
    input_df=Input("xxxxx") # include foundry RID here
)
def parse_sas_file(ctx, input_df, output, sas_path="*.sas7bdat"):
    '''
    Creates a PySpark dataframe from SAS datasets
    Note that this function performs computation in the driver, and may require an increase in driver memory
    ctx: Spark context
    input_df: Input dataset containing raw SAS files
    sas_path: Path to SAS file within dataset, defaults to all SAS files in the dataset
    include_filename_as_field: Include the filename as a column for parsing downstream; defaults to false
    '''
    fs = input_df.filesystem()
    hadoop_path = fs.hadoop_path
    files_df = fs.files(sas_path)
    # dfs = []

    spark_session = ctx.spark_session.builder.appName(ctx.spark_session.sparkContext.appName).config('spark.jars.packages', 'saurfang:spark-sas7bdat:3.0.0-s_2.12').getOrCreate()

    # TODO: Update this to work for multiple paths
    # read in whatever files from the backing dataset
    path = files_df.collect()[0].path
    full_path = f'{hadoop_path}/{path}'
    df = spark_session.read.format("com.github.saurfang.sas.spark").load(full_path)

    output.write_dataframe(df)
  • Date submitted: 2024-07-29
  • Tags: pyspark, dataframe, sas, code repositories

Process and combine multiple files

How do I process multiple files in a dataset and combine them into a single PySpark DataFrame?

This code defines a PySpark transform that takes an input dataset containing multiple files, processes each file individually, and combines the results into a single PySpark DataFrame. It uses the 'map' function to apply the 'parse_file' function to each file in the dataset, collects the results, and unions all the DataFrames together.

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import functools

# Define the PySpark transform
@transform(
    processed_files=Output("example_processed_files_dataset"),
    file_dataset=Input("example_file_dataset"),
)
def compute(ctx, processed_files, file_dataset):
    # Function to parse a single file
    def parse_file(file_status):
        with file_dataset.filesystem().open(file_status.path, "rb") as in_file:
            with tempfile.NamedTemporaryFile() as tmp_file:
                shutil.copyfileobj(in_file, tmp_file)
                tmp_file.flush()

                # Process the file locally and return a Python object
                return process_file_locally_and_return_python_object(tmp_file)

    # Get the list of files in the dataset
    files_df = file_dataset.filesystem().files()

    # Parse each file and collect the results
    parsed_files = files_df.rdd.map(parse_file).collect()

    dfs = []
    for parsed_file in parsed_files:
        # Convert the parsed file to a PySpark DataFrame
        dfs.append(convert_to_df(ctx, parsed_file))

    # Union all the DataFrames together
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)

    # Write the resulting DataFrame to the output dataset
    processed_files.write_dataframe(df)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, raw files, unstructured

Extracting content from DOCX files

How do I extract content from a DOCX file using Python?

This code uses the python-docx library to read the content of a DOCX file from a dataset and store it in a Document object for further processing.

from transforms.api import transform, Input, Output
import docx as dx
from io import BytesIO

@transform(
    output=Output("output_dataset"),
    docs=Input("input_dataset"),
)
def compute(ctx, docs, output):

    fs = docs.filesystem()
    doc_file = list(fs.ls(regex=r'.*\.docx'))[0]

    # Open the file with the filesystem and read its content into a BytesIO object
    with fs.open(doc_file.path, 'rb') as f:
        source_stream = BytesIO(f.read())
        document = dx.Document(source_stream)
        source_stream.close()

    # Do something with the document object
  • Date submitted: 2024-03-20
  • Tags: code authoring, python, python-docx, bytesio, raw files, unstructured

Process zipped CSV

How do I read and process a zip file containing multiple CSV files from an input dataset and write the processed data to an output dataset in PySpark?

This code uses PySpark to read and process zip files containing CSVs in an input dataset, skipping the first line of each CSV, and writes the processed data to an output dataset

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import shutil
import tempfile
import zipfile
import io

@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def compute(ctx, my_output, my_input):
    # Function to process each file in the input dataset
    def process_file(file_status):
        with fs.open(file_status.path, 'rb') as f:
            with tempfile.NamedTemporaryFile() as tmp:
                shutil.copyfileobj(f, tmp)
                tmp.flush()

                # Read and process the zip file
                with zipfile.ZipFile(tmp) as archive:
                    for filename in archive.namelist():
                        with archive.open(filename) as f2:
                            br = io.BufferedReader(f2)
                            tw = io.TextIOWrapper(br)
                            tw.readline() # Skip the first line of each CSV
                            # Read and process each line in the CSV
                            for line in tw:
                                yield MyRow(*line.split(","))

    # Read input dataset and process each file
    rdd = my_input.files().rdd
    rdd = rdd.flatMap(process_file)
    df = rdd.toDF()

    # Write the processed data to the output dataset
    my_output.write_dataframe(df)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, zip, csv

Unzipping and extracting files in dataset

How can I unzip a file in a dataset?

This code uses PySpark to read zipped files from an input, extract the contents, and write the extracted files to an output. It does this by iterating through the zipped files, reading their contents into a BytesIO stream, and then using the zipfile library to extract the files to a temporary directory. The extracted files are then written to the output.

# from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import zipfile
import tempfile
import os
from io import BytesIO


@transform(
    unzipped=Output(""),
    zipped=Input(""),
)
def compute(unzipped, zipped):
    zip_files = zipped.filesystem().files(glob="*.zip").collect()
    for zip_file in zip_files:
        with zipped.filesystem().open(zip_file["path"], 'rb') as zip_f:
            source_stream = BytesIO(zip_f.read())
            with zipfile.ZipFile(source_stream, 'r') as zip_ref:
                with tempfile.TemporaryDirectory() as temp_dir:
                    zip_ref.extractall(temp_dir)
                    for path in iterate_directories(temp_dir):
                        output_file_name = path.replace(temp_dir, "")
                        with unzipped.filesystem().open(output_file_name, "w") as out_f:
                            with open(path, 'r') as in_f:
                                out_f.write(in_f.read())


def iterate_directories(directory):
    for root, dirs, files in os.walk(directory):
        for file in files:
            path = os.path.join(root, file)
            if is_leaf_file(path):
                yield path


def is_leaf_file(path):
    return os.path.isfile(path) and not os.path.islink(path)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, raw files, zip, unzip

Zip dataset files

How can I create a zip file from a dataset of files?

This code uses the transforms API to read all Markdown files from a source dataset and create a zip file containing these files.

from transforms.api import transform, Input, Output
import zipfile


@transform(
    my_output=Output(""),
    source_df=Input(""),
)
def compute(ctx, my_output, source_df):
    files = source_df.filesystem().files(glob="*.md").collect()

    with my_output.filesystem().open("foundry_code_examples.zip", 'wb') as write_zip:
        with zipfile.ZipFile(write_zip.name, 'w') as zip_file:
            for file_row in files:
                with source_df.filesystem().open(file_row["path"], 'rb') as markdown_file:
                    zip_file.write(markdown_file.name, arcname=file_row["path"])

    return source_df
  • Date submitted: 2024-03-26
  • Tags: raw files, zip, python, code authoring, code repositories, export

Java

Parse Excel files with complex, multi-row headers

How can I parse Excel files where the header is complex and consists of multiple rows?

This code demonstrates how to parse Excel files with complex headers using the transforms-excel-parser library. It creates a TableParser with a MultilayerMergedHeaderExtractor, then creates a TransformsExcelParser with the TableParser. Finally, it uses the TransformsExcelParser to extract data from the Excel files in the input dataset and writes the result to the output.

package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.MultilayerMergedHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class ComplexHeaderExcel {

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput
        ) {
        // Create a TableParser with a MultilayerMergedHeaderExtractor
        Parser tableParser = TableParser.builder()
                .headerExtractor(MultilayerMergedHeaderExtractor.builder()
                        .topLeftCellName("A1")
                        .bottomRightCellName("D2")
                        .build())
                .build();

        // Create a TransformsExcelParser with the TableParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // Parse input
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Get the parsed data, which may be empty if there were no rows in the input
        // or an error occurred
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // If parsed data is not empty, write it to the output dataset
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }
}
  • Date submitted: 2024-08-08
  • Tags: code authoring, code repositories, java, transforms-excel-parser, excel

Parse Excel files with non-tabular (form) data

How do I parse Excel files where the data is not tabular?

This code demonstrates how to use the transforms-excel-parser library to extract data from Excel files containing forms across multiple sheets.

package myproject.datasets;

import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.form.FieldSpec;
import com.palantir.transforms.excel.form.FormParser;
import com.palantir.transforms.excel.form.Location;
import com.palantir.transforms.excel.form.cellvalue.AdjacentCellAssertion;
import com.palantir.transforms.excel.form.cellvalue.CellValue;
import com.palantir.transforms.excel.functions.RegexSubstringMatchingSheetSelector;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;

public final class FormStyleExcel {
    private static final String FORM_A_KEY = "FORM_A";
    private static final String FORM_B_KEY = "FORM_B";

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid") FoundryInput myInput,
            @Output("<form_a_output_dataset_rid>") FoundryOutput formAOutput,
            @Output("<form_b_output_dataset_rid>") FoundryOutput formBOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) {
        // Form A parser configuration
        Parser formAParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_A"))
                .addFieldSpecs(createFieldSpec("form_a_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_a_field_2", "B2"))
                .build();

        // Form B parser configuration
        Parser formBParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B"))
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B"))
                .addFieldSpecs(createFieldSpec("form_b_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_b_field_2", "B2"))
                .build();

        // TransformsExcelParser with both Form A and Form B parsers
        TransformsExcelParser transformsParser = TransformsExcelParser.builder()
                .putKeyToParser(FORM_A_KEY, formAParser)
                .putKeyToParser(FORM_B_KEY, formBParser)
                .build();

        // Parse input
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Write parsed data to the output datasets
        result.dataframeForKey(FORM_A_KEY)
                .ifPresent(df -> formAOutput.getDataFrameWriter(df).write());
        result.dataframeForKey(FORM_B_KEY)
                .ifPresent(df -> formBOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }

    // Helper method to concisely create a FieldSpec with an appropriate assertion
    private static FieldSpec createFieldSpec(String fieldName, String cellLocation) {
        return FieldSpec.of(
                fieldName,
                CellValue.builder()
                        .addAssertions(AdjacentCellAssertion.left(1, fieldName))
                        .location(Location.of(cellLocation))
                        .build());
    }
}
  • Date submitted: 2024-08-06
  • Tags: code authoring, code repositories, java, transforms-excel-parser, excel

Parse simple tabular Excel files

How do I parse simple tabular Excel files using Transforms Excel Parser?

This code demonstrates how to parse a dataset containing simple tabular Excel files using the transforms-excel-parser library. It creates a TableParser with a SimpleHeaderExtractor, then creates a TransformsExcelParser with the TableParser. Finally, it uses the TransformsExcelParser to parse the files in the input dataset and writes the extracted data to the output dataset.

package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.SimpleHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class SimpleTabularExcel {

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput
        ) {
        // Create a TableParser with an appropriately configured SimpleHeaderExtractor
        // In this example, the header of the file is on the second row.
        // If the header were on the first row, we would not need to
        // specify rowsToSkip, since the default value is 0, and in fact
        // we could just do TableParser.builder().build() in that case.
        Parser tableParser = TableParser.builder()
                .headerExtractor(
                        SimpleHeaderExtractor.builder().rowsToSkip(1).build())
                .build();

        // Create a TransformsExcelParser with the TableParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // Parse input
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // Get the parsed data, which may be empty if there were no rows in the input
        // or an error occurred
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // If parsed data is not empty, write it to the output dataset
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // Write error information to the error output
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
}
  • Date submitted: 2024-08-08
  • Tags: code authoring, code repositories, java, transforms-excel-parser, excel

中文翻译


转换(Transforms)

Python

使用 Python 解析 Excel 文件

如何利用 PySpark 和 Openpyxl 以分布式方式读取和处理具有动态模式的复杂 Excel 文件?

此代码使用 PySpark 和 Openpyxl 库从输入文件系统读取多个 Excel 文件,解析其内容,并将其转换为 PySpark DataFrame。然后将这些 DataFrame 合并为一个 DataFrame 并写入输出。

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import openpyxl
import functools


@transform(
    processed_excel=Output("example_processed_dataframe"),
    excel_input=Input("example_excel_dataframe"),
)
def compute(ctx, processed_excel, excel_input):
    def parse_file(file_status):
        # 打开 Excel 文件
        with excel_input.filesystem().open(file_status.path, "rb") as in_xlsx:
            # 创建一个临时文件用于处理
            with tempfile.NamedTemporaryFile(suffix=".xlsx") as tmp_xlsx:
                shutil.copyfileobj(in_xlsx, tmp_xlsx)
                tmp_xlsx.flush()

                # 加载 Excel 工作簿并解析其内容
                try:
                    workbook = openpyxl.load_workbook(tmp_xlsx.name)
                    return parse_workbook(workbook)
                except:
                    return None

    # 从输入文件系统获取 Excel 文件列表
    files_df = excel_input.filesystem().files()
    # 使用 'parse_file' 函数解析每个文件
    parsed_files = files_df.rdd.map(parse_file).collect()

    # 将解析后的文件转换为 PySpark DataFrame
    dfs = []
    for parsed_file in parsed_files:
        dfs.append(convert_to_df(ctx, parsed_file))

    # 合并 DataFrame 并将结果写入输出
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)
    processed_excel.write_dataframe(df)
  • 提交日期:2024-08-12
  • 标签:代码编写代码仓库pythonopenpyxl

合并 Shapefile 并转换为 GeoJSON

如何合并多个 Shapefile 并将其转换为 GeoJSON 格式?

此代码使用 geospatial_tools 库读取多个 Shapefile,将其几何图形转换为 GeoJSON 格式,并合并为一个 PySpark DataFrame。它还计算每个几何图形的质心并将其转换为地理哈希(geohash)。

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([T.StructField("geoshape", T.StringType()),
                           T.StructField("name", T.StringType()),
                           T.StructField("centroid", T.StringType())])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # 将所有与 shapefile 关联的文件复制到本地文件系统
            # shapefile 关联多个文件,例如 .prj 和 .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            # 创建 GeoJSON 几何列
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # 转换为 Foundry 期望的 EPSG:4326 格式
            crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string()
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
                ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
  • 提交日期:2024-05-23
  • 标签:地理空间shapefilegeojsongeohashpysparkgeopandas

在数据集之间复制原始文件

如何在 Python 编写转换中,将原始文件从输入数据集复制到输出数据集?

此代码定义了一个 PySpark 转换函数,用于将原始文件从输入数据集复制到输出数据集。它使用 shutil 库复制文件字节,并允许根据提供的正则表达式模式复制所有文件或仅复制子集。

from transforms.api import transform, Input, Output
from pyspark.sql import DataFrame
from functools import reduce
import shutil

# 在 Python 转换中将原始文件从输入数据集复制到输出数据集
@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def copy_my_input(my_output, my_input):
    copy_raw_files(my_output, my_input, [".*\.csv"], False)


def copy_raw_files(my_output, my_input, regexes, copy_full=False):
    # 复制原始文件
    def copy_file(file_status):
        # 在输入 DataFrame 文件系统中打开给定文件
        with my_input.filesystem().open(file_status.path, 'rb') as in_f:
            # 在输出 DataFrame 文件系统中打开一个文件
            with my_output.filesystem().open(file_status.path, 'wb') as out_f:
                # 将文件字节从输入复制到输出
                shutil.copyfileobj(in_f, out_f)

    # 选择是复制所有文件还是仅复制子集
    if copy_full:
        files_df = my_input.filesystem().files()
    else:
        files_to_copy = []
        for regex in regexes:
            # 仅复制与正则表达式匹配的文件
            files_to_copy.append(my_input.filesystem().files(regex=regex))
        # 创建一个包含所有文件的 DataFrame
        files_df = reduce(DataFrame.unionByName, files_to_copy)

    # 这将并行化复制操作
    files_df.rdd.foreach(copy_file)
  • 提交日期:2024-03-20
  • 标签:代码编写代码仓库python

文件处理

如何使用 PySpark 处理数据集中的多个文件?

此代码使用 PySpark 处理数据集中的多个文件,包括 gzip 压缩文件,通过读取每个文件的第一行,并创建一个包含文件信息和第一行内容的 DataFrame。

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import Row
import gzip
import io

# 是否使用 @incremental 装饰器(两者兼容)
# 将 @transform_df 改为 @transform
# 这提供了对输入和输出的更多控制,并且需要访问输入数据集的“文件”版本


@transform(
    output_dataset_1=Output(""),
    output_dataset_2=Output(""),
    input_dataset=Input("")
)
def example_transform_file_processing(ctx, input_dataset, output_dataset_1, output_dataset_2):
    # "files()" 方法返回一个表示输入数据集文件系统的 DataFrame
    fs = input_dataset.filesystem()
    files_df = input_dataset.filesystem().files()

    # 在此处,您可以提取每个文件的路径,然后以任何需要或想要的方式处理它们
    # ==== 计算示例
    # 定义 RDD flatmap 输出的模式
    schema = T.StructType([
        T.StructField('hadoop_path', T.StringType()),
        T.StructField('file_name', T.StringType()),
        T.StructField('size', T.LongType()),
        T.StructField('modified', T.LongType()),
        T.StructField('first_row_content', T.StringType())
    ])
    cols = schema.fieldNames()  # 等同于:["hadoop_path", "file_name", "size", "modified"]
    MyRow = Row(*cols)  # 定义 "MyRow" 对象,用作 RDD UDF 类函数的返回类型

    # 内联函数,用于解析一个文件(思路:类似于 UDF,但用于 RDD)
    def process_file(file_status):
        # 处理示例:读取每个文件的第一行
        line = "default value"
        try:
            line = "WARNING: Not supported file type."
            if file_status.path.endswith('.gz'):
                # 处理 Gzip 压缩文件
                with fs.open(file_status.path, "rb") as f:
                    gz = gzip.GzipFile(fileobj=f)
                    br = io.BufferedReader(gz)
                    tw = io.TextIOWrapper(br)
                    line = tw.readline()
            else:
                with fs.open(file_status.path, "r") as f:
                    line = f.readline()

        except Exception as e:
            line = "ERROR: " + str(e)

        # 从 RDD 元素创建一行
        yield MyRow(fs.hadoop_path, file_status.path, file_status.size, file_status.modified, line)

    # 将文件 DataFrame 转换为 RDD。参见 https://spark.apache.org/docs/latest/rdd-programming-guide.html
    rdd = files_df.rdd
    # 对 RDD 的每个元素应用一个函数
    rdd = rdd.flatMap(process_file)
    # 将 RDD 转换为 DataFrame,以便轻松写入输出
    # 指定模式可以处理空 RDD。
    output_df = ctx.spark_session.createDataFrame(rdd, schema)
    # 添加时间戳
    output_df = output_df.withColumn('processed_at', F.current_timestamp())
    # ==== 计算示例结束

    # 将输入的文件系统 DataFrame 表示写入输出
    output_dataset_1.write_dataframe(files_df)
    # 将处理后的 DataFrame 写入输出
    output_dataset_2.write_dataframe(output_df)
  • 提交日期:2024-03-20
  • 标签:代码编写代码仓库pythongzipzip

使用 PySpark 加载 ORC 文件

如何使用 PySpark 加载 ORC 文件?

此代码从输入数据集的 Hadoop 路径读取原始 ORC 文件,并将生成的 Spark DataFrame 写入输出。

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    hadoop_path = raw.filesystem().hadoop_path
    df = ctx.spark_session.read.format('orc').load(f'{hadoop_path}/')
    out.write_dataframe(df)
  • 提交日期:2024-07-18
  • 标签:pysparkdataframeorchadoop

使用 PySpark 解析 SAS 文件

如何从 SAS 数据集创建 PySpark DataFrame?

此代码定义了一个转换函数,该函数接受包含原始 SAS 文件的输入数据集,并从中创建一个 PySpark DataFrame。它使用 spark-sas7bdat 包读取 SAS 文件并将其加载到 DataFrame 中。

@transform(
    output=Output("xxxxx"), # 在此处包含 Foundry RID
    input_df=Input("xxxxx") # 在此处包含 Foundry RID
)
def parse_sas_file(ctx, input_df, output, sas_path="*.sas7bdat"):
    '''
    从 SAS 数据集创建 PySpark DataFrame
    注意:此函数在驱动程序(driver)中执行计算,可能需要增加驱动程序内存
    ctx: Spark 上下文
    input_df: 包含原始 SAS 文件的输入数据集
    sas_path: 数据集中 SAS 文件的路径,默认为数据集中的所有 SAS 文件
    include_filename_as_field: 是否将文件名作为列包含以用于下游解析;默认为 false
    '''
    fs = input_df.filesystem()
    hadoop_path = fs.hadoop_path
    files_df = fs.files(sas_path)
    # dfs = []

    spark_session = ctx.spark_session.builder.appName(ctx.spark_session.sparkContext.appName).config('spark.jars.packages', 'saurfang:spark-sas7bdat:3.0.0-s_2.12').getOrCreate()

    # TODO: 更新此代码以支持多个路径
    # 从底层数据集读取文件
    path = files_df.collect()[0].path
    full_path = f'{hadoop_path}/{path}'
    df = spark_session.read.format("com.github.saurfang.sas.spark").load(full_path)

    output.write_dataframe(df)
  • 提交日期:2024-07-29
  • 标签:pysparkdataframesas代码仓库

处理并合并多个文件

如何处理数据集中的多个文件并将其合并为单个 PySpark DataFrame?

此代码定义了一个 PySpark 转换,该转换接受包含多个文件的输入数据集,单独处理每个文件,并将结果合并为单个 PySpark DataFrame。它使用 map 函数将 parse_file 函数应用于数据集中的每个文件,收集结果,然后将所有 DataFrame 合并在一起。

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import tempfile
import shutil
import functools

# 定义 PySpark 转换
@transform(
    processed_files=Output("example_processed_files_dataset"),
    file_dataset=Input("example_file_dataset"),
)
def compute(ctx, processed_files, file_dataset):
    # 解析单个文件的函数
    def parse_file(file_status):
        with file_dataset.filesystem().open(file_status.path, "rb") as in_file:
            with tempfile.NamedTemporaryFile() as tmp_file:
                shutil.copyfileobj(in_file, tmp_file)
                tmp_file.flush()

                # 在本地处理文件并返回一个 Python 对象
                return process_file_locally_and_return_python_object(tmp_file)

    # 获取数据集中的文件列表
    files_df = file_dataset.filesystem().files()

    # 解析每个文件并收集结果
    parsed_files = files_df.rdd.map(parse_file).collect()

    dfs = []
    for parsed_file in parsed_files:
        # 将解析后的文件转换为 PySpark DataFrame
        dfs.append(convert_to_df(ctx, parsed_file))

    # 将所有 DataFrame 合并在一起
    df = functools.reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)

    # 将生成的 DataFrame 写入输出数据集
    processed_files.write_dataframe(df)
  • 提交日期:2024-03-20
  • 标签:代码编写代码仓库python原始文件非结构化

从 DOCX 文件中提取内容

如何使用 Python 从 DOCX 文件中提取内容?

此代码使用 python-docx 库从数据集中读取 DOCX 文件的内容,并将其存储在 Document 对象中以供进一步处理。

from transforms.api import transform, Input, Output
import docx as dx
from io import BytesIO

@transform(
    output=Output("output_dataset"),
    docs=Input("input_dataset"),
)
def compute(ctx, docs, output):

    fs = docs.filesystem()
    doc_file = list(fs.ls(regex=r'.*\.docx'))[0]

    # 使用文件系统打开文件并将其内容读入 BytesIO 对象
    with fs.open(doc_file.path, 'rb') as f:
        source_stream = BytesIO(f.read())
        document = dx.Document(source_stream)
        source_stream.close()

    # 对 document 对象执行某些操作
  • 提交日期:2024-03-20
  • 标签:代码编写pythonpython-docxbytesio原始文件非结构化

处理压缩的 CSV 文件

如何使用 PySpark 读取并处理输入数据集中包含多个 CSV 文件的 zip 文件,并将处理后的数据写入输出数据集?

此代码使用 PySpark 读取并处理输入数据集中的 zip 文件(包含 CSV),跳过每个 CSV 的第一行,并将处理后的数据写入输出数据集。

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, configure
import shutil
import tempfile
import zipfile
import io

@transform(
    my_output=Output("my_output_dataset"),
    my_input=Input("my_input_dataset")
)
def compute(ctx, my_output, my_input):
    # 处理输入数据集中每个文件的函数
    def process_file(file_status):
        with fs.open(file_status.path, 'rb') as f:
            with tempfile.NamedTemporaryFile() as tmp:
                shutil.copyfileobj(f, tmp)
                tmp.flush()

                # 读取并处理 zip 文件
                with zipfile.ZipFile(tmp) as archive:
                    for filename in archive.namelist():
                        with archive.open(filename) as f2:
                            br = io.BufferedReader(f2)
                            tw = io.TextIOWrapper(br)
                            tw.readline() # 跳过每个 CSV 的第一行
                            # 读取并处理 CSV 中的每一行
                            for line in tw:
                                yield MyRow(*line.split(","))

    # 读取输入数据集并处理每个文件
    rdd = my_input.files().rdd
    rdd = rdd.flatMap(process_file)
    df = rdd.toDF()

    # 将处理后的数据写入输出数据集
    my_output.write_dataframe(df)
  • 提交日期:2024-03-20
  • 标签:代码编写代码仓库pythonzipcsv

解压并提取数据集中的文件

如何解压数据集中的文件?

此代码使用 PySpark 从输入中读取压缩文件,提取内容,并将提取的文件写入输出。它通过遍历压缩文件,将其内容读入 BytesIO 流,然后使用 zipfile 库将文件提取到临时目录来实现。然后将提取的文件写入输出。

# from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import zipfile
import tempfile
import os
from io import BytesIO


@transform(
    unzipped=Output(""),
    zipped=Input(""),
)
def compute(unzipped, zipped):
    zip_files = zipped.filesystem().files(glob="*.zip").collect()
    for zip_file in zip_files:
        with zipped.filesystem().open(zip_file["path"], 'rb') as zip_f:
            source_stream = BytesIO(zip_f.read())
            with zipfile.ZipFile(source_stream, 'r') as zip_ref:
                with tempfile.TemporaryDirectory() as temp_dir:
                    zip_ref.extractall(temp_dir)
                    for path in iterate_directories(temp_dir):
                        output_file_name = path.replace(temp_dir, "")
                        with unzipped.filesystem().open(output_file_name, "w") as out_f:
                            with open(path, 'r') as in_f:
                                out_f.write(in_f.read())


def iterate_directories(directory):
    for root, dirs, files in os.walk(directory):
        for file in files:
            path = os.path.join(root, file)
            if is_leaf_file(path):
                yield path


def is_leaf_file(path):
    return os.path.isfile(path) and not os.path.islink(path)
  • 提交日期:2024-03-20
  • 标签:代码编写代码仓库python原始文件zipunzip

压缩数据集文件

如何从文件数据集创建 zip 文件?

此代码使用 transforms API 从源数据集读取所有 Markdown 文件,并创建一个包含这些文件的 zip 文件。

from transforms.api import transform, Input, Output
import zipfile


@transform(
    my_output=Output(""),
    source_df=Input(""),
)
def compute(ctx, my_output, source_df):
    files = source_df.filesystem().files(glob="*.md").collect()

    with my_output.filesystem().open("foundry_code_examples.zip", 'wb') as write_zip:
        with zipfile.ZipFile(write_zip.name, 'w') as zip_file:
            for file_row in files:
                with source_df.filesystem().open(file_row["path"], 'rb') as markdown_file:
                    zip_file.write(markdown_file.name, arcname=file_row["path"])

    return source_df
  • 提交日期:2024-03-26
  • 标签:原始文件zippython代码编写代码仓库导出

Java

解析具有复杂多行标题的 Excel 文件

如何解析标题复杂且由多行组成的 Excel 文件?

此代码演示了如何使用 transforms-excel-parser 库解析具有复杂标题的 Excel 文件。它创建一个带有 MultilayerMergedHeaderExtractor 的 TableParser,然后使用该 TableParser 创建一个 TransformsExcelParser。最后,使用 TransformsExcelParser 从输入数据集中的 Excel 文件中提取数据,并将结果写入输出。

package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.MultilayerMergedHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class ComplexHeaderExcel {

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput
        ) {
        // 创建一个带有 MultilayerMergedHeaderExtractor 的 TableParser
        Parser tableParser = TableParser.builder()
                .headerExtractor(MultilayerMergedHeaderExtractor.builder()
                        .topLeftCellName("A1")
                        .bottomRightCellName("D2")
                        .build())
                .build();

        // 使用 TableParser 创建一个 TransformsExcelParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // 解析输入
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // 获取解析后的数据,如果输入中没有行或发生错误,则可能为空
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // 如果解析后的数据不为空,则将其写入输出数据集
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // 将错误信息写入错误输出
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }
}
  • 提交日期:2024-08-08
  • 标签:代码编写代码仓库javatransforms-excel-parserexcel

解析非表格(表单)数据的 Excel 文件

如何解析数据不是表格形式的 Excel 文件?

此代码演示了如何使用 transforms-excel-parser 库从包含跨多个工作表的表单的 Excel 文件中提取数据。

package myproject.datasets;

import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.form.FieldSpec;
import com.palantir.transforms.excel.form.FormParser;
import com.palantir.transforms.excel.form.Location;
import com.palantir.transforms.excel.form.cellvalue.AdjacentCellAssertion;
import com.palantir.transforms.excel.form.cellvalue.CellValue;
import com.palantir.transforms.excel.functions.RegexSubstringMatchingSheetSelector;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;

public final class FormStyleExcel {
    private static final String FORM_A_KEY = "FORM_A";
    private static final String FORM_B_KEY = "FORM_B";

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid") FoundryInput myInput,
            @Output("<form_a_output_dataset_rid>") FoundryOutput formAOutput,
            @Output("<form_b_output_dataset_rid>") FoundryOutput formBOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput) {
        // 表单 A 解析器配置
        Parser formAParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_A"))
                .addFieldSpecs(createFieldSpec("form_a_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_a_field_2", "B2"))
                .build();

        // 表单 B 解析器配置
        Parser formBParser = FormParser.builder()
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B"))
                .sheetSelector(new RegexSubstringMatchingSheetSelector("Form_B"))
                .addFieldSpecs(createFieldSpec("form_b_field_1", "B1"))
                .addFieldSpecs(createFieldSpec("form_b_field_2", "B2"))
                .build();

        // 同时包含表单 A 和表单 B 解析器的 TransformsExcelParser
        TransformsExcelParser transformsParser = TransformsExcelParser.builder()
                .putKeyToParser(FORM_A_KEY, formAParser)
                .putKeyToParser(FORM_B_KEY, formBParser)
                .build();

        // 解析输入
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // 将解析后的数据写入输出数据集
        result.dataframeForKey(FORM_A_KEY)
                .ifPresent(df -> formAOutput.getDataFrameWriter(df).write());
        result.dataframeForKey(FORM_B_KEY)
                .ifPresent(df -> formBOutput.getDataFrameWriter(df).write());

        // 将错误信息写入错误输出
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
    }

    // 辅助方法,用于简洁地创建带有适当断言的 FieldSpec
    private static FieldSpec createFieldSpec(String fieldName, String cellLocation) {
        return FieldSpec.of(
                fieldName,
                CellValue.builder()
                        .addAssertions(AdjacentCellAssertion.left(1, fieldName))
                        .location(Location.of(cellLocation))
                        .build());
    }
}
  • 提交日期:2024-08-06
  • 标签:代码编写代码仓库javatransforms-excel-parserexcel

解析简单的表格型 Excel 文件

如何使用 Transforms Excel Parser 解析简单的表格型 Excel 文件?

此代码演示了如何使用 transforms-excel-parser 库解析包含简单表格型 Excel 文件的数据集。它创建一个带有 SimpleHeaderExtractor 的 TableParser,然后使用该 TableParser 创建一个 TransformsExcelParser。最后,使用 TransformsExcelParser 解析输入数据集中的文件,并将提取的数据写入输出数据集。

package myproject.datasets;

import com.palantir.transforms.excel.ParseResult;
import com.palantir.transforms.excel.Parser;
import com.palantir.transforms.excel.TransformsExcelParser;
import com.palantir.transforms.excel.table.SimpleHeaderExtractor;
import com.palantir.transforms.excel.table.TableParser;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class SimpleTabularExcel {

    @Compute
    public void myComputeFunction(
            @Input("<input_dataset_rid>") FoundryInput myInput,
            @Output("<output_dataset_rid>") FoundryOutput myOutput,
            @Output("<error_output_dataset_rid>") FoundryOutput errorOutput
        ) {
        // 创建一个带有适当配置的 SimpleHeaderExtractor 的 TableParser
        // 在此示例中,文件的标题位于第二行。
        // 如果标题在第一行,则无需指定 rowsToSkip,因为默认值为 0,
        // 实际上,在这种情况下我们只需执行 TableParser.builder().build()。
        Parser tableParser = TableParser.builder()
                .headerExtractor(
                        SimpleHeaderExtractor.builder().rowsToSkip(1).build())
                .build();

        // 使用 TableParser 创建一个 TransformsExcelParser
        TransformsExcelParser transformsParser = TransformsExcelParser.of(tableParser);

        // 解析输入
        ParseResult result =
                transformsParser.parse(myInput.asFiles().getFileSystem().filesAsDataset());

        // 获取解析后的数据,如果输入中没有行或发生错误,则可能为空
        Optional<Dataset<Row>> maybeDf = result.singleResult();

        // 如果解析后的数据不为空,则将其写入输出数据集
        maybeDf.ifPresent(df -> myOutput.getDataFrameWriter(df).write());

        // 将错误信息写入错误输出
        errorOutput.getDataFrameWriter(result.errorDataframe()).write();
}
  • 提交日期:2024-08-08
  • 标签:代码编写代码仓库javatransforms-excel-parserexcel