#!/usr/bin/env python
# coding: utf-8

# >### 🚩 *Create a free WhyLabs account to get more value out of whylogs!*
# >*Did you know you can store, visualize, and monitor whylogs profiles with the [WhyLabs Observability Platform](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example)? Sign up for a [free WhyLabs account](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Kafka_Example) to leverage the power of whylogs and WhyLabs together!*

# # Profiling with `whylogs` from a Kafka topic
#
# In this example we will show how you can profile messages from a Kafka topic and merge the resulting profiles. To keep the example simple and reproducible anywhere, we will create a Kafka topic, generate data from an existing CSV file and ingest it into the topic, consume the messages, and then profile what we consumed.
#
# >**NOTE**: To get this example going, we will run Apache Zookeeper and Apache Kafka locally with Docker Compose, so be sure to have Docker and Docker Compose installed and ready in your environment. If you want to read more on how this YAML file was built, check out [this blogpost](https://medium.com/better-programming/your-local-event-driven-environment-using-dockerised-kafka-cluster-6e84af09cd95).

# To get things going, we will bring the services up and create the topic in Kafka with the following commands:
#
# ```bash
# $ docker-compose up -d
#
# $ docker exec -ti kafka bash
#
# root@kafka: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream
# ```
#
# If you haven't already, make sure to also install `kafka-python` and `whylogs` in your environment by running the following cell.

# In[1]:


# Note: you may need to restart the kernel to use updated packages.
get_ipython().run_line_magic('pip', 'install whylogs')
get_ipython().run_line_magic('pip', 'install kafka-python')


# ## Generating Data
#
# To generate the data, we will fetch a small CSV file from a publicly available S3 endpoint and then use the `KafkaProducer` to send each row over to the topic we created above.

# In[2]:


import json
import warnings

import pandas as pd
from kafka import KafkaProducer

warnings.simplefilter("ignore")

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

data_url = "https://whylabs-public.s3.us-west-2.amazonaws.com/datasets/tour/current.csv"
full_data = pd.read_csv(data_url)

# Send each row of the DataFrame to the topic as a JSON-encoded message
for _, row in full_data.iterrows():
    producer.send('whylogs-stream', row.to_dict())

# Block until all buffered messages have actually been delivered to the broker
producer.flush()


# ## Consuming the messages with KafkaConsumer

# In[3]:


from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Manually assign every partition of the topic to this consumer
assignments = []
topics = ['whylogs-stream']
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))

consumer.assign(assignments)
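# Before profiling, it can help to confirm that the producer's messages actually landed in the topic. The cell below is a minimal, optional sketch that uses `kafka-python`'s `beginning_offsets` and `end_offsets` helpers; the difference between the two is the number of messages currently available in each assigned partition.

# In[ ]:


# Optional sanity check: count the messages available per partition
beginnings = consumer.beginning_offsets(assignments)
ends = consumer.end_offsets(assignments)
for tp in assignments:
    print(f"{tp.topic}-{tp.partition}: {ends[tp] - beginnings[tp]} messages available")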
# ## Profiling with `whylogs`
#
# For the sake of simplicity, we will build a `pandas.DataFrame` from the consumed messages, then profile them and write the profiles locally until there are no more messages in the topic. This is done with our log rotation implementation, which we will see in the code block below. You will also need a directory called "profiles", which is where the logger will save the profiles, so let's go ahead and create it as well.

# In[4]:


import os

import whylogs as why

os.makedirs("profiles", exist_ok=True)

consumer.seek_to_beginning()

total = 0
with why.logger(mode="rolling", interval=5, when="M", base_name="whylogs-kafka") as logger:
    logger.append_writer("local", base_dir="profiles")
    while True:
        finished = True
        # poll returns a {TopicPartition: [ConsumerRecord, ...]} mapping
        records = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
        for partition, messages in records.items():
            print(f'{partition} - {len(messages)}')
            df = pd.DataFrame([message.value for message in messages])
            logger.log(df)
            total += len(messages)
            finished = False
        if finished:
            print(f"total {total}")
            break


# Now we can read the written binary profiles back into a list of profile views:

# In[5]:


from glob import glob

profiles_binaries = glob("profiles/*")
profiles_list = []
for profile in profiles_binaries:
    profiles_list.append(why.read(profile).view())


# And merge them all into a single profile view:

# In[6]:


from functools import reduce

merged_profile = reduce((lambda x, y: x.merge(y)), profiles_list)


# In[7]:


merged_profile.to_pandas()


# Finally, let's clean up the local profiles directory:

# In[8]:


import shutil

shutil.rmtree("profiles")


# And voilà! With just a few lines of code we were able to profile and track incoming messages from a Kafka topic.
# Hopefully this tutorial gets you going with your existing streaming pipelines. If there are other integrations you would like to see, or you want to learn how other users get the most out of `whylogs`, check out our [community Slack](https://bit.ly/rsqrd-slack).
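# One last optional step: the cleanup above removes the per-interval binaries, so if you want to keep the merged result around, you can persist it. This is a minimal sketch, assuming whylogs v1's `DatasetProfileView.write()` and `why.read()` round-trip (the `merged_profile.bin` filename is just an illustrative choice); check the docs for your whylogs version.

# In[ ]:


# Persist the merged profile view to a single binary file...
merged_profile.write("merged_profile.bin")

# ...and load it back later for further analysis or upload
restored = why.read("merged_profile.bin").view()
print(restored.to_pandas().shape)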