Notebook Version: 1.0
Python Version: Python 3.8 - AzureML
Required Packages: azureml-synapse, msticpy, azure-storage-file-datalake
Platforms Supported: Azure Machine Learning Notebooks connected to Azure Synapse Workspace
Data Source Required: Yes
Data Source: CommonSecurityLogs
Spark Version: 3.1 or above
In this sample guided-scenario notebook, we demonstrate how to set up a continuous data pipeline to store data in Azure Data Lake Storage (ADLS) and
then hunt on that data at scale using distributed processing via an Azure Synapse workspace connected to a serverless Spark pool.
Once a historical dataset is available in ADLS, we can perform common hunting operations, create a baseline of normal behavior using the PySpark API, and apply data transformations
to find anomalous behaviors such as periodic network beaconing, as explained in the blog post Detect Network beaconing via Intra-Request time delta patterns in Microsoft Sentinel - Microsoft Tech Community.
You can use various other Spark APIs to perform additional data transformations and understand the data better.
The output generated can also be further enriched with geolocation information and visualized using MSTICPy capabilities to identify any anomalies.
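As context for the transformations later in this notebook, the intra-request time delta idea can be sketched in a few lines of plain Python (the timestamps below are fabricated for illustration):

```python
from datetime import datetime, timedelta

# Hypothetical connection times from one SourceIP -> DestinationIP pair;
# a beacon checking in every 60 seconds yields a near-constant delta
start = datetime(2021, 9, 17, 10, 0, 0)
timestamps = [start + timedelta(seconds=60 * i) for i in range(10)]

# Intra-request time deltas (seconds) between successive events
deltas = [
    int((cur - prev).total_seconds())
    for prev, cur in zip(timestamps, timestamps[1:])
]

# Share of events with the most common delta -- a rough "percent beacon"
most_common = max(set(deltas), key=deltas.count)
percent_beacon = 100.0 * deltas.count(most_common) / len(deltas)
print(most_common, percent_beacon)  # 60 100.0 for this perfectly periodic series
```

Real traffic is noisy, so the Spark version later in the notebook groups deltas per connection tuple and applies thresholds instead of expecting an exact period.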
*** Python module downloads may be needed. ***
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***
Note: Install the packages below only the first time, and restart the kernel once done.
# Install AzureML Synapse package to use spark magics
import sys
!{sys.executable} -m pip install azureml-synapse
# Install Azure storage datalake library to manipulate file systems
import sys
!{sys.executable} -m pip install azure-storage-file-datalake --pre
# Install MSTICPy for security analysis and visualization capabilities
import sys
!{sys.executable} -m pip install msticpy
*** $\color{red}{Note:~After~installing~the~packages,~please~restart~the~kernel.}$ ***
# Load Python libraries that will be used in this notebook
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import get_azure_cli_credentials
from azure.mgmt.resource import ResourceManagementClient
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration, Datastore
from azureml.core.compute import SynapseCompute, ComputeTarget
from datetime import timedelta, datetime
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
import json
import os, uuid, sys
import IPython
import pandas as pd
from ipywidgets import widgets, Layout
from IPython.display import display, HTML
from pathlib import Path
REQ_PYTHON_VER = (3, 6)
REQ_MSTICPY_VER = (1, 4, 4)

display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
    from utils.nb_check import check_python_ver, check_mp_ver
    check_python_ver(min_py_ver=REQ_PYTHON_VER)
    try:
        check_mp_ver(min_msticpy_ver=REQ_MSTICPY_VER)
    except ImportError:
        !pip install --upgrade msticpy
        import importlib
        if "msticpy" in sys.modules:
            importlib.reload(sys.modules["msticpy"])
        else:
            import msticpy
        check_mp_ver(REQ_MSTICPY_VER)
# If not using Azure Notebooks, install msticpy with
# !pip install msticpy
from msticpy.nbtools import nbinit
extra_imports = [
"msticpy.nbtools.nbdisplay, draw_alert_entity_graph",
"msticpy.sectools.ip_utils, convert_to_ip_entities",
"msticpy.nbtools.ti_browser, browse_results",
"IPython.display, Image",
"msticpy.sectools.ip_utils, get_whois_info",
"msticpy.sectools.ip_utils, get_ip_type"
]
nbinit.init_notebook(
namespace=globals(),
# additional_packages=["azureml-synapse", "azure-cli", "azure-storage-file-datalake"],
extra_imports=extra_imports,
);
WIDGET_DEFAULTS = {
"layout": Layout(width="95%"),
"style": {"description_width": "initial"},
}
# Set pandas display options
pd.set_option('display.max_rows', 10)
pd.set_option('display.max_colwidth', 50)
Please use the notebook Configurate Azure ML and Azure Synapse Analytics to configure your environment.
That notebook configures an existing Azure Synapse workspace to create and connect to a Spark pool. You can then create a linked service and connect the AML workspace to the Azure Synapse workspace.
It also configures data export rules to export data from the Log Analytics workspace CommonSecurityLog table to Azure Data Lake Storage Gen2.
Note: Specify the input parameters in the step below in order to connect the AML workspace to the Synapse workspace using the linked service.
amlworkspace = "<aml workspace name>" # fill in your AML workspace name
subscription_id = "<subscription id>" # fill in your subscription id
resource_group = '<resource group of AML workspace>' # fill in your resource groups for AML workspace
linkedservice = '<linked service name>' # fill in your linked service created to connect to synapse workspace
In this step, we will connect the AML workspace to the linked service attached to the Azure Synapse workspace.
# Get the aml workspace
aml_workspace = Workspace.get(name=amlworkspace, subscription_id=subscription_id, resource_group=resource_group)
# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)
Enter your Synapse Spark compute below. To find the Spark compute name, check the computes attached to the linked service in Azure Machine Learning studio.
synapse_spark_compute = input('Synapse Spark compute:')
# Start spark session
%synapse start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute
In this step, we will define several details associated with the ADLS account and specify the input date and lookback period used to calculate the baseline. Based on the input date and lookback period, we will load the data.
%%synapse
# Primary storage info
account_name = '<storage account name>' # fill in your primary account name
container_name = '<container name>' # fill in your container name
subscription_id = '<subscription id>' # fill in your subscription id
resource_group = '<resource group>' # fill in your resource groups for ADLS
workspace_name = '<Microsoft sentinel/log analytics workspace name>' # fill in your workspace name
device_vendor = "Fortinet" # Replace with your desired network vendor from CommonSecurityLog
# Datetime and lookback parameters
end_date = "<enter date in the format yyyy-MM-dd e.g.2021-09-17>" # fill in your input date
lookback_days = 21 # fill in lookback days if you want to run it on historical data. make sure you have historical data available in ADLS
%%synapse
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from datetime import timedelta, datetime, date
# Compiling ADLS paths from date string
end_date_str = end_date.split("-")
current_path = f"/y={end_date_str[0]}/m={end_date_str[1]}/d={end_date_str[2]}"
def generate_adls_paths(end_date, lookback_days, adls_path):
    endDate = datetime.strptime(end_date, '%Y-%m-%d')
    endDate = endDate - timedelta(days=1)
    startDate = endDate - timedelta(days=lookback_days)
    daterange = [startDate + timedelta(days=x) for x in range((endDate - startDate).days + 1)]
    pathlist = []
    for day in daterange:
        date_str = day.strftime('%Y-%m-%d').split("-")
        day_path = adls_path + f"/y={date_str[0]}/m={date_str[1]}/d={date_str[2]}"
        pathlist.append(day_path)
    return pathlist
adls_path = f'abfss://{container_name}@{account_name}.dfs.core.windows.net/WorkspaceResourceId=/subscriptions/{subscription_id}/resourcegroups/{resource_group}/providers/microsoft.operationalinsights/workspaces/{workspace_name}'
current_day_path = adls_path + current_path
historical_path = generate_adls_paths(end_date, lookback_days, adls_path)
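To see the shape of the paths this produces, the same function can be run locally outside the Spark session (the ADLS prefix below is a placeholder):

```python
from datetime import timedelta, datetime

def generate_adls_paths(end_date, lookback_days, adls_path):
    # The window ends the day *before* end_date and spans lookback_days + 1 days
    endDate = datetime.strptime(end_date, '%Y-%m-%d') - timedelta(days=1)
    startDate = endDate - timedelta(days=lookback_days)
    daterange = [startDate + timedelta(days=x)
                 for x in range((endDate - startDate).days + 1)]
    pathlist = []
    for day in daterange:
        y, m, d = day.strftime('%Y-%m-%d').split("-")
        pathlist.append(adls_path + f"/y={y}/m={m}/d={d}")
    return pathlist

paths = generate_adls_paths("2021-09-17", 2, "abfss://container@account.dfs.core.windows.net/ws")
print(paths[0])   # .../ws/y=2021/m=09/d=14
print(paths[-1])  # .../ws/y=2021/m=09/d=16
```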
In this step, you will load the data based on the input date specified.
%%synapse
try:
    current_df = (
        spark.read.option("recursiveFileLookup", "true")
        .option("header", "true")
        .json(current_day_path)
    )
    current_df = (
        current_df
        .select(
            "TimeGenerated",
            "SourceIP",
            "SourcePort",
            "DestinationIP",
            "DestinationPort",
            "Protocol",
            "ReceivedBytes",
            "SentBytes",
            "DeviceVendor",
        )
        .filter(F.col("DeviceVendor") == device_vendor)
    )
except Exception as e:
    print(f"Could not load the data due to error: {e}")
#Display the count of records
print(f"No of records loaded from the current date specified: {current_df.count()}")
You can also perform the analysis on all historical data available in your ADLS account. The notebook is currently configured to run only on the current date specified in the input.
If you want to perform the same analysis on historical data, run the cell below, and in the Data Wrangling using Spark -> Filtering Data code cell replace the current_df
variable with historical_df.
Otherwise, SKIP the cell below, as it will result in an error if you do not have historical data.
%%synapse
try:
    # Read the previous day's data
    historical_df = (
        spark.read.option("recursiveFileLookup", "true")
        .option("header", "true")
        .json(historical_path[-1])
    )
    historical_df = historical_df.select(
        "TimeGenerated",
        "SourceIP",
        "SourcePort",
        "DestinationIP",
        "DestinationPort",
        "Protocol",
        "ReceivedBytes",
        "SentBytes",
        "DeviceVendor",
    ).filter(F.col("DeviceVendor") == device_vendor)
    # Read each remaining historical day and union it with the rest
    for path in historical_path[:-1]:
        daily_table = (
            spark.read.option("recursiveFileLookup", "true")
            .option("header", "true")
            .json(path)
        )
        daily_table = daily_table.select(
            "TimeGenerated",
            "SourceIP",
            "SourcePort",
            "DestinationIP",
            "DestinationPort",
            "Protocol",
            "ReceivedBytes",
            "SentBytes",
            "DeviceVendor",
        ).filter(F.col("DeviceVendor") == device_vendor)
        historical_df = historical_df.union(daily_table)
except Exception as e:
    print(f"Could not load the data due to error:\n {e}")
#Display the count of records
print(f"\n No of records loaded from the lookback days specified: {historical_df.count()}")
In this step, we will prepare the dataset by filtering the logs to only those whose destination is a public/external IP. For this, we use a regular expression with the rlike Spark API to keep internal-to-external network traffic.
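Before applying the pattern at scale with rlike, it can be sanity-checked locally with Python's re module (the sample IPs are illustrative):

```python
import re

# Same pattern used with rlike below: loopback plus the RFC 1918
# private ranges 10/8, 172.16/12 and 192.168/16
PrivateIPregex = r"^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\."

def is_private(ip):
    return re.match(PrivateIPregex, ip) is not None

print(is_private("10.0.0.5"))        # True  - internal, filtered out
print(is_private("172.31.4.2"))      # True  - internal, filtered out
print(is_private("52.183.120.194"))  # False - public, kept for analysis
```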
%%synapse
PrivateIPregex = r"^127\.|^10\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[0-1]\.|^192\.168\."
cooked_df = (current_df # replace with historical_df if you want to use historical data
# use below filter if you have Palo Alto logs
# .filter(
# (F.col("Activity") == "TRAFFIC")
# )
.withColumn(
"DestinationIsPrivate", F.col("DestinationIP").rlike(PrivateIPregex)
)
.filter(F.col("DestinationIsPrivate") == "false")
.withColumn("TimeGenerated", F.col("TimeGenerated").cast("timestamp"))
)
cooked_df.show()
In this step, you can analyze either the historical or the current data to filter source and destination IPs per the defined criteria.
In the example below, we filter for source IPs whose daily event count exceeds the specified threshold.
You can also filter for destination IPs that only a few source IPs connect to. This reduces false positives by filtering out destination IPs that are commonly contacted by internal systems and are likely benign.
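The two filters can be previewed on a toy pandas frame before running the Spark version below (thresholds are scaled down to fit the made-up sample):

```python
import pandas as pd

# Toy connection log; thresholds are scaled down for the small sample
df = pd.DataFrame({
    "SourceIP": ["10.0.0.1"] * 4 + ["10.0.0.2"],
    "DestinationIP": ["1.2.3.4"] * 3 + ["5.6.7.8"] * 2,
})
daily_event_count_threshold = 2  # keep chatty sources only
degree_of_srcip_threshold = 2    # drop destinations contacted by many sources

# Sources with more daily events than the threshold
busy_src = df.groupby("SourceIP").size()
busy_src = busy_src[busy_src > daily_event_count_threshold].index

# Destinations contacted by fewer distinct sources than the threshold
quiet_dst = df.groupby("DestinationIP")["SourceIP"].nunique()
quiet_dst = quiet_dst[quiet_dst < degree_of_srcip_threshold].index

baseline = df[df["SourceIP"].isin(busy_src) & df["DestinationIP"].isin(quiet_dst)]
print(baseline)  # only 10.0.0.1 -> 1.2.3.4 rows survive both filters
```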
%%synapse
daily_event_count_threshold = 100 # Replace the threshold based on your environment or use default values
degree_of_srcip_threshold = 25 # Replace the threshold based on your environment or use default values
# Filtering IP list per TotalEventsThreshold
csl_srcip = (
cooked_df.groupBy("SourceIP")
.count()
.filter(F.col("count") > daily_event_count_threshold)
.orderBy(F.col("count"), ascending=False)
)
# Filtering Destination IP list per Degree of Source IPs threshold
csl_dstip = (
cooked_df.groupBy("DestinationIP")
.agg(F.countDistinct("SourceIP").alias("DegreeofSourceIps"))
.filter(F.col("DegreeofSourceIps") < degree_of_srcip_threshold)
)
# Filtering IP list per Daily event threshold
baseline_df = (
cooked_df.join(csl_srcip, ["SourceIP"])
.join(csl_dstip, ["DestinationIP"])
.select(
"TimeGenerated",
"SourceIP",
"SourcePort",
"DestinationIP",
"DestinationPort",
"Protocol",
"ReceivedBytes",
"SentBytes",
"DeviceVendor",
)
)
baseline_df.show()
In this step, we will use Spark to wrangle the data by applying the transformations below.
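The partition-and-lag pattern the next cell applies with Spark window functions can be previewed in pandas: sort each source's events by time, shift to obtain the previous timestamp, and subtract (the sample data is fabricated):

```python
import pandas as pd

# Toy events from one source beaconing every 300 seconds
events = pd.DataFrame({
    "SourceIP": ["10.0.0.1"] * 4,
    "TimeGenerated": pd.to_datetime(
        ["2021-09-17 10:00:00", "2021-09-17 10:05:00",
         "2021-09-17 10:10:00", "2021-09-17 10:15:00"]),
})

# Equivalent of Window().partitionBy("SourceIP").orderBy("TimeGenerated") + lag()
events = events.sort_values("TimeGenerated")
events["prev_TimeStamp"] = events.groupby("SourceIP")["TimeGenerated"].shift(1)
events["Timedelta"] = (
    events["TimeGenerated"] - events["prev_TimeStamp"]
).dt.total_seconds()

print(events[["TimeGenerated", "Timedelta"]])
# First row has no predecessor (NaN); the rest show the constant 300s delta
```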
%%synapse
time_delta_threshold = 15 # Minimum interval in seconds between 2 successive events; use 10 or above
percent_beacon_threshold = 75 # Threshold in percent; maximum value is 100
# Serialize the dataset by sorting on TimeGenerated within each SourceIP partition
w = (
Window()
.partitionBy(F.col("SourceIP"))
.orderBy(F.col("TimeGenerated"))
)
# Add a column with the timestamp of the previous event using lag
csl_beacon_df = baseline_df.select(
"*", lag("TimeGenerated").over(w).alias("prev_TimeStamp")
).na.drop()
# Calculate the time delta between the current and previous timestamps
timeDiff = F.unix_timestamp("TimeGenerated") - F.unix_timestamp("prev_TimeStamp")
# Add new column as timedelta
csl_beacon_df = csl_beacon_df.withColumn("Timedelta", timeDiff).filter(
F.col("Timedelta") > time_delta_threshold
)
csl_beacon_ranked = csl_beacon_df.groupBy(
"DeviceVendor",
"SourceIP",
"DestinationIP",
"DestinationPort",
"Protocol",
"Timedelta",
).agg(
F.count("Timedelta").alias("Timedeltacount"),
F.sum("SentBytes").alias("TotalSentBytes"),
F.sum("ReceivedBytes").alias("TotalReceivedBytes"),
F.count("*").alias("Totalevents"),
)
w1 = (
Window.partitionBy(
"DeviceVendor",
"SourceIP",
"DestinationIP",
"DestinationPort",
)
.orderBy(F.col("SourceIP").cast("long"))
.rowsBetween(-2, 0)
)
csl_beacon_df = (
csl_beacon_ranked
.join(csl_dstip, ["DestinationIP"])
.withColumn("Timedeltalist", F.collect_list("Timedeltacount").over(w1))
.withColumn(
"Timedeltalistcount",
F.expr("AGGREGATE(Timedeltalist, DOUBLE(0), (acc, x) -> acc + x)").cast(
"long"
),
)
.withColumn(
"Totaleventcount",
F.sum("Timedeltalistcount").over(
Window.partitionBy("SourceIP", "DestinationIP", "DestinationPort")
),
)
.withColumn(
"Percentbeacon",
(
F.col("Timedeltalistcount")
/ F.sum("Timedeltalistcount").over(
Window.partitionBy(
"DeviceVendor",
"SourceIP",
"DestinationIP",
"DestinationPort",
)
)
* 100.0
),
)
.filter(F.col("Percentbeacon") > percent_beacon_threshold)
.filter(F.col("Totaleventcount") > daily_event_count_threshold)
.orderBy(F.col("Percentbeacon"), ascending=False)
)
csl_beacon_df.show()
In this step, we will save the results from the previous step as a single JSON file in ADLS. This file can be exported from ADLS and used in a native Python session outside the Spark pool for further data analysis, visualization, etc.
%%synapse
dir_name = "<dir-name>" # specify desired directory name
new_path = adls_path + "/" + dir_name
csl_beacon_df.coalesce(1).write.format("json").save(new_path)
%synapse stop
def initialize_storage_account(storage_account_name, storage_account_key):
    try:
        global service_client
        service_client = DataLakeServiceClient(
            account_url="{}://{}.dfs.core.windows.net".format(
                "https", storage_account_name
            ),
            credential=storage_account_key,
        )
    except Exception as e:
        print(e)

def list_directory_contents(container_name, input_path, file_type):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        paths = file_system_client.get_paths(path=input_path)
        pathlist = []
        for path in paths:
            if path.name.endswith(file_type):
                pathlist.append(path.name)
        return pathlist
    except Exception as e:
        print(e)

def download_file_from_directory(container_name, input_path, input_file):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        directory_client = file_system_client.get_directory_client(input_path)
        file_client = directory_client.get_file_client(input_file)
        download = file_client.download_file()
        with open("output.json", "wb") as local_file:
            local_file.write(download.readall())
    except Exception as e:
        print(e)

def json_normalize(input_file, output_file):
    nwbeaconList = []
    with open(input_file) as f:
        for jsonObj in f:
            nwbeaconDict = json.loads(jsonObj)
            nwbeaconList.append(nwbeaconDict)
    with open(output_file, "w") as write_file:
        json.dump(nwbeaconList, write_file)
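json_normalize converts the newline-delimited JSON that Spark writes into a single JSON array that pd.read_json can load. A quick local check of the same conversion, using hypothetical file names and made-up records:

```python
import json

# Two sample records in the newline-delimited format Spark writes
with open("sample_output.json", "w") as f:
    f.write('{"SourceIP": "10.0.0.1", "Percentbeacon": 92.5}\n')
    f.write('{"SourceIP": "10.0.0.2", "Percentbeacon": 81.0}\n')

# Same logic as json_normalize above: NDJSON lines -> one JSON array
records = []
with open("sample_output.json") as f:
    for line in f:
        records.append(json.loads(line))
with open("sample_normalized.json", "w") as out:
    json.dump(records, out)

with open("sample_normalized.json") as f:
    normalized = json.load(f)
print(len(normalized))  # 2 records, now loadable with pd.read_json
```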
In the sections below, we will provide input details about the ADLS account and then use the functions above to connect, list contents, and download the results locally.
If you need help locating the input details, you can check the View account access keys doc to find and retrieve your storage account keys for the ADLS account.
Warning: If you are storing secrets such as storage account keys for use in a notebook, you should
store them either in the msticpyconfig file on the compute instance or in Azure Key Vault.
Read more about using Key Vault
in the MSTICPy docs.
# Primary storage info
account_name = "<storage account name>" # fill in your primary account name
container_name = "<container name>" # fill in your container name
subscription_id = "<subscription id>"
resource_group = "<resource-group>" # fill in your resource group for the ADLS account
workspace_name = "<Microsoft sentinel/Log Analytics workspace name>" # fill in your la workspace name
input_path = f"WorkspaceResourceId=/subscriptions/{subscription_id}/resourcegroups/{resource_group}/providers/microsoft.operationalinsights/workspaces/{workspace_name}/"
adls_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/{input_path}"
dir_name = "<dir-name>/" #Replace the dirname previously specified to store results from spark
account_key = "<storage-account-key>" # Replace your storage account key
new_path = input_path + dir_name
initialize_storage_account(account_name, account_key)
pathlist = list_directory_contents(container_name, new_path, "json")
input_file = pathlist[0].split("/")[-1]
download_file_from_directory(container_name, new_path, input_file)
json_normalize("output.json", "out_normalized.json")
df = pd.read_json('out_normalized.json')
df.head()
In this section, we will enrich the entities retrieved from the network beaconing results, such as IP information. Enrichment types that will be beneficial in performing an investigation are IP geolocation, WhoIs registrar information, and threat intel lookups.
For first-time users, please refer to the Getting Started Guide For Microsoft Sentinel ML Notebooks
and the section Create your configuration file to create your msticpyconfig.
In this step, we will use MSTICPy geolocation capabilities backed by the MaxMind database. You will need a MaxMind API key to download the database.
from msticpy.sectools.geoip import GeoLiteLookup
iplocation = GeoLiteLookup()
df = iplocation.df_lookup_ip(df, column="DestinationIP")
df.head()
In this step, we can perform a WhoIs lookup on all public destination IPs and populate additional information such as the ASN. You can use this output to further filter known ASNs from the results.
num_ips = len(df["DestinationIP"].unique())
print(f"Performing WhoIs lookups for {num_ips} IPs ", end="")
df["DestASN"] = df.apply(lambda x: get_whois_info(x.DestinationIP, True), axis=1)
df["DestASNFull"] = df.apply(lambda x: x.DestASN[1], axis=1)
df["DestASN"] = df.apply(lambda x: x.DestASN[0], axis=1)
#Display results
df.head()
In this step, we can perform threat intel lookups using MSTICPy and open-source TI providers such as IBM XForce, VirusTotal, GreyNoise, etc.
The example below shows a lookup on a single IP as well as a bulk lookup on all IPs using the IBM XForce TI provider.
You will need to register with IBM XForce and enter the API keys into msticpyconfig.yaml.
ti_lookup = TILookup()
# Perform lookup on single IOC
result = ti_lookup.lookup_ioc(observable="52.183.120.194", providers=["XForce"])
ti_lookup.result_to_df(result)
# Flatten all the destination IPs into a list
ip_list = df['DestinationIP'].astype(str).values.flatten().tolist()
# Perform bulk lookup on all IPs with specified providers
ti_resp = ti_lookup.lookup_iocs(data=ip_list, providers=["AzSTI", "XForce"])
select_ti = browse_results(ti_resp, severities=['high','warning'])
select_ti
MSTICPy also includes a feature that allows you to map locations. This can be particularly useful when looking at the distribution of remote network connections or other events. Below we plot the locations of the destination IPs observed in our results.
from msticpy.nbtools import entityschema
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.nbtools.foliummap import FoliumMap, get_map_center
# Convert our IP addresses in string format into an ip address entity
ip_entity = entityschema.IpAddress()
ip_list = [convert_to_ip_entities(i)[0] for i in df['DestinationIP'].head(10)]
# Get the center location of all IP locations to center the map on
location = get_map_center(ip_list)
logon_map = FoliumMap(location=location, zoom_start=4)
# Add location markers to our map and display it
if len(ip_list) > 0:
logon_map.add_ip_cluster(ip_entities=ip_list)
display(logon_map.folium_map)
We started our hunting on very large datasets of firewall logs. Due to the sheer scale of the data, we leveraged Spark to load it.
We then performed baselining on historical data and used it to further filter the current day's dataset. Next, we performed various data transformations using Spark features such as partitioning, windowing, and ranking the dataset to find outbound network-beaconing-like behavior.
To analyze this data further, we enriched the IP entities in the result dataset with additional information such as geolocation, WhoIs registration, and threat intel lookups.
Analysts can perform further investigation on selected IP addresses from the enrichment results by correlating the various data sources available. You can then create incidents in Microsoft Sentinel and track the investigation there.