编写自定义图分析算法

Install graphscope package is you are NOT in the Playground

!pip3 install graphscope GraphScope 的图分析引擎继承了 GRAPE , 该系统于 SIGMOD2017 上首次提出并获得最佳论文奖。

与以往的系统的不同,GRAPE 支持将串行图算法自动并行化。在 GRAPE 中, 只需进行很小的更改即可轻松地将串行算法即插即用,使其并行化的运行在分布式环境,并高效地处理大规模图数据。 除了易于编程外,GRAPE 还被设计为高效且可拓展的系统,可灵活应对现实中图应用多变的规模、多样性和复杂性。

在这个教程中,我们将展示如何进行自定义基于 PIE 模型或者 Pregel 模型的图分析算法。

In [ ]:
# Install graphscope package if you are NOT in the Playground

!pip3 install graphscope

自定义基于 PIE 模型的图算法

GraphScope 支持用户使用纯 Python 语言,以 PIE 编程模型写图算法。首先我们需要导入 GraphScope 的包和 PIE 装饰器。

In [ ]:
# Import the graphscope module.

import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie


graphscope.set_option(show_log=False)  # enable logging

以单源最短路径(SSSP)为例,编写 PIE 算法需要填写以下几个函数。

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass
    @staticmethod
    def PEval(frag, context):
        pass
    @staticmethod
    def IncEval(frag, context):
        pass

装饰器 pie 包含两个参数 vd_typemd_type, 他们分别代表节点上的数据类型和消息的数据类型。

您可以为您的算法指定这两个数据类型,可用的数据类型包括 intdoublestring。 在我们这个示例中,由于 SSSP 计算的距离和发送的消息更新均是 double 类型,所以我们为这两个类型都指定为 double

在函数 InitPEvalIncEval中,都有 fragcontext 这两个参数。它们分别用于在算法逻辑中访问图数据和中间结果数据。可以查看文档 Cython SDK API 了解更多。

定义 Init 函数

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
    @staticmethod
    def PEval(frag, context):
        pass
    @staticmethod
    def IncEval(frag, context):
        pass

Init 函数有以下几个职责:1)为每个节点设置初始值; 2)定义消息传递的策略; 3)指定聚合器,以在每一轮中处理收到的消息。

请注意,您定义的算法将在属性图上运行。所以我们应该首先通过 v_label_num = frag.vertex_label_num() 获得顶点标签,然后可以遍历所有具有相同标签的节点, 并通过 nodes = frag.nodes(v_label_id)context.init_value(nodes, v_label_id, 1000000000.0,PIEAggregateType.kMinAggregate) 设置初始值。

由于我们正在计算源节点和其他节点之间的最短路径,在这里我们将 PIEAggregateType.kMinAggregate 用作消息聚合的聚合器。这意味着它将对所有收到的消息执行去 Min 操作。其他可用的聚合器包括 kMaxAggregatekSumAggregatekProductAggregatekOverwriteAggregate

Init 函数的最后,我们向节点注册 MessageStrategy.kSyncOnOuterVertex 指定如何传递消息。

定义 PEval 函数

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)
    @staticmethod
    def IncEval(frag, context):
        pass

SSSPPEval 函数中, 通过函数 context.get_config(b"src") 获取用户查询的 src 节点。

PEval 会在每个分区上调用 frag.get_inner_node(v_label_id, src, source) 的方法查询 src 节点是否在本分区。请注意这里函数 get_inner_node method needs a source 需要一个类型为 Vertex 的参数, 我们通过 graphscope.declare(graphscope.Vertex, source) 来定义声明。

如果一个分区包含查询的源节点,它将调用 frag.get_outgoing_edges(source,e_label_id) 遍历源节点的所有输出边。 对于每个顶点,它计算到源的距离。如果该值小于初始值,则更新该值。

定义 IncEval 函数

SSSP 算法的 IncEvalPEval 之间唯一区别是 IncEval 将会在每个分区上都被调用。计算完 IncEval 后,将会产生一些消息,这些消息将被发送到其他分区供下一轮IncEval 调用。一直到所有分区都不再产生消息,此时算法结束。

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)
    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)

在图上调用您的算法

In [ ]:
# Load p2p network dataset

from graphscope.dataset import load_p2p_network

graph = load_p2p_network(directed=False)

然后,初始化刚才定义的算法,并查询起始点为 6 的最短路径。

In [ ]:
sssp = SSSP_PIE()
ctx = sssp(graph, src=6)

执行这个 Cell,您的算法应该被成功执行。 此时,结果存在于分布式的 pod 内存中,由 vineyard 在进行管理。我们可以通过以下方法将远程数据取过来并展示。

In [ ]:
r1 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r1

保存和载入您的算法

您可以保存您的自定义算法供之后使用。

In [ ]:
import os

# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")

# dump
SSSP_PIE.to_gar(dump_path)

现在 您可以在目录 ~/ 下看到您的算法包 sssp_pie.gar。下次再次使用时,只需要这样载入:

In [ ]:
from graphscope.framework.app import load_app

# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")

sssp2 = load_app("SSSP_PIE", dump_path)

基于 Pregel 模型编写算法

除了上面的 PIE 模型,您也可以通过基于点中心编程的 Pregel 模型编写自己的算法。首先我们需要导入 GraphScope 包和 pregel 装饰器。

In [ ]:
import graphscope

from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel
In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        pass
    @staticmethod
    def Compute(messages, v, context):
        pass

pregel 装饰器也有两个类型参数 vd_typemd_type,分别用于指定点上的数据类型和消息的类型,支持的类型包括 intdoublestring。 在我们的 SSSP 例子中,我们将这两个值都设置为 double

由于 Pregel 模型的算法逻辑是定义在点上的,它的 InitCompute 函数有一个参数 v 用于访问点上的数据。查看文档 Cython SDK API 了解更多。

定义 Init 函数

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)
    @staticmethod
    def Compute(messages, v, context):
        pass

Init 函数将每个点上的初始路径设置为极大值。

定义 Compute 函数

SSSPCompute 函数通过以下步骤为每个节点计算新距离:

1)首先初始化值为1000000000的新值
2)如果顶点是源节点,则将其距离设置为0
3)计算接收到的消息的 Min 值,如果该值小于当前值,则设置该值。

重复这些步骤,直到不再生成新消息(距离更短)为止。

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

可选的 Combiner 函数

我们可以定义一个 Combiner 以减少消息通信的开销,注意这个 Combiner 函数是可选的,您也可以不实现。

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret

运行您的 Pregel 算法

接下来让我们运行基于 Pregel 写的 SSSP 算法和查看结果。

In [ ]:
sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)
In [ ]:
r2 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r2

Pregel 模型中的 Aggregator

Pregel 模型中的 aggregator 聚合器是一种用于全局通信、监视和计数的机制。

每个顶点都可以在超步 S 中为聚合器提供值,系统将这些值聚合在一起,使用归约算符对这些值进行计算,并在超步 S+1 中将所得值提供给所有顶点。 GraphScope 为 Pregel 算法提供了许多预定义的聚合器,例如对数据类型的 minmaxsum 等。

这里是使用内置 aggregator 的示例,您也可以在文档 Cython SDK API 中了解更多

In [ ]:
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
    @staticmethod
    def Init(v, context):
        # int
        context.register_aggregator(
            b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
        )
        context.register_aggregator(
            b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
        )
        context.register_aggregator(
            b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
        )
        # double
        context.register_aggregator(
            b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
        )
        context.register_aggregator(
            b"double_overwrite_aggregator",
            PregelAggregatorType.kDoubleOverwriteAggregator,
        )
        # bool
        context.register_aggregator(
            b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
        )
        context.register_aggregator(
            b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
        )
        context.register_aggregator(
            b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
        )
        # text
        context.register_aggregator(
            b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
        )
    @staticmethod
    def Compute(messages, v, context):
        if context.superstep() == 0:
            context.aggregate(b"int_sum_aggregator", 1)
            context.aggregate(b"int_max_aggregator", int(v.id()))
            context.aggregate(b"int_min_aggregator", int(v.id()))
            context.aggregate(b"double_product_aggregator", 1.0)
            context.aggregate(b"double_overwrite_aggregator", 1.0)
            context.aggregate(b"bool_and_aggregator", True)
            context.aggregate(b"bool_or_aggregator", False)
            context.aggregate(b"bool_overwrite_aggregator", True)
            context.aggregate(b"text_append_aggregator", v.id() + b",")
        else:
            if v.id() == b"1":
                assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
                assert context.get_aggregated_value(b"int_max_aggregator") == 62586
                assert context.get_aggregated_value(b"int_min_aggregator") == 1
                assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
                assert (
                    context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
                )
                assert context.get_aggregated_value(b"bool_and_aggregator") == True
                assert context.get_aggregated_value(b"bool_or_aggregator") == False
                assert (
                    context.get_aggregated_value(b"bool_overwrite_aggregator") == True
                )
                context.get_aggregated_value(b"text_append_aggregator")
            v.vote_to_halt()
In [ ]: