Zeek to Spark Clustering

In this notebook we will pull Zeek data into Spark, then do some analysis and clustering. The first step is to convert your Zeek log data into a Parquet file; for instructions on how to do this (just a few lines of Python code using the ZAT package) please see this notebook:

Apache Parquet is a columnar storage format focused on performance. Reading Parquet data is fast and efficient; in this notebook we will specifically be using it to load data into Spark.

Software

Data

  • About 1/2 million rows of a Zeek dns.log
  • Grab the data here: data.kitware.com (with headers)
In [1]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession

# Local imports
import zat

# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
ZAT: 0.3.7
PySpark: 2.4.4

Spark It!

Spin up Spark with 4 Parallel Executors

Here we're spinning up a local Spark server with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:

  • If you have 4 or 8 cores, use them!
  • It's the exact same code logic as if we were running on a distributed cluster.
  • We run the same code on DataBricks (www.databricks.com) which is awesome BTW.
In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master("local[4]").appName('my_awesome').getOrCreate()

Read in our Parquet File

Here we're loading in a Zeek DNS log with ~1/2 million rows to demonstrate the functionality and do some analysis and clustering on the data. For more information on converting Zeek logs to Parquet files please see our Zeek to Spark notebook:

Zeek logs to Parquet Notebook

In [5]:
# Have Spark read in the Parquet File
spark_df = spark.read.parquet('/Users/briford/data/bro/dns.parquet')

Let's look at our data

We should always inspect our data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be.

In [6]:
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))
Number of Rows: 427935
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,trans_id,query,qclass,qclass_name,qtype,qtype_name,rcode,rcode_name,AA,TC,RD,RA,Z,answers,TTLs,rejected
In [7]:
spark_df.groupby('qtype_name','proto').count().sort('count', ascending=False).show()
+----------+-----+------+
|qtype_name|proto| count|
+----------+-----+------+
|         A|  udp|212473|
|        NB|  udp| 77199|
|      AAAA|  udp| 54519|
|       PTR|  udp| 52991|
|       TXT|  udp| 12644|
|       SRV|  udp| 12268|
|         -|  udp|  3472|
|         *|  udp|   882|
|      AXFR|  tcp|   440|
|       SOA|  udp|   346|
|       TXT|  tcp|   226|
|         -|  tcp|   176|
|        MX|  udp|   169|
|        NS|  udp|    43|
|     HINFO|  udp|    30|
|     NAPTR|  udp|    27|
|       PTR|  tcp|    26|
|         A|  tcp|     4|
+----------+-----+------+

Data looks good, let's take a deeper dive

Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've loaded our Zeek data we're going to use Spark SQL commands to investigate our data, including clustering with MLlib.

In [9]:
# Plotting defaults
%matplotlib inline
import matplotlib.pyplot as plt
from zat.utils import plot_utils
plot_utils.plot_defaults()
In [10]:
# Add a column with the string length of the DNS query
from pyspark.sql.functions import col, length

# Create new dataframe that includes two new columns
spark_df = spark_df.withColumn('query_length', length(col('query')))
spark_df = spark_df.withColumn('answer_length', length(col('answers')))
In [11]:
# Show histogram of the DNS query lengths
bins, counts = spark_df.select('query_length').rdd.flatMap(lambda x: x).histogram(50)

# This is a bit awkward but I believe this is the correct way to do it
plt.hist(bins[:-1], bins=bins, weights=counts, log=True)
plt.grid(True)
plt.xlabel('DNS Query Lengths')
plt.ylabel('Counts')
Out[11]:
Text(0, 0.5, 'Counts')
In [12]:
# Show histogram of the DNS answer lengths
bins, counts = spark_df.select('answer_length').rdd.flatMap(lambda x: x).histogram(50)

# This is a bit awkward but I believe this is the correct way to do it
plt.hist(bins[:-1], bins=bins, weights=counts, log=True)
plt.grid(True)
plt.xlabel('DNS Answer Lengths')
plt.ylabel('Counts')
Out[12]:
Text(0, 0.5, 'Counts')
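The `bins[:-1]` + `weights` trick used in the two histogram cells above is worth a quick sanity check: since Spark's RDD `histogram()` hands back precomputed bin edges and counts, we plot one point per bin (its left edge) weighted by its count. A small NumPy check that the trick reproduces the original bar heights:

```python
import numpy as np

# Deterministic toy data standing in for the Spark-computed histogram
data = [1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 5, 5]
counts, bins = np.histogram(data, bins=5)

# The plotting trick: histogram the left bin edges, weighted by the counts.
# Each left edge lands in its own bin, so the bar heights match exactly.
recounts, _ = np.histogram(bins[:-1], bins=bins, weights=counts)
print(recounts)
```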

Cleanup

Note: This bit of cleanup code is no longer needed as the ZAT log_to_sparkdf now takes care of these things for us. :)

There are two bits of cleanup that we MUST do:

  • Remove '.' from the column names (see Note:)
  • Drop NULLs

Note: Yes, you can use backticks when selecting column names, BUT some of the pipeline operations below will FAIL internally if the column names contain a '.'.
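Although ZAT's `log_to_sparkdf` now handles this for us, the column-name fix itself is just a string replace over the schema. A sketch in plain Python (the column names are the dotted form Zeek logs arrive with):

```python
# Zeek column names as they arrive, with '.' separators
raw_cols = ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p', 'proto']

# Swap '.' for '_' so downstream pipeline stages don't choke
clean_cols = [c.replace('.', '_') for c in raw_cols]
print(clean_cols)
```

In Spark this would look like `spark_df = spark_df.toDF(*clean_cols)`, followed by `spark_df.na.drop()` for the NULLs.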

Spark Pipelines

A Spark pipeline is a way to combine a sequence of complex algorithms and transformations to create a workflow. Once a pipeline is created Spark can optimize that pipeline when it's executed.

Below our pipeline consists of the following stages:

  • String Indexer: Takes our string columns and assigns an index to each unique string
  • OneHotEncoder: Takes our string index and maps it to a bit vector
  • Normalization: Scales our numeric columns (StandardScaler divides each feature by its standard deviation)
  • Assembler: Combines the encoded categorical data and numerical data into a combined matrix

For more information on the details of Categorical Type to One Hot Encoding see our SCP Labs Encoding Dangers notebook.
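To make the first two stages concrete, here's a tiny pure-Python sketch of what StringIndexer + OneHotEncoder do conceptually (Spark's StringIndexer assigns index 0 to the most frequent label; note that Spark's OneHotEncoder also drops the last category by default, which this simplified sketch does not):

```python
from collections import Counter

# Toy 'proto' column
protos = ['udp', 'udp', 'tcp', 'udp', 'tcp', 'udp']

# StringIndexer: most frequent value gets index 0, next gets 1, ...
freq_order = [value for value, _ in Counter(protos).most_common()]
index = {value: i for i, value in enumerate(freq_order)}

# OneHotEncoder: map the index to a bit vector
def one_hot(value, size=len(index)):
    vec = [0] * size
    vec[index[value]] = 1
    return vec

print(index)
print(one_hot('tcp'))
```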

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler

cat_columns = ['qtype_name', 'proto']
num_columns = ['query_length', 'answer_length']
features = cat_columns + num_columns
stages = []

# String Indexer + One Hot Encoder (for categorical columns)
for cat_col in cat_columns:
    string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + '_index')
    encoder = OneHotEncoder(inputCol=cat_col + '_index', outputCol=cat_col + '_onehot')
    stages += [string_indexer, encoder]

# Run StandardScaler on all the numerical features
num_vector = VectorAssembler(inputCols=num_columns, outputCol = 'num_features')
norm = StandardScaler(inputCol='num_features', outputCol='num_features_norm')
stages += [num_vector, norm]

# Assemble the categorical (one hot vectors) and numeric columns together
assembler_inputs = [c + "_onehot" for c in cat_columns] + ['num_features_norm']
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')
stages += [assembler]

# Run the pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(spark_df)
spark_df = pipelineModel.transform(spark_df)
In [14]:
spark_df.select('features').show(truncate = False)
+------------------------------------------------------------------+
|features                                                          |
+------------------------------------------------------------------+
|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |
|(16,[0,13,14,15],[1.0,1.0,1.1858184979975104,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.3682521130740506,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[3,13,14,15],[1.0,1.0,2.462853803533291,0.07560960809619262]) |
|(16,[2,13,14,15],[1.0,1.0,1.5506857281505906,0.07560960809619262])|
+------------------------------------------------------------------+
only showing top 20 rows

In [26]:
from pyspark.ml.clustering import KMeans

# Train a k-means model.
kmeans = KMeans().setK(40)
model = kmeans.fit(spark_df)
In [27]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(spark_df)
print("Within Set Sum of Squared Errors = " + str(wssse))
Within Set Sum of Squared Errors = 6885.514765318761
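The WSSSE metric that `computeCost` reports is just the sum of squared distances from each point to its assigned cluster center. A small NumPy sketch of the same computation on toy 2-D points (the points and centers here are made up for illustration):

```python
import numpy as np

# Four toy points forming two obvious clusters
points = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]])
centers = np.array([[0.0, 0.5], [10.0, 10.5]])

# Assign each point to its nearest center
sq_dists = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=-1)
labels = np.argmin(sq_dists, axis=1)

# WSSSE: sum of squared distances to the assigned centers
wssse = ((points - centers[labels]) ** 2).sum()
print(wssse)
```

Each point sits 0.5 from its center, so the cost is 4 × 0.25 = 1.0. Plotting WSSSE for a range of K values (the "elbow" method) is a common way to sanity-check the K=40 choice above.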
In [28]:
# Let's look at some of the clustering results
transformed = model.transform(spark_df).select(features + ['prediction'])
transformed.groupby(cat_columns + ['prediction']).count().sort('prediction').show(50)
+----------+-----+----------+-----+
|qtype_name|proto|prediction|count|
+----------+-----+----------+-----+
|       TXT|  tcp|         0|  135|
|       TXT|  udp|         0|11149|
|       PTR|  udp|         1|11809|
|      AXFR|  tcp|         2|   87|
|      AXFR|  tcp|         3|   78|
|         A|  udp|         4|28609|
|       PTR|  udp|         5|40370|
|       PTR|  tcp|         5|   25|
|         A|  udp|         6|55667|
|        NB|  udp|         7|11374|
|        NB|  udp|         8|20059|
|      AAAA|  udp|         9|    2|
|         -|  udp|         9|  180|
|         -|  tcp|         9|    5|
|      AAAA|  udp|        10| 9241|
|         *|  udp|        11|  144|
|      AAAA|  udp|        11|   71|
|       SRV|  udp|        11|10419|
|     NAPTR|  udp|        12|   27|
|         A|  udp|        12|25369|
|        MX|  udp|        12|  163|
|        NB|  udp|        13|15787|
|      AAAA|  udp|        14| 6062|
|      AXFR|  tcp|        15|   68|
|         -|  udp|        15|   37|
|       PTR|  udp|        15|   48|
|         *|  udp|        16|   52|
|         A|  udp|        16| 6059|
|       PTR|  udp|        17|  100|
|       TXT|  udp|        17|   12|
|       PTR|  tcp|        17|    1|
|      AXFR|  tcp|        18|  107|
|       SOA|  udp|        19|   31|
|        NB|  udp|        19| 1920|
|        MX|  udp|        20|    6|
|      AXFR|  tcp|        20|   24|
|         *|  udp|        20|  652|
|     HINFO|  udp|        20|   30|
|      AAAA|  udp|        21|14022|
|      AAAA|  udp|        22| 3764|
|         -|  tcp|        23|  163|
|         -|  udp|        23| 3255|
|       SRV|  udp|        24|  727|
|         A|  udp|        25|  614|
|         *|  udp|        25|   31|
|      AAAA|  udp|        25|   38|
|         A|  udp|        26| 5429|
|       TXT|  udp|        27| 1445|
|         A|  udp|        28|13479|
|         A|  udp|        29|50330|
+----------+-----+----------+-----+
only showing top 50 rows

The clusters seem to look okay

We can see that there's some natural grouping/clustering around the different qtype_names and protocols, but we also see that many of the query types/protocols span several clusters... so let's take a closer look at the 'TXT' queries. (Note: replace 'TXT' with any other type and feel free to explore the other 'sub-clusters'.)

In [29]:
# Let's look at the 'TXT' query_name clusters
txt_queries = transformed.where(transformed['qtype_name'] == 'TXT').groupby(features + ['prediction']).\
count().sort('prediction')
txt_queries.show(50)
+----------+-----+------------+-------------+----------+-----+
|qtype_name|proto|query_length|answer_length|prediction|count|
+----------+-----+------------+-------------+----------+-----+
|       TXT|  udp|          12|            1|         0|  488|
|       TXT|  udp|           9|            1|         0|   21|
|       TXT|  udp|          12|           12|         0|    1|
|       TXT|  tcp|          12|            5|         0|  106|
|       TXT|  udp|          23|            1|         0|   12|
|       TXT|  udp|          22|            1|         0|   62|
|       TXT|  udp|          14|           17|         0|    1|
|       TXT|  tcp|          12|            1|         0|   24|
|       TXT|  udp|          12|            5|         0|  214|
|       TXT|  udp|          14|            1|         0|10305|
|       TXT|  udp|          13|            1|         0|   31|
|       TXT|  udp|          24|            1|         0|    1|
|       TXT|  tcp|          12|           12|         0|    5|
|       TXT|  udp|          13|            6|         0|   13|
|       TXT|  udp|          29|           32|        17|    6|
|       TXT|  udp|          36|           33|        17|    6|
|       TXT|  udp|          36|            1|        27|    9|
|       TXT|  udp|          33|            1|        27|    2|
|       TXT|  udp|          36|           22|        27|    2|
|       TXT|  udp|          40|            1|        27|  347|
|       TXT|  udp|          39|            1|        27|   98|
|       TXT|  udp|          41|            1|        27|  436|
|       TXT|  udp|          35|            1|        27|   16|
|       TXT|  udp|          42|            1|        27|  532|
|       TXT|  udp|          34|            1|        27|    3|
|       TXT|  udp|          72|            1|        34|    4|
|       TXT|  udp|          64|            1|        34|    2|
|       TXT|  udp|          67|            1|        34|    2|
|       TXT|  udp|          82|           11|        34|    6|
|       TXT|  udp|          83|            1|        34|    2|
|       TXT|  udp|          12|           33|        37|   22|
|       TXT|  tcp|          12|           33|        37|   91|
+----------+-----+------------+-------------+----------+-----+

In [30]:
# The groupby/count produces a very small amount of data that 
# we can easily pull down and plot from our local client
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Convert to Pandas (make sure it's small)
txt_df = txt_queries.toPandas()

# Now use dataframe group by cluster
cluster_groups = txt_df.groupby('prediction')

# Plot the Machine Learning results
choices = ['red', 'green', 'blue', 'black', 'orange', 'purple', 'brown',
           'pink', 'lightblue', 'grey', 'yellow']
colors = {value: choices[index] for index, value in enumerate(txt_df['prediction'].unique())}

fig, ax = plt.subplots()
for key, group in cluster_groups:
    group.plot(ax=ax, kind='scatter', x='query_length', y='answer_length', alpha=0.5, s=250,
               label='Cluster: {:d}'.format(key), color=colors[key])

Interesting...

So we gave the clustering algorithm both categorical and numerical features, and it seems to have done a reasonable job using both: the categorical types are clustered, and within each categorical cluster we have a set of 'sub-clusters' based on the numerical values.

Wrap Up

Well that's it for this notebook: we pulled in Zeek log data from a Parquet file, did some digging with high-speed, parallel SQL operations, and finally clustered our data to organize the results.

If you liked this notebook please visit the ZAT project for more notebooks and examples.

About SuperCowPowers

The company was formed so that its developers could follow their passion for Python, streaming data pipelines and having fun with data analysis. We also think cows are cool and should be superheroes or at least carry around rayguns and burner phones. Visit SuperCowPowers