Snowflake compute pushdown(Snowflake 计算下推(Compute Pushdown))¶
To use compute pushdown with Snowflake, create a Python repository and install the most recent version of the transforms-tables library.
A Snowpark ↗ session will be configured based on the connection details of the Snowflake tables configured as inputs and/or outputs to the transforms. Data can be transformed using the Snowpark DataFrame API. For complete guidance on the Snowpark API, consult the Snowpark documentation ↗.
:::callout{theme="neutral"} Note the use of Foundry lightweight API syntax rather than Foundry Spark syntax. :::
An example of a Snowpark transform is shown below:
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import StringType
import snowflake.snowpark as snow
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
ID_PREFIX = "CUSTOMER-NO-"
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED"),
),
)
def compute_in_snowflake(input_table: SnowflakeInput, output_table: SnowflakeOutput):
"""
With Snowflake tables, you can perform lightweight transforms using the Snowpark APIs. All compute for these
is pushed down to the underlying Snowflake instance, so this can tackle big data workloads. In a set up like this,
all data must live in the same Snowpark instance and be accessible through the same connection.
"""
# Get a Snowpark DataFrame instance
df: snow.DataFrame = input_table.dataframe()
session: snow.Session = df.session
# Define a UDF to apply to our data
@udf(session=session, return_type=StringType())
def fix_id_col(ident: int) -> str:
"""
UDF to convert ID to string and prepend "CUSTOMER-NO-".
"""
return ID_PREFIX + str(ident)
# Apply UDF
df = df.with_column("ID", fix_id_col(col("ID")))
# Write back to the new table
output_table.write(df)
Compute configuration¶
By default, Foundry uses the warehouse you configure on the source to allocate compute resources.
Alternatively, you can use .with_warehouse(warehouse="MY_WAREHOUSE") to configure a connection with a specific warehouse:
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
# Transform with custom warehouse configuration
@transform.snowflake(
source_table=TableInput(
"ri.tables.main.table.my_snowflake_input_table"
),
output_table=TableOutput(
"ri.tables.main.table.my_snowflake_output_table",
"ri.magritte..source.my_snowflake_source",
SnowflakeTable("MY_DATABASE", "PUBLIC", "MY_TABLE"),
),
).with_warehouse("my_warehouse")
def compute(source_table: SnowflakeInput, output_table: SnowflakeOutput):
df: snow.DataFrame = source_table.dataframe()
output_table.write(df)
Use PySpark API with Spark Connect¶
As an alternative to Snowpark, you can use .with_engine(engine="pyspark") to establish a Snowpark Connect session. This will enable the use of PySpark APIs through Spark Connect. See Snowpark Connect ↗ for more information.
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.functions import lit
from pyspark.sql.connect.session import SparkSession
from transforms.api import transform
from transforms.tables import (
TableInput,
TableOutput,
SnowparkConnectInput,
SnowparkConnectOutput,
SnowflakeTable
)
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
),
).with_engine("pyspark")
def compute(source_table: SnowparkConnectInput, output_table: SnowparkConnectOutput):
# This will be a Spark Connect Dataframe instead of Snowpark
df: DataFrame = source_table.dataframe()
# Similarly this will be a Spark Connect session
session: SparkSession = source_table.session
df = df.withColumn("test", lit(3))
output_table.write_dataframe(df)
Convert data into a pandas DataFrame¶
The Snowpark API allows data to be converted into a pandas DataFrame. If the scale of your data is small enough, this can be used to bring the data from Snowflake into Foundry lightweight compute. This enables the use of transforms beyond the capabilities of the Snowpark APIs, and allows Snowflake tables to be combined with other Foundry data.
import hashlib
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
),
)
def compute_local(input_table: SnowflakeInput, output_table: SnowflakeOutput):
"""
Snowpark also supports conversion to pandas DataFrames, meaning that you can use lightweight
transforms on top of Snowflake tables to conduct in-container compute work. You can use
this to go beyond the scope of what is supported in Snowpark.
"""
# Get a Snowpark DataFrame instance
df = input_table.dataframe()
session = df.session
# Convert to pandas
pd_df = df.to_pandas()
# Create ANON_CODE by hashing the concatenation of CITY, STATE, and ZIP_CODE
def generate_anon_code(row):
concatenated = f"{row['CITY']}{row['STATE']}{row['ZIP_CODE']}"
return hashlib.sha256(concatenated.encode("utf-8")).hexdigest()
# Apply the function to create the ANON_CODE column
pd_df["ANON_CODE"] = pd_df.apply(generate_anon_code, axis=1)
# Select the ID and ANON_CODE columns
result_data = pd_df[["ID", "ANON_CODE"]]
# Write back to the new table
new_df = session.create_dataframe(result_data, schema=["ID", "ANON_CODE"])
output_table.write(new_df)
:::callout{theme="neutral"}
Incremental computation using the @incremental decorator is not currently supported when using compute pushdown to Snowflake.
:::
中文翻译¶
Snowflake 计算下推(Compute Pushdown)¶
要在 Snowflake 中使用计算下推,请创建一个 Python 代码库 并安装最新版本的 transforms-tables 库。
系统将根据配置为转换输入和/或输出的 Snowflake 表的连接详细信息,配置 Snowpark 会话(Session)。可以使用 Snowpark 数据帧(DataFrame) API 对数据进行转换。有关 Snowpark API 的完整指南,请参阅 Snowpark 文档 ↗。
:::callout{theme="neutral"} 请注意,此处使用的是 Foundry 轻量级 API 语法,而非 Foundry Spark 语法。 :::
Snowpark 转换示例如下所示:
from snowflake.snowpark.functions import col, udf
from snowflake.snowpark.types import StringType
import snowflake.snowpark as snow
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
ID_PREFIX = "CUSTOMER-NO-"
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED"),
),
)
def compute_in_snowflake(input_table: SnowflakeInput, output_table: SnowflakeOutput):
"""
With Snowflake tables, you can perform lightweight transforms using the Snowpark APIs. All compute for these
is pushed down to the underlying Snowflake instance, so this can tackle big data workloads. In a set up like this,
all data must live in the same Snowpark instance and be accessible through the same connection.
"""
# Get a Snowpark DataFrame instance
df: snow.DataFrame = input_table.dataframe()
session: snow.Session = df.session
# Define a UDF to apply to our data
@udf(session=session, return_type=StringType())
def fix_id_col(ident: int) -> str:
"""
UDF to convert ID to string and prepend "CUSTOMER-NO-".
"""
return ID_PREFIX + str(ident)
# Apply UDF
df = df.with_column("ID", fix_id_col(col("ID")))
# Write back to the new table
output_table.write(df)
计算资源配置¶
默认情况下,Foundry 使用您在数据源上配置的仓库(Warehouse)来分配计算资源。
或者,您可以使用 .with_warehouse(warehouse="MY_WAREHOUSE") 来配置与特定仓库的连接:
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
# Transform with custom warehouse configuration
@transform.snowflake(
source_table=TableInput(
"ri.tables.main.table.my_snowflake_input_table"
),
output_table=TableOutput(
"ri.tables.main.table.my_snowflake_output_table",
"ri.magritte..source.my_snowflake_source",
SnowflakeTable("MY_DATABASE", "PUBLIC", "MY_TABLE"),
),
).with_warehouse("my_warehouse")
def compute(source_table: SnowflakeInput, output_table: SnowflakeOutput):
df: snow.DataFrame = source_table.dataframe()
output_table.write(df)
结合 Spark Connect 使用 PySpark API¶
作为 Snowpark 的替代方案,您可以使用 .with_engine(engine="pyspark") 来建立 Snowpark Connect 会话。这将支持通过 Spark Connect 使用 PySpark API。更多信息请参阅 Snowpark Connect ↗。
from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.functions import lit
from pyspark.sql.connect.session import SparkSession
from transforms.api import transform
from transforms.tables import (
TableInput,
TableOutput,
SnowparkConnectInput,
SnowparkConnectOutput,
SnowflakeTable
)
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
),
).with_engine("pyspark")
def compute(source_table: SnowparkConnectInput, output_table: SnowparkConnectOutput):
# This will be a Spark Connect Dataframe instead of Snowpark
df: DataFrame = source_table.dataframe()
# Similarly this will be a Spark Connect session
session: SparkSession = source_table.session
df = df.withColumn("test", lit(3))
output_table.write_dataframe(df)
将数据转换为 pandas DataFrame¶
Snowpark API 允许将数据转换为 pandas DataFrame。如果您的数据规模足够小,可以使用此方法将数据从 Snowflake 引入 Foundry 轻量级计算环境。这不仅支持执行超出 Snowpark API 能力范围的转换操作,还允许将 Snowflake 表与其他 Foundry 数据结合使用。
import hashlib
from transforms.api import transform
from transforms.tables import (
SnowflakeTable,
TableInput,
TableOutput,
SnowflakeInput,
SnowflakeOutput,
)
@transform.snowflake.using(
input_table=TableInput("ri.tables.main.table.1234"),
output_table=TableOutput(
"ri.tables.main.table.5678",
"ri.magritte..source.1234",
SnowflakeTable("DATABASE", "PUBLIC", "CUSTOMERS_CLEANED_ANON"),
),
)
def compute_local(input_table: SnowflakeInput, output_table: SnowflakeOutput):
"""
Snowpark also supports conversion to pandas DataFrames, meaning that you can use lightweight
transforms on top of Snowflake tables to conduct in-container compute work. You can use
this to go beyond the scope of what is supported in Snowpark.
"""
# Get a Snowpark DataFrame instance
df = input_table.dataframe()
session = df.session
# Convert to pandas
pd_df = df.to_pandas()
# Create ANON_CODE by hashing the concatenation of CITY, STATE, and ZIP_CODE
def generate_anon_code(row):
concatenated = f"{row['CITY']}{row['STATE']}{row['ZIP_CODE']}"
return hashlib.sha256(concatenated.encode("utf-8")).hexdigest()
# Apply the function to create the ANON_CODE column
pd_df["ANON_CODE"] = pd_df.apply(generate_anon_code, axis=1)
# Select the ID and ANON_CODE columns
result_data = pd_df[["ID", "ANON_CODE"]]
# Write back to the new table
new_df = session.create_dataframe(result_data, schema=["ID", "ANON_CODE"])
output_table.write(new_df)
:::callout{theme="neutral"}
在使用 Snowflake 计算下推时,目前不支持使用 @incremental 装饰器进行增量计算(Incremental computation)。
:::