跳转至

Transforms(转换(Transforms))

Python

Fetch and update data incrementally from API using PySpark

How do I fetch data from an API and update it incrementally using external transforms?

This code uses PySpark and the requests library to fetch data from an API between a specified date range and update the output incrementally. It additionally supports paging if the API also supports paging.

from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json


def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'):
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}

    data = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=data, headers=headers)
    logging.warn(response.json())
    data = response.json()["data"]
    return json.dumps(data)


@use_external_systems(
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),
)
def compute(output, creds, egress, ctx):
    token = creds.get("token")
    if ctx.is_incremental:
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .limit(1).collect()[0]
            )
            next_link_url = latest_row[NEXT_LINK_COLUMN]
            last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            last_date = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).limit(1).collect()[0][LAST_MODIFIED_COLUMN]

    today = dt.today().strftime("%Y-%m-%d")

    data = _get_data(token, last_date, today, next_link_url)

    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")
    output.write_dataframe(df)
  • Date submitted: 2024-04-26
  • Tags: API, pyspark, incremental, dataframe, external transform

Incremental helper

How can I change inputs of an incremental transform depending on whether it is running incrementally or not?

@incremental()
@transform(
    x=Output(),
    y=Input(),
    z=Input()
)
def compute(ctx, x, y, z):
    if ctx.is_incremental:
        ## Some Code
    else:
        ## Other Code
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Incremental sum aggregation

How do I optimize non-incremental pipelines with incremental decorators to do incremental aggregation on a very large dataset?

This code uses PySpark to compute daily aggregates of a dataset by grouping the dataset based on a specific field and counting the distinct values of another field. The code then stores the result in an output dataframe, handling both incremental and non-incremental cases.

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output
from pyspark.sql import DataFrame


@incremental(semantic_version=1)
@transform(
    # input dataset
    input_data=Input(""),
    daily_aggregate=Output("")
)
def compute(ctx, input_data, daily_aggregate):
    input_df = input_data.dataframe()
    latest_daily_agg = input_df.groupBy(F.col("group_by_field")).agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique"))
    # you need a schema to load a previous output.
    latest_daily_agg_schema = latest_daily_agg.schema

    if ctx.is_incremental:

        last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema)
        sum_daily = last_daily_agg.unionByName(latest_daily_agg).groupBy(F.col("group_by_field")).agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique"))
        daily_aggregate.set_mode('replace')
        daily_aggregate.write_dataframe(sum_daily)

    else:
        # not incremental - just store the latest daily
        daily_aggregate.write_dataframe(latest_daily_agg)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental, aggregation

Incremental transform in PySpark

How to implement an incremental transform in Palantir Foundry using PySpark?

This code demonstrates how to use the incremental decorator in a PySpark transform to handle incremental processing. It adds a processed_at timestamp column and an is_running_as_incremental column to the input dataframe.

from transforms.api import transform_df, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Apply the incremental decorator
@incremental()
@transform_df(
    Output(""),
    input_df=Input("")
)
def example_transform_incremental_processing(ctx, input_df):
    # The behavior of the transform depends on the state of the inputs/outputs
    # (See cases 1 to 3 in the original code snippet for details)
    #
    # Nothing changes in the logic below compared to a snapshot processing.
    # The behavior is taken care of by Foundry.

    # Example processing here
    input_df = input_df.withColumn('processed_at', F.current_timestamp())

    # Use ctx.is_incremental to know if Foundry is running the current build as "a snapshot" or as "an incremental"
    input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental))

    # Returns the edited dataframe
    return input_df
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, incremental

Snapshot input to incremental output

How can I do an incremental transform on a snapshot input that is fully rewritten and only process new rows I haven't process before?

This code uses PySpark to read an input dataset as a snapshot and compares it with the previous output to find new rows. It then adds a timestamp to the new rows and appends them to the output dataset, performing an incremental transformation.

from transforms.api import incremental, transform, Input, Output
from pyspark.sql import types as T, functions as F

# @incremental decorator to get advanced read/write modes.
@incremental(
    snapshot_inputs=["input_dataset"]
)
# @transform to have more control over inputs and outputs.
@transform(
    output_dataset=Output("incremental_output"),
    input_dataset=Input("snapshot_input")
)
def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # We enforce the read of the input dataframe as a snapshot
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    # We read the current output to see what we already processed in previous builds
    out_schema = T.StructType([T.StructField("primary_key", T.StringType(), True),
                               T.StructField("other_column", T.IntegerType(), True), # Include other columns you store in your output
                               T.StructField("processed_at", T.TimestampType(), True)])
    output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema)

    # ==== Example processing here ====
    # We diff the input with the current output, to find the "new rows".
    KEY = ["primary_key"]
    new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    # We add a timestamp for easier tracking/debugging/understanding of the example
    new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp())

    # ==== End of example processing ====

    # This will append rows
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)
  • Date submitted: 2024-07-18
  • Tags: code authoring, code repositories, python, incremental

中文翻译


转换(Transforms)

Python

使用PySpark从API增量获取和更新数据

如何通过外部转换(external transforms)从API获取数据并增量更新?

以下代码使用PySpark和requests库,在指定日期范围内从API获取数据,并增量更新输出。如果API支持分页,该代码还额外支持分页功能。

from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json


def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>'):
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}

    data = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=data, headers=headers)
    logging.warn(response.json())
    data = response.json()["data"]
    return json.dumps(data)


@use_external_systems(
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),
)
def compute(output, creds, egress, ctx):
    token = creds.get("token")
    if ctx.is_incremental:
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .limit(1).collect()[0]
            )
            next_link_url = latest_row[NEXT_LINK_COLUMN]
            last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            last_date = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).limit(1).collect()[0][LAST_MODIFIED_COLUMN]

    today = dt.today().strftime("%Y-%m-%d")

    data = _get_data(token, last_date, today, next_link_url)

    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")
    output.write_dataframe(df)
  • 提交日期:2024-04-26
  • 标签:APIpysparkincrementaldataframeexternal transform

增量辅助(Incremental helper)

如何根据转换是否以增量模式运行来改变增量转换的输入?

@incremental()
@transform(
    x=Output(),
    y=Input(),
    z=Input()
)
def compute(ctx, x, y, z):
    if ctx.is_incremental:
        ## 部分代码
    else:
        ## 其他代码
  • 提交日期:2024-03-20
  • 标签:code authoringcode repositoriespythonincremental

增量求和聚合(Incremental sum aggregation)

如何利用增量装饰器(incremental decorators)优化非增量管道,对超大数据集进行增量聚合?

以下代码使用PySpark,通过按特定字段对数据集进行分组并统计另一个字段的不同值数量,来计算数据集的每日聚合值。代码将结果存储到输出数据框中,同时处理增量与非增量两种情况。

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output
from pyspark.sql import DataFrame


@incremental(semantic_version=1)
@transform(
    # 输入数据集
    input_data=Input(""),
    daily_aggregate=Output("")
)
def compute(ctx, input_data, daily_aggregate):
    input_df = input_data.dataframe()
    latest_daily_agg = input_df.groupBy(F.col("group_by_field")).agg(F.count_distinct(F.col("unique_thing")).alias("sum_of_unique"))
    # 需要定义schema来加载之前的输出
    latest_daily_agg_schema = latest_daily_agg.schema

    if ctx.is_incremental:

        last_daily_agg = daily_aggregate.dataframe(mode='previous', schema=latest_daily_agg_schema)
        sum_daily = last_daily_agg.unionByName(latest_daily_agg).groupBy(F.col("group_by_field")).agg(F.sum(F.col("sum_of_unique")).alias("sum_of_unique"))
        daily_aggregate.set_mode('replace')
        daily_aggregate.write_dataframe(sum_daily)

    else:
        # 非增量模式 - 仅存储最新的每日聚合
        daily_aggregate.write_dataframe(latest_daily_agg)
  • 提交日期:2024-03-20
  • 标签:code authoringcode repositoriespythonincrementalaggregation

使用PySpark实现增量转换(Incremental transform)

如何在Palantir Foundry中使用PySpark实现增量转换?

以下代码演示了如何在PySpark转换中使用增量装饰器来处理增量计算。它为输入数据框添加了processed_at时间戳列和is_running_as_incremental列。

from transforms.api import transform_df, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

# 应用增量装饰器
@incremental()
@transform_df(
    Output(""),
    input_df=Input("")
)
def example_transform_incremental_processing(ctx, input_df):
    # 转换的行为取决于输入/输出的状态
    # (详见原始代码片段中的情况1至3)
    #
    # 与快照处理相比,以下逻辑没有变化。
    # 该行为由Foundry自动处理。

    # 此处为示例处理
    input_df = input_df.withColumn('processed_at', F.current_timestamp())

    # 使用ctx.is_incremental来判断Foundry当前是以"快照"还是"增量"模式运行
    input_df = input_df.withColumn('is_running_as_incremental', F.lit(ctx.is_incremental))

    # 返回编辑后的数据框
    return input_df
  • 提交日期:2024-03-20
  • 标签:code authoringcode repositoriespythonincremental

快照输入到增量输出(Snapshot input to incremental output)

如何对完全重写的快照输入进行增量转换,仅处理之前未处理过的新行?

以下代码使用PySpark将输入数据集作为快照读取,并与之前的输出进行比较以找出新行。然后为新行添加时间戳,并将其追加到输出数据集中,从而实现增量转换。

from transforms.api import incremental, transform, Input, Output
from pyspark.sql import types as T, functions as F

# @incremental装饰器用于获取高级读/写模式
@incremental(
    snapshot_inputs=["input_dataset"]
)
# @transform用于更精细地控制输入和输出
@transform(
    output_dataset=Output("incremental_output"),
    input_dataset=Input("snapshot_input")
)
def example_transform_very_advanced_processing__snapshot_to_incremental(ctx, input_dataset, output_dataset):
    # 强制将输入数据框作为快照读取
    input_df_all_dataframe = input_dataset.dataframe(mode="current")

    # 读取当前输出,查看之前构建中已处理的内容
    out_schema = T.StructType([T.StructField("primary_key", T.StringType(), True),
                               T.StructField("other_column", T.IntegerType(), True), # 包含输出中存储的其他列
                               T.StructField("processed_at", T.TimestampType(), True)])
    output_df_previous_dataframe = output_dataset.dataframe('previous', out_schema)

    # ==== 此处为示例处理 ====
    # 将输入与当前输出进行差异比较,找出"新行"
    KEY = ["primary_key"]
    new_rows_df = input_df_all_dataframe.join(output_df_previous_dataframe, how="left_anti", on=KEY)
    # 添加时间戳以便于跟踪/调试/理解示例
    new_rows_df = new_rows_df.withColumn('processed_at', F.current_timestamp())

    # ==== 示例处理结束 ====

    # 这将追加行
    output_dataset.set_mode("modify")
    output_dataset.write_dataframe(new_rows_df)
  • 提交日期:2024-07-18
  • 标签:code authoringcode repositoriespythonincremental