Notebook Version: 1.0
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Data Sources Required: SecurityAlerts
This Notebook assists analysts in triaging alerts within Azure Sentinel by enriching them with Threat Intelligence and OSINT data. Its purpose is to allow analysts to quickly triage a large number of alerts and identify those to focus investigation on.
How to use:
Run the cells in this Notebook in order, at various points in the Notebook flow you will be prompted to enter or select options relevant to the scope of your triage.
This Notebook presumes you have Azure Sentinel Workspace settings and Threat Intelligence providers configured in a config file. If you do not have this in place, please refer to https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html# and https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to set this up.
If this is your first time running this Notebook please run the cell below before proceeding to ensure you have the required packages installed correctly.
from pathlib import Path
import os
import sys
import warnings
from IPython.display import display, HTML, Markdown
REQ_PYTHON_VER=(3, 6)
REQ_MSTICPY_VER=(0, 5, 0)
display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
from utils.nb_check import check_python_ver, check_mp_ver
check_python_ver(min_py_ver=REQ_PYTHON_VER)
try:
check_mp_ver(min_msticpy_ver=REQ_MSTICPY_VER)
except ImportError:
!pip install --upgrade msticpy
if "msticpy" in sys.modules:
importlib.reload(sys.modules["msticpy"])
else:
import msticpy
check_mp_ver(REQ_MSTICPY_VER)
from msticpy.nbtools import nbinit
extra_imports = [
"msticpy.nbtools.observationlist, Observation",
"msticpy.nbtools.observationlist, Observations",
"msticpy.nbtools.foliummap, get_center_ip_entities",
"whois",
"datetime,,dt",
"ipwhois, IPWhois",
]
additional_packages = [
"tldextract", "python-whois", "IPWhois"
]
nbinit.init_notebook(
namespace=globals(),
additional_packages=additional_packages,
extra_imports=extra_imports,
);
ti = TILookup()
summary = Observations()
Import the required packages and initialize a set of required entities and properties:
This cell collects Workspace details contained in your msticpyconfig.yaml file and uses them to authenticate.
# Read the Azure Sentinel Workspace settings from the config file and open a
# Log Analytics query connection with them.
try:
    # Update to WorkspaceConfig(workspace="WORKSPACE_NAME") to get alerts from a Workspace other than your default one.
    # Run WorkspaceConfig().list_workspaces() to see a list of configured workspaces
    # NOTE(review): a specific workspace name is hard-coded here even though
    # the comment above describes the default workspace - confirm this is intended.
    ws_config = WorkspaceConfig(workspace="CyberSecDemo")
    ws_id, ten_id = ws_config["workspace_id"], ws_config["tenant_id"]
    md("Workspace details collected from config file")
    qry_prov = QueryProvider("LogAnalytics")
    qry_prov.connect(connection_str=ws_config.code_connect_str)
except RuntimeError:
    # Point the analyst at the setup notebook when the RuntimeError is raised.
    md(
        """You do not have any Workspaces configured in your config files.
Please run the https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb
to setup these files before proceeding""",
        "bold",
    )
Enter some information relevant to your triage work; this will then be stored as part of this Notebook for future reference and recall. Please also select which Threat Intelligence providers to use for enrichment. Please note you need to have auth details for each provider in order for this to operate. You can select one or more providers or select "All" to use all available providers.
# Free-text widgets that capture who is triaging, under which ticket, and why.
md("Enter Name:")
name = widgets.Text()
display(name)

md("Enter Ticket ID: ")
ticket = widgets.Text()
display(ticket)

md("Enter Description:")
description = widgets.Textarea()
display(description)

# Configured providers, minus the "OPR" and "Tor" entries, which are treated
# as non-TI enrichments here.
ti_provs = [prov for prov in ti.configured_providers if prov not in ("OPR", "Tor")]
if not ti_provs:
    raise Exception("""You do not have any Threat Intelligence providers configured.
Please refer to https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html on how to configure them.""")

# Multi-select of providers; "All" is appended as an extra option and is the
# initial selection.
md("Select TI providers to use for enrichment.")
sel_ti_provs = widgets.SelectMultiple(
    options=ti_provs + ["All"],
    value=["All"],
    description="TI providers:",
    disabled=False,
)
display(sel_ti_provs)
Adjust the time slider to select the timeframe for which you wish to triage alerts.
# Resolve the provider selection: "All" expands to every configured provider,
# otherwise only the chosen ones are used.
ti_prov_use = ti_provs if "All" in sel_ti_provs.value else list(sel_ti_provs.value)

# Store the triage details entered above, plus the providers in use, as the
# first observation in the summary.
invest_summary = Observation(
    caption="Investigation Details",
    data={
        "Analyst": name.value,
        "Ticket": ticket.value,
        "Investigation Description": description.value,
        "Investigation Date": dt.datetime.now(),
        "TI Providers": ti_prov_use,
    },
)
summary.add_observation(invest_summary)

# Time-window selector for the alert query.
query_times = nbwidgets.QueryTime(
    units="day", max_before=30, max_after=1, before=3
)
query_times.display()
You can choose to select a subset of alerts based on provider in order to narrow your triage scope. You can also select "All" to return security alerts from all providers. Once a provider is selected you can additionally filter by Alert Name in order to focus on a specific alert type.
# Query the security alerts for the selected time window and record both the
# data and the window in the summary.
alerts = qry_prov.SecurityAlert.list_alerts(query_times)
alerts_summ = Observation(
    caption="Alerts",
    data={"Data": alerts, "Times": query_times},
)
summary.add_observation(alerts_summ)

# Show the number of alerts retrieved per provider.
md("Alert summary", "large")
display(
    alerts.groupby("ProviderName")[["AlertName"]]
    .count()
    .reset_index()
    .rename(columns={"AlertName": "Alerts"})
)
def update_alert_names(_):
    """Rebuild the alert-name dropdown options for the selected provider.

    Registered as an observer on the provider dropdown; the change event
    argument is ignored. The previously selected alert name is kept when it
    is still among the new options, otherwise the selection becomes "All".
    """
    previous_choice = sel_alerts.value
    provider = sel_prov.value
    if provider == "All":
        names = alerts['AlertName'].unique()
    else:
        names = alerts[alerts['ProviderName'] == provider]['AlertName'].unique()
    names = np.append(names, ["All"])
    sel_alerts.options = names
    # The selection is re-applied after the options have been replaced.
    sel_alerts.value = previous_choice if previous_choice in names else "All"
# Build the provider / alert-name dropdowns, or report that nothing was found.
if alerts.empty:
    # Message wording corrected from "during between".
    md(f"No alerts in this Workspace between {query_times.start} and {query_times.end}", "bold")
else:
    w_layout = list_layout = widgets.Layout(width="400px")
    # Select Provider to filter by; "All" is appended as an extra option.
    providers = alerts['ProviderName'].unique()
    providers = np.append(providers, ["All"])
    sel_prov = widgets.Dropdown(
        options=providers,
        description='Providers:',
        disabled=False,
        layout=w_layout,
    )
    # Keep the alert-name dropdown in step with the provider selection.
    sel_prov.observe(update_alert_names, names="value")
    # Initial alert names are those of the provider currently shown in the
    # dropdown, plus the "All" option.
    alert_names = alerts[alerts['ProviderName'] == sel_prov.value]['AlertName'].unique()
    alert_names = np.append(alert_names, ["All"])
    sel_alerts = widgets.Dropdown(
        options=alert_names,
        description='Alert Names:',
        disabled=False,
        value="All",
        layout=w_layout,
    )
    md("Select provider and/or Alert type to triage", "large")
    display(widgets.VBox([sel_prov, sel_alerts]))
Once alerts are collected we can enrich these alerts by looking up the entities associated with these alerts in Threat Intelligence. The TI Risk column in the table below represents an aggregation of results from the selected TI providers.
import json
from tqdm.notebook import tqdm
# Filter alerts based on AlertName and Provider.
# Always work on a copy: the following steps overwrite the Entities column and
# add a "TI Risk" column, and doing that on `alerts` itself (the previous
# behaviour when "All" was selected) altered the frame already stored in the
# summary and made this cell fail when run a second time.
if sel_prov.value == "All":
    sent_alerts = alerts.copy()
else:
    sent_alerts = alerts[alerts['ProviderName'] == sel_prov.value].copy()

if sel_alerts.value == "All":
    selected_alert_type = sent_alerts
else:
    selected_alert_type = sent_alerts[sent_alerts['AlertName'] == sel_alerts.value].copy()
def entity_load(entity):
    """Parse the JSON text of an alert's ``Entities`` field.

    Parameters
    ----------
    entity
        The raw field value. Normally a JSON string; values that are already
        a list or dict are returned unchanged so the cell can be re-run.

    Returns
    -------
    The decoded JSON value, or None when the input is not valid JSON or is
    not a string-like value at all (for example None or NaN).
    """
    if isinstance(entity, (list, dict)):
        return entity
    try:
        return json.loads(entity)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers non-string inputs, which json.loads rejects.
        return None
# Replace each raw Entities value with the result of entity_load.
selected_alert_type['Entities'] = selected_alert_type['Entities'].apply(entity_load)
def lookup(row):
    """Return the overall TI severity label for one alert row.

    Every entity of type 'ip'/'ipaddress' (via its "Address") or 'url' (via
    its "Url") is looked up with the selected TI providers, and the severities
    reported are collected. The label returned is the highest one seen:
    "High", then "Warning", then "Information", otherwise "None".

    Entities that lack an expected key are skipped.
    """
    found = []
    entities = row['Entities']
    if entities is None:
        entities = []
    for ent in entities:
        try:
            kind = ent["Type"]
            if kind in ('ip', 'ipaddress'):
                result = ti.lookup_ioc(observable=ent["Address"], providers=ti_prov_use)
            elif kind == 'url':
                result = ti.lookup_ioc(observable=ent["Url"], providers=ti_prov_use)
            else:
                result = None
            if result:
                # Element 1 of the lookup result is iterated; each item's
                # element 1 carries a .severity attribute.
                for item in result[1]:
                    found.append(item[1].severity)
        except KeyError:
            # Best effort: an entity without the expected fields is ignored.
            continue

    # Highest severity wins.
    for level, label in (
        ('high', "High"),
        ('warning', "Warning"),
        ('information', "Information"),
    ):
        if level in found:
            return label
    return "None"
def color_cells(val):
    """Return the CSS background-color rule for one table cell.

    String values are matched case-insensitively: "high" maps to Red,
    "warning" to Orange and "information" to Green. Any other string, and any
    non-string value, maps to 'none'.
    """
    palette = {"high": 'Red', "warning": 'Orange', "information": 'Green'}
    shade = palette.get(val.lower(), 'none') if isinstance(val, str) else 'none'
    return f'background-color: {shade}'
# Register tqdm's pandas integration so the per-row lookup shows progress.
tqdm.pandas(desc="Lookup progress")
selected_alert_type['TI Risk'] = selected_alert_type.progress_apply(lookup, axis=1)

# Show the key alert columns ordered by start time, with cells coloured by
# color_cells.
display(
    selected_alert_type[['StartTimeUtc', 'AlertName', 'Severity', 'TI Risk', 'Description']]
    .sort_values(by=['StartTimeUtc'])
    .style.applymap(color_cells)
    .hide_index()
)
We can drill down into a specific alert by selecting it from the list below. This will return additional details on the alert as well as details of any threat intelligence matches.
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.nbtools.foliummap import FoliumMap, get_center_ip_entities
# Display full alert details when selected
def show_full_alert(selected_alert):
    """Build the detail view for the alert chosen in the selection widget.

    Parameters
    ----------
    selected_alert
        Value passed in by the widget. It is not used: the alert is read from
        ``rel_alert_select.selected_alert`` instead.
        NOTE(review): presumably these are the same object - confirm.

    Returns
    -------
    list
        ``[security_alert, ti_out, ip_map]`` where ``ti_out`` is the styled TI
        results table and ``ip_map`` the map of IPv4 indicators. Each is None
        when there is nothing to show.

    Side effects
    ------------
    Sets the module-level ``security_alert`` and ``ti_data`` so later cells
    can record them. ``ti_data`` was previously a local, so the module-level
    value checked when building the summary never changed.
    """
    global security_alert, ti_data
    security_alert = SecurityAlert(rel_alert_select.selected_alert)
    ioc_list = []
    ti_out = None
    # Initialised up front: ip_map used to be unbound on some paths, which
    # raised UnboundLocalError at the return statement.
    ip_map = None
    # Reset so results from a previously selected alert do not linger.
    ti_data = None

    entities = security_alert['Entities']
    if entities is not None:
        for entity in entities:
            if entity['Type'] in ('ipaddress', 'ip'):
                ioc_list.append(entity['Address'])
            elif entity["Type"] == 'url':
                ioc_list.append(entity['Url'])

    if ioc_list:
        ti_data = ti.lookup_iocs(data=ioc_list, providers=ti_prov_use)
        ti_out = (
            ti_data[['Ioc', 'IocType', 'Provider', 'Result', 'Severity', 'Details']]
            .reset_index()
            .style.applymap(color_cells)
            .hide_index()
        )
        ti_ips = ti_data[ti_data['IocType'] == 'ipv4']
        # If we have IP entities try and plot these on a map
        if not ti_ips.empty:
            ip_ents = [convert_to_ip_entities(i)[0] for i in ti_ips['Ioc'].unique()]
            center = get_center_ip_entities(ip_ents)
            ip_map = FoliumMap(location=center, zoom_start=4)
            ip_map.add_ip_cluster(ip_ents, color='red')

    #nbdisplay.display_alert(security_alert, show_entities=True)
    return [security_alert, ti_out, ip_map]
# Offer the filtered alerts for selection; show_full_alert is the action run
# for the chosen alert.
if not isinstance(alerts, pd.DataFrame) or alerts.empty:
    md('No alerts found.')
else:
    ti_data = None
    md('Click on alert to view details.', "large")
    rel_alert_select = nbwidgets.SelectAlert(
        alerts=selected_alert_type,
        action=show_full_alert,
    )
    rel_alert_select.display()
    # Add alert details to summary, including TI results when there are any.
    if ti_data is None:
        alert_details = Observation(caption="Alert Details", data=security_alert)
    else:
        alert_details = Observation(
            caption="Alert Details",
            data={"Alert": security_alert, "TI": ti_data},
        )
    summary.add_observation(alert_details)
The cell below displays a timeline of the alerts you are triaging, with the selected alert highlighted in order to provide context on the alert.
# Record the selected alert in the summary, then plot all triaged alerts on a
# timeline grouped by their TI risk, with the selected alert passed as `alert`.
selected_alert = Observation(
    caption="Alert Details",
    data=rel_alert_select.selected_alert,
)
summary.add_observation(selected_alert)

if len(selected_alert_type) != 1:
    nbdisplay.display_timeline(
        data=selected_alert_type,
        time_column="StartTimeUtc",
        group_by="TI Risk",
        source_columns=["AlertName"],
        alert=rel_alert_select.selected_alert,
        title="Alerts over time grouped by TI risk score",
    )
else:
    md("Only one alert in selected alert provider/type - can't display timeline.")
Now that we have selected an alert of interest and triaged its key details, we need to identify the next investigative steps. The cell below identifies and extracts key entities from the selected alert. It provides additional enrichment to them using OSINT and, based on their type, recommends an additional Notebook to run for further investigation based on the Notebooks available at https://github.com/Azure/Azure-Sentinel-Notebooks/ or via the Azure Sentinel portal.
from ipwhois import IPWhois
import whois
from ipaddress import ip_address
import tldextract
# Based on the extracted entity enrich it with OSINT
def enhance(row):
    """Return the enrichment text for one entity row.

    The row's 'Type' selects the enricher applied to its 'Entity' value:
    "ipaddress" uses whois_desc, "host" uses host_sum and "url" uses
    whois_url. Any other type yields None.
    """
    kind = row['Type']
    if kind not in ("ipaddress", "host", "url"):
        return None
    enrichers = {
        "ipaddress": whois_desc,
        "host": host_sum,
        "url": whois_url,
    }
    return enrichers[kind](row['Entity'])
# If entity is a hostname, get key details of the host.
def host_sum(host):
    """Summarise a host from its Heartbeat record in the selected time window.

    Returns "<host> - <ComputerIP> - <OSType> - <ComputerEnvironment>" built
    from the first row returned, or "No host heartbeat" when the query
    returns no rows.
    """
    # NOTE(review): the host name is interpolated into the query text
    # unescaped - consider escaping it if entity values can be untrusted.
    query = (
        f"Heartbeat | where TimeGenerated > datetime({query_times.start}) "
        f"and TimeGenerated < datetime({query_times.end}) "
        f"| where Computer == '{host}' | take 1"
    )
    heartbeat = qry_prov.exec_query(query)
    if heartbeat.empty:
        return "No host heartbeat"
    return (
        f"{host} - {heartbeat['ComputerIP'][0]} - {heartbeat['OSType'][0]}"
        f" - {heartbeat['ComputerEnvironment'][0]}"
    )
# If entity is IP address work out what type of address it is and if a public IP address get ASN name.
def whois_desc(ip_lookup, progress=False):
    """Describe an IP address.

    Returns "Not an IP Address" when the value cannot be parsed,
    "Private address space" for private addresses and "Other address space"
    for any other non-global address. For global addresses a whois lookup is
    made and its "asn_description" value is returned.

    The ``progress`` argument is accepted but not used.
    """
    try:
        addr = ip_address(ip_lookup)
    except ValueError:
        return "Not an IP Address"

    if addr.is_private:
        return "Private address space"
    if addr.is_global:
        # Only global addresses are sent to whois.
        return IPWhois(addr).lookup_whois()["asn_description"]
    return "Other address space"
# If entity is a URL get the name of the organisation that registered the domain.
def whois_url(url):
    """Return the 'org' value from the whois record of the URL's domain.

    The domain queried is the "<domain>.<suffix>" part extracted from *url*.
    """
    # Use the named fields of the extraction result rather than unpacking it
    # as a 3-tuple: newer tldextract releases return a result with a different
    # number of fields, which made the positional unpack fail.
    parts = tldextract.extract(url)
    wis = whois.whois(f"{parts.domain}.{parts.suffix}")
    return wis['org']
# Entity type -> name of the Notebook suggested for further investigation.
notebooks = {
    "ipaddress": "Entity Explorer - IP Address",
    "host": "Entity Explorer - Linux Host/Windows Host",
    "account": "Entity Explorer - Account",
    "url": "Entity Explorer - Domain and URL",
}


# Based on the entity type suggest a Notebook for future investigation.
def notebook_suggestor(row):
    """Return the suggested Notebook for the row's 'Type'.

    Types without an entry in ``notebooks`` get "Write your own Notebook".
    """
    return notebooks.get(row['Type'], "Write your own Notebook")
# List the entities of the selected alert with a suggested Notebook and an
# enrichment value for each, and store the table in the summary.
md('Entities for further investigation:', 'bold')
ents = security_alert.get_all_entities()
if ents.empty:
    md('No entities found in this alert')
else:
    ents['Notebook'] = ents.apply(notebook_suggestor, axis=1)
    ents['Enrichment'] = ents.apply(enhance, axis=1)
    display(ents.style.hide_index())
    # Save entity details into our summary.
    entities = Observation(
        caption="Entities for further investigation",
        data=ents,
    )
    summary.add_observation(entities)
#Uncomment the line below to see a summary of this Notebook's output
#summary.display_observations()