#!/usr/bin/env python # coding: utf-8 # # 编写自定义图分析算法 # # Install graphscope package if you are NOT in the Playground # # !pip3 install graphscope # GraphScope 的图分析引擎继承了 [GRAPE](https://dl.acm.org/doi/10.1145/3282488) , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。 # # 与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。 # # 在这个教程中,我们将展示如何进行自定义基于 PIE 模型或者 Pregel 模型的图分析算法。 # In[ ]: # Install graphscope package if you are NOT in the Playground get_ipython().system('pip3 install graphscope') # ## 自定义基于 PIE 模型的图算法 # GraphScope 支持用户使用纯 Python 语言,以 [PIE](https://dl.acm.org/doi/10.1145/3282488) 编程模型写图算法。首先我们需要导入 GraphScope 的包和 PIE 装饰器。 # In[ ]: # Import the graphscope module. import graphscope from graphscope.framework.app import AppAssets from graphscope.analytical.udf.decorators import pie graphscope.set_option(show_log=False) # enable logging # 以单源最短路径([SSSP](https://en.wikipedia.org/wiki/Shortest_path_problem))为例,编写 PIE 算法需要填写以下几个函数。 # In[ ]: @pie(vd_type="double", md_type="double") class SSSP_PIE(AppAssets): @staticmethod def Init(frag, context): pass @staticmethod def PEval(frag, context): pass @staticmethod def IncEval(frag, context): pass # 装饰器 **pie** 包含两个参数 `vd_type` 和 `md_type`, 他们分别代表节点上的数据类型和消息的数据类型。 # # 您可以为您的算法指定这两个数据类型,可用的数据类型包括 `int`,`double` 和 `string`。 # 在我们这个示例中,由于 SSSP 计算的距离和发送的消息更新均是 `double` 类型,所以我们为这两个类型都指定为 `double`。 # # 在函数 `Init`,`PEval` 和 `IncEval`中,都有 **frag** 和 **context** 这两个参数。它们分别用于在算法逻辑中访问图数据和中间结果数据。可以查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。 # # ### 定义 Init 函数 # In[ ]: @pie(vd_type="double", md_type="double") class SSSP_PIE(AppAssets): @staticmethod def Init(frag, context): v_label_num = frag.vertex_label_num() for v_label_id in range(v_label_num): nodes = frag.nodes(v_label_id) context.init_value( nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate ) context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex) @staticmethod def PEval(frag, context): pass @staticmethod def IncEval(frag, context): pass # `Init` 函数有以下几个职责:1)为每个节点设置初始值; 2)定义消息传递的策略; 3)指定聚合器,以在每一轮中处理收到的消息。 # # 请注意,您定义的算法将在属性图上运行。所以我们应该首先通过 `v_label_num = frag.vertex_label_num()` 获得顶点标签,然后可以遍历所有具有相同标签的节点, # 并通过 `nodes = frag.nodes(v_label_id)` 和 `context.init_value(nodes, v_label_id, 1000000000.0,PIEAggregateType.kMinAggregate)` 设置初始值。 # # 由于我们正在计算源节点和其他节点之间的最短路径,在这里我们将 `PIEAggregateType.kMinAggregate` 用作消息聚合的聚合器。这意味着它将对所有收到的消息执行去 Min 操作。其他可用的聚合器包括 `kMaxAggregate`,`kSumAggregate`,`kProductAggregate` 和 `kOverwriteAggregate`。 # # 在 `Init` 函数的最后,我们向节点注册 `MessageStrategy.kSyncOnOuterVertex` 指定如何传递消息。 # ### 定义 PEval 函数 # In[ ]: @pie(vd_type="double", md_type="double") class SSSP_PIE(AppAssets): @staticmethod def Init(frag, context): v_label_num = frag.vertex_label_num() for v_label_id in range(v_label_num): nodes = frag.nodes(v_label_id) context.init_value( nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate ) context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex) @staticmethod def PEval(frag, context): src = int(context.get_config(b"src")) graphscope.declare(graphscope.Vertex, source) native_source = False v_label_num = frag.vertex_label_num() for v_label_id in range(v_label_num): if frag.get_inner_node(v_label_id, src, source): native_source = True break if native_source: context.set_node_value(source, 0) else: return e_label_num = frag.edge_label_num() for e_label_id in range(e_label_num): edges = frag.get_outgoing_edges(source, e_label_id) for e in edges: dst = e.neighbor() distv = e.get_int(2) if context.get_node_value(dst) > distv: context.set_node_value(dst, distv) @staticmethod def IncEval(frag, context): pass # 在 **SSSP** 的 `PEval` 函数中, 通过函数 `context.get_config(b"src")` 获取用户查询的 src 节点。 # # `PEval` 会在每个分区上调用 `frag.get_inner_node(v_label_id, src, source)` 的方法查询 src 节点是否在本分区。请注意这里函数 `get_inner_node` method needs a `source` 需要一个类型为 `Vertex` 的参数, 我们通过 `graphscope.declare(graphscope.Vertex, source)` 来定义声明。 # # 如果一个分区包含查询的源节点,它将调用 `frag.get_outgoing_edges(source,e_label_id)` 遍历源节点的所有输出边。 对于每个顶点,它计算到源的距离。如果该值小于初始值,则更新该值。 # ### 定义 IncEval 函数 # # **SSSP** 算法的 `IncEval` 和 `PEval` 之间唯一区别是 `IncEval` 将会在每个分区上都被调用。计算完 `IncEval` 后,将会产生一些消息,这些消息将被发送到其他分区供下一轮`IncEval` 调用。一直到所有分区都不再产生消息,此时算法结束。 # In[ ]: @pie(vd_type="double", md_type="double") class SSSP_PIE(AppAssets): @staticmethod def Init(frag, context): v_label_num = frag.vertex_label_num() for v_label_id in range(v_label_num): nodes = frag.nodes(v_label_id) context.init_value( nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate ) context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex) @staticmethod def PEval(frag, context): src = int(context.get_config(b"src")) graphscope.declare(graphscope.Vertex, source) native_source = False v_label_num = frag.vertex_label_num() for v_label_id in range(v_label_num): if frag.get_inner_node(v_label_id, src, source): native_source = True break if native_source: context.set_node_value(source, 0) else: return e_label_num = frag.edge_label_num() for e_label_id in range(e_label_num): edges = frag.get_outgoing_edges(source, e_label_id) for e in edges: dst = e.neighbor() distv = e.get_int(2) if context.get_node_value(dst) > distv: context.set_node_value(dst, distv) @staticmethod def IncEval(frag, context): v_label_num = frag.vertex_label_num() e_label_num = frag.edge_label_num() for v_label_id in range(v_label_num): iv = frag.inner_nodes(v_label_id) for v in iv: v_dist = context.get_node_value(v) for e_label_id in range(e_label_num): es = frag.get_outgoing_edges(v, e_label_id) for e in es: u = e.neighbor() u_dist = v_dist + e.get_int(2) if context.get_node_value(u) > u_dist: context.set_node_value(u, u_dist) # ### 在图上调用您的算法 # In[ ]: # Load p2p network dataset from graphscope.dataset import load_p2p_network graph = load_p2p_network(directed=False, generate_eid=False) # 然后,初始化刚才定义的算法,并查询起始点为 `6` 的最短路径。 # In[ ]: sssp = SSSP_PIE() ctx = sssp(graph, src=6) # 执行这个 Cell,您的算法应该被成功执行。 # 此时,结果存在于分布式的 pod 内存中,由 vineyard 在进行管理。我们可以通过以下方法将远程数据取过来并展示。 # In[ ]: r1 = ( ctx.to_dataframe({"node": "v:host.id", "r": "r:host"}) .sort_values(by=["node"]) .to_numpy(dtype=float) ) r1 # ### 保存和载入您的算法 # # 您可以保存您的自定义算法供之后使用。 # In[ ]: import os # specify the path you want to dump dump_path = os.path.expanduser("~/sssp_pie.gar") # dump SSSP_PIE.to_gar(dump_path) # 现在 您可以在目录 `~/` 下看到您的算法包 `sssp_pie.gar`。下次再次使用时,只需要这样载入: # In[ ]: from graphscope.framework.app import load_app # specify the path you want to dump dump_path = os.path.expanduser("~/sssp_pie.gar") sssp2 = load_app(dump_path) # ## 基于 Pregel 模型编写算法 # # 除了上面的 PIE 模型,您也可以通过基于点中心编程的 Pregel 模型编写自己的算法。首先我们需要导入 GraphScope 包和 **pregel** 装饰器。 # In[ ]: import graphscope from graphscope.framework.app import AppAssets from graphscope.analytical.udf.decorators import pregel # In[ ]: @pregel(vd_type="double", md_type="double") class SSSP_Pregel(AppAssets): @staticmethod def Init(v, context): pass @staticmethod def Compute(messages, v, context): pass # **pregel** 装饰器也有两个类型参数 `vd_type` 和 `md_type`,分别用于指定点上的数据类型和消息的类型,支持的类型包括 `int`,`double` 和 `string`。 # 在我们的 **SSSP** 例子中,我们将这两个值都设置为 `double`。 # # 由于 Pregel 模型的算法逻辑是定义在点上的,它的 `Init` 和 `Compute` 函数有一个参数 `v` 用于访问点上的数据。查看文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 了解更多。 # ### 定义 Init 函数 # In[ ]: @pregel(vd_type="double", md_type="double") class SSSP_Pregel(AppAssets): @staticmethod def Init(v, context): v.set_value(1000000000.0) @staticmethod def Compute(messages, v, context): pass # `Init` 函数将每个点上的初始路径设置为极大值。 # ### 定义 Compute 函数 # # **SSSP** 的 `Compute` 函数通过以下步骤为每个节点计算新距离: # # 1)首先初始化值为1000000000的新值 # 2)如果顶点是源节点,则将其距离设置为0 # 3)计算接收到的消息的 Min 值,如果该值小于当前值,则设置该值。 # # 重复这些步骤,直到不再生成新消息(距离更短)为止。 # In[ ]: @pregel(vd_type="double", md_type="double") class SSSP_Pregel(AppAssets): @staticmethod def Init(v, context): v.set_value(1000000000.0) @staticmethod def Compute(messages, v, context): src_id = context.get_config(b"src") cur_dist = v.value() new_dist = 1000000000.0 if v.id() == src_id: new_dist = 0 for message in messages: new_dist = min(message, new_dist) if new_dist < cur_dist: v.set_value(new_dist) for e_label_id in range(context.edge_label_num()): edges = v.outgoing_edges(e_label_id) for e in edges: v.send(e.vertex(), new_dist + e.get_int(2)) v.vote_to_halt() # ### 可选的 Combiner 函数 # # 我们可以定义一个 Combiner 以减少消息通信的开销,注意这个 Combiner 函数是可选的,您也可以不实现。 # In[ ]: @pregel(vd_type="double", md_type="double") class SSSP_Pregel(AppAssets): @staticmethod def Init(v, context): v.set_value(1000000000.0) @staticmethod def Compute(messages, v, context): src_id = context.get_config(b"src") cur_dist = v.value() new_dist = 1000000000.0 if v.id() == src_id: new_dist = 0 for message in messages: new_dist = min(message, new_dist) if new_dist < cur_dist: v.set_value(new_dist) for e_label_id in range(context.edge_label_num()): edges = v.outgoing_edges(e_label_id) for e in edges: v.send(e.vertex(), new_dist + e.get_int(2)) v.vote_to_halt() @staticmethod def Combine(messages): ret = 1000000000.0 for m in messages: ret = min(ret, m) return ret # ### 运行您的 Pregel 算法 # # 接下来让我们运行基于 Pregel 写的 SSSP 算法和查看结果。 # In[ ]: sssp_pregel = SSSP_Pregel() ctx = sssp_pregel(graph, src=6) # In[ ]: r2 = ( ctx.to_dataframe({"node": "v:host.id", "r": "r:host"}) .sort_values(by=["node"]) .to_numpy(dtype=float) ) r2 # ### Pregel 模型中的 Aggregator # Pregel 模型中的 aggregator 聚合器是一种用于全局通信、监视和计数的机制。 # # 每个顶点都可以在超步 `S` 中为聚合器提供值,系统将这些值聚合在一起,使用归约算符对这些值进行计算,并在超步 `S+1` 中将所得值提供给所有顶点。 # GraphScope 为 Pregel 算法提供了许多预定义的聚合器,例如对数据类型的 `min`,`max` 或 `sum` 等。 # # 这里是使用内置 aggregator 的示例,您也可以在文档 [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html) 中了解更多 # In[ ]: @pregel(vd_type="double", md_type="double") class Aggregators_Pregel_Test(AppAssets): @staticmethod def Init(v, context): # int context.register_aggregator( b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator ) context.register_aggregator( b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator ) context.register_aggregator( b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator ) # double context.register_aggregator( b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator ) context.register_aggregator( b"double_overwrite_aggregator", PregelAggregatorType.kDoubleOverwriteAggregator, ) # bool context.register_aggregator( b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator ) context.register_aggregator( b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator ) context.register_aggregator( b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator ) # text context.register_aggregator( b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator ) @staticmethod def Compute(messages, v, context): if context.superstep() == 0: context.aggregate(b"int_sum_aggregator", 1) context.aggregate(b"int_max_aggregator", int(v.id())) context.aggregate(b"int_min_aggregator", int(v.id())) context.aggregate(b"double_product_aggregator", 1.0) context.aggregate(b"double_overwrite_aggregator", 1.0) context.aggregate(b"bool_and_aggregator", True) context.aggregate(b"bool_or_aggregator", False) context.aggregate(b"bool_overwrite_aggregator", True) context.aggregate(b"text_append_aggregator", v.id() + b",") else: if v.id() == b"1": assert context.get_aggregated_value(b"int_sum_aggregator") == 62586 assert context.get_aggregated_value(b"int_max_aggregator") == 62586 assert context.get_aggregated_value(b"int_min_aggregator") == 1 assert context.get_aggregated_value(b"double_product_aggregator") == 1.0 assert ( context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0 ) assert context.get_aggregated_value(b"bool_and_aggregator") == True assert context.get_aggregated_value(b"bool_or_aggregator") == False assert ( context.get_aggregated_value(b"bool_overwrite_aggregator") == True ) context.get_aggregated_value(b"text_append_aggregator") v.vote_to_halt() # In[ ]: