跳转至

Advanced single-node compute options(高级单节点计算选项)

The basic principle of the following integrations is that you can access tabular Foundry datasets in multiple formats, such as a pandas DataFrame, Arrow Table, Polars DataFrame, and even as raw Parquet or CSV files. This is also shown in the transforms overview. When saving tables from memory to Foundry, we can pass them in any of the formats in which they have been read.

Working with Apache DataFusion

Sometimes, it is easier to cut out the data deserialization step and directly pass the raw underlying dataset files to our compute engine. We can get the path to the files on disk, which get downloaded on-demand by calling my_input.path(). When it comes to writing raw files back to Foundry, we have two limitations to keep in mind:

  • Only Parquet files can be stored in Foundry datasets through this API.
  • Files must be placed in the folder located at the value of my_output.path_for_write_table.

When both criteria are met, we can call write_table with the path to this folder, like so:

my_output.write_table(my_output.path_for_write_table)

To see this in action, consider the following example demonstrating how to use DataFusion ↗ in the platform.

import datafusion
from datafusion import lit
from datafusion.functions import col, starts_with
from transforms.api import transform, Output, Input


@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_datafusion_transform(my_input, my_output):
    ctx = datafusion.SessionContext()
    table = ctx.read_parquet(my_input.path())
    my_output.write_table(
        table
        .filter(starts_with(col("name"), lit("John")))
        .to_arrow_table()
    )

Using cuDF

You can also achieve integration through the use of pandas.DataFrame. The following snippet shows an instance using cuDF ↗ in a lightweight transform. This will essentially run pandas code in a highly parallelized manner on the GPU where possible.

@transform.using(
    my_input=Input('my-input'),
    my_output=Output('my-output')
).with_resources(
    cpu_cores=4,
    memory_gb=32,
    gpu_type='NVIDIA_T4'
)
def my_cudf_transform(my_input, my_output):
    import cudf  # Only import CUDF at runtime, not during CI
    df = cudf.from_pandas(my_input.pandas()[['name']])
    filtered_df = df[df['name'].str.startswith('John')]
    sorted_df = filtered_df.sort_values(by='name')
    my_output.write_table(sorted_df)

:::callout{theme="normal"} The above snippet assumes that your Foundry enrollment is equipped with an NVIDIA T4 GPU and it is available to your project through a resource queue. :::

GPU-accelerated Polars with cuDF

:::callout{theme="neutral"} Your Foundry enrollment must be equipped with a supported NVIDIA GPU, and it must be available to your project through a resource queue to use this feature. See GPU provisioning for the list of available GPU types. :::

You can accelerate Polars lazy API queries on GPUs using cudf-polars, a GPU-backed execution engine for Polars. This allows you to run existing Polars lazy API queries on GPUs with minimal code changes by passing pl.GPUEngine() to the collect() call. For details on supported operations and performance characteristics, see the Polars GPU support documentation ↗.

:::callout{theme="warning"} GPU-accelerated collection currently only supports collect(), which fully materializes results in memory. Streaming operations such as sink_parquet() are not yet supported. Ensure that your output data fits in available GPU and system memory. :::

Setup

To use GPU acceleration for the Polars lazy API, add the cudf-polars package to the run requirements in your conda_recipe/meta.yaml file. For details on adding packages, refer to the Python libraries documentation.

requirements:
  run:
    - python
    - transforms {{ PYTHON_TRANSFORMS_VERSION }}
    - cudf-polars

Next, request a GPU in your transform by chaining .with_resources() onto your decorator:

from transforms.api import transform, Output, Input

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    ...

Writing with the Polars GPU engine

Build your query using the standard Polars lazy API, then pass engine="gpu" to write_table() to collect and write the result using the GPU:

import polars as pl
from transforms.api import transform, Output, Input

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    lf = my_input.polars(lazy=True)

    result = (
        lf.with_columns(
            (pl.col("a") * pl.col("b")).alias("product"),
            (pl.col("a").pow(2) + pl.col("b").pow(2)).sqrt().alias("norm"),
        )
        .group_by("category")
        .agg(
            pl.col("product").sum().alias("total_product"),
            pl.col("norm").mean().alias("mean_norm"),
            pl.len().alias("n"),
        )
        .sort("total_product", descending=True)
    )

    output.write_table(result, engine="gpu")

The GPU engine only supports a subset of Polars operations. For the full list, see the Polars GPU support documentation ↗. By default, unsupported queries silently fall back to the CPU engine. To raise an error instead, collect manually with raise_on_fail=True and pass the resulting DataFrame to write_table():

df = lf.collect(engine=pl.GPUEngine(raise_on_fail=True))
output.write_table(df)

中文翻译

高级单节点计算选项

以下集成方案的基本原理是,您可以以多种格式访问表格型Foundry数据集,例如pandas DataFrame、Arrow Table、Polars DataFrame,甚至原始Parquet或CSV文件。这在转换概览中也有说明。当将表格从内存保存到Foundry时,我们可以使用读取时所用的任意格式进行传递。

使用Apache DataFusion

有时,跳过数据反序列化步骤,直接将原始底层数据集文件传递给计算引擎会更简单。我们可以通过调用my_input.path()获取磁盘上文件的路径,这些文件会按需下载。在将原始文件写回Foundry时,需要注意两个限制条件:

  • 通过此API只能将Parquet文件存储在Foundry数据集中。
  • 文件必须放置在my_output.path_for_write_table值所指定的文件夹中。

当两个条件都满足时,我们可以像这样调用write_table并传入该文件夹的路径:

my_output.write_table(my_output.path_for_write_table)

以下示例展示了如何在平台中使用DataFusion ↗

import datafusion
from datafusion import lit
from datafusion.functions import col, starts_with
from transforms.api import transform, Output, Input


@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_datafusion_transform(my_input, my_output):
    ctx = datafusion.SessionContext()
    table = ctx.read_parquet(my_input.path())
    my_output.write_table(
        table
        .filter(starts_with(col("name"), lit("John")))
        .to_arrow_table()
    )

使用cuDF

您也可以通过使用pandas.DataFrame实现集成。以下代码片段展示了在轻量级转换中使用cuDF ↗的实例。这将在GPU上以高度并行化的方式运行pandas代码(在可行的情况下)。

@transform.using(
    my_input=Input('my-input'),
    my_output=Output('my-output')
).with_resources(
    cpu_cores=4,
    memory_gb=32,
    gpu_type='NVIDIA_T4'
)
def my_cudf_transform(my_input, my_output):
    import cudf  # 仅在运行时导入CUDF,不在CI期间导入
    df = cudf.from_pandas(my_input.pandas()[['name']])
    filtered_df = df[df['name'].str.startswith('John')]
    sorted_df = filtered_df.sort_values(by='name')
    my_output.write_table(sorted_df)

:::callout{theme="normal"} 上述代码片段假设您的Foundry环境配备了NVIDIA T4 GPU,并且通过资源队列可供您的项目使用。 :::

使用cuDF实现GPU加速的Polars

:::callout{theme="neutral"} 您的Foundry环境必须配备受支持的NVIDIA GPU,并且通过资源队列可供您的项目使用,才能使用此功能。有关可用GPU类型的列表,请参见GPU配置。 :::

您可以使用cudf-polars(一个基于GPU的Polars执行引擎)在GPU上加速Polars惰性API查询。通过将pl.GPUEngine()传递给collect()调用,您只需对代码进行极少的修改,即可在GPU上运行现有的Polars惰性API查询。有关支持的操作和性能特征的详细信息,请参见Polars GPU支持文档 ↗

:::callout{theme="warning"} GPU加速的收集操作目前仅支持collect(),该操作会将结果完全物化到内存中。尚不支持流式操作(如sink_parquet())。请确保您的输出数据能够放入可用的GPU和系统内存中。 :::

设置

要为Polars惰性API使用GPU加速,请将cudf-polars包添加到conda_recipe/meta.yaml文件的run依赖项中。有关添加包的详细信息,请参阅Python库文档

requirements:
  run:
    - python
    - transforms {{ PYTHON_TRANSFORMS_VERSION }}
    - cudf-polars

接下来,通过在装饰器后链式调用.with_resources()来为您的转换请求GPU:

from transforms.api import transform, Output, Input

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    ...

使用Polars GPU引擎写入

使用标准的Polars惰性API构建查询,然后将engine="gpu"传递给write_table(),以使用GPU收集并写入结果:

import polars as pl
from transforms.api import transform, Output, Input

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
).with_resources(
    gpu_type="NVIDIA_T4",
    memory_gb=32,
)
def compute(output, my_input):
    lf = my_input.polars(lazy=True)

    result = (
        lf.with_columns(
            (pl.col("a") * pl.col("b")).alias("product"),
            (pl.col("a").pow(2) + pl.col("b").pow(2)).sqrt().alias("norm"),
        )
        .group_by("category")
        .agg(
            pl.col("product").sum().alias("total_product"),
            pl.col("norm").mean().alias("mean_norm"),
            pl.len().alias("n"),
        )
        .sort("total_product", descending=True)
    )

    output.write_table(result, engine="gpu")

GPU引擎仅支持Polars操作的一个子集。有关完整列表,请参见Polars GPU支持文档 ↗。默认情况下,不支持的查询会静默回退到CPU引擎。如果希望改为抛出错误,请使用raise_on_fail=True手动收集,并将生成的DataFrame传递给write_table()

df = lf.collect(engine=pl.GPUEngine(raise_on_fail=True))
output.write_table(df)