Transforms FAQ
The following are some frequently asked questions about transforms.
For general information, see our transforms documentation.
- Is it possible to save a CSV file in transforms-python rather than saving Parquet?
- Can I build multiple output datasets from one Python transform?
- How can I open a GZIP file with transforms?
- How can I unzip a file as part of a Foundry pipeline? In parallel?
Is it possible to save a CSV file in transforms-python rather than saving Parquet?
Below are examples of how to do this in each transform language:
Java
foundryOutput.getDataFrameWriter(dataFrame)
        .setFormatSettings(DatasetFormatSettings.builder().format("csv").build())
        .write();
Python
from transforms.api import transform, Input, Output

@transform(
    output=Output("/path/to/python_csv"),
    my_input=Input("/path/to/input"),
)
def my_compute_function(output, my_input):
    # output_format="csv" writes CSV files instead of the default Parquet
    output.write_dataframe(my_input.dataframe(), output_format="csv")
SQL
CREATE TABLE `/path/to/sql_csv` USING CSV AS SELECT * FROM `/path/to/input`
Can I build multiple output datasets from one Python transform?
To create multiple transforms, each with its own output dataset, you can generate them in a for loop:
from transforms.api import transform_df, Input, Output

def transform_generator(sources):
    # type: (List[str]) -> List[transforms.api.Transform]
    transforms = []
    # This example uses multiple input datasets. You can also generate multiple outputs
    # from a single input dataset.
    for source in sources:
        @transform_df(
            Output('/sources/{source}/output'.format(source=source)),
            my_input=Input('/sources/{source}/input'.format(source=source))
        )
        def compute_function(my_input, source=source):
            # To capture the current value of source in the function,
            # pass it as a defaulted keyword argument.
            return my_input.filter(my_input.source == source)
        transforms.append(compute_function)
    return transforms

TRANSFORMS = transform_generator(['src1', 'src2', 'src3'])
You can now import the TRANSFORMS attribute of the module and manually add each transform to your pipeline:
from transforms.api import Pipeline
import my_module

my_pipeline = Pipeline()
my_pipeline.add_transforms(*my_module.TRANSFORMS)
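The `source=source` default argument in the generator matters because a Python closure looks up a loop variable when the function runs, not when it is defined. The stdlib-only sketch below uses no Foundry APIs; the function names are hypothetical, and plain dictionaries stand in for DataFrame rows. It contrasts the two behaviors:

```python
def make_filters_late_binding(sources):
    filters = []
    for source in sources:
        def keep(row):
            # `source` is looked up when keep() is called, after the loop has finished
            return row["source"] == source
        filters.append(keep)
    return filters


def make_filters_captured(sources):
    filters = []
    for source in sources:
        def keep(row, source=source):
            # The default value is evaluated at definition time, freezing this iteration's source
            return row["source"] == source
        filters.append(keep)
    return filters


rows = [{"source": "src1"}, {"source": "src2"}, {"source": "src3"}]
names = ["src1", "src2", "src3"]

late = [[r["source"] for r in rows if f(r)] for f in make_filters_late_binding(names)]
captured = [[r["source"] for r in rows if f(r)] for f in make_filters_captured(names)]

print(late)      # [['src3'], ['src3'], ['src3']]
print(captured)  # [['src1'], ['src2'], ['src3']]
```

Without the default argument, every generated filter would select only the last source.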
To have a single transform read one input and write multiple output datasets in the same build, declare several Output parameters on one @transform, as below:
# Using the `/examples/students_hair_eye_color` dataset
students_input = foundry.input('/examples/students_hair_eye_color')
students_input.dataframe().sort('id').show(n=3)
+---+-----+-----+----+
| id| hair| eye| sex|
+---+-----+-----+----+
| 1|Black|Brown|Male|
| 2|Brown|Brown|Male|
| 3| Red|Brown|Male|
+---+-----+-----+----+
Note that this example only shows the top three rows.
from transforms.api import transform, Input, Output

@transform(
    hair_eye_color=Input('/examples/students_hair_eye_color'),
    males=Output('/examples/hair_eye_color_males'),
    females=Output('/examples/hair_eye_color_females'),
)
def brown_hair_by_sex(hair_eye_color, males, females):
    # type: (TransformInput, TransformOutput, TransformOutput) -> None
    df = hair_eye_color.dataframe()
    brown_hair_df = df.filter(df.hair == 'Brown')
    # Both outputs are derived from the same filtered DataFrame and written in the same build
    males.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Male'))
    females.write_dataframe(brown_hair_df.filter(brown_hair_df.sex == 'Female'))
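As a plain-Python analogue of the transform above (this is not the transforms API), the sketch below applies the shared `hair == 'Brown'` filter once and derives both outputs from its result. The rows are the three shown earlier, and the helper name is hypothetical. With only these rows, one male student and no female students have brown hair.

```python
rows = [
    {"id": 1, "hair": "Black", "eye": "Brown", "sex": "Male"},
    {"id": 2, "hair": "Brown", "eye": "Brown", "sex": "Male"},
    {"id": 3, "hair": "Red", "eye": "Brown", "sex": "Male"},
]


def split_brown_hair_by_sex(rows):
    # Shared filter, computed once
    brown = [r for r in rows if r["hair"] == "Brown"]
    # Each "output" is a further filter over the shared result
    males = [r for r in brown if r["sex"] == "Male"]
    females = [r for r in brown if r["sex"] == "Female"]
    return males, females


males, females = split_brown_hair_by_sex(rows)
print([r["id"] for r in males])    # [2]
print([r["id"] for r in females])  # []
```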
For more help and information on transforms, review the documentation for:
How can I open a GZIP file with transforms?
Because a file opened from a transform input is a file-like object backed by a stream, you can process it like an ordinary file. You do not need to read the whole file into memory or copy it onto disk, which makes it possible to process much larger files.
Use the gzip and io modules from the Python 3 standard library:
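import gzip
import io

def process_file(file_status):
    # input_dataset is the transform input holding the GZIP files, taken from the enclosing scope
    fs = input_dataset.filesystem()
    with fs.open(file_status.path, 'rb') as f:
        gz = gzip.GzipFile(fileobj=f)
        br = io.BufferedReader(gz)
        # Read from br here, while f is still open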
To have reads return strings instead of bytes, wrap the buffered reader:
tw = io.TextIOWrapper(br)
If your file uses a specific text encoding, pass it explicitly (CP500 is an EBCDIC code page):
tw = io.TextIOWrapper(br, encoding='CP500')
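To try this streaming pattern outside Foundry, the sketch below substitutes an in-memory `io.BytesIO` for the file returned by `fs.open(path, 'rb')`; otherwise it uses the same gzip and io calls. The helper name `iter_gzip_lines` is hypothetical. Decompression and decoding happen incrementally as lines are requested.

```python
import gzip
import io


def iter_gzip_lines(fileobj, encoding="utf-8"):
    """Yield decoded lines from a gzip-compressed binary file object."""
    gz = gzip.GzipFile(fileobj=fileobj)          # decompresses on demand
    br = io.BufferedReader(gz)                    # buffered byte reads
    tw = io.TextIOWrapper(br, encoding=encoding)  # decodes bytes to str
    for line in tw:
        yield line.rstrip("\n")


# Stand-in for a file opened with fs.open(file_status.path, 'rb'): CP500 text, gzip-compressed
raw = "id,hair\n1,Black\n2,Brown\n".encode("cp500")
buf = io.BytesIO(gzip.compress(raw))

lines = list(iter_gzip_lines(buf, encoding="cp500"))
print(lines)  # ['id,hair', '1,Black', '2,Brown']
```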
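How can I unzip a file as part of a Foundry pipeline? In parallel?
The following example uses Java and Spark to unzip the zip archives in an input dataset in parallel: Spark distributes the archives to executors, and each executor writes the entries of its archives to the output file system. Entries within a single archive are decompressed sequentially. If you want to parallelize decompression within a single compressed file, use a splittable compression format such as bzip2 (.bz2).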
package com.palantir.transforms.java.examples;

import com.google.common.io.ByteStreams;
import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.ReadOnlyLogicalFileSystem;
import com.palantir.transforms.lang.java.api.WriteOnlyLogicalFileSystem;
import com.palantir.util.syntacticpath.Paths;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * This is an example of unzipping files in parallel using Spark.
 * <p>
 * The work is distributed to executors.
 */
public final class UnzipWithSpark {
    @Compute
    public void compute(FoundryInput zipFiles, FoundryOutput output) throws IOException {
        ReadOnlyLogicalFileSystem inputFileSystem = zipFiles.asFiles().getFileSystem();
        WriteOnlyLogicalFileSystem outputFileSystem = output.getFileSystem();
        inputFileSystem.filesAsDataset().foreach(portableFile -> {
            // "processWithThenClose" gives you the InputStream for the given input file.
            portableFile.processWithThenClose(stream -> {
                try (ZipInputStream zis = new ZipInputStream(stream)) {
                    ZipEntry entry;
                    // For each file in the zip file, write it to the output file system.
                    while ((entry = zis.getNextEntry()) != null) {
                        if (entry.isDirectory()) {
                            continue; // directory entries contain no data to copy
                        }
                        outputFileSystem.writeTo(
                                Paths.get(entry.getName()),
                                outputStream -> ByteStreams.copy(zis, outputStream));
                    }
                    return null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        });
    }
}
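The same pattern can be sketched locally with the Python standard library, which may help when testing archive handling before running it on Spark. In this sketch, all helper names are hypothetical, a `ThreadPoolExecutor` stands in for Spark executors (one archive per task), and a dictionary stands in for the output file system. Entries within a single archive are copied sequentially, and directory entries are skipped because they carry no data.

```python
import io
import shutil
import zipfile
from concurrent.futures import ThreadPoolExecutor


def unzip_archive(archive_bytes):
    """Stream every file entry of one zip archive into an in-memory output map."""
    outputs = {}
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue  # directory entries carry no data
            out = io.BytesIO()
            with zf.open(info) as src:
                shutil.copyfileobj(src, out)  # chunked copy, similar to ByteStreams.copy
            outputs[info.filename] = out.getvalue()
    return outputs


def unzip_all(archives, max_workers=4):
    """Unzip several archives concurrently, one archive per task."""
    merged = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order; a later archive overwrites duplicate names
        for result in pool.map(unzip_archive, archives):
            merged.update(result)
    return merged


def make_zip(files):
    """Build an in-memory zip archive from a {name: bytes} mapping (test helper)."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for name, data in files.items():
            zf.writestr(name, data)
    return buf.getvalue()


archives = [
    make_zip({"a.txt": b"alpha", "dir/": b"", "dir/b.txt": b"beta"}),
    make_zip({"c.txt": b"gamma"}),
]
merged = unzip_all(archives)
print(sorted(merged))  # ['a.txt', 'c.txt', 'dir/b.txt']
```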