Notebook Version: 1.0
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, numpy, matplotlib, networkx, ipywidgets, ipython, dnspython, ipwhois, folium, maxminddb_geolite2
Data Sources Required:
TI Providers Used
This notebook brings together a series of tools and techniques to enable threat hunting in the context of a domain name or URL that has been identified as being of interest. It provides a series of techniques to assist in determining whether a domain or URL is malicious. Once this has been established, it provides an overview of the scope of the domain or URL across an environment, along with indicators of areas for further investigation such as hosts of interest.
Our broad initial hunting hypothesis is that a particular Linux host in our environment has been compromised; we will need to hunt from a range of different positions to validate or disprove this hypothesis.
The next cell:
This should complete without errors. If you encounter errors or warnings look at the following two notebooks:
If you are running in the Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment
notebook and in these documents:
from pathlib import Path
from IPython.display import display, HTML, Image
REQ_PYTHON_VER=(3, 6)
REQ_MSTICPY_VER=(1, 0, 0)
update_nbcheck = (
"<p style='color: orange; text-align=left'>"
"<b>Warning: we needed to update '<i>utils/nb_check.py</i>'</b><br>"
"Please restart the kernel and re-run this cell."
"</p>"
)
display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
try:
from utils.nb_check import check_versions
except ImportError as err:
%xmode Minimal
!curl https://raw.githubusercontent.com/Azure/Azure-Sentinel-Notebooks/master/utils/nb_check.py > ./utils/nb_check.py 2>/dev/null
display(HTML(update_nbcheck))
if "check_versions" not in globals():
raise ImportError("Old version of nb_check.py detected - see instructions below.")
%xmode Verbose
check_versions(REQ_PYTHON_VER, REQ_MSTICPY_VER)
# If not using Azure Notebooks, install msticpy with
# !pip install msticpy
from msticpy.nbtools import nbinit
extra_imports = [
"msticpy.nbtools, observationlist",
"msticpy.sectools, domain_utils",
"pyvis.network, Network",
]
nbinit.init_notebook(
namespace=globals(),
additional_packages=["pyvis", "python-whois"],
extra_imports=extra_imports,
);
Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:
%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)
instead of
%kql loganalytics://code().workspace(WORKSPACE_ID)
Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a popup schema button.
To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.
# See if we have an Azure Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
ws_config.prompt_for_ws()
qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")
# Authenticate to Azure Sentinel workspace
qry_prov.connect(ws_config)
# Load TI Providers
tilookup = TILookup()
tilookup.reload_providers()
tilookup.provider_status
The notebook is expecting your Azure Sentinel Tenant ID and Workspace ID to be configured in one of the following places:
- config.json in the current folder
- msticpyconfig.yaml in the current folder, or in the location specified by the MSTICPYCONFIG environment variable.

For help with setting up your config.json file (if this hasn't been done automatically) see the ConfiguringNotebookEnvironment notebook in the root folder of your Azure-Sentinel-Notebooks project. This shows you how to obtain your Workspace and Subscription IDs from the Azure Sentinel portal. You can use the Subscription ID to find your Tenant ID. To view the current config.json run the following in a code cell.
%pfile config.json
For help with setting up your msticpyconfig.yaml
see the Setup section at the end of this notebook and the ConfigureNotebookEnvironment notebook
Enter the domain or URL you wish to investigate. e.g. www.microsoft.com/index.html
domain_url = widgets.Text(description='Please enter the domain or URL to investigate:',
**WIDGET_DEFAULTS)
display(domain_url)
import tldextract
graph_items = []
dom_val = domain_utils.DomainValidator()
summary = observationlist.Observations()
dom_record = None
url=domain_url.value.strip().lower()
_, domain, tld = tldextract.extract(domain_url.value)
domain = domain.lower() + "." + tld.lower()
if dom_val.validate_tld(domain) is not True:
md(f"{domain} is not a valid domain name", "bold")
if url != domain:
md(f"<strong>Domain</strong> : {domain}")
md(f"<strong>URL</strong> : {url}")
graph_items.append((domain,url))
else:
md(f"<strong>Domain</strong> : {domain}")
url = None
If you are certain the above indicators are malicious and wish to jump straight to investigating their scope of impact in the environment, jump to Related Alerts.
As a first step we want to establish whether this domain or URL is known to be malicious by our Threat Intelligence providers.
You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory, or you can set an environment variable (MSTICPYCONFIG) pointing to its location.
To configure this file see the ConfigureNotebookEnvironment notebook and Threat intelligence provider configuration.
For Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) Run ConfiguringNotebookEnvironment
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity
def conv_severity(severity):
try:
if isinstance(severity, TISeverity):
return severity
if isinstance(severity, str):
return TISeverity[severity]
else:
return TISeverity(severity)
except (ValueError, KeyError):
return TISeverity.information
def ti_check_sev(severity, threshold):
severity = conv_severity(severity)
threshold = conv_severity(threshold)
return severity.value >= threshold.value
domain_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, ioc_type='dns'))
if url is not None:
url_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=url, ioc_type='url'))
md(f"Threat Intelligence Results for {url}", "bold")
display(url_ti.T)
summary.add_observation(caption="URL TI", description=f"Summary of TI for {url}", data=url_ti)
graph_items += [(url, provider) for provider in url_ti.index
                if ti_check_sev(url_ti.loc[provider]['Severity'], 1)]
md(f"Threat Intelligence Results for {domain}", "bold")
display(domain_ti.T)
summary.add_observation(caption="Domain TI", description=f"Summary of TI for {domain}", data=domain_ti)
graph_items += [(domain, provider) for provider in domain_ti.index
                if ti_check_sev(domain_ti.loc[provider]['Severity'], 1)]
To build up a fuller picture of the domain we can use whois, and other data sources to gather pertinent data. Indicators such as registration data, domain entropy, and registration details can provide indicators that a domain is not legitimate in nature.
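The entropy indicator mentioned above is plain Shannon entropy over the characters of the domain name. As a stdlib-only illustration of the same formula (the cell below implements it with numpy; the sample strings here are made up):

```python
import math
from collections import Counter

def shannon_entropy(data: str) -> float:
    """Shannon entropy, in bits per character, of a string."""
    length = len(data)
    counts = Counter(data)
    return -sum(c / length * math.log2(c / length) for c in counts.values())

# A single repeated character carries no information
print(shannon_entropy("aaaa"))  # 0.0
# n distinct characters give log2(n) bits: log2(4) = 2.0
print(shannon_entropy("abcd"))  # 2.0
# Random-looking (DGA-style) labels tend to sit above the ~3.2675
# average entropy of the top-1M domains used as a baseline below
print(shannon_entropy("xkqz7rj2wvp9") > 3.2675)  # True
```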
This cell uses the Open Page Rank API (https://www.domcop.com/openpagerank/) - in order to use this you need to add your API key to your msticpyconfig.yaml
configuration file (as you did for other TI providers).
To configure this file see the ConfigureNotebookEnvironment notebook and Threat intelligence provider configuration.
For Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) Run ConfiguringNotebookEnvironment
from whois import whois
from collections import Counter
def Entropy(data):
    """Shannon entropy (bits per character) of a string."""
    # Note: use plain float() here; np.float was removed in newer numpy versions
    counts, length = Counter(data), float(len(data))
    return -sum(count / length * np.log2(count / length) for count in counts.values())
#Get a whois record for our domain
wis = whois(domain)
if wis.domain_name is not None:
# Create domain record from whois data
dom_record = pd.DataFrame({"Domain":[domain],
"Name":[wis['name']],
"Org":[wis['org']],
"DNSSec":[wis['dnssec']],
"City":[wis['city']],
"State":[wis['state']],
"Country":[wis['country']],
"Registrar": [wis['registrar']],
"Status": [wis['status']],
"Created":[wis['creation_date']],
"Expiration" : [wis['expiration_date']],
"Last Updated" : [wis['updated_date']],
"Name Servers": [wis['name_servers']]})
ns_domains = []
# Remove duplicate Name Server records
for server in wis['name_servers']:
ns_sub_d, ns_domain, ns_tld = tldextract.extract(server)
ns_dom = ns_domain.lower() + "." + ns_tld.lower()
if ns_dom not in ns_domains:
ns_domains.append(ns_dom)
# Identify the domain's popularity with Open Page Rank
page_rank = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["OPR"]))
if page_rank['RawResult'][0]:
page_rank_score = page_rank['RawResult'][0]['response'][0]['page_rank_integer']
else:
page_rank_score = 0
dom_record["Page Rank"] = [page_rank_score]
# Get a list of subdomains for the domain
url_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["VirusTotal"]))
if url_ti['RawResult'][0]:
sub_doms = url_ti['RawResult'][0]['subdomains']
else:
sub_doms = []
graph_items.append((domain, "Sub Domains"))
graph_items += [(sub,"Sub Domains") for sub in sub_doms]
dom_record['Sub Domains'] = [sub_doms]
# Work out domain entropy to identify a possible DGA
dom_ent = Entropy(domain)
dom_record['Domains Entropy'] = [dom_ent]
# Add elements to graph for later plotting
if isinstance(dom_record['Created'][0], list):
graph_items.append((domain,dom_record['Created'][0][0]))
else:
graph_items.append((domain,dom_record['Created'][0]))
graph_items.append((domain, "Name Servers"))
graph_items += [(("Name Servers", ns)) for ns in dom_record['Name Servers'][0]]
graph_items += [(domain,dom_record['Registrar'][0]), (domain,dom_record['Country'][0]),(domain,f"Page Rank : {dom_record['Page Rank'][0]}")]
# Highlight domains with a low Page Rank score, or whose entropy is more than 2 standard deviations from the average for the top 1 million domains
def color_cells(val):
if isinstance(val, int):
color = 'yellow' if val < 3 else 'white'
elif isinstance(val, float):
color = 'yellow' if val > 4.30891 or val < 2.72120 else 'white'
else:
color = 'white'
return 'background-color: %s' % color
# Display whois details and highlight interesting values
display(dom_record.T.style.applymap(color_cells, subset=pd.IndexSlice[['Page Rank', 'Domains Entropy'],0]))
summary.add_observation(caption="Domain Summary", description=f"Summary of public domain records for {domain}", data=dom_record)
md("If Page Rank or Domain Entropy are highlighted this indicates that their values are outside the expected values of a legitimate website")
md(f"The average entropy for the 1M most popular domains is 3.2675")
else:
# If there is no whois data see what we can use from TI
url_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=domain, providers=["VirusTotal"]))
md(f"No current whois record exists for {domain}; below are historical records")
print(url_ti['RawResult'][0]['whois'])
Does the domain have an associated TLS certificate, and if so, is that certificate in the malicious certs list held by abuse.ch? Details such as the certificate's subject and issuer can also provide indicators as to the domain's nature.
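abuse.ch's SSL blocklist identifies certificates by SHA-1 fingerprint, which is essentially what `in_abuse_list` checks against. A minimal offline sketch of that matching step (the blocklist entry and the "certificate" bytes below are made up for illustration):

```python
import hashlib

# Hypothetical blocklist of SHA-1 certificate fingerprints (lowercase hex),
# standing in for the abuse.ch SSLBL feed
abuse_fingerprints = {
    "3f786850e387550fdab836ed7e6dc881de23001b",  # made-up entry
}

def cert_in_abuse_list(der_bytes: bytes) -> bool:
    """Return True if the certificate's SHA-1 fingerprint is blocklisted."""
    return hashlib.sha1(der_bytes).hexdigest() in abuse_fingerprints

print(cert_in_abuse_list(b"a\n"))     # True: sha1(b"a\n") is the entry above
print(cert_in_abuse_list(b"benign"))  # False
```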
if url is not None:
scope = url
else:
scope = domain
# See if TLS cert is in abuse.ch malicious certs list and get cert details
result, x509 = dom_val.in_abuse_list(scope)
if x509 is not None:
cert_df = pd.DataFrame({"SN" :[x509.serial_number],
"Subject":[[(i.value) for i in x509.subject]],
"Issuer": [[(i.value) for i in x509.issuer]],
"Expired": [x509.not_valid_after],
"InAbuseList": result})
display(cert_df.T)
summary.add_observation(caption="TLS Summary", description=f"Summary of TLS certificate for {domain}", data=cert_df)
md("If 'InAbuseList' is True this shows that the SSL certificate fingerprint appeared in the abuse.ch list")
graph_items.append((domain,result))
else:
md("No TLS certificate was found in abuse.ch lists.")
What IP address is associated with this domain, and what do we know about that IP? What other domains have been associated with this IP, and is it a known Tor exit node?
In order to use the Tor lookup functionality of MSTICpy you need to configure it as a provider in your msticpyconfig.yaml
configuration file. No API key is required to use this functionality.
To configure this file see the ConfigureNotebookEnvironment notebook and Threat intelligence provider configuration.
For Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) Run ConfiguringNotebookEnvironment
import dns.resolver
from dns.resolver import NXDOMAIN
from ipwhois import IPWhois
primary_providers = [prov[0] for prov in tilookup._providers.items()]
if "VirusTotal" in tilookup.loaded_providers and "VirusTotal" not in primary_providers:
primary_providers.append("VirusTotal")
if dom_val.is_resolvable(domain) is True:
try:
answer = dns.resolver.query(domain, 'A')
except NXDOMAIN:
raise ValueError("Could not resolve IP addresses from domain.")
x = answer[0].to_text()
# Use a distinct name so we don't shadow the whois() function imported earlier
ip_whois = IPWhois(x)
ipwis = ip_whois.lookup_whois()
ip_rec = pd.DataFrame({"IP Address": [x],
"ASN" : [ipwis['asn']],
"ASN Owner": [ipwis['asn_description']],
"Country" : [ipwis['asn_country_code']],
"Date": [ipwis['asn_date']]})
ip_addresses = ip_rec['IP Address'].to_list()
graph_items += [
(ip_rec["IP Address"][0],domain),
(ip_rec["IP Address"][0],ip_rec["ASN"][0]),
(ip_rec["ASN Owner"][0],ip_rec["ASN"][0]),
(ip_rec["Country"][0],ip_rec["ASN"][0])
]
tor = None
if "Tor" in tilookup.loaded_providers:
tor = tilookup.result_to_df(tilookup.lookup_ioc(observable=ip_rec['IP Address'][0], providers=["Tor"]))
if tor is None or tor['Details'][0] == "Not found.":
ip_rec['Tor Node?'] = "No"
else:
ip_rec['Tor Node?'] = "Yes"
graph_items.append((ip_rec["IP Address"][0],"Tor Node"))
ip_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=ip_rec['IP Address'][0], providers=primary_providers))
last_10 = []
if "VirusTotal" in tilookup.loaded_providers:
last_10 = ip_ti.T['VirusTotal']['RawResult']["resolutions"][0:10]
prev_domains = []
for record in last_10:
prev_domains.append(record['hostname'])
graph_items.append((record['hostname'],ip_rec["IP Address"][0]))
ip_rec["Last 10 resolutions"] = [prev_domains]
display(ip_rec.T)
summary.add_observation(caption="IP Summary", description=f"Summary of IP associated with {domain}", data=ip_rec)
else:
ip_ti = tilookup.result_to_df(tilookup.lookup_ioc(observable=answer[0].to_text()))
print(ip_ti.T['VirusTotal']['RawResult'])
Using https://browshot.com/, return a screenshot of the domain or URL being investigated. This can help us identify whether the site is a phishing portal.
As with other external providers you need an API key to use the BrowShot service, and have the provider configured in your msticpyconfig.yaml
file.
To configure this file see the ConfigureNotebookEnvironment notebook and Threat intelligence provider configuration.
For Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) Run ConfiguringNotebookEnvironment
if url is not None:
image_data = domain_utils.screenshot(url)
else:
image_data = domain_utils.screenshot(domain)
with open('screenshot.png', 'wb') as f:
f.write(image_data.content)
display(Image(filename='screenshot.png'))
In order to effectively evaluate the data collected above we will graph the elements to help highlight connections.
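`graph_items` is simply a list of `(node_a, node_b)` edge tuples accumulated throughout the notebook. Before plotting, it can help to think of it as a plain adjacency map; a stdlib-only sketch with invented edge values:

```python
from collections import defaultdict

# Illustrative edges in the same (source, target) shape as graph_items
edges = [
    ("contoso.com", "Name Servers"),
    ("Name Servers", "ns1.contoso.com"),
    ("contoso.com", "203.0.113.10"),
]

adjacency = defaultdict(set)
for a, b in edges:
    # Undirected graph: record both directions, coercing nodes to str
    # just as the next cell does with G.add_edge(item[0], str(item[1]))
    adjacency[str(a)].add(str(b))
    adjacency[str(b)].add(str(a))

print(sorted(adjacency["contoso.com"]))  # ['203.0.113.10', 'Name Servers']
```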
# Create graph from items saved to graph_items
import networkx as nx
import matplotlib.pyplot as plt
G=nx.Graph()
for item in graph_items:
G.add_edge(item[0],str(item[1]))
# Plot Graph with pyvis
net=Network(height=900, width=900, notebook=True)
net.barnes_hut()
net.from_nx(G)
net.set_options("""
var options = {"nodes": {"color": {"highlight": {"border": "rgba(233,77,49,1)"},"hover": {"border": "rgba(233,77,49,1)"}},
"scaling": {"min": 1},"size": 7},
"edges": {"color": {"inherit": true}, "smooth": false},
"interaction": {"hover": true,"multiselect": true},
"manipulation": {"enabled": true},
"physics": {"enabled": false,"barnesHut": {"gravitationalConstant": -80000,"springLength": 250,"springConstant": 0.001},"minVelocity": 0.75}
}""")
net.show("graph.html")
# If the interactive graph does not display correctly, uncomment the three lines below to display a non-interactive version
# import matplotlib.pyplot as plt
# plt.figure(3, figsize=(12, 12))
# nx.draw(G, with_labels=True, font_weight='bold')
Once we have determined the nature of the domain or URL under investigation, we want to see what the scope of impact is in our environment by identifying any presence of the domain or URL in our datasets. If the domain has a high Page Rank score it is likely to be highly prevalent in a large environment, so you may wish to consider whether or not to run these cells for such a domain due to the data volumes involved.
if dom_record is None or int(dom_record["Page Rank"]) < 6:
warning = None
md(f"The Page Rank score for {domain} is low; querying for this domain should not present issues.")
else:
md_warn(f"{domain} has a high Page Rank score, it is likely to be highly prevalent in the environment.")
md("Please confirm below that you wish to proceed, note that some queries are likely to be slow due to large amounts of data", "bold")
warning = widgets.Checkbox(
value=False,
description='Are you sure?',
disabled=False
)
display(warning)
# Establish if we want to investigate just the URL or domain and URL
if warning is not None and warning.value == False:
md_warn("Please check the box above to confirm you wish to proceed")
else:
if url is not None:
md("Do you wish to search on the URL alone or URL and Domain? For malicious URLs on known good domains you may wish to only search on the URL to get more granular results.")
scope_selection = widgets.RadioButtons(
options=['URL Only', 'URL and Domain'],
disabled=False
)
display(scope_selection)
else:
scope_selection = None
md(f"Searching data for {domain}")
host_list = []
# Set a time scope for our investigation
if scope_selection is not None:
if scope_selection.value == "URL Only":
scope = url
else:
scope = f"{domain}|{url}"
else:
scope = domain
query_times = nbwidgets.QueryTime(units='day',
max_before=20, max_after=1, before=3)
query_times.display()
#Get any alerts associated with the domain or URL
alerts = qry_prov.SecurityAlert.list_alerts(
query_times)
if isinstance(alerts, pd.DataFrame) and not alerts.empty:
    related_alerts = alerts[alerts["Entities"].str.contains(scope)]
else:
    alerts = None
    related_alerts = None
    display(HTML("No alerts found"))
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
related_alerts_items = (related_alerts[['AlertName', 'TimeGenerated']]
.groupby('AlertName').TimeGenerated.agg('count').to_dict())
def print_related_alerts(alertDict, entityType, entityName):
if len(alertDict) > 0:
display(Markdown(
f"### Found {len(alertDict)} different alert types related to this {entityType} (\'{entityName}\')"))
for (k, v) in alertDict.items():
display(Markdown(f"- {k}, Count of alerts: {v}"))
else:
display(
Markdown(f"No alerts for {entityType} entity \'{entityName}\'"))
# Display alerts on timeline to aid in visual grouping
print_related_alerts(related_alerts_items, 'domain', domain)
nbdisplay.display_timeline(
data=related_alerts, source_columns=["AlertName"], title="Host alerts over time", height=300, color="red")
score = len(related_alerts.index)/2
summary.add_observation(caption="Alerts", description=f"Alerts linked to {scope}", data=related_alerts, score=score)
else:
md("No related alerts found.")
rel_alert_select = None
def show_full_alert(selected_alert):
global security_alert, alert_ip_entities
security_alert = SecurityAlert(
rel_alert_select.selected_alert)
nbdisplay.display_alert(security_alert, show_entities=True)
# Show selected alert when selected
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
display(Markdown('### Click on alert to view details.'))
rel_alert_select = nbwidgets.SelectAlert(alerts=related_alerts,
action=show_full_alert)
rel_alert_select.display()
else:
md('No related alerts found.')
Hosts that have communicated with the domain or URL under investigation may have indicators of this activity in their logs, especially if the domain or URL was referenced in a command-line argument. The context in which the domain or URL is observed may provide some indication of what the activity was.
host_log_query = f"""
Syslog
| where TimeGenerated >= datetime({query_times.start})
| where TimeGenerated <= datetime({query_times.end})
| where SyslogMessage matches regex "{scope}"
| union isfuzzy = true (
SecurityEvent
| where TimeGenerated >= datetime({query_times.start})
| where TimeGenerated <= datetime({query_times.end})
| where CommandLine matches regex "{scope}")
"""
# Identify any hosts with logs relating to this URL or domain and provide a summary of those hosts
host_logs_df = qry_prov.exec_query(host_log_query)
if not host_logs_df.empty:
md(f"Summary of logs containing {scope} by host:", "bold")
host_log_sum = pd.DataFrame({'Log Count' : host_logs_df.groupby(['Computer']).count()['TimeGenerated']}).reset_index()
display(host_log_sum.style.hide_index())
#Add details to a summary for later use
summary.add_observation(caption="Host Log Summary", description=f"Summary of logs containing {scope} by host", data=host_log_sum)
ioc_extractor = iocextract.IoCExtract()
print('Extracting IPs, Domains and URLs from logs.......')
ioc_df = ioc_extractor.extract(data=host_logs_df,
columns=['SyslogMessage', 'CommandLine'],
os_family='Linux',
ioc_types=['ipv4', 'ipv6', 'dns', 'url'])
md("Network artifacts found in logs:", "bold")
display(ioc_df.drop('SourceIndex', axis=1).style.hide_index())
# Collect a list of ip addresses associated with the domain or url
ip_addresses += [(ip) for ip in ioc_df[ioc_df['IoCType'] == "ipv4"]['Observable'] if ip not in ip_addresses]
else:
md(f"No host logs found containing {domain} or {url}")
#Display the logs associated with the domain or URL for each host
def view_logs(host):
display(host_logs_df.query('Computer == @host'))
if not host_logs_df.empty:
items = host_log_sum['Computer'].dropna().unique().tolist()
host_list = items
md(f"<h3>View all host logs that contain {scope}</h3>")
log_view = widgets.Dropdown(
options=items, description='Select Computer to view raw logs', disabled=False, **WIDGET_DEFAULTS)
display(widgets.interactive(view_logs, host=log_view))
else:
md(f"No host logs found containing {domain} or {url}")
Often network devices will log connection activity that can help identify which hosts have communicated with a given domain or URL, and may provide additional detail as to the nature of this communication.
net_query = f"""
CommonSecurityLog
| where TimeGenerated > datetime({query_times.start})
| where TimeGenerated < datetime({query_times.end})
| where RequestURL contains "{scope}" or AdditionalExtensions contains "{scope}"
"""
net_logs_df = qry_prov.exec_query(net_query)
# Search network device logs for entries containing the domain or URL. If any are found, summarize the data and add indicators to lists.
if not net_logs_df.empty:
md(f"Count of network connections to {scope} by hosts:")
host_count = pd.DataFrame({'Connection Count' : net_logs_df.groupby(['SourceIP','DestinationIP','DestinationPort', 'RequestURL']).count()['TimeGenerated']}).reset_index()
display(host_count.style.hide_index())
summary.add_observation(caption="Network Log Summary", description=f"Summary of network connections to {scope} by host", data=host_count)
ip_addresses += [(ip) for ip in host_count['DestinationIP'] if ip not in ip_addresses]
else:
md(f"No network device logs found containing {scope}")
def view_net_logs(host):
display(net_logs_df.query('SourceIP == @host'))
if not net_logs_df.empty:
# Display logs from any network devices that contain the domain or URL
items = net_logs_df['SourceIP'].dropna().unique().tolist()
host_list += items
md(f"<h3>View all network logs that contain {scope}</h3>")
net_log_view = widgets.Dropdown(
options=items, description='Select IP to view raw logs', disabled=False, **WIDGET_DEFAULTS)
display(widgets.interactive(view_net_logs, host=net_log_view))
else:
md(f"No network device logs found containing {scope}")
A host communicating with a domain is going to need to resolve that domain first; this can provide us with details of other IP addresses associated with the domain. In addition, the type of requests made can help us identify activity such as data exfiltration via DNS.
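The DNS tunnelling check later in this section flags lookups whose subdomain entropy is far from the top-1M-domain average (3.2675, the baseline the notebook uses) or whose subdomain decodes as base64. A self-contained sketch of that heuristic, with invented labels:

```python
import base64
import binascii
import math
from collections import Counter

AVG_ENTROPY = 3.2675  # average entropy of the top-1M domains (per the notebook)

def entropy(data: str) -> float:
    """Shannon entropy in bits per character."""
    length = len(data)
    counts = Counter(data)
    return -sum(c / length * math.log2(c / length) for c in counts.values())

def looks_like_tunnel(subdomain: str) -> bool:
    """Flag subdomains with unusual entropy or a decodable base64 payload."""
    label = subdomain.replace(".", "")
    # Entropy far from the baseline suggests encoded or generated data
    if abs(entropy(label) - AVG_ENTROPY) > 0.5:
        return True
    # A label that strictly decodes as base64 may carry exfiltrated data
    try:
        base64.b64decode(label, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False

print(looks_like_tunnel("server42cluster"))       # False: ordinary-looking label
print(looks_like_tunnel("aGVsbG8gd29ybGQhIQ=="))  # True: valid base64 payload
```

Note that, like the notebook's own threshold test, very short or repetitive labels can also trip the entropy check, so results are indicators rather than verdicts.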
if "DnsEvents" in qry_prov.schema:
dns_query = f"""
DnsEvents
| where TimeGenerated > datetime({query_times.start})
| where TimeGenerated < datetime({query_times.end})
| where SubType == "LookupQuery"
| where tolower(Name) contains "{scope}"
| where isnotempty(IPAddresses)
"""
# Search DNS logs for resolutions of the domain
dns_logs_df = qry_prov.exec_query(dns_query)
if not dns_logs_df.empty:
ip_addr = dns_logs_df[dns_logs_df['TimeGenerated'] == dns_logs_df['TimeGenerated'].max()]['IPAddresses'].replace("", np.nan).dropna().to_list()
new_ips = len(ip_addresses)
# Identify any DNS responses for the domain that contain IP addresses not previously identified
ip_addresses += [(ip) for ip in ip_addr if ip not in ip_addresses]
if len(ip_addresses) > new_ips:
    md(f"New IP Addresses found for {domain}: ")
    print(ip_addresses[new_ips:])
host_list += dns_logs_df['ClientIP'].unique().tolist()
host_count = dns_logs_df.groupby('ClientIP').count()['Name']
host_resolutions = pd.DataFrame({"Count of DNS Lookups": dns_logs_df.groupby('ClientIP').count()['Name']}).reset_index()
md(f"Count of resolutions for {domain} by host:")
display(host_resolutions.style.hide_index())
summary.add_observation(caption="DNS Log Summary", description=f"Summary of DNS resolutions of {scope} by host", data=host_resolutions)
else:
md(f"No DNS device logs found containing {scope}")
else:
dns_logs_df = None
md("No DNS events available in workspace")
# Check DNS logs for indicators of data exfiltration or tunnelling via DNS
if dns_logs_df is not None:
import msticpy.sectools.base64unpack as b64
lookups = dns_logs_df['Name'].dropna().unique().tolist()
potential_tunnels = []
for lookup in lookups:
if len(lookup) > 250:
print(f"Suspicious domain length {lookup}")
sub_d, _, _ = tldextract.extract(lookup)
req = sub_d.replace(".","")
score = Entropy(req)
if score > (3.2675 + 0.5) or score < (3.2675 - 0.5):
potential_tunnels.append(lookup)
base64 = b64.unpack(req)
if not base64[1].empty:
potential_tunnels.append(lookup)
suspicious_queries = dns_logs_df[dns_logs_df['Name'].isin(potential_tunnels)]
if suspicious_queries.empty:
md(f"No suspicious DNS lookups found for {domain}")
suspect_tunnels = None
else:
md("Potential DNS Tunnelling:")
suspect_tunnels = pd.DataFrame({"Count of DNS Lookups": suspicious_queries.groupby(['Name','ClientIP']).count()['TimeGenerated']})
display(suspect_tunnels.reset_index().style.hide_index())
summary.add_observation(caption="DNS Tunnelling", description=f"Potential DNS Tunnelling", data=suspect_tunnels)
else:
md("No DNS events available in workspace")
In Microsoft Azure, network flow logs can help identify hosts connecting to the domain or URL, as well as provide some context as to the nature of these connections.
# Check Azure flow logs for any connections to the domain or URL.
if 'AzureNetworkAnalytics_CL' not in qry_prov.schema:
az_net_comms_df = None
md('No Azure network data available in this workspace.')
else:
az_net_comms_df = qry_prov.Network.list_azure_network_flows_by_ip(query_times, ip_address_list=ip_addresses)
if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
az_net_comms_df.head()
az_net_comms_df['TotalAllowedFlows'] = az_net_comms_df['AllowedOutFlows'] + az_net_comms_df['AllowedInFlows']
nbdisplay.display_timeline(
data=az_net_comms_df,
group_by="L7Protocol",
title="Network Flows by Protocol",
time_column="FlowStartTime",
source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
height=300,
legend="right",
yaxis=True
)
nbdisplay.display_timeline(
data=az_net_comms_df,
group_by="FlowDirection",
title="Network Flows by Direction",
time_column="FlowStartTime",
source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
height=300,
legend="right",
yaxis=True
)
else:
md(f"No Azure network data for {domain} in this time range.")
if az_net_comms_df is not None and not az_net_comms_df.empty:
flow_plot = nbdisplay.display_timeline_values(data=az_net_comms_df,
group_by="L7Protocol",
source_columns=["FlowType",
"AllExtIPs",
"L7Protocol",
"FlowDirection",
"TotalAllowedFlows"],
time_column="FlowStartTime",
y="TotalAllowedFlows",
legend="right",
legend_column="L7Protocol",
height=500,
kind=["vbar", "circle"]);
else:
md("No Azure network data available.")
if az_net_comms_df is not None and not az_net_comms_df.empty:
cols = [
"VMName",
"VMIPAddress",
"PublicIPs",
"SrcIP",
"DestIP",
"L4Protocol",
"L7Protocol",
"DestPort",
"FlowDirection",
"AllExtIPs",
"TotalAllowedFlows",
]
flow_index = az_net_comms_df[cols].copy()
def get_source_ip(row):
if row.FlowDirection == "O":
return row.VMIPAddress if row.VMIPAddress else row.SrcIP
else:
return row.AllExtIPs if row.AllExtIPs else row.DestIP
def get_dest_ip(row):
if row.FlowDirection == "O":
return row.AllExtIPs if row.AllExtIPs else row.DestIP
else:
return row.VMIPAddress if row.VMIPAddress else row.SrcIP
flow_index["source"] = flow_index.apply(get_source_ip, axis=1)
flow_index["dest"] = flow_index.apply(get_dest_ip, axis=1)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
display(
flow_index[
["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]
]
.groupby(["source", "dest", "L7Protocol", "FlowDirection"])
.sum()
.reset_index()
.style.bar(subset=["TotalAllowedFlows"], color="#d65f5f")
)
summary.add_observation(caption="Network Flow Summary", description=f"Summary of network flows to and from IPs associated with {scope}", data=flow_index)
else:
flow_index = None
md("No Azure network data available.")
if flow_index is not None and not flow_index.empty:
net_ips = flow_index['source'].dropna().unique().tolist() + flow_index['dest'].dropna().unique().tolist()
md("Resolving hostnames; please be patient, this may take some time")
ip_addresses = ip_addresses + [(ip) for ip in net_ips if ip not in ip_addresses]
for ip in ip_addresses:
    host_res = qry_prov.Network.get_host_for_ip(query_times, ip_address=ip)
    if not host_res.empty:
        host_list.append(host_res['Computer'][0])
md("Hosts added to host list")
else:
md("No Azure network data available.")
During the cells executed above we have identified hosts communicating with the domain or IP in question. These hosts are potential candidates for further investigation using Azure Sentinel or via the entity explorer notebooks. This cell provides a summary of these hosts, as well as details of any alerts we have that are associated with them.
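The cell below distinguishes IPv4 addresses from hostnames with a regex so each bare IP can be resolved back to a computer name. The stdlib ipaddress module offers an equivalent, stricter check; the host values here are invented:

```python
import ipaddress

# Invented mix of hostnames and bare IPs, as host_list might contain
sample_hosts = ["WEB-SRV01", "10.0.0.4", "linux-host.contoso.com", "203.0.113.9"]

def is_ipv4(value: str) -> bool:
    """True if the value parses as a valid IPv4 address."""
    try:
        ipaddress.IPv4Address(value)
        return True
    except ValueError:
        return False

ips = [h for h in sample_hosts if is_ipv4(h)]
names = [h for h in sample_hosts if not is_ipv4(h)]
print(ips)    # ['10.0.0.4', '203.0.113.9']
print(names)  # ['WEB-SRV01', 'linux-host.contoso.com']
```

Unlike the `[0-9]{1,3}` regex, `IPv4Address` also rejects out-of-range octets such as `999.1.1.1`.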
import re
pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
# Separate IPv4 addresses from hostnames so the IPs can be resolved to computer names
host_ip_list = [(host) for host in host_list if pattern.match(host)]
for ip in host_ip_list:
host_list.remove(ip)
host_name = qry_prov.Network.get_host_for_ip( query_times, ip_address=ip)
if not host_name.empty:
host_list.append(host_name['Computer'][0])
if alerts is not None:
alert_count = [((len(alerts[alerts["Entities"].str.contains(host)].index))) for host in host_list]
host_alerts = pd.DataFrame({"Hosts":host_list,
"Count of Host Alerts": alert_count})
if host_alerts.empty:
md(f"No hosts observed having an association with {domain}")
else:
summary.add_observation(caption="Host Alerts", description=f"A list of hosts observed communicating with {scope} and any alerts associated with them", data=host_alerts)
md(f"""
During the investigation the following hosts have been observed as having an association with {domain}.
The count of alerts for each host is to provide guidance on which hosts should be considered for prioritization
in further investigation.""")
display(host_alerts.style.hide_index())
md(f"Domain: {domain}", "bold")
md(f"URL: {url}", "bold")
summary.display_observations()