Zeek to Kafka to Spark

This notebook covers how to stream Zeek data into Spark using Kafka as a message queue. The setup takes a bit of work but the result will be a nice scalable, robust way to process and analyze streaming data from Zeek.

For getting started with Spark (without Kafka) you can view this notebook:

Software

Getting Zeek to Kafka Setup

We have an entire notebook on getting the Kafka plugin for Zeek set up.

Completing the 'Zeek to Kafka' notebook will ensure your Zeek instance with the Kafka plugin is up and running. Once that's complete you're ready for the next phase of our Streaming Data Pipeline (Spark).
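
If you want a quick sanity check that Zeek is actually publishing before we bring Spark into the picture, a few lines of Python will do. This is just a sketch, assuming the kafka-python package is installed and the plugin is writing to a 'dns' topic on localhost:9092 (adjust for your setup).

from kafka import KafkaConsumer

# Sanity check: print a few raw DNS messages as they come off the Kafka topic
consumer = KafkaConsumer('dns', bootstrap_servers='localhost:9092',
                         auto_offset_reset='latest', consumer_timeout_ms=10000)
for i, message in enumerate(consumer):
    print(message.value.decode('utf-8', errors='replace'))
    if i >= 4:
        break
consumer.close()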

Part 2: Streaming data pipeline

To set some context, our long-term plan is to build out a streaming data pipeline. This notebook covers the second phase of that pipeline; conceptually, our network data pipeline looks like this:

  • Kafka Plugin for Zeek
  • Publish (provides a nice decoupled architecture)
  • Subscribe to whatever feed you want (http, dns, conn, x509...)
  • ETL (Extract Transform Load) on the raw message data (parsed data with types)
  • Perform Filtering/Aggregation
  • Data Analysis and Machine Learning

Structured Streaming in Spark

Structured Streaming is the new hotness with Spark. Michael Armbrust from DataBricks gave a great talk at Spark Summit 2017 on Structured Streaming:

There's also a good example on the DataBricks blog:

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Always good to print out versions of libraries
print('PySpark: {:s}'.format(pyspark.__version__))
PySpark: 2.4.4

Spark It!

Spin up Spark with 4 Parallel Executors

Here we're spinning up a local Spark session with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:

  • If you have 4/8 cores use them!
  • It's the exact same code logic as if we were running on a distributed cluster.
  • We run the same code on DataBricks (www.databricks.com) which is awesome BTW.
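
If you're not sure how many cores you have, a quick check like the sketch below will tell you; 'local[*]' would simply use all of them (we stick with 'local[4]' in the next cell).

import multiprocessing

# How many cores do we have to play with?
print('Cores available: {:d}'.format(multiprocessing.cpu_count()))
# A master of 'local[*]' tells Spark to use every available core
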
In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome')\
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4')\
        .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

Loading the Kafka package

In the Spark builder call above we have added the Kafka package as part of the session creation. There are two important things to note:

  1. The version at the end (2.4.4) must match the current Spark version.
  2. The latest package is spark-sql-kafka-0-10_2.12, but we've had no luck with that version in our local testing: it would crash during the 'readStream' call below, so we've reverted to the spark-sql-kafka-0-10_2.11 version (see the version-matching sketch below).
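
One way to keep the versions in sync (just a sketch, not how the session above was built) is to derive the package coordinate from the running PySpark version; note that the Scala suffix (_2.11 here) still has to match your Spark build.

# Sketch: build the Kafka package coordinate from the running PySpark version
kafka_pkg = 'org.apache.spark:spark-sql-kafka-0-10_2.11:{:s}'.format(pyspark.__version__)
print(kafka_pkg)  # org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4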

Sidebar: Check out Apache Arrow

Apache Arrow is worth a look for all kinds of reasons: multi-core pipelines, cross-language storage, and in general more flexible, performant data analysis and machine learning pipelines.

In [3]:
# Optimize the conversion to Spark
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
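
As a quick illustration (a toy example, not part of the original pipeline, and assuming pyarrow is installed), the Arrow setting mainly pays off when converting Spark DataFrames to Pandas:

# Toy example: Arrow accelerates Spark -> Pandas conversions like this one
toy_df = spark.createDataFrame([(i, 'row_{:d}'.format(i)) for i in range(1000)], ['id', 'name'])
toy_pandas = toy_df.toPandas()  # uses Arrow now that the config flag is set
print(toy_pandas.shape)
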
In [4]:
# SUBSCRIBE: Setup connection to Kafka Stream 
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()

# Notes:
# Using 'latest' for the 'startingOffsets' option will give only 'new' live data.
# We could also use 'earliest', which would give us everything Kafka has retained for the topic.
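
Before parsing, it helps to remember what the Kafka source actually hands us: each record comes with key, value, topic, partition, offset and timestamp columns, and the Zeek JSON lives in the 'value' column. You can also subscribe to several Zeek logs at once with a comma-separated topic list; a hedged sketch (assuming those topics exist on your broker):

# The Kafka source columns: key, value, topic, partition, offset, timestamp, timestampType
raw_data.printSchema()

# Sketch: subscribing to multiple Zeek topics in one stream
# multi_raw = spark.readStream.format('kafka') \
#     .option('kafka.bootstrap.servers', 'localhost:9092') \
#     .option('subscribe', 'dns,http,conn') \
#     .option('startingOffsets', 'latest') \
#     .load()
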
In [5]:
# ETL: Hardcoded Schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType
from pyspark.sql.functions import from_json, to_json, col, struct, udf

dns_schema = StructType() \
    .add('ts', StringType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType()) \
    .add('id.orig_p', IntegerType()) \
    .add('id.resp_h', StringType()) \
    .add('id.resp_p', IntegerType()) \
    .add('proto', StringType()) \
    .add('trans_id', IntegerType()) \
    .add('query', StringType()) \
    .add('qclass', IntegerType()) \
    .add('qclass_name', StringType()) \
    .add('qtype', IntegerType()) \
    .add('qtype_name', StringType()) \
    .add('rcode', IntegerType()) \
    .add('rcode_name', StringType()) \
    .add('AA', BooleanType()) \
    .add('TC', BooleanType()) \
    .add('RD', BooleanType()) \
    .add('RA', BooleanType()) \
    .add('Z', IntegerType()) \
    .add('answers', StringType()) \
    .add('TTLs', StringType()) \
    .add('rejected', BooleanType())
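
To sanity check a hand-written schema like this, you can run it against a single sample record in batch mode. The record below is fabricated for illustration (it is not captured Zeek output), with just enough fields to confirm the names and types line up.

import json

# Sketch: parse one made-up DNS record with the schema; missing fields simply come back as null
sample_record = {'ts': '1625000000.123', 'uid': 'C1234', 'id.orig_h': '192.168.1.7',
                 'id.orig_p': 54321, 'id.resp_h': '8.8.8.8', 'id.resp_p': 53,
                 'proto': 'udp', 'query': 'example.com', 'qtype_name': 'A', 'rejected': False}
sample_df = spark.createDataFrame([(json.dumps(sample_record),)], ['value'])
sample_df.select(from_json(col('value'), dns_schema).alias('data')).select('data.*').show(truncate=False)
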
In [6]:
# ETL: Convert raw data into parsed and proper typed data
parsed_data = raw_data \
  .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
  .select('data.*')

# FILTER: Only get DNS records that have 'query' field filled out
filtered_data = parsed_data.filter(parsed_data.query.isNotNull() & (parsed_data.query != ''))

# FILTER 2: Remove Local/mDNS queries
filtered_data = filtered_data.filter(~filtered_data.query.like('%.local'))  # Note: using the '~' negation operator
In [7]:
# Helper method that allows us to compute the 2nd level domain
import tldextract

def compute_domain(query):
    # Pull out the registered (2nd level) domain; guard against empty/missing queries
    if not query:
        return None
    if query.endswith('.local'):
        return 'local'
    return tldextract.extract(query).registered_domain
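
A quick spot check of the helper on a few hand-picked queries (purely illustrative values); tldextract knows about multi-part suffixes like .co.uk, which is why we don't just split on dots.

# Spot check the helper
for q in ['www.google.com', 'mail.google.co.uk', 'my-laptop.local', None]:
    print(q, '->', compute_domain(q))
# www.google.com -> google.com
# mail.google.co.uk -> google.co.uk
# my-laptop.local -> local
# None -> None
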
In [8]:
# COMPUTE: A new column with the 2nd level domain extracted from the query
udf_compute_domain = udf(compute_domain, StringType())
computed_data = filtered_data.withColumn('domain', udf_compute_domain('query'))

# AGGREGATE: In this case a simple groupby operation
group_data = computed_data.groupBy('`id.orig_h`', 'domain', 'qtype_name').count()
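
For reference, here's a hedged sketch of a time-windowed variant of the same aggregation. It assumes the Zeek 'ts' field holds epoch seconds (it arrives as a string in our schema, so we cast it first); this variant isn't used in the rest of the notebook.

from pyspark.sql.functions import window

# Sketch: the same count, bucketed into 1-minute event-time windows with a 2-minute watermark
windowed_data = computed_data \
    .withColumn('event_time', col('ts').cast('double').cast('timestamp')) \
    .withWatermark('event_time', '2 minutes') \
    .groupBy(window('event_time', '1 minute'), '`id.orig_h`', 'domain').count()
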
In [9]:
# At any point in the pipeline you can see what you're getting out
group_data.printSchema()
root
 |-- id.orig_h: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- qtype_name: string (nullable = true)
 |-- count: long (nullable = false)

Streaming pipeline output to an in-memory table

Now, for demonstration and discussion purposes, we're going to pull the end of the pipeline back into memory to inspect the output. A couple of things to note explicitly here:

  • Writing a stream to memory is dangerous and should be done only on small data. Since this is aggregated output we know it's going to be small.

  • The queryName param used below will be the name of the in-memory table.

In [10]:
# Take the end of our pipeline and pull it into memory
dns_count_memory_table = group_data.writeStream.format('memory') \
  .queryName('dns_counts') \
  .outputMode('complete') \
  .start()
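
If you just want to eyeball the aggregation while debugging, the console sink is a simple alternative to the in-memory table; a minimal sketch (running it starts a second streaming query that prints each update to stdout):

# Alternative sink sketch: print every update of the aggregation to the console
console_query = group_data.writeStream.format('console') \
    .outputMode('complete') \
    .option('truncate', 'false') \
    .start()

# ...watch the console output for a bit, then shut it down
console_query.stop()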

Streaming Query/Table: Looking Deeper

Note: The in-memory table above is dynamic, so as the streaming data pipeline continues to process data the table contents will change. Below we run the same query twice; as more data streams in between the two runs, the results change.
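
PySpark's StreamingQuery handle also exposes some basic introspection that pairs nicely with these queries; the values are simply whatever your stream is doing at the moment you ask.

# Peek at the streaming query itself: is it running, and what did the last micro-batch do?
print(dns_count_memory_table.isActive)
print(dns_count_memory_table.status)
print(dns_count_memory_table.lastProgress)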

In [11]:
# Create a Pandas Dataframe by querying the in memory table and converting
dns_counts_df = spark.sql("select * from dns_counts").toPandas()
print('\nDNS Query Total Counts = {:d}'.format(dns_counts_df['count'].sum()))
dns_counts_df.sort_values(ascending=False, by='count')
DNS Query Total Counts = 1
Out[11]:
id.orig_h domain qtype_name count
0 192.168.1.7 toggl.com A 1

Same Query with Updated Results

Now we run the same query as above and since the streaming pipeline continues to process new incoming data the in-memory table will dynamically update.

In [16]:
# Create a Pandas Dataframe by querying the in memory table and converting
dns_counts_df = spark.sql("select * from dns_counts").toPandas()
print('\nDNS Query Total Counts = {:d}'.format(dns_counts_df['count'].sum()))
dns_counts_df.sort_values(ascending=False, by='count')
DNS Query Total Counts = 20
Out[16]:
id.orig_h domain qtype_name count
5 192.168.1.7 google.com A 3
11 192.168.1.7 stackoverflow.com A 2
14 192.168.1.7 doubleclick.net A 2
0 192.168.1.7 t-mobile.com A 1
1 192.168.1.7 google-analytics.com A 1
2 192.168.1.7 googlesyndication.com A 1
3 192.168.1.7 quantserve.com A 1
4 192.168.1.7 gravatar.com A 1
6 192.168.1.7 imgur.com A 1
7 192.168.1.7 googleapis.com A 1
8 192.168.1.7 githubusercontent.com A 1
9 192.168.1.7 googleusercontent.com A 1
10 192.168.1.7 toggl.com A 1
12 192.168.1.7 gstatic.com A 1
13 192.168.1.7 scorecardresearch.com A 1
15 192.168.1.7 stackexchange.com A 1
In [17]:
# We should stop our streaming pipeline when we're done
dns_count_memory_table.stop()

Part 2: Streaming data pipeline

Recall that our long-term plan is to build out a streaming data pipeline. This notebook has covered the Subscribe, ETL, and Filtering/Aggregation steps of our growing network data pipeline:

  • Kafka Plugin for Zeek
  • Publish (provides a nice decoupled architecture)
  • Subscribe to whatever feed you want (http, dns, conn, x509...)
  • ETL (Extract Transform Load) on the raw message data (parsed data with types)
  • Perform Filtering/Aggregation
  • Data Analysis and Machine Learning

Software

Wrap Up

Well, that's it for this notebook. We know it ended before we got to the most exciting part of the streaming data pipeline: for this notebook we showed everything in the pipeline up to the aggregation step. In future notebooks we'll dive into the deep end of our pipeline and cover the data analysis and machine learning aspects of Spark.

If you liked this notebook please visit the zat project for more notebooks and examples.

About SuperCowPowers

The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheroes or at least carry around rayguns and burner phones. Visit SuperCowPowers