GraphScope 作为一站式超大规模图处理系统,背后依赖 vineyard 作为分布式内存数据管理器,支持在 Kubernetes 管理的集群上运行。
接下来,我们回顾第一个教程中的示例,展示 GraphScope 如何基于 Kubernetes 集群,计算论文引用网络中的节点分类任务。
这一教程将会分为以下几个步骤:
除 GraphScope Jupyter 外,确保运行该教程的环境具备访问操作 Kubernetes 集群 的能力
# Import the graphscope module
import graphscope
graphscope.set_option(show_log=False) # enable logging
# Create a session on kubernetes cluster and
# mount dataset bucket to path "/home/jovyan/datasets" in pod.
from graphscope.dataset import load_ogbn_mag
sess = graphscope.session(mount_dataset="/home/jovyan/datasets")
命令执行后,会话会首先尝试去拉起协调者(Coordinator),其为后端引擎的入口。协调者管理一个由 K8s Pod 组成的集群 (默认有二个 Pod),图交互引擎,图分析引擎,图学习引擎运行在集群上。对于集群中的每一个 Pod,都有一个 Vineyard 实例运行,提供内存中的分布式内存访问。
运行上面的代码单元格之后,输出的日志里包含创建会话的所有过程。
日志中 GraphScope coordinator service connected 代表会话创建成功,且当前 Python 客户端已连接到此会话。
如下命令也可以查看会话状态。
sess
运行此单元格,可以看到 "status: active" 的字样,代表会话状态正常。此外还有一些其他会话的元信息,如工作者 (Worker/Pod)数量,协调者的 endpoint 等等。
# Load the obgn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/home/jovyan/datasets/ogbn_mag_small/")
在此示例中,我们启动了一个交互查询引擎,然后使用图遍历来查看两位给定作者共同撰写的论文数量。为了简化查询,我们假设作者可以分别由ID 2 和 4307 唯一标识。
# Get the entrypoint for submitting Gremlin queries on graph g.
interactive = sess.gremlin(graph)
# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
"g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)
继续我们的示例,下面我们在图数据中进行图分析来生成节点结构特征。我们首先通过在特定周期内从全图中提取论文(使用Gremlin!)来导出一个子图,然后运行 k-core 分解和三角形计数以生成每个论文节点的结构特征。
# Exact a subgraph of publication within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")
# Project the subgraph to simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})
# compute the kcore and triangle-counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)
# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
接着我们利用生成的结构特征和原有特征通过GraphScope的学习引擎来训练一个学习模型。
在本例中,我们训练了GCN 模型,将节点(论文)分类为349个类别,每个类别代表一个出处(例如预印本和会议)。
# define the features for learning,
# we chose original 128-dimension feature and k-core, triangle count result as new features.
paper_features = []
for i in range(128):
paper_features.append("feat_" + str(i))
paper_features.append("kcore")
paper_features.append("tc")
# launch a learning engine. here we split the dataset, 75% as train, 10% as validation and 15% as test.
lg = sess.graphlearn(
sub_graph,
nodes=[("paper", paper_features)],
edges=[("paper", "cites", "paper")],
gen_labels=[
("train", "paper", 100, (0, 75)),
("val", "paper", 100, (75, 85)),
("test", "paper", 100, (85, 100)),
],
)
# Then we define the training process, use internal GCN model.
from graphscope.learning.examples import GCN
from graphscope.learning.graphlearn.python.model.tf.optimizer import get_tf_optimizer
from graphscope.learning.graphlearn.python.model.tf.trainer import LocalTFTrainer
def train(config, graph):
def model_fn():
return GCN(
graph,
config["class_num"],
config["features_num"],
config["batch_size"],
val_batch_size=config["val_batch_size"],
test_batch_size=config["test_batch_size"],
categorical_attrs_desc=config["categorical_attrs_desc"],
hidden_dim=config["hidden_dim"],
in_drop_rate=config["in_drop_rate"],
neighs_num=config["neighs_num"],
hops_num=config["hops_num"],
node_type=config["node_type"],
edge_type=config["edge_type"],
full_graph_mode=config["full_graph_mode"],
)
trainer = LocalTFTrainer(
model_fn,
epoch=config["epoch"],
optimizer=get_tf_optimizer(
config["learning_algo"], config["learning_rate"], config["weight_decay"]
),
)
trainer.train_and_evaluate()
# hyperparameters config.
config = {
"class_num": 349, # output dimension
"features_num": 130, # 128 dimension + kcore + triangle count
"batch_size": 500,
"val_batch_size": 100,
"test_batch_size": 100,
"categorical_attrs_desc": "",
"hidden_dim": 256,
"in_drop_rate": 0.5,
"hops_num": 2,
"neighs_num": [5, 10],
"full_graph_mode": False,
"agg_type": "gcn", # mean, sum
"learning_algo": "adam",
"learning_rate": 0.01,
"weight_decay": 0.0005,
"epoch": 5,
"node_type": "paper",
"edge_type": "cites",
}
# start traning and evaluating
train(config, lg)
最后,会话管理着集群的资源,因此在使用完会话后需要释放资源。
# Close the session.
sess.close()