GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports running on a cluster managed by Kubernetes (k8s).
Next, we revisit the example presented in the first tutorial, showing how GraphScope processes the node classification task on a Kubernetes cluster.
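This tutorial has the following steps: creating a session on Kubernetes and loading a graph, querying the graph interactively with Gremlin, running graph algorithms to generate structural features, training a graph learning model on those features, and closing the session to release its resources.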
Please note that since this tutorial is designed to run on a k8s cluster, it may not be compatible with environments other than Playground.
# Import the graphscope module
import graphscope
graphscope.set_option(show_log=True)  # enable logging
# Import the loader for the ogbn_mag dataset.
from graphscope.dataset import load_ogbn_mag
# Create a session on the Kubernetes cluster and
# mount the dataset bucket to path "/datasets" in the pods.
sess = graphscope.session(with_dataset=True, k8s_service_type='LoadBalancer', k8s_image_pull_policy='Always')
Behind the scenes, the session tries to launch a coordinator, which is the entry point for the back-end engines. The coordinator manages a cluster of k8s pods (2 pods by default), and the interactive, analytical, and learning engines run on them. Each pod in the cluster runs a vineyard instance that serves the distributed in-memory data.
Run the cell and take a look at the log: it prints the whole process of launching the session.
The log message "GraphScope coordinator service connected" means the session launched successfully and the current Python client has connected to it.
You can also check a session's status as follows.
sess
Run this cell and you should find a "status" field with the value "active". Along with the status, it also prints other meta-information about the session, such as the number of workers (pods), the coordinator endpoint for connection, and so on.
# Load the ogbn_mag dataset in "sess" as a graph
graph = load_ogbn_mag(sess, "/datasets/ogbn_mag_small/")
# print the schema of the graph
print(graph)
In this example, we launch an interactive query and use graph traversal to count the number of papers two given authors have co-authored. To simplify the query, we assume the authors can be uniquely identified by IDs 2 and 4307, respectively.
# Get the entrypoint for submitting Gremlin queries on graph g.
interactive = sess.gremlin(graph)
# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
"g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)
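To make the traversal's logic concrete, here is a plain-Python analogue of the same Gremlin query over a tiny, made-up list of `writes` edges. It is only a sketch of what each step computes, not how GraphScope's interactive engine executes the query; the data and the helper name `count_coauthored` are hypothetical.

```python
# Toy (author_id, paper_id) 'writes' edges: made-up data for illustration only.
writes = [
    (2, "p1"), (2, "p2"), (2, "p3"),
    (4307, "p2"), (4307, "p3"), (4307, "p4"),
    (99, "p1"),
]

def count_coauthored(writes, a, b):
    """Plain-Python analogue of
    g.V().has('author', 'id', a).out('writes').where(__.in('writes').has('id', b)).count()
    """
    # Reverse index for in('writes'): paper -> set of its authors.
    authors_of = {}
    for author, paper in writes:
        authors_of.setdefault(paper, set()).add(author)
    # out('writes'): one traverser per 'writes' edge leaving author a.
    papers_of_a = [paper for author, paper in writes if author == a]
    # where(__.in('writes').has('id', b)): keep papers that author b also wrote.
    return sum(1 for paper in papers_of_a if b in authors_of[paper])

print("result", count_coauthored(writes, 2, 4307))  # result 2
```

The `where(...)` step acts as a filter on the papers reached from the first author, so the final `count()` counts papers rather than authors.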
Continuing our example, we run graph algorithms on the graph to generate structural features. Below, we first derive a subgraph by extracting publications within a specific time range from the entire graph (using Gremlin!), and then run k-core decomposition and triangle counting to generate the structural features of each paper node.
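In Apache TinkerPop's Gremlin, the predicate `inside(low, high)` is exclusive at both ends, so `inside(2014, 2020)` matches years 2015 through 2019. The sketch below mimics the extraction step on made-up data: it keeps the outgoing `cites` edges of papers whose year passes the predicate, plus the endpoints of those edges. Treating the result as an edge-induced subgraph is an assumption made for illustration; the helper names are hypothetical.

```python
# Toy paper years and citation edges (made-up data).
year = {"p1": 2013, "p2": 2014, "p3": 2015, "p4": 2019, "p5": 2020}
cites = [("p3", "p1"), ("p4", "p3"), ("p2", "p4"), ("p5", "p4"), ("p4", "p5")]

def inside(value, low, high):
    # Gremlin's inside(low, high) is exclusive on both ends.
    return low < value < high

def extract_subgraph(year, cites, low, high):
    """Keep 'cites' edges whose source paper satisfies inside(low, high),
    plus the endpoints of those edges (an edge-induced subgraph)."""
    kept_edges = [(src, dst) for (src, dst) in cites if inside(year[src], low, high)]
    kept_nodes = {node for edge in kept_edges for node in edge}
    return kept_nodes, kept_edges

nodes, edges = extract_subgraph(year, cites, 2014, 2020)
print(sorted(edges))  # [('p3', 'p1'), ('p4', 'p3'), ('p4', 'p5')]
```

Note that `p1` (2013) and `p5` (2020) still appear in the result because they are endpoints of kept edges, even though they fail the year filter themselves.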
# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")
# Project the subgraph to simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})
# Compute the k-core and triangle counts.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)
# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
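For intuition about the two structural features, here is a plain-Python sketch of both algorithms on a small undirected graph: k-core membership by repeatedly peeling vertices whose degree is below k, and per-vertex triangle counts. It illustrates the techniques only. It is not GraphScope's implementation, it treats citation edges as undirected (an assumption), and it does not assume how GraphScope encodes the values it writes to the `r` column.

```python
from itertools import combinations

def to_adjacency(edges):
    """Build an undirected adjacency map, ignoring direction and self-loops."""
    adj = {}
    for u, v in edges:
        if u == v:
            continue
        adj.setdefault(u, set()).add(v)
        adj.setdefault(v, set()).add(u)
    return adj

def k_core_members(adj, k):
    """Return the vertices of the k-core: repeatedly remove vertices with degree < k."""
    degree = {v: len(nbrs) for v, nbrs in adj.items()}
    removed = set()
    stack = [v for v, d in degree.items() if d < k]
    while stack:
        v = stack.pop()
        if v in removed:
            continue  # a vertex may be pushed more than once
        removed.add(v)
        for u in adj[v]:
            if u not in removed:
                degree[u] -= 1
                if degree[u] < k:
                    stack.append(u)
    return set(adj) - removed

def triangle_counts(adj):
    """For each vertex, count the triangles it belongs to."""
    counts = {v: 0 for v in adj}
    for v, nbrs in adj.items():
        # Each connected pair of v's neighbours closes exactly one triangle through v.
        for a, b in combinations(nbrs, 2):
            if b in adj[a]:
                counts[v] += 1
    return counts

# A 4-clique {1, 2, 3, 4} plus a pendant vertex 5 attached to 1.
adj = to_adjacency([(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4), (1, 5)])
print(sorted(k_core_members(adj, 3)))  # [1, 2, 3, 4]
print(triangle_counts(adj))            # {1: 3, 2: 3, 3: 3, 4: 3, 5: 0}
```

Peeling with a stack keeps the k-core computation linear in the number of edges, since each edge causes at most one degree decrement.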
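Then, we use the generated structural features and the original features to train a learning model with the learning engine.
In our example, the goal is to classify the nodes (papers) into 349 categories, each of which represents a venue (e.g., pre-print or conference). The training code below uses the example EgoGraphSAGE model in unsupervised mode: it learns paper embeddings from the citation links and node features using negative sampling.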
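GraphSAGE computes a node's representation from a sampled ego graph. With `nbrs_num=[5, 5]` (the default in the training code), each paper gets 5 sampled first-hop neighbors and 5 second-hop neighbors per first-hop neighbor, i.e. 25 at the second hop. The plain-Python sketch below shows fan-out sampling and one layer update in the concat-then-transform form from the GraphSAGE paper with a mean aggregator. It is an illustration only: sampling with replacement and the hypothetical helpers `sample_ego` and `sage_layer` are assumptions, not the internals of `EgoGraphSAGE`.

```python
import random

def sample_ego(adj, root, fanouts, rng):
    """Sample a multi-hop ego graph: hop i draws fanouts[i] neighbours
    (with replacement) for every node of hop i - 1."""
    hops = [[root]]
    for fanout in fanouts:
        frontier = []
        for node in hops[-1]:
            nbrs = sorted(adj[node])  # assumes every node has at least one neighbour
            frontier.extend(rng.choice(nbrs) for _ in range(fanout))
        hops.append(frontier)
    return hops

def sage_layer(self_vec, nbr_vecs, weight):
    """One GraphSAGE update: ReLU(W . concat(h_self, mean(h_nbrs)))."""
    dim = len(self_vec)
    mean = [sum(v[i] for v in nbr_vecs) / len(nbr_vecs) for i in range(dim)]
    combined = list(self_vec) + mean  # concatenation, length 2 * dim
    return [max(0.0, sum(w * x for w, x in zip(row, combined))) for row in weight]

adj = {"p1": {"p2", "p3"}, "p2": {"p1"}, "p3": {"p1", "p2"}}
hops = sample_ego(adj, "p1", [2, 2], random.Random(0))
print([len(h) for h in hops])  # [1, 2, 4]
print(sage_layer([1.0, 0.0], [[0.0, 2.0], [2.0, 0.0]], [[1, 1, 1, 1], [-1, 0, 0, 0]]))  # [3.0, 0.0]
```

In a two-layer model, the first layer updates the hop-0 and hop-1 nodes from their sampled neighbors, and the second layer produces the root's final embedding from those updated vectors.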
# Define the features for learning:
# we choose the original 128-dimension features plus the k-core and triangle count results as new features.
paper_features = []
for i in range(128):
    paper_features.append("feat_" + str(i))
paper_features.append("kcore")
paper_features.append("tc")
# Launch a learning engine. Here we split the dataset: 75% for training,
# 10% for validation, and 15% for testing.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)
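The feature list and the `gen_labels` split can be checked in isolation. Each `gen_labels` tuple appears to read as (label name, node type, number of buckets, (begin, end)), which matches the 75/10/15 comment above. The sketch assumes a node joins a split when its bucket lies in the half-open range [begin, end); how GraphScope actually assigns a node to a bucket is not stated in this tutorial, so `id % 100` and the helper `split_members` are hypothetical choices.

```python
# The feature list built above, written as a comprehension: 128 raw features + 2 structural ones.
paper_features = [f"feat_{i}" for i in range(128)] + ["kcore", "tc"]
print(len(paper_features))  # 130, the --features_num default used for training

def split_members(node_ids, num_buckets, begin, end):
    """Hypothetical split rule: a node belongs to the split when its bucket,
    taken here as id % num_buckets, lies in the half-open range [begin, end)."""
    return [nid for nid in node_ids if begin <= nid % num_buckets < end]

gen_labels = [
    ("train", "paper", 100, (0, 75)),
    ("val", "paper", 100, (75, 85)),
    ("test", "paper", 100, (85, 100)),
]
node_ids = range(1000)  # toy paper ids
splits = {name: split_members(node_ids, n, lo, hi) for name, _, n, (lo, hi) in gen_labels}
print({name: len(ids) for name, ids in splits.items()})  # {'train': 750, 'val': 100, 'test': 150}
```

Because the three ranges tile [0, 100) without overlap, every node lands in exactly one split.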
# Then we define the training process using the example EgoGraphSAGE model with tensorflow.
try:
    # https://www.tensorflow.org/guide/migrate
    import tensorflow.compat.v1 as tf
    tf.disable_v2_behavior()
except ImportError:
    import tensorflow as tf
import argparse
import graphscope.learning.graphlearn.python.nn.tf as tfg
from graphscope.learning.examples import EgoGraphSAGE
from graphscope.learning.examples import EgoSAGEUnsupervisedDataLoader
from graphscope.learning.examples.tf.trainer import LocalTrainer
def parse_args():
    argparser = argparse.ArgumentParser("Train EgoSAGE Unsupervised.")
    argparser.add_argument('--batch_size', type=int, default=128)
    argparser.add_argument('--features_num', type=int, default=130)  # 128 raw features + kcore + tc
    argparser.add_argument('--hidden_dim', type=int, default=128)
    argparser.add_argument('--output_dim', type=int, default=128)
    # Neighbors sampled per hop; nargs='+' parses "--nbrs_num 5 5" into [5, 5]
    # (type=list would split a command-line value into single characters).
    argparser.add_argument('--nbrs_num', type=int, nargs='+', default=[5, 5])
    argparser.add_argument('--neg_num', type=int, default=5)
    argparser.add_argument('--learning_rate', type=float, default=0.0001)
    argparser.add_argument('--epochs', type=int, default=1)
    argparser.add_argument('--agg_type', type=str, default="mean")
    argparser.add_argument('--drop_out', type=float, default=0.0)
    argparser.add_argument('--sampler', type=str, default='random')
    argparser.add_argument('--neg_sampler', type=str, default='in_degree')
    argparser.add_argument('--temperature', type=float, default=0.07)
    # parse_known_args() ignores extra arguments, e.g. the "-f" flag Jupyter passes to the kernel.
    args, _ = argparser.parse_known_args()
    return args
args = parse_args()
# define model
dims = [args.features_num] + [args.hidden_dim] * (len(args.nbrs_num) - 1) + [args.output_dim]
model = EgoGraphSAGE(dims, agg_type=args.agg_type, dropout=args.drop_out)
# prepare the training dataset
train_data = EgoSAGEUnsupervisedDataLoader(lg, None, sampler=args.sampler,
                                           neg_sampler=args.neg_sampler, batch_size=args.batch_size,
                                           node_type='paper', edge_type='cites', nbrs_num=args.nbrs_num)
src_emb = model.forward(train_data.src_ego)
dst_emb = model.forward(train_data.dst_ego)
neg_dst_emb = model.forward(train_data.neg_dst_ego)
loss = tfg.unsupervised_softmax_cross_entropy_loss(
src_emb, dst_emb, neg_dst_emb, temperature=args.temperature)
optimizer = tf.train.AdamOptimizer(learning_rate=args.learning_rate)
# Start training
trainer = LocalTrainer()
trainer.train(train_data.iterator, loss, optimizer, epochs=args.epochs)
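The loss above scores each source embedding against its positive destination and the sampled negatives. A common way to write such a loss is a sampled softmax with a temperature τ: the logits are s·d⁺/τ and s·d⁻ₖ/τ, and the loss is the cross-entropy of picking the positive. The plain-Python sketch below implements that formulation; that `tfg.unsupervised_softmax_cross_entropy_loss` computes exactly this is an assumption, and the helper names are hypothetical. It also spells out how `dims` follows from the argument defaults.

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def softmax_ce_loss(src, pos, negs, temperature):
    """Cross-entropy of picking the positive among [positive] + negatives,
    with logits = (src . candidate) / temperature."""
    logits = [dot(src, pos) / temperature] + [dot(src, n) / temperature for n in negs]
    m = max(logits)  # subtract the max before exp() for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[0]

def batch_loss(srcs, poss, negss, temperature):
    """Average the per-example loss over a batch; negss[i] holds the negatives of srcs[i]."""
    losses = [softmax_ce_loss(s, p, n, temperature) for s, p, n in zip(srcs, poss, negss)]
    return sum(losses) / len(losses)

# Layer sizes from the defaults (features_num=130, hidden_dim=128, output_dim=128, nbrs_num=[5, 5]).
dims = [130] + [128] * (len([5, 5]) - 1) + [128]
print(dims)  # [130, 128, 128]

# If the positive and all 5 negatives score the same, the loss equals log(1 + 5).
print(softmax_ce_loss([1.0, 0.0], [1.0, 0.0], [[1.0, 0.0]] * 5, temperature=0.07))
```

A small temperature such as 0.07 sharpens the softmax, so the loss drops quickly once the positive scores higher than the negatives.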
Finally, a session manages the resources in the cluster, so it is important to release these resources when they are no longer required. To de-allocate the resources, call the close() method on the session when all the graph tasks are finished.
# Close the session.
sess.close()
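To release the resources even when a task raises an error, the close() call can be tied to a `with` block through the standard library's `contextlib.closing`, which works for any object that has a close() method. The sketch uses a hypothetical `FakeSession` stand-in and a hypothetical `run_tasks` placeholder so it runs without a cluster; with a real GraphScope session, only the object passed to `closing` would change.

```python
from contextlib import closing

class FakeSession:
    """Hypothetical stand-in for a GraphScope session; only close() matters here."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def run_tasks(sess):
    # Hypothetical placeholder for loading graphs, querying, and training.
    raise RuntimeError("a task failed")

sess = FakeSession()
try:
    with closing(sess):  # calls sess.close() on exit, even if run_tasks raises
        run_tasks(sess)
except RuntimeError as err:
    print("task error:", err)
print("closed:", sess.closed)  # closed: True
```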