Demonstration of how lineage tracking works
# To use the latest published `kubeflow-metadata` library, you can run:
!pip install kubeflow-metadata --user
# Install other packages:
!pip install pandas --user
# Then restart the Notebook kernel.
import pandas
from kubeflow.metadata import metadata
from datetime import datetime
from uuid import uuid4
# Host (the default in-cluster DNS name of the Kubeflow Metadata gRPC service)
# and port used to reach the metadata store.
METADATA_STORE_HOST = "metadata-grpc-service.kubeflow"
METADATA_STORE_PORT = 8080

# Connect to the metadata service in the `kubeflow` namespace of the k8s
# cluster, then create the workspace object that the run and executions
# below are attached to.
metadata_store = metadata.Store(
    grpc_host=METADATA_STORE_HOST,
    grpc_port=METADATA_STORE_PORT,
)
ws1 = metadata.Workspace(
    store=metadata_store,
    name="xgboost-synthetic",
    description="workspace for xgboost-synthetic artifacts and executions",
    labels={"n1": "v1"},
)
# Create a run in the workspace. The name carries the current UTC timestamp
# in ISO format, so names differ from one notebook run to the next.
run_name = "xgboost-synthetic-faring-run" + datetime.utcnow().isoformat("T")
r = metadata.Run(
    workspace=ws1,
    name=run_name,
    description="a notebook run",
)
# Create the training execution inside the run above; its name is also
# suffixed with the current UTC timestamp.
# NOTE(review): `exec` shadows the Python builtin of the same name. It is kept
# because the following statements log inputs and outputs through it.
execution_name = "execution" + datetime.utcnow().isoformat("T")
exec = metadata.Execution(
    name=execution_name,
    workspace=ws1,
    run=r,
    description="execution for training xgboost-synthetic",
)
print("An execution was created with id %s" % exec.id)
# NOTE(review): this random identifier is generated but never passed on; the
# data set below is logged with the fixed version "v1.0.0". The variable name
# (spelled `date_set_version`) is left untouched in case other cells use it.
date_set_version = "data_set_version_" + str(uuid4())

# Describe the training data and log it as an input of the execution.
training_data = metadata.DataSet(
    name="synthetic-data",
    description="xgboost synthetic data",
    owner="[email protected]",
    uri="file://path/to/dataset",
    version="v1.0.0",
    query="SELECT * FROM mytable",
)
data_set = exec.log_input(training_data)
print("Data set id is {0.id} with version '{0.version}'".format(data_set))
# A random UUID suffix gives the model logged below its own version string.
model_version = "model_version_" + str(uuid4())

# Describe the trained model and log it as an output of the execution.
trained_model = metadata.Model(
    name="MNIST",
    version=model_version,
    description="model to recognize handwritten digits",
    owner="[email protected]",
    uri="gcs://my-bucket/mnist",
    model_type="neural network",
    training_framework={"name": "tensorflow", "version": "v1.0"},
    hyperparameters={
        "learning_rate": 0.5,
        "layers": [10, 3, 1],
        "early_stop": True,
    },
    labels={"mylabel": "l1"},
)
model = exec.log_output(trained_model)

# Show the logged model, then its id and version on a separate line.
print(model)
print("\nModel id is {0.id} and version is {0.version}".format(model))
# Log the validation metrics as another output of the execution. The ids of
# the data set and model logged earlier are passed as strings so the metrics
# record refers to both.
evaluation = metadata.Metrics(
    name="MNIST-evaluation",
    description="validating the MNIST model to recognize handwritten digits",
    owner="[email protected]",
    uri="gcs://my-bucket/mnist-eval.csv",
    metrics_type=metadata.Metrics.VALIDATION,
    data_set_id=str(data_set.id),
    model_id=str(model.id),
    values={"accuracy": 0.95},
    labels={"mylabel": "l1"},
)
metrics = exec.log_output(evaluation)
print("Metrics id is %s" % metrics.id)
# Represent the model-serving component as a separate execution in the same
# workspace (it is not attached to the training run).
serving_application = metadata.Execution(
    name="serving model",
    workspace=ws1,
    description="an execution to represent model serving component",
)

# Note that the model name, version and uri are used together to uniquely
# identify the existing model, so the same three values as the model logged
# earlier are supplied here.
served_model = metadata.Model(
    name="MNIST",
    uri="gcs://my-bucket/mnist",
    version=model.version,
)

# Log the model as an input of the serving execution and report what was
# found. The message previously read "Found the mode ..." (typo for "model").
m = serving_application.log_input(served_model)
print("Found the model with id {0.id} and version '{0.version}'.".format(m))