Zeek to Parquet

In this notebook will show how easy it is to load up really big Zeek logs by using the classes within the Zeek Analysis Tools, convert it to a Parquet file, and do some Spark analysis.



In [1]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession

# Local imports
import zat
from zat import log_to_sparkdf

# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
ZAT: 0.3.7
PySpark: 2.4.4

Spark It!

Spin up Spark with 4 Parallel Executors

Here we're spinning up a local spark server with 4 parallel executors, although this might seem a bit silly since we're probably running this on a laptop, there are a couple of important observations:

  • If you have 4/8 cores use them!
  • It's the exact same code logic as if we were running on a distributed cluster.
  • We run the same code on DataBricks (www.databricks.com) which is awesome BTW.
In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()
In [3]:
# Use the ZAT class to load our log file into a Spark dataframe (2 lines of code!)
spark_it = log_to_sparkdf.LogToSparkDF(spark)
spark_df = spark_it.create_dataframe('/Users/briford/data/bro/conn.log')

Spark Workers and Data Partitions

Spark will read in and partition the data out to our workers. Our dataframe(rdd) will have some number of partitions that are divided up amongst the worker pool. Each worker will operate on only a subset of the data and Spark will manage the 'magic' for how that work gets run, aggregated and presented.

Image Credit: Jacek Laskowski, please see his excellent book - Mastering Apache Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark

In [4]:

Convert my Zeek logs to Parquet files

Apache Parquet is a columnar storage format focused on performance. Here's we going to convert our Zeek/Zeek log to a Parquet file is one line of code. The conversion is super scalable since we're using spark distributed executors to do the conversions.

In [5]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
spark_df.write.parquet('conn.parquet', compression='gzip')
In [6]:
# Have Spark read in the Parquet File
spark_df = spark.read.parquet('conn.parquet')

Parquet files are compressed

Here we see the first benefit of Parquet which stores data with compressed columnar format. There are several compression options available (including uncompressed).

Original conn.log = 2.5 GB

conn.parquet = ~420MB

Light it Up!

Now that we have our Parquet data loaded into Spark, we're going to demonstrate just a few simple Spark operations but obviously you now have the full power of the Death Star in your hands.

In [7]:
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))
Number of Rows: 22694356
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,service,duration,orig_bytes,resp_bytes,conn_state,local_orig,missed_bytes,history,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents

Did we mention fast?

The query below was executed on 4 workers. The data contains over 22 million Zeek conn log entries and the time to complete was a fraction of a second running on my Mac Laptop :)

In [17]:
# Lets look at some 'service' breakdowns in our Zeek conn log
spark_df = spark_df.filter(spark_df['service'] != '-')
%timeit -r 1 -n 1 spark_df.groupby('proto','service').count().sort('count', ascending=False).show()   
|proto| service| count|
|  tcp|    http|445214|
|  udp|     dns|160559|
|  tcp|     ssl| 49017|
|  tcp|     ssh|  4778|
|  udp|    dhcp|  3052|
|  tcp|ftp-data|  2880|
|  tcp|     ftp|  2675|
|  tcp|     dns|   706|
|  tcp|    smtp|   194|
|  tcp|    pop3|     2|

482 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
In [16]:
# Lets look at our individual hosts, group by ports/services and
# sum up the bytes transferred from the originating host
import pyspark.sql.functions as sf 
spark_df.groupby('id_orig_h','id_resp_p','service') \
                 .agg(sf.sum('orig_bytes').alias('total_bytes')) \
                 .sort('total_bytes', ascending=False).show(20)
|      id_orig_h|id_resp_p|service|total_bytes|
||       22|    ssh|  491259422|
||       80|   http|  381506783|
||       80|   http|   80956460|
||       80|   http|   54699732|
||       22|    ssh|   42247132|
||       80|   http|   37952120|
||       80|   http|   18731116|
||      443|    ssl|   17883212|
||       22|    ssh|   13947240|
||       80|   http|   11871726|
||       80|   http|   10689231|
||      443|    ssl|    8550078|
||       80|   http|    7860895|
||    55553|    ssl|    6489031|
||     8080|   http|    5595350|
||      443|    ssl|    4883939|
||       80|   http|    4289446|
||       80|   http|    4248981|
||       22|    ssh|    3656175|
||    55553|    ssl|    3510471|
only showing top 20 rows

Data looks good, lets take a deeper dive

Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've got the data loaded into a Spark Dataframe we're going to utilize Spark SQL commands to do some investigation and clustering using the Spark MLLib. For this deeper dive we're going to go to another notebook :)

Spark Clustering Notebook

Wrap Up

Well that's it for this notebook, we went from a Zeek log to a high performance Parquet file and then did some digging with high speed, parallel SQL and groupby operations.

If you liked this notebook please visit the ZAT project for more notebooks and examples.

About SuperCowPowers

The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheros or at least carry around rayguns and burner phones. Visit SuperCowPowers