
DuckDB API

Foundry offers support for streamed, lazy execution using DuckDB, similar to the Polars lazy API.

To access a preconfigured DuckDB connection in your transform, use the Context object as shown below:

from transforms.api import transform, Input, Output

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn
    query = conn.sql("""SELECT * FROM my_input""")
    output.write_table(query)

The DuckDB connection comes preconfigured with every input dataset registered as an unmaterialized DuckDB table. Each table is named after the input's alias in the decorator (my_input in the example above), so you can query input data without any manual configuration.

When to use DuckDB

DuckDB is a highly performant, efficient query engine that often outperforms other single-node compute engines, such as Pandas or Polars, on many workloads. Consider using DuckDB for:

  • Single-node transforms that operate on very large-scale data.
  • Transforms that need strict control over memory usage.

However, DuckDB does not yet offer a Python DataFrame API like those of Polars or Pandas; data manipulation is written as raw SQL queries instead. Third-party libraries such as Ibis and SQLFrame can provide a DataFrame API on top of DuckDB; see Use Ibis with DuckDB and Use SQLFrame with DuckDB below.

Incremental workflows

As with other compute engines, the DuckDB API lets you configure incremental read modes on inputs.

The read mode defaults to added when the transform runs incrementally and to current when it runs non-incrementally. To override it, pass read_modes to ctx.duckdb():

from transforms.api import transform, Input, Output, incremental

@incremental()
@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    ddb = ctx.duckdb(read_modes={my_input: "current"})
    result = ddb.conn.sql("""SELECT * FROM my_input""")
    output.write_table(result)
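The defaulting rule can be modeled in a few lines of plain Python. This is a minimal sketch, not part of transforms.api: resolve_read_modes is a hypothetical helper, and it keys inputs by name for simplicity, whereas the example above keys read_modes by the input object. It shows how an explicit entry overrides the default of added (incremental run) or current (non-incremental run).

```python
def resolve_read_modes(input_names, is_incremental, overrides=None):
    """Hypothetical model of effective read modes per input.

    Inputs without an explicit override fall back to "added" on
    incremental runs and "current" on non-incremental runs.
    """
    default_mode = "added" if is_incremental else "current"
    overrides = overrides or {}
    return {name: overrides.get(name, default_mode) for name in input_names}


# Incremental run where my_input is pinned to "current"
print(resolve_read_modes(["my_input", "other_input"], True, {"my_input": "current"}))
# {'my_input': 'current', 'other_input': 'added'}
```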

Read from a .sql file

Some DuckDB workflows may involve complex SQL queries that are better managed in separate .sql files. You can reference such files using the sql_from_file method on the DuckDB connection. The paths are relative to the file containing the call to sql_from_file.

-- query.sql
SELECT * FROM my_input WHERE column_a > 100;

# Python transform file in the same directory as query.sql
from transforms.api import transform, Input, Output, incremental

@incremental()
@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn
    query = conn.sql_from_file("query.sql")
    output.write_table(query)
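Because paths passed to sql_from_file are relative to the calling file, the lookup behaves roughly like the sketch below. resolve_sql_path is a hypothetical illustration, not the actual Foundry implementation, and the repository paths are made up.

```python
import posixpath


def resolve_sql_path(calling_file: str, relative_path: str) -> str:
    """Hypothetical: resolve a .sql path against the calling file's directory."""
    base_dir = posixpath.dirname(calling_file)
    # normpath collapses "." and ".." segments such as "../sql/query.sql"
    return posixpath.normpath(posixpath.join(base_dir, relative_path))


caller = "/repo/src/myproject/transforms/compute.py"  # hypothetical layout
print(resolve_sql_path(caller, "query.sql"))
# /repo/src/myproject/transforms/query.sql
print(resolve_sql_path(caller, "../sql/query.sql"))
# /repo/src/myproject/sql/query.sql
```

Keeping .sql files next to the transform, or in a sibling directory referenced with "..", keeps these relative paths short and stable when the repository is moved.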

Use Ibis with DuckDB

:::callout{theme="warning"} Ibis is a third-party dependency unaffiliated with Palantir. Its use in DuckDB transforms comes with no product support guarantees. :::

Ibis is a Python library that provides a portable DataFrame API for nearly 20 different backends. DuckDB is one of them, so Ibis can supply a DataFrame API on top of DuckDB's otherwise SQL-only interface.

To use Ibis, first install the ibis-framework and ibis-duckdb packages in your environment. For details on adding packages to the repository, see the package tab documentation.

import ibis
from transforms.api import Input, Output, transform

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input) -> None:
    conn = ctx.duckdb().conn
    # Wrap the preconfigured DuckDB connection in an Ibis backend
    ibis_conn = ibis.duckdb.from_connection(conn)
    table = ibis_conn.table("my_input")  # inputs are referenced by alias

    # Apply Ibis DataFrame operations to `table` here

    # Compile the Ibis expression to DuckDB SQL and run it on the same connection
    sql = ibis.to_sql(table, dialect="duckdb")
    output.write_table(conn.sql(sql))

For more details on writing Ibis dataframe transformations, consult the official Ibis documentation.

Use SQLFrame with DuckDB

:::callout{theme="warning"} SQLFrame is a third-party dependency unaffiliated with Palantir. Its use in DuckDB transforms comes with no product support guarantees. :::

SQLFrame is a Python library that runs PySpark DataFrame API code on SQL-based query engines, including DuckDB.

To use SQLFrame, first install the sqlframe package in your environment. For details on adding packages to the repository, see the package tab documentation.

from sqlframe import activate
from transforms.api import Input, Output, transform

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input) -> None:
    conn = ctx.duckdb().conn

    # Make PySpark imports resolve to SQLFrame's DuckDB engine on this connection
    activate("duckdb", conn=conn)

    from pyspark.sql import SparkSession
    # Other pyspark imports must come after activate

    session = SparkSession.builder.getOrCreate()
    input_df = session.table("my_input")  # inputs are referenced by alias

    # Apply PySpark DataFrame transformations to input_df here
    result_df = input_df

    # Compile the DataFrame to DuckDB SQL and run it on the same connection
    sql = result_df.sql(optimize=True, dialect="duckdb")
    output.write_table(conn.sql(sql))

For more details on writing SQLFrame dataframe transformations, consult the official SQLFrame documentation.

Migrate a Spark transform to DuckDB with SQLFrame

Existing Spark transforms can be refactored to run on DuckDB with SQLFrame. In this section, we will refactor the Spark example below:

from transforms.api import transform, Input, Output
from pyspark.sql import functions as F

@transform.spark.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.dataframe()
    df_stats = meteorite_stats.dataframe()

    enriched_together = df_landings.join(df_stats, "class")
    greater_mass = enriched_together.withColumn(
        'greater_mass', F.col("mass") > F.col("avg_mass_per_class")
    )
    result = greater_mass.filter(F.col("greater_mass")).select(
        F.col("name"),
        F.col("class"),
        F.col("mass"),
        F.col("greater_mass")
    )
    output.write_dataframe(result)

First, change the decorator so that the transform runs on a single node by default, add a LightweightContext parameter to the compute function, and import activate from sqlframe.

- from transforms.api import transform, Input, Output
+ from transforms.api import transform, Input, Output, LightweightContext
+ from sqlframe import activate
  from pyspark.sql import functions as F

- @transform.spark.using(
+ @transform.using(
      output=Output("/Users/jsmith/meteorite_enriched"),
      meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
      meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
  )
- def enriched(output, meteorite_landings, meteorite_stats):
+ def enriched(ctx: LightweightContext, output, meteorite_landings, meteorite_stats):

Next, activate SQLFrame with a DuckDB connection and move PySpark imports into the compute function.

  from transforms.api import transform, Input, Output, LightweightContext
  from sqlframe import activate
- from pyspark.sql import functions as F

  def enriched(ctx: LightweightContext, output, meteorite_landings, meteorite_stats):
+     conn = ctx.duckdb().conn
+     activate("duckdb", conn=conn)
+
+     # Import PySpark AFTER activation
+     from pyspark.sql import SparkSession, functions as F
+     session = SparkSession.builder.getOrCreate()

Now, you can generate the input DataFrames from the SQLFrame SparkSession.

-     df_landings = meteorite_landings.dataframe()
-     df_stats = meteorite_stats.dataframe()
+     df_landings = session.table("meteorite_landings")  # Use alias from decorator
+     df_stats = session.table("meteorite_stats")

Finally, convert the result to DuckDB SQL and write it to the output.

          F.col("greater_mass")
      )
-     output.write_dataframe(result)
+     sql = result.sql(optimize=True, dialect="duckdb")
+     output.write_table(conn.sql(sql))

The complete refactored transform is shown below:

from transforms.api import transform, Input, Output, LightweightContext
from sqlframe import activate

@transform.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(ctx: LightweightContext, output, meteorite_landings, meteorite_stats):
    conn = ctx.duckdb().conn
    activate("duckdb", conn=conn)

    from pyspark.sql import SparkSession, functions as F

    session = SparkSession.builder.getOrCreate()

    df_landings = session.table("meteorite_landings")
    df_stats = session.table("meteorite_stats")

    # PySpark DataFrame transformations work as-is
    enriched_together = df_landings.join(df_stats, "class")
    greater_mass = enriched_together.withColumn(
        'greater_mass', F.col("mass") > F.col("avg_mass_per_class")
    )
    result = greater_mass.filter(F.col("greater_mass")).select(
        F.col("name"),
        F.col("class"),
        F.col("mass"),
        F.col("greater_mass")
    )

    # Convert to SQL and write
    sql = result.sql(optimize=True, dialect="duckdb")
    output.write_table(conn.sql(sql))

DuckDB configuration

Unlike many other single-node compute engines, DuckDB supports resource configuration to control memory usage and parallelism, which allows fine-grained optimization for different workloads. This is especially important for memory-constrained contexts, where DuckDB can self-limit its memory consumption to avoid out-of-memory errors, at the cost of performance.

You can set these options by running SET statements on the connection returned by the duckdb method on the Context object, as shown below. A full list of configuration options is available in the official DuckDB documentation.

conn = ctx.duckdb().conn
conn.execute("SET memory_limit='2GB';")
conn.execute("SET threads=4;")
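If you tune these settings per workload, it can help to generate the SET statements from a single mapping. This is a minimal sketch that assumes only standard DuckDB SET syntax. settings_to_sql and memory_limit_for are hypothetical helpers, and the 75% memory fraction is an arbitrary illustrative choice, not a Foundry or DuckDB default.

```python
def memory_limit_for(total_memory_mb: int, fraction: float = 0.75) -> str:
    """Hypothetical: a memory_limit value that leaves headroom for other processes."""
    return f"{int(total_memory_mb * fraction)}MB"


def settings_to_sql(settings: dict) -> list:
    """Hypothetical: turn {"threads": 4, "memory_limit": "2GB"} into SET statements."""
    statements = []
    for name, value in settings.items():
        # bool is checked before int because bool is a subclass of int
        if isinstance(value, bool):
            literal = "true" if value else "false"
        elif isinstance(value, (int, float)):
            literal = str(value)
        else:
            # Quote strings as SQL literals, escaping embedded single quotes
            literal = "'" + str(value).replace("'", "''") + "'"
        statements.append(f"SET {name}={literal};")
    return statements


for statement in settings_to_sql({"memory_limit": memory_limit_for(4096), "threads": 4}):
    print(statement)  # in a transform: conn.execute(statement)
# SET memory_limit='3072MB';
# SET threads=4;
```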

Advanced use cases

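Some DuckDB workflows need lower-level access than simple references to input datasets provide, for example, passing custom parameters to DuckDB's file readers or controlling the names and partitioning of output files.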

For these use cases, you can read your dataset's raw Parquet or CSV files with a preconfigured DuckDB UDF called <input_name>_files(). This function returns a list of file paths for the underlying data files of the input dataset, which you can pass to DuckDB's read functions (such as read_parquet) along with custom parameters.

You can then use DuckDB's COPY TO command to write the output either to an intermediate location on local disk, to be uploaded with write_table, or directly back to Foundry.

If you write files to the output directly with COPY TO rather than calling write_table, call the put_metadata() method on the Output object so that the dataset schema is inferred from the uploaded files.

import uuid

from transforms.api import transform, Input, Output

@transform.using(
    output=Output("/Users/jsmith/output"),
    my_input=Input("/Users/jsmith/input"),
)
def compute(ctx, output, my_input):
    conn = ctx.duckdb().conn

    # Option 1: write to local disk, then upload the dataset from disk
    conn.execute(f"""
        COPY (
            SELECT *
            FROM read_parquet(my_input_files(), file_row_number=True)
            WHERE my_column = 'abc'
            LIMIT 10
        ) TO '{output.path_for_write_table}'
        (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
        """)
    output.write_table(output.path_for_write_table)

    # Option 2: stream directly from the input files to the output
    # (use one option or the other; PARTITION_BY assumes the input has a date column)
    path_uuid = str(uuid.uuid4())  # unique file names per transaction
    conn.execute(f"""
        COPY (
            SELECT *
            FROM read_parquet(my_input_files(), file_row_number=True)
            WHERE my_column = 'abc'
            LIMIT 10
        ) TO '{output.path_for_object_store_write_table}'
        (FORMAT PARQUET, PARTITION_BY (date), WRITE_PARTITION_COLUMNS true, FILENAME_PATTERN 'file_{path_uuid}')
        """
    )
    # Infer the output schema from the files written by COPY TO
    output.put_metadata()
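Because <input_name>_files() is an ordinary SQL function call, the reader invocation can be assembled for any input and any reader options. This is a minimal sketch: reader_sql and sql_literal are hypothetical helpers, while read_parquet, read_csv, and the file_row_number, delim, and header options are standard DuckDB. Check the DuckDB documentation for the options your files need.

```python
def sql_literal(value) -> str:
    """Hypothetical: render a Python value as a DuckDB SQL literal."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def reader_sql(input_name: str, reader: str = "read_parquet", **options) -> str:
    """Hypothetical: build e.g. read_csv(my_input_files(), delim=';', header=true)."""
    args = [f"{input_name}_files()"]
    args += [f"{key}={sql_literal(value)}" for key, value in options.items()]
    return f"{reader}({', '.join(args)})"


query = f"SELECT * FROM {reader_sql('my_input', 'read_csv', delim=';', header=True)}"
print(query)
# SELECT * FROM read_csv(my_input_files(), delim=';', header=true)
```

In a transform, the resulting query string can be passed to conn.sql or embedded in a COPY statement like the ones above.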

Incremental outputs

With COPY TO, you control the output files directly, including their file names. In incremental workflows, make sure each write uses file names that differ from those written by previous transactions; otherwise, files can conflict with or overwrite each other. DuckDB lets you set a filename pattern for outputs with the FILENAME_PATTERN option, and appending a UUID to the pattern, as in the example above, keeps file names unique.
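The sketch below builds the COPY options for one incremental write, generating a fresh UUID for the file name pattern on every run, as in the example above. copy_options is a hypothetical helper; the option names it emits are the DuckDB COPY options already used in this section.

```python
import uuid


def copy_options(partition_by=None, prefix: str = "file") -> str:
    """Hypothetical: COPY ... TO options with a per-run unique FILENAME_PATTERN."""
    # A new UUID per run keeps file names distinct from earlier transactions
    run_id = uuid.uuid4().hex
    options = ["FORMAT PARQUET"]
    if partition_by:
        options.append(f"PARTITION_BY ({', '.join(partition_by)})")
        options.append("WRITE_PARTITION_COLUMNS true")
    options.append(f"FILENAME_PATTERN '{prefix}_{run_id}'")
    return "(" + ", ".join(options) + ")"


first_run = copy_options(["date"])
second_run = copy_options(["date"])
print(first_run)
print(first_run != second_run)  # each run gets a different pattern
```

In a transform, the returned string would follow TO '{output.path_for_object_store_write_table}' in the COPY statement.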

Partitioned outputs

Note that DuckDB's partitioning behavior differs from the defaults in common libraries such as Polars or Spark. When writing partitioned datasets, you are strongly encouraged to set WRITE_PARTITION_COLUMNS to true, as in the example above, so that the partition columns are also stored in the data files and the output stays compatible with other transforms. Also note that DuckDB writes the string null in Hive file paths for missing partition values, instead of the Hive standard __HIVE_DEFAULT_PARTITION__. Take special care that downstream transforms parse these partitions properly; generally, the safest approach is to infill nulls in the partition column before writing outputs with DuckDB.
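One way to infill nulls before writing is to wrap the query in a SELECT that replaces the partition column using COALESCE; DuckDB's SELECT * REPLACE (...) syntax leaves every other column unchanged. This is a minimal sketch: fill_partition_nulls and the region column are hypothetical, and the fill value is an assumption, so pick one that matches the column type and that downstream consumers agree on.

```python
def fill_partition_nulls(query: str, column: str, fill_literal: str) -> str:
    """Hypothetical: wrap `query` so NULLs in `column` become `fill_literal`.

    `fill_literal` must already be a valid SQL literal of the column's type,
    e.g. "'unknown'" for a VARCHAR column.
    """
    return (
        f"SELECT * REPLACE (COALESCE({column}, {fill_literal}) AS {column}) "
        f"FROM ({query})"
    )


print(fill_partition_nulls("SELECT * FROM my_input", "region", "'unknown'"))
# SELECT * REPLACE (COALESCE(region, 'unknown') AS region) FROM (SELECT * FROM my_input)
```

The wrapped query can then be used inside COPY (...) TO ... (PARTITION_BY (region), ...) so that no partition directory is written with a null value.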

Eager download

By default, Foundry's DuckDB integration downloads input datasets lazily, streaming them as needed, to optimize performance and resource usage. However, in some scenarios you may want to download all input data to disk eagerly at the start of the transform. To download an input to local disk and query the downloaded files, call path() on the input:

import duckdb
from transforms.api import transform, Output, Input


@transform.using(my_input=Input('my-input'), my_output=Output('my-output'))
def my_duckdb_transform(my_input, my_output):
    duckdb.connect(database=':memory:').execute(f"""
        COPY
        (
            SELECT *
            FROM parquet_scan('{my_input.path()}/**/*.parquet')
            WHERE Name LIKE 'John%'
        )
        TO '{my_output.path_for_write_table}'
        (FORMAT 'parquet', PER_THREAD_OUTPUT TRUE)
    """)  # Optimize performance by writing a separate Parquet file per thread in parallel

    my_output.write_table(my_output.path_for_write_table)
