#!/usr/bin/env python
# coding: utf-8

# # Demo Collect Rook Usage

# In[ ]:

import pandas as pd
import hvplot.pandas  # noqa

from rooki.client import Rooki


# In[ ]:

# Available hosts
hosts = {
    'demo': 'rook.dkrz.de',
    'dkrz': 'rook3.cloud.dkrz.de',
    'ceda': 'rook-wps1.ceda.ac.uk',
}


# In[ ]:

# Cached usage results (references to previously generated CSV outputs)
cache_id = {
    'ceda': '1f8181bc-d351-11eb-9402-005056aba41c',
    'dkrz': '34369610-d351-11eb-8f86-fa163e466023',
}


# In[ ]:

# Collect usage from several nodes
def collect_usage(sites, time=None, use_cache=True):
    df_wps_list = []
    df_downloads_list = []
    for site in sites:
        if use_cache:
            ref_wps = f"http://{hosts[site]}/outputs/rook/{cache_id[site]}/wps_requests.csv"
            ref_downloads = f"http://{hosts[site]}/outputs/rook/{cache_id[site]}/downloads.csv"
        else:
            url = f"http://{hosts[site]}/wps"
            rooki = Rooki(url, mode='sync')
            resp = rooki.usage(time=time)
            ref_wps = resp.response.processOutputs[0].reference
            print(ref_wps)
            ref_downloads = resp.response.processOutputs[1].reference
            print(ref_downloads)
        # load wps requests
        df_wps = pd.read_csv(ref_wps, parse_dates=[4, 5])
        df_wps['node'] = site
        df_wps_list.append(df_wps)
        # load downloads
        df_downloads = pd.read_csv(ref_downloads, parse_dates=[2])
        df_downloads['node'] = site
        df_downloads_list.append(df_downloads)
    df_wps_combined = pd.concat(df_wps_list, ignore_index=True)
    df_downloads_combined = pd.concat(df_downloads_list, ignore_index=True)
    return df_wps_combined, df_downloads_combined


# ## collect usage

# In[ ]:

df, df_downloads = collect_usage(['ceda', 'dkrz'], time='2021-03-23/', use_cache=False)
df.head()


# In[ ]:

df.nunique()


# ## evaluate pywps stats

# In[ ]:

df.operation.value_counts()


# In[ ]:

# successful jobs (status 4 = succeeded)
df.loc[df['operation'] == 'execute'].loc[df['status'] == 4].identifier.value_counts()


# In[ ]:

# failed jobs (status 5 = failed)
df.loc[df['operation'] == 'execute'].loc[df['status'] == 5].identifier.value_counts()


# In[ ]:

df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].status.value_counts()


# ## duration

# In[ ]:

# job duration in seconds
df['duration'] = df['time_end'] - df['time_start']
df.duration = df.duration.dt.seconds


# In[ ]:

# skip outliers: keep orchestrate jobs shorter than 15 minutes (900 seconds)
df_skip_outlier = df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].loc[df.duration < 900]


# In[ ]:

df_skip_outlier.duration.mean()


# In[ ]:

df_skip_outlier.hvplot.hist(y='duration', logx=False, bins=100)


# ## jobs over days

# In[ ]:

days = (df.time_start.max() - df.time_start.min()).days
days


# In[ ]:

len(df) / days


# In[ ]:

df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].hvplot.hist(y='time_start', bins=days)


# ## jobs over week days

# In[ ]:

df['dayofweek'] = df['time_start'].dt.dayofweek
df


# In[ ]:

df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].hvplot.hist(y='dayofweek', bins=7)


# ## jobs over time of day

# In[ ]:

df['hour'] = df['time_start'].dt.hour
df


# In[ ]:

df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].hvplot.hist(y='hour', bins=24)


# ## concurrent jobs

# In[ ]:

# https://stackoverflow.com/questions/57804145/combining-rows-with-overlapping-time-periods-in-a-pandas-dataframe
# +1 at each job start, -1 at each job end; the cumulative sum gives the
# number of jobs running at each event time.
edf = df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].loc[df['status'].isin([4, 5])]

startdf = pd.DataFrame({'time': edf['time_start'], 'what': 1})
enddf = pd.DataFrame({'time': edf['time_end'], 'what': -1})

mergdf = pd.concat([startdf, enddf]).sort_values('time')
mergdf['running'] = mergdf['what'].cumsum()
mergdf


# In[ ]:

mergdf.running.mean()


# In[ ]:

max_running = mergdf.running.max()
max_running


# In[ ]:

mergdf.loc[mergdf['running'] > 0].hvplot.hist(y='running', bins=max_running)
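
# A quick consistency check on the event bookkeeping above (a minimal sketch, not part of
# the original analysis; it only uses the `what` and `running` columns defined in this
# notebook): every job contributes one +1 start event and one -1 end event, so the counter
# must return to zero after the last event.

# In[ ]:

# Sketch: verify the start/end event counter balances out.
# Each job adds +1 at time_start and -1 at time_end, so the total of 'what'
# and the final cumulative value should both be exactly 0.
assert mergdf['what'].sum() == 0
mergdf['running'].iloc[-1]
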
# ## concurrent jobs over days

# In[ ]:

mergdf.loc[mergdf['running'] > 0].hvplot.scatter(y='running', x='time')


# In[ ]:

# daily maximum of concurrently running jobs
tmpdf = mergdf.groupby(pd.Grouper(key="time", freq="1D")).max()
tmpdf


# In[ ]:

tmp2df = pd.DataFrame()
tmp2df['time'] = tmpdf.index.values
tmp2df['running'] = tmpdf.running.values
tmp2df


# In[ ]:

tmpdf.running.mean()


# In[ ]:

tmp2df.hvplot.bar(x='time', y='running')


# ## Errors per day

# In[ ]:

df_errors = df.loc[df['operation'] == 'execute'].loc[df['identifier'] == 'orchestrate'].loc[df['status'] == 5]
df_errors


# In[ ]:

df_errors.hvplot.hist(y='time_start')


# ## Error messages

# In[ ]:

df_errors.message.value_counts()


# ## Downloads

# In[ ]:

df_downloads.head()


# In[ ]:

df_downloads.nunique()


# ### Downloads size

# In[ ]:

# total download volume in GiB
df_downloads['size'].sum() / 1024 ** 3


# In[ ]:

def size_mb(size):
    return size / 1024 ** 2


# In[ ]:

df_downloads['size_mb'] = df_downloads['size'].apply(size_mb)
df_downloads


# In[ ]:

df_downloads.hvplot.hist(y='size_mb')


# ### Download size per day

# In[ ]:

downloads_per_day = df_downloads.groupby(df_downloads.datetime.dt.date)["size_mb"].sum()
downloads_per_day


# In[ ]:

downloads_per_day.mean()


# In[ ]:

downloads_per_day.hvplot.bar()


# ### Download requests per day

# In[ ]:

days = (df_downloads.datetime.max() - df_downloads.datetime.min()).days
days


# In[ ]:

len(df_downloads) / days


# In[ ]:

df_downloads.hvplot.hist(y='datetime', bins=days)


# ### Downloads by IP address

# In[ ]:

df_downloads.remote_host_ip.value_counts()


# ### Downloads GeoIP
# https://pypi.org/project/geoip2nation/

# In[ ]:

from geoip import xgeoip

r = xgeoip.GeoIp()
r.load_memory()


def lookup_ip(ip):
    return r.resolve(ip).country


# In[ ]:

df_downloads['geoip'] = df_downloads.remote_host_ip.apply(lookup_ip)
df_downloads


# In[ ]:

df_downloads.geoip.value_counts().hvplot.bar()


# ## GeoHealthCheck
# https://geohealthcheck.cloud.dkrz.de

# In[ ]:

import requests
from io import StringIO

ghc_url = "https://geohealthcheck.cloud.dkrz.de/resource/45/history/csv"
# verify=False skips TLS certificate verification
req = requests.get(ghc_url, verify=False)
df_ghc = pd.read_csv(StringIO(req.text), parse_dates=['checked_datetime'])
df_ghc


# In[ ]:

df_ghc.status.value_counts()


# In[ ]:

# map check status to 1 (up) / 0 (down) for plotting
def up(status):
    if status == True:
        return 1
    return 0


df_ghc['up'] = df_ghc.status.apply(up)


# In[ ]:

df_ghc.hvplot.line(x='checked_datetime', y='up')
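
# As a rough availability estimate (a sketch, not part of the original analysis): the mean
# of the `up` flag is the fraction of checks that reported the service as up. This assumes
# every GeoHealthCheck run carries equal weight, i.e. no time-weighting between checks.

# In[ ]:

# Sketch: fraction of successful health checks (1 = always up, 0 = always down).
df_ghc.up.mean()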