We've seen how to deploy machine learning pipelines into production with OpenShift Pipelines; now we'll see how to use these services to make predictions.
First, make sure that the model service you built with source-to-image is running. Your next step, which is absolutely necessary, is to change the DEFAULT_HOST in the first code cell. If you're running this notebook in OpenShift, you'll want to change "pipeline" to the internal service name. You can get the service host from the OpenShift web console; in our lab, the service name will be something like pipeline.opendatahub-user1.svc.cluster.local.
# Hostname of the model service; DEFAULT_BASE_URL is a template whose
# trailing "%s" is filled in with an endpoint name ("predict" or "metrics")
DEFAULT_HOST = "pipeline"
DEFAULT_BASE_URL = ("http://%s/" % DEFAULT_HOST) + r"%s"
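If you're running the notebook inside OpenShift, the same cell would instead point at the internal service name; using the lab hostname from above (yours will differ), that looks like:

# Inside the cluster, use the internal service DNS name instead
DEFAULT_HOST = "pipeline.opendatahub-user1.svc.cluster.local"
DEFAULT_BASE_URL = ("http://%s/" % DEFAULT_HOST) + r"%s"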
We'll use the requests library to interact with the Knative REST service that our Tekton pipeline created. Although we're running this in a notebook, you can certainly imagine how you'd interact with a similar service from an application using your favorite REST client.
import requests
from urllib.parse import urlencode
import json
import numpy as np
import pandas as pd
def score_transaction(user_id, merchant_id, amount, trans_type, foreign, interarrival, url=None):
    d = {'user_id': [user_id], 'amount': [amount], 'merchant_id': [merchant_id],
         'trans_type': [trans_type], 'foreign': [foreign], 'interarrival': [interarrival]}
    return score_transactions(pd.DataFrame(d), url)
def score_transactions(df, url=None):
    url = url or (DEFAULT_BASE_URL % "predict")
    count = len(df)
    zeros = list(np.zeros(count))
    # The service schema expects every field, so we send placeholder zeros
    # for the timestamp and label columns
    d = {'timestamp': zeros,
         'label': zeros,
         'user_id': df['user_id'].values.tolist(),
         'amount': df['amount'].values.tolist(),
         'merchant_id': df['merchant_id'].values.tolist(),
         'trans_type': list(df['trans_type'].values),
         'foreign': df['foreign'].values.tolist(),
         'interarrival': df['interarrival'].values.tolist()}
    # Submit the records as a form-encoded POST with a single json_args field
    payload = urlencode({"json_args": json.dumps(d)})
    headers = {'content-type': 'application/x-www-form-urlencoded'}
    response = requests.request("POST", url, data=payload, headers=headers)
    try:
        return json.loads(response.text)
    except BaseException as e:
        raise RuntimeError("Error: caught %r while processing %r (%r)" % (e, response, response.text))
def get_metrics(url=None):
    # Parse one Prometheus exposition-format line ('name{labels} value')
    def parse_one_metric(line):
        ll = line.rsplit(' ', 1)
        return (ll[0], float(ll[1]))
    url = url or (DEFAULT_BASE_URL % "metrics")
    response = requests.request("POST", url)
    # Skip blank lines and "#" comment lines (HELP/TYPE annotations)
    return dict([parse_one_metric(line) for line in response.text.split('\n') if len(line) > 0 and line[0] != '#'])
The score_transaction function we just defined will let us pass in a single transaction (as attributes); the score_transactions function will let us pass in a set of transactions. Let's try it out!
score_transaction(1698, 7915, 22.37, 'contactless', False, 9609)
Let's try our service with some more transactions from our training set:
data = pd.read_parquet("fraud-cleaned-sample.parquet")
sample = data.sample(200)
sample["predictions"] = score_transactions(sample)
sample
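Because our sample keeps its ground-truth label column, we can also cross-tabulate the actual labels against the service's predictions as a quick sanity check. This sketch assumes the service returned one prediction per row:

# Compare ground truth against the model service's predictions
pd.crosstab(sample["label"], sample["predictions"])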
Running our models as services gives us an interesting opportunity to detect data drift by publishing the distribution of our predictions as metrics. If the distribution of predictions shifts over time, we can use that as an indication that the distribution of the data we're evaluating has shifted as well, and that we should re-train our model.
In this example, our pipeline service publishes metrics related to the predictions made by the model (keys beginning with pipeline_predictions_) as well as metrics related to the computational performance of our pipeline service (keys beginning with pipeline_processing_seconds_).
get_metrics()
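To turn those raw counters into a distribution we can track over time, here's a minimal sketch; it assumes the counter keys start with pipeline_predictions_total, matching the prefix described above:

# Summarize the prediction distribution from the scraped counters
metrics = get_metrics()
counters = {k: v for k, v in metrics.items()
            if k.startswith("pipeline_predictions_total")}
total = sum(counters.values()) or 1  # guard against dividing by zero
for key, count in sorted(counters.items()):
    print("%s: %.1f%%" % (key, 100 * count / total))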
Since our service publishes Prometheus metrics, we can define alerting rules or visualize how our metric values change over time. If you're running the model service in a place where a Prometheus service can scrape the metrics (like OpenShift with the Open Data Hub installed), then you'll be able to add the following query to see the distribution of predictions over time:
sum(pipeline_predictions_total) by (app, value)
We're taking the sum of these counts because we could have multiple instances of the pipeline service running, and we're grouping by the app label (in this case, pipeline) and the predicted label (fraud or legitimate).
To see these metrics in Prometheus, go to the OpenShift console, select Networking -> Routes, and then click on the route for Prometheus. You can also visualize how each prediction count changes by taking the logarithm of each count:
ln(sum(pipeline_predictions_total) by (app, value))
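If you'd rather fetch the same series programmatically than through the Prometheus UI, you can run the query against Prometheus's HTTP API. In this sketch, the PROMETHEUS_URL value is a placeholder; substitute the hostname of your Prometheus route:

# Run a PromQL query through Prometheus's HTTP API
PROMETHEUS_URL = "http://prometheus.example.com"  # placeholder: use your Prometheus route
resp = requests.get(PROMETHEUS_URL + "/api/v1/query",
                    params={"query": "sum(pipeline_predictions_total) by (app, value)"})
print(json.dumps(resp.json(), indent=2))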
Now we'll set up an experiment to simulate data drift. The experiment function will take a percentage of legitimate and fraudulent transactions from our training set and score them against our live pipeline service.
✅ Change the distributions in the cells below to simulate data drift.
def experiment(data, size, **kwargs):
    # Each keyword maps a label to the fraction of "size" records to draw
    for k, v in kwargs.items():
        sample = data[data.label == k].sample(int(size * v), replace=True)
        score_transactions(sample)
experiment(data, 100000, fraud=.02, legitimate=.98)
experiment(data, 100000, fraud=.1, legitimate=.9)
experiment(data, 100000, fraud=.3, legitimate=.7)
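After running these experiments, scraping the metrics again should show the distribution of predictions shifting toward fraud, which is exactly the drift signal we'd want to alert on. Here's a quick check, with the same caveat that the exact counter key names and label values are assumptions:

# Re-scrape the counters and estimate what share of predictions were fraud
metrics = get_metrics()
counters = {k: v for k, v in metrics.items()
            if k.startswith("pipeline_predictions_total")}
total = sum(counters.values()) or 1
fraud = sum(v for k, v in counters.items() if 'value="fraud"' in k)
print("fraction of predictions labeled fraud: %.3f" % (fraud / total))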