# Python transform basics (Python 转换基础)
Iceberg tables can be used as inputs and outputs in Python transforms through the `transforms.tables` API, which is provided by the `transforms-tables` package.
This page provides code examples for the fundamentals of working with Iceberg table inputs and outputs in Python transforms.
## Example: Generate a simple Iceberg table

```python tab="Polars"
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(output: IcebergOutput):
    """Write a two-row, single-column ("phrase") Polars DataFrame to an Iceberg table."""
    df_custom = pl.DataFrame({"phrase": ["Hello", "World"]})
    output.write_table(df_custom)
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, output: IcebergOutput):
    """Build a two-row, single-column ("phrase") table with DuckDB and write it to an Iceberg table."""
    conn = ctx.duckdb().conn
    query = conn.sql("SELECT * FROM (VALUES ('Hello'), ('World')) AS t(phrase);")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(output: IcebergOutput):
    """Write a two-row, single-column ("phrase") pandas DataFrame to an Iceberg table."""
    df_custom = pd.DataFrame({"phrase": ["Hello", "World"]})
    output.write_table(df_custom)
```

```python tab="PySpark"
from transforms.api import transform, TransformContext
from transforms.tables import TableOutput, TableTransformOutput
@transform(
    output=TableOutput("/.../Output")
)
def compute(ctx: TransformContext, output: TableTransformOutput):
    """Create a two-row, single-column ("phrase") Spark DataFrame and write it to an Iceberg table."""
    df_custom = ctx.spark_session.createDataFrame([["Hello"], ["World"]], schema=["phrase"])
    output.write_dataframe(df_custom)
```

## Example: Iceberg table output, Iceberg table input

```python tab="Polars"
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table, reading it as a Polars DataFrame."""
    output_table.write_table(source_table.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table through a DuckDB query."""
    conn = ctx.duckdb().conn
    # Make the Iceberg input queryable from DuckDB by registering the Arrow
    # batch reader of a PyIceberg scan as the view "source_tbl".
    reader = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", reader)
    query = conn.sql("SELECT * FROM source_tbl")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output_table.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table, reading it as a pandas DataFrame."""
    output_table.write_table(source_table.pandas())
```

```python tab="PySpark"
from transforms.api import transform
from transforms.tables import TableInput, TableOutput, TableTransformInput, TableTransformOutput
@transform(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    """Copy an Iceberg table into another Iceberg table through a Spark DataFrame."""
    output_table.write_dataframe(source_table.dataframe())
```

## Example: Iceberg table output, dataset input

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table, reading it as a Polars DataFrame."""
    output_table.write_table(source_dataset.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table through a DuckDB query."""
    conn = ctx.duckdb().conn
    # No conn.register() call precedes this query; the table name used is the
    # input's parameter name, source_dataset.
    query = conn.sql("SELECT * FROM source_dataset")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output_table.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table, reading it as a pandas DataFrame."""
    output_table.write_table(source_dataset.pandas())
```

```python tab="PySpark"
from transforms.api import transform, Input, TransformInput
from transforms.tables import TableOutput, TableTransformOutput
@transform(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: TransformInput, output_table: TableTransformOutput):
    """Copy a Foundry dataset into an Iceberg table through a Spark DataFrame."""
    output_table.write_dataframe(source_dataset.dataframe())
```

## Example: Dataset output, Iceberg table input

```python tab="Polars"
import polars as pl
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset, reading it as a Polars DataFrame."""
    output_dataset.write_table(source_table.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset through a DuckDB query."""
    conn = ctx.duckdb().conn
    # Make the Iceberg input queryable from DuckDB by registering the Arrow
    # batch reader of a PyIceberg scan as the view "source_tbl".
    reader = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", reader)
    query = conn.sql("SELECT * FROM source_tbl")
    # The DuckDB relation is passed to the dataset output directly, without a
    # to_arrow_table() conversion.
    output_dataset.write_table(query)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset, reading it as a pandas DataFrame."""
    output_dataset.write_table(source_table.pandas())
```

```python tab="PySpark"
from transforms.api import transform, Output, TransformOutput
from transforms.tables import TableInput, TableTransformInput
@transform(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: TableTransformInput, output_dataset: TransformOutput):
    """Copy an Iceberg table into a Foundry dataset through a Spark DataFrame."""
    # The output is named output_dataset to match the Polars/DuckDB/pandas tabs of this example.
    output_dataset.write_dataframe(source_table.dataframe())
```

## Advanced: PyIceberg native scans

The examples above use Foundry's streamlined shortcuts on `IcebergInput` and `IcebergOutput`, which read and write the full current snapshot of a table. These streamlined APIs offer concise syntax for most standard pipelines. However, they materialize the entire table in memory and do not expose some underlying functionality, such as predicate pushdown, column projection, or snapshot selection.

To access these finer-grained controls, call `.table()` on your `IcebergInput` to get the underlying [PyIceberg](https://py.iceberg.apache.org/) table, which exposes the full PyIceberg scan API.

```python
from pyiceberg.expressions import And, EqualTo, GreaterThan
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Write a filtered, column-projected, row-limited scan of an Iceberg table.

    Instead of the ``polars()`` shortcut, this scans the underlying PyIceberg
    table so the row filter, column selection and row limit are given to the
    scan itself.
    """
    iceberg_table = source_table.table()
    scan = iceberg_table.scan(
        # Row filter: region == "EMEA" AND score > 0.5.
        row_filter=And(
            EqualTo("region", "EMEA"),
            GreaterThan("score", 0.5),
        ),
        # Column projection: only these columns are requested.
        selected_fields=("customer_id", "score", "region"),
        # Return at most 10,000 rows.
        limit=10_000,
        # To read a specific snapshot instead of the current one, also pass snapshot_id=...
    )
    output_table.write_table(scan.to_polars())
```

# 中文翻译

## Python 转换基础
在 Python 转换(transform)中,可以使用 transforms.tables API 将 Iceberg 表作为输入和输出,该 API 可通过 transforms-tables 包导入。
本页提供了在 Python 转换中使用 Iceberg 表输入和输出的基础代码示例。
### 示例:生成简单的 Iceberg 表

```python tab="Polars"
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(output: IcebergOutput):
    """Write a two-row, single-column ("phrase") Polars DataFrame to an Iceberg table."""
    df_custom = pl.DataFrame({"phrase": ["Hello", "World"]})
    output.write_table(df_custom)
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, output: IcebergOutput):
    """Build a two-row, single-column ("phrase") table with DuckDB and write it to an Iceberg table."""
    conn = ctx.duckdb().conn
    query = conn.sql("SELECT * FROM (VALUES ('Hello'), ('World')) AS t(phrase);")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    output=TableOutput("/.../Output")
)
def compute(output: IcebergOutput):
    """Write a two-row, single-column ("phrase") pandas DataFrame to an Iceberg table."""
    df_custom = pd.DataFrame({"phrase": ["Hello", "World"]})
    output.write_table(df_custom)
```

```python tab="PySpark"
from transforms.api import transform, TransformContext
from transforms.tables import TableOutput, TableTransformOutput
@transform(
    output=TableOutput("/.../Output")
)
def compute(ctx: TransformContext, output: TableTransformOutput):
    """Create a two-row, single-column ("phrase") Spark DataFrame and write it to an Iceberg table."""
    df_custom = ctx.spark_session.createDataFrame([["Hello"], ["World"]], schema=["phrase"])
    output.write_dataframe(df_custom)
```

### 示例:Iceberg 表输出,Iceberg 表输入

```python tab="Polars"
import polars as pl
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table, reading it as a Polars DataFrame."""
    output_table.write_table(source_table.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table through a DuckDB query."""
    conn = ctx.duckdb().conn
    # Make the Iceberg input queryable from DuckDB by registering the Arrow
    # batch reader of a PyIceberg scan as the view "source_tbl".
    reader = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", reader)
    query = conn.sql("SELECT * FROM source_tbl")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output_table.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Copy an Iceberg table into another Iceberg table, reading it as a pandas DataFrame."""
    output_table.write_table(source_table.pandas())
```

```python tab="PySpark"
from transforms.api import transform
from transforms.tables import TableInput, TableOutput, TableTransformInput, TableTransformOutput
@transform(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_table: TableTransformInput, output_table: TableTransformOutput):
    """Copy an Iceberg table into another Iceberg table through a Spark DataFrame."""
    output_table.write_dataframe(source_table.dataframe())
```

### 示例:Iceberg 表输出,数据集输入

```python tab="Polars"
import polars as pl
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table, reading it as a Polars DataFrame."""
    output_table.write_table(source_dataset.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(ctx: LightweightContext, source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table through a DuckDB query."""
    conn = ctx.duckdb().conn
    # No conn.register() call precedes this query; the table name used is the
    # input's parameter name, source_dataset.
    query = conn.sql("SELECT * FROM source_dataset")
    # The Iceberg output is handed a materialized Arrow table, not the DuckDB relation.
    query_arrow = query.to_arrow_table()
    output_table.write_table(query_arrow)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform, Input, LightweightInput
from transforms.tables import IcebergOutput, TableOutput
@transform.using(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: LightweightInput, output_table: IcebergOutput):
    """Copy a Foundry dataset into an Iceberg table, reading it as a pandas DataFrame."""
    output_table.write_table(source_dataset.pandas())
```

```python tab="PySpark"
from transforms.api import transform, Input, TransformInput
from transforms.tables import TableOutput, TableTransformOutput
@transform(
    source_dataset=Input("/.../Input"),
    output_table=TableOutput("/.../Output")
)
def compute(source_dataset: TransformInput, output_table: TableTransformOutput):
    """Copy a Foundry dataset into an Iceberg table through a Spark DataFrame."""
    output_table.write_dataframe(source_dataset.dataframe())
```

### 示例:数据集输出,Iceberg 表输入

```python tab="Polars"
import polars as pl
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset, reading it as a Polars DataFrame."""
    output_dataset.write_table(source_table.polars())
```

```python tab="DuckDB"
from transforms.api import LightweightContext, transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(ctx: LightweightContext, source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset through a DuckDB query."""
    conn = ctx.duckdb().conn
    # Make the Iceberg input queryable from DuckDB by registering the Arrow
    # batch reader of a PyIceberg scan as the view "source_tbl".
    reader = source_table.table().scan().to_arrow_batch_reader()
    conn.register("source_tbl", reader)
    query = conn.sql("SELECT * FROM source_tbl")
    # The DuckDB relation is passed to the dataset output directly, without a
    # to_arrow_table() conversion.
    output_dataset.write_table(query)
```

```python tab="pandas"
import pandas as pd
from transforms.api import transform, Output, LightweightOutput
from transforms.tables import IcebergInput, TableInput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: IcebergInput, output_dataset: LightweightOutput):
    """Copy an Iceberg table into a Foundry dataset, reading it as a pandas DataFrame."""
    output_dataset.write_table(source_table.pandas())
```

```python tab="PySpark"
from transforms.api import transform, Output, TransformOutput
from transforms.tables import TableInput, TableTransformInput
@transform(
    source_table=TableInput("/.../Input"),
    output_dataset=Output("/.../Output")
)
def compute(source_table: TableTransformInput, output_dataset: TransformOutput):
    """Copy an Iceberg table into a Foundry dataset through a Spark DataFrame."""
    # The output is named output_dataset to match the Polars/DuckDB/pandas tabs of this example.
    output_dataset.write_dataframe(source_table.dataframe())
```

### 高级:PyIceberg 原生扫描

上述示例使用了 Foundry 在 `IcebergInput` 和 `IcebergOutput` 上提供的简化快捷方式,这些方式会读取和写入表的完整当前快照。这些简化的 API 为大多数标准管道提供了简洁的语法。但是,它们会将整个表加载到内存中,并且无法访问某些底层功能,例如谓词下推(predicate pushdown)、列投影(column projection)或快照选择(snapshot selection)。

要使用这些更细粒度的控制,可以在 `IcebergInput` 上调用 `.table()` 方法。该方法返回底层的 [PyIceberg](https://py.iceberg.apache.org/) 表,通过它可以使用完整的 PyIceberg 扫描 API。

```python
from pyiceberg.expressions import And, EqualTo, GreaterThan
from transforms.api import transform
from transforms.tables import IcebergInput, IcebergOutput, TableInput, TableOutput
@transform.using(
    source_table=TableInput("/.../Input"),
    output_table=TableOutput("/.../Output"),
)
def compute(source_table: IcebergInput, output_table: IcebergOutput):
    """Write a filtered, column-projected, row-limited scan of an Iceberg table.

    Instead of the ``polars()`` shortcut, this scans the underlying PyIceberg
    table so the row filter, column selection and row limit are given to the
    scan itself.
    """
    iceberg_table = source_table.table()
    scan = iceberg_table.scan(
        # Row filter: region == "EMEA" AND score > 0.5.
        row_filter=And(
            EqualTo("region", "EMEA"),
            GreaterThan("score", 0.5),
        ),
        # Column projection: only these columns are requested.
        selected_fields=("customer_id", "score", "region"),
        # Return at most 10,000 rows.
        limit=10_000,
        # To read a specific snapshot instead of the current one, also pass snapshot_id=...
    )
    output_table.write_table(scan.to_polars())