Revisiting Node Classification on a Citation Network on K8s

GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports running on a cluster managed by Kubernetes (k8s).

Next, we revisit the example presented in the first tutorial, showing how GraphScope processes the node classification task on a Kubernetes cluster.

This tutorial has the following steps:

  • Creating a session and loading graph;
  • Querying graph data;
  • Running graph algorithms;
  • Running graph-based machine learning tasks;
  • Closing the session and releasing resources.

Please note that this tutorial is designed to run on a k8s cluster, so it may not be compatible with environments other than the Playground.

Create a session on kubernetes and load graph

In [ ]:
# Import the graphscope module

import graphscope

graphscope.set_option(show_log=True)  # enable logging, so we can inspect the launching process below
In [ ]:
# Create a session on a Kubernetes cluster and
# mount the dataset bucket to the path "/home/jovyan/datasets" in each pod.

from graphscope.dataset import load_ogbn_mag

sess = graphscope.session(mount_dataset="/home/jovyan/datasets")

Behind the scenes, the session tries to launch a coordinator, which is the entry point for the back-end engines. The coordinator manages a cluster of k8s pods (2 pods by default), on which the interactive, analytical, and learning engines run. For each pod in the cluster, there is a vineyard instance serving the data distributed in memory.

Run the cell and take a look at the log; it prints the whole process of launching the session.

The log GraphScope coordinator service connected means the session has launched successfully, and the current Python client has connected to it.

You can also check a session's status like this:

In [ ]:
sess

Run this cell, and you should find a "status" field with the value "active". Along with the status, it also prints other metainfo of this session, such as the number of workers (pods), the coordinator endpoint for connection, and so on.
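
The number of workers is configurable when creating a session. The cell below is purely illustrative, showing the num_workers parameter of graphscope.session; the rest of this tutorial keeps the default 2-pod session created above.

In [ ]:
# Illustrative only: the same session-creation call, but requesting 4 worker
# pods instead of the default 2. Running it would launch a second cluster,
# so treat it as a sketch rather than a step of this tutorial.
sess_large = graphscope.session(num_workers=4, mount_dataset="/home/jovyan/datasets")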

In [ ]:
# Load the ogbn_mag dataset in "sess" as a graph

graph = load_ogbn_mag(sess, "/home/jovyan/datasets/ogbn_mag_small/")
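
load_ogbn_mag is a convenience loader bundled with GraphScope. A graph can also be built from your own files through the session's graph-building interface; the cell below is a minimal sketch with hypothetical file paths.

In [ ]:
# A sketch with hypothetical paths; replace them with your own files.
g2 = sess.g()  # an empty property graph bound to the session
g2 = g2.add_vertices("/path/to/paper.csv", label="paper")
g2 = g2.add_edges(
    "/path/to/cites.csv", label="cites", src_label="paper", dst_label="paper"
)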

Interactive query with Gremlin

In this example, we launch an interactive query and use graph traversal to count the number of papers two given authors have co-authored. To simplify the query, we assume the two authors can be uniquely identified by the IDs 2 and 4307, respectively.

In [ ]:
# Get the entrypoint for submitting Gremlin queries on the loaded graph.
interactive = sess.gremlin(graph)

# Count the number of papers two authors (with id 2 and 4307) have co-authored.
papers = interactive.execute(
    "g.V().has('author', 'id', 2).out('writes').where(__.in('writes').has('id', 4307)).count()"
).one()
print("result", papers)

Graph analytics with the analytical engine

Continuing our example, we run graph algorithms on the graph to generate structural features. Below, we first derive a subgraph by extracting publications within a specific time range from the entire graph (using Gremlin!), and then run k-core decomposition and triangle counting to generate the structural features of each paper node.

In [ ]:
# Extract a subgraph of publications within a time range.
sub_graph = interactive.subgraph("g.V().has('year', inside(2014, 2020)).outE('cites')")

# Project the subgraph to a simple graph by selecting papers and their citations.
simple_g = sub_graph.project(vertices={"paper": []}, edges={"cites": []})
# Compute k-core decomposition and triangle counting.
kc_result = graphscope.k_core(simple_g, k=5)
tc_result = graphscope.triangles(simple_g)

# Add the results as new columns to the citation graph.
sub_graph = sub_graph.add_column(kc_result, {"kcore": "r"})
sub_graph = sub_graph.add_column(tc_result, {"tc": "r"})
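
Besides attaching algorithm results to the graph as new columns, a result context can be pulled into a pandas DataFrame for quick inspection. The cell below is a minimal sketch, assuming the selector convention of GraphScope contexts ("v.id" for vertex IDs, "r" for the computed result).

In [ ]:
# Peek at the k-core results; "v.id" selects vertex IDs, "r" the k-core values.
df = kc_result.to_dataframe({"node": "v.id", "kcore": "r"})
print(df.head())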

Graph neural networks (GNNs)

Then, we use the generated structural features together with the original features to train a learning model with the learning engine.

In our example, we train a GCN model to classify the nodes (papers) into 349 categories, each of which represents a venue (e.g., a pre-print server or a conference).

In [ ]:
# Define the features for learning:
# we choose the original 128-dimension features plus the k-core and triangle count results as new features.
paper_features = ["feat_" + str(i) for i in range(128)] + ["kcore", "tc"]

# Launch a learning engine. Here we split the dataset: 75% as train, 10% as validation and 15% as test.
lg = sess.graphlearn(
    sub_graph,
    nodes=[("paper", paper_features)],
    edges=[("paper", "cites", "paper")],
    gen_labels=[
        ("train", "paper", 100, (0, 75)),
        ("val", "paper", 100, (75, 85)),
        ("test", "paper", 100, (85, 100)),
    ],
)
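# Each gen_labels tuple above reads (mask_name, vertex_label, number_of_splits,
# (start, end)): the paper vertices are divided into 100 splits, with splits
# 0-75 masked as train, 75-85 as validation, and 85-100 as test.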

# Then we define the training process, using the built-in GCN model.
from graphscope.learning.examples import GCN
from graphscope.learning.graphlearn.python.model.tf.optimizer import get_tf_optimizer
from graphscope.learning.graphlearn.python.model.tf.trainer import LocalTFTrainer


def train(config, graph):
    def model_fn():
        return GCN(
            graph,
            config["class_num"],
            config["features_num"],
            config["batch_size"],
            val_batch_size=config["val_batch_size"],
            test_batch_size=config["test_batch_size"],
            categorical_attrs_desc=config["categorical_attrs_desc"],
            hidden_dim=config["hidden_dim"],
            in_drop_rate=config["in_drop_rate"],
            neighs_num=config["neighs_num"],
            hops_num=config["hops_num"],
            node_type=config["node_type"],
            edge_type=config["edge_type"],
            full_graph_mode=config["full_graph_mode"],
        )

    trainer = LocalTFTrainer(
        model_fn,
        epoch=config["epoch"],
        optimizer=get_tf_optimizer(
            config["learning_algo"], config["learning_rate"], config["weight_decay"]
        ),
    )
    trainer.train_and_evaluate()


# Hyperparameter configuration.
config = {
    "class_num": 349,  # output dimension
    "features_num": 130,  # 128 dimension + kcore + triangle count
    "batch_size": 500,
    "val_batch_size": 100,
    "test_batch_size": 100,
    "categorical_attrs_desc": "",
    "hidden_dim": 256,
    "in_drop_rate": 0.5,
    "hops_num": 2,
    "neighs_num": [5, 10],
    "full_graph_mode": False,
    "agg_type": "gcn",  # mean, sum
    "learning_algo": "adam",
    "learning_rate": 0.01,
    "weight_decay": 0.0005,
    "epoch": 5,
    "node_type": "paper",
    "edge_type": "cites",
}

# Start training and evaluating
train(config, lg)

Finally, a session manages the resources in the cluster, so it is important to release these resources when they are no longer required. To deallocate them, call the close method on the session once all the graph tasks are finished.

In [ ]:
# Close the session.
sess.close()