#!/usr/bin/env python
# coding: utf-8

# This notebook assumes kafka (and zookeeper) have been started and are
# available at localhost:9092.
#
# https://medium.com/better-programming/your-local-event-driven-environment-using-dockerised-kafka-cluster-6e84af09cd95
#
# ```
# $ docker-compose up -d
# ```
#
# You can explicitly create Kafka topics with appropriate replication and
# partition config.
#
# ```
# % docker exec -ti kafka bash
# root@kafka:/# kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream
# ```

# In[1]:

get_ipython().run_line_magic('matplotlib', 'inline')

import warnings

warnings.simplefilter("ignore")

get_ipython().system('pip install kafka-python')

import datetime
import os.path

import pandas as pd
import numpy as np

# Load some sample data that we will feed into a Kafka topic.

# In[2]:

data_file = "lending_club_demo.csv"
# NOTE: the original wrapped this in os.path.join(data_file), which is a
# no-op with a single argument; read the path directly.
full_data = pd.read_csv(data_file)
full_data['issue_d'].describe()

# Restrict the demo to a single month of loan data.
data = full_data[full_data['issue_d'] == 'Jan-2017']

# Load some data into a Kafka topic.

# In[3]:

from kafka import KafkaProducer
import json

# Serialize each record as UTF-8 JSON on the way into the broker.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Publish one event per dataframe row.
for i, row in data.iterrows():
    producer.send('whylogs-stream', row.to_dict())

# KafkaProducer.send() is asynchronous: records are buffered and sent by a
# background thread. Flush so no events are lost if the kernel/process
# moves on before the buffer drains.
producer.flush()

# In[4]:

import json
from kafka import KafkaConsumer, TopicPartition

# Deserialize the UTF-8 JSON payloads produced above.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# consumer.seek_to_beginning workaround: manually assign every partition of
# each topic (instead of subscribing) so seeking works without waiting for
# a group rebalance.
# https://github.com/dpkp/kafka-python/issues/601#issuecomment-331419097
assignments = []
topics = ['whylogs-stream']
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
consumer.assign(assignments)

# A long-running, stand-alone python consumer might use this code to read events from a Kafka topic.
# We don't use this in the Notebook because it does not terminate.
#
# ```
# import datetime
# consumer.seek_to_beginning();
# total = 0
# with session.logger(dataset_name="another-dataset", dataset_timestamp=datetime.datetime(2020, 9, 22, 0, 0)) as logger:
#     for record in consumer:
#         total += 1
#         print(f'total {total}')
#         logger.log(record.value)
# ```
#
# For Notebooks it is better to poll for data and exit when the partition is
# exhausted.
#
# For demonstration purposes, we reset all partitions to the beginning.

# In[11]:

from whylogs import get_or_create_session

session = get_or_create_session()

# Rewind every assigned partition so the whole topic gets profiled.
consumer.seek_to_beginning()

with session.logger(dataset_name="another-dataset") as logger:
    total = 0
    while True:
        # poll() returns {TopicPartition: [records]}. An empty mapping means
        # the assigned partitions are exhausted, so we can stop cleanly.
        batch = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
        if not batch:
            print(f"total {total}")
            break
        for k, v in batch.items():
            print(f'{k} - {len(v)}')
            total += len(v)
            # Profile each polled batch as a dataframe of the JSON payloads.
            logger.log_dataframe(pd.DataFrame([row.value for row in v]))

# In[12]:

get_ipython().system('find whylogs-output -type f')