Data Sources Used:
This notebook takes you through a guided triage of a Microsoft Sentinel incident. The triage focuses on investigating the entities attached to a Microsoft Sentinel incident. This notebook can be extended with additional triage steps based on your specific processes and workflows.
The next cell:
This should complete without errors. If you encounter errors or warnings, look at the following two notebooks:
If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment
notebook and in these documents:
import json

import pandas as pd
from IPython.display import HTML, display
from bokeh.plotting import show
from msticpy.common.exceptions import MsticpyAzureConfigError
from msticpy.common.pkg_config import get_config
from msticpy.context.azure import MicrosoftSentinel
from msticpy.nbwidgets import Progress, SelectAlert
from msticpy.vis.entity_graph_tools import Entity, EntityGraph
from msticpy.vis.timeline import display_timeline
display(HTML("<h3>Starting Notebook setup...</h3>"))
import msticpy as mp
mp.init_notebook(
    additional_packages=["msticnb>=1.0"],
)
import msticnb as nb
def check_ent(items, entity):
    """Check if entity is present"""
    for item in items:
        if item[0].casefold() == entity.casefold():
            return True
    return False
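As a quick sanity check, the helper can be exercised against a hypothetical entity list in the (type, properties) tuple shape that the incident API returns; the entity values below are invented for illustration:

```python
def check_ent(items, entity):
    """Check if entity is present (same logic as the helper above)."""
    for item in items:
        if item[0].casefold() == entity.casefold():
            return True
    return False

# Hypothetical entities in the (type, properties) tuple shape used by the notebook
entities = [("Ip", {"address": "203.0.113.10"}), ("Account", {"accountName": "alice"})]
print(check_ent(entities, "account"))  # → True (match is case-insensitive)
print(check_ent(entities, "Host"))     # → False
```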
def ti_color_cells(val):
    """Color cells of output dataframe based on severity"""
    color = "none"
    if isinstance(val, str):
        if val.casefold() == "high":
            color = "Red"
        elif val.casefold() in ("warning", "medium"):
            color = "Orange"
        elif val.casefold() in ("information", "low"):
            color = "Green"
    return f"background-color: {color}"
def ent_color_cells(val):
    """Color table cells based on values in the cells"""
    if isinstance(val, int):
        color = "yellow" if val < 3 else "none"
    elif isinstance(val, float):
        color = "yellow" if val > 4.30891 or val < 2.72120 else "none"
    else:
        color = "none"
    return f"background-color: {color}"
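A minimal check of the CSS strings these helpers emit, using a self-contained copy of ti_color_cells for illustration (later cells apply these helpers via pandas `Styler`):

```python
def ti_color_cells(val):
    """Map a severity string to a background-color CSS rule (copy of the helper above)."""
    color = "none"
    if isinstance(val, str):
        if val.casefold() == "high":
            color = "Red"
        elif val.casefold() in ("warning", "medium"):
            color = "Orange"
        elif val.casefold() in ("information", "low"):
            color = "Green"
    return f"background-color: {color}"

print(ti_color_cells("High"))    # → background-color: Red
print(ti_color_cells("medium"))  # → background-color: Orange
print(ti_color_cells(42))        # → background-color: none (non-strings are ignored)
```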
def ent_alerts(ent_val):
    """Display a timeline of alerts that reference the entity value."""
    query = (
        f"SecurityAlert "
        f"| where TimeGenerated between(datetime({start})..datetime({end})) "
        f"| where Entities contains '{ent_val}'"
    )
    alerts_df = qry_prov.exec_query(query)
    if isinstance(alerts_df, pd.DataFrame) and not alerts_df.empty:
        display_timeline(
            data=alerts_df,
            source_columns=["DisplayName", "AlertSeverity", "ProviderName"],
            title=f"Alerts involving {ent_val}",
            group_by="AlertSeverity",
            height=300,
            time_column="TimeGenerated",
        )
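The KQL string that ent_alerts builds can be sketched outside the function; the time bounds and entity value below are hypothetical stand-ins for the notebook's `start`, `end`, and entity globals:

```python
from datetime import datetime, timezone

# Hypothetical stand-ins for the notebook's start/end globals
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 3, tzinfo=timezone.utc)
ent_val = "203.0.113.10"

# Same f-string construction as ent_alerts above
query = (
    f"SecurityAlert "
    f"| where TimeGenerated between(datetime({start})..datetime({end})) "
    f"| where Entities contains '{ent_val}'"
)
print(query)
```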
from datetime import datetime, timedelta, timezone
# papermill default parameters
ws_name = "Default"
incident_id = None
end = datetime.now(timezone.utc)
start = end - timedelta(days=2)
This cell connects to the Microsoft Sentinel APIs and retrieves a list of subscriptions the user has access to. To use this, the user must have at least read permission on the Microsoft Sentinel workspace. In the drop-down, select the name of the subscription that contains the Microsoft Sentinel workspace you want to triage incidents from.
print(
    "Configured workspaces: ",
    ", ".join(get_config("AzureSentinel.Workspaces").keys()),
)
import ipywidgets as widgets
ws_param = widgets.Combobox(
    description="Workspace Name",
    value=ws_name,
    options=list(get_config("AzureSentinel.Workspaces").keys()),
)
ws_param
Now select the name of the Microsoft Sentinel workspace in the subscription you want to triage incidents from.
Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a popup schema button.
To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.
Note that you may see a warning relating to the IPStack service when running this cell. This can be safely ignored as it's not used in this case.
config_items = get_config("AzureSentinel.Workspaces")[ws_param.value]
try:
    sent_prov = MicrosoftSentinel(
        sub_id=config_items["SubscriptionId"],
        res_grp=config_items["ResourceGroup"],
        ws_name=config_items["WorkspaceName"],
    )
    sent_prov.connect(workspace=ws_param.value)
except KeyError as e:
    raise MsticpyAzureConfigError(
        "Unable to retrieve Sentinel workspace items from config. "
        "Ensure you have SubscriptionId, ResourceGroup and WorkspaceName specified."
    ) from e
from msticpy.common.timespan import TimeSpan
# Authentication
qry_prov = QueryProvider("MSSentinel")
qry_prov.connect(WorkspaceConfig(workspace=ws_param.value))
nb_timespan = TimeSpan(start, end)
qry_prov.query_time.timespan = nb_timespan
md("<hr>")
md("Confirm time range to search", "bold")
qry_prov.query_time
The notebook is expecting your Microsoft Sentinel Tenant ID and Workspace ID to be configured in one of the following places:
- config.json in the current folder
- msticpyconfig.yaml in the current folder or in the location specified by the MSTICPYCONFIG environment variable

For help with setting up your config.json file (if this hasn't been done automatically) see the ConfiguringNotebookEnvironment notebook in the root folder of your Microsoft-Sentinel-Notebooks project. This shows you how to obtain your Workspace and Subscription IDs from the Microsoft Sentinel portal (you can use the Subscription ID to find your Tenant ID). To view the current config.json run the following in a code cell:

%pfile config.json

For help with setting up your msticpyconfig.yaml see the Setup section at the end of this notebook and the ConfigureNotebookEnvironment notebook.
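The workspace entries that get_config("AzureSentinel.Workspaces") returns are plain dictionaries; below is a sketch of the expected shape (all values are placeholders, not real IDs) together with a key check like the one the connection cell performs:

```python
# Placeholder workspace settings mirroring the structure that
# get_config("AzureSentinel.Workspaces") reads from msticpyconfig.yaml
workspaces = {
    "Default": {
        "WorkspaceId": "<workspace-guid>",
        "TenantId": "<tenant-guid>",
        "SubscriptionId": "<subscription-guid>",
        "ResourceGroup": "<resource-group>",
        "WorkspaceName": "<workspace-name>",
    }
}

# The MicrosoftSentinel provider call in this notebook needs these three keys
required = {"SubscriptionId", "ResourceGroup", "WorkspaceName"}
missing = required - set(workspaces["Default"])
print(missing or "config complete")  # empty set → config complete
```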
This imports the msticnb package and the notebooklet classes. These are needed for the notebook operations below.
nb.init(query_provider=qry_prov)
ti = nb.DataProviders.instance.tilookup
timespan = TimeSpan(start=start, end=end)
This timeline shows all incidents in the selected workspace over the chosen time range, grouped by severity.
incidents = sent_prov.get_incidents()
if isinstance(incidents, pd.DataFrame) and not incidents.empty:
    incidents["date"] = pd.to_datetime(
        incidents["properties.createdTimeUtc"], utc=True
    )
    incidents.rename(
        columns={
            "properties.title": "title",
            "properties.status": "status",
            "properties.severity": "severity",
        },
        inplace=True,
    )
    filtered_incidents = incidents[incidents["date"].between(start, end)]
    display_timeline(
        data=filtered_incidents,
        source_columns=["title", "status"],
        title="Incidents over time - grouped by severity",
        height=300,
        group_by="severity",
        time_column="date",
    )
else:
    md("No incidents found")
From the table below select the incident you wish to triage.
md("Select an incident to triage:", "bold")
def display_incident(incident):
    """Display summary details of the selected incident."""
    details = f"""
    <h3>Selected Incident: {incident['title']}</h3>
    <b>Incident time: </b> {incident['properties.createdTimeUtc']} -
    <b>Severity: </b> {incident['severity']} -
    <b>Assigned to: </b>{incident['properties.owner.userPrincipalName']} -
    <b>Status: </b> {incident['status']}
    """
    new_idx = [idx.split(".")[-1] for idx in incident.index]
    incident = incident.set_axis(new_idx)
    return (HTML(details), pd.DataFrame(incident))
filtered_incidents["short_id"] = filtered_incidents["id"].apply(
    lambda x: x.split("/")[-1]
)
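The short_id extraction above just takes the last segment of the incident's ARM resource id; sketched with a hypothetical id:

```python
# Hypothetical ARM-style incident resource id, as returned by the Sentinel API
incident_res_id = (
    "/subscriptions/<sub-guid>/resourceGroups/myRG"
    "/providers/Microsoft.OperationalInsights/workspaces/myWS"
    "/providers/Microsoft.SecurityInsights/Incidents/aaaa-bbbb-cccc"
)
short_id = incident_res_id.split("/")[-1]
print(short_id)  # → aaaa-bbbb-cccc
```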
alert_sel = SelectAlert(
    alerts=filtered_incidents,
    default_alert=incident_id,
    columns=["title", "severity", "status", "name"],
    time_col="date",
    id_col="short_id",
    action=display_incident,
)
alert_sel.display()
The cell below shows you key details relating to the incident, including the associated entities and the graph of the relationships between these entities.
incident_details = sent_prov.get_incident(
    alert_sel.selected_alert.id.split("/")[-1], entities=True, alerts=True
)
ent_dfs = []
for ent in incident_details["Entities"][0]:
    ent_df = pd.json_normalize(ent[1])
    ent_df["Type"] = ent[0]
    ent_dfs.append(ent_df)
md("Incident Entities:", "bold")
if ent_dfs:
    new_df = pd.concat(ent_dfs, axis=0, ignore_index=True)
    grp_df = new_df.groupby("Type")
    for grp in grp_df:
        md(grp[0], "bold")
        display(grp[1].dropna(axis=1))
alert_out = []
if "Alerts" in incident_details.columns:
    for alert in incident_details.iloc[0]["Alerts"]:
        qry = (
            f"SecurityAlert "
            f"| where TimeGenerated between((datetime({start})-7d)..datetime({end})) "
            f"| where SystemAlertId == '{alert['ID']}'"
        )
        df = qry_prov.exec_query(qry)
        display(df)
        if df.empty or not df["Entities"].iloc[0]:
            alert_full = {"ID": alert["ID"], "Name": alert["Name"], "Entities": None}
        else:
            alert_full = {
                "ID": alert["ID"],
                "Name": alert["Name"],
                "Entities": json.loads(df["Entities"].iloc[0]),
            }
        alert_out.append(alert_full)
    incident_details["Alerts"] = [alert_out]
md("Graph of incident entities:", "bold")
graph = EntityGraph(incident_details.iloc[0])
graph.plot(timeline=True)
Below is an analysis of the incident's entities that appear in threat intelligence sources.
sev = []
resps = pd.DataFrame()
# For each entity look it up in Threat Intelligence data
md("Looking up entities in TI feeds...")
prog = Progress(completed_len=len(incident_details["Entities"].iloc[0]))
i = 0
result_dfs = []
for ent in incident_details["Entities"].iloc[0]:
    i += 1
    prog.update_progress(i)
    if ent[0] == "Ip":
        resp = ti.lookup_ioc(ent[1]["address"], ioc_type="ipv4")
        result_dfs.append(ti.result_to_df(resp))
        # extend (not append) so sev holds severity strings, not nested lists
        sev.extend(resp["Severity"].unique().tolist())
    if ent[0] in ("Url", "DnsResolution"):
        if "url" in ent[1]:
            lkup_dom = ent[1]["url"]
        else:
            lkup_dom = ent[1]["domainName"]
        resp = ti.lookup_ioc(lkup_dom, ioc_type="url")
        result_dfs.append(ti.result_to_df(resp))
        sev.extend(resp["Severity"].unique().tolist())
    if ent[0] == "FileHash":
        resp = ti.lookup_ioc(ent[1]["hashValue"])
        result_dfs.append(ti.result_to_df(resp))
        sev.extend(resp["Severity"].unique().tolist())
if result_dfs:
    resps = pd.concat(result_dfs)
else:
    resps = pd.DataFrame()
# Take overall severity of the entities based on the highest score
if "high" in sev:
    severity = "High"
elif "warning" in sev:
    severity = "Warning"
elif "information" in sev:
    severity = "Information"
else:
    severity = "None"
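The cascade above reduces the collected per-entity severities to a single worst-case value; a self-contained sketch with hypothetical lookup results:

```python
# Hypothetical severities collected from TI lookups for several entities
sev = ["information", "high", "unknown"]

# Highest severity wins, mirroring the cascade above
if "high" in sev:
    severity = "High"
elif "warning" in sev:
    severity = "Warning"
elif "information" in sev:
    severity = "Information"
else:
    severity = "None"
print(severity)  # → High
```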
md("Checking to see if incident entities appear in TI data...")
incident_details["TI Severity"] = severity
# Output TI hits of high, warning or information severity
if incident_details["TI Severity"].iloc[0] in ("High", "Warning", "Information"):
    print("Incident:")
    display(
        incident_details[
            [
                "properties.createdTimeUtc",
                "properties.incidentNumber",
                "properties.title",
                "properties.status",
                "properties.severity",
                "TI Severity",
            ]
        ]
        .style.applymap(ti_color_cells)
        .hide(axis="index")
    )
    md("TI Results:", "bold")
    display(
        resps[["Ioc", "IocType", "Provider", "Severity", "Details"]]
        .sort_values(by="Severity")
        .style.applymap(ti_color_cells)
        .hide(axis="index")
    )
else:
    md("None of the entities appeared in TI data", "bold")
Below is an analysis of all IP entities attached to the incident.
# Enrich IP entities using the IP Summary notebooklet
ip_ent_nb = nb.nblts.azsent.network.IpAddressSummary()
if not resps.empty and "ipv4" in resps["IocType"].unique():
    for ip_addr in resps[resps["IocType"] == "ipv4"]["Ioc"].unique():
        folium_map = FoliumMap(width="50%", height="50%")
        try:
            display(HTML(f"<h2>Summary of Activity Related to {ip_addr}:</h2>"))
            ip_ent_nb_out = ip_ent_nb.run(value=ip_addr, timespan=timespan, silent=True)
            md(
                f"{ip_addr} - {ip_ent_nb_out.ip_origin} - {ip_ent_nb_out.ip_type}",
                "bold",
            )
            if (
                isinstance(ip_ent_nb_out.whois, pd.DataFrame)
                and not ip_ent_nb_out.whois.empty
            ):
                md(f"Whois information for {ip_addr}", "bold")
                display(ip_ent_nb_out.whois)
            if ip_ent_nb_out.location:
                md(f"Geo IP details for {ip_addr}", "bold")
                folium_map.add_ip_cluster([ip_ent_nb_out.ip_entity])
                display(folium_map)
            if (
                isinstance(ip_ent_nb_out.related_alerts, pd.DataFrame)
                and not ip_ent_nb_out.related_alerts.empty
            ):
                md(f"Alerts for {ip_addr}", "bold")
                tl = display_timeline(
                    data=ip_ent_nb_out.related_alerts,
                    source_columns=["AlertName", "Severity"],
                    title=f"Alerts associated with {ip_addr}",
                    height=300,
                )
                display(tl)
            if (
                isinstance(ip_ent_nb_out.ti_results, pd.DataFrame)
                and not ip_ent_nb_out.ti_results.empty
            ):
                md(f"TI results for {ip_addr}", "bold")
                display(ip_ent_nb_out.ti_results)
            if (
                isinstance(ip_ent_nb_out.passive_dns, pd.DataFrame)
                and not ip_ent_nb_out.passive_dns.empty
            ):
                md(f"Passive DNS results for {ip_addr}", "bold")
                display(ip_ent_nb_out.passive_dns)
            if ip_ent_nb_out.host_entities[0]["IpAddresses"]:
                md(f"{ip_addr} belongs to a known host", "bold")
                display(
                    pd.DataFrame.from_records(
                        [
                            {
                                attr: ip_ent_nb_out.host_entities[attr]
                                for attr in ip_ent_nb_out.host_entities
                            }
                        ]
                    )
                )
            print("-" * 160)
            display(HTML("<br><br>"))
        except Exception:
            md(f"Error processing {ip_addr}", "bold")
else:
    md("No IP entities with Threat Intel results present", "bold")
Below is an analysis of all URL entities attached to the incident.
url_nb = nb.nblts.azsent.url.URLSummary()
domain_records = pd.DataFrame()
if not resps.empty and "url" in resps["IocType"].unique():
    md("Domain entity enrichment", "bold")
    for url in resps[resps["IocType"] == "url"]["Ioc"].unique():
        md(f"Summary of {url}", "bold")
        url_nb_result = url_nb.run(timespan=timespan, value=url)
        url_nb_result.display_alert_timeline()
        url_nb_result.browse()
else:
    md("No URL entities with Threat Intel results present", "bold")
Below is an analysis of all User entities attached to the incident.
# Enrich Account entities using the AccountSummary notebooklet
account_nb = nb.nblts.azsent.account.AccountSummary()
user = None
uent = None
if check_ent(incident_details["Entities"][0], "account") or check_ent(
    incident_details["Entities"][0], "mailbox"
):
    md("Account entity enrichment", "bold")
    for ent in incident_details["Entities"][0]:
        if ent[0] == "Account" or ent[0] == "Mailbox":
            if "accountName" in ent[1]:
                uent = ent[1]["accountName"]
            elif "aadUserId" in ent[1]:
                uent = ent[1]["aadUserId"]
            elif "upn" in ent[1]:
                uent = ent[1]["upn"]
            if "upnSuffix" in ent[1]:
                user = uent + "@" + ent[1]["upnSuffix"]
            else:
                user = uent
    if user:
        try:
            ac_nb = account_nb.run(timespan=timespan, value=user, silent=True)
            ac_nb.get_additional_data()
            if (
                isinstance(ac_nb.account_activity, pd.DataFrame)
                and not ac_nb.account_activity.empty
            ):
                display(HTML(f"<h2>Summary of Activity Related to {user}:</h2>"))
                md("Recent activity")
                display(ac_nb.account_activity)
            else:
                md(
                    f"No user activity found for {user} "
                    f"between {timespan.start} and {timespan.end}"
                )
            if (
                isinstance(ac_nb.related_alerts, pd.DataFrame)
                and not ac_nb.related_alerts.empty
            ):
                show(ac_nb.alert_timeline)
            if (
                isinstance(ac_nb.host_logon_summary, pd.DataFrame)
                and not ac_nb.host_logon_summary.empty
            ):
                md(f"Host logons by {user}")
                display(ac_nb.host_logon_summary)
            if (
                isinstance(ac_nb.azure_activity_summary, pd.DataFrame)
                and not ac_nb.azure_activity_summary.empty
            ):
                md(f"Azure activity by {user}")
                display(ac_nb.azure_activity_summary)
                show(ac_nb.azure_timeline_by_provider)
            if (
                isinstance(ac_nb.ip_summary, pd.DataFrame)
                and not ac_nb.ip_summary.empty
            ):
                md(f"IPs used by {user}")
                display(ac_nb.ip_summary)
        except Exception:
            print(f"Error processing {user}")
else:
    md("No Account entities present", "bold")
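The account-name handling in the cell above builds a UPN when a suffix is present; the same logic sketched standalone with a hypothetical entity:

```python
# Hypothetical Account entity properties as attached to a Sentinel incident
ent_props = {"accountName": "alice", "upnSuffix": "contoso.com"}

# Prefer accountName, then aadUserId, then upn - as in the cell above
uent = ent_props.get("accountName") or ent_props.get("aadUserId") or ent_props.get("upn")
# Append the UPN suffix when one is present
user = uent + "@" + ent_props["upnSuffix"] if "upnSuffix" in ent_props else uent
print(user)  # → alice@contoso.com
```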
Below is an analysis of all Host entities attached to the incident.
# Enrich Host entities using the HostSummary notebooklet
host_nb = nb.nblts.azsent.host.HostSummary()
if check_ent(incident_details["Entities"][0], "host"):
    md("Host entity enrichment", "bold")
    for ent in incident_details["Entities"][0]:
        if ent[0] == "Host":
            if "dnsDomain" in ent[1]:
                host_name = ent[1]["hostName"] + "." + ent[1]["dnsDomain"]
            else:
                host_name = ent[1]["hostName"]
            md(f"Host summary for {host_name}", "bold")
            try:
                display(HTML(f"<h2>Summary of Activity Related to {host_name}:</h2>"))
                host_sum_out = host_nb.run(value=host_name, timespan=timespan)
            except Exception:
                print(f"Error processing {host_name}")
else:
    md("No Host entities present", "bold")
If there are other entity types not analyzed above, a timeline of their appearance in security alerts appears below.
ent_map = {
    "FileHash": "hashValue",
    "Malware": "malwareName",
    "File": "fileName",
    "CloudApplication": "appId",
    "AzureResource": "ResourceId",
    "RegistryValue": "registryName",
    "SecurityGroup": "SID",
    "IoTDevice": "deviceId",
    "Mailbox": "mailboxPrimaryAddress",
    "MailMessage": "networkMessageId",
    "SubmissionMail": "submissionId",
    "Account": "accountName",
    "Host": "hostName",
    "Ip": "address",
}
for ent in incident_details["Entities"][0]:
    if ent[0] in ent_map:
        ent_alerts(ent[1][ent_map[ent[0]]])
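The loop above resolves each entity to the property named in ent_map before querying alerts; the resolution step alone, with hypothetical entities:

```python
# Subset of the entity-type -> lookup-property map used above
ent_map = {"Account": "accountName", "Host": "hostName", "Ip": "address"}

# Hypothetical incident entities in (type, properties) form;
# "Process" has no mapping, so it is skipped
entities = [("Ip", {"address": "203.0.113.10"}), ("Process", {"pid": 42})]

lookup_vals = [ent[1][ent_map[ent[0]]] for ent in entities if ent[0] in ent_map]
print(lookup_vals)  # → ['203.0.113.10']
```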