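GraphScope is a one-stop system for very-large-scale graph processing. It uses vineyard as its distributed in-memory data manager and can run on clusters managed by Kubernetes.
Next, we revisit the example from the first tutorial and show how GraphScope, running on a Kubernetes cluster, performs node classification on a paper citation network.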
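This tutorial is divided into the following steps:
- Create a session on the Kubernetes cluster
- Load the ogbn_mag graph into the session
- Run an interactive Gremlin query
- Run graph analytics (k-core and triangle counting) to generate structural features
- Train a GraphSAGE model with the learning engine
- Close the session to release its resources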
In addition to GraphScope Jupyter, make sure the environment running this tutorial can access and operate a Kubernetes cluster.
# Import the graphscope module and the dataset loader
import graphscope
from graphscope.dataset import load_ogbn_mag
graphscope.set_option(show_log=True)  # enable logging, so the session-creation log below is printed
# Create a session on the Kubernetes cluster and
# mount the dataset bucket to path "/datasets" in the pods.
sess = graphscope.session(
    with_dataset=True,
    k8s_service_type="LoadBalancer",
    k8s_image_pull_policy="Always",
)
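Once this command runs, the session first tries to launch the Coordinator, which is the entry point to the backend engines. The coordinator manages a cluster of K8s Pods (two Pods by default), on which the interactive, analytical and learning engines run. Every Pod in the cluster also runs a Vineyard instance that provides distributed in-memory data access.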
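The deployment described above can be summarized as a small data model: one coordinator in front of a set of worker pods, each of which hosts a vineyard instance next to the three engines. This is only a sketch: the class names, pod names and service labels are hypothetical and do not correspond to GraphScope APIs or to real pod names. The default of two worker pods comes from the paragraph above; in GraphScope the worker count is, as far as we know, set through the session's num_workers argument.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical labels for what runs inside every worker pod (not real pod or container names).
POD_SERVICES = ["vineyard", "interactive-engine", "analytical-engine", "learning-engine"]


@dataclass
class WorkerPod:
    name: str
    services: List[str] = field(default_factory=lambda: list(POD_SERVICES))


@dataclass
class SessionLayout:
    coordinator: str  # entry point to the backend engines
    workers: List[WorkerPod]


def plan_layout(num_workers: int = 2) -> SessionLayout:
    """Model a session: one coordinator managing `num_workers` worker pods,
    each running one vineyard instance alongside the three engines."""
    if num_workers < 1:
        raise ValueError("a session needs at least one worker pod")
    workers = [WorkerPod(name=f"worker-{i}") for i in range(num_workers)]
    return SessionLayout(coordinator="coordinator", workers=workers)


layout = plan_layout()
print(len(layout.workers), sum("vineyard" in w.services for w in layout.workers))  # 2 2
```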
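After you run the code cell above, the output log records every step of session creation.
The line GraphScope coordinator service connected in the log means the session was created successfully and the current Python client is connected to it.
You can also check the session status with the following command.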
sess
Running this cell prints "status: active", which means the session is healthy. It also shows other session metadata, such as the number of workers (Worker/Pod) and the coordinator endpoint.
# Load the ogbn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/datasets/ogbn_mag_small/")
# print the schema of the graph
print(graph)
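In this example, we launch an interactive query engine and use a graph traversal to count the papers co-authored by two given authors. To keep the query simple, we assume the two authors are uniquely identified by IDs 2 and 4307.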
# Get the entrypoint for submitting Gremlin queries on graph g.
interactive = sess.gremlin(graph)
# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
"g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)
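For readers less familiar with Gremlin, here is the same traversal spelled out in plain Python over a made-up list of writes edges: start from author 2, follow its outgoing writes edges to papers, keep only the papers that also have an incoming writes edge from author 4307, and count them. The data and the co_authored_count helper are hypothetical; the real query runs on the distributed interactive engine.

```python
from collections import defaultdict

# Made-up 'writes' edges as (author_id, paper_id) pairs.
writes = [(2, "p1"), (2, "p2"), (2, "p3"), (4307, "p2"), (4307, "p3"), (7, "p1")]


def co_authored_count(edges, a, b):
    """Plain-Python version of
    g.V().has('author', 'id', a).out('writes').where(__.in('writes').has('id', b)).count()
    """
    writers = defaultdict(set)  # paper -> authors with a 'writes' edge into it (the in('writes') side)
    for author, paper in edges:
        writers[paper].add(author)
    papers_of_a = {paper for author, paper in edges if author == a}  # out('writes') from author a
    return sum(1 for paper in papers_of_a if b in writers[paper])


print(co_authored_count(writes, 2, 4307))  # 2
```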
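Continuing with our example, we now run graph analytics on the graph to generate structural features for its nodes. We first derive a subgraph by extracting the papers published within a given period from the full graph (using Gremlin!), then run k-core decomposition and triangle counting to produce structural features for every paper node.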
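The time filter relies on Gremlin's inside(low, high) predicate, which in TinkerPop is exclusive at both ends, so inside(2014, 2020) keeps papers from 2015 through 2019. The sketch below mimics the extraction on made-up data in plain Python. It assumes, as with Gremlin's subgraph step, that the result holds the selected cites edges plus both of their endpoints, so cited papers outside the time window can still appear as vertices; extract_subgraph is a hypothetical helper.

```python
# Made-up papers with a 'year' property and directed 'cites' edges.
years = {"p1": 2013, "p2": 2014, "p3": 2015, "p4": 2019, "p5": 2020}
cites = [("p1", "p3"), ("p3", "p2"), ("p4", "p1"), ("p5", "p4"), ("p3", "p4")]


def inside(low, high):
    """TinkerPop's P.inside: strictly greater than low and strictly less than high."""
    return lambda x: low < x < high


def extract_subgraph(years, cites, pred):
    """Keep the outgoing 'cites' edges of papers whose year satisfies `pred`
    (like g.V().has('year', pred).outE('cites')) plus the edges' endpoints."""
    selected = {p for p, y in years.items() if pred(y)}
    edges = [(s, d) for s, d in cites if s in selected]
    vertices = {v for edge in edges for v in edge}
    return vertices, edges


vertices, edges = extract_subgraph(years, cites, inside(2014, 2020))
print(sorted(edges))  # [('p3', 'p2'), ('p3', 'p4'), ('p4', 'p1')]
```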
# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")
# Project the subgraph to simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})
# compute the kcore and triangle-counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)
# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
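To make the two analytics concrete, here is a pure-Python sketch on a toy graph: k-core membership by repeatedly peeling off vertices whose degree is below k, and per-vertex triangle counts by checking which pairs of a vertex's neighbors are themselves adjacent. It treats citations as undirected edges and assumes, without confirmation from this tutorial, that the "r" column of k_core holds a 1/0 membership flag; GraphScope's distributed implementations work differently. The toy graph uses k = 3 because none of its vertices reaches degree 5. The final dictionary mimics what add_column does above, attaching each result as a named per-vertex column (kcore, tc).

```python
from collections import defaultdict
from itertools import combinations


def build_undirected(edges):
    """Adjacency sets, treating every citation as an undirected edge."""
    adj = defaultdict(set)
    for u, v in edges:
        if u != v:  # ignore self-loops
            adj[u].add(v)
            adj[v].add(u)
    return adj


def k_core_membership(adj, k):
    """1 if the vertex survives repeated removal of vertices with degree < k, else 0."""
    degree = {v: len(nbrs) for v, nbrs in adj.items()}
    removed = set()
    stack = [v for v, d in degree.items() if d < k]
    while stack:
        v = stack.pop()
        if v in removed:
            continue
        removed.add(v)
        for u in adj[v]:
            if u not in removed:
                degree[u] -= 1  # v is gone, so u loses one neighbor
                if degree[u] < k:
                    stack.append(u)
    return {v: 0 if v in removed else 1 for v in adj}


def triangle_counts(adj):
    """Number of triangles each vertex belongs to."""
    return {
        v: sum(1 for a, b in combinations(nbrs, 2) if b in adj[a])
        for v, nbrs in adj.items()
    }


# Toy graph: a 4-clique {0, 1, 2, 3} plus vertex 4 hanging off vertex 0.
adj = build_undirected([(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3), (0, 4)])
kc = k_core_membership(adj, k=3)
tc = triangle_counts(adj)
# Attach both results as per-vertex columns, analogous to add_column.
columns = {v: {"kcore": kc[v], "tc": tc[v]} for v in sorted(adj)}
print(columns[0], columns[4])  # {'kcore': 1, 'tc': 3} {'kcore': 0, 'tc': 0}
```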
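Next, we use the generated structural features together with the original features to train a model with GraphScope's learning engine.
In this example we train a GraphSAGE model. The target task is classifying nodes (papers) into 349 classes, each representing a venue (e.g., a preprint server or a conference); note, however, that the training code below uses the unsupervised EgoGraphSAGE example, which learns paper embeddings from citation links instead of fitting the venue labels directly.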
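GraphSAGE builds each node's representation by sampling a fixed number of neighbors per hop and aggregating their features; with --nbrs_num [5, 5] and --agg_type mean in the training code, that means two hops of five sampled neighbors combined by a mean aggregator. The pure-Python sketch below shows a mean-aggregator layer in the form given in the original GraphSAGE paper: ReLU of W applied to the node's own vector concatenated with the mean of its sampled neighbors' vectors. Whether GraphScope's EgoGraphSAGE uses exactly this formulation, and whether its random sampler draws with replacement, are assumptions; all function names are hypothetical. For simplicity it applies each layer to the whole graph rather than to per-batch ego graphs.

```python
import random


def sample_neighbors(adj, v, k, rng):
    """Draw k neighbors of v uniformly with replacement (a fixed fan-out)."""
    nbrs = adj.get(v, [])
    return [rng.choice(nbrs) for _ in range(k)] if nbrs else []


def mean_vector(vectors, dim):
    """Element-wise mean; a zero vector when there are no neighbors."""
    if not vectors:
        return [0.0] * dim
    return [sum(col) / len(vectors) for col in zip(*vectors)]


def sage_mean_layer(h, adj, w, k, rng):
    """One layer: h_v' = relu(W @ concat(h_v, mean of k sampled neighbor vectors))."""
    dim = len(next(iter(h.values())))
    out = {}
    for v, hv in h.items():
        neigh = mean_vector([h[u] for u in sample_neighbors(adj, v, k, rng)], dim)
        x = hv + neigh  # list concatenation: [self features | neighbor mean]
        out[v] = [max(0.0, sum(wi * xi for wi, xi in zip(row, x))) for row in w]
    return out


# Toy graph where each node has a single neighbor, so sampling is deterministic.
adj = {"a": ["b"], "b": ["a"]}
h = {"a": [1.0, 0.0], "b": [0.0, 2.0]}
w = [[1, 0, 1, 0], [0, 1, 0, 1]]  # adds the self and neighbor-mean vectors element-wise
rng = random.Random(0)
h1 = sage_mean_layer(h, adj, w, k=5, rng=rng)   # first hop
h2 = sage_mean_layer(h1, adj, w, k=5, rng=rng)  # second hop, as with nbrs_num=[5, 5]
print(h1, h2)  # {'a': [1.0, 2.0], 'b': [1.0, 2.0]} {'a': [2.0, 4.0], 'b': [2.0, 4.0]}
```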
# Define the features for learning: the original 128-dimensional features
# plus the k-core and triangle-count results as new features (130 in total).
paper_features = ["feat_" + str(i) for i in range(128)] + ["kcore", "tc"]
# Launch a learning engine. Here we split the dataset: 75% for training,
# 10% for validation and 15% for testing.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)
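Each gen_labels entry pairs a split name and a node type with what looks like a bucket count (100) and a [start, end) bucket range, which is where the 75/10/15 percentages in the comment come from. The sketch below reproduces that arithmetic under an assumed scheme that puts each node into bucket id % 100; how GraphScope actually assigns nodes to buckets is not described in this tutorial, and split_by_buckets is a hypothetical helper.

```python
def split_by_buckets(node_ids, splits, num_buckets=100):
    """Put each node in bucket id % num_buckets (an assumed scheme), then into the
    split whose [start, end) bucket range contains that bucket."""
    result = {name: [] for name, _ in splits}
    for nid in node_ids:
        bucket = nid % num_buckets
        for name, (start, end) in splits:
            if start <= bucket < end:
                result[name].append(nid)
                break
    return result


splits = [("train", (0, 75)), ("val", (75, 85)), ("test", (85, 100))]
parts = split_by_buckets(range(1000), splits)
print({name: len(ids) for name, ids in parts.items()})  # {'train': 750, 'val': 100, 'test': 150}
```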
# Then we define the training process using the example EgoGraphSAGE model with tensorflow.
try:
    # https://www.tensorflow.org/guide/migrate
    import tensorflow.compat.v1 as tf
    tf.disable_v2_behavior()
except ImportError:
    import tensorflow as tf
import argparse
import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer
def parse_args():
    argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
    argparser.add_argument('--batch_size', type=int, default=128)
    argparser.add_argument('--features_num', type=int, default=130)  # 128 raw features + kcore + tc
    argparser.add_argument('--hidden_dim', type=int, default=128)
    argparser.add_argument('--output_dim', type=int, default=128)
    argparser.add_argument('--nbrs_num', type=int, nargs='+', default=[5, 5])  # neighbors sampled per hop
    argparser.add_argument('--neg_num', type=int, default=5)
    argparser.add_argument('--learning_rate', type=float, default=0.0001)
    argparser.add_argument('--epochs', type=int, default=1)
    argparser.add_argument('--agg_type', type=str, default="mean")
    argparser.add_argument('--drop_out', type=float, default=0.0)
    argparser.add_argument('--sampler', type=str, default='random')
    argparser.add_argument('--neg_sampler', type=str, default='in_degree')
    argparser.add_argument('--temperature', type=float, default=0.07)
    # Parse an empty list so the defaults are used: inside Jupyter, sys.argv holds the
    # kernel's own arguments (e.g. "-f <connection file>"), which argparse would reject.
    return argparser.parse_args(args=[])
args = parse_args()
# define model
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)
# prepare the training dataset
train_data = EgoSAGEUnsupervisedDataLoader(
    lg, None, sampler=args.sampler, neg_sampler=args.neg_sampler,
    batch_size=args.batch_size, node_type='paper', edge_type='cites',
    nbrs_num=args.nbrs_num)
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
    src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)
# Start training
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)
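The objective above pulls each source paper's embedding toward a paper it cites (dst) and away from neg_num sampled negatives. A common way to write such a temperature-scaled softmax loss is, per source node, the cross-entropy of the positive pair among [positive, negatives], with dot-product similarities divided by the temperature. The sketch below implements that formulation in pure Python; that tfg.unsupervised_softmax_cross_entropy_loss computes exactly this (rather than, for example, using cosine similarity or negatives shared across the batch) is an assumption.

```python
import math


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def unsupervised_softmax_loss(src, dst, neg, temperature=0.07):
    """Batch mean of -log softmax(pos) over the [pos, neg_1..neg_K] logits.

    src, dst: one embedding per example; neg: a list of K negative embeddings
    per example. Logits are dot products divided by the temperature.
    """
    total = 0.0
    for s, d, negs in zip(src, dst, neg):
        logits = [dot(s, d) / temperature] + [dot(s, n) / temperature for n in negs]
        m = max(logits)  # subtract the max for a numerically stable log-sum-exp
        log_z = m + math.log(sum(math.exp(z - m) for z in logits))
        total += log_z - logits[0]
    return total / len(src)


# Equal similarities for the positive and 5 negatives give log(6).
equal = unsupervised_softmax_loss([[1.0, 0.0]], [[1.0, 0.0]], [[[1.0, 0.0]] * 5])
print(round(equal, 4))  # 1.7918
```

When the positive and all K negatives look equally similar the loss is log(1 + K), i.e. log 6 for neg_num = 5, which makes a convenient sanity check on the loss scale.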
Finally, the session holds the cluster's resources, so release them by closing the session once you are done with it.
# Close the session.
sess.close()
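If a script fails midway, a trailing sess.close() call may never run and the pods keep holding cluster resources. One way to guard against that is the standard library's contextlib.closing, which calls close() when the block exits whether or not an exception was raised; since a session exposes close(), wrapping graphscope.session(...) in closing(...) should behave the same way. FakeSession below is a hypothetical stand-in so the sketch runs without a cluster.

```python
from contextlib import closing


class FakeSession:
    """Hypothetical stand-in for a GraphScope session; only close() is used."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_cleanup(session):
    # closing() calls session.close() when the block exits, even on an exception.
    with closing(session) as sess:
        # ... load graphs, run queries, analytics and training with `sess` ...
        raise RuntimeError("simulated failure halfway through the workflow")


fake = FakeSession()
try:
    run_with_cleanup(fake)
except RuntimeError:
    pass
print(fake.closed)  # True
```

This pattern suits a standalone script better than a notebook, where the workflow is split across cells and a single with block cannot span them.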