# Exploratory analysis of the KDD Cup 99 (10% sample) network-connection
# dataset with Spark SQL.  Written as a Databricks notebook: `sc`, `dbutils`
# and `display` are expected to be provided by the notebook runtime.
import urllib

import matplotlib.pyplot as plt
import pandas as pd

try:  # Python 3 moved urlretrieve into urllib.request
    from urllib.request import urlretrieve
except ImportError:  # Python 2 fallback
    from urllib import urlretrieve

plt.style.use('fivethirtyeight')

dbutils.help()

# --- Download the raw data and stage it in DBFS -----------------------------
urlretrieve(
    "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
    "/tmp/kddcup_data.gz",
)
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

# --- Load the file as an RDD of text lines ----------------------------------
data_file = "dbfs:/kdd/kddcup_data.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)
type(raw_rdd)

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
sqlContext

# Each line is a comma-separated record; split it into a list of fields.
csv_rdd = raw_rdd.map(lambda row: row.split(","))
print(csv_rdd.take(2))
print(type(csv_rdd))
len(csv_rdd.take(1)[0])

from pyspark.sql import Row


def _parse_connection(r):
    """Build a typed ``Row`` from one split CSV record.

    Only a subset of the record's columns is kept.  Numeric columns are cast
    to ``int`` so that the SQL aggregates below (AVG / SUM) operate on numbers
    rather than on strings; the label is taken from the last field.
    """
    return Row(
        duration=int(r[0]),
        protocol_type=r[1],
        service=r[2],
        flag=r[3],
        src_bytes=int(r[4]),
        dst_bytes=int(r[5]),
        wrong_fragment=int(r[7]),
        urgent=int(r[8]),
        hot=int(r[9]),
        num_failed_logins=int(r[10]),
        num_compromised=int(r[12]),
        # Cast like every other numeric column: it is summed in SQL later.
        su_attempted=int(r[14]),
        num_root=int(r[15]),
        num_file_creations=int(r[16]),
        label=r[-1],
    )


parsed_rdd = csv_rdd.map(_parse_connection)
parsed_rdd.take(5)

# --- Build a DataFrame and expose it to SQL as the `connections` table ------
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))
df.printSchema()

help(df.registerTempTable)
# NOTE(review): registerTempTable is deprecated since Spark 2.0 in favour of
# createOrReplaceTempView; kept here to stay compatible with the SQLContext
# based setup above.
df.registerTempTable("connections")

# --- Connections per protocol: DataFrame API vs. SQL ------------------------
display(df.groupBy('protocol_type')
          .count()
          .orderBy('count', ascending=False))

protocols = sqlContext.sql("""
    SELECT protocol_type, count(*) as freq
    FROM connections
    GROUP BY protocol_type
    ORDER BY 2 DESC
""")
display(protocols)

# --- Connections per label (attack type / normal) ---------------------------
labels = sqlContext.sql("""
    SELECT label, count(*) as freq
    FROM connections
    GROUP BY label
    ORDER BY 2 DESC
""")
display(labels)

# toPandas() already returns a pandas DataFrame; index it by label for plotting.
labels_df = labels.toPandas()
labels_df.set_index("label", drop=True, inplace=True)

# rcParams must be configured *before* the figure is created to take effect.
plt.rcParams["figure.figsize"] = (7, 5)
plt.rcParams.update({'font.size': 10})
labels_fig = labels_df.plot(kind='barh')
plt.tight_layout()
display(labels_fig.figure)

# --- Attack vs. no-attack frequency per protocol ----------------------------
attack_protocol = sqlContext.sql("""
    SELECT
        protocol_type,
        CASE label
            WHEN 'normal.' THEN 'no attack'
            ELSE 'attack'
        END AS state,
        COUNT(*) as freq
    FROM connections
    GROUP BY protocol_type, state
    ORDER BY 3 DESC
""")
display(attack_protocol)

# --- Descriptive statistics per protocol and attack state -------------------
attack_stats = sqlContext.sql("""
    SELECT
        protocol_type,
        CASE label
            WHEN 'normal.' THEN 'no attack'
            ELSE 'attack'
        END AS state,
        COUNT(*) as total_freq,
        ROUND(AVG(src_bytes), 2) as mean_src_bytes,
        ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
        ROUND(AVG(duration), 2) as mean_duration,
        SUM(num_failed_logins) as total_failed_logins,
        SUM(num_compromised) as total_compromised,
        SUM(num_file_creations) as total_file_creations,
        SUM(su_attempted) as total_root_attempts,
        SUM(num_root) as total_root_acceses
    FROM connections
    GROUP BY protocol_type, state
    ORDER BY 3 DESC
""")
display(attack_stats)

# --- TCP attacks broken down by service and attack type ---------------------
tcp_attack_stats = sqlContext.sql("""
    SELECT
        service,
        label as attack_type,
        COUNT(*) as total_freq,
        ROUND(AVG(duration), 2) as mean_duration,
        SUM(num_failed_logins) as total_failed_logins,
        SUM(num_file_creations) as total_file_creations,
        SUM(su_attempted) as total_root_attempts,
        SUM(num_root) as total_root_acceses
    FROM connections
    WHERE protocol_type = 'tcp'
      AND label != 'normal.'
    GROUP BY service, attack_type
    ORDER BY total_freq DESC
""")
display(tcp_attack_stats)

# Same breakdown, keeping only groups that pass the HAVING thresholds.
tcp_attack_stats = sqlContext.sql("""
    SELECT
        service,
        label as attack_type,
        COUNT(*) as total_freq,
        ROUND(AVG(duration), 2) as mean_duration,
        SUM(num_failed_logins) as total_failed_logins,
        SUM(num_file_creations) as total_file_creations,
        SUM(su_attempted) as total_root_attempts,
        SUM(num_root) as total_root_acceses
    FROM connections
    WHERE (protocol_type = 'tcp'
           AND label != 'normal.')
    GROUP BY service, attack_type
    HAVING (mean_duration >= 50
            AND total_file_creations >= 5
            AND total_root_acceses >= 1)
    ORDER BY total_freq DESC
""")
display(tcp_attack_stats)

# Subquery form: aggregate first, then filter on the aggregated mean duration.
tcp_attack_stats = sqlContext.sql("""
    SELECT
        t.service,
        t.attack_type,
        t.total_freq
    FROM
        (SELECT
             service,
             label as attack_type,
             COUNT(*) as total_freq,
             ROUND(AVG(duration), 2) as mean_duration,
             SUM(num_failed_logins) as total_failed_logins,
             SUM(num_file_creations) as total_file_creations,
             SUM(su_attempted) as total_root_attempts,
             SUM(num_root) as total_root_acceses
         FROM connections
         WHERE protocol_type = 'tcp'
           AND label != 'normal.'
         GROUP BY service, attack_type
         ORDER BY total_freq DESC) as t
    WHERE t.mean_duration > 0
""")
display(tcp_attack_stats)

# Pivot to a service x attack_type matrix of frequencies; missing pairs -> 0.
display((tcp_attack_stats.groupby('service')
                         .pivot('attack_type')
                         .agg({'total_freq': 'max'})
                         .na.fill(0))
        )