Read and write unstructured files(读写非结构化文件)¶
:::callout{theme="neutral"} The examples here contain more advanced content. Make sure to go through the section on defining Transforms before going through this section. :::
This page contains various example data transformations using Transforms Java:
- Parallel processing with Spark
- Uncompressing dataset files & writing to an output fileSystem
- Uncompressing dataset files & writing to an output dataFrame
- Combining dataset files
The examples here are data transformations expressed in terms of files. If you want to have access to files in your transformation, you must define a low-level Transform. This is because underlying dataset files are exposed by FoundryInput and FoundryOutput objects. Low-level Transforms, unlike high-level ones, expect the input(s) and output(s) to the compute function to be of type FoundryInput and FoundryOutput, respectively. The included examples are also low-level Transforms intended for manual registration.
Parallel processing with Spark¶
Uncompressing dataset files & writing to an output filesystem¶
This example takes in .zip files as input. It unzips the files and then writes the files to the output FileSystem — because .zip is not splittable, this work is done in parallel per .zip file using Spark. If you want to parallelize decompression within a single compressed file, use a splittable file format like .bz2.
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
* This is an example of unzipping files in parallel using Spark.
* <p>
* The work is distributed to executors.
*/
public final class UnzipWithSpark {
@Compute
public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException {
ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();
inputFileSystem.filesAsDataset().foreach(portableFile -> {
// "processWith" gives you the InputStream for the given input file.
portableFile.processWithThenClose(stream -> {
try (ZipInputStream zis = new ZipInputStream(stream)) {
ZipEntry entry;
// For each file in the .zip file, write it to the output file system.
while ((entry = zis.getNextEntry()) != null) {
outputFileSystem.writeTo(
Paths.get(entry.getName()),
outputStream -> ByteStreams.copy(zis, outputStream));
}
return null;
} catch (IOException e) {
throw new RuntimeException(e);
}
});
});
}
}
Uncompressing dataset files & writing to an output DataFrame¶
This example takes in .csv, .gz, and .zip files as input. It uncompresses the files and then writes the files to the output DataFrame—this work is done in parallel using Spark.
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.collect.AbstractIterator;
import com.google.common.io.CharSource;
import com.google.common.io.Closeables;
import com.palantir.spark.binarystream.data.PortableFile;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.util.TaskCompletionListener;
/**
* This is an example expects .csv/.gz/.zip files as input.
* <p>
* It does the following operations in parallel over Spark:
* <ol>
* <li>Detects the type of the file.</li>
* <li>If the file type is .gz or .zip, it uncompressed the files.</li>
* <li>Infers the schema of the uncompressed .csv files.</li>
* <li>Converts the dataset of .csv lines into a dataset of that schema.</li>
* </ol>
*/
public final class UnzipWithSparkToDataset {
@Compute
public void compute(FoundryInput input, FoundryOutput output) {
// Get a Spark dataset of input files.
Dataset<PortableFile> files = input.asFiles().getFileSystem().filesAsDataset();
// Convert the dataset of input files to a dataset of .csv lines.
Dataset<String> csvDataset = files.flatMap((FlatMapFunction<PortableFile, String>) portableFile ->
// Get an InputStream from the current input file.
portableFile.convertToIterator(inputStream -> {
String fileName = portableFile.getLogicalPath().getFileName().toString();
// Detect .gz and .zip files and get a line iterator from each.
if (fileName.endsWith(".gz")) {
return new InputStreamCharSource(new GZIPInputStream(inputStream)).getLineIterator();
} else if (fileName.endsWith(".zip")) {
return new ZipIterator(new ZipInputStream(inputStream));
} else {
return new InputStreamCharSource(inputStream).getLineIterator();
}
}), Encoders.STRING());
// Infers the schema and converts the dataset of .csv lines into a dataset of that schema.
Dataset<Row> dataset = files
.sparkSession()
.read()
.option("inferSchema", "true")
.csv(csvDataset);
output.getDataFrameWriter(dataset).write();
}
/*
* This ZipIterator assumes that all files within the archive are .csvs with the
* same schema and belong to the same dataset.
*/
private static final class ZipIterator extends AbstractIterator<String> {
private Iterator<String> lineIterator;
private ZipInputStream zis;
ZipIterator(ZipInputStream zis) throws IOException {
this.zis = zis;
lineIterator = new InputStreamCharSource(zis).getLineIterator();
}
@Override
protected String computeNext() {
if (!lineIterator.hasNext()) {
// If the line iterator does not have a next element, check if there is a next file.
try {
// Find the next file that is non empty.
while (zis.getNextEntry() != null) {
lineIterator = new InputStreamCharSource(zis).getLineIterator();
if (lineIterator.hasNext()) {
break;
}
}
return lineIterator.hasNext() ? lineIterator.next() : endOfData();
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return lineIterator.next();
}
}
}
private static final class InputStreamCharSource extends CharSource {
private final Reader inputStream;
private InputStreamCharSource(InputStream inputStream) {
this.inputStream = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
}
@Override
public Reader openStream() throws IOException {
return inputStream;
}
@SuppressWarnings("MustBeClosedChecker")
Iterator<String> getLineIterator() {
try {
return super.lines().iterator();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (TaskContext.get() != null) {
// If running in Spark, close the stream when the task is finished.
TaskContext.get().addTaskCompletionListener((TaskCompletionListener) context -> close());
} else {
close();
}
}
}
private void close() {
Closeables.closeQuietly(inputStream);
}
}
}
Combining dataset files¶
This example takes in .zip files as input, and it combines all of the input dataset files into a single .zip file. Note that the computation in this Transform is not parallelized.
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
* This is an example of combining all files in a dataset into one big .zip file.
* <p>
* The work is done on a single thread, because it is difficult to run in parallel.
* <p>
* WARNING: In general, it's preferred to use the APIs in {@link UnzipWithSpark} and {@link UnzipWithSparkToDataset}
* to take advantage of Spark. This is an example of a computation that is difficult to meaningfully parallelize,
* which is why it is done using file system operations only on the driver.
*/
public final class ZipOnDriver {
@Compute
public void compute(FoundryInput zipFiles, FoundryOutput output) {
ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();
// Write to a file called "bigzip.zip" in the output dataset's file system.
outputFileSystem.writeTo(Paths.get("bigzip.zip"), outputStream -> {
// Wrap the OutputStream in a ZipOutputStream in order to write each file into the same .zip file.
try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
// For each file in the input dataset's FileSystem, read it, mark a new entry in the
// "bigzip.zip", then copy the bytes over.
inputFileSystem.listAllFiles().forEach(inputPath -> {
inputFileSystem.readFrom(inputPath, inputStream -> {
zipOutputStream.putNextEntry(new ZipEntry(inputPath.toString()));
ByteStreams.copy(inputStream, zipOutputStream);
return null;
});
});
}
});
}
}
中文翻译¶
读写非结构化文件¶
:::callout{theme="neutral"} 此处的示例包含较高级的内容。在阅读本节之前,请务必先阅读定义转换(Transforms)的章节。 :::
本页包含使用 Transforms Java 进行各种数据转换的示例:
此处的示例是以文件形式表达的数据转换。如果您希望在转换中访问文件,则必须定义底层转换(low-level Transform)。这是因为底层数据集文件是通过 FoundryInput 和 FoundryOutput 对象公开的。与高层转换不同,底层转换要求计算函数的输入和输出分别为 FoundryInput 和 FoundryOutput 类型。所包含的示例也是用于手动注册的底层转换。
使用 Spark 进行并行处理¶
解压缩数据集文件并写入输出文件系统¶
此示例以 .zip 文件作为输入。它解压缩文件,然后将文件写入输出文件系统(FileSystem)——由于 .zip 不可拆分(splittable),此项工作使用 Spark 按每个 .zip 文件并行执行。如果您希望在单个压缩文件内并行化解压缩,请使用可拆分的文件格式,如 .bz2。
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
* This is an example of unzipping files in parallel using Spark.
* <p>
* The work is distributed to executors.
*/
public final class UnzipWithSpark {
@Compute
public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException {
ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();
inputFileSystem.filesAsDataset().foreach(portableFile -> {
// "processWith" gives you the InputStream for the given input file.
portableFile.processWithThenClose(stream -> {
try (ZipInputStream zis = new ZipInputStream(stream)) {
ZipEntry entry;
// For each file in the .zip file, write it to the output file system.
while ((entry = zis.getNextEntry()) != null) {
outputFileSystem.writeTo(
Paths.get(entry.getName()),
outputStream -> ByteStreams.copy(zis, outputStream));
}
return null;
} catch (IOException e) {
throw new RuntimeException(e);
}
});
});
}
}
解压缩数据集文件并写入输出数据帧(DataFrame)¶
此示例以 .csv、.gz 和 .zip 文件作为输入。它解压缩文件,然后将文件写入输出数据帧(DataFrame)——此项工作使用 Spark 并行执行。
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.collect.AbstractIterator;
import com.google.common.io.CharSource;
import com.google.common.io.Closeables;
import com.palantir.spark.binarystream.data.PortableFile;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.util.TaskCompletionListener;
/**
* This is an example expects .csv/.gz/.zip files as input.
* <p>
* It does the following operations in parallel over Spark:
* <ol>
* <li>Detects the type of the file.</li>
* <li>If the file type is .gz or .zip, it uncompressed the files.</li>
* <li>Infers the schema of the uncompressed .csv files.</li>
* <li>Converts the dataset of .csv lines into a dataset of that schema.</li>
* </ol>
*/
public final class UnzipWithSparkToDataset {
@Compute
public void compute(FoundryInput input, FoundryOutput output) {
// Get a Spark dataset of input files.
Dataset<PortableFile> files = input.asFiles().getFileSystem().filesAsDataset();
// Convert the dataset of input files to a dataset of .csv lines.
Dataset<String> csvDataset = files.flatMap((FlatMapFunction<PortableFile, String>) portableFile ->
// Get an InputStream from the current input file.
portableFile.convertToIterator(inputStream -> {
String fileName = portableFile.getLogicalPath().getFileName().toString();
// Detect .gz and .zip files and get a line iterator from each.
if (fileName.endsWith(".gz")) {
return new InputStreamCharSource(new GZIPInputStream(inputStream)).getLineIterator();
} else if (fileName.endsWith(".zip")) {
return new ZipIterator(new ZipInputStream(inputStream));
} else {
return new InputStreamCharSource(inputStream).getLineIterator();
}
}), Encoders.STRING());
// Infers the schema and converts the dataset of .csv lines into a dataset of that schema.
Dataset<Row> dataset = files
.sparkSession()
.read()
.option("inferSchema", "true")
.csv(csvDataset);
output.getDataFrameWriter(dataset).write();
}
/*
* This ZipIterator assumes that all files within the archive are .csvs with the
* same schema and belong to the same dataset.
*/
private static final class ZipIterator extends AbstractIterator<String> {
private Iterator<String> lineIterator;
private ZipInputStream zis;
ZipIterator(ZipInputStream zis) throws IOException {
this.zis = zis;
lineIterator = new InputStreamCharSource(zis).getLineIterator();
}
@Override
protected String computeNext() {
if (!lineIterator.hasNext()) {
// If the line iterator does not have a next element, check if there is a next file.
try {
// Find the next file that is non empty.
while (zis.getNextEntry() != null) {
lineIterator = new InputStreamCharSource(zis).getLineIterator();
if (lineIterator.hasNext()) {
break;
}
}
return lineIterator.hasNext() ? lineIterator.next() : endOfData();
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
return lineIterator.next();
}
}
}
private static final class InputStreamCharSource extends CharSource {
private final Reader inputStream;
private InputStreamCharSource(InputStream inputStream) {
this.inputStream = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
}
@Override
public Reader openStream() throws IOException {
return inputStream;
}
@SuppressWarnings("MustBeClosedChecker")
Iterator<String> getLineIterator() {
try {
return super.lines().iterator();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
if (TaskContext.get() != null) {
// If running in Spark, close the stream when the task is finished.
TaskContext.get().addTaskCompletionListener((TaskCompletionListener) context -> close());
} else {
close();
}
}
}
private void close() {
Closeables.closeQuietly(inputStream);
}
}
}
合并数据集文件¶
此示例以 .zip 文件作为输入,并将所有输入数据集文件合并为一个 .zip 文件。请注意,此转换(Transform)中的计算未进行并行化。
/*
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
*/
package com.palantir.transforms.java.examples;
import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
* This is an example of combining all files in a dataset into one big .zip file.
* <p>
* The work is done on a single thread, because it is difficult to run in parallel.
* <p>
* WARNING: In general, it's preferred to use the APIs in {@link UnzipWithSpark} and {@link UnzipWithSparkToDataset}
* to take advantage of Spark. This is an example of a computation that is difficult to meaningfully parallelize,
* which is why it is done using file system operations only on the driver.
*/
public final class ZipOnDriver {
@Compute
public void compute(FoundryInput zipFiles, FoundryOutput output) {
ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();
// Write to a file called "bigzip.zip" in the output dataset's file system.
outputFileSystem.writeTo(Paths.get("bigzip.zip"), outputStream -> {
// Wrap the OutputStream in a ZipOutputStream in order to write each file into the same .zip file.
try (ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream)) {
// For each file in the input dataset's FileSystem, read it, mark a new entry in the
// "bigzip.zip", then copy the bytes over.
inputFileSystem.listAllFiles().forEach(inputPath -> {
inputFileSystem.readFrom(inputPath, inputStream -> {
zipOutputStream.putNextEntry(new ZipEntry(inputPath.toString()));
ByteStreams.copy(inputStream, zipOutputStream);
return null;
});
});
}
});
}
}