#!/usr/bin/env python
# coding: utf-8

# In[ ]:

# This benchmark measures the performance of pipeline-related operations in
# Kubeflow Pipelines, including the latencies of creating/getting/deleting
# pipelines.

import random
import string
import time

import kfp
import kfp_server_api
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats

# CHANGE the necessary parameters here.
# Host is your KFP endpoint.
host = 'http://127.0.0.1:3001'
# Number of pipelines you want to create.
num_pipelines = 10
# Number of pipeline versions you want to create under each pipeline.
num_pipeline_versions_per_pipeline = 10
# Use the pipeline you prefer.
pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'


def random_suffix() -> str:
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))


if __name__ == '__main__':
    client = kfp.Client(host)
    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)

    # Create pipeline latency.
    create_latencies = []
    created_pipeline_ids = []
    for i in range(num_pipelines):
        api_pipeline = kfp_server_api.models.ApiPipeline(
            name='pipeline-' + random_suffix(),
            url=api_url)
        start = time.perf_counter()
        pipeline = client.pipelines.create_pipeline(body=api_pipeline)
        dur = time.perf_counter() - start
        create_latencies.append(dur)
        created_pipeline_ids.append(pipeline.id)

    # Create pipeline version latency.
    create_version_latencies = []
    created_version_ids = []
    for pipeline_id in created_pipeline_ids:
        for j in range(num_pipeline_versions_per_pipeline):
            key = kfp_server_api.models.ApiResourceKey(
                id=pipeline_id,
                type=kfp_server_api.models.ApiResourceType.PIPELINE)
            reference = kfp_server_api.models.ApiResourceReference(
                key=key,
                relationship=kfp_server_api.models.ApiRelationship.OWNER)
            resource_references = [reference]
            api_pipeline_version = kfp_server_api.models.ApiPipelineVersion(
                name='pipeline-version-' + random_suffix(),
                package_url=api_url,
                resource_references=resource_references)
            start = time.perf_counter()
            pipeline_version = client.pipelines.create_pipeline_version(
                body=api_pipeline_version)
            dur = time.perf_counter() - start
            create_version_latencies.append(dur)
            created_version_ids.append(pipeline_version.id)
            # We sometimes observe errors when version creation calls arrive too
            # close together for the same pipeline. Adding a new version to a
            # pipeline also updates that pipeline's default version, so creating
            # many versions for one pipeline in quick succession makes those
            # operations compete for a write lock on the same row of the
            # pipelines table in our db. This is one possible hypothesis for the
            # errors we've observed, and the symptom is worth further
            # investigation. For now, we separate the version creation calls by
            # 2 seconds.
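            # An alternative to the fixed sleep (a sketch only, not part of the
            # original benchmark) would be to retry on
            # kfp_server_api.rest.ApiException with exponential backoff; note
            # that retries would have to stay outside the timed section to keep
            # the latency numbers honest:
            #
            #     for attempt in range(3):
            #         try:
            #             pipeline_version = client.pipelines.create_pipeline_version(
            #                 body=api_pipeline_version)
            #             break
            #         except kfp_server_api.rest.ApiException:
            #             time.sleep(2 ** attempt)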
            time.sleep(2)

    # Get pipeline latency.
    get_latencies = []
    for pipeline_id in created_pipeline_ids:
        start = time.perf_counter()
        pipeline = client.pipelines.get_pipeline(pipeline_id)
        dur = time.perf_counter() - start
        get_latencies.append(dur)

    # Delete pipeline latency.
    delete_latencies = []
    for pipeline_id in created_pipeline_ids:
        start = time.perf_counter()
        client.pipelines.delete_pipeline(pipeline_id)
        dur = time.perf_counter() - start
        delete_latencies.append(dur)

    # Plots.
    # Note: distplot is deprecated in seaborn >= 0.11 (and removed in 0.14);
    # histplot/rugplot are the modern equivalents.
    fig, axs = plt.subplots(nrows=4, figsize=(10, 20))
    axs[0].set(title='Create Pipeline Latency',
               xlabel='Time (Second)', ylabel='Create Pipeline')
    sns.distplot(a=create_latencies, ax=axs[0], hist=True, kde=False, rug=True)
    axs[1].set(title='Create Pipeline Version Latency',
               xlabel='Time (Second)', ylabel='Create Pipeline Version')
    sns.distplot(a=create_version_latencies, ax=axs[1], hist=True, kde=False, rug=True)
    axs[2].set(title='Get Pipeline Latency',
               xlabel='Time (Second)', ylabel='Get Pipeline')
    sns.distplot(a=get_latencies, ax=axs[2], hist=True, kde=False, rug=True)
    axs[3].set(title='Delete Pipeline Latency',
               xlabel='Time (Second)', ylabel='Delete Pipeline')
    sns.distplot(a=delete_latencies, ax=axs[3], hist=True, kde=False, rug=True)

    # TODO(jingzhang36): maybe dump the durations data to db or gcs, and let
    # seaborn read from there.
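    # A sketch toward the TODO above, assuming a local CSV is an acceptable
    # stand-in for a db or GCS: collect the measured latencies into a pandas
    # DataFrame (pd and np are already imported), print percentile summaries,
    # and persist the data so seaborn can re-read it without re-running the
    # benchmark. The file name 'pipeline_benchmark_latencies.csv' is made up
    # for illustration.
    results = pd.DataFrame({
        'operation': (['create_pipeline'] * len(create_latencies)
                      + ['create_pipeline_version'] * len(create_version_latencies)
                      + ['get_pipeline'] * len(get_latencies)
                      + ['delete_pipeline'] * len(delete_latencies)),
        'latency_seconds': (create_latencies + create_version_latencies
                            + get_latencies + delete_latencies),
    })
    # Median and tail latency per operation.
    summary = results.groupby('operation')['latency_seconds'].agg(
        p50=lambda s: np.percentile(s, 50),
        p95=lambda s: np.percentile(s, 95))
    print(summary)
    results.to_csv('pipeline_benchmark_latencies.csv', index=False)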