#!/usr/bin/env python
# coding: utf-8

# ## Revisiting Node Classification on a Paper Citation Network, on Kubernetes
#
# GraphScope is a one-stop system for processing very large graphs. It relies on [vineyard](https://github.com/v6d-io/v6d) as its distributed in-memory data manager, and supports running on clusters managed by Kubernetes.
#
# In this tutorial, we revisit the example from the first tutorial and show how GraphScope performs the node classification task on a paper citation network on top of a Kubernetes cluster.
#
# The tutorial proceeds in the following steps:
#
# - creating a session and loading the graph;
# - querying the graph interactively with Gremlin;
# - running graph algorithms for graph analytics;
# - running a machine learning task over the graph data;
# - closing the session.
#
# **Unless you are running in [GraphScope Jupyter](https://try.graphscope.app), make sure the environment for this tutorial can access and operate a [Kubernetes cluster](https://github.com/kubernetes/kubernetes).**

# ## Create a session and load the ogbn_mag dataset

# In[ ]:

# Import the graphscope module.
import graphscope

graphscope.set_option(show_log=False)  # disable verbose log output

# In[ ]:

# Create a session on a Kubernetes cluster and
# mount the dataset bucket to the path "/datasets" in each pod.
from graphscope.dataset import load_ogbn_mag

sess = graphscope.session(with_dataset=True, k8s_service_type="LoadBalancer", k8s_image_pull_policy="Always")

# When this command runs, the session first tries to launch the coordinator, which is the entry point of the backend engines. The coordinator manages a cluster of Kubernetes pods (two by default) on which the interactive engine, the analytical engine, and the learning engine run. Each pod in the cluster also runs a Vineyard instance, providing distributed in-memory data access.
#
# After running the cell above, the output log records the whole session-creation process.
#
# The message **GraphScope coordinator service connected** in the log means the session was created successfully and the current Python client is connected to it.
#
# You can also check the session status with the following command.

# In[ ]:

sess

# Running this cell, you should see "status: active", which means the session is healthy. The output also contains other session metadata, such as the number of workers (pods) and the coordinator's endpoint.

# In[ ]:

# Load the ogbn_mag dataset in "sess" as a graph.
graph = load_ogbn_mag(sess, "/datasets/ogbn_mag_small/")

# Print the schema of the graph.
print(graph)

# ## Interactive query with Gremlin
#
# In this example, we launch an interactive query engine, then use graph traversal to count how many papers two given authors have co-authored. To simplify the query, we assume the two authors are uniquely identified by IDs 2 and 4307, respectively.

# In[ ]:

# Get the entrypoint for submitting Gremlin queries on graph g.
interactive = sess.gremlin(graph)

# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
    "g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)

# ## Graph analytics with analytical engine
#
# Continuing the example, we next run graph analytics on the graph data to generate structural features for the nodes. We first derive a subgraph by extracting papers within a given time range from the whole graph (using Gremlin!), then run k-core decomposition and triangle counting to generate structural features for each paper node.

# In[ ]:

# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# Project the subgraph to a simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})

# Compute k-core and triangle counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)

# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})

# ## Graph neural networks (GNNs)
#
# Next, we use the generated structural features together with the original features to train a model with GraphScope's learning engine.
#
# In this example, we train a GraphSAGE model to classify the nodes (papers) into 349 categories, each of which represents a venue (e.g., a preprint server or a conference).

# In[ ]:

# Define the features for learning:
# we choose the original 128-dimensional feature plus the k-core and
# triangle-counting results as new features.
paper_features = []
for i in range(128):
    paper_features.append("feat_" + str(i))
paper_features.append("kcore")
paper_features.append("tc")

# Launch a learning engine. Here we split the dataset: 75% as train,
# 10% as validation, and 15% as test.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)

# Then we define the training process using the example EgoGraphSAGE model with TensorFlow.
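#
# The model below is trained in an unsupervised fashion: for each sampled
# "cites" edge, the loss pulls the embeddings of the source and destination
# papers together while pushing away the embeddings of sampled negative
# papers, via a softmax cross-entropy loss. Note that the model's default
# `features_num=130` matches the 128 original features plus the two columns
# we added above (`kcore` and `tc`), which we can verify directly:

# In[ ]:

# Sanity-check the feature dimensionality expected by the model
# (128 original features + "kcore" + "tc" = 130).
assert len(paper_features) == 130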
try:
    # https://www.tensorflow.org/guide/migrate
    import tensorflow.compat.v1 as tf

    tf.disable_v2_behavior()
except ImportError:
    import tensorflow as tf

import argparse

import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer


def parse_args():
    argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
    argparser.add_argument("--batch_size", type=int, default=128)
    argparser.add_argument("--features_num", type=int, default=130)
    argparser.add_argument("--hidden_dim", type=int, default=128)
    argparser.add_argument("--output_dim", type=int, default=128)
    argparser.add_argument("--nbrs_num", type=list, default=[5, 5])
    argparser.add_argument("--neg_num", type=int, default=5)
    argparser.add_argument("--learning_rate", type=float, default=0.0001)
    argparser.add_argument("--epochs", type=int, default=1)
    argparser.add_argument("--agg_type", type=str, default="mean")
    argparser.add_argument("--drop_out", type=float, default=0.0)
    argparser.add_argument("--sampler", type=str, default="random")
    argparser.add_argument("--neg_sampler", type=str, default="in_degree")
    argparser.add_argument("--temperature", type=float, default=0.07)
    # Use parse_known_args so this cell also works inside Jupyter,
    # which injects extra command-line arguments of its own.
    return argparser.parse_known_args()[0]


args = parse_args()

# Define the model: input feature dim -> hidden dims -> output embedding dim.
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)

# Prepare the training dataset: sample ego-graphs around the source,
# destination, and negative-destination papers of "cites" edges.
train_data = EgoSAGEUnsupervisedDataLoader(
    lg,
    None,
    sampler=args.sampler,
    neg_sampler=args.neg_sampler,
    batch_size=args.batch_size,
    node_type="paper",
    edge_type="cites",
    nbrs_num=args.nbrs_num,
)
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
    src_emb, dst_emb, neg_dst_emb, temperature=args.temperature
)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)

# Start training.
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)

# Finally, the session manages the resources of the cluster, so the resources
# should be released once you are done with the session.

# In[ ]:

# Close the session.
sess.close()
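# As a final sanity check, inspecting the session again should show that it is
# no longer active (the exact wording of the status field may vary across
# GraphScope versions).

# In[ ]:

# Print the session metadata once more; the status should have changed after close().
print(sess)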