
# Getting started

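:::callout{theme="success" title="Tip"}
The instructions below will guide you through a simple Python data transformation. If you are just getting started with data transformation, consider going through the batch pipeline tutorial for [Pipeline Builder](https://palantir.com/docs/foundry/building-pipelines/create-batch-pipeline-pb/) or [Code Repositories](https://palantir.com/docs/foundry/building-pipelines/create-batch-pipeline-cr/) first.
:::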

This tutorial walks through how to use Python transforms to turn a spreadsheet of recent meteorite discoveries into a dataset that is ready for analysis.

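## About the dataset

This tutorial uses data from [NASA’s Open Data Portal ↗](https://data.nasa.gov/). You can follow along in your own code repository with this sample dataset:

[`Download meteorite_landings`](https://palantir.com/docs/resources/foundry/transforms-python/meteorite_landings.csv)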

This dataset contains data about meteorites that have been found on Earth. Note that the data has been cleaned to make it easier to work with.

The dataset includes name, mass, classification, and other identifying information for each meteorite, along with the year it was discovered and coordinates of where it was found. It is good practice to open the CSV to review the data before uploading it into Foundry.
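One way to do that review locally is a short script that uses Python's standard `csv` module. The sketch below is only an illustration: it reads a small inline sample instead of the real file, the sample rows are made up, and the column names (`name`, `nametype`, `class`, `mass`, `year`) are assumed from the columns used later in this tutorial.

```python
import csv
import io

# Made-up sample rows for illustration; the real file has more rows and columns.
SAMPLE_CSV = """name,nametype,class,mass,year
Example A,Valid,L6,120.5,1951
Example B,Valid,H5,,1949
Example C,Relict,H4,30.0,2001
"""


def summarize(csv_file):
    """Return the column names, the row count, and the number of empty cells per column."""
    reader = csv.DictReader(csv_file)
    rows = list(reader)
    columns = reader.fieldnames or []
    empty_counts = {
        column: sum(1 for row in rows if not (row[column] or "").strip())
        for column in columns
    }
    return columns, len(rows), empty_counts


columns, row_count, empty_counts = summarize(io.StringIO(SAMPLE_CSV))
print(columns)
print(row_count)
print(empty_counts)
```

To inspect the downloaded file, you could pass an open file handle to `summarize` in place of the `StringIO` object.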

## Set up a Python code repository

Get started by creating a Python code repository.

1. Navigate to a project, and select **+ New > Code repository**.
2. In the **Repository type** section, select **Pipelines**.
3. Select **Python** as the **Language**.
4. Choose **Initialize repository**.

### Optional: Use a local Python repository

Alternatively, you can copy your local Python repository into Code Repositories with the following steps:

1. Create a new Python code repository, as described above.

2. In your local repository, remove your previous Git origin (if you cloned it from GitHub, for example): `git remote remove origin`

3. Add your code repository's Git remote URL: `git remote add origin <repository_url>`

   You can find your code repository URL in the top right corner of the GitHub interface. Select the green **Clone** button, then copy the Git remote URL. Confirm the change by running `git remote -v`, which should return the code repository URL.

4. Merge the current `master` branch (or another branch of your choosing) in Code Repositories into your local branch: `git merge master`

   If an error about refusing to merge unrelated histories occurs, run the command `git merge master --allow-unrelated-histories`. This flag lets Git merge two branches that do not share a common ancestor, such as the history from your previous remote GitHub repository and the history of the new code repository.

   This merge will bring essential files to your local repository that are required to make commits and changes in Code Repositories.

5. Create a new branch and name it (`testbranch`, for example): `git checkout -b testbranch`

6. Make your changes and commit them to your branch.

7. Run `git push`, and confirm that the new branch appears in the Code Repositories interface. Verify that checks are successful.

Learn more about [local development](https://palantir.com/docs/foundry/transforms-python/local-development/) in Code Repositories.

## Write a Python data transformation

Navigate to your Python transforms repository. The default `examples.py` file contains example code to help you get started. Note that you can choose between pandas, Polars, and Spark compute engines. For more information on choosing a compute engine, refer to the [compute engine comparison](https://palantir.com/docs/foundry/transforms-python/compute-engines/) documentation.

Start by creating a new file in `src/myproject/datasets`, and call it `meteor_analysis.py` to organize your analysis. Make sure you import the required functions and classes. Define a transformation that takes your `meteorite_landings` dataset as input and creates `meteorite_landings_cleaned` as its output:

```python tab="Polars"
from transforms.api import transform, Input, Output


@transform.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.polars()
    # Your data transformation logic
    output.write_table(df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output


@transform.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(ctx, output, meteorite_landings):
    conn = ctx.duckdb().conn
    # Your data transformation logic using SQL
    query = conn.sql("SELECT * FROM meteorite_landings")
    output.write_table(query)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


@transform.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.pandas()
    # Your data transformation logic
    output.write_table(df)
```

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.dataframe()
    # Your data transformation logic
    output.write_dataframe(df)
```

Now, suppose you want to filter your input dataset down to “Valid” meteorites found in 1950 or later. Update your data transformation logic to filter the meteorites by `nametype` and `year`:

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = (
        meteorite_landings.polars()
        .filter(pl.col("nametype") == "Valid")
        .filter(pl.col("year") >= 1950)
    )
    output.write_table(df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(ctx, output, meteorite_landings):
    conn = ctx.duckdb().conn
    query = conn.sql("""
        SELECT *
        FROM meteorite_landings
        WHERE nametype = 'Valid' AND year >= 1950
    """)
    output.write_table(query)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.pandas()
    df = df[
        (df["nametype"] == "Valid") &
        (df["year"] >= 1950)
    ]
    output.write_table(df)
```

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    # Read the input once and reuse it for both filters
    landings = meteorite_landings.dataframe()
    df = (
        landings
        .filter(landings.nametype == "Valid")
        .filter(landings.year >= 1950)
    )
    output.write_dataframe(df)
```

## Build your output dataset

To build your resulting dataset, commit your changes and select **Build** in the top right corner. For more information about building datasets in Code Repositories, review the [Create a simple batch pipeline](https://palantir.com/docs/foundry/building-pipelines/create-batch-pipeline-cr/) tutorial.

## Add to your data transformation

:::callout{theme="neutral"}
With Python transforms, you can create multiple output datasets in a single Python file.
:::

Let’s say you want to filter down even further to only meteors that were particularly large for their meteorite type. To do so, you will need to:

  1. Find the average mass for each meteorite type, and
  2. Compare each meteor’s mass to the average mass for its meteor type.
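The two steps above can be sketched in plain Python before writing them as transforms. This is an illustration only: it uses the standard library instead of a dataframe engine, and the sample rows are made up. Only the `class` and `mass` column names come from this tutorial.

```python
from collections import defaultdict
from statistics import mean

# Made-up rows for illustration; only the column names follow the tutorial.
landings = [
    {"name": "Example A", "class": "L6", "mass": 100.0},
    {"name": "Example B", "class": "L6", "mass": 300.0},
    {"name": "Example C", "class": "H5", "mass": 50.0},
    {"name": "Example D", "class": "H5", "mass": 70.0},
    {"name": "Example E", "class": "H5", "mass": 90.0},
]


def average_mass_per_class(rows):
    """Step 1: group masses by class and average each group."""
    masses = defaultdict(list)
    for row in rows:
        masses[row["class"]].append(row["mass"])
    return {cls: mean(values) for cls, values in masses.items()}


def heavier_than_average(rows, averages):
    """Step 2: keep rows whose mass exceeds the average for their class."""
    return [row for row in rows if row["mass"] > averages[row["class"]]]


averages = average_mass_per_class(landings)
large = heavier_than_average(landings, averages)
print(averages)
print([row["name"] for row in large])
```

Note that the comparison is strict, so a meteorite whose mass equals its class average (such as `Example D` here) is not kept.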

First, add a data transformation to `meteor_analysis.py` that finds the average mass for each meteorite type. This transformation takes your `meteorite_landings` dataset as input and creates `meteorite_stats` as its output:

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


@transform.using(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.polars()
        # `group_by` is the current Polars spelling; older releases used `groupby`
        .group_by("class")
        .agg(pl.col("mass").mean().alias("avg_mass_per_class"))
    )
    output.write_table(df)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


@transform.using(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.pandas()
            .groupby("class", as_index=False)["mass"]
            .mean()
            .rename(columns={"mass": "avg_mass_per_class"})
    )
    output.write_table(df)
```

```python tab="PySpark"
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform.spark.using(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.dataframe().groupBy("class").agg(
            F.mean("mass").alias("avg_mass_per_class")
        )
    )
    output.write_dataframe(df)
```

Next, create a data transformation that compares each meteor’s mass to the average mass for its meteor type. The information needed for this transformation is spread across the `meteorite_landings` and `meteorite_stats` tables that you created in this tutorial. You must join the two datasets and filter the resulting dataset to find meteorites that have a greater-than-average mass, as shown below:

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.polars()
    df_stats = meteorite_stats.polars()

    enriched = df_landings.join(df_stats, on="class", how="inner")
    enriched = enriched.with_columns(
        (pl.col("mass") > pl.col("avg_mass_per_class")).alias("greater_mass")
    )
    result = enriched.filter(pl.col("greater_mass"))
    output.write_table(result)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


# Uses `transform.using`, matching the other pandas examples in this tutorial
@transform.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.pandas()
    df_stats = meteorite_stats.pandas()

    enriched = df_landings.merge(df_stats, on="class", how="inner")
    enriched["greater_mass"] = enriched["mass"] > enriched["avg_mass_per_class"]
    result = enriched[enriched["greater_mass"]]
    output.write_table(result)
```

```python tab="PySpark"
from transforms.api import transform, Input, Output


@transform.spark.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.dataframe()
    df_stats = meteorite_stats.dataframe()

    enriched_together = df_landings.join(df_stats, "class")
    greater_mass = enriched_together.withColumn(
        "greater_mass",
        enriched_together.mass > enriched_together.avg_mass_per_class,
    )
    result = greater_mass.filter("greater_mass")
    output.write_dataframe(result)
```

Now, you can further analyze the resulting meteorite_enriched dataset by exploring it in Contour.

## Apply your data transformation to multiple inputs

So far, you created a dataset that contains meteorites of all types with a greater-than-average mass. The next step is to create separate datasets for each meteorite type. With Python transforms, you can use a for-loop to apply the same data transformation to each type of meteorite. For more information on applying the same data transformation to different inputs, refer to the section on [transform generation](https://palantir.com/docs/foundry/transforms-python/pipelines/#transform-generation).

Create a new file in `src/myproject/datasets` and name it `meteor_class.py`. Note that you can continue writing your data transformation code in the `meteor_analysis.py` file, but this tutorial uses a new file to separate the data transformation logic.

To create separate datasets for each meteorite type, filter the `meteorite_enriched` dataset by class. Then, define a `transform_generator` function that applies the same data transformation logic to each of the meteorite types you want to analyze:

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:
        @transform.using(
            output=Output(f'/Users/jsmith/meteorite_{source}'),
            my_input=Input('/Users/jsmith/meteorite_enriched')
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.polars()
            result = df.filter(pl.col("class") == source)
            output.write_table(result)
        transforms.append(filter_by_source)
    return transforms

TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```

```python tab="Pandas"
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:
        @transform.using(
            output=Output(f'/Users/jsmith/meteorite_{source}'),
            my_input=Input('/Users/jsmith/meteorite_enriched')
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.pandas()
            result = df[df["class"] == source]
            output.write_table(result)
        transforms.append(filter_by_source)
    return transforms

TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```

```python tab="PySpark"
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:
        @transform.spark.using(
            output=Output(f'/Users/jsmith/meteorite_{source}'),
            my_input=Input('/Users/jsmith/meteorite_enriched')
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.dataframe()
            # Index the DataFrame, not the transform input, to get the column
            result = df.filter(df["class"] == source)
            # Matches the other PySpark examples, which use write_dataframe
            output.write_dataframe(result)
        transforms.append(filter_by_source)
    return transforms

TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```

This will create a transformation that filters our meteorite dataset by class. Note that we must pass `source=source` into the `filter_by_source` function in order to capture the `source` parameter in the function’s scope.
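The reason is that a Python closure looks up a loop variable when the function is called, not when it is defined. The standalone sketch below shows the difference without any Foundry code; the function names are made up for illustration.

```python
def make_late_bound(sources):
    """Each function reads `source` when called, so all see the final loop value."""
    functions = []
    for source in sources:
        def describe():
            return f"meteorite_{source}"
        functions.append(describe)
    return functions


def make_early_bound(sources):
    """A default argument is evaluated at definition time, fixing each value."""
    functions = []
    for source in sources:
        def describe(source=source):
            return f"meteorite_{source}"
        functions.append(describe)
    return functions


classes = ["L6", "H5", "H4"]
late = [f() for f in make_late_bound(classes)]
early = [f() for f in make_early_bound(classes)]
print(late)   # every entry uses the last class, "H4"
print(early)  # one entry per class
```

Without the default argument, every generated function would filter on the last class in the list.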

:::callout{theme="success" title="Tip"}
For the initial data transformation created in the `meteor_analysis.py` file, you did not have to do any additional configuration to add Transforms to your project’s pipeline. This is because the default Python project structure uses automatic registration to discover all transform objects within your `datasets` folder.

To add this final transformation to your project’s pipeline using automatic registration, you must add the generated transforms to a variable as a list. In the example above, we used the variable `TRANSFORMS`. For more information about automatic registration and transforms generators, refer to the section on [transforms generation](https://palantir.com/docs/foundry/transforms-python/pipelines/#transform-generation) in the Python transforms documentation.
:::
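To build intuition for why a module-level list is enough, consider a toy model of discovery. The sketch below is not Foundry's implementation: `Transform` and `discover` are hypothetical stand-ins that only illustrate the idea of scanning a module's top-level variables for transform objects, including objects stored in a list.

```python
import types


class Transform:
    """Hypothetical stand-in for a transform object; not the Foundry class."""

    def __init__(self, name):
        self.name = name


def discover(module):
    """Collect Transform instances bound to top-level names, directly or inside lists."""
    found = []
    for value in vars(module).values():
        if isinstance(value, Transform):
            found.append(value)
        elif isinstance(value, (list, tuple)):
            found.extend(item for item in value if isinstance(item, Transform))
    return found


# Build a throwaway module that mimics a file with one plain transform
# and one generated list of transforms.
demo = types.ModuleType("meteor_class_demo")
demo.clean = Transform("clean")
demo.TRANSFORMS = [Transform(f"meteorite_{c}") for c in ["L6", "H5", "H4"]]

names = sorted(t.name for t in discover(demo))
print(names)
```

In this toy model, a transform created inside a function and never assigned to a module-level name would be invisible to the scan, which is the intuition behind assigning the generated list to `TRANSFORMS`.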

