SQL at Scale with Spark SQL and DataFrames

Spark SQL brings native support for SQL to Spark and streamlines the process of querying data stored both in RDDs (Spark’s distributed datasets) and in external sources. Spark SQL conveniently blurs the lines between RDDs and relational tables. Unifying these powerful abstractions makes it easy for developers to intermix SQL commands querying external data with complex analytics, all within in a single application. Concretely, Spark SQL will allow developers to:

  • Import relational data from Parquet files and Hive tables
  • Run SQL queries over imported data and existing RDDs
  • Easily write RDDs out to Hive tables or Parquet files

Spark SQL also includes a cost-based optimizer, columnar storage, and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance, without having to worry about using a different engine for historical data.

For getting a deeper perspective into the background, concepts, architecture of Spark SQL and DataFrames you can check out the original article, 'SQL at Scale with Apache Spark SQL and DataFrames - Concepts, Architecture and Examples'

This tutorial will familiarize you with essential Spark capabilities to deal with structured data typically often obtained from databases or flat files. We will explore typical ways of querying and aggregating relational data by leveraging concepts of DataFrames and SQL using Spark. We will work on an interesting dataset from the KDD Cup 1999 and try to query the data using high level abstrations like the dataframe which has already been a hit in popular data analysis tools like R and Python. We will also look at how easy it is to build data queries using the SQL language which you have learnt and retrieve insightful information from our data. This also happens at scale without us having to do a lot more since Spark distributes these data structures efficiently in the backend which makes our queries scalable and as efficient as possible.

In [2]:
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')

Data Retrieval

We will use data from the KDD Cup 1999, which is the data set used for The Third International Knowledge Discovery and Data Mining Tools Competition, which was held in conjunction with KDD-99 The Fifth International Conference on Knowledge Discovery and Data Mining. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between "bad" connections, called intrusions or attacks, and "good" normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment.

We will be using the reduced dataset kddcup.data_10_percent.gz containing nearly half million nework interactions since we would be downloading this Gzip file from the web locally and then work on the same. If you have a good, stable internet connection, feel free to download and work with the full dataset available as kddcup.data.gz

Working with data from the web

Dealing with datasets retrieved from the web can be a bit tricky in Databricks. Fortunately, we have some excellent utility packages like dbutils which help in making our job easier. Let's take a quick look at some essential functions for this module.

In [5]:
dbutils.help()
This module provides various utilities for users to interact with the rest of Databricks.

fs: DbfsUtils -> Manipulates the Databricks filesystem (DBFS) from the console
meta: MetaUtils -> Methods to hook into the compiler (EXPERIMENTAL)
notebook: NotebookUtils -> Utilities for the control flow of a notebook (EXPERIMENTAL)
preview: Preview -> Utilities under preview category
secrets: SecretUtils -> Provides utilities for leveraging secrets within notebooks
widgets: WidgetsUtils -> Methods to create and get bound value of input widgets inside notebooks

Retrieve and store data in Databricks

We will now leverage the python urllib library to extract the KDD Cup 99 data from their web repository, store it in a temporary location and then move it to the Databricks filesystem which can enable easy access to this data for analysis

Note: If you skip this step and download the data directly, you may end up getting a InvalidInputException: Input path does not exist error

In [7]:
import urllib
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))
pathnamesize
dbfs:/kdd/kddcup_data.gzkddcup_data.gz2144903

Building the KDD Dataset

Now that we have our data stored in the Databricks filesystem. Let's load up our data from the disk into Spark's traditional abstracted data structure, the Resilient Distributed Dataset (RDD)

In [9]:
data_file = "dbfs:/kdd/kddcup_data.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)
Out[2]: [u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.', u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']
In [10]:
type(raw_rdd)
Out[3]: pyspark.rdd.RDD

Building a Spark DataFrame on our Data

A Spark DataFrame is an interesting data structure representing a distributed collecion of data. A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs in our case.

Typically the entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance of this call, all we need is a SparkContext reference. In Databricks this global context object is available as sc for this purpose.

In [12]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext 
Out[4]: <pyspark.sql.context.SQLContext at 0x7f470b54d490>

Splitting the CSV data

Each entry in our RDD is a comma-separated line of data which we first need to split before we can parse and build our dataframe

In [14]:
csv_rdd = raw_rdd.map(lambda row: row.split(","))
print(csv_rdd.take(2))
print(type(csv_rdd))
[[u'0', u'tcp', u'http', u'SF', u'181', u'5450', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'9', u'9', u'1.00', u'0.00', u'0.11', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.'], [u'0', u'tcp', u'http', u'SF', u'239', u'486', u'0', u'0', u'0', u'0', u'0', u'1', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'0', u'8', u'8', u'0.00', u'0.00', u'0.00', u'0.00', u'1.00', u'0.00', u'0.00', u'19', u'19', u'1.00', u'0.00', u'0.05', u'0.00', u'0.00', u'0.00', u'0.00', u'0.00', u'normal.']] <class 'pyspark.rdd.PipelinedRDD'>

Check the total number of features (columns)

We can use the following code to check the total number of potential columns in our dataset

In [16]:
len(csv_rdd.take(1)[0])
Out[6]: 42

Data Understanding and Parsing

The KDD 99 Cup data consists of different attributes captured from connection data. The full list of attributes in the data can be obtained here and further details pertaining to the description for each attribute\column can be found here. We will just be using some specific columns from the dataset, the details of which are specified below.

feature num feature name description type
1 duration length (number of seconds) of the connection continuous
2 protocol_type type of the protocol, e.g. tcp, udp, etc. discrete
3 service network service on the destination, e.g., http, telnet, etc. discrete
4 src_bytes number of data bytes from source to destination continuous
5 dst_bytes number of data bytes from destination to source continuous
6 flag normal or error status of the connection discrete
7 wrong_fragment number of ``wrong'' fragments continuous
8 urgent number of urgent packets continuous
9 hot number of ``hot'' indicators continuous
10 num_failed_logins number of failed login attempts continuous
11 num_compromised number of ``compromised'' conditions continuous
12 su_attempted 1 if ``su root'' command attempted; 0 otherwise discrete
13 num_root number of ``root'' accesses continuous
14 num_file_creations number of file creation operations continuous

We will be extracting the following columns based on their positions in each datapoint (row) and build a new RDD as follows

In [18]:
from pyspark.sql import Row

parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)
Out[7]: [Row(dst_bytes=5450, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=181, su_attempted=u'0', urgent=0, wrong_fragment=0), Row(dst_bytes=486, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=239, su_attempted=u'0', urgent=0, wrong_fragment=0), Row(dst_bytes=1337, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=235, su_attempted=u'0', urgent=0, wrong_fragment=0), Row(dst_bytes=1337, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=219, su_attempted=u'0', urgent=0, wrong_fragment=0), Row(dst_bytes=2032, duration=0, flag=u'SF', hot=0, label=u'normal.', num_compromised=0, num_failed_logins=0, num_file_creations=0, num_root=0, protocol_type=u'tcp', service=u'http', src_bytes=217, su_attempted=u'0', urgent=0, wrong_fragment=0)]

Constructing the DataFrame

Now that our data is neatly parsed and formatted, let's build our DataFrame!

In [20]:
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))
dst_bytesdurationflaghotlabelnum_compromisednum_failed_loginsnum_file_creationsnum_rootprotocol_typeservicesrc_bytessu_attemptedurgentwrong_fragment
54500SF0normal.0000tcphttp181000
4860SF0normal.0000tcphttp239000
13370SF0normal.0000tcphttp235000
13370SF0normal.0000tcphttp219000
20320SF0normal.0000tcphttp217000
20320SF0normal.0000tcphttp217000
19400SF0normal.0000tcphttp212000
40870SF0normal.0000tcphttp159000
1510SF0normal.0000tcphttp210000
7860SF1normal.0000tcphttp212000

Now, we can easily have a look at our dataframe's schema using tne printSchema(...) function.

In [22]:
df.printSchema()
root -- dst_bytes: long (nullable = true) -- duration: long (nullable = true) -- flag: string (nullable = true) -- hot: long (nullable = true) -- label: string (nullable = true) -- num_compromised: long (nullable = true) -- num_failed_logins: long (nullable = true) -- num_file_creations: long (nullable = true) -- num_root: long (nullable = true) -- protocol_type: string (nullable = true) -- service: string (nullable = true) -- src_bytes: long (nullable = true) -- su_attempted: string (nullable = true) -- urgent: long (nullable = true) -- wrong_fragment: long (nullable = true)

Building a temporary table

We can leverage the registerTempTable() function to build a temporaty table to run SQL commands on our DataFrame at scale! A point to remember is that the lifetime of this temp table is tied to the session. It creates an in-memory table that is scoped to the cluster in which it was created. The data is stored using Hive's highly-optimized, in-memory columnar format.

You can also check out saveAsTable() which creates a permanent, physical table stored in S3 using the Parquet format. This table is accessible to all clusters. The table metadata including the location of the file(s) is stored within the Hive metastore.`

In [24]:
help(df.registerTempTable)
Help on method registerTempTable in module pyspark.sql.dataframe: registerTempTable(self, name) method of pyspark.sql.dataframe.DataFrame instance Registers this DataFrame as a temporary table using the given name. The lifetime of this temporary table is tied to the :class:`SparkSession` that was used to create this :class:`DataFrame`. >>> df.registerTempTable("people") >>> df2 = spark.sql("select * from people") >>> sorted(df.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropTempView("people") .. note:: Deprecated in 2.0, use createOrReplaceTempView instead. .. versionadded:: 1.3
In [25]:
df.registerTempTable("connections")

Executing SQL at Scale

Let's look at a few examples of how we can run SQL queries on our table based off our dataframe. We will start with some simple queries and then look at aggregations, filters, sorting, subqueries and pivots

Connections based on the protocol type

Let's look at how we can get the total number of connections based on the type of connectivity protocol. First we will get this information using normal DataFrame DSL syntax to perform aggregations.

In [28]:
display(df.groupBy('protocol_type')
          .count()
          .orderBy('count', ascending=False))
protocol_typecount
icmp283602
tcp190065
udp20354

Can we also use SQL to perform the same aggregation? Yes we can leverage the table we built earlier for this!

In [30]:
protocols = sqlContext.sql("""
                           SELECT protocol_type, count(*) as freq
                           FROM connections
                           GROUP BY protocol_type
                           ORDER BY 2 DESC
                           """)
display(protocols)
protocol_typefreq
icmp283602
tcp190065
udp20354

You can clearly see, that you get the same results and you do not need to worry about your background infrastructure or how the code is executed. Just write simple SQL!

Connections based on good or bad (attack types) signatures

We will now run a simple aggregation to check the total number of connections based on good (normal) or bad (intrusion attacks) types.

In [33]:
labels = sqlContext.sql("""
                           SELECT label, count(*) as freq
                           FROM connections
                           GROUP BY label
                           ORDER BY 2 DESC
                           """)
display(labels)
labelfreq
smurf.280790
neptune.107201
normal.97278
back.2203
satan.1589
ipsweep.1247
portsweep.1040
warezclient.1020
teardrop.979
pod.264
nmap.231
guess_passwd.53
buffer_overflow.30
land.21
warezmaster.20
imap.12
rootkit.10
loadmodule.9
ftp_write.8
multihop.7
phf.4
perl.3
spy.2

We have a lot of different attack types. We can visualize this in the form of a bar chart. The simplest way is to use the excellent interface options in the Databricks notebook itself as depicted in the following figure!

This gives us the following nice looking bar chart! Which you can customize further by clicking on Plot Options as needed.

In [35]:
labels = sqlContext.sql("""
                           SELECT label, count(*) as freq
                           FROM connections
                           GROUP BY label
                           ORDER BY 2 DESC
                           """)
display(labels)
labelfreq
smurf.280790
neptune.107201
normal.97278
back.2203
satan.1589
ipsweep.1247
portsweep.1040
warezclient.1020
teardrop.979
pod.264
nmap.231
guess_passwd.53
buffer_overflow.30
land.21
warezmaster.20
imap.12
rootkit.10
loadmodule.9
ftp_write.8
multihop.7
phf.4
perl.3
spy.2

Another way is to write the code yourself to do it. You can extract the aggregated data as a pandas DataFrame and then plot it as a regular bar chart.

In [37]:
labels_df = pd.DataFrame(labels.toPandas())
labels_df.set_index("label", drop=True,inplace=True)
labels_fig = labels_df.plot(kind='barh')

plt.rcParams["figure.figsize"] = (7, 5)
plt.rcParams.update({'font.size': 10})
plt.tight_layout()
display(labels_fig.figure)

Connections based on protocols and attacks

Let's look at which protocols are most vulnerable to attacks now based on the following SQL query.

In [39]:
attack_protocol = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as freq
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
display(attack_protocol)
protocol_typestatefreq
icmpattack282314
tcpattack113252
tcpno attack76813
udpno attack19177
icmpno attack1288
udpattack1177

Well, looks like ICMP connections followed by TCP connections have had the maximum attacks!

Connection stats based on protocols and attacks

Let's take a look at some statistical measures pertaining to these protocols and attacks for our connection requests.

In [42]:
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
display(attack_stats)
protocol_typestatetotal_freqmean_src_bytesmean_dst_bytesmean_durationtotal_failed_loginstotal_compromisedtotal_file_creationstotal_root_attemptstotal_root_acceses
icmpattack282314932.140.00.00000.00
tcpattack1132529880.38881.4123.19572269761.0152
tcpno attack768131439.314263.9711.0818277645917.05456
udpno attack1917798.0189.891054.630000.00
icmpno attack128891.470.00.00000.00
udpattack117727.50.230.00000.00

Looks like average amount of data being transmitted in TCP requests are much higher which is not surprising. Interestingly attacks have a much higher average payload of data being transmitted from the source to the destination.

Filtering connection stats based on the TCP protocol by service and attack type

Let's take a closer look at TCP attacks given that we have more relevant data and statistics for the same. We will now aggregate different types of TCP attacks based on service, attack type and observe different metrics.

In [45]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE protocol_type = 'tcp'
                                   AND label != 'normal.'
                                   GROUP BY service, attack_type
                                   ORDER BY total_freq DESC
                                   """)
display(tcp_attack_stats)
serviceattack_typetotal_freqmean_durationtotal_failed_loginstotal_file_creationstotal_root_attemptstotal_root_acceses
privateneptune.1013170.0000.00
httpback.22030.13000.00
othersatan.12210.0000.00
privateportsweep.7251915.81000.00
ftp_datawarezclient.708403.71000.00
ftpwarezclient.3071063.79000.00
otherportsweep.2601058.22000.00
telnetneptune.1970.0000.00
httpneptune.1920.0000.00
fingerneptune.1770.0000.00
ftp_dataneptune.1700.0000.00
privatesatan.1700.05000.00
csnet_nsneptune.1230.0000.00
smtpneptune.1200.0000.00
remote_jobneptune.1180.0000.00
pop_3neptune.1180.0000.00
discardneptune.1150.0000.00
iso_tsapneptune.1150.0000.00
systatneptune.1130.0000.00
domainneptune.1120.0000.00
gopherneptune.1120.0000.00
shellneptune.1110.0000.00
echoneptune.1110.0000.00
sql_netneptune.1090.0000.00
authneptune.1080.0000.00
rjeneptune.1080.0000.00
printerneptune.1070.0000.00
whoisneptune.1070.0000.00
courierneptune.1070.0000.00
bgpneptune.1060.0000.00
nntpneptune.1060.0000.00
netbios_ssnneptune.1060.0000.00
kloginneptune.1060.0000.00
uucp_pathneptune.1050.0000.00
nnspneptune.1050.0000.00
mtpneptune.1050.0000.00
imap4neptune.1050.0000.00
ftpneptune.1040.0000.00
uucpneptune.1040.0000.00
sunrpcneptune.1040.0000.00
timeneptune.1030.0000.00
loginneptune.1020.0000.00
hostnamesneptune.1020.0000.00
efsneptune.1020.0000.00
sshneptune.1020.0000.00
daytimeneptune.1020.0000.00
netbios_nsneptune.1010.0000.00
pop_2neptune.1010.0000.00
supdupneptune.1010.0000.00
ldapneptune.1010.0000.00
vmnetneptune.1010.0000.00
execneptune.990.0000.00
linkneptune.990.0000.00
privatenmap.990.0000.00
netbios_dgmneptune.990.0000.00
http_443neptune.980.0000.00
kshellneptune.980.0000.00
nameneptune.970.0000.00
ctfneptune.960.0000.00
netstatneptune.920.0000.00
otherneptune.910.0000.00
Z39_50neptune.910.0000.00
privateipsweep.680.0000.00
telnetguess_passwd.532.725600.00
telnetbuffer_overflow.21130.670150.05
fingerland.200.0000.00
ftp_datawarezmaster.188.06000.00
imap4imap.126.0000.016
ftp_databuffer_overflow.80.0000.00
telnetloadmodule.563.8090.03
telnetrootkit.5197.4120.025
otherwarezclient.53031.0000.00
httpphf.44.5000.00
ftp_dataftp_write.40.0000.01
vmnetportsweep.40.0000.00
supdupportsweep.410171.5000.00
ftp_dataipsweep.30.0000.00
csnet_nsportsweep.313484.0000.00
telnetperl.341.33060.06
ftp_dataloadmodule.30.0000.00
fingersatan.31.0000.00
ftp_datamultihop.30.33000.00
httpportsweep.313689.67000.00
ftpportsweep.30.0000.00
ftp_datasatan.30.0000.00
pop_3portsweep.313446.33000.00
gopheripsweep.34.33000.00
httpipsweep.32.0000.00
ftpftp_write.229.0020.00
sunrpcportsweep.20.0000.00
smtpportsweep.20.0000.00
ftpmultihop.2185.5020.00
rjeipsweep.20.0000.00
printerportsweep.215467.5000.00
httpsatan.20.0000.00
telnetspy.2318.0011.00
ftpwarezmaster.278.00220.00
systatportsweep.20.0000.00
mtpipsweep.20.0000.00
smtpsatan.20.5000.00
telnetmultihop.2458.0080.093
X11satan.20.0000.00
linkipsweep.20.0000.00
ftp_dataportsweep.221224.0000.00
whoisportsweep.22.0000.00
timeipsweep.21.0000.00
telnetportsweep.20.0000.00
loginftp_write.2100.5000.01
netstatportsweep.20.0000.00
uucp_pathportsweep.14.0000.00
sshportsweep.10.0000.00
telnetipsweep.16.0000.00
uucpsatan.10.0000.00
whoisipsweep.10.0000.00
linkportsweep.10.0000.00
nntpsatan.15.0000.00
nntpnmap.10.0000.00
remote_jobipsweep.10.0000.00
vmnetsatan.10.0000.00
ftpipsweep.12.0000.00
ftprootkit.121.0010.00
fingeripsweep.12.0000.00
discardsatan.111.0000.00
telnetnmap.10.0000.00
courierportsweep.130619.0000.00
echoportsweep.10.0000.00
efsportsweep.130835.0000.00
ftpbuffer_overflow.17.0040.00
ftp_datarootkit.10.0000.02
pop_3nmap.10.0000.00
sunrpcsatan.111.0000.00
http_443portsweep.10.0000.00
gophersatan.12.0000.00
ctfnmap.10.0000.00
telnetland.10.0000.00
fingerportsweep.12.0000.00
pop_3satan.15.0000.00
uucpportsweep.130418.0000.00
domainipsweep.16.0000.00
hostnamessatan.10.0000.00
telnetsatan.13.0000.00
remote_jobportsweep.10.0000.00
rjeportsweep.11.0000.00
gopherportsweep.10.0000.00
netstatsatan.10.0000.00
IRCsatan.10.0000.00
nameipsweep.10.0000.00
smtpipsweep.10.0000.00
netbios_nsportsweep.10.0000.00
ftploadmodule.17.0040.00
sshipsweep.16.0000.00
sql_netportsweep.10.0000.00
netbios_ssnportsweep.10.0000.00
daytimeportsweep.10.0000.00
hostnamesportsweep.10.0000.00
ftpsatan.18.0000.00
pm_dumpsatan.10.0000.00
Z39_50portsweep.10.0000.00

There are a lot of attack types and the preceding output shows a specific section of the same.

Filtering connection stats based on the TCP protocol by service and attack type

We will now filter some of these attack types by imposing some constraints based on duration, file creations, root accesses in our query.

In [48]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE (protocol_type = 'tcp'
                                          AND label != 'normal.')
                                   GROUP BY service, attack_type
                                   HAVING (mean_duration >= 50
                                           AND total_file_creations >= 5
                                           AND total_root_acceses >= 1)
                                   ORDER BY total_freq DESC
                                   """)
display(tcp_attack_stats)
serviceattack_typetotal_freqmean_durationtotal_failed_loginstotal_file_creationstotal_root_attemptstotal_root_acceses
telnetbuffer_overflow.21130.670150.05
telnetloadmodule.563.8090.03
telnetmultihop.2458.0080.093

Interesting to see multihop attacks being able to get root accesses to the destination hosts!

Subqueries to filter TCP attack types based on service

Let's try to get all the TCP attacks based on service and attack type such that the overall mean duration of these attacks is greater than zero (> 0). For this we can do an inner query with all aggregation statistics and then extract the relevant queries and apply a mean duration filter in the outer query as shown below.

In [51]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     t.service,
                                     t.attack_type,
                                     t.total_freq
                                   FROM
                                   (SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE protocol_type = 'tcp'
                                   AND label != 'normal.'
                                   GROUP BY service, attack_type
                                   ORDER BY total_freq DESC) as t
                                     WHERE t.mean_duration > 0 
                                   """)
display(tcp_attack_stats)
serviceattack_typetotal_freq
httpback.2203
privateportsweep.725
ftp_datawarezclient.708
ftpwarezclient.307
otherportsweep.260
privatesatan.170
telnetguess_passwd.53
telnetbuffer_overflow.21
ftp_datawarezmaster.18
imap4imap.12
otherwarezclient.5
telnetloadmodule.5
telnetrootkit.5
httpphf.4
supdupportsweep.4
gopheripsweep.3
telnetperl.3
ftp_datamultihop.3
csnet_nsportsweep.3
pop_3portsweep.3
fingersatan.3
httpipsweep.3
httpportsweep.3
telnetspy.2
smtpsatan.2
ftpmultihop.2
whoisportsweep.2
timeipsweep.2
ftpftp_write.2
ftpwarezmaster.2
printerportsweep.2
ftp_dataportsweep.2
telnetmultihop.2
loginftp_write.2
uucp_pathportsweep.1
ftprootkit.1
telnetipsweep.1
nntpsatan.1
fingerportsweep.1
ftpbuffer_overflow.1
ftpipsweep.1
sunrpcsatan.1
discardsatan.1
ftploadmodule.1
courierportsweep.1
domainipsweep.1
rjeportsweep.1
ftpsatan.1
gophersatan.1
fingeripsweep.1
pop_3satan.1
uucpportsweep.1
sshipsweep.1
telnetsatan.1
efsportsweep.1

This is nice! Now an interesting way to also view this data is to use a pivot table where one attribute represents rows and another one represents columns. Let's see if we can leverage Spark DataFrames to do this!

Building a Pivot Table from Aggregated Data

Here, we will build upon the previous DataFrame object we obtained where we aggregated attacks based on type and service. For this, we can leverage the power of Spark DataFrames and the DataFrame DSL.

In [54]:
display((tcp_attack_stats.groupby('service')
                         .pivot('attack_type')
                         .agg({'total_freq':'max'})
                         .na.fill(0))
)
serviceback.buffer_overflow.ftp_write.guess_passwd.imap.ipsweep.loadmodule.multihop.perl.phf.portsweep.rootkit.satan.spy.warezclient.warezmaster.
telnet021053015230051200
ftp012001120001103072
pop_30000000000301000
discard0000000000001000
login0020000000000000
smtp0000000000002000
domain0000010000000000
http2203000030004300000
courier0000000000100000
other000000000026000050
efs0000000000100000
private00000000007250170000
ftp_data0000000300200070818
whois0000000000200000
nntp0000000000001000
uucp_path0000000000100000
supdup0000000000400000
finger0000010000103000
printer0000000000200000
time0000020000000000
csnet_ns0000000000300000
sunrpc0000000000001000
imap400001200000000000
gopher0000030000001000
uucp0000000000100000
ssh0000010000000000
rje0000000000100000

We get a nice neat pivot table showing all the occurences based on service and attack type!

There are plenty of articles\tutorials available online so I would recommend you to check them out. Some useful resources for you to check out include, the complete guide to Spark SQL from Databricks.