Profiling data from a Kafka topic

In this example we will show how you can use whylogs to profile messages from a Kafka topic and merge the resulting profiles. To keep the example simple and reproducible anywhere, we will create a Kafka topic, generate data from an existing CSV file and ingest it into the topic, consume the messages back, and then profile them.
NOTE: In order to get this example going, we will run Apache ZooKeeper and Apache Kafka locally with Docker Compose, so be sure to have Docker Compose installed and ready in your environment. If you want to read more about how this YAML file was built, check out this blog post.
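In case you don't have a compose file at hand, a minimal docker-compose.yml along the lines below should be enough for this example. Note that the images, tags and listener settings here are assumptions for a local-only setup, so adapt them to your environment:

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0  # assumed image and tag
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0  # assumed image and tag
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise the broker on localhost so clients on the host can connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1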
First, we will bring the services up and create the topic in Kafka with the following commands:
$ docker-compose up -d
$ docker exec -ti kafka bash
root@kafka: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic whylogs-stream
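If you want to double-check that the topic was created before producing to it, listing the topics from inside the same container should show whylogs-stream (the flags below assume the Confluent CLI tools shipped with the image):

root@kafka: kafka-topics --list --bootstrap-server localhost:9092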
If you haven't already, make sure to also install kafka-python and whylogs in your environment by running the following cell.
# Note: you may need to restart the kernel to use updated packages.
%pip install whylogs
%pip install kafka-python
To generate the data, we will fetch a small CSV file from a publicly available S3 endpoint and then use the KafkaProducer to send each row as a JSON message to the topic we created above.
import json
import os
import warnings

import pandas as pd
from kafka import KafkaProducer

warnings.simplefilter("ignore")

# Serialize each message as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

data_url = "https://whylabs-public.s3.us-west-2.amazonaws.com/datasets/tour/current.csv"
full_data = pd.read_csv(data_url)

# Send every row of the DataFrame as an individual message
for i, row in full_data.iterrows():
    producer.send('whylogs-stream', row.to_dict())

# Block until all buffered messages are actually delivered
producer.flush()
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Manually assign every partition of the topic to this consumer
assignments = []
topics = ['whylogs-stream']
for topic in topics:
    partitions = consumer.partitions_for_topic(topic)
    for p in partitions:
        print(f'topic {topic} - partition {p}')
        assignments.append(TopicPartition(topic, p))
consumer.assign(assignments)
topic whylogs-stream - partition 0
Profiling with whylogs

For the sake of simplicity, we will build a pandas.DataFrame from the consumed messages, then profile and write profiles locally until there are no more messages in the topic. This is done with our log rotation implementation, which we will see in the code block below: with mode="rolling", interval=5 and when="M", the logger rotates and writes out a new profile every five minutes. You will also need a directory called "profiles", which is where the logger will save profiles to, so let's go ahead and create it as well.
import whylogs as why
import pandas as pd

# Create the directory where the rolling logger will write profiles
try:
    os.mkdir("profiles")
except FileExistsError:
    pass
consumer.seek_to_beginning()

total = 0
# Roll over to a new profile every 5 minutes, writing locally to ./profiles
with why.logger(mode="rolling", interval=5, when="M", base_name="whylogs-kafka") as logger:
    logger.append_writer("local", base_dir="profiles")
    while True:
        finished = True
        # Fetch up to 100 records per partition, waiting at most 500 ms
        record = consumer.poll(timeout_ms=500, max_records=100, update_offsets=True)
        for k, v in record.items():
            print(f'{k} - {len(v)}')
            df = pd.DataFrame([row.value for row in v])
            logger.log(df)
            total += len(v)
            finished = False
        if finished:
            print(f"total {total}")
            break
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 100
TopicPartition(topic='whylogs-stream', partition=0) - 45
total 945
Now that the profiles are written locally, let's read them back from disk and merge them into a single profile view:

import whylogs as why
from functools import reduce
from glob import glob

# Read every serialized profile back from the profiles directory
profiles_binaries = glob("profiles/*")

profiles_list = []
for profile in profiles_binaries:
    profiles_list.append(why.read(profile).view())

# Merge all profile views into a single one and inspect it
merged_profile = reduce(lambda x, y: x.merge(y), profiles_list)
merged_profile.to_pandas()
| column | types/integral | types/fractional | types/boolean | types/string | types/object | cardinality/est | cardinality/upper_1 | cardinality/lower_1 | distribution/mean | distribution/stddev | ... | distribution/q_75 | distribution/q_90 | distribution/q_95 | distribution/q_99 | counts/n | counts/null | type | frequent_items/frequent_strings | ints/max | ints/min |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Age | 0 | 945 | 0 | 0 | 0 | 25.000001 | 25.001250 | 25.000000 | 31.609524 | 6.747796 | ... | 38.0000 | 41.000 | 42.0000 | 43.0000 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Customer ID | 0 | 0 | 0 | 945 | 0 | 869.683985 | 881.067672 | 858.577213 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='C268100', est=3, upper=2,... | NaN | NaN |
| Gender | 0 | 0 | 0 | 945 | 0 | 2.000000 | 2.000100 | 2.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='M', est=489, upper=489, l... | NaN | NaN |
| Item Price | 0 | 945 | 0 | 0 | 0 | 705.028228 | 714.256661 | 696.024282 | 79.848148 | 41.921716 | ... | 116.6000 | 138.200 | 145.1000 | 149.0000 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Product Category | 0 | 0 | 0 | 945 | 0 | 6.000000 | 6.000300 | 6.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Books', est=243, upper=24... | NaN | NaN |
| Product Subcategory | 0 | 0 | 0 | 945 | 0 | 18.000001 | 18.000899 | 18.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Mens', est=141, upper=141... | NaN | NaN |
| Quantity | 945 | 0 | 0 | 0 | 0 | 10.000000 | 10.000500 | 10.000000 | 2.450794 | 2.279227 | ... | 4.0000 | 5.000 | 5.0000 | 5.0000 | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='2.000000', est=183, upper... | 5.0 | -5.0 |
| Store Type | 0 | 0 | 0 | 945 | 0 | 4.000000 | 4.000200 | 4.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='e-Shop', est=392, upper=3... | NaN | NaN |
| Total Amount | 0 | 945 | 0 | 0 | 0 | 844.069184 | 855.117588 | 833.289540 | 214.615556 | 261.215174 | ... | 361.5560 | 580.346 | 656.8120 | 804.4400 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Total Tax | 0 | 945 | 0 | 0 | 0 | 828.657950 | 839.504628 | 818.075123 | 25.664756 | 19.314519 | ... | 36.7605 | 57.834 | 62.7375 | 76.5975 | 945 | 0 | SummaryType.COLUMN | NaN | NaN | NaN |
| Transaction ID | 0 | 0 | 0 | 945 | 0 | 935.275741 | 947.517988 | 923.331294 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [] | NaN | NaN |
| Transaction Type | 0 | 0 | 0 | 945 | 0 | 2.000000 | 2.000100 | 2.000000 | NaN | NaN | ... | NaN | NaN | NaN | NaN | 945 | 0 | SummaryType.COLUMN | [FrequentItem(value='Purchase', est=859, upper... | NaN | NaN |

12 rows × 28 columns
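Since the merged result is itself a profile view, you can also persist it for later use. A minimal sketch, assuming the profile view's write method and an output file name of our choosing (merged_profile.bin):

# Persist the merged profile view as a single binary file
merged_profile.write("merged_profile.bin")

# It can later be read back the same way as the individual profiles,
# e.g. why.read("merged_profile.bin").view()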
Finally, let's clean up the profiles directory we created for this example:

import shutil

# Remove the local profiles directory and everything in it
shutil.rmtree("profiles")
And voilà! With just a few lines of code we could profile and track incoming messages from a Kafka topic. Hopefully this tutorial will get you going with your existing streaming pipelines. If there are any other integrations you would like to see, or if you want to see how other users are getting the most out of whylogs, please check out our community Slack.