In this notebook, we will explore how to use whylogs in Python in a streaming and distributed manner.
To simulate streaming data, we will load data into a Pandas DataFrame and then iterate over each row, which behaves like a dictionary. The whylogs.DatasetProfile.track method accepts a dictionary of [feature_name, value] pairs.
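For example, tracking a single record directly against a standalone profile might look like the sketch below (the profile name and feature values are made up purely for illustration; in the rest of this notebook we use a session-managed logger instead):
from whylogs import DatasetProfile
# a standalone profile, created outside of any session
profile = DatasetProfile("lending-club-demo")
# track() accepts a dictionary of feature_name -> value
profile.track({"loan_amnt": 15000.0, "int_rate": 15.99, "grade": "C"})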
import datetime
import os.path
import pandas as pd
data_file = "lending_club_demo.csv"
full_data = pd.read_csv(data_file)
full_data['issue_d'].describe()
data = full_data[full_data['issue_d'] == 'Jan-2017']
data
| | id | member_id | loan_amnt | funded_amnt | funded_amnt_inv | term | int_rate | installment | grade | sub_grade | ... | hardship_payoff_balance_amount | hardship_last_payment_amount | disbursement_method | debt_settlement_flag | debt_settlement_flag_date | settlement_status | settlement_date | settlement_amount | settlement_percentage | settlement_term |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
10599 | 96596008 | NaN | 15000.0 | 15000.0 | 15000.0 | 36 months | 15.99 | 527.29 | C | C5 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10601 | 96703051 | NaN | 14575.0 | 14575.0 | 14575.0 | 36 months | 25.49 | 583.29 | E | E4 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10602 | 96960509 | NaN | 5000.0 | 5000.0 | 5000.0 | 36 months | 8.24 | 157.24 | B | B1 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10603 | 97463966 | NaN | 13200.0 | 13200.0 | 13200.0 | 60 months | 13.99 | 307.08 | C | C3 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10605 | 96841832 | NaN | 9500.0 | 9500.0 | 9500.0 | 36 months | 8.24 | 298.75 | B | B1 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
10914 | 95617334 | NaN | 6500.0 | 6500.0 | 6250.0 | 36 months | 5.32 | 195.75 | A | A1 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10915 | 95129874 | NaN | 15000.0 | 15000.0 | 15000.0 | 60 months | 15.99 | 364.70 | C | C5 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10916 | 96187258 | NaN | 40000.0 | 40000.0 | 40000.0 | 36 months | 7.49 | 1244.07 | A | A4 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10917 | 94469381 | NaN | 5050.0 | 5050.0 | 5050.0 | 36 months | 21.49 | 191.54 | D | D5 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
10918 | 94480548 | NaN | 7350.0 | 7350.0 | 7350.0 | 36 months | 12.74 | 246.74 | C | C1 | ... | NaN | NaN | Cash | N | NaN | NaN | NaN | NaN | NaN | NaN |
309 rows × 150 columns
Let's now import a function from whylogs that allows us to create a logging session.
This session can be connected to multiple writers that output the results of our profiling locally in JSON, flat CSV, or binary protobuf format, as well as writers that upload to an AWS S3 bucket in the cloud. Further writing functionality will be added over time.
Let's create a default session below.
from whylogs import get_or_create_session
session = get_or_create_session()
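If you want to control where and in which formats the profiles are written, the session can instead be built from an explicit configuration. The sketch below reflects our understanding of the whylogs v0 configuration API (WriterConfig, SessionConfig, session_from_config); treat the names, formats, and paths as assumptions and check the documentation for your installed version. In this notebook we stick with the default session created above.
from whylogs.app.config import SessionConfig, WriterConfig
from whylogs.app.session import session_from_config
# assumed API: a local writer emitting JSON, flat CSV, and protobuf output
writer_config = WriterConfig(type="local", formats=["json", "flat", "protobuf"], output_path="whylogs-output")
session_config = SessionConfig(project="demo-project", pipeline="demo-pipeline", writers=[writer_config])
configured_session = session_from_config(session_config)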
We can create a logger for a specific dataset timestamp. This often represents a window of data or a batch of data.
logger = session.logger(dataset_name="dataset", dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0))
We'll stream through the dataframe and call logger.log on each row. In practice, you'll call this on individual data points as they arrive.
for i, r in data.iterrows():
logger.log(r)
# close the logger to write to disk
logger.close()
<whylogs.core.datasetprofile.DatasetProfile at 0x7fea48b59e90>
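Since logger.log accepts a plain dictionary of [feature_name, value], logging a single data point as it arrives might look like the sketch below (the feature names and values are made up for illustration):
with session.logger(dataset_name="dataset") as single_point_logger:
    # log one record as a dictionary of feature_name -> value
    single_point_logger.log({"loan_amnt": 5000.0, "int_rate": 8.24, "grade": "B"})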
We'll create another logger and write the same data to it, but with a different dataset timestamp.
with session.logger(dataset_name="dataset", dataset_timestamp=datetime.datetime(2020, 9, 21, 0, 0)) as logger:
for i, r in data.iterrows():
logger.log(r)
Once data is written to disk, we can merge the entries together to get a summary view. If you run a distributed system, this means that you can collect your whylogs data into cloud storage such as S3 and then aggregate it there.
import glob
binaries = glob.glob('whylogs-output/dataset/**/*.bin', recursive=True)
binaries
['whylogs-output/dataset/dataset_profile/protobuf/datase_profile-1600732800000.bin', 'whylogs-output/dataset/dataset_profile/protobuf/datase_profile-1600646400000.bin']
from whylogs import DatasetProfile
# currently, whylogs writer writes non-delimited files
profiles = [DatasetProfile.read_protobuf(x, delimited_file=False) for x in binaries]
from functools import reduce
merged = reduce(lambda x, y: x.merge(y), profiles)
We can check the counters to see whether the merged data reflects the merge:
print("First DTI count: ", profiles[0].columns['dti'].counters.count)
print("Second DTI count: ", profiles[1].columns['dti'].counters.count)
print("Merged count: ", merged.columns['dti'].counters.count)
First DTI count: 309
Second DTI count: 309
Merged count: 618
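If the profile binaries had been collected into S3 rather than the local filesystem, the same read-and-merge pattern applies once they are downloaded. A hedged sketch using boto3 (the bucket name and key prefix are placeholders):
import os
from functools import reduce
import boto3
from whylogs import DatasetProfile
s3 = boto3.client("s3")
bucket = "my-whylogs-bucket"        # placeholder bucket name
prefix = "whylogs-output/dataset"   # placeholder key prefix
# list the profile binaries in the bucket and download them locally
local_paths = []
for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", []):
    key = obj["Key"]
    if key.endswith(".bin"):
        local_path = os.path.basename(key)
        s3.download_file(bucket, key, local_path)
        local_paths.append(local_path)
# read and merge exactly as with the local files above
profiles = [DatasetProfile.read_protobuf(p, delimited_file=False) for p in local_paths]
merged = reduce(lambda x, y: x.merge(y), profiles)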