Polars lazy API

The Polars lazy API ↗ evaluates a query only when it is collected. This lets the engine optimize the whole query before running it, which improves performance in most cases, so lazy execution is the recommended mode of compute when using Polars, particularly for pipelines that process large amounts of data. However, different queries benefit to varying degrees, and you should always verify the stability of an individual pipeline before deploying it to production systems.

To access the Polars lazy API in Foundry, pass lazy=True to .polars() in your transform, as shown below:

from transforms.api import transform, Input, Output


@transform.lightweight(
    output=Output("/Users/jsmith/output"),
    input=Input("/Users/jsmith/input"),
)
def compute(output, input):
    df = input.polars(lazy=True)
    # your data transformation logic
    output.write_table(df)

By default, lazy execution is disabled.

The Polars streaming engine ↗ is generally used during lazy computation. Because it executes the query in batches instead of all at once, it can process larger-than-memory data. In addition, queries generally execute faster when streaming is enabled. Streaming is always enabled when Polars is used in lazy mode.
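
The idea of executing a query in batches can be illustrated with a plain-Python sketch. This is a conceptual model only, not the Polars streaming engine: the function names batches and streamed_sum are hypothetical, and the lazy range stands in for a data source too large to hold in memory at once.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batches(rows: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield successive lists of at most `size` rows from `rows`."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def streamed_sum(rows: Iterable[int], size: int = 1000) -> int:
    """Sum `rows` while holding only one batch in memory at a time."""
    total = 0
    for chunk in batches(rows, size):
        total += sum(chunk)  # fold each batch into the running aggregate
    return total


# range() is lazy, so the full sequence is never materialized as a list.
print(streamed_sum(range(1_000_000), size=10_000))
```

Peak memory in this sketch depends on the batch size rather than on the total number of rows, which is the property that makes larger-than-memory processing possible.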

Filter pushdown

Lazy execution is especially beneficial when filter pushdown, also known as predicate pushdown, can be used. With filter pushdown, filtering operations such as .filter(), .where(), or boolean indexing are not executed immediately. Instead, they are recorded as part of the query plan. When the query is executed, Polars attempts to apply these filters as early in the execution plan as possible. In some cases, they can be pushed all the way down to the data scan itself, so data that would be filtered out is never read. The smaller the fraction of data used in the pipeline, the greater the impact of filter pushdown. The full set of optimizations applied during lazy compute is listed in the Polars lazy optimization ↗ documentation.
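
To see why pushing a filter into the scan saves I/O, consider the following toy model in plain Python. It does not reflect how Polars is implemented; the Partition class, its min/max statistics, and the scan function are hypothetical stand-ins for files that carry per-column metadata.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Partition:
    """A hypothetical file with min/max statistics for its `date` column."""
    min_date: str
    max_date: str
    rows: List[str]  # ISO 8601 date strings, which sort chronologically


def scan(partitions: List[Partition], start: str, end: str) -> Tuple[List[str], int]:
    """Return rows in [start, end] and the number of partitions actually read."""
    matched: List[str] = []
    partitions_read = 0
    for part in partitions:
        # Pushed-down filter: skip files whose statistics rule out any match.
        if part.max_date < start or part.min_date > end:
            continue
        partitions_read += 1
        matched.extend(d for d in part.rows if start <= d <= end)
    return matched, partitions_read


data = [
    Partition("2025-05-01", "2025-05-31", ["2025-05-10", "2025-05-20"]),
    Partition("2025-06-01", "2025-06-30", ["2025-06-05", "2025-06-25"]),
    Partition("2025-07-01", "2025-07-31", ["2025-07-15"]),
]
rows, read = scan(data, "2025-06-01", "2025-06-30")
print(rows, read)
```

Only one of the three partitions is opened in this sketch, because the filter is evaluated against the statistics before any rows are read.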

If your query resembles the example below, enabling lazy execution is strongly recommended, as it will significantly improve performance.

import polars as pl
from transforms.api import transform, Input, Output


@transform.lightweight(
    output=Output("/Users/jsmith/2025_06_sale_data"),
    sales_data=Input("/Users/jsmith/sales_data"),
)
def june_2025_sales(output, sales_data):
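    # Keep only June 2025 rows. In lazy mode the filter is recorded in the
    # query plan, so Polars can push it down toward the data scan.
    # Note: comparing against string literals assumes "date" holds ISO 8601
    # strings; for a Date column, compare against datetime.date values instead.
    df = (
        sales_data.polars(lazy=True)
        .filter(
            (pl.col("date") >= pl.lit("2025-06-01")) &
            (pl.col("date") <= pl.lit("2025-06-30"))
        )
    )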
    output.write_table(df)

Debug lazy pipelines

In lazy execution mode, Polars generates a query plan. Call .explain() on a LazyFrame to view the planned execution steps and the optimizations applied. Older Polars releases also provide .describe_plan(), which has since been deprecated in favor of .explain().

from transforms.api import transform, Input, Output


@transform.lightweight(
    output=Output("/Users/jsmith/output"),
    input=Input("/Users/jsmith/input"),
)
def compute(output, input):
    df = input.polars(lazy=True)
    # your data transformation logic
    print(df.explain())
    output.write_table(df)

If a query is failing and lazy execution makes it challenging to identify the issue, you can materialize intermediate results with .collect().

from transforms.api import transform, Input, Output


@transform.lightweight(
    output=Output("/Users/jsmith/output"),
    input=Input("/Users/jsmith/input"),
)
def compute(output, input):
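    df = input.polars(lazy=True)
    # part of a query
    # Materialize the intermediate result to inspect it; .collect() returns a
    # new eager DataFrame and leaves the lazy df unchanged.
    print(df.collect().head())
    # remainder of a query
    output.write_table(df)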

Use with incremental transforms on streaming datasets

When you use .polars(lazy=True) in an incremental transform on a streaming dataset, the incremental window may sometimes contain no new files. Calling .polars(lazy=True) on an empty input in this scenario can trigger a BinderException. To handle this, check whether the input is empty before using the lazy API:

from transforms.api import transform, incremental, Input, Output


@incremental()
@transform.using(
    my_output=Output('/path/to/output'),
    my_input=Input('/path/to/input'),
)
def compute(ctx, my_output, my_input):
    if ctx.is_incremental and my_input.polars().is_empty():
        return
    df = my_input.polars(lazy=True)
    my_output.write_table(df)

For more details on this known limitation, see the incremental transforms documentation.

When to use lazy APIs

:::callout{theme="neutral"} To fully leverage Polars optimizations in lazy compute, your Foundry dataset is exposed through an internal object store proxy, so queries load only the necessary data. Because Polars in eager mode does not have the same optimizations, your entire dataset is prefetched to disk when lazy=False. :::

Consider the following points to decide when to enable lazy execution.

  • Use lazy execution (lazy=True) in the following situations:
      • You are working with large datasets or complex transformation pipelines.
      • You want to benefit from query optimizations such as filter (predicate) pushdown, projection pushdown, or streaming.
      • You have multiple chained operations and want Polars to optimize execution order.
      • You need to process data that does not fit into memory.
  • Use eager execution (lazy=False) in the following situations:
      • You are prototyping or exploring data transformations step by step.
      • You are debugging a complex pipeline and want more predictable behavior.
