import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from bokeh.plotting import show
from whois import whois

import msticnb as nb
from msticpy.common.azure_auth import az_connect
from msticpy.common.timespan import TimeSpan
from msticpy.data.data_providers import QueryProvider
from msticpy.nbtools.security_alert import SecurityAlert
from msticpy.nbtools.utility import md
from msticpy.sectools.tilookup import TILookup
# Remove any stale service-principal secret so az_connect falls back to
# the CLI / MSI credentials requested below.  pop() with a default is a
# no-op when the variable is absent, replacing the former bare
# try/except: pass which would also have hidden unrelated errors.
os.environ.pop("AZURE_CLIENT_SECRET", None)

# Silence pandas SettingWithCopyWarning for this notebook's slicing style.
pd.options.mode.chained_assignment = None
def check_ent(items, entity):
    """Return True when *entity* occurs within any element of *items*.

    Uses the `in` operator per element, so it matches substrings of
    strings and members of tuples alike.
    """
    return any(entity in candidate for candidate in items)
def are_equal(ent1, ent2):
    """Checks properties to see if the entities are the same.

    Compares every property named by ``ent1.properties`` on both
    entities (missing attributes compare as None).
    """
    return all(
        getattr(ent1, name, None) == getattr(ent2, name, None)
        for name in ent1.properties
    )
def color_cells(val):
    """Color cells of output dataframe based on severity"""
    # NOTE(review): a second function named `color_cells` is defined later
    # in this file ("Color table cells based on values"), which shadows
    # this definition - by the time `.style.applymap(color_cells)` runs,
    # the later version is in effect.  Confirm which mapping is intended.
    #
    # Maps a TI severity string (case-insensitive) to a CSS background:
    # high -> Red, warning/medium -> Orange, information/low -> Green;
    # any other value (or non-string) -> none.
    color = "none"
    if isinstance(val, str):
        if val.casefold() == "high":
            color = "Red"
        elif val.casefold() == "warning" or val.casefold() == "medium":
            color = "Orange"
        elif val.casefold() == "information" or val.casefold() == "low":
            color = "Green"
    return f"background-color: {color}"
def Entropy(data):
    """Calculate the Shannon entropy (bits per character) of a string.

    Parameters
    ----------
    data : str
        String to measure (e.g. a domain name).

    Returns
    -------
    float
        Entropy in bits; 0.0 for an empty string.
    """
    # np.float was deprecated in NumPy 1.20 and removed in 1.24 - the
    # builtin float is the drop-in replacement.
    if not data:
        return 0.0  # avoid the degenerate empty-input case
    counts, lens = Counter(data), float(len(data))
    return -sum(count / lens * np.log2(count / lens) for count in counts.values())
def color_cells(val):
    """Color table cells based on values in the cells.

    This definition shadows the earlier severity-only ``color_cells``,
    which meant severity strings were never colored at all; string
    handling is therefore folded in here so a single function serves
    both the numeric domain-score styling and the severity styling.

    - int: yellow when below 3 (suspiciously low page rank).
    - float: yellow when outside the expected entropy band
      (below 2.72120 or above 4.30891).
    - str (case-insensitive): high -> Red, warning/medium -> Orange,
      information/low -> Green.
    - anything else: no background.
    """
    color = "none"
    if isinstance(val, int) and not isinstance(val, bool):
        color = "yellow" if val < 3 else "none"
    elif isinstance(val, float):
        color = "yellow" if val > 4.30891 or val < 2.72120 else "none"
    elif isinstance(val, str):
        sev = val.casefold()
        if sev == "high":
            color = "Red"
        elif sev in ("warning", "medium"):
            color = "Orange"
        elif sev in ("information", "low"):
            color = "Green"
    return "background-color: %s" % color
# Connect to Azure Sentinel
# NOTE(review): "%env" is an IPython magic - this file only runs inside a
# notebook/IPython session, not as a plain Python script.
%env KQLMAGIC_LOAD_MODE=silent
# Query provider for the Log Analytics (Azure Sentinel) backend.
qry_prov = QueryProvider("LogAnalytics")
# Authenticate via Azure CLI credentials first, falling back to MSI.
creds = az_connect(["cli", "msi"])
token = creds.modern.get_token("https://api.loganalytics.io/.default")
# Token payload handed to kqlmagic for workspace authentication.
token_dict = {
    "access_token": token.token,
    "token_type": "Bearer",
    "resource": "https://api.loganalytics.io/",
}
# NOTE(review): ten_id and ws_id are not defined anywhere in this file -
# presumably injected as notebook parameters; verify before running.
# NOTE(review): the f-string passed to kqlmagic_args has no placeholder,
# so kqlmagic receives the literal text "-try_token=token_dict" and is
# expected to resolve the named variable itself - confirm intended.
qry_prov.connect(
    connection_str=f"loganalytics://code().tenant('{ten_id}').workspace('{ws_id}')",
    kqlmagic_args=f"-try_token=token_dict",
)
# Establish threat intelligence provider
ti = TILookup()
# Set up notebooklets
nb.init(qry_prov)
# Default analysis window: the last 7 days.
timespan = TimeSpan(start=datetime.now() - timedelta(days=7))
# Get details of the incident from the Securityincident table
# NOTE(review): incident_id is not defined in this file - presumably a
# notebook parameter.  It is interpolated directly into KQL, so it must
# be a trusted value.
incidents = qry_prov.exec_query(
    f"SecurityIncident | where IncidentName =~ '{incident_id}'"
)
# Use the first (expected to be the only) matching row.
incident = incidents.iloc[0]
# Extract individual alerts from the incident
# Build a deduplicated, quoted, comma-separated ID list for the KQL
# `in` clause.  Assumes incident.AlertIds is an iterable of alert-ID
# strings - confirm against the SecurityIncident schema.
alert_id_list = ", ".join(list({f'"{id}"' for id in incident.AlertIds}))
query = f"SecurityAlert | where SystemAlertId in ({alert_id_list})"
alert_df = qry_prov.exec_query(query)
# For each alert in the incident extract the entities and build a unique list
from collections import defaultdict

# alert_entities: SystemAlertId -> list of entities on that alert.
# unique_entities: entity type -> entities deduplicated via are_equal().
alert_entities = {}
unique_entities = defaultdict(list)
for _, row in alert_df.iterrows():
    try:
        alert = SecurityAlert(row)
        alert_entities[alert.SystemAlertId] = alert.entities
        for ent in alert.entities:
            # Keep the entity only if no equal entity of its type exists.
            for current_ent in unique_entities.get(ent.Type, []):
                if are_equal(ent, current_ent):
                    break
            else:
                unique_entities[ent.Type].append(ent)
    except Exception as err:
        # Was a bare `except: pass`, which silently hid malformed alerts
        # (and any coding error); stay best-effort but report the failure.
        print(f"Error extracting entities from alert row: {err}")
# Look up each IP and URL entity in Threat Intelligence data, collecting
# the per-provider severities and a combined results dataframe.
sev = []
ti_frames = []
for ent_type, ents in unique_entities.items():
    if ent_type == "ip":
        for ip in ents:
            resp = ti.lookup_ioc(observable=ip["Address"], ioc_type="ipv4")
            ti_frames.append(ti.result_to_df(resp))
            for response in resp[1]:
                sev.append(response[1].severity)
    elif ent_type == "url":
        for url in ents:
            resp = ti.lookup_ioc(observable=url["Url"], ioc_type="url")
            ti_frames.append(ti.result_to_df(resp))
            for response in resp[1]:
                sev.append(response[1].severity)
# DataFrame.append was removed in pandas 2.0 - collect the per-lookup
# frames and concatenate once (also avoids quadratic re-copying).
resps = pd.concat(ti_frames, ignore_index=True) if ti_frames else pd.DataFrame()
# Take overall severity of the entities based on the highest score
if "high" in sev:
    severity = "High"
elif "warning" in sev:
    severity = "Warning"
elif "information" in sev:
    severity = "Information"
else:
    severity = "None"
incident["TI Severity"] = severity
# Output TI hits of high or warning severity
if incident["TI Severity"] == "High" or incident["TI Severity"] == "Warning":
    print("Incident:")
    # display() is the IPython rich-output builtin (notebook-only).
    # NOTE(review): Styler.applymap and Styler.hide_index were deprecated
    # and later removed in newer pandas (replacements: .map / .hide) -
    # confirm the pinned pandas version still supports these calls.
    display(
        incident.to_frame()
        .T[
            [
                "TimeGenerated",
                "IncidentNumber",
                "Title",
                "Status",
                "Severity",
                "TI Severity",
            ]
        ]
        .style.applymap(color_cells)
        .hide_index()
    )
    print("TI Results:")
    display(
        resps[["Ioc", "IocType", "Provider", "Severity", "Details"]]
        .style.applymap(color_cells)
        .hide_index()
    )
# Enrich IP entities using the IP Summary notebooklet
def _show_ip_section(data, title):
    """Display *data* under a *title* heading when it is a non-empty DataFrame."""
    if isinstance(data, pd.DataFrame) and not data.empty:
        md(title)
        display(data)

ip_ent_nb = nb.nblts.azsent.network.IpAddressSummary()
if not resps.empty and "ipv4" in resps["IocType"].unique():
    for ip_addr in resps[resps["IocType"] == "ipv4"]["Ioc"].unique():
        try:
            ip_ent_nb_out = ip_ent_nb.run(
                value=ip_addr, timespan=timespan, silent=True
            )
            _show_ip_section(ip_ent_nb_out.whois, f"Whois information for {ip_addr}")
            _show_ip_section(ip_ent_nb_out.geoip, f"Geo IP details for {ip_addr}")
            # Alert timeline needs related alerts AND a rendered chart.
            if (
                isinstance(ip_ent_nb_out.related_alerts, pd.DataFrame)
                and not ip_ent_nb_out.related_alerts.empty
                and ip_ent_nb_out.alert_timeline
            ):
                md(f"Alerts for {ip_addr}")
                show(ip_ent_nb_out.alert_timeline)
            _show_ip_section(ip_ent_nb_out.ti_results, f"TI results for {ip_addr}")
            _show_ip_section(
                ip_ent_nb_out.passive_dns, f"Passive DNS results for {ip_addr}"
            )
            # BUG FIX: the next two sections previously displayed
            # `ip_ent_out.*` - an undefined name - so they always raised
            # a NameError that the bare except silently swallowed.
            _show_ip_section(
                ip_ent_nb_out.vps_network,
                f"{ip_addr} belongs to a known VPS provider",
            )
            _show_ip_section(
                ip_ent_nb_out.host_entity, f"{ip_addr} belongs to a known host"
            )
        except Exception as err:  # narrowed from a bare except; stay best-effort
            print(f"Error processing {ip_addr}: {err}")
else:
    md("No IP entities present ")
# Enrich Domain entities
# Whois attributes to copy into the per-domain summary record.
domain_items = [
    "name",
    "org",
    "city",
    "state",
    "country",
    "registrar",
    "status",
    "creation_date",
    "expiration_date",
    "updated_date",
    "name_servers",
    "dnssec",
]
# NOTE(review): domain_records is assigned but never read in this file -
# possibly used by a later notebook cell; confirm before removing.
domain_records = pd.DataFrame()
if not resps.empty and "url" in resps["IocType"].unique():
    for url in resps[resps["IocType"] == "url"]["Ioc"].unique():
        md(f"Summary for {url}", "bold")
        wis = whois(url)
        if wis.domain_name:
            # whois may return the domain name as a list; take the first.
            if isinstance(wis["domain_name"], list):
                domain = wis["domain_name"][0]
            else:
                domain = wis["domain_name"]
            # Create domain record from whois data
            dom_rec = {}
            for key in wis.keys():
                if key in domain_items:
                    dom_rec[key] = [wis[key]]
            dom_rec["domain"] = domain
            dom_record = pd.DataFrame(dom_rec)
            # Open PageRank ("OPR") popularity score for the domain.
            # NOTE(review): assumes RawResult holds a "response" list whose
            # first item has "page_rank_integer" - confirm against the OPR
            # provider's response schema.
            page_rank = ti.result_to_df(
                ti.lookup_ioc(observable=domain, providers=["OPR"])
            )
            page_rank_score = page_rank["RawResult"][0]["response"][0][
                "page_rank_integer"
            ]
            dom_record["Page Rank"] = [page_rank_score]
            # Shannon entropy of the domain name; unusual values can
            # indicate algorithmically generated domains.
            dom_ent = Entropy(domain)
            dom_record["Entropy"] = [dom_ent]
            # Highlight page rank of entropy scores of note
            display(
                dom_record.T.style.applymap(
                    color_cells, subset=pd.IndexSlice[["Page Rank", "Entropy"], 0]
                )
            )
            md(
                "If Page Rank or Domain Entropy are highlighted this indicates that their values are outside the expected values of a legitimate website"
            )
            md(f"The average entropy for the 1M most popular domains is 3.2675")
else:
    md("No Domain entities present ")
# Enrich Account entities using the AccountSummary notebooklet
# Account activity uses a shorter window: the last 2 days.
timespan = TimeSpan(start=datetime.now() - timedelta(days=2))
account_nb = nb.nblts.azsent.account.AccountSummary()
user = None
uent = None
if check_ent(unique_entities.items(), "account"):
    # Pick one account entity to summarize.
    # NOTE(review): if several account entities exist, later iterations
    # overwrite `uent`, so only the last match is used - confirm intent.
    for ent in unique_entities.items():
        if ent[0] == "account":
            if "AadUserId" in str(ent[1][0]):
                # First entity is the AAD record; prefer the second
                # (UPN-style) entity when one is present.
                try:
                    uent = ent[1][1]
                except IndexError:
                    pass
            else:
                uent = ent[1][0]
    if uent:
        # Build a UPN (name@suffix) when a suffix exists, else bare name.
        try:
            user = uent["Name"] + "@" + uent["UPNSuffix"]
        except TypeError:
            user = uent["Name"]
    if user:
        try:
            ac_nb = account_nb.run(
                timespan=timespan, value=user.casefold(), silent=True
            )
            ac_nb.get_additional_data()
            md(f"Account summary for {user}", "bold")
            if (
                isinstance(ac_nb.account_activity, pd.DataFrame)
                and not ac_nb.account_activity.empty
            ):
                md("Recent activity")
                display(ac_nb.account_activity)
            if (
                isinstance(ac_nb.related_alerts, pd.DataFrame)
                and not ac_nb.related_alerts.empty
            ):
                show(ac_nb.alert_timeline)
            if (
                isinstance(ac_nb.host_logon_summary, pd.DataFrame)
                and not ac_nb.host_logon_summary.empty
            ):
                md(f"Host logons by {user}")
                display(ac_nb.host_logon_summary)
            if (
                isinstance(ac_nb.azure_activity_summary, pd.DataFrame)
                and not ac_nb.azure_activity_summary.empty
            ):
                md(f"Azure activity by {user}")
                display(ac_nb.azure_activity_summary)
                show(ac_nb.azure_timeline_by_provider)
        except Exception as err:  # narrowed from a bare except; stay best-effort
            print(f"Error processing {user}: {err}")
else:
    md("No Account entities present ")
# Enrich Host entities using the HostSummary notebooklet
# Host activity uses a shorter window: the last 2 days.
timespan = TimeSpan(start=datetime.now() - timedelta(days=2))
host_nb = nb.nblts.azsent.host.HostSummary()
if check_ent(unique_entities.items(), "host"):
    for ent in unique_entities.items():
        if ent[0] == "host":
            for host in ent[1]:
                if host["DnsDomain"]:
                    # Build the FQDN from hostname + DNS domain.
                    # BUG FIX: the original line ended with a stray `, ""`
                    # which made host_name a (str, str) tuple, so the
                    # notebooklet run below always failed.
                    host_name = host["HostName"] + "." + host["DnsDomain"]
                else:
                    host_name = host["HostName"]
                md(f"Host summary for {host_name}", "bold")
                try:
                    host_sum_out = host_nb.run(value=host_name, timespan=timespan)
                except Exception as err:  # narrowed from a bare except
                    print(f"Error processing {host_name}: {err}")
else:
    md("No Host entities present")