Zeek to Kafka

This notebook covers how to stream Zeek data using Kafka as a message queue. The setup takes a bit of work, but the result is a robust way to stream data from Zeek.

Software

Part 1: Streaming data pipeline

To set some context, our long-term plan is to build out a streaming data pipeline. This notebook will help you get started on this path. After completing this notebook you can look at the next steps by viewing our notebooks that use Spark on Zeek output.

So our streaming pipeline looks conceptually like this.

  • Kafka Plugin for Zeek
  • Publish (provides a nice decoupled architecture)
  • Pull/Subscribe to whatever feed you want (http, dns, conn, x509...)
  • ETL (Extract Transform Load) on the raw message data (parsed data with types)
  • Perform Filtering/Aggregation
  • Data Analysis and Machine Learning
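To make the ETL step above concrete, here's a minimal sketch (mine, not part of the original pipeline code) that takes a raw Kafka message value (JSON bytes, as the Zeek Kafka plugin emits) and transforms a few fields into proper Python types. The field names follow Zeek's dns.log schema; the `etl_dns_record` helper is just an illustrative name.

```python
import json
from datetime import datetime, timezone

def etl_dns_record(raw_bytes):
    """Extract the raw JSON message and transform a few fields to real types."""
    record = json.loads(raw_bytes.decode('utf-8'))
    return {
        'ts': datetime.fromtimestamp(record['ts'], tz=timezone.utc),  # float -> datetime
        'orig_h': record['id.orig_h'],
        'orig_p': int(record['id.orig_p']),
        'resp_h': record['id.resp_h'],
        'resp_p': int(record['id.resp_p']),
        'query': record.get('query', ''),
    }

# A trimmed-down example message like the ones Zeek publishes to the 'dns' topic
raw = (b'{"ts":1503513688.2,"id.orig_h":"192.168.1.7","id.orig_p":58528,'
       b'"id.resp_h":"192.168.1.1","id.resp_p":53,"query":"example.com"}')
print(etl_dns_record(raw))
```

In a real pipeline this function would be the `value_deserializer` (or a step right after it) on the Kafka consumer, so every message arrives already typed.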

Getting Everything Set Up

Things you'll need:

The web links above do a pretty good job of getting you set up with Zeek, Kafka, and the Kafka plugin. If you already have these things set up, then you're good to go. If not, take some time and get both up and running. If you're a bit wacky (like me) and want to set these things up on a Mac, you might check out my notes here: Zeek/Kafka Mac Setup

Systems Check

Okay, now that Zeek with the Kafka plugin is set up, let's do just a bit of testing to make sure it's all AOK before we get into making a Kafka consumer in Python.

Test the Zeek Kafka Plugin

Make sure the Kafka plugin is ready to go by running the following command on your Zeek instance:

$ zeek -N Apache::Kafka
Apache::Kafka - Writes logs to Kafka (dynamic, version 0.3.0)

Activate the Kafka Plugin

There's a good explanation of all the options here (https://github.com/apache/metron-bro-plugin-kafka). In my case I needed to use a different load command when 'activating' the Kafka plugin in my local.zeek configuration file. Here's what I added to the 'standard' site/local.zeek file.

@load Apache/Kafka
redef Kafka::topic_name = "";
redef Kafka::send_all_active_logs = T;
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "localhost:9092"
);
  • The first line took me a while to figure out
  • The rest is, at least for me, the best setup:

    By putting in a blank topic name, all output topics are labeled with the name of their log file. For instance, stuff that goes to dns.log is mapped to the 'dns' Kafka topic, http.log to the 'http' topic, and so on. This was exactly what I wanted.

Start Kafka

Run Zeek

$ zeek -i en0 <path to>/local.zeek
or 
$ zeekctl deploy

Verify messages are in the queue

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns

After a second or two you should start seeing DNS requests/replies coming out; hit Ctrl-C after you see some.

{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
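Each line the console consumer prints is a standalone JSON record, so you can sanity-check one directly in Python. Here's a quick check using an abbreviated version of the record above:

```python
import json

# Abbreviated copy of a DNS record from the 'dns' Kafka topic
sample = ('{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555",'
          '"query":"brian.wylie.is.awesome.tk","qtype_name":"A",'
          '"rcode_name":"NOERROR","answers":["17.188.137.55","17.188.142.54"]}')

record = json.loads(sample)
print(record['query'], record['qtype_name'], record['rcode_name'])
print('answers:', len(record['answers']))
```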

If you made it this far you are done!

In [4]:
# Okay, so now that the setup is done, let's put together a bit of code to
# process the Kafka 'topics' that are now being streamed from our Zeek instance

# First we create a Kafka Consumer
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer('dns', bootstrap_servers=['localhost:9092'],
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))
In [ ]:
# Now let's process our Kafka messages
for message in consumer:
    print(message.value)

# Note: This will just loop forever, but here's an 
# example of the types of output you'll see
{'ts': 1570120289.692109, 'uid': 'CAdnHRVdI94Upoej7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.655344, 'uid': 'Ctcv6F2bLT8fB9GOUb', 'id.orig_h': '192.168.1.5', ...
{'ts': 1570120295.663177, 'uid': 'CLrohRNbVWuBecKud', 'id.orig_h': '192.168.1.2', '...
{'ts': 1570120295.765735, 'uid': 'CxhnkA3sMdZcQJ6vf7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.765745, 'uid': 'CEPF9E4a9WeM1cFlSk', 'id.orig_h': 'fe80::4b8:c380:5a7...
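The loop above runs forever, so any filtering/aggregation has to happen incrementally as messages arrive. As a sketch (using a hypothetical in-memory batch of message dicts in place of the live consumer), here's a running count of DNS queries per origin host with `collections.Counter`:

```python
from collections import Counter

# Hypothetical stand-in for the message.value dicts coming off the consumer
messages = [
    {'id.orig_h': '192.168.1.7', 'query': 'example.com'},
    {'id.orig_h': '192.168.1.5', 'query': 'example.org'},
    {'id.orig_h': '192.168.1.7', 'query': 'example.net'},
]

# Incrementally tally queries per origin host (same shape as the consumer loop)
queries_per_host = Counter()
for dns_message in messages:
    queries_per_host[dns_message['id.orig_h']] += 1

print(queries_per_host.most_common())  # [('192.168.1.7', 2), ('192.168.1.5', 1)]
```

Against the real consumer you'd just swap `messages` for `(m.value for m in consumer)` and the tally keeps updating as new messages stream in.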

Now What?

Okay, so now we can actually do something useful with our new streaming data. In this case we're going to use some results from our 'Risky Domains' notebook, which computed a set of risky TLDs.

In [12]:
from pprint import pprint
import tldextract
from zat.utils import vt_query

# Create a VirusTotal Query Class
vtq = vt_query.VTQuery()
risky_tlds = set(['info', 'tk', 'xyz', 'online', 'club', 'ru', 'website', 
                  'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'])
Using public VT API Key: Please set apikey=<your key> when creating this class
In [ ]:
# Now let's process our Kafka 'dns' messages
for message in consumer:
    dns_message = message.value

    # Pull out the TLD
    query = dns_message['query']
    tld = tldextract.extract(query).suffix

    # Check if the TLD is in the risky group
    if tld in risky_tlds:
        # Make the query with the full query
        results = vtq.query_url(query)
        if results.get('positives'):
            print('\nOMG the Network is on Fire!!!')
            pprint(results)
OMG the Network is on Fire!!!
{'filescan_id': None,
 'positives': 2,
 'query': 'uni10.tk',
 'scan_date': '2019-05-29 09:03:43',
 'scan_results': [('clean site', 59),
                  ('unrated site', 8),
                  ('malware site', 1),
                  ('suspicious site', 1),
                  ('malicious site', 1)],
 'total': 70,
 'url': 'http://uni10.tk/'}
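For reference, the code above leans on tldextract because it knows about multi-part suffixes like co.uk. A naive stand-in (my sketch, not the notebook's code) that just takes the last dot-separated label works for simple TLDs like the ones in our risky set:

```python
risky_tlds = set(['info', 'tk', 'xyz', 'online', 'club', 'ru', 'website',
                  'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'])

def is_risky(query):
    """Naive TLD check: last dot-separated label (no multi-part suffix handling)."""
    tld = query.rstrip('.').rsplit('.', 1)[-1].lower()
    return tld in risky_tlds

print(is_risky('brian.wylie.is.awesome.tk'))  # True: last label 'tk' is in the risky set
print(is_risky('example.com'))                # False: 'com' is not in the risky set
```

The naive version would misclassify domains under suffixes like co.uk, which is exactly why the notebook uses tldextract for the real pipeline.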

Part 1: Streaming data pipeline

Recall that our long-term plan is to build out a streaming data pipeline. This notebook has helped you get started on this path. After completing this notebook you can look at the next steps by viewing our notebooks that use Spark on Zeek output.

Wrap Up

Well, that's it for this notebook: we set up Zeek with the Kafka plugin and showed a simple example of how we might process the streaming data coming from Kafka.

Software

About SuperCowPowers

The company was formed so that its developers could follow their passion for Python, streaming data pipelines, and having fun with data analysis. We also think cows are cool and should be superheroes, or at least carry around rayguns and burner phones. Visit SuperCowPowers

In [ ]: