#!/usr/bin/env python
# coding: utf-8

# In this notebook, we will explore how to use whylogs in a streaming and distributed manner.
#
# ## Loading the dataset
#
# To simulate streaming data, we will load data into a Pandas dataframe. Then, we will iterate over each row; each row behaves like a dictionary of `[feature_name, value]` pairs.
#
# The `whylogs.DatasetProfile.track` method accepts a dictionary of `[feature_name, value]` entries.

# In[1]:


import datetime
import os.path

import pandas as pd


# In[2]:


data_file = "lending_club_demo.csv"
full_data = pd.read_csv(data_file)
full_data['issue_d'].describe()

data = full_data[full_data['issue_d'] == 'Jan-2017']
data


# ## Creating a whylogs session
#
# Let's now import a function from whylogs that allows us to create a logging session.
#
# This session can be connected with multiple writers that output the results of our profiling locally in JSON, flat CSV, or binary protobuf format, as well as writers that upload to an AWS S3 bucket in the cloud. Further writing functionality will be added over time.
#
# Let's create a default session below.

# In[3]:


from whylogs import get_or_create_session

session = get_or_create_session()


# ## Creating a logger
#
# We can create a logger for a specific dataset timestamp. This often represents a window of data or a batch of data.

# In[4]:


logger = session.logger(dataset_name="dataset", dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0))


# ## Log streaming data
# We'll stream through the dataframe and call `logger.log` on each row.
#
# In practice, you'll call this on individual data points as they arrive (a sketch of that pattern appears at the end of this notebook).

# In[5]:


for i, r in data.iterrows():
    logger.log(r)


# In[6]:


# close the logger to write the profile to disk
logger.close()


# ## Another logger
# We'll create another logger and write the same data to it, but with a different dataset timestamp.

# In[7]:


with session.logger(dataset_name="dataset", dataset_timestamp=datetime.datetime(2020, 9, 21, 0, 0)) as logger:
    for i, r in data.iterrows():
        logger.log(r)


# ## Merging data
# Once data is written to disk, we can merge the entries together to get a summary view.
#
# If you run a distributed system, this means you can collect your `whylogs` output in cloud storage such as S3 and then aggregate the profiles (see the aggregation sketch at the end of this notebook).

# In[8]:


import glob


# In[9]:


binaries = glob.glob('whylogs-output/dataset/**/*.bin', recursive=True)
binaries


# In[10]:


from whylogs import DatasetProfile

# currently, the whylogs writer writes non-delimited files
profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]


# In[11]:


from functools import reduce

merged = reduce(lambda x, y: x.merge(y), profiles)


# ## Quick check with the merged data
# We can check the counters to verify that the merged profile reflects the merge.

# In[12]:


print("First DTI count: ", profiles[0].columns['dti'].counters.count)
print("Second DTI count: ", profiles[1].columns['dti'].counters.count)
print("Merged count: ", merged.columns['dti'].counters.count)
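

# ## Logging individual data points (sketch)
#
# The streaming cells above iterate over a dataframe for convenience. The sketch below shows how the same logger could be
# fed one record at a time, e.g. from a message queue or a request handler. The `incoming_records` generator and its field
# names are hypothetical stand-ins for your own data source; as noted at the top of this notebook, `logger.log` accepts a
# dictionary of `[feature_name, value]` pairs.

# In[13]:


def incoming_records():
    # hypothetical stand-in for a real stream of records (Kafka, HTTP requests, etc.)
    yield {"loan_amnt": 12000, "dti": 18.5, "grade": "B"}
    yield {"loan_amnt": 8000, "dti": 7.2, "grade": "A"}


with session.logger(dataset_name="dataset",
                    dataset_timestamp=datetime.datetime(2020, 9, 23, 0, 0)) as stream_logger:
    for record in incoming_records():
        # one dictionary per data point
        stream_logger.log(record)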
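

# ## Aggregating profiles from many sources (sketch)
#
# The merge cells above combine two local binary profiles. The helper below wraps the same pattern
# (`glob` + `DatasetProfile.read_protobuf` + `merge`) into a reusable function, assuming the binary profiles produced by
# your workers have already been collected into a local directory (for example, synced down from S3). The function name
# and the directory layout are assumptions for illustration, not part of the whylogs API.

# In[14]:


def merge_profiles_from_dir(root_dir):
    """Read every non-delimited .bin profile under root_dir and merge them into a single DatasetProfile."""
    paths = glob.glob(f"{root_dir}/**/*.bin", recursive=True)
    loaded = [DatasetProfile.read_protobuf(p, delimited_file=False) for p in paths]
    return reduce(lambda x, y: x.merge(y), loaded)


merged_again = merge_profiles_from_dir("whylogs-output/dataset")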
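

# ## Verifying the merge (sketch)
#
# Counters are additive under merge, so the merged count for a column should equal the sum of the counts in the
# individual profiles. This small sanity check uses only the counters already inspected above.

# In[15]:


individual_total = sum(p.columns['dti'].counters.count for p in profiles)
assert merged.columns['dti'].counters.count == individual_total
print("Merged DTI count matches the sum of the individual profiles:", individual_total)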