Transforms(转换)¶
Python¶
Flatten hierarchical tree data¶
How do I flatten a hierarchical tree data structure into a flat table with parent-child relationships?
This code uses PySpark to transform a hierarchical tree data structure into a flat table with parent-child relationships. It creates a function to extract objects per level and generates primary keys for nodes and parents. The output dataframe contains columns for node_id, node_description, node_level, parent_id, parent_level, parents_path, node_pk, parent_pk, and title.
from transforms.api import transform_df, Input, Output, transform
from pyspark.sql import functions as F
# CONSTANTS
# Names of the hierarchy columns of the input table. In the example shipped
# with this snippet, "level1" holds the root and "level4" the leaves.
COL_ORDER = ["level1", "level2", "level3", "level4"]
# Per-column descriptions passed to write_dataframe() for the output dataset.
COLS_DESCRIPTION = {
    "node_id": "Identifier of a node, non-unique",
    "node_description": "Human-readable identifier of the node, non-unique",
    "node_level": "Hierarchy level of a node",
    "parent_id": "Identifier of a node's parent, non-unique",
    # Previously a copy of the node_description text; parent_level holds a
    # hierarchy level, exactly like node_level.
    "parent_level": "Hierarchy level of a node's parent",
    "parents_path": "Array of parent_ids, from highest to closest parent.",
    "node_pk": "unique.",
    "parent_pk": "unique.",
}
'''
Translate something of the format :
level1 | level2 | level3 | level4 | some_value
root | folder1 | folder2 | file_name | file_content
into :
node_id | node_level | parent_id | parent_level | value
root | level1 | null | null | null
folder1 | level2 | root | level1 | null
folder2 | level3 | folder1 | level2 | null
file_name | level4 | folder2 | level3 | file_content
'''
def flatten_tree_data(tree_df, out):
    """Flatten a one-column-per-level hierarchy into one row per node.

    For every level named in ``COL_ORDER`` the distinct nodes of that level are
    extracted together with their direct parent, the path of ids leading to
    them and string keys (``node_pk`` / ``parent_pk``) usable for self-joins.
    The per-level results are unioned and written to ``out``.

    Args:
        tree_df: Transform input; ``tree_df.dataframe()`` must return a Spark
            DataFrame containing every column named in ``COL_ORDER``.
        out: Transform output; receives the flattened DataFrame via
            ``write_dataframe`` with ``COLS_DESCRIPTION`` as column descriptions.

    Raises:
        ValueError: If ``COL_ORDER`` is empty, since there is nothing to flatten.

    NOTE(review): the ``.dataframe()`` / ``write_dataframe`` calls look like the
    ``@transform`` calling convention, but no decorator is visible on this
    function - confirm how it is registered.
    """
    if not COL_ORDER:
        raise ValueError("COL_ORDER must list at least one hierarchy column")

    tree_df = tree_df.dataframe()

    # Function to extract object "per level".
    def create_object(df, node_id_col, node_description_col, node_level,
                      parent_ids_cols=(), parent_id_col=None, parent_level=None):
        """Build the node rows of a single hierarchy level.

        ``parent_id_col`` and ``parent_level`` are both ``None`` for the top
        level, which has no parent. ``parent_ids_cols`` lists the columns that
        make up ``parents_path``, from highest to closest.
        """
        parent_ids_cols = list(parent_ids_cols)

        # Filter down the columns.
        # A set removes duplicates when id and description columns coincide.
        columns_to_keep = list({node_id_col, node_description_col, *parent_ids_cols})
        # DISTINCT so each node of this level appears once per distinct path.
        out_df = df.select(columns_to_keep).distinct()

        # Store values of the specific node
        out_df = (
            out_df
            .withColumn("node_id", F.col(node_id_col))
            .withColumn("node_level", F.lit(node_level))
            .withColumn("node_description", F.col(node_description_col))
        )

        # Handle top node that has no parent
        is_top_node = parent_id_col is None and parent_level is None
        if is_top_node:
            # TODO : remove logic in favor of allowMissingColumns=True / Spark 3 feature
            out_df = (
                out_df
                .withColumn("parent_id", F.lit(None))
                .withColumn("parent_level", F.lit(None))
            )
        else:
            # Store values for its parent
            out_df = (
                out_df
                .withColumn("parent_id", F.col(parent_id_col))
                .withColumn("parent_level", F.lit(parent_level))
            )

        # Concat the path columns to get the node's path
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # Cleanup before key generation
        out_df = out_df.select(
            "node_id", "node_description", "node_level",
            "parent_id", "parent_level", "parents_path",
        )

        # PKs are useful to "self-join"
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", "node_level", "node_id"))
        # concat_ws skips nulls, so a node without a parent used to get the
        # empty string as parent_pk; keep the key null when there is no parent.
        out_df = out_df.withColumn(
            "parent_pk",
            F.when(
                F.col("parent_id").isNotNull(),
                F.concat_ws("__", "parent_level", "parent_id"),
            ),
        )

        # Generate Title column
        out_df = out_df.withColumn(
            "title", F.concat_ws(" - ", "node_level", "node_description", "node_id")
        )
        return out_df

    # Walk the levels from deepest to top, as the hand-unrolled version did.
    out_df = None
    for depth in reversed(range(len(COL_ORDER))):
        level = COL_ORDER[depth]
        if depth == 0:
            # Top level: no parent and an empty path.
            level_df = create_object(tree_df, level, level, level)
        else:
            parent = COL_ORDER[depth - 1]
            # The path of a non-top level runs from the first level down to and
            # including the level's own column (unchanged from the original calls).
            level_df = create_object(
                tree_df, level, level, level,
                COL_ORDER[:depth + 1], parent, parent,
            )
        # TODO SPARK 3 : unionByName(..., allowMissingColumns=True)
        out_df = level_df if out_df is None else out_df.unionByName(level_df)

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
- Date submitted: 2024-03-26
- Tags:
code repositories,code authoring,python,graph,tree
Extract ancestors and descendants from graph dataset¶
How do I extract ancestors and descendants from a graph dataset using PySpark and NetworkX?
This code uses PySpark and NetworkX to prepare a graph dataset, create a directed graph, and extract the ancestors and descendants of each node in the graph.
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx
# Schema of the frames returned by the pandas function used in
# extract_graph_properties: a node id plus two string-array columns.
GRAPH_SCHEMA = T.StructType(
    [
        T.StructField(field_name, field_type)
        for field_name, field_type in (
            ("node_id", T.StringType()),
            ("descendants", T.ArrayType(T.StringType())),
            ("ancestors", T.ArrayType(T.StringType())),
        )
    ]
)
# Step 1: Prepare the dataset
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input"),
)
def prepare_graph(graph_structured_dataset):
    """Stack the vertex rows and the edge rows of the input into one dataset.

    Both helper results are combined with ``unionByName``, which resolves
    columns by name.
    """
    vertex_rows = get_vertices(graph_structured_dataset)
    edge_rows = get_edges(graph_structured_dataset)
    return vertex_rows.unionByName(edge_rows)
def get_vertices(df):
    """Return one "vertex" row per distinct ``node_id``.

    The result has the columns ``node_id``, ``child_id`` (always null),
    ``type`` (the literal "vertex") and ``_partition_column``. The null column
    must be named ``child_id`` - the name ``get_edges`` uses - because the two
    results are combined with ``unionByName``, which matches columns by name.
    """
    return (
        df
        .select(
            "node_id",  # The ID of the node
            # Empty placeholder so vertex rows share the edge rows' columns.
            F.lit(None).cast(T.StringType()).alias("child_id"),
            F.lit("vertex").alias("type"),  # This row represents a vertex
            # Property on which nodes are grouped for the per-group computation.
            F.col("_partition_column"),
        )
        .dropDuplicates(["node_id"])
    )
def get_edges(df):
    """Return one "edge" row per distinct (parent, child) pair.

    Rows without a ``parent_node_id`` are dropped. In the result the parent is
    exposed as ``node_id`` and the row's own id as ``child_id``; ``type`` is the
    literal "edge" and ``_partition_column`` is carried over unchanged.
    """
    has_parent = F.col("parent_node_id").isNotNull()
    edge_columns = [
        F.col("parent_node_id").alias("node_id"),  # edge start: the parent
        F.col("node_id").alias("child_id"),  # edge end: the child
        F.lit("edge").alias("type"),  # this row represents an edge
        F.col("_partition_column"),  # grouping key for the per-group computation
    ]
    edge_rows = df.filter(has_parent).select(*edge_columns)
    return edge_rows.dropDuplicates(["node_id", "child_id"])
# Step 2: Create the graph using networkx and extract the properties you need
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):
    """Compute ancestors and descendants for each ``_partition_column`` group.

    ``myNetworkxUserDefinedFunction`` is applied to every group as a pandas
    function with ``GRAPH_SCHEMA`` as output schema; afterwards empty
    ``ancestors`` arrays are replaced by null.
    """
    grouped = prepared_graph.groupby("_partition_column")
    per_group = grouped.applyInPandas(
        myNetworkxUserDefinedFunction,
        schema=GRAPH_SCHEMA,
    )

    ancestors = F.col("ancestors")
    has_no_ancestors = F.size(ancestors) == 0
    return per_group.withColumn(
        "ancestors",
        F.when(has_no_ancestors, F.lit(None)).otherwise(ancestors),
    )
def myNetworkxUserDefinedFunction(pandas_dataframe):
    """Compute ancestors and descendants for every vertex row of one group.

    Args:
        pandas_dataframe: pandas DataFrame with the columns ``node_id``,
            ``child_id`` and ``type``; rows with ``type == "edge"`` describe a
            directed edge ``node_id -> child_id``, rows with
            ``type == "vertex"`` list the nodes to report on.

    Returns:
        A pandas DataFrame with one row per vertex row and the columns
        ``node_id``, ``ancestors`` (list, empty when there are none) and
        ``descendants`` (list, or ``None`` when the node has no children).
    """
    # Copy the vertex rows: the result columns are added to this frame, and
    # writing to a filtered slice of the input triggers pandas'
    # SettingWithCopy behaviour.
    df = pandas_dataframe[pandas_dataframe["type"] == "vertex"].copy()
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]

    g = nx.DiGraph()
    g.add_edges_from(zip(edges["node_id"], edges["child_id"]))

    def get_descendants(source):
        # Nodes that never appear as a parent have no descendants. The graph
        # is asked directly instead of scanning all edges for every vertex.
        if source not in g or g.out_degree(source) == 0:
            return None
        # bfs_tree lists the source first; drop it.
        return list(nx.bfs_tree(g, source))[1:]

    def get_ancestors(source):
        # A vertex without any edge is not part of the graph. Passing such a
        # source to edge_dfs is unsafe: a string is treated as a bunch of
        # nodes (its characters) and None means "every node".
        if source not in g:
            return []
        parents = (
            parent
            for parent, _child, _direction
            in nx.edge_dfs(g, source=source, orientation="reverse")
        )
        # An ancestor reachable over several paths is reported once, at its
        # first position in traversal order.
        return list(dict.fromkeys(parents))

    df["descendants"] = df["node_id"].apply(get_descendants)
    df["ancestors"] = df["node_id"].apply(get_ancestors)
    return df[["node_id", "ancestors", "descendants"]]
- Date submitted: 2024-03-20
- Tags:
code authoring,code repositories,python,tree,graph,networkx
中文翻译¶
转换(Transforms)¶
Python¶
展平层级树状数据(Flatten hierarchical tree data)¶
如何将层级树状数据结构展平为包含父子关系的平面表格?
此代码使用PySpark将层级树状数据结构转换为包含父子关系的平面表格。它创建了一个函数来提取每一层的对象,并为节点和父节点生成主键。输出数据框包含以下列:node_id、node_description、node_level、parent_id、parent_level、parents_path、node_pk、parent_pk和title。
from transforms.api import transform_df, Input, Output, transform
from pyspark.sql import functions as F
# CONSTANTS
# Names of the hierarchy columns of the input table. In the example shipped
# with this snippet, "level1" holds the root and "level4" the leaves.
COL_ORDER = ["level1", "level2", "level3", "level4"]
# Per-column descriptions passed to write_dataframe() for the output dataset.
COLS_DESCRIPTION = {
    "node_id": "节点的标识符,非唯一",
    "node_description": "节点的人类可读标识符,非唯一",
    "node_level": "节点的层级",
    "parent_id": "节点父级的标识符,非唯一",
    # Previously a copy of the node_description text; parent_level holds a
    # hierarchy level, exactly like node_level.
    "parent_level": "节点父级的层级",
    "parents_path": "父级ID数组,从最高级到最近父级",
    "node_pk": "唯一",
    "parent_pk": "唯一",
}
'''
将如下格式的内容:
level1 | level2 | level3 | level4 | some_value
root | folder1 | folder2 | file_name | file_content
转换为:
node_id | node_level | parent_id | parent_level | value
root | level1 | null | null | null
folder1 | level2 | root | level1 | null
folder2 | level3 | folder1 | level2 | null
file_name | level4 | folder2 | level3 | file_content
'''
def flatten_tree_data(tree_df, out):
    """Flatten a one-column-per-level hierarchy into one row per node.

    For every level named in ``COL_ORDER`` the distinct nodes of that level are
    extracted together with their direct parent, the path of ids leading to
    them and string keys (``node_pk`` / ``parent_pk``) usable for self-joins.
    The per-level results are unioned and written to ``out``.

    Args:
        tree_df: Transform input; ``tree_df.dataframe()`` must return a Spark
            DataFrame containing every column named in ``COL_ORDER``.
        out: Transform output; receives the flattened DataFrame via
            ``write_dataframe`` with ``COLS_DESCRIPTION`` as column descriptions.

    Raises:
        ValueError: If ``COL_ORDER`` is empty, since there is nothing to flatten.

    NOTE(review): the ``.dataframe()`` / ``write_dataframe`` calls look like the
    ``@transform`` calling convention, but no decorator is visible on this
    function - confirm how it is registered.
    """
    if not COL_ORDER:
        raise ValueError("COL_ORDER must list at least one hierarchy column")

    tree_df = tree_df.dataframe()

    # Function to extract object "per level".
    def create_object(df, node_id_col, node_description_col, node_level,
                      parent_ids_cols=(), parent_id_col=None, parent_level=None):
        """Build the node rows of a single hierarchy level.

        ``parent_id_col`` and ``parent_level`` are both ``None`` for the top
        level, which has no parent. ``parent_ids_cols`` lists the columns that
        make up ``parents_path``, from highest to closest.
        """
        parent_ids_cols = list(parent_ids_cols)

        # Filter down the columns.
        # A set removes duplicates when id and description columns coincide.
        columns_to_keep = list({node_id_col, node_description_col, *parent_ids_cols})
        # DISTINCT so each node of this level appears once per distinct path.
        out_df = df.select(columns_to_keep).distinct()

        # Store values of the specific node
        out_df = (
            out_df
            .withColumn("node_id", F.col(node_id_col))
            .withColumn("node_level", F.lit(node_level))
            .withColumn("node_description", F.col(node_description_col))
        )

        # Handle top node that has no parent
        is_top_node = parent_id_col is None and parent_level is None
        if is_top_node:
            # TODO : remove logic in favor of allowMissingColumns=True / Spark 3 feature
            out_df = (
                out_df
                .withColumn("parent_id", F.lit(None))
                .withColumn("parent_level", F.lit(None))
            )
        else:
            # Store values for its parent
            out_df = (
                out_df
                .withColumn("parent_id", F.col(parent_id_col))
                .withColumn("parent_level", F.lit(parent_level))
            )

        # Concat the path columns to get the node's path
        out_df = out_df.withColumn("parents_path", F.array(*parent_ids_cols))

        # Cleanup before key generation
        out_df = out_df.select(
            "node_id", "node_description", "node_level",
            "parent_id", "parent_level", "parents_path",
        )

        # PKs are useful to "self-join"
        out_df = out_df.withColumn("node_pk", F.concat_ws("__", "node_level", "node_id"))
        # concat_ws skips nulls, so a node without a parent used to get the
        # empty string as parent_pk; keep the key null when there is no parent.
        out_df = out_df.withColumn(
            "parent_pk",
            F.when(
                F.col("parent_id").isNotNull(),
                F.concat_ws("__", "parent_level", "parent_id"),
            ),
        )

        # Generate Title column
        out_df = out_df.withColumn(
            "title", F.concat_ws(" - ", "node_level", "node_description", "node_id")
        )
        return out_df

    # Walk the levels from deepest to top, as the hand-unrolled version did.
    out_df = None
    for depth in reversed(range(len(COL_ORDER))):
        level = COL_ORDER[depth]
        if depth == 0:
            # Top level: no parent and an empty path.
            level_df = create_object(tree_df, level, level, level)
        else:
            parent = COL_ORDER[depth - 1]
            # The path of a non-top level runs from the first level down to and
            # including the level's own column (unchanged from the original calls).
            level_df = create_object(
                tree_df, level, level, level,
                COL_ORDER[:depth + 1], parent, parent,
            )
        # TODO SPARK 3 : unionByName(..., allowMissingColumns=True)
        out_df = level_df if out_df is None else out_df.unionByName(level_df)

    out.write_dataframe(out_df, column_descriptions=COLS_DESCRIPTION)
- 提交日期:2024-03-26
- 标签:
代码仓库(code repositories)、代码编写(code authoring)、python、图(graph)、树(tree)
从图数据集中提取祖先和后代(Extract ancestors and descendants from graph dataset)¶
如何使用PySpark和NetworkX从图数据集中提取祖先和后代?
此代码使用PySpark和NetworkX准备图数据集、创建有向图,并提取图中每个节点的祖先和后代。
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T
import networkx as nx
# Schema of the frames returned by the pandas function used in
# extract_graph_properties: a node id plus two string-array columns.
GRAPH_SCHEMA = T.StructType(
    [
        T.StructField(field_name, field_type)
        for field_name, field_type in (
            ("node_id", T.StringType()),
            ("descendants", T.ArrayType(T.StringType())),
            ("ancestors", T.ArrayType(T.StringType())),
        )
    ]
)
# Step 1: Prepare the dataset
@transform_df(
    Output("prepared_graph_output"),
    graph_structured_dataset=Input("original_dataset_input"),
)
def prepare_graph(graph_structured_dataset):
    """Stack the vertex rows and the edge rows of the input into one dataset.

    Both helper results are combined with ``unionByName``, which resolves
    columns by name.
    """
    vertex_rows = get_vertices(graph_structured_dataset)
    edge_rows = get_edges(graph_structured_dataset)
    return vertex_rows.unionByName(edge_rows)
def get_vertices(df):
    """Return one "vertex" row per distinct ``node_id``.

    The result has the columns ``node_id``, ``child_id`` (always null),
    ``type`` (the literal "vertex") and ``_partition_column``. The null column
    must be named ``child_id`` - the name ``get_edges`` uses - because the two
    results are combined with ``unionByName``, which matches columns by name.
    """
    return (
        df
        .select(
            "node_id",  # The ID of the node
            # Empty placeholder so vertex rows share the edge rows' columns.
            F.lit(None).cast(T.StringType()).alias("child_id"),
            F.lit("vertex").alias("type"),  # This row represents a vertex
            # Property on which nodes are grouped for the per-group computation.
            F.col("_partition_column"),
        )
        .dropDuplicates(["node_id"])
    )
def get_edges(df):
    """Return one "edge" row per distinct (parent, child) pair.

    Rows without a ``parent_node_id`` are dropped. In the result the parent is
    exposed as ``node_id`` and the row's own id as ``child_id``; ``type`` is the
    literal "edge" and ``_partition_column`` is carried over unchanged.
    """
    has_parent = F.col("parent_node_id").isNotNull()
    edge_columns = [
        F.col("parent_node_id").alias("node_id"),  # edge start: the parent
        F.col("node_id").alias("child_id"),  # edge end: the child
        F.lit("edge").alias("type"),  # this row represents an edge
        F.col("_partition_column"),  # grouping key for the per-group computation
    ]
    edge_rows = df.filter(has_parent).select(*edge_columns)
    return edge_rows.dropDuplicates(["node_id", "child_id"])
# Step 2: Create the graph using networkx and extract the properties you need
@transform_df(
    Output("extracted_graph_properties"),
    prepared_graph=Input("prepared_graph_output"),
)
def extract_graph_properties(prepared_graph):
    """Compute ancestors and descendants for each ``_partition_column`` group.

    ``myNetworkxUserDefinedFunction`` is applied to every group as a pandas
    function with ``GRAPH_SCHEMA`` as output schema; afterwards empty
    ``ancestors`` arrays are replaced by null.
    """
    grouped = prepared_graph.groupby("_partition_column")
    per_group = grouped.applyInPandas(
        myNetworkxUserDefinedFunction,
        schema=GRAPH_SCHEMA,
    )

    ancestors = F.col("ancestors")
    has_no_ancestors = F.size(ancestors) == 0
    return per_group.withColumn(
        "ancestors",
        F.when(has_no_ancestors, F.lit(None)).otherwise(ancestors),
    )
def myNetworkxUserDefinedFunction(pandas_dataframe):
    """Compute ancestors and descendants for every vertex row of one group.

    Args:
        pandas_dataframe: pandas DataFrame with the columns ``node_id``,
            ``child_id`` and ``type``; rows with ``type == "edge"`` describe a
            directed edge ``node_id -> child_id``, rows with
            ``type == "vertex"`` list the nodes to report on.

    Returns:
        A pandas DataFrame with one row per vertex row and the columns
        ``node_id``, ``ancestors`` (list, empty when there are none) and
        ``descendants`` (list, or ``None`` when the node has no children).
    """
    # Copy the vertex rows: the result columns are added to this frame, and
    # writing to a filtered slice of the input triggers pandas'
    # SettingWithCopy behaviour.
    df = pandas_dataframe[pandas_dataframe["type"] == "vertex"].copy()
    edges = pandas_dataframe[pandas_dataframe["type"] == "edge"]

    g = nx.DiGraph()
    g.add_edges_from(zip(edges["node_id"], edges["child_id"]))

    def get_descendants(source):
        # Nodes that never appear as a parent have no descendants. The graph
        # is asked directly instead of scanning all edges for every vertex.
        if source not in g or g.out_degree(source) == 0:
            return None
        # bfs_tree lists the source first; drop it.
        return list(nx.bfs_tree(g, source))[1:]

    def get_ancestors(source):
        # A vertex without any edge is not part of the graph. Passing such a
        # source to edge_dfs is unsafe: a string is treated as a bunch of
        # nodes (its characters) and None means "every node".
        if source not in g:
            return []
        parents = (
            parent
            for parent, _child, _direction
            in nx.edge_dfs(g, source=source, orientation="reverse")
        )
        # An ancestor reachable over several paths is reported once, at its
        # first position in traversal order.
        return list(dict.fromkeys(parents))

    df["descendants"] = df["node_id"].apply(get_descendants)
    df["ancestors"] = df["node_id"].apply(get_ancestors)
    return df[["node_id", "ancestors", "descendants"]]
- 提交日期:2024-03-20
- 标签:
代码编写(code authoring)、代码仓库(code repositories)、python、树(tree)、图(graph)、networkx