Revisiting Node Classification on a Paper Citation Network on Kubernetes

GraphScope is a one-stop system for processing very large graphs. It relies on vineyard as its distributed in-memory data manager and supports running on clusters managed by Kubernetes.

Next, we revisit the example from the first tutorial and show how GraphScope runs the node classification task on a paper citation network on top of a Kubernetes cluster.

This tutorial walks through the following steps:

  • Create a session and load the graph;
  • Query the graph interactively with Gremlin;
  • Run graph algorithms for graph analytics;
  • Run a machine learning task on the graph data;
  • Close the session.

If you are running this tutorial outside the GraphScope Jupyter environment, make sure the environment can access and operate a Kubernetes cluster.
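
As a quick sanity check (our own addition, not part of GraphScope), you can list the cluster's nodes with the official Kubernetes Python client. This sketch assumes the kubernetes package is installed and a valid kubeconfig is available locally.

In [ ]:
# Minimal cluster-access check, assuming the `kubernetes` package is
# installed and ~/.kube/config points at the target cluster.
from kubernetes import client, config

config.load_kube_config()  # read the local kubeconfig
v1 = client.CoreV1Api()
for node in v1.list_node().items:
    print(node.metadata.name)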

Create a session and load the ogbn_mag dataset

In [ ]:
# Import the graphscope module

import graphscope

graphscope.set_option(show_log=False)  # disable verbose log output
In [ ]:
# Create a session on a Kubernetes cluster and
# mount dataset bucket to path "/home/jovyan/datasets" in pod.

from graphscope.dataset import load_ogbn_mag

sess = graphscope.session(mount_dataset="/home/jovyan/datasets")

After this command executes, the session first tries to launch a coordinator, which is the entry point of the backend engines. The coordinator manages a cluster of K8s pods (two pods by default), on which the interactive engine, the analytical engine, and the learning engine run. A Vineyard instance runs on each pod of the cluster, providing distributed in-memory data access.

After running the cell above, the log output records the entire session-creation process.

The line GraphScope coordinator service connected in the log indicates that the session has been created successfully and that the current Python client is connected to it.

The session status can also be checked with the following command.

In [ ]:
sess

Running this cell should show "status: active", which means the session is healthy. The output also contains other session metadata, such as the number of workers (pods) and the coordinator's endpoint.
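
The session's defaults can be tuned at creation time; for instance, graphscope.session accepts a num_workers argument that controls how many engine pods are launched. A minimal sketch, as an alternative to the earlier call (the worker count of 4 is just an illustrative value):

In [ ]:
# A customized session (an alternative to the earlier call, not to be run
# in addition to it); `num_workers` sets the number of engine pods
# (default: 2).
sess = graphscope.session(
    num_workers=4,
    mount_dataset="/home/jovyan/datasets",
)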

In [ ]:
# Load the ogbn_mag dataset in "sess" as a graph

graph = load_ogbn_mag(sess, "/home/jovyan/datasets/ogbn_mag_small/")
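
Before querying, it is handy to confirm what was loaded. The graph object exposes its schema; a quick look (the exact output format may vary across GraphScope versions):

In [ ]:
# Inspect the schema of the loaded property graph: vertex labels such as
# "paper" and "author", and edge labels such as "cites" and "writes".
print(graph.schema)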

Interactive query with Gremlin

In this example, we launch an interactive query engine and use graph traversal to count the number of papers co-authored by two given authors. To simplify the query, we assume the two authors are uniquely identified by IDs 2 and 4307, respectively.

In [ ]:
# Get the entrypoint for submitting Gremlin queries on graph g.
interactive = sess.gremlin(graph)

# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
    "g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)

Graph analytics with the analytical engine

Continuing with our example, we next run graph analytics on the graph data to generate structural features for the nodes. We first derive a subgraph by extracting papers within a given time range from the whole graph (using Gremlin!), then run k-core decomposition and triangle counting to generate structural features for each paper node.

In [ ]:
# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# Project the subgraph to simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})
# Compute k-core decomposition and triangle counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)

# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
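
Besides being attached as columns, the algorithm results are contexts that can be pulled back to the client for inspection. A minimal sketch, assuming the context's to_dataframe selector API ("v.id" selects the vertex ID, "r" the per-vertex result):

In [ ]:
# Pull the triangle-counting result into a pandas DataFrame for a quick
# look; "v.id" selects the vertex id and "r" the per-vertex result value.
df = tc_result.to_dataframe({"id": "v.id", "triangles": "r"})
print(df.head())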

Graph neural networks (GNNs)

Next, we use the generated structural features together with the original features to train a learning model with GraphScope's learning engine.

In this example, we train a GCN model to classify the nodes (papers) into 349 categories, each of which represents a venue (e.g., a pre-print archive or a conference).

In [ ]:
# Define the features for learning: we choose the original 128-dimensional
# features plus the k-core and triangle count results as new features.
paper_features = []
for i in range(128):
    paper_features.append("feat_" + str(i))
paper_features.append("kcore")
paper_features.append("tc")

# Launch a learning engine. Here we split the dataset: 75% for training, 10% for validation, and 15% for testing.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)

# Then we define the training process using the built-in GCN model.
from graphscope.learning.examples import GCN
from graphscope.learning.graphlearn.python.model.tf.optimizer import get_tf_optimizer
from graphscope.learning.graphlearn.python.model.tf.trainer import LocalTFTrainer


def train(config, graph):
    def model_fn():
        return GCN(
            graph,
            config["class_num"],
            config["features_num"],
            config["batch_size"],
            val_batch_size=config["val_batch_size"],
            test_batch_size=config["test_batch_size"],
            categorical_attrs_desc=config["categorical_attrs_desc"],
            hidden_dim=config["hidden_dim"],
            in_drop_rate=config["in_drop_rate"],
            neighs_num=config["neighs_num"],
            hops_num=config["hops_num"],
            node_type=config["node_type"],
            edge_type=config["edge_type"],
            full_graph_mode=config["full_graph_mode"],
        )

    trainer = LocalTFTrainer(
        model_fn,
        epoch=config["epoch"],
        optimizer=get_tf_optimizer(
            config["learning_algo"], config["learning_rate"], config["weight_decay"]
        ),
    )
    trainer.train_and_evaluate()


# Hyperparameter configuration.
config = {
    "class_num": 349,  # output dimension
    "features_num": 130,  # 128 dimension + kcore + triangle count
    "batch_size": 500,
    "val_batch_size": 100,
    "test_batch_size": 100,
    "categorical_attrs_desc": "",
    "hidden_dim": 256,
    "in_drop_rate": 0.5,
    "hops_num": 2,
    "neighs_num": [5, 10],
    "full_graph_mode": False,
    "agg_type": "gcn",  # mean, sum
    "learning_algo": "adam",
    "learning_rate": 0.01,
    "weight_decay": 0.0005,
    "epoch": 5,
    "node_type": "paper",
    "edge_type": "cites",
}

# Start training and evaluating.
train(config, lg)

Finally, since the session manages the cluster's resources, those resources should be released once you are done with the session.

In [ ]:
# Close the session.
sess.close()