!pip3 install graphscope GraphScope 的图分析引擎继承了 GRAPE , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。
与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。
在这个教程中,我们将展示如何进行自定义基于 PIE 模型或者 Pregel 模型的图分析算法。
# Install graphscope package if you are NOT in the Playground
!pip3 install graphscope
GraphScope 支持用户使用纯 Python 语言,以 PIE 编程模型写图算法。首先我们需要导入 GraphScope 的包和 PIE 装饰器。
# Import the graphscope module.
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie
graphscope.set_option(show_log=False) # enable logging
以单源最短路径(SSSP)为例,编写 PIE 算法需要填写以下几个函数。
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
@staticmethod
def Init(frag, context):
pass
@staticmethod
def PEval(frag, context):
pass
@staticmethod
def IncEval(frag, context):
pass
装饰器 pie 包含两个参数 vd_type
和 md_type
, 他们分别代表节点上的数据类型和消息的数据类型。
您可以为您的算法指定这两个数据类型,可用的数据类型包括 int
,double
和 string
。
在我们这个示例中,由于 SSSP 计算的距离和发送的消息更新均是 double
类型,所以我们为这两个类型都指定为 double
。
在函数 Init
,PEval
和 IncEval
中,都有 frag 和 context 这两个参数。它们分别用于在算法逻辑中访问图数据和中间结果数据。可以查看文档 Cython SDK API 了解更多。
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
@staticmethod
def Init(frag, context):
v_label_num = frag.vertex_label_num()
for v_label_id in range(v_label_num):
nodes = frag.nodes(v_label_id)
context.init_value(
nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
)
context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
@staticmethod
def PEval(frag, context):
pass
@staticmethod
def IncEval(frag, context):
pass
Init
函数有以下几个职责:1)为每个节点设置初始值; 2)定义消息传递的策略; 3)指定聚合器,以在每一轮中处理收到的消息。
请注意,您定义的算法将在属性图上运行。所以我们应该首先通过 v_label_num = frag.vertex_label_num()
获得顶点标签,然后可以遍历所有具有相同标签的节点,
并通过 nodes = frag.nodes(v_label_id)
和 context.init_value(nodes, v_label_id, 1000000000.0,PIEAggregateType.kMinAggregate)
设置初始值。
由于我们正在计算源节点和其他节点之间的最短路径,在这里我们将 PIEAggregateType.kMinAggregate
用作消息聚合的聚合器。这意味着它将对所有收到的消息执行去 Min 操作。其他可用的聚合器包括 kMaxAggregate
,kSumAggregate
,kProductAggregate
和 kOverwriteAggregate
。
在 Init
函数的最后,我们向节点注册 MessageStrategy.kSyncOnOuterVertex
指定如何传递消息。
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
@staticmethod
def Init(frag, context):
v_label_num = frag.vertex_label_num()
for v_label_id in range(v_label_num):
nodes = frag.nodes(v_label_id)
context.init_value(
nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
)
context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
@staticmethod
def PEval(frag, context):
src = int(context.get_config(b"src"))
graphscope.declare(graphscope.Vertex, source)
native_source = False
v_label_num = frag.vertex_label_num()
for v_label_id in range(v_label_num):
if frag.get_inner_node(v_label_id, src, source):
native_source = True
break
if native_source:
context.set_node_value(source, 0)
else:
return
e_label_num = frag.edge_label_num()
for e_label_id in range(e_label_num):
edges = frag.get_outgoing_edges(source, e_label_id)
for e in edges:
dst = e.neighbor()
distv = e.get_int(2)
if context.get_node_value(dst) > distv:
context.set_node_value(dst, distv)
@staticmethod
def IncEval(frag, context):
pass
在 SSSP 的 PEval
函数中, 通过函数 context.get_config(b"src")
获取用户查询的 src 节点。
PEval
会在每个分区上调用 frag.get_inner_node(v_label_id, src, source)
的方法查询 src 节点是否在本分区。请注意这里函数 get_inner_node
method needs a source
需要一个类型为 Vertex
的参数, 我们通过 graphscope.declare(graphscope.Vertex, source)
来定义声明。
如果一个分区包含查询的源节点,它将调用 frag.get_outgoing_edges(source,e_label_id)
遍历源节点的所有输出边。 对于每个顶点,它计算到源的距离。如果该值小于初始值,则更新该值。
SSSP 算法的 IncEval
和 PEval
之间唯一区别是 IncEval
将会在每个分区上都被调用。计算完 IncEval
后,将会产生一些消息,这些消息将被发送到其他分区供下一轮IncEval
调用。一直到所有分区都不再产生消息,此时算法结束。
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
@staticmethod
def Init(frag, context):
v_label_num = frag.vertex_label_num()
for v_label_id in range(v_label_num):
nodes = frag.nodes(v_label_id)
context.init_value(
nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
)
context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
@staticmethod
def PEval(frag, context):
src = int(context.get_config(b"src"))
graphscope.declare(graphscope.Vertex, source)
native_source = False
v_label_num = frag.vertex_label_num()
for v_label_id in range(v_label_num):
if frag.get_inner_node(v_label_id, src, source):
native_source = True
break
if native_source:
context.set_node_value(source, 0)
else:
return
e_label_num = frag.edge_label_num()
for e_label_id in range(e_label_num):
edges = frag.get_outgoing_edges(source, e_label_id)
for e in edges:
dst = e.neighbor()
distv = e.get_int(2)
if context.get_node_value(dst) > distv:
context.set_node_value(dst, distv)
@staticmethod
def IncEval(frag, context):
v_label_num = frag.vertex_label_num()
e_label_num = frag.edge_label_num()
for v_label_id in range(v_label_num):
iv = frag.inner_nodes(v_label_id)
for v in iv:
v_dist = context.get_node_value(v)
for e_label_id in range(e_label_num):
es = frag.get_outgoing_edges(v, e_label_id)
for e in es:
u = e.neighbor()
u_dist = v_dist + e.get_int(2)
if context.get_node_value(u) > u_dist:
context.set_node_value(u, u_dist)
# Load p2p network dataset
from graphscope.dataset import load_p2p_network
graph = load_p2p_network(directed=False, generate_eid=False)
然后,初始化刚才定义的算法,并查询起始点为 6
的最短路径。
sssp = SSSP_PIE()
ctx = sssp(graph, src=6)
执行这个 Cell,您的算法应该被成功执行。 此时,结果存在于分布式的 pod 内存中,由 vineyard 在进行管理。我们可以通过以下方法将远程数据取过来并展示。
r1 = (
ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
.sort_values(by=["node"])
.to_numpy(dtype=float)
)
r1
您可以保存您的自定义算法供之后使用。
import os
# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")
# dump
SSSP_PIE.to_gar(dump_path)
现在 您可以在目录 ~/
下看到您的算法包 sssp_pie.gar
。下次再次使用时,只需要这样载入:
from graphscope.framework.app import load_app
# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")
sssp2 = load_app(dump_path)
除了上面的 PIE 模型,您也可以通过基于点中心编程的 Pregel 模型编写自己的算法。首先我们需要导入 GraphScope 包和 pregel 装饰器。
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
@staticmethod
def Init(v, context):
pass
@staticmethod
def Compute(messages, v, context):
pass
pregel 装饰器也有两个类型参数 vd_type
和 md_type
,分别用于指定点上的数据类型和消息的类型,支持的类型包括 int
,double
和 string
。
在我们的 SSSP 例子中,我们将这两个值都设置为 double
。
由于 Pregel 模型的算法逻辑是定义在点上的,它的 Init
和 Compute
函数有一个参数 v
用于访问点上的数据。查看文档 Cython SDK API 了解更多。
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
@staticmethod
def Init(v, context):
v.set_value(1000000000.0)
@staticmethod
def Compute(messages, v, context):
pass
Init
函数将每个点上的初始路径设置为极大值。
SSSP 的 Compute
函数通过以下步骤为每个节点计算新距离:
1)首先初始化值为1000000000的新值
2)如果顶点是源节点,则将其距离设置为0
3)计算接收到的消息的 Min 值,如果该值小于当前值,则设置该值。
重复这些步骤,直到不再生成新消息(距离更短)为止。
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
@staticmethod
def Init(v, context):
v.set_value(1000000000.0)
@staticmethod
def Compute(messages, v, context):
src_id = context.get_config(b"src")
cur_dist = v.value()
new_dist = 1000000000.0
if v.id() == src_id:
new_dist = 0
for message in messages:
new_dist = min(message, new_dist)
if new_dist < cur_dist:
v.set_value(new_dist)
for e_label_id in range(context.edge_label_num()):
edges = v.outgoing_edges(e_label_id)
for e in edges:
v.send(e.vertex(), new_dist + e.get_int(2))
v.vote_to_halt()
我们可以定义一个 Combiner 以减少消息通信的开销,注意这个 Combiner 函数是可选的,您也可以不实现。
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
@staticmethod
def Init(v, context):
v.set_value(1000000000.0)
@staticmethod
def Compute(messages, v, context):
src_id = context.get_config(b"src")
cur_dist = v.value()
new_dist = 1000000000.0
if v.id() == src_id:
new_dist = 0
for message in messages:
new_dist = min(message, new_dist)
if new_dist < cur_dist:
v.set_value(new_dist)
for e_label_id in range(context.edge_label_num()):
edges = v.outgoing_edges(e_label_id)
for e in edges:
v.send(e.vertex(), new_dist + e.get_int(2))
v.vote_to_halt()
@staticmethod
def Combine(messages):
ret = 1000000000.0
for m in messages:
ret = min(ret, m)
return ret
接下来让我们运行基于 Pregel 写的 SSSP 算法和查看结果。
sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)
r2 = (
ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
.sort_values(by=["node"])
.to_numpy(dtype=float)
)
r2
Pregel 模型中的 aggregator 聚合器是一种用于全局通信、监视和计数的机制。
每个顶点都可以在超步 S
中为聚合器提供值,系统将这些值聚合在一起,使用归约算符对这些值进行计算,并在超步 S+1
中将所得值提供给所有顶点。
GraphScope 为 Pregel 算法提供了许多预定义的聚合器,例如对数据类型的 min
,max
或 sum
等。
这里是使用内置 aggregator 的示例,您也可以在文档 Cython SDK API 中了解更多
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
@staticmethod
def Init(v, context):
# int
context.register_aggregator(
b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
)
context.register_aggregator(
b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
)
context.register_aggregator(
b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
)
# double
context.register_aggregator(
b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
)
context.register_aggregator(
b"double_overwrite_aggregator",
PregelAggregatorType.kDoubleOverwriteAggregator,
)
# bool
context.register_aggregator(
b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
)
context.register_aggregator(
b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
)
context.register_aggregator(
b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
)
# text
context.register_aggregator(
b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
)
@staticmethod
def Compute(messages, v, context):
if context.superstep() == 0:
context.aggregate(b"int_sum_aggregator", 1)
context.aggregate(b"int_max_aggregator", int(v.id()))
context.aggregate(b"int_min_aggregator", int(v.id()))
context.aggregate(b"double_product_aggregator", 1.0)
context.aggregate(b"double_overwrite_aggregator", 1.0)
context.aggregate(b"bool_and_aggregator", True)
context.aggregate(b"bool_or_aggregator", False)
context.aggregate(b"bool_overwrite_aggregator", True)
context.aggregate(b"text_append_aggregator", v.id() + b",")
else:
if v.id() == b"1":
assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
assert context.get_aggregated_value(b"int_max_aggregator") == 62586
assert context.get_aggregated_value(b"int_min_aggregator") == 1
assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
assert (
context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
)
assert context.get_aggregated_value(b"bool_and_aggregator") == True
assert context.get_aggregated_value(b"bool_or_aggregator") == False
assert (
context.get_aggregated_value(b"bool_overwrite_aggregator") == True
)
context.get_aggregated_value(b"text_append_aggregator")
v.vote_to_halt()