# Zeek to Kafka

This notebook covers how to stream Zeek data using Kafka as a message queue. The setup takes a bit of work, but the result is a robust way to stream data from Zeek.

# Part 1: Streaming data pipeline

To set some context, our long-term plan is to build out a streaming data pipeline, and this notebook will help you get started on that path. After completing it, you can look at the next steps in our notebooks that use Spark on Zeek output.

Conceptually, our streaming pipeline looks like this:

• Kafka Plugin for Zeek
• Publish (provides a nice decoupled architecture)
• Pull/Subscribe to whatever feed you want (http, dns, conn, x509...)
• ETL (Extract Transform Load) on the raw message data (parsed data with types)
• Perform Filtering/Aggregation
• Data Analysis and Machine Learning
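
To make the stages above a bit more concrete, here is a minimal sketch of the ETL, filtering, and aggregation steps written as plain Python generators. It is only an illustration: the function names (`parse`, `only_queries`, `count_by_origin`) and the sample messages are hypothetical, and a real pipeline would read from a Kafka consumer rather than a list.

```python
import json
from collections import Counter
from datetime import datetime, timezone

def parse(raw_messages):
    """ETL step: decode raw JSON bytes and give 'ts' a real datetime type."""
    for raw in raw_messages:
        record = json.loads(raw.decode('utf-8'))
        record['ts'] = datetime.fromtimestamp(record['ts'], tz=timezone.utc)
        yield record

def only_queries(records, qtype_name='A'):
    """Filtering step: keep records of one DNS query type."""
    return (r for r in records if r.get('qtype_name') == qtype_name)

def count_by_origin(records):
    """Aggregation step: count records per originating host."""
    return Counter(r['id.orig_h'] for r in records)

# Hypothetical raw messages, shaped like Zeek's JSON DNS output
raw_feed = [
    b'{"ts": 1503513688.232274, "id.orig_h": "192.168.1.7", "qtype_name": "A"}',
    b'{"ts": 1503513689.0, "id.orig_h": "192.168.1.7", "qtype_name": "AAAA"}',
    b'{"ts": 1503513690.5, "id.orig_h": "192.168.1.5", "qtype_name": "A"}',
]

# Chain the stages: nothing is decoded until the aggregation pulls records through
counts = count_by_origin(only_queries(parse(raw_feed)))
print(counts)
```

Because each stage is a generator, the same chain should work on an endless stream as long as the final step is something incremental rather than a one-shot count.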

# Getting Everything Set Up

Things you'll need:

The links above do a pretty good job of getting you set up with Zeek, Kafka, and the Kafka plugin. If you already have these things set up, you're good to go. If not, take some time and get them up and running. If you're a bit wacky (like me) and want to set these things up on a Mac, you might check out my notes here: Zeek/Kafka Mac Setup

## Systems Check

Now that Zeek with the Kafka plugin is set up, let's do a bit of testing to make sure everything is working before we build a Kafka consumer in Python.

Test the Zeek Kafka Plugin

Make sure the Kafka plugin is ready to go by running the following command on your Zeek instance:

or
$ zeekctl deploy

## Verify messages are in the queue

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns

After a second or two you should start seeing DNS requests/replies coming out. Hit Ctrl-C after you see some.

{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
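
Each message on the topic is one JSON object like the one above. As a rough sketch of what a Python consumer has to do with it, the snippet below decodes the bytes into a dict and pairs each answer with its TTL. The `deserialize` helper is a hypothetical name, and the record is a trimmed copy of the sample above.

```python
import json

def deserialize(raw: bytes) -> dict:
    """Decode one Kafka message value (UTF-8 JSON bytes) into a dict."""
    return json.loads(raw.decode('utf-8'))

# A trimmed copy of the DNS record shown above, as the bytes Kafka would deliver
raw = (b'{"ts":1503513688.232274,"id.orig_h":"192.168.1.7",'
       b'"query":"brian.wylie.is.awesome.tk","qtype_name":"A",'
       b'"answers":["17.188.137.55","17.188.142.54"],"TTLs":[25.0,25.0]}')

record = deserialize(raw)

# Zeek's flattened field names contain dots, so use plain key lookups
print(record['id.orig_h'], record['query'])

# 'answers' and 'TTLs' are parallel lists; zip pairs each answer with its TTL
answer_ttls = list(zip(record['answers'], record['TTLs']))
print(answer_ttls)
```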

# If you made it this far you are done!

In [4]:
# Now that the setup is done, let's put together a bit of code to
# process the Kafka 'topics' that are being streamed from our Zeek instance

# First we create a Kafka consumer that decodes each JSON message into a dict
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer('dns', bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

In [ ]:
# Now let's process our Kafka messages
for message in consumer:
    print(message.value)

# Note: This will just loop forever, but here's an
# example of the types of output you'll see
{'ts': 1570120289.692109, 'uid': 'CAdnHRVdI94Upoej7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.655344, 'uid': 'Ctcv6F2bLT8fB9GOUb', 'id.orig_h': '192.168.1.5', ...
{'ts': 1570120295.663177, 'uid': 'CLrohRNbVWuBecKud', 'id.orig_h': '192.168.1.2', '...
{'ts': 1570120295.765735, 'uid': 'CxhnkA3sMdZcQJ6vf7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.765745, 'uid': 'CEPF9E4a9WeM1cFlSk', 'id.orig_h': 'fe80::4b8:c380:5a7...
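
Since the loop above never ends on its own, it can be handy to grab just a few messages while experimenting. The sketch below is one way to do that with `itertools.islice`; `take_messages` and `FakeMessage` are hypothetical names, and the fake consumer only stands in for a `KafkaConsumer` so the example runs without a broker.

```python
from collections import namedtuple
from itertools import islice

def take_messages(consumer, limit):
    """Return the values of at most `limit` messages from any message iterable."""
    return [message.value for message in islice(consumer, limit)]

# Stand-in for KafkaConsumer so the sketch runs without a broker (hypothetical)
FakeMessage = namedtuple('FakeMessage', ['value'])
fake_consumer = (FakeMessage({'uid': 'C%d' % i, 'query': 'example.com'})
                 for i in range(100))

sample = take_messages(fake_consumer, 5)
print(len(sample))
```

With a real consumer, `islice` still blocks until `limit` messages have arrived. Passing `consumer_timeout_ms` when constructing the `KafkaConsumer` should make iteration stop once no message shows up within that window.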


## Now What?

Now we can actually do something useful with our new streaming data. In this case we're going to use some results from our 'Risky Domains' notebook, which computed a set of risky TLDs.

In [12]:
from pprint import pprint
import tldextract
from zat.utils import vt_query

# Create a VirusTotal Query Class
vtq = vt_query.VTQuery()
risky_tlds = set(['info', 'tk', 'xyz', 'online', 'club', 'ru', 'website',
                  'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'])

Using public VT API Key: Please set apikey=<your key> when creating this class

In [ ]:
# Now let's process our Kafka 'dns' messages
for message in consumer:
    dns_message = message.value

    # Pull out the TLD (skip messages that carry no 'query' field)
    query = dns_message.get('query')
    if not query:
        continue
    tld = tldextract.extract(query).suffix

    # Check if the TLD is in the risky group
    if tld in risky_tlds:
        # Ask VirusTotal about the full query name
        results = vtq.query_url(query)
        if results.get('positives'):
            print('\nOMG the Network is on Fire!!!')
            pprint(results)

OMG the Network is on Fire!!!
{'filescan_id': None,
 'positives': 2,
 'query': 'uni10.tk',
 'scan_date': '2019-05-29 09:03:43',
 'scan_results': [('clean site', 59),
                  ('unrated site', 8),
                  ('malware site', 1),
                  ('suspicious site', 1),
                  ('malicious site', 1)],
 'total': 70,
 'url': 'http://uni10.tk/'}
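
The risky-TLD check itself is easy to try without Kafka or VirusTotal. The sketch below is a simplified, dependency-free variant. It takes the last dot-separated label instead of using `tldextract` (so it will not recognize multi-part suffixes), and it remembers names it has already flagged, so that a rate-limited service such as the public VirusTotal API would not be asked about the same domain twice. `naive_tld` and `risky_queries` are hypothetical helpers, not part of `zat`.

```python
RISKY_TLDS = {'info', 'tk', 'xyz', 'online', 'club', 'ru', 'website',
              'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'}

def naive_tld(query):
    """Last dot-separated label, lowercased; a simplification of tldextract's suffix."""
    return query.rstrip('.').rsplit('.', 1)[-1].lower()

def risky_queries(dns_messages, seen=None):
    """Yield each risky query name once, skipping messages without a 'query' field."""
    seen = set() if seen is None else seen
    for msg in dns_messages:
        query = msg.get('query')
        if not query or query in seen:
            continue
        if naive_tld(query) in RISKY_TLDS:
            seen.add(query)
            yield query

# Hypothetical messages; only the 'query' field matters here
messages = [{'query': 'uni10.tk'}, {'query': 'example.com'},
            {'query': 'uni10.tk'}, {'uid': 'no-query-field'}]
flagged = list(risky_queries(messages))
print(flagged)
```

In the streaming loop, the names yielded here are the ones you would hand to `vtq.query_url`.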


## Wrap Up

Well, that's it for this notebook. We set up Zeek with the Kafka plugin and showed a simple example of how we might process the streaming data coming from Kafka.

### Software