# Writing Your Own Graph Algorithms

The analytical engine in GraphScope derives from GRAPE, a graph processing system presented at SIGMOD 2017. GRAPE differs from prior systems in its ability to parallelize sequential graph algorithms as a whole: sequential algorithms can be plugged into GRAPE with only minor changes, and are then parallelized to handle large graphs efficiently.

In this tutorial, we show how to define and run your own algorithms in both the PIE and the Pregel models.

Sounds like fun? Excellent, here we go!

```python
# Install the graphscope package if you are NOT in the Playground
!pip3 install graphscope
```


## Writing Algorithms in the PIE Model

GraphScope lets you write algorithms in the PIE programming model in pure Python. First, import the graphscope package and the pie decorator.

```python
# Import the graphscope module.
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie

graphscope.set_option(show_log=False)  # set show_log=True to see the engine logs
```


We use the single-source shortest path (SSSP) algorithm as an example. To implement an algorithm in the PIE model, you only need to fill in the three static methods of the following class: Init, PEval, and IncEval.

```python
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass
```


The pie decorator takes two parameters, vd_type and md_type, which specify the vertex data type and the message type, respectively.

You can choose these types for your own algorithm; the supported values are int, double, and string. In our SSSP case, we compute the shortest distance from the source to every node, so we use double for both vd_type and md_type.

Init, PEval, and IncEval each take frag and context as parameters, which you use to access the fragment data and the intermediate results. For detailed usage, refer to the Cython SDK API.

### Implement the Init Function

```python
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            # Start every node of this label at a large "infinite" distance;
            # received messages are combined with min.
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # Sync updated outer-vertex values to the fragments that own them.
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass
```


The Init function is responsible for 1) setting the initial value of each node, 2) defining the message-passing strategy, and 3) specifying the aggregator that handles the messages received in each round.

Note that the algorithm you define runs on a property graph, whose vertices may carry different labels. So we first get the number of vertex labels with v_label_num = frag.vertex_label_num(); then, for each label, we fetch all nodes with that label with nodes = frag.nodes(v_label_id) and set their initial value with context.init_value(nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate). The large initial value 1000000000.0 stands in for infinity, meaning the node has not been reached yet.

Since we are computing the shortest paths from the source node to all other nodes, we use PIEAggregateType.kMinAggregate to aggregate messages, which means the engine takes the minimum over all received messages. Other available aggregators are kMaxAggregate, kSumAggregate, kProductAggregate, and kOverwriteAggregate.
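To make these aggregation rules concrete, here is a plain-Python sketch (not GraphScope code) of how each rule could fold the messages a node receives in one round into its current value. The names AGGREGATORS and apply_messages are hypothetical, and both the fold-into-current-value behavior and the last-message-wins rule for kOverwriteAggregate are assumptions made for illustration.

```python
from functools import reduce

# Hypothetical stand-ins for the PIEAggregateType rules (illustration only).
AGGREGATORS = {
    "kMinAggregate": min,
    "kMaxAggregate": max,
    "kSumAggregate": lambda a, b: a + b,
    "kProductAggregate": lambda a, b: a * b,
    # Assumption: overwrite keeps the most recently received message.
    "kOverwriteAggregate": lambda a, b: b,
}


def apply_messages(current, messages, kind):
    """Fold the messages received in one round into a node's current value."""
    return reduce(AGGREGATORS[kind], messages, current)


# A node still at the "infinite" initial value receives three candidate distances.
print(apply_messages(1000000000.0, [7.0, 3.0, 5.0], "kMinAggregate"))  # 3.0
```

With the min rule, a node only ever moves to a shorter distance, which is exactly what SSSP needs.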

Finally, for each vertex label, Init registers a sync buffer with MessageStrategy.kSyncOnOuterVertex, which tells the engine how to pass messages: updated values of outer vertices (vertices owned by another fragment) are sent to the fragments that own them.
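The sketch below illustrates the idea of syncing on outer vertices in plain Python, assuming every vertex is owned by exactly one fragment and other fragments only keep outer copies of it. When a fragment lowers the value of an outer copy, the new value is shipped to the owner, which merges it with the min rule registered in Init. The partition and the sync_outer_vertices helper are hypothetical; GraphScope's actual message manager is not modeled here.

```python
INF = 1000000000.0

# Hypothetical partition: each vertex is owned by exactly one fragment.
owner = {"a": 0, "b": 0, "c": 1, "d": 1}

# Each fragment's local values; fragment 0 also keeps an outer copy of "c".
local = [
    {"a": 0.0, "b": INF, "c": INF},  # fragment 0 ("c" is an outer vertex here)
    {"c": INF, "d": INF},            # fragment 1
]


def sync_outer_vertices(local, owner, updated):
    """Send updated outer copies to their owners and merge them with min.

    `updated` lists the (fragment_id, vertex) pairs changed in this round.
    Returns the owned vertices whose value changed, i.e. the vertices that
    have new work in the next round.
    """
    activated = set()
    for fid, v in updated:
        dst = owner[v]
        if dst == fid:
            continue  # inner vertex: nothing to send
        msg = local[fid][v]
        if msg < local[dst][v]:  # min aggregation on the receiving side
            local[dst][v] = msg
            activated.add(v)
    return activated


local[0]["c"] = 4.0  # fragment 0 relaxed an edge a -> c
print(sync_outer_vertices(local, owner, [(0, "c")]))  # {'c'}
print(local[1]["c"])  # 4.0
```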

### Implement the PEval Function

```python
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            # Start every node of this label at a large "infinite" distance;
            # received messages are combined with min.
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # Sync updated outer-vertex values to the fragments that own them.
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        # ID of the queried source node, passed as src=... when running the app
        src = int(context.get_config(b"src"))
        # Declare a Vertex variable that get_inner_node fills in
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return  # this fragment does not own the source
        # Relax the outgoing edges of the source, for every edge label
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)  # edge weight: integer property at index 2
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        pass
```


In PEval, SSSP first reads the ID of the queried source node with context.get_config(b"src").

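PEval then checks whether the current fragment contains the source node, trying each vertex label in turn with frag.get_inner_node(v_label_id, src, source). Note that get_inner_node needs a source parameter of type Vertex, which it fills in when the node is found; you declare it with graphscope.declare(graphscope.Vertex, source). A fragment that does not contain the source returns immediately.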

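If the fragment does contain the source node, PEval sets the source's distance to 0 and traverses its outgoing edges with frag.get_outgoing_edges(source, e_label_id), for each edge label. For each neighbor, the distance from the source is simply the edge weight, read here as the integer edge property at index 2 with e.get_int(2); PEval updates the neighbor's value if this distance is smaller than its current value.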
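For readers who want to trace PEval by hand, here is a plain-Python version of the same logic on a toy fragment. It is an illustrative sketch, not the GraphScope API: the fragment is modeled with hypothetical sets and dictionaries, and peval_sssp stands in for the PEval method above.

```python
INF = 1000000000.0


def peval_sssp(inner_nodes, out_edges, dist, src):
    """Toy PEval: relax only the source's outgoing edges, as SSSP_PIE does.

    inner_nodes: vertices owned by this fragment
    out_edges:   dict vertex -> list of (neighbor, weight)
    dist:        dict vertex -> current distance (initialised to INF)
    """
    if src not in inner_nodes:
        return  # this fragment does not hold the source: nothing to do
    dist[src] = 0
    for dst, weight in out_edges.get(src, []):
        # Via the source, the distance to a neighbor is just the edge weight.
        if dist[dst] > weight:
            dist[dst] = weight


inner = {6, 7}
edges = {6: [(7, 3), (8, 5)], 7: [(8, 1)]}
dist = {6: INF, 7: INF, 8: INF}  # 8 is an outer vertex of this fragment
peval_sssp(inner, edges, dist, src=6)
print(dist)  # {6: 0, 7: 3, 8: 5}
```

Note that the edge 7 -> 8 is not used yet: shortening paths beyond the source's direct neighbors is left to IncEval.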

### Implement the IncEval Function

```python
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            # Start every node of this label at a large "infinite" distance;
            # received messages are combined with min.
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # Sync updated outer-vertex values to the fragments that own them.
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        # ID of the queried source node, passed as src=... when running the app
        src = int(context.get_config(b"src"))
        # Declare a Vertex variable that get_inner_node fills in
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return  # this fragment does not own the source
        # Relax the outgoing edges of the source, for every edge label
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)  # edge weight: integer property at index 2
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        # Relax the outgoing edges of every inner node, for all labels
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)
```


IncEval differs from PEval in two ways: it does work on every fragment, not only on the fragment that holds the source node, and it relaxes the outgoing edges of all inner nodes using their current distances (v_dist + e.get_int(2)) rather than only those of the source. IncEval is invoked again in each round as long as fragments receive messages; once no fragment receives any new message, the algorithm terminates.
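The PEval/IncEval round structure can be simulated end to end in plain Python. In the sketch below, the toy graph, the two-way partition, and the helpers make_fragment, local_relax, and pie_sssp are all hypothetical. One deliberate difference from SSSP_PIE: each fragment relaxes edges with a worklist until its own distances stop changing, in the spirit of running a sequential algorithm per fragment, whereas the IncEval above makes a single pass over the inner nodes per round. Outer vertices whose distance drops become messages, merged at the owner with min, and the loop stops once no round produces a message.

```python
INF = 1000000000.0

# Toy weighted, directed graph: vertex -> list of (neighbor, weight).
EDGES = {1: [(2, 4), (3, 1)], 2: [(4, 1)], 3: [(2, 2), (4, 6)], 4: [(1, 1)]}
# Hypothetical two-way partition: which fragment owns each vertex.
OWNER = {1: 0, 3: 0, 2: 1, 4: 1}


def make_fragment(fid):
    inner = {v for v, owner in OWNER.items() if owner == fid}
    # A fragment stores its inner vertices plus outer copies of their out-neighbors.
    known = inner | {u for v in inner for u, _ in EDGES[v]}
    return {"inner": inner, "dist": {v: INF for v in known}}


def local_relax(frag, worklist):
    """Relax edges starting from `worklist` until no inner distance improves.

    Returns the outer vertices whose distance dropped; they become messages.
    """
    outgoing = set()
    work = list(worklist)
    while work:
        v = work.pop()
        for u, w in EDGES[v]:
            cand = frag["dist"][v] + w
            if cand < frag["dist"][u]:
                frag["dist"][u] = cand
                if u in frag["inner"]:
                    work.append(u)   # keep propagating inside this fragment
                else:
                    outgoing.add(u)  # outer copy: must be synced to its owner
    return outgoing


def pie_sssp(src):
    frags = [make_fragment(0), make_fragment(1)]
    messages = []  # (sender fragment, outer vertex) pairs
    # PEval: only the fragment that owns the source does any work.
    for fid, frag in enumerate(frags):
        if src in frag["inner"]:
            frag["dist"][src] = 0
            messages += [(fid, u) for u in local_relax(frag, [src])]
    rounds = 0
    # IncEval rounds: deliver messages (min on receive), then relax from them.
    while messages:
        rounds += 1
        inbox = {fid: set() for fid in range(len(frags))}
        for fid, u in messages:
            owner = OWNER[u]
            value = frags[fid]["dist"][u]
            if value < frags[owner]["dist"][u]:
                frags[owner]["dist"][u] = value
                inbox[owner].add(u)
        messages = []
        for fid, frag in enumerate(frags):
            if inbox[fid]:
                messages += [(fid, u) for u in local_relax(frag, inbox[fid])]
    # Each vertex's final distance lives in the fragment that owns it.
    dist = {v: frags[OWNER[v]]["dist"][v] for v in sorted(OWNER)}
    return dist, rounds


print(pie_sssp(1))  # ({1: 0, 2: 3, 3: 1, 4: 4}, 2)
```

In this run, the second round only carries a non-improving distance back to vertex 1, so it is discarded at the owner and the loop ends.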

### Run Your Algorithm on the p2p Network Graph

```python
# Load the p2p network dataset
from graphscope.dataset import load_p2p_network

graph = load_p2p_network()
```

Then instantiate your algorithm and query the shortest paths from vertex 6 over the graph.

```python
sssp = SSSP_PIE()
ctx = sssp(graph, src=6)
```


When you run this cell, your algorithm should evaluate successfully. The results are stored in vineyard on the distributed machines. Let's fetch and check them.

```python
r1 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r1
```


You can dump your algorithm to a package and save it for future use.

```python
import os

# specify the path you want to dump to
dump_path = os.path.expanduser("~/sssp_pie.gar")

# dump
SSSP_PIE.to_gar(dump_path)
```


You should now find a package named sssp_pie.gar in your home directory (~/). Reload the algorithm with the following code.

```python
import os

from graphscope.framework.app import load_app

# path of the package dumped above
dump_path = os.path.expanduser("~/sssp_pie.gar")

# load the algorithm back from the .gar package
sssp2 = load_app(dump_path)
```



## Writing Algorithms in the Pregel Model

In addition to the subgraph-based PIE model, GraphScope supports the vertex-centric Pregel model. To define a Pregel algorithm, import the pregel decorator and implement the functions that are defined on a vertex.

```python
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel
```

```python
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        pass

    @staticmethod
    def Compute(messages, v, context):
        pass
```


The pregel decorator also takes two parameters, vd_type and md_type, which specify the vertex data type and the message type, respectively.

You can choose these types for your algorithm; the supported values are int, double, and string. For SSSP, we set both to double.

Since the Pregel model is defined on vertices, the Init and Compute functions take a parameter v for accessing the vertex data. See the Cython SDK API for more details.

### Implement the Init Function

```python
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        pass
```


The Init function sets the initial value of each node with v.set_value(1000000000.0); this large value plays the role of infinity.

### Implement the Compute Function

```python
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        # Keep the shortest distance offered by the incoming messages
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            # Propagate the improved distance along every outgoing edge
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()
```


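The Compute function for SSSP computes the new distance of each node in the following steps:

1) Initialize the new distance to 1000000000.0.
2) If the vertex is the source node, set its new distance to 0.
3) Take the minimum of the new distance and all received messages. If the result is smaller than the current value, update the value and send the new distance plus the edge weight (e.get_int(2)) to every out-neighbor.
4) Vote to halt with v.vote_to_halt().

A halted vertex is reactivated when it receives a message, so these steps repeat until no more messages (shorter distances) are generated.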
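The superstep loop that drives Compute can be sketched in plain Python as follows. This is a hypothetical bulk-synchronous driver, not GraphScope's engine: messages sent in one superstep are delivered in the next, every vertex halts after Compute, and only vertices with incoming messages run again. The per-vertex logic mirrors the Compute function above on a toy graph.

```python
INF = 1000000000.0

# Toy weighted, directed graph: vertex -> list of (neighbor, weight).
EDGES = {1: [(2, 4), (3, 1)], 2: [(4, 1)], 3: [(2, 2), (4, 6)], 4: [(1, 1)]}


def pregel_sssp(src):
    """Hypothetical BSP driver running the SSSP Compute logic per superstep."""
    value = {v: INF for v in EDGES}  # Init: every vertex starts at "infinity"
    inbox = {v: [] for v in EDGES}
    active = set(EDGES)              # every vertex runs in superstep 0
    superstep = 0
    while active:
        outbox = {v: [] for v in EDGES}
        for v in sorted(active):
            new_dist = 0 if v == src else INF
            for message in inbox[v]:
                new_dist = min(message, new_dist)
            if new_dist < value[v]:
                value[v] = new_dist
                for u, w in EDGES[v]:
                    outbox[u].append(new_dist + w)  # like v.send(...)
            # like v.vote_to_halt(): v stays halted unless a message arrives
        inbox = outbox               # messages are delivered next superstep
        active = {v for v, msgs in inbox.items() if msgs}
        superstep += 1
    return value, superstep


print(pregel_sssp(1))  # ({1: 0, 2: 3, 3: 1, 4: 4}, 5)
```

The second number counts the supersteps executed, including the last one, in which no vertex improves and therefore no message is sent.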

### Optional Combiner

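Optionally, we can define a combiner to reduce message communication overhead. A combiner merges the messages sent to the same vertex into a single message. For SSSP, only the smallest distance matters, so Combine returns the minimum of the messages.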

```python
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        # Keep the shortest distance offered by the incoming messages
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            # Propagate the improved distance along every outgoing edge
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        # Collapse all messages to one vertex into the smallest distance
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret
```
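As a plain-Python illustration of what the combiner buys, the sketch below groups the messages produced in one superstep by destination and collapses each group with the same min logic as Combine. The helper deliver_with_combiner is hypothetical, and the sketch does not model where GraphScope actually invokes Combine (sender side, receiver side, or both).

```python
from collections import defaultdict

INF = 1000000000.0


def combine(messages):
    """Same logic as SSSP_Pregel.Combine: keep only the smallest distance."""
    ret = INF
    for m in messages:
        ret = min(ret, m)
    return ret


def deliver_with_combiner(sent):
    """Group (destination, message) pairs and combine each group into one message."""
    grouped = defaultdict(list)
    for dst, msg in sent:
        grouped[dst].append(msg)
    return {dst: [combine(msgs)] for dst, msgs in grouped.items()}


sent = [(4, 5.0), (4, 7.0), (2, 3.0), (4, 4.0)]
print(deliver_with_combiner(sent))  # {4: [4.0], 2: [3.0]}
```

Because Compute only uses the minimum of its messages, replacing the three messages for vertex 4 with the single value 4.0 does not change the result, while it cuts the number of delivered messages.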


### Run Your Pregel Algorithm on the Graph

Next, let's run your Pregel algorithm on the graph and check the results.

```python
sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)
```

```python
r2 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r2
```


### Aggregators in Pregel

Pregel aggregators are a mechanism for global communication, monitoring, and counting. In superstep S, each vertex can provide a value to an aggregator; the system combines these values using a reducing operator, and the resulting value is made available to all vertices in superstep S+1. GraphScope provides a number of predefined aggregators for Pregel algorithms, such as min, max, and sum operations on various data types.
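The superstep S / S+1 timing can be illustrated with a small plain-Python sketch. The function run_aggregator is hypothetical, and the None returned for superstep 0 is only a placeholder for "nothing aggregated yet", not GraphScope's actual behavior.

```python
import operator
from functools import reduce


def run_aggregator(contributions_per_step, op):
    """Return what each superstep can read from one aggregator.

    contributions_per_step[s] holds the values vertices provide in superstep s.
    They are reduced with `op` after superstep s ends, so the result is only
    visible in superstep s + 1 (None stands for "nothing yet" in superstep 0).
    """
    visible = None
    seen = []
    for contributions in contributions_per_step:
        seen.append(visible)                 # what vertices read in this superstep
        visible = reduce(op, contributions)  # published for the next superstep
    return seen


# Superstep 0: three vertices each contribute 1; superstep 1: they read the total.
print(run_aggregator([[1, 1, 1], [1, 1, 1]], operator.add))  # [None, 3]
print(run_aggregator([[5, 9, 2], [0, 0, 0]], max))  # [None, 9]
```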

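Here is an example that uses the built-in aggregators; more details can be found in the Cython SDK API. The assertions in the second superstep expect the p2p network graph loaded above, i.e. 62586 vertices with IDs ranging from 1 to 62586.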

```python
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
    @staticmethod
    def Init(v, context):
        # int
        context.register_aggregator(
            b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
        )
        context.register_aggregator(
            b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
        )
        context.register_aggregator(
            b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
        )
        # double
        context.register_aggregator(
            b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
        )
        context.register_aggregator(
            b"double_overwrite_aggregator",
            PregelAggregatorType.kDoubleOverwriteAggregator,
        )
        # bool
        context.register_aggregator(
            b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
        )
        context.register_aggregator(
            b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
        )
        context.register_aggregator(
            b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
        )
        # text
        context.register_aggregator(
            b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
        )

    @staticmethod
    def Compute(messages, v, context):
        if context.superstep() == 0:
            # Superstep 0: every vertex contributes to each aggregator
            context.aggregate(b"int_sum_aggregator", 1)
            context.aggregate(b"int_max_aggregator", int(v.id()))
            context.aggregate(b"int_min_aggregator", int(v.id()))
            context.aggregate(b"double_product_aggregator", 1.0)
            context.aggregate(b"double_overwrite_aggregator", 1.0)
            context.aggregate(b"bool_and_aggregator", True)
            context.aggregate(b"bool_or_aggregator", False)
            context.aggregate(b"bool_overwrite_aggregator", True)
            context.aggregate(b"text_append_aggregator", v.id() + b",")
        else:
            # Superstep 1: the values aggregated in superstep 0 are now readable
            if v.id() == b"1":
                assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
                assert context.get_aggregated_value(b"int_max_aggregator") == 62586
                assert context.get_aggregated_value(b"int_min_aggregator") == 1
                assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
                assert (
                    context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
                )
                assert context.get_aggregated_value(b"bool_and_aggregator") == True
                assert context.get_aggregated_value(b"bool_or_aggregator") == False
                assert (
                    context.get_aggregated_value(b"bool_overwrite_aggregator") == True
                )
                context.get_aggregated_value(b"text_append_aggregator")
            v.vote_to_halt()
```

```python
aggregators_test = Aggregators_Pregel_Test()
ctx = aggregators_test(graph)
```