Transforms and pipelines¶
:::callout{theme="neutral"}
All transformations currently default to transaction type SNAPSHOT.
:::
A Transform is a description of how to compute a dataset. It describes the following:
- The input and output datasets,
- The code used to transform the input datasets into the output dataset (we’ll refer to this as the compute function), and
- Any additional configuration (such as the custom Transforms profiles to use at runtime).
The input and output datasets, as well as the transformation code, are specified in a Transform object and then registered to a Pipeline. How you can define a Transform depends on two factors:
- Whether you’re defining a high-level or low-level Transform, and
- Whether you’re using automatic or manual registration to add your Transform to your project’s Pipeline.
Transform type¶
:::callout{theme="success" title="Tip"}
Data transformations can be expressed in terms of DataFrame objects as well as files. These DataFrame objects are regular Spark DataFrames. In the Spark Java API, a DataFrame is represented by a Dataset<Row>. Thus, as a user, you directly interact with Dataset<Row> objects in your data transformation code.\
For more information about working with Spark, you can refer to the Java API for Spark documentation ↗ that’s available online.
:::
For transformations that rely on DataFrame objects, you can:
- Define a high-level Transform, which supports input and output of type Dataset<Row>, or
- Define a low-level Transform and explicitly call a method to access the Dataset<Row> containing your input dataset.
For transformations that rely on files, you must define a low-level Transform and then access files within your datasets.
Here is a summary of the key differences between the two types of Transforms:
| Description | High-Level Transform | Low-Level Transform |
|---|---|---|
| Allows for data transformations that depend on DataFrame objects | ✓ * | ✓ |
| Allows for data transformations that depend on access to files | | ✓ |
| Supports multiple input datasets | ✓ | ✓ |
| Supports multiple output datasets | | ✓ |
| Compute function must return DataFrame value | ✓ | |
| Compute function writes to output, instead of returning a value | | ✓ |
* We recommend using high-level Transforms for data transformations that depend on DataFrame objects.
For both Transform types, you need to create a class that contains your compute function. Within this class, your compute function must be a public, non-static method that’s annotated with @Compute. Without this annotation, your data transformation code will not get correctly registered.
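The rule above (public, non-static, annotated) can be checked with plain Java reflection. The following is a minimal sketch, not the actual registration code: ComputeSketch is a hypothetical stand-in for the real @Compute annotation so that the example compiles on its own, and ComputeMethodFinder and the three sample classes are illustrative names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @Compute; it must be retained at runtime to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ComputeSketch {}

final class ComputeMethodFinder {
    private ComputeMethodFinder() {}

    // Returns the names of declared methods that are annotated, public, and non-static.
    static List<String> findComputeMethods(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            int modifiers = method.getModifiers();
            boolean annotated = method.isAnnotationPresent(ComputeSketch.class);
            if (annotated && Modifier.isPublic(modifiers) && !Modifier.isStatic(modifiers)) {
                names.add(method.getName());
            }
        }
        return names;
    }
}

// Satisfies all three conditions, so its method is found.
final class GoodTransform {
    @ComputeSketch
    public String compute(String input) {
        return input.trim();
    }
}

// Public and non-static, but not annotated, so it is skipped.
final class MissingAnnotation {
    public String compute(String input) {
        return input.trim();
    }
}

// Annotated and public, but static, so it is skipped.
final class StaticCompute {
    @ComputeSketch
    public static String compute(String input) {
        return input.trim();
    }
}
```

In this sketch a method that misses any one condition is silently skipped rather than reported, which mirrors why a compute function without the annotation never shows up as registered.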
Registration Type¶
Each Transforms Java subproject within a repository exposes a single Pipeline object. This Pipeline object is used to:
- Register datasets in Foundry with instructions for how to build them, and
- Locate and execute the Transform object responsible for building a given dataset during a Foundry build.
Entry Point¶
The runtime responsible for executing a Java transformation needs to be able to find the project’s Pipeline. Note that Transforms Java uses the standard Java facility for service loading ↗.
To define the Pipeline object associated with your project, you must implement the PipelineDefiner interface. Within your implementation, you add Transforms to your project’s Pipeline. Each Java subproject must contain exactly one PipelineDefiner implementation:
```java
package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // Code here to add Transforms to your project's Pipeline using
        // automatic or manual registration.
    }
}
```
Once you create a Java package and implement PipelineDefiner, you must update resources/META-INF/services/com.palantir.transforms.lang.java.api.PipelineDefiner to point to your PipelineDefiner implementation:
```
# Replace this with the class name for your "PipelineDefiner" implementation.
# Since each Java subproject implements a single "PipelineDefiner", this file
# can only contain a single entry.
myproject.MyPipelineDefiner
```
Here, myproject.MyPipelineDefiner is the fully qualified class name of your PipelineDefiner implementation. Note that the comment character in a service provider file is #.
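To make the lookup side concrete, here is a minimal sketch of how a runtime could locate providers through java.util.ServiceLoader. DefinerSketch is a hypothetical stand-in for the PipelineDefiner interface, and the requireSingle check is an assumption about how the single-entry rule might be enforced, not a description of what the Transforms runtime actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical stand-in for PipelineDefiner; the "pipeline" is modeled as a plain list.
interface DefinerSketch {
    void define(List<String> pipeline);
}

final class DefinerLookup {
    private DefinerLookup() {}

    // Collects every provider named in META-INF/services/<binary name of DefinerSketch>
    // on the classpath. With no such file present, the result is empty.
    static List<DefinerSketch> findDefiners() {
        List<DefinerSketch> found = new ArrayList<>();
        for (DefinerSketch definer : ServiceLoader.load(DefinerSketch.class)) {
            found.add(definer);
        }
        return found;
    }

    // One possible way to enforce "exactly one definer per subproject".
    static DefinerSketch requireSingle(List<DefinerSketch> found) {
        if (found.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one definer but found " + found.size());
        }
        return found.get(0);
    }
}
```

ServiceLoader instantiates each listed class reflectively, so a provider found on the classpath needs a public zero-argument constructor. The MyPipelineDefiner class shown earlier satisfies this through its default constructor.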
Adding transforms to a pipeline¶
Once a Transform associated with your project’s Pipeline declares a dataset as an output, you can build this dataset in Foundry. The two recommended ways to add Transform objects to a Pipeline are manual registration and automatic registration.
:::callout{theme="success" title="Tip"}
If you have a more advanced workflow and/or want to explicitly add each Transform object to your project’s Pipeline, you can use manual registration. For instance, it’s useful to use manual registration if you want to meta-programmatically apply the same data transformation logic to multiple input and output dataset combinations.
Otherwise, it’s highly recommended to use automatic registration to ensure that your registration code is concise and contained. With automatic registration, the Pipeline.autoBindFromPackage() method discovers any Transform definitions in a package (provided that these objects have the required @Input and @Output annotations).
:::
Automatic registration¶
As the complexity of a project grows, manually adding Transform objects to a Pipeline can become unwieldy. Thus, the Pipeline object provides the autoBindFromPackage() method to discover all Transform objects within a Java package. To use automatic registration, you must do the following:
- Define a class corresponding to your Transform. With automatic registration, you define a class that contains information about your input and output datasets as well as your compute function.
- Add the required @Input and @Output annotations.
- Call the Pipeline.autoBindFromPackage() method to register any Transform definitions in your provided Java package.

The autoBindFromPackage() method only registers Transform definitions that have the required annotations. Any Transforms without the required annotations are not added to your project’s Pipeline, even if they are in the Java package you provide to the autoBindFromPackage() method.
Manual Registration¶
Transform objects can manually be added to a Pipeline using the Pipeline.register() method. Each call to this method can register one Transform. In order to use manual registration with Transforms, you must do the following:
- Define a class containing the compute function for your Transform object. Unlike automatic registration, with manual registration, you provide information about your input and output datasets within your PipelineDefiner implementation.
- Use HighLevelTransform.builder() or LowLevelTransform.builder() to specify which compute function to use as well as provide your input and output datasets.
- Call the Pipeline.register() method to explicitly add your Transform definitions to your project’s Pipeline.
:::callout{theme="warning" title="Warning"}
Annotations such as @StopPropagating and @StopRequiring are only supported for automatically registered Java transforms.
:::
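The meta-programmatic use case for manual registration (one piece of logic applied to several input and output combinations) can be sketched as a loop that registers one transform per pair. Everything in this sketch is a hypothetical stand-in: PipelineSketch and TransformSketch only model the idea and are not the real Pipeline, HighLevelTransform, or LowLevelTransform types, and the dataset paths are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a built Transform: one input, one output, one compute function.
final class TransformSketch {
    final String inputPath;
    final String outputPath;
    final UnaryOperator<String> computeFunction;

    TransformSketch(String inputPath, String outputPath, UnaryOperator<String> computeFunction) {
        this.inputPath = inputPath;
        this.outputPath = outputPath;
        this.computeFunction = computeFunction;
    }
}

// Hypothetical stand-in for a Pipeline: remembers which transform builds each output.
final class PipelineSketch {
    private final Map<String, TransformSketch> byOutput = new LinkedHashMap<>();

    // Each call registers exactly one transform, keyed by its output path.
    void register(TransformSketch transform) {
        byOutput.put(transform.outputPath, transform);
    }

    int size() {
        return byOutput.size();
    }

    TransformSketch forOutput(String outputPath) {
        return byOutput.get(outputPath);
    }
}

final class ManualRegistrationSketch {
    private ManualRegistrationSketch() {}

    // Applies the same compute logic to every input/output pair in the map.
    static PipelineSketch define(Map<String, String> inputToOutput) {
        PipelineSketch pipeline = new PipelineSketch();
        UnaryOperator<String> sharedLogic = String::trim;
        for (Map.Entry<String, String> pair : inputToOutput.entrySet()) {
            pipeline.register(new TransformSketch(pair.getKey(), pair.getValue(), sharedLogic));
        }
        return pipeline;
    }
}
```

The design point is that the pairs are data, so adding another input and output combination means adding a map entry rather than writing another annotated class.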
Transform Context¶
There may be cases when a data transformation depends on things other
than its input datasets. For instance, a transformation may be required
to access the current Spark session or access transforms parameters in
the jobSpec. In such cases, you can inject a TransformContext object
into the transformation. To do this, your compute function must accept a
parameter of type TransformContext. TransformContext contains the
Transforms authHeader, Spark session, transform parameters, and a
ServiceDiscovery object. The ServiceDiscovery class exposes the service
URIs of discovered Foundry services.
```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.TransformContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * This is an example high-level Transform that accesses the TransformContext.
 * The class name is illustrative. With automatic registration, the @Input and
 * @Output annotations are also required.
 */
public final class HighLevelContextTransform {

    @Compute
    public Dataset<Row> myComputeFunction(Dataset<Row> myInput, TransformContext context) {
        // Read the "limit" transform parameter and keep at most that many rows.
        int limit = (int) context.parameters().get("limit");
        return myInput.limit(limit);
    }
}
```
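```java
package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.TransformContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * This is an example low-level Transform that accesses the TransformContext.
 * The class name is illustrative, and the FoundryInput/FoundryOutput imports
 * assume these types live in the same API package as the other imports.
 */
public final class LowLevelContextTransform {

    @Compute
    public void compute(FoundryInput input, FoundryOutput output, TransformContext context) {
        int limit = (int) context.parameters().get("limit");
        // Read the input as a DataFrame, keep at most "limit" rows, and write them to the output.
        Dataset<Row> limited = input.asDataFrame().read().limit(limit);
        output.getDataFrameWriter(limited).write();
    }
}
```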
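The cast (int) context.parameters().get("limit") in the examples above succeeds only when the stored value is an Integer. It throws a ClassCastException for a Long or a String and a NullPointerException when the parameter is absent. The following is a minimal sketch of a more defensive read, assuming the parameters can be viewed as a Map<String, Object>. The helper name TransformParameters.intParameter is hypothetical and not part of the Transforms API.

```java
import java.util.Map;

final class TransformParameters {
    private TransformParameters() {}

    // Reads an integer parameter, accepting any Number or a numeric String.
    // Falls back to defaultValue when the parameter is absent.
    static int intParameter(Map<String, Object> parameters, String name, int defaultValue) {
        Object raw = parameters.get(name);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            // Narrowing conversion: fractional parts are dropped and large values may wrap.
            return ((Number) raw).intValue();
        }
        if (raw instanceof String) {
            // Throws NumberFormatException if the text is not a valid integer.
            return Integer.parseInt(((String) raw).trim());
        }
        throw new IllegalArgumentException("Parameter '" + name + "' is not numeric: " + raw);
    }
}
```

Inside a compute function, this could be called as TransformParameters.intParameter(context.parameters(), "limit", 100), provided parameters() returns a compatible map type.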