This notebook assumes Kafka (and ZooKeeper) have been started and are available at localhost:9092.
$ docker-compose up -d
You can explicitly create Kafka topics with the appropriate replication and partition configuration.
% docker exec -ti kafka bash
root@kafka:/# kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream
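If you prefer to stay in Python, kafka-python's admin client can create the same topic. This is a sketch (not part of the original notebook) that mirrors the name and settings of the kafka-topics command above.
from kafka.admin import KafkaAdminClient, NewTopic

# Sketch: programmatic equivalent of the kafka-topics command above
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='whylogs-stream', num_partitions=1, replication_factor=1)])
admin.close()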
%matplotlib inline
import warnings
warnings.simplefilter("ignore")
!pip install kafka-python
import datetime
import os.path
import pandas as pd
import numpy as np
Requirement already satisfied: kafka-python in /Users/chris/opt/miniconda3/envs/jupyter/lib/python3.8/site-packages (2.0.2)
Load some sample data that we will feed into a Kafka topic.
data_file = "lending_club_demo.csv"
full_data = pd.read_csv(data_file)
full_data['issue_d'].describe()
data = full_data[full_data['issue_d'] == 'Jan-2017']
Send the data to a Kafka topic.
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i, row in data.iterrows():
    producer.send('whylogs-stream', row.to_dict())
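KafkaProducer.send is asynchronous, so records may still be buffered when the loop finishes. A small sketch (not in the original notebook) that blocks until everything has been delivered before we start consuming:
# Block until all buffered records have been sent to the broker
producer.flush()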
import json
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# consumer.seek_to_beginning workaround
# https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097
assignments = []
topics = ['whylogs-stream']
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
consumer.assign(assignments)
topic whylogs-stream - partition 0
A long-running, stand-alone Python consumer might use code like this to read events from a Kafka topic. We don't use it in this notebook because it never terminates.
import datetime

consumer.seek_to_beginning()
total = 0
# `session` comes from whylogs' get_or_create_session(), as set up below
with session.logger(dataset_name="another-dataset",
                    dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0)) as logger:
    for record in consumer:
        total += 1
        print(f'total {total}')
        logger.log(record.value)
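A stand-alone service would usually also join a consumer group so that offsets are committed and a restart resumes where it left off. A minimal sketch, assuming the same topic and broker; the group id 'whylogs-logger' is illustrative:
standalone_consumer = KafkaConsumer(
    'whylogs-stream',
    bootstrap_servers='localhost:9092',
    group_id='whylogs-logger',          # illustrative group id
    auto_offset_reset='earliest',       # start from the beginning if no committed offset
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))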
For notebooks it is better to poll for data and exit when the partition is exhausted.
For demonstration purposes, we reset all partitions to the beginning.
from whylogs import get_or_create_session

session = get_or_create_session()

consumer.seek_to_beginning()
with session.logger(dataset_name="another-dataset") as logger:
    total = 0
    while True:
        finished = True
        # poll() returns a dict mapping TopicPartition -> list of consumed records
        records = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
        for k, v in records.items():
            print(f'{k} - {len(v)}')
            total += len(v)
            df = pd.DataFrame([row.value for row in v])
            logger.log_dataframe(df)
            finished = False
        if finished:
            print(f"total {total}")
            break
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 67
TopicPartition(topic='whylogs-stream', partition=0) - 42
total 309
!find whylogs-output -type f
whylogs-output/another-dataset/dataset_summary/freq_numbers/dataset_summary-batch.json
whylogs-output/another-dataset/dataset_summary/json/dataset_summary-batch.json
whylogs-output/another-dataset/dataset_summary/flat_table/dataset_summary-batch.csv
whylogs-output/another-dataset/dataset_summary/histogram/dataset_summary-batch.json
whylogs-output/another-dataset/dataset_summary/frequent_strings/dataset_summary-batch.json
whylogs-output/another-dataset/dataset_profile/protobuf/datase_profile-batch.bin
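The flat_table CSV can be loaded back into pandas to inspect the profiled statistics. A sketch, assuming the output path shown above:
# Load the flat summary table written by whylogs for a quick look at the profile
summary = pd.read_csv('whylogs-output/another-dataset/dataset_summary/flat_table/dataset_summary-batch.csv')
summary.head()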