跳转至

Transforms(转换(Transforms))

Python

Flatten hierarchical tree data

How do I flatten a hierarchical tree data structure into a flat table with parent-child relationships?

This code uses PySpark to transform a hierarchical tree data structure into a flat table with parent-child relationships. It creates a function to extract objects per level and generates unique primary keys for nodes and parents. The output dataframe contains columns for node_id, node_description, node_level, parent_id, parent_level, parents_path, node_pk, and parent_pk.

from transforms.api import transform_df, Input, Output, transform
from pyspark.sql import functions as F

# CONSTANTS
COL_ORDER = ["level1", "level2", "level3", "level4"]

COLS_DESCRIPTION = {
    "node_id": "Identifier of a node, non-unique",
    "node_description": "Human-readable identifier of the node, non-unique",
    "node_level": "Hierarchy level of a node",
    "parent_id": "Identifier of a node's parent,  non-unique",
    "parent_level": "Human-readable identifier of a node's parent, non-unique",
    "parents_path": "Array of parent_ids, from highest to closest parent.",
    "node_pk": "unique.",
    "parent_pk": "unique."
}


'''
Translate something of the format :
level1  | level2  | level3  | level4    | some_value
root    | folder1 | folder2 | file_name | file_content
into : 
node_id   | node_level | parent_id | parent_level | value
root      | level1     | null      | null         | null
folder1   | level2     | root      | level1       | null
folder2   | level3     | folder1   | level2       | null
file_name | level4     | folder2   | level3       | file_content
'''
def flatten_tree_data(tree_df, out):

    tree_df = tree_df.dataframe()

    # Function to extract object "per level".
    def create_object(df, node_id_col="level2", node_description_col="level2", node_level="level2",
                      parent_ids_cols=["level1", "...", "level3"], parent_id_col="level3", parent_level="level3"):

        # Filter down the columns.
        # Note : parent_id_col shall be included in parent_ids_cols
        # Using a set to remove potential duplicates if id columns and description columns are the same
        columns_to_keep = list(set([node_id_col, node_description_col, *parent_ids_cols]))
        out_df = df.select(columns_to_keep)

        #  DISTINCT to drop duplicates
        out_df = out_df.distinct()

        # Store values of the specific node
        out_df = out_df.withColumn("node_id", F.col(node_id_col))
        out_df = out_df.withColumn("node_level", F.lit(node_level))
        out_df = out_df.withColumn("node_description", F.col(node_description_col))

        # Handle top node that has no parent
        is_top_node = parent_id_col is None and parent_level is None
        if not is_top_node:
            # Store values for its parent
            out_df = out_df.withColumn("parent_id", F.col(parent_id_col))
            out_df = out_df.withColumn("parent_level", F.lit(parent_level))
        else:
            # TODO : remove logic in favor of allowMissingColumns=True / Spark 3 feature
            out_df = out_df.withColumn("parent_id", F.lit(None))
            out_df = out_df.withColumn("parent_level", F.lit(None))

        # Concat its parent ids to get "his path"
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # Cleanup before key generation
        out_df = out_df.select("node_id", "node_description", "node_level", "parent_id", "parent_level", "parents_path")

        # PKs are useful to "self-join"
        # Generate PK for node
        pk_cols = ["node_level", "node_id"]
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", *pk_cols))

        # Generate PK for parent
        pk_cols = ["parent_level", "parent_id"]
        out_df = out_df.withColumn("parent_pk", F.concat_ws("__", *pk_cols))

        # Generate Title column
        title_cols = ["node_level", "node_description", "node_id"]
        out_df = out_df.withColumn("title", F.concat_ws(" - ", *title_cols))

        return out_df

    out_df = create_object(tree_df, "level4", "level4", "level4",
                           ["level1", "level2", "level3", "level4"], "level3", "level3")

    tmp_df = create_object(tree_df, "level3", "level3", "level3", 
                           ["level1", "level2", "level3"], "level2", "level2")
    out_df = out_df.unionByName(tmp_df)

    tmp_df = create_object(tree_df, "level2", "level2", "level2", 
                           ["level1", "level2"], "level1", "level1")
    out_df = out_df.unionByName(tmp_df)

    tmp_df = create_object(tree_df, "level1", "level1", "level1", 
                           [], None, None)
    out_df = out_df.unionByName(tmp_df) # TODO SPARK 3 : , allowMissingColumns=True

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
  • Date submitted: 2024-03-26
  • Tags: code repositories, code authoring, python, graph, tree

Extract ancestors and descendants from graph dataset

How do I extract ancestors and descendants from a graph dataset using PySpark and NetworkX?

This code uses PySpark and NetworkX to prepare a graph dataset, create a directed graph, and extract the ancestors and descendants of each node in the graph.

from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx

GRAPH_SCHEMA = T.StructType([
    T.StructField("node_id", T.StringType()),
    T.StructField("descendants", T.ArrayType(T.StringType())),
    T.StructField("ancestors", T.ArrayType(T.StringType())),
])

# Step 1: Prepare the dataset
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input")
)
def prepare_graph(graph_structured_dataset):
    vertices = get_vertices(graph_structured_dataset)
    edges = get_edges(graph_structured_dataset)

    df = vertices.unionByName(edges)

    return df

def get_vertices(df):
    df = (
        df
        .select(
            "node_id", # The ID of the node
            F.lit(None).cast(T.StringType()).alias("child"), # An empty "child" column so the output can be merged with the edges
            F.lit("vertex").alias("type"), # The type of this row (it represents a vertex)
            F.col("_partition_column"), # The property on which nodes can be partitioned so the computation can run in parallel
        )
        .dropDuplicates(["node_id"])
    )

    return df

def get_edges(df):
    df = (
        df
        .filter(F.col("parent_node_id").isNotNull())
        .select(
            F.col("parent_node_id").alias("node_id"), # The ID of the node
            F.col("node_id").alias("child_id"), # A reference to the child of this node
            F.lit("edge").alias("type"), # The type of this row (it represents an edge)
            F.col("_partition_column"), # The property on which nodes can be partitioned so the computation can run in parallel
        )
        .dropDuplicates(["node_id", "child_id"])
    )

    return df

# Step 2: Create the graph using networkx and extract the properties you need
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):

    out = (
        prepared_graph
        .groupby("_partition_column")
        .applyInPandas(
            myNetworkxUserDefinedFunction,
            schema=GRAPH_SCHEMA
        )
    )

    out = out.withColumn("ancestors",
        F.when(F.size(F.col("ancestors")) == 0, F.lit(None)).otherwise(F.col("ancestors"))
    )

    return out

def myNetworkxUserDefinedFunction(pandas_dataframe):

    vertices = pandas_dataframe[pandas_dataframe["type"] == "vertex"]
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]

    df = vertices

    g = nx.DiGraph()
    g.add_edges_from(edges[['node_id', 'child_id']].to_records(index=False))

    def get_descendants(source):
        if not (edges['node_id'] == source).any():
            return None
        descendents = list(nx.bfs_tree(g, source))
        return descendents[1:]

    df["descendants"] = df["node_id"].apply(get_descendants)

    def get_ancestors(source):
        path = [source] + [parent for parent, child, _ in nx.edge_dfs(g, source=source, orientation="reverse")]
        return path[1:]

    df["ancestors"] = df["node_id"].apply(get_ancestors)

    return df[["node_id", "ancestors", "descendants"]]
  • Date submitted: 2024-03-20
  • Tags: code authoring, code repositories, python, tree, graph, networkx

中文翻译

转换(Transforms)

Python

展平层级树状数据(Flatten hierarchical tree data)

如何将层级树状数据结构展平为包含父子关系的平面表格?

此代码使用PySpark将层级树状数据结构转换为包含父子关系的平面表格。它创建了一个函数来提取每一层的对象,并为节点和父节点生成唯一的主键。输出数据框包含以下列:node_idnode_descriptionnode_levelparent_idparent_levelparents_pathnode_pkparent_pk

from transforms.api import transform_df, Input, Output, transform
from pyspark.sql import functions as F

# 常量(CONSTANTS)
COL_ORDER = ["level1", "level2", "level3", "level4"]

COLS_DESCRIPTION = {
    "node_id": "节点的标识符,非唯一",
    "node_description": "节点的人类可读标识符,非唯一",
    "node_level": "节点的层级",
    "parent_id": "节点父级的标识符,非唯一",
    "parent_level": "节点父级的人类可读标识符,非唯一",
    "parents_path": "父级ID数组,从最高级到最近父级",
    "node_pk": "唯一",
    "parent_pk": "唯一"
}


'''
将如下格式的内容:
level1  | level2  | level3  | level4    | some_value
root    | folder1 | folder2 | file_name | file_content
转换为:
node_id   | node_level | parent_id | parent_level | value
root      | level1     | null      | null         | null
folder1   | level2     | root      | level1       | null
folder2   | level3     | folder1   | level2       | null
file_name | level4     | folder2   | level3       | file_content
'''
def flatten_tree_data(tree_df, out):

    tree_df = tree_df.dataframe()

    # 按层级提取对象的函数
    def create_object(df, node_id_col="level2", node_description_col="level2", node_level="level2",
                      parent_ids_cols=["level1", "...", "level3"], parent_id_col="level3", parent_level="level3"):

        # 筛选列
        # 注意:parent_id_col应包含在parent_ids_cols中
        # 使用集合去除ID列和描述列相同时可能产生的重复
        columns_to_keep = list(set([node_id_col, node_description_col, *parent_ids_cols]))
        out_df = df.select(columns_to_keep)

        # 使用DISTINCT去除重复项
        out_df = out_df.distinct()

        # 存储特定节点的值
        out_df = out_df.withColumn("node_id", F.col(node_id_col))
        out_df = out_df.withColumn("node_level", F.lit(node_level))
        out_df = out_df.withColumn("node_description", F.col(node_description_col))

        # 处理没有父级的顶层节点
        is_top_node = parent_id_col is None and parent_level is None
        if not is_top_node:
            # 存储其父级的值
            out_df = out_df.withColumn("parent_id", F.col(parent_id_col))
            out_df = out_df.withColumn("parent_level", F.lit(parent_level))
        else:
            # TODO:移除逻辑,改用allowMissingColumns=True / Spark 3功能
            out_df = out_df.withColumn("parent_id", F.lit(None))
            out_df = out_df.withColumn("parent_level", F.lit(None))

        # 拼接其父级ID以获取路径
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # 在生成键之前进行清理
        out_df = out_df.select("node_id", "node_description", "node_level", "parent_id", "parent_level", "parents_path")

        # 主键(PK)用于自连接
        # 为节点生成主键
        pk_cols = ["node_level", "node_id"]
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", *pk_cols))

        # 为父级生成主键
        pk_cols = ["parent_level", "parent_id"]
        out_df = out_df.withColumn("parent_pk", F.concat_ws("__", *pk_cols))

        # 生成标题列
        title_cols = ["node_level", "node_description", "node_id"]
        out_df = out_df.withColumn("title", F.concat_ws(" - ", *title_cols))

        return out_df

    out_df = create_object(tree_df, "level4", "level4", "level4",
                           ["level1", "level2", "level3", "level4"], "level3", "level3")

    tmp_df = create_object(tree_df, "level3", "level3", "level3", 
                           ["level1", "level2", "level3"], "level2", "level2")
    out_df = out_df.unionByName(tmp_df)

    tmp_df = create_object(tree_df, "level2", "level2", "level2", 
                           ["level1", "level2"], "level1", "level1")
    out_df = out_df.unionByName(tmp_df)

    tmp_df = create_object(tree_df, "level1", "level1", "level1", 
                           [], None, None)
    out_df = out_df.unionByName(tmp_df) # TODO SPARK 3 : , allowMissingColumns=True

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
  • 提交日期:2024-03-26
  • 标签:代码仓库(code repositories)代码编写(code authoring)python图(graph)树(tree)

从图数据集中提取祖先和后代(Extract ancestors and descendants from graph dataset)

如何使用PySpark和NetworkX从图数据集中提取祖先和后代?

此代码使用PySpark和NetworkX准备图数据集、创建有向图,并提取图中每个节点的祖先和后代。

from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx

GRAPH_SCHEMA = T.StructType([
    T.StructField("node_id", T.StringType()),
    T.StructField("descendants", T.ArrayType(T.StringType())),
    T.StructField("ancestors", T.ArrayType(T.StringType())),
])

# 步骤1:准备数据集
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input")
)
def prepare_graph(graph_structured_dataset):
    vertices = get_vertices(graph_structured_dataset)
    edges = get_edges(graph_structured_dataset)

    df = vertices.unionByName(edges)

    return df

def get_vertices(df):
    df = (
        df
        .select(
            "node_id", # 节点的ID
            F.lit(None).cast(T.StringType()).alias("child"), # 空的"child"列,以便输出可以与边合并
            F.lit("vertex").alias("type"), # 此行类型(表示顶点)
            F.col("_partition_column"), # 节点可分区属性,以便计算可以并行运行
        )
        .dropDuplicates(["node_id"])
    )

    return df

def get_edges(df):
    df = (
        df
        .filter(F.col("parent_node_id").isNotNull())
        .select(
            F.col("parent_node_id").alias("node_id"), # 节点的ID
            F.col("node_id").alias("child_id"), # 对此节点子级的引用
            F.lit("edge").alias("type"), # 此行类型(表示边)
            F.col("_partition_column"), # 节点可分区属性,以便计算可以并行运行
        )
        .dropDuplicates(["node_id", "child_id"])
    )

    return df

# 步骤2:使用networkx创建图并提取所需属性
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):

    out = (
        prepared_graph
        .groupby("_partition_column")
        .applyInPandas(
            myNetworkxUserDefinedFunction,
            schema=GRAPH_SCHEMA
        )
    )

    out = out.withColumn("ancestors",
        F.when(F.size(F.col("ancestors")) == 0, F.lit(None)).otherwise(F.col("ancestors"))
    )

    return out

def myNetworkxUserDefinedFunction(pandas_dataframe):

    vertices = pandas_dataframe[pandas_dataframe["type"] == "vertex"]
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]

    df = vertices

    g = nx.DiGraph()
    g.add_edges_from(edges[['node_id', 'child_id']].to_records(index=False))

    def get_descendants(source):
        if not (edges['node_id'] == source).any():
            return None
        descendents = list(nx.bfs_tree(g, source))
        return descendents[1:]

    df["descendants"] = df["node_id"].apply(get_descendants)

    def get_ancestors(source):
        path = [source] + [parent for parent, child, _ in nx.edge_dfs(g, source=source, orientation="reverse")]
        return path[1:]

    df["ancestors"] = df["node_id"].apply(get_ancestors)

    return df[["node_id", "ancestors", "descendants"]]
  • 提交日期:2024-03-20
  • 标签:代码编写(code authoring)代码仓库(code repositories)python树(tree)图(graph)networkx