#!/usr/bin/env python
# coding: utf-8

# # Writing Your Own Graph Algorithms

# The analytical engine in GraphScope derives from [GRAPE](https://dl.acm.org/doi/10.1145/3282488), a graph processing system presented at SIGMOD 2017. GRAPE differs from prior systems in its ability to parallelize sequential graph algorithms as a whole. In GRAPE, sequential algorithms can be easily **plugged in** with only minor changes and get parallelized to handle large graphs efficiently.
#
# In this tutorial, we will show how to define and run your own algorithm in PIE and Pregel models.
#
# Sounds like fun? Excellent, here we go!

# In[ ]:


# Install graphscope package if you are NOT in the Playground
# NOTE: `get_ipython()` only exists inside an IPython/Jupyter session.

get_ipython().system('pip3 install graphscope')


# ## Writing algorithm in PIE model

# GraphScope enables users to write algorithms in the [PIE](https://dl.acm.org/doi/10.1145/3282488) programming model in pure Python. First of all, you should import the **graphscope** package and the **pie** decorator.

# In[ ]:


# Import the graphscope module.

import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie

graphscope.set_option(show_log=False)  # set show_log=True to enable logging


# We use the single source shortest path ([SSSP](https://en.wikipedia.org/wiki/Shortest_path_problem)) algorithm as an example. To implement the PIE model, you just need to **fulfill this class**

# In[ ]:


# Skeleton of a PIE application: the three hooks are filled in step by step
# in the cells that follow.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass


# The **pie** decorator takes two parameters named `vd_type` and `md_type`, which represent the vertex data type and message type respectively.
#
# You may specify types for your own algorithms; the available values are `int`, `double`, and `string`.
# In our **SSSP** case, we compute the shortest distance to the source for all nodes, so we use the `double` type for both `vd_type` and `md_type`.
#
# `Init`, `PEval`, and `IncEval` all take **frag** and **context** as parameters. You can use these two parameters to access the fragment data and intermediate results. For detailed usage, please refer to the [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html).
#
# ### Fulfill Init Function

# In[ ]:


@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        # The app runs on a property graph, so set things up once per vertex label.
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            # Start every node at 1000000000.0 (the "not reached yet" distance)
            # and combine incoming messages with `min`.
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # Tell the engine how messages are passed for this label.
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass


# The `Init` function is responsible for 1) setting the initial value for each node; 2) defining the strategy of message passing; and 3) specifying the aggregator for handling received messages in each round.
#
# Note that the algorithm you defined will run on a property graph. So we should get the number of vertex labels first by `v_label_num = frag.vertex_label_num()`, then we can traverse all nodes with the same label
# and set the initial value by `nodes = frag.nodes(v_label_id)` and `context.init_value(nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate)`.
#
# Since we are computing the shortest path between the source node and the other nodes, we use `PIEAggregateType.kMinAggregate` as the aggregator for message aggregation, which means it will
# perform the `min` operation upon all received messages. Other available aggregators are `kMaxAggregate`, `kSumAggregate`, `kProductAggregate`, and `kOverwriteAggregate`.
#
# At the end of the `Init` function, we register the sync buffer for each node with `MessageStrategy.kSyncOnOuterVertex`, which tells the engine how to pass the message.

# ### Fulfill PEval Function

# In[ ]:


@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        # Per vertex label: initial distance 1000000000.0, `min` aggregation,
        # and the message-passing strategy.
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        # The queried source id is passed in as the `src` config entry.
        src = int(context.get_config(b"src"))
        # `get_inner_node` fills in a `Vertex` variable, which must be declared first.
        graphscope.declare(graphscope.Vertex, source)
        # Look for the source among this fragment's inner nodes, label by label.
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            # This fragment does not hold the source: nothing to do in PEval.
            return
        # Relax every outgoing edge of the source. The source distance is 0,
        # so the candidate distance is just the edge value read by `get_int(2)`.
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        pass


# In `PEval` of **SSSP**, it gets the queried source node by `context.get_config(b"src")`.
#
# `PEval` checks whether each fragment contains the source node by `frag.get_inner_node(v_label_id, src, source)`. Note that the `get_inner_node` method needs a `source` parameter of type `Vertex`, which you can declare by `graphscope.declare(graphscope.Vertex, source)`
#
# If a fragment contains the source node, it will traverse the outgoing edges of the source with `frag.get_outgoing_edges(source, e_label_id)`. For each neighboring vertex, it computes the distance from the source, and updates the value if it is less than the initial value.
# ### Fulfill IncEval Function

# In[ ]:


@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        # Per vertex label: initial distance 1000000000.0, `min` aggregation,
        # and the message-passing strategy.
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        # The queried source id is passed in as the `src` config entry.
        src = int(context.get_config(b"src"))
        # `get_inner_node` fills in a `Vertex` variable, which must be declared first.
        graphscope.declare(graphscope.Vertex, source)
        # Look for the source among this fragment's inner nodes, label by label.
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            # This fragment does not hold the source: nothing to do in PEval.
            return
        # Relax every outgoing edge of the source (its own distance is 0).
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        # Visit every inner node of every label and relax all its outgoing edges.
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        # Candidate distance to `u` when going through `v`.
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)


# The only difference between `IncEval` and `PEval` of the **SSSP** algorithm is that `IncEval` is invoked
# on each fragment, rather than only the fragment with the source node. A fragment will repeat `IncEval` until no more messages are received. When all the fragments have finished computation, the algorithm terminates.

# ### Run Your Algorithm on the p2p network Graph.
# In[ ]:


# Load p2p network dataset

from graphscope.dataset import load_p2p_network

graph = load_p2p_network(directed=False)


# Then initialize your algorithm and query the shortest path from vertex `6` over the graph.

# In[ ]:


sssp = SSSP_PIE()
ctx = sssp(graph, src=6)


# Running this cell, your algorithm should evaluate successfully. The results are stored in vineyard on the distributed machines. Let's fetch and check the results.

# In[ ]:


r1 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r1  # bare expression: displayed as the cell output in a notebook


# ### Dump and Reload Your Algorithm
#
# You can dump and save the algorithm you defined for future use.

# In[ ]:


import os

# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")

# dump
SSSP_PIE.to_gar(dump_path)


# Now, you can find a package named `sssp_pie.gar` in your `~/`. Reload this algorithm with the following code.

# In[ ]:


from graphscope.framework.app import load_app

# specify the path of the dumped package you want to load
dump_path = os.path.expanduser("~/sssp_pie.gar")

sssp2 = load_app(dump_path)


# ## Write Algorithm in Pregel Model

# In addition to the sub-graph based PIE model, GraphScope supports the vertex-centric Pregel model. To define a Pregel algorithm, you should import the **pregel** decorator and fulfill the functions defined on vertex.

# In[ ]:


import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel


# In[ ]:


# Skeleton of a Pregel application: the hooks are filled in step by step
# in the cells that follow.
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        pass

    @staticmethod
    def Compute(messages, v, context):
        pass


# The **pregel** decorator has two parameters named `vd_type` and `md_type`, which represent the vertex data type and message type respectively.
#
# You can specify the types for your algorithm; options are `int`, `double`, and `string`. For **SSSP**, we set both to `double`.
#
# Since the Pregel model is defined on vertices, the `Init` and `Compute` functions have a parameter `v` to access the vertex data. See more details in the [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html).

# ### Fulfill Init Function

# In[ ]:


@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        # Every vertex starts at 1000000000.0, the "not reached yet" distance.
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        pass


# The `Init` function sets the initial value for each node by `v.set_value(1000000000.0)`

# ### Fulfill Compute function

# In[ ]:


@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        # Every vertex starts at 1000000000.0, the "not reached yet" distance.
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        # Best candidate distance seen in this superstep.
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        # Only when the distance improves: store it and notify the neighbors.
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()


# The `Compute` function for **SSSP** computes the new distance for each node by the following steps:
#
# 1) Initialize the new value with value 1000000000
# 2) If the vertex is the source node, set its distance to 0.
# 3) Compute the `min` value of the messages received, and set the value if it is less than the current value.
#
# Repeat these steps until no more new messages (shorter distances) are generated.

# ### Optional Combiner

# Optionally, we can define a combiner to reduce the message communication overhead.
# In[ ]:


@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        # Every vertex starts at 1000000000.0, the "not reached yet" distance.
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        # Best candidate distance seen in this superstep.
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        # Only when the distance improves: store it and notify the neighbors.
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        # Fold a batch of messages into their minimum (1000000000.0 if empty).
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret


# ### Run Your Pregel Algorithm on Graph.

# Next, let's run your Pregel algorithm on the graph, and check the results.

# In[ ]:


sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)


# In[ ]:


r2 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r2  # bare expression: displayed as the cell output in a notebook


# ### Aggregator in Pregel

# Pregel aggregators are a mechanism for global communication, monitoring, and counting. Each vertex can provide a value to an aggregator in superstep `S`, the system combines these
# values using a reducing operator, and the resulting value is made available to all vertices in superstep `S+1`. GraphScope provides a number of predefined aggregators for Pregel algorithms, such as `min`, `max`, or `sum` operations on data types.
#
# Here is an example of using the built-in aggregators; more details can be found in the [Cython SDK API](https://graphscope.io/docs/reference/cython_sdk.html)

# In[ ]:


# Registers one aggregator of each kind in `Init`, feeds them in superstep 0,
# and checks the combined values from the vertex with id b"1" afterwards.
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
    @staticmethod
    def Init(v, context):
        # int
        context.register_aggregator(
            b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
        )
        context.register_aggregator(
            b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
        )
        context.register_aggregator(
            b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
        )
        # double
        context.register_aggregator(
            b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
        )
        context.register_aggregator(
            b"double_overwrite_aggregator",
            PregelAggregatorType.kDoubleOverwriteAggregator,
        )
        # bool
        context.register_aggregator(
            b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
        )
        context.register_aggregator(
            b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
        )
        context.register_aggregator(
            b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
        )
        # text
        context.register_aggregator(
            b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
        )

    @staticmethod
    def Compute(messages, v, context):
        if context.superstep() == 0:
            # Superstep 0: every vertex contributes one value to each aggregator.
            context.aggregate(b"int_sum_aggregator", 1)
            context.aggregate(b"int_max_aggregator", int(v.id()))
            context.aggregate(b"int_min_aggregator", int(v.id()))
            context.aggregate(b"double_product_aggregator", 1.0)
            context.aggregate(b"double_overwrite_aggregator", 1.0)
            context.aggregate(b"bool_and_aggregator", True)
            context.aggregate(b"bool_or_aggregator", False)
            context.aggregate(b"bool_overwrite_aggregator", True)
            context.aggregate(b"text_append_aggregator", v.id() + b",")
        else:
            # Later supersteps: the combined values are now readable. Only the
            # vertex with id b"1" checks them.
            if v.id() == b"1":
                # Each vertex added 1, so the sum equals the number of vertices.
                # NOTE(review): 62586 presumably matches the p2p network dataset
                # (vertex count and largest id) - confirm against the graph used.
                assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
                assert context.get_aggregated_value(b"int_max_aggregator") == 62586
                assert context.get_aggregated_value(b"int_min_aggregator") == 1
                assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
                assert (
                    context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
                )
                assert context.get_aggregated_value(b"bool_and_aggregator") == True
                assert context.get_aggregated_value(b"bool_or_aggregator") == False
                assert (
                    context.get_aggregated_value(b"bool_overwrite_aggregator") == True
                )
                # The text aggregator is only read; its value is not asserted.
                context.get_aggregated_value(b"text_append_aggregator")
            # NOTE(review): placed in the `else` branch so vertices stay active
            # through superstep 0; the original indentation was lost - confirm.
            v.vote_to_halt()


# In[ ]: