跳转至

Transforms(转换)

Python

Abort building dataset if the input dataset is empty

How can I prevent or abort a transform from running if the input or source dataset is empty?

This code uses a PySpark transform to check if the input DataFrame is empty before writing it to the output location. If the DataFrame is empty, the transaction is aborted, preventing the writing of empty DataFrames.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    out=Output("/Palantir/output_location/datasets/not_process_empty_files"),
    source_df=Input("/Palantir/input_location/sometimes_empty"),
)
def compute(source_df, out):
    """Copy the input to the output, aborting the transaction when the input has no rows."""
    rows = source_df.dataframe()

    # Asking for a single row is enough to tell whether there is anything to write.
    first_rows = rows.head(1)
    if not first_rows:
        # No row to write: abort the transaction instead of writing an empty DataFrame.
        out.abort()
        return

    # At least one row exists, so write the data.
    out.write_dataframe(rows)
  • Date submitted: 2024-03-26
  • Tags: code authoring, code repositories, python, abort

Generate empty DataFrame

How do I create a DataFrame inside a PySpark transform and write it to the output?

This code defines a function that creates a placeholder ("empty") DataFrame: a single row containing a dummy key, plus a timestamp column. It then writes the DataFrame to an output dataset path using the transform decorator.

from pyspark.sql import types as T
from pyspark.sql import functions as F

# Generation of empty dataframe, that just appends the current timestamp
def get_empty_df(ctx):
    """Return a one-row placeholder DataFrame with a dummy key and a timestamp.

    Args:
        ctx: Transform context; only ``ctx.spark_session`` is used.

    Returns:
        DataFrame with a nullable string column ``key`` holding ``"dummy_key"``
        and a ``when`` column set to ``F.current_timestamp()``.
    """
    # Schema: a single nullable string column named "key".
    key_field = T.StructField("key", T.StringType(), True)
    placeholder_rows = [("dummy_key",)]

    return (
        ctx.spark_session
        .createDataFrame(placeholder_rows, T.StructType([key_field]))
        # Stamp the row with the current timestamp.
        .withColumn('when', F.current_timestamp())
    )

@transform(
    out=Output("output_dataset_path")
)
def out_1(ctx, out):
    """Build the placeholder DataFrame with ``get_empty_df`` and write it to ``out``."""
    out.write_dataframe(get_empty_df(ctx))
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python

Fuzzy matching of entity names using phonetic codes

How do I perform fuzzy matching of entity names using phonetic codes in PySpark?

This code uses PySpark to clean entity names, generate phonetic codes, and perform fuzzy matching of entity names using the Jaro similarity metric. It is useful for matching similar entity names in two datasets.

from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform_df, Input, Output
import re
import jellyfish


def _add_phonetic_codes(df):
    """Add a ``phonetic_code`` column with the Soundex code of each part of ``cleaned_name``.

    ``cleaned_name`` is split on single spaces and exploded, so the result
    holds one row per name part; the temporary ``name_part`` column is
    dropped before returning.
    """
    name_parts = F.split("cleaned_name", " ")
    per_part = df.withColumn("name_part", F.explode(name_parts))
    coded = per_part.withColumn("phonetic_code", F.soundex("name_part"))
    return coded.drop("name_part")


@transform_df(
    # NOTE(review): Output()/Input() carry no dataset paths here; presumably
    # placeholders to be filled in with real dataset paths.
    Output(),
    entities2=Input(),
    entities1=Input(),
)
def compute(entities2, entities1):
    """Fuzzy-match entity names between two datasets.

    Names are cleaned, split into parts and Soundex-encoded by
    ``_add_phonetic_codes``. Rows sharing a phonetic code become candidate
    pairs, which are scored with the Jaro similarity and kept when the score
    is above 0.75.

    Args:
        entities2: DataFrame for the ``entities2`` input; its ``name`` and
            ``id`` columns are read.
        entities1: DataFrame for the ``entities1`` input; its ``entity_name``
            and ``id`` columns are read.

    Returns:
        DataFrame with the columns ``entity_id1`` and ``entity_id2``, one row
        per distinct matched pair.
    """
    # The parameter names match the Input keywords in the decorator, which are
    # also the names the body works with.

    # Set up UDF for cleaning text
    def clean_text(text):
        # A null name stays null instead of raising TypeError inside re.sub.
        if text is None:
            return None
        without_punctuation = re.sub(r"[./-]+", "", text)
        return re.sub(r" +", " ", without_punctuation).lower()

    clean_text_udf = F.udf(clean_text, T.StringType())

    # Clean entity name
    entities2 = entities2.withColumn("cleaned_name", clean_text_udf(F.col("name")))
    entities1 = entities1.withColumn("cleaned_name", clean_text_udf(F.col("entity_name")))

    # Add phonetic codes
    entities2 = _add_phonetic_codes(entities2)
    entities1 = _add_phonetic_codes(entities1)

    # Fuzzy join: candidate pairs are rows sharing a phonetic code. Only the
    # four aliased columns are selected, so the join key needs no explicit drop.
    matched_entities = entities1.join(
        entities2, on=["phonetic_code"], how="inner"
    ).select(
        entities1.cleaned_name.alias("cleaned_name1"),
        entities1.id.alias("entity_id1"),
        entities2.cleaned_name.alias("cleaned_name2"),
        entities2.id.alias("entity_id2"),
    ).dropDuplicates()

    # Set up UDF for string comparison. The return type is declared as a
    # double so that match_score is numeric rather than the default string.
    @F.udf(returnType=T.DoubleType())
    def jaro_compare(name1, name2):
        return float(jellyfish.jaro_similarity(name1, name2))

    # Fuzzy matching: keep pairs scoring above the 0.75 threshold.
    matched_entities = matched_entities.withColumn(
        "match_score", jaro_compare("cleaned_name1", "cleaned_name2")
    )
    matched_entities = matched_entities.filter(F.col("match_score") > 0.75)
    return matched_entities.select("entity_id1", "entity_id2")
  • Date submitted: 2024-05-23
  • Tags: pyspark, fuzzy matching, phonetic codes, jaro similarity

Load ORC file using PySpark

How do I load an ORC file using PySpark?

This code reads raw ORC files from the Hadoop path of an input dataset and writes the resulting Spark DataFrame to an output.

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    """Load the ORC data under the raw input's Hadoop path and write it to ``out``."""
    hadoop_path = raw.filesystem().hadoop_path
    orc_reader = ctx.spark_session.read.format('orc')
    out.write_dataframe(orc_reader.load(f'{hadoop_path}/'))
  • Date submitted: 2024-07-18
  • Tags: pyspark, dataframe, orc, hadoop

Transforms with multiple inputs and outputs

How do I create a multi-input, multi-output transform in Foundry?

This code demonstrates how to create a PySpark transform that takes multiple input datasets and produces multiple output datasets. It uses the @transform decorator and explicitly names the inputs and outputs. The transform reads the input dataframes, processes them, and writes the results to the specified output datasets.

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

# Works with or without the @incremental decorator (compatible with both).
# Uses @transform instead of @transform_df:
# this gives more control over inputs and outputs,
# but it also requires explicitly naming the outputs.
@transform(
    output_dataset_1=Output("3_multi_output_1"),
    output_dataset_2=Output("3_multi_output_2"),
    input_dataset_1=Input("fake_dataset"),
    input_dataset_2=Input("fake_dataset_2")
)
def example_transform_multi_inputs_outputs(input_dataset_1, input_dataset_2, output_dataset_1, output_dataset_2):
    """Read two inputs and write one result to each of two outputs.

    ``output_dataset_1`` receives the first input with a ``processed_at``
    timestamp column; ``output_dataset_2`` receives both inputs unioned by
    column name with the same extra column.
    """
    # Load both inputs as DataFrames.
    first_df = input_dataset_1.dataframe()
    second_df = input_dataset_2.dataframe()

    # Example processing: stamp the first input alone, and the union of both inputs.
    stamped_first = first_df.withColumn('processed_at', F.current_timestamp())
    combined = first_df.unionByName(second_df)
    stamped_union = combined.withColumn('processed_at', F.current_timestamp())

    # Nothing is returned with @transform: each result is written explicitly
    # to the output it belongs to.
    output_dataset_1.write_dataframe(stamped_first)
    output_dataset_2.write_dataframe(stamped_union)
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python

中文翻译


转换(Transforms)

Python

若输入数据集为空则中止构建数据集

如何防止或中止转换在输入或源数据集为空时运行?

以下代码使用 PySpark 转换(transform)在将输入 DataFrame 写入输出位置前检查其是否为空。若 DataFrame 为空,则中止事务,从而避免写入空 DataFrame。

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    out=Output("/Palantir/output_location/datasets/not_process_empty_files"),
    source_df=Input("/Palantir/input_location/sometimes_empty"),
)
def compute(source_df, out):
    """Copy the input to the output, aborting the transaction when the input has no rows."""
    rows = source_df.dataframe()

    # Asking for a single row is enough to tell whether there is anything to write.
    first_rows = rows.head(1)
    if not first_rows:
        # No row to write: abort the transaction instead of writing an empty DataFrame.
        out.abort()
        return

    # At least one row exists, so write the data.
    out.write_dataframe(rows)
  • 提交日期:2024-03-26
  • 标签:代码编写、代码仓库、python、中止

生成空 DataFrame

如何在 PySpark 转换中创建 DataFrame 并将其写入输出?

以下代码定义了一个函数,用于创建占位用的("空")DataFrame:它仅包含一行数据,即一个虚拟键和一个时间戳列;随后通过转换装饰器(transform decorator)将其写入输出数据集路径。

from pyspark.sql import types as T
from pyspark.sql import functions as F

# 生成仅包含当前时间戳的空 DataFrame
def get_empty_df(ctx):
    """Return a one-row placeholder DataFrame with a dummy key and a timestamp.

    Args:
        ctx: Transform context; only ``ctx.spark_session`` is used.

    Returns:
        DataFrame with a nullable string column ``key`` holding ``"dummy_key"``
        and a ``when`` column set to ``F.current_timestamp()``.
    """
    # Schema: a single nullable string column named "key".
    key_field = T.StructField("key", T.StringType(), True)
    placeholder_rows = [("dummy_key",)]

    return (
        ctx.spark_session
        .createDataFrame(placeholder_rows, T.StructType([key_field]))
        # Stamp the row with the current timestamp.
        .withColumn('when', F.current_timestamp())
    )

@transform(
    out=Output("output_dataset_path")
)
def out_1(ctx, out):
    """Build the placeholder DataFrame with ``get_empty_df`` and write it to ``out``."""
    out.write_dataframe(get_empty_df(ctx))
  • 提交日期:2024-03-20
  • 标签:代码编写、代码仓库、python

使用语音编码进行实体名称模糊匹配

如何在 PySpark 中使用语音编码进行实体名称的模糊匹配?

以下代码使用 PySpark 清洗实体名称、生成语音编码,并通过 Jaro 相似度指标进行实体名称的模糊匹配。该方法适用于匹配两个数据集中的相似实体名称。

from pyspark.sql import functions as F
from pyspark.sql import types as T
from transforms.api import transform_df, Input, Output
import re
import jellyfish


def _add_phonetic_codes(df):
    """Add a ``phonetic_code`` column with the Soundex code of each part of ``cleaned_name``.

    ``cleaned_name`` is split on single spaces and exploded, so the result
    holds one row per name part; the temporary ``name_part`` column is
    dropped before returning.
    """
    name_parts = F.split("cleaned_name", " ")
    per_part = df.withColumn("name_part", F.explode(name_parts))
    coded = per_part.withColumn("phonetic_code", F.soundex("name_part"))
    return coded.drop("name_part")


@transform_df(
    # NOTE(review): Output()/Input() carry no dataset paths here; presumably
    # placeholders to be filled in with real dataset paths.
    Output(),
    entities2=Input(),
    entities1=Input(),
)
def compute(entities2, entities1):
    """Fuzzy-match entity names between two datasets.

    Names are cleaned, split into parts and Soundex-encoded by
    ``_add_phonetic_codes``. Rows sharing a phonetic code become candidate
    pairs, which are scored with the Jaro similarity and kept when the score
    is above 0.75.

    Args:
        entities2: DataFrame for the ``entities2`` input; its ``name`` and
            ``id`` columns are read.
        entities1: DataFrame for the ``entities1`` input; its ``entity_name``
            and ``id`` columns are read.

    Returns:
        DataFrame with the columns ``entity_id1`` and ``entity_id2``, one row
        per distinct matched pair.
    """
    # The parameter names match the Input keywords in the decorator, which are
    # also the names the body works with.

    # Set up UDF for cleaning text
    def clean_text(text):
        # A null name stays null instead of raising TypeError inside re.sub.
        if text is None:
            return None
        without_punctuation = re.sub(r"[./-]+", "", text)
        return re.sub(r" +", " ", without_punctuation).lower()

    clean_text_udf = F.udf(clean_text, T.StringType())

    # Clean entity name
    entities2 = entities2.withColumn("cleaned_name", clean_text_udf(F.col("name")))
    entities1 = entities1.withColumn("cleaned_name", clean_text_udf(F.col("entity_name")))

    # Add phonetic codes
    entities2 = _add_phonetic_codes(entities2)
    entities1 = _add_phonetic_codes(entities1)

    # Fuzzy join: candidate pairs are rows sharing a phonetic code. Only the
    # four aliased columns are selected, so the join key needs no explicit drop.
    matched_entities = entities1.join(
        entities2, on=["phonetic_code"], how="inner"
    ).select(
        entities1.cleaned_name.alias("cleaned_name1"),
        entities1.id.alias("entity_id1"),
        entities2.cleaned_name.alias("cleaned_name2"),
        entities2.id.alias("entity_id2"),
    ).dropDuplicates()

    # Set up UDF for string comparison. The return type is declared as a
    # double so that match_score is numeric rather than the default string.
    @F.udf(returnType=T.DoubleType())
    def jaro_compare(name1, name2):
        return float(jellyfish.jaro_similarity(name1, name2))

    # Fuzzy matching: keep pairs scoring above the 0.75 threshold.
    matched_entities = matched_entities.withColumn(
        "match_score", jaro_compare("cleaned_name1", "cleaned_name2")
    )
    matched_entities = matched_entities.filter(F.col("match_score") > 0.75)
    return matched_entities.select("entity_id1", "entity_id2")
  • 提交日期:2024-05-23
  • 标签:pyspark、模糊匹配、语音编码、jaro 相似度

使用 PySpark 加载 ORC 文件

如何使用 PySpark 加载 ORC 文件?

以下代码从输入数据集的 Hadoop 路径读取原始 ORC 文件,并将生成的 Spark DataFrame 写入输出。

from transforms.api import transform, Input, Output


@transform(
    out=Output("output"),
    raw=Input("input"),
)
def compute(ctx, out, raw):
    """Load the ORC data under the raw input's Hadoop path and write it to ``out``."""
    hadoop_path = raw.filesystem().hadoop_path
    orc_reader = ctx.spark_session.read.format('orc')
    out.write_dataframe(orc_reader.load(f'{hadoop_path}/'))
  • 提交日期:2024-07-18
  • 标签:pyspark、dataframe、orc、hadoop

多输入多输出转换

如何在 Foundry 中创建多输入多输出的转换?

以下代码演示了如何创建一个接收多个输入数据集并生成多个输出数据集的 PySpark 转换。它使用 @transform 装饰器,并显式命名输入和输出。该转换读取输入 DataFrame,进行处理,然后将结果写入指定的输出数据集。

from transforms.api import transform, Input, Output, incremental
from pyspark.sql import types as T
from pyspark.sql import functions as F

# 可选用 @incremental 装饰器(两者兼容)
# 将 @transform_df 替换为 @transform
# 这样可以更灵活地控制输入和输出
# 同时需要显式命名输出
@transform(
    output_dataset_1=Output("3_multi_output_1"),
    output_dataset_2=Output("3_multi_output_2"),
    input_dataset_1=Input("fake_dataset"),
    input_dataset_2=Input("fake_dataset_2")
)
def example_transform_multi_inputs_outputs(input_dataset_1, input_dataset_2, output_dataset_1, output_dataset_2):
    """Read two inputs and write one result to each of two outputs.

    ``output_dataset_1`` receives the first input with a ``processed_at``
    timestamp column; ``output_dataset_2`` receives both inputs unioned by
    column name with the same extra column.
    """
    # Load both inputs as DataFrames.
    first_df = input_dataset_1.dataframe()
    second_df = input_dataset_2.dataframe()

    # Example processing: stamp the first input alone, and the union of both inputs.
    stamped_first = first_df.withColumn('processed_at', F.current_timestamp())
    combined = first_df.unionByName(second_df)
    stamped_union = combined.withColumn('processed_at', F.current_timestamp())

    # Nothing is returned with @transform: each result is written explicitly
    # to the output it belongs to.
    output_dataset_1.write_dataframe(stamped_first)
    output_dataset_2.write_dataframe(stamped_union)
  • 提交日期:2024-03-20
  • 标签:代码编写、代码仓库、python