Examples

High-level Transforms

Data transformations in Java commonly read, process, and write DataFrame objects. Recall that in the Java API, a DataFrame is represented by a Dataset<Row>. If your data transformation depends on DataFrame objects, you can define a high-level Transform. Alternatively, you can define a more general low-level Transform and explicitly call asDataFrame().read() on an input, as the low-level examples do, to access a Dataset<Row> containing your input dataset.

To define a high-level Transform, write a compute function that takes any number of inputs of type Dataset<Row> and returns a single output of type Dataset<Row>.

Automatic registration

Here is an example of how to define a Transform by creating a class called HighLevelAutoTransform in the myproject.datasets package:

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* This is an example high-level Transform intended for automatic registration.
*/
public final class HighLevelAutoTransform {

   // The class for an automatically registered Transform contains the compute
   // function and information about the input/output datasets.
   // Automatic registration requires "@Input" and "@Output" annotations.
   @Compute
   @Output("/path/to/output/dataset")
   public Dataset<Row> myComputeFunction(@Input("/path/to/input/dataset") Dataset<Row> myInput) {
       // The compute function for a high-level Transform returns an output of type "Dataset<Row>".
       return myInput.limit(10);
   }
}

High-level Transforms support multiple inputs and a single output. Thus, each input parameter must be annotated with @Input (which contains the full path to your input dataset), and the compute function must be annotated with @Output (which contains the full path to your output dataset).
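To make the annotation layout concrete, the following is a minimal JDK-only sketch of how a registrar could read such annotations through reflection. DemoCompute, DemoInput, DemoOutput, DemoHighLevelTransform, and describe() are hypothetical stand-ins invented for this illustration. They are not part of the transforms API, and this is not necessarily how autoBindFromPackage() works internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

final class AnnotationScanSketch {

    // Returns "input, input, ... -> output" for the first @DemoCompute method found.
    static String describe(Class<?> transformClass) {
        for (Method method : transformClass.getDeclaredMethods()) {
            if (!method.isAnnotationPresent(DemoCompute.class)) {
                continue;
            }
            DemoOutput output = method.getAnnotation(DemoOutput.class);
            if (output == null) {
                throw new IllegalStateException("Compute function lacks an output annotation");
            }
            List<String> inputs = new ArrayList<>();
            for (Parameter parameter : method.getParameters()) {
                DemoInput input = parameter.getAnnotation(DemoInput.class);
                if (input == null) {
                    throw new IllegalStateException("Every input parameter needs an input annotation");
                }
                inputs.add(input.value());
            }
            return String.join(", ", inputs) + " -> " + output.value();
        }
        throw new IllegalStateException("No compute function found");
    }

    public static void main(String[] args) {
        // prints "/path/to/first/input, /path/to/second/input -> /path/to/output/dataset"
        System.out.println(describe(DemoHighLevelTransform.class));
    }
}

// Hypothetical stand-ins for the real annotations, so the sketch runs on a bare JDK.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoCompute {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface DemoInput {
    String value();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoOutput {
    String value();
}

// Shaped like a high-level Transform: annotated inputs, one annotated return value.
// Strings stand in for Dataset<Row> to keep the sketch dependency-free.
final class DemoHighLevelTransform {
    @DemoCompute
    @DemoOutput("/path/to/output/dataset")
    public String myComputeFunction(
            @DemoInput("/path/to/first/input") String first,
            @DemoInput("/path/to/second/input") String second) {
        return first + second;
    }
}
```

The sketch mirrors the rule stated above: one path per input parameter and a single output path attached to the compute function itself.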

Now, you can add this Transform to your project’s Pipeline by calling the Pipeline.autoBindFromPackage() method in your PipelineDefiner implementation:

package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

  @Override
  public void define(Pipeline pipeline) {
    // Provide the Java package containing any Transforms you want to
    // automatically register.
    pipeline.autoBindFromPackage("myproject.datasets");
  }
}

Manual registration

Here is an example of how to define the compute function for a Transform by creating a class called HighLevelManualFunction in the myproject.datasets package:

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* This is an example compute function for a high-level Transform intended for manual registration.
*/
public final class HighLevelManualFunction {

   // The class for a manually registered Transform contains just the compute function.
   @Compute
   public Dataset<Row> myComputeFunction(Dataset<Row> myInput) {
       // The compute function for a high-level Transform returns an output of type "Dataset<Row>".
       return myInput.limit(10);
   }
}

Now, in your PipelineDefiner implementation, you finish defining your Transform using HighLevelTransform.builder(), and you add this Transform to your project’s Pipeline by calling Pipeline.register():

package myproject;

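// Assumption: HighLevelTransform lives in the same API package as Pipeline; adjust if your version differs.
import com.palantir.transforms.lang.java.api.HighLevelTransform;
import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;
import myproject.datasets.HighLevelManualFunction;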

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // This is a sample manual registration for a high-level Transform.
        HighLevelTransform highLevelManualTransform = HighLevelTransform.builder()
                // Pass in the compute function to use. Here, "HighLevelManualFunction" corresponds
                // to the class name for a compute function for a high-level Transform.
                .computeFunctionInstance(new HighLevelManualFunction())
                // Pass in the input dataset(s) to use.
                // "myInput" corresponds to an input parameter for your compute function.
                .putParameterToInputAlias("myInput", "/path/to/input/dataset")
                // Pass in the output dataset to use.
                .returnedAlias("/path/to/output/dataset")
                .build();
        pipeline.register(highLevelManualTransform);
    }
}

High-level Transforms support multiple inputs and a single output. Provide each input dataset for your compute function using putParameterToInputAlias(). This method takes an input name corresponding to a parameter of your compute function, followed by the full path to your input dataset. For instance, in the example above, “myInput” is a parameter name in myComputeFunction(). Provide the full path to your output dataset using returnedAlias().
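Because manual registration ties dataset paths to parameter names, a misspelled name leaves a parameter without a dataset. The following is a minimal JDK-only sketch of that name-matching idea. AliasBindingSketch, unboundParameters(), and the "orders"/"customers" names are hypothetical, and the sketch does not claim to reproduce the checks the builder itself performs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AliasBindingSketch {

    // Hypothetical helper: returns the compute-function parameters
    // that have no dataset path bound to them.
    static List<String> unboundParameters(List<String> parameterNames, Map<String, String> aliases) {
        List<String> missing = new ArrayList<>();
        for (String name : parameterNames) {
            if (!aliases.containsKey(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Parameter names of an imagined compute function with two inputs.
        List<String> parameterNames = List.of("orders", "customers");

        // Name-to-path bindings, analogous to repeated putParameterToInputAlias() calls.
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("orders", "/path/to/orders/dataset");
        // Typo on purpose: "customer" does not match the parameter name "customers".
        aliases.put("customer", "/path/to/customers/dataset");

        System.out.println(unboundParameters(parameterNames, aliases)); // prints [customers]
    }
}
```

Keeping the string passed to putParameterToInputAlias() identical to the parameter name in the compute function avoids this kind of mismatch.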

Low-level Transforms

A low-level Transform can be used if you’re writing data transformations that depend on DataFrame objects or files.

Automatic registration

Let’s say you’re using automatic registration. Here is an example of how to create a Transform object by defining a class called LowLevelAutoTransform in the myproject.datasets package:

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import com.palantir.transforms.lang.java.api.Input;
import com.palantir.transforms.lang.java.api.Output;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* This is an example low-level Transform intended for automatic registration.
*/
public final class LowLevelAutoTransform {

   // The class for an automatically registered Transform contains the compute
   // function and information about the input/output datasets.
   // Automatic registration requires "@Input" and "@Output" annotations.
   @Compute
   public void myComputeFunction(
           @Input("/path/to/input/dataset") FoundryInput myInput,
           @Output("/path/to/output/dataset") FoundryOutput myOutput) {
       Dataset<Row> limited = myInput.asDataFrame().read().limit(10);
       // The compute function for a low-level Transform writes to the output dataset(s),
       // instead of returning the output(s).
       myOutput.getDataFrameWriter(limited).write();
   }
}

Low-level Transforms support multiple input and output datasets. Thus, each input parameter must be annotated with @Input (which contains the full path to your input dataset), and each output parameter must be annotated with @Output (which contains the full path to your output dataset).

Now, you can add this Transform to your project’s Pipeline by calling the Pipeline.autoBindFromPackage() method in your PipelineDefiner implementation:

package myproject;

import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;

public final class MyPipelineDefiner implements PipelineDefiner {

  @Override
  public void define(Pipeline pipeline) {
    // Provide the Java package containing any Transforms you want to
    // automatically register.
    pipeline.autoBindFromPackage("myproject.datasets");
  }
}

Manual registration

Now, let’s say you’re using manual registration. In this case, you would define a class that contains just your compute function. Here is an example of how to define a class called LowLevelManualFunction in the myproject.datasets package:

package myproject.datasets;

import com.palantir.transforms.lang.java.api.Compute;
import com.palantir.transforms.lang.java.api.FoundryInput;
import com.palantir.transforms.lang.java.api.FoundryOutput;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* This is an example compute function for a low-level Transform intended for manual registration.
*/
public final class LowLevelManualFunction {

    // The class for a manually registered Transform contains just the compute function.
    @Compute
    public void myComputeFunction(FoundryInput myInput, FoundryOutput myOutput) {
        Dataset<Row> limited = myInput.asDataFrame().read().limit(10);
        // The compute function for a low-level Transform writes to the output dataset(s),
        // instead of returning the output(s).
        myOutput.getDataFrameWriter(limited).write();
    }
}

Now, in your PipelineDefiner implementation, you create your actual Transform object using LowLevelTransform.builder(), and you add this Transform to your project’s Pipeline by calling Pipeline.register():

package myproject;

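// Assumption: LowLevelTransform lives in the same API package as Pipeline; adjust if your version differs.
import com.palantir.transforms.lang.java.api.LowLevelTransform;
import com.palantir.transforms.lang.java.api.Pipeline;
import com.palantir.transforms.lang.java.api.PipelineDefiner;
import myproject.datasets.LowLevelManualFunction;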

public final class MyPipelineDefiner implements PipelineDefiner {

    @Override
    public void define(Pipeline pipeline) {
        // This is a sample manual registration for a low-level Transform.
        LowLevelTransform lowLevelManualTransform = LowLevelTransform.builder()
                // Pass in the compute function to use. Here, "LowLevelManualFunction" corresponds
                // to the class name for a compute function for a low-level Transform.
                .computeFunctionInstance(new LowLevelManualFunction())
                // Pass in the input dataset(s) to use.
                // "myInput" corresponds to an input parameter for your compute function.
                .putParameterToInputAlias("myInput", "/path/to/input/dataset")
                // Pass in the output dataset(s) to use.
                // "myOutput" corresponds to an output parameter for your compute function.
                .putParameterToOutputAlias("myOutput", "/path/to/output/dataset")
                .build();
        pipeline.register(lowLevelManualTransform);
    }
}

Low-level Transforms support multiple input and output datasets. Provide each input dataset for your compute function using putParameterToInputAlias(), and each output dataset using putParameterToOutputAlias(). These methods take an input or output name corresponding to a parameter of your compute function, followed by the full path to the corresponding dataset. For instance, in the example above, “myInput” and “myOutput” are parameter names in myComputeFunction(). Recall that the compute function for a low-level Transform writes to its output datasets and does not return a value. This is why both the input and the output datasets are passed in as parameters to the compute function.
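The write-instead-of-return shape is what allows more than one output. The following is a minimal JDK-only sketch of that shape. MultiOutputSketch, DemoOutputHandle, and splitBlankRows() are hypothetical stand-ins, with lists of strings in place of datasets, and they do not represent the FoundryOutput API.

```java
import java.util.ArrayList;
import java.util.List;

final class MultiOutputSketch {

    // Hypothetical stand-in for an output handle; it just records what is written.
    static final class DemoOutputHandle {
        private final List<String> rows = new ArrayList<>();

        void write(List<String> newRows) {
            rows.addAll(newRows);
        }

        List<String> rows() {
            return rows;
        }
    }

    // Shaped like a low-level compute function: it returns nothing and
    // writes to each output handle it receives as a parameter.
    static void splitBlankRows(List<String> input, DemoOutputHandle kept, DemoOutputHandle rejected) {
        List<String> nonBlank = new ArrayList<>();
        List<String> blank = new ArrayList<>();
        for (String row : input) {
            if (row.trim().isEmpty()) {
                blank.add(row);
            } else {
                nonBlank.add(row);
            }
        }
        kept.write(nonBlank);
        rejected.write(blank);
    }

    public static void main(String[] args) {
        DemoOutputHandle kept = new DemoOutputHandle();
        DemoOutputHandle rejected = new DemoOutputHandle();
        splitBlankRows(List.of("a", " ", "b"), kept, rejected);
        System.out.println(kept.rows());            // prints [a, b]
        System.out.println(rejected.rows().size()); // prints 1
    }
}
```

A function with a single return value could only hand back one of the two lists, which is the constraint a high-level Transform accepts in exchange for a simpler signature.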

