title: "Kafka Python Feature Store Example"
date: 2021-02-24
type: technical_note
draft: false
Tested with Python 3.6 and Python 2.7.
Before running this notebook, you should have created a Kafka topic whose name you can configure in the TOPIC_NAME variable in the code below.
The screenshots below illustrate the steps necessary to create a Kafka topic on Hops.
In this notebook we use two Python dependencies: hops-util-py and confluent-kafka-python.
To install the confluent-kafka-python library, use the Hopsworks UI.
The hops-util library is already installed by default when projects are created on Hops. If you need to reinstall it for some reason, use the Hopsworks UI to first uninstall it and then install it from pip, using the same method as described above.
from hops import kafka
from hops import tls
from confluent_kafka import Producer, Consumer
Define the name of the topic you created here:
TOPIC_NAME = "test"
We can get the schema defined for the topic by using the utility library to make a REST call to Hopsworks:
kafka.get_schema(TOPIC_NAME)
{'contents': '[]', 'version': 0}
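The returned dictionary, as shown in the output above, carries the schema as a JSON string under 'contents' plus a 'version' number. A minimal sketch of turning it into Python objects, assuming only that shape (a topic without a registered schema comes back as the string '[]'); the helper name parse_topic_schema is hypothetical:

```python
import json


def parse_topic_schema(schema_response):
    """Split a get_schema()-style dict into (parsed_schema, version).

    Assumes the shape seen in the output above:
    {'contents': '<JSON string>', 'version': <int>}.
    An empty or missing 'contents' string is treated as JSON null (None).
    """
    contents = schema_response.get("contents") or "null"
    return json.loads(contents), schema_response.get("version")


# Usage with the response printed above
schema, version = parse_topic_schema({'contents': '[]', 'version': 0})
print(schema, version)  # → [] 0
```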
The hops-util-py library provides utility methods for setting up secure communication for Kafka producers and consumers running inside a Hopsworks cluster. You can use these utility methods in combination with any Python Kafka client. In this notebook we use confluent-kafka-python.
config = {
    "bootstrap.servers": kafka.get_broker_endpoints(),
    "security.protocol": kafka.get_security_protocol(),
    "ssl.ca.location": tls.get_ca_chain_location(),
    "ssl.certificate.location": tls.get_client_certificate_location(),
    "ssl.key.location": tls.get_client_key_location(),
    "group.id": "something"
}
# equivalently you can use:
# config = kafka.get_kafka_default_config()
producer = Producer(config)
consumer = Consumer(config)
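Sharing one dictionary between Producer and Consumer works, but group.id is only meaningful to consumers, and recent librdkafka versions may log a warning when a producer receives it. A minimal sketch of deriving separate configs from the shared security settings; the helper name split_client_configs and the CONSUMER_ONLY_KEYS set are hypothetical, and auto.offset.reset set to "earliest" is an added assumption that lets a brand-new consumer group start from the oldest retained message instead of only new ones:

```python
# Keys that only consumers understand; this set is illustrative, not exhaustive.
CONSUMER_ONLY_KEYS = {"group.id", "auto.offset.reset", "enable.auto.commit"}


def split_client_configs(shared, group_id, offset_reset="earliest"):
    """Return (producer_config, consumer_config) built from shared settings.

    The input dict is not modified.
    """
    # Producer gets everything except consumer-only keys
    producer_config = {k: v for k, v in shared.items() if k not in CONSUMER_ONLY_KEYS}
    # Consumer gets the same security settings plus its group and offset policy
    consumer_config = dict(producer_config)
    consumer_config["group.id"] = group_id
    consumer_config["auto.offset.reset"] = offset_reset
    return producer_config, consumer_config


# Usage in the notebook (not executed here):
# producer_config, consumer_config = split_client_configs(config, "something")
# producer = Producer(producer_config)
# consumer = Consumer(consumer_config)
```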
The confluent_kafka API provides a callback hook for getting notified when the consumer is assigned partitions, for example after a consumer-group rebalance:
def print_assignment(consumer, partitions):
    """
    Callback called when a Kafka consumer is assigned to a partition
    """
    print('Assignment:', partitions)
# the consumer can be subscribed to multiple topics
topics = [TOPIC_NAME]
consumer.subscribe(topics, on_assign=print_assignment)
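subscribe() also accepts an on_revoke callback, called when partitions are taken away from the consumer during a rebalance. A minimal sketch that keeps the current assignment in a set instead of only printing it; the class name AssignmentTracker is hypothetical, and it relies only on the topic and partition attributes of the TopicPartition objects that confluent_kafka passes to these callbacks:

```python
class AssignmentTracker(object):
    """Keeps the (topic, partition) pairs currently assigned to a consumer."""

    def __init__(self):
        self.assigned = set()

    def on_assign(self, consumer, partitions):
        # Called after a rebalance grants partitions to this consumer
        for p in partitions:
            self.assigned.add((p.topic, p.partition))
        print('Assignment:', sorted(self.assigned))

    def on_revoke(self, consumer, partitions):
        # Called before partitions are taken away from this consumer
        for p in partitions:
            self.assigned.discard((p.topic, p.partition))


# Usage in the notebook (not executed here):
# tracker = AssignmentTracker()
# consumer.subscribe(topics, on_assign=tracker.on_assign, on_revoke=tracker.on_revoke)
```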
The confluent_kafka API also provides a callback hook that notifies us once messages have been acknowledged by the Kafka brokers. This matters because produce() is asynchronous: when it returns, there is no guarantee that the messages have actually been received by the brokers.
def delivery_callback(err, msg):
    """
    Optional per-message delivery callback (triggered by poll() or flush())
    when a message has been successfully delivered or permanently
    failed delivery (after retries).
    """
    if err:
        print("Message failed delivery: {}".format(err))
    else:
        print('Message: {} delivered to topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))
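Printing from the delivery callback is easy to lose track of when many messages are sent. A minimal sketch of a callable that records delivery results so they can be checked after flush(); the class name DeliveryReport is hypothetical, and it uses only the (err, msg) arguments and the msg.topic(), msg.partition() and msg.offset() accessors already used in delivery_callback:

```python
class DeliveryReport(object):
    """Callable usable as produce(..., callback=report); records outcomes."""

    def __init__(self):
        self.delivered = []  # (topic, partition, offset) per successful message
        self.failed = []     # error objects for permanently failed messages

    def __call__(self, err, msg):
        if err is not None:
            self.failed.append(err)
        else:
            self.delivered.append((msg.topic(), msg.partition(), msg.offset()))


# Usage in the notebook (not executed here):
# report = DeliveryReport()
# for i in range(0, 10):
#     producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=report)
# producer.flush(10)
# print(len(report.delivered), "delivered,", len(report.failed), "failed")
```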
for i in range(0, 10):
    producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)
# Trigger the sending of all messages to the brokers, 10sec timeout
producer.flush(10)
Message: b'message 0' delivered to topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)
Message: b'message 1' delivered to topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)
Message: b'message 2' delivered to topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)
Message: b'message 3' delivered to topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)
Message: b'message 4' delivered to topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)
Message: b'message 5' delivered to topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)
Message: b'message 6' delivered to topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)
Message: b'message 7' delivered to topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)
Message: b'message 8' delivered to topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)
Message: b'message 9' delivered to topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)
0
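The final 0 in the output above is the return value of flush(): the number of messages still waiting in the producer's queue when the call returned. A minimal sketch that turns a non-zero value into an error instead of silently continuing; the helper name flush_or_fail is hypothetical, and it works with any object exposing flush(timeout):

```python
def flush_or_fail(producer, timeout=10.0):
    """Flush the producer and raise if messages are still queued afterwards."""
    # flush() blocks up to `timeout` seconds and returns the remaining queue length
    remaining = producer.flush(timeout)
    if remaining > 0:
        raise RuntimeError(
            "{} message(s) still queued after {}s flush".format(remaining, timeout))
    return remaining


# Usage in the notebook (not executed here):
# flush_or_fail(producer, 10)
```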
for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        print("Topic empty, timeout when trying to consume message, try to produce messages to the topic and then re-consume")
    elif msg.error():
        # poll() can also return error events instead of messages; report and skip them
        print("Consumer error: {}".format(msg.error()))
    else:
        print('Consumed Message: {} from topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))
Consumed Message: b'message 0' from topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)
Consumed Message: b'message 1' from topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)
Consumed Message: b'message 2' from topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)
Consumed Message: b'message 3' from topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)
Consumed Message: b'message 4' from topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)
Consumed Message: b'message 5' from topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)
Consumed Message: b'message 6' from topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)
Consumed Message: b'message 7' from topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)
Consumed Message: b'message 8' from topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)
Consumed Message: b'message 9' from topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)
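Polling a fixed ten times can stop before all messages arrive if a single poll times out, and the consumer is never closed. A minimal sketch, assuming message values are UTF-8 text as in this example, that keeps polling until n messages are read or an overall deadline passes, skips error events, and decodes the values; the helper name consume_values and its timeout parameters are hypothetical. Calling consumer.close() afterwards makes the consumer leave its group cleanly (and commit final offsets when auto-commit is enabled):

```python
import time


def consume_values(consumer, n, deadline_s=30.0, poll_timeout=1.0):
    """Poll until n messages are read or deadline_s seconds pass; return decoded values."""
    values = []
    stop_at = time.time() + deadline_s
    while len(values) < n and time.time() < stop_at:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            continue  # nothing arrived within poll_timeout, keep waiting
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        values.append(msg.value().decode("utf-8"))
    return values


# Usage in the notebook (not executed here):
# print(consume_values(consumer, 10))
# consumer.close()  # leave the consumer group; commits offsets if auto-commit is on
```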