Notebook Version: 1.1
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, pandas_bokeh, numpy, matplotlib, networkx, seaborn, datetime, ipywidgets, ipython, dnspython, ipwhois, folium, maxminddb_geolite2
Data Sources Required:
This notebook brings together a series of tools and techniques to enable threat hunting within the context of a single Linux host. It draws on a range of data sources, but to support the widest possible range of scenarios it prioritizes common Syslog data. If detailed auditd data is available for a host, you may wish to edit the notebook to rely primarily on that dataset; as it currently stands, auditd is used when available to provide insight not otherwise available via Syslog.
Our broad initial hunting hypothesis is that a particular Linux host in our environment has been compromised. We will need to hunt from a range of different positions to validate or disprove this hypothesis.
The next cell:
This should complete without errors. If you encounter errors or warnings look at the following two notebooks:
If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:
from pathlib import Path
from IPython.display import display, HTML
REQ_PYTHON_VER=(3, 6)
REQ_MSTICPY_VER=(1, 0, 0)
REQ_MP_EXTRAS = ["ml"]
# If the installation fails try to manually install using
# %pip install --upgrade msticpy
from msticpy.nbtools import nbinit
additional_packages = [
"oauthlib", "pyvis", "python-whois", "seaborn"
]
nbinit.init_notebook(
    namespace=globals(),
    additional_packages=additional_packages,
    # Modules used later in the notebook are imported explicitly below.
);
from bokeh.models import ColumnDataSource, FactorRange
from bokeh.palettes import viridis
from bokeh.plotting import show, Row, figure
from bokeh.transform import factor_cmap, cumsum
from dns import reversename, resolver
from functools import lru_cache
from ipaddress import ip_address
from ipwhois import IPWhois
from math import pi
from msticpy.common.exceptions import MsticpyException
from msticpy.nbtools import observationlist
from msticpy.nbtools.foliummap import get_map_center
from msticpy.sectools import auditdextract
from msticpy.sectools import cmd_line
from msticpy.sectools.cmd_line import cmd_speed, risky_cmd_line
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.sectools.syslog_utils import create_host_record, cluster_syslog_logons_df, risky_sudo_sessions
from pyvis.network import Network
import datetime as dt
import re
Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:
%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)
instead of
%kql loganalytics://code().workspace(WORKSPACE_ID)
Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a popup schema button.
To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.
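The two connection string forms above differ only in the authentication part. As a minimal sketch, the choice between them could be wrapped in a helper. `build_kql_connection` is a hypothetical function, not part of Kqlmagic or msticpy, and wrapping literal values in single quotes is an assumption about how Kqlmagic expects them when they are not passed as Python variables.

```python
from typing import Optional


def build_kql_connection(
    workspace_id: str,
    tenant_id: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> str:
    """Return a connection string in one of the two forms shown above."""
    app_auth = (tenant_id, client_id, client_secret)
    if all(app_auth):
        # AppId and secret authentication
        return (
            f"loganalytics://tenant('{tenant_id}').workspace('{workspace_id}')"
            f".clientid('{client_id}').clientsecret('{client_secret}')"
        )
    if any(app_auth):
        # A partial set of app credentials is almost certainly a mistake
        raise ValueError(
            "tenant_id, client_id and client_secret must be supplied together"
        )
    # Interactive device-code authentication
    return f"loganalytics://code().workspace('{workspace_id}')"


print(build_kql_connection("my-workspace-id"))
```

The resulting string would be passed to `%kql` in place of the hand-written connection string.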
# See if we have a Microsoft Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
ws_config.prompt_for_ws()
qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")
# Authenticate to Microsoft Sentinel workspace
qry_prov.connect(ws_config)
To begin the hunt we need to set the time frame in which to test our compromised host hunting hypothesis. Use the widget below to select the start and end time for the hunt.
query_times = nbwidgets.QueryTime(units='day',
max_before=14, max_after=1, before=1)
query_times.display()
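The selected window ends up in the KQL time filters used by the queries in this notebook. The sketch below shows that arithmetic in plain Python, assuming the window runs from the origin time minus `before` days to the origin time plus `after` days. `hunting_window` is a hypothetical helper used only for illustration; it is not the widget's implementation.

```python
from datetime import datetime, timedelta


def hunting_window(origin, before_days=1, after_days=0):
    """Return (start, end) for a window of whole days around origin."""
    if before_days < 0 or after_days < 0:
        raise ValueError("offsets must be non-negative")
    start = origin - timedelta(days=before_days)
    end = origin + timedelta(days=after_days)
    return start, end


origin = datetime(2023, 5, 10, 12, 0, 0)
start, end = hunting_window(origin, before_days=1, after_days=0)

# The same pattern the notebook uses to interpolate times into KQL
time_filter = (
    f"TimeGenerated between (datetime({start.isoformat()}) "
    f".. datetime({end.isoformat()}))"
)
print(time_filter)
```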
Select the host you want to test your hunting hypothesis against. Only hosts with Syslog data within the time frame you specified are available. If the host you wish to select is not present, try adjusting your time frame.
# Get a list of hosts with syslog data in our hunting timeframe to provide easy selection
syslog_query = f"""Syslog | where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end})) | summarize by Computer"""
md("Collecting available host details...")
# Run the query through the provider's public exec_query method
hosts_list = qry_prov.exec_query(query=syslog_query)
if isinstance(hosts_list, pd.DataFrame) and not hosts_list.empty:
    hosts = hosts_list["Computer"].unique().tolist()
    host_text = nbwidgets.SelectItem(description='Select host to investigate: ',
                                     item_list=hosts, width='75%', auto_display=True)
else:
    md("There are no hosts with syslog data in this time period to investigate")
Below is an overview of the selected host based on available data sources.
hostname=host_text.value
az_net_df = None
# Collect data on the host
all_syslog_query = f"""Syslog | where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end})) | where Computer =~ '{hostname}'"""
all_syslog_data = qry_prov.exec_query(all_syslog_query)
if isinstance(all_syslog_data, pd.DataFrame) and not all_syslog_data.empty:
heartbeat_query = f"""Heartbeat | where TimeGenerated >= datetime({query_times.start}) | where TimeGenerated <= datetime({query_times.end})| where Computer == '{hostname}' | top 1 by TimeGenerated desc nulls last"""
if "AzureNetworkAnalytics_CL" in qry_prov.schema:
aznet_query = f"""AzureNetworkAnalytics_CL | where TimeGenerated >= datetime({query_times.start}) | where TimeGenerated <= datetime({query_times.end}) | where VirtualMachine_s has '{hostname}' | where ResourceType == 'NetworkInterface' | top 1 by TimeGenerated desc | project PrivateIPAddresses = PrivateIPAddresses_s, PublicIPAddresses = PublicIPAddresses_s"""
print("Getting network data...")
az_net_df = qry_prov.exec_query(query=aznet_query)
print("Getting host data...")
host_hb = qry_prov.exec_query(query=heartbeat_query)
# Create host entity record, with Azure network data if any is available
if az_net_df is not None and isinstance(az_net_df, pd.DataFrame) and not az_net_df.empty:
host_entity = create_host_record(syslog_df=all_syslog_data, heartbeat_df=host_hb, az_net_df=az_net_df)
else:
host_entity = create_host_record(syslog_df=all_syslog_data, heartbeat_df=host_hb)
md(
"<b>Host Details</b><br>"
f"<b>Hostname</b>: {host_entity.computer}<br>"
f"<b>OS</b>: {host_entity.OSType} {host_entity.OSName}<br>"
f"<b>IP Address</b>: {host_entity.IPAddress.Address}<br>"
f"<b>Location</b>: {host_entity.IPAddress.Location.CountryName}<br>"
f"<b>Installed Applications</b>: {host_entity.Applications}<br>"
)
else:
md_warn("No Syslog data found, check hostname and timeframe.")
md("The data query may be timing out, consider reducing the timeframe size.")
This section provides an overview of any security alerts or Hunting Bookmarks in Microsoft Sentinel related to this host. This will help scope and guide our hunt.
related_alerts = qry_prov.SecurityAlert.list_related_alerts(
    query_times, host_name=hostname)
related_bookmarks = qry_prov.AzureSentinel.list_bookmarks_for_entity(query_times, entity_id=hostname)
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
    # Count alerts by name
    host_alert_items = (related_alerts[['AlertName', 'TimeGenerated']]
                        .groupby('AlertName').TimeGenerated.agg('count').to_dict())

    def print_related_alerts(alertDict, entityType, entityName):
        if len(alertDict) > 0:
            md(f"Found {len(alertDict)} different alert types related to this {entityType} (\'{entityName}\')")
            for (k, v) in alertDict.items():
                md(f"- {k}, Count of alerts: {v}")
        else:
            md(f"No alerts for {entityType} entity \'{entityName}\'")

    print_related_alerts(host_alert_items, 'host', host_entity.HostName)
    nbdisplay.display_timeline(
        data=related_alerts, source_columns=["AlertName"], title="Host alerts over time", height=300, color="red")
else:
    md('No related alerts found.')
if isinstance(related_bookmarks, pd.DataFrame) and not related_bookmarks.empty:
    nbdisplay.display_timeline(data=related_bookmarks, source_columns=["BookmarkName"], height=200, color="orange", title="Host bookmarks over time",)
else:
    md('No related bookmarks found.')
rel_alert_select = None
def show_full_alert(selected_alert):
global security_alert, alert_ip_entities
security_alert = SecurityAlert(
rel_alert_select.selected_alert)
nbdisplay.display_alert(security_alert, show_entities=True)
# Show selected alert when selected
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
related_alerts['CompromisedEntity'] = related_alerts['Computer']
md('### Click on alert to view details.')
rel_alert_select = nbwidgets.SelectAlert(alerts=related_alerts,
action=show_full_alert)
rel_alert_select.display()
else:
md('No related alerts found.')
Based on the security alerts for this host we can choose to re-scope our hunting time frame.
if rel_alert_select is None or rel_alert_select.selected_alert is None:
start = query_times.start
else:
start = rel_alert_select.selected_alert['TimeGenerated']
# Set new investigation time windows based on the selected alert
invest_times = nbwidgets.QueryTime(
units='day', max_before=24, max_after=12, before=1, after=1, origin_time=start)
invest_times.display()
Whilst this notebook is linear in layout, it doesn't need to be linear in usage. We have selected our host to investigate and set an initial hunting time frame to work within. We can now start to test more specific hunting hypotheses with the aim of validating our broader initial hunting hypothesis. To do this we can start by looking at:
You can choose to start below with a hunt in host logon events or jump to one of the other sections listed above. The order in which you run these major sections doesn't matter; each is self-contained. You may also choose to rerun sections based on your findings from running other sections.
This notebook uses external threat intelligence sources to enrich data. The next cell loads the TILookup class.
Note: to use TILookup you will need configuration settings in your msticpyconfig.yaml. See the TIProviders documentation and the Configuring Notebook Environment notebook, or ConfiguringNotebookEnvironment (GitHub static view).
tilookup = TILookup()
md("Threat intelligence provider loading complete.")
Hypothesis: That an attacker has gained legitimate access to the host via compromised credentials and has logged into the host to conduct malicious activity.
This section provides an overview of logon activity for the host within our hunting time frame, to allow for the identification of anomalous logons or attempted logons.
# Collect logon events for this host, separate them into successful and unsuccessful, and cluster the successful ones into sessions
logon_events = qry_prov.LinuxSyslog.user_logon(start=invest_times.start, end=invest_times.end, host_name=hostname)
remote_logons = None
failed_logons = None
if isinstance(logon_events, pd.DataFrame) and not logon_events.empty:
remote_logons = (logon_events[logon_events['LogonResult'] == 'Success'])
failed_logons = (logon_events[logon_events['LogonResult'] == 'Failure'])
else:
print("No logon events in this timeframe")
if (isinstance(remote_logons, pd.DataFrame) and not remote_logons.empty) or (isinstance(failed_logons, pd.DataFrame) and not failed_logons.empty):
# Provide a timeline of successful and failed logon attempts to aid identification of potential brute force attacks
display(Markdown('### Timeline of successful host logons.'))
tooltip_cols = ['User', 'ProcessName', 'SourceIP']
if rel_alert_select is not None:
logon_timeline = nbdisplay.display_timeline(data=remote_logons, overlay_data=failed_logons, source_columns=tooltip_cols, height=200, overlay_color="red", alert = rel_alert_select.selected_alert)
else:
logon_timeline = nbdisplay.display_timeline(data=remote_logons, overlay_data=failed_logons, source_columns=tooltip_cols, height=200, overlay_color="red")
display(Markdown('<b>Key:</b><p style="color:darkblue">Successful logons </p><p style="color:Red">Failed Logon Attempts (via su)</p>'))
all_df = pd.DataFrame(dict(successful= remote_logons['ProcessName'].value_counts(), failed = failed_logons['ProcessName'].value_counts())).fillna(0)
fail_data = pd.value_counts(failed_logons['User'].values, sort=True).head(10).reset_index(name='value').rename(columns={'User':'Count'})
fail_data['angle'] = fail_data['value']/fail_data['value'].sum() * 2*pi
fail_data['color'] = viridis(len(fail_data))
fp = figure(plot_height=350, plot_width=450, title="Relative Frequencies of Failed Logons by Account", toolbar_location=None, tools="hover", tooltips="@index: @value")
fp.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=fail_data)
sucess_data = pd.value_counts(remote_logons['User'].values, sort=False).reset_index(name='value').rename(columns={'User':'Count'})
sucess_data['angle'] = sucess_data['value']/sucess_data['value'].sum() * 2*pi
sucess_data['color'] = viridis(len(sucess_data))
sp = figure(plot_height=350, plot_width=450, title="Relative Frequencies of Successful Logons by Account", toolbar_location=None, tools="hover", tooltips="@index: @value")
sp.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=sucess_data)
fp.axis.axis_label=None
fp.axis.visible=False
fp.grid.grid_line_color = None
sp.axis.axis_label=None
sp.axis.visible=False
sp.grid.grid_line_color = None
processes = all_df.index.values.tolist()
results = all_df.columns.values.tolist()
fail_sucess_data = {'processes' :processes,
'sucess' : all_df['successful'].values.tolist(),
'failure': all_df['failed'].values.tolist()}
palette = viridis(2)
x = [ (process, result) for process in processes for result in results ]
counts = sum(zip(fail_sucess_data['sucess'], fail_sucess_data['failure']), ())
source = ColumnDataSource(data=dict(x=x, counts=counts))
b = figure(x_range=FactorRange(*x), plot_height=350, plot_width=450, title="Failed and Successful logon attempts by process",
toolbar_location=None, tools="", y_minor_ticks=2)
b.vbar(x='x', top='counts', width=0.9, source=source, line_color="white",
fill_color=factor_cmap('x', palette=palette, factors=results, start=1, end=2))
b.y_range.start = 0
b.x_range.range_padding = 0.1
b.xaxis.major_label_orientation = 1
b.xgrid.grid_line_color = None
show(Row(sp,fp,b))
ip_list = [convert_to_ip_entities(i, ip_col="SourceIP")[0] for i in remote_logons['SourceIP'].unique() if i != ""]
ip_fail_list = [convert_to_ip_entities(i)[0] for i in failed_logons['SourceIP'].unique() if i != ""]
location = get_map_center(ip_list + ip_fail_list)
folium_map = FoliumMap(location = location, zoom_start=1.4)
# Map logon locations to allow for identification of anomalous locations
if len(ip_fail_list) > 0:
md('<h3>Map of Originating Location of Logon Attempts</h3>')
icon_props = {'color': 'red'}
folium_map.add_ip_cluster(ip_entities=ip_fail_list, **icon_props)
if len(ip_list) > 0:
icon_props = {'color': 'green'}
folium_map.add_ip_cluster(ip_entities=ip_list, **icon_props)
display(folium_map.folium_map)
md('<p style="color:red">Warning: the folium mapping library '
'does not display correctly in some browsers.</p><br>'
'If you see a blank image please retry with a different browser.')
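The pie charts in the cell above size each wedge as the account's share of logons multiplied by 2π. The following plain-Python sketch shows that arithmetic on its own, without pandas or Bokeh. `wedge_angles` is a hypothetical helper and the account names are made-up sample data.

```python
from collections import Counter
from math import pi


def wedge_angles(users, top_n=10):
    """Map each of the top_n most frequent users to a wedge angle in radians."""
    counts = Counter(users).most_common(top_n)
    total = sum(count for _, count in counts)
    if total == 0:
        return {}
    # Each wedge covers the user's fraction of the full circle (2 * pi)
    return {user: count / total * 2 * pi for user, count in counts}


failed_logon_users = ["root", "root", "admin", "guest"]
angles = wedge_angles(failed_logon_users)
for user, angle in angles.items():
    print(f"{user}: {angle:.3f} rad")
```

Because only the top `top_n` accounts are kept, the angles are normalized against the retained counts so that the wedges still fill the whole circle.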
Based on the detail above, if you wish to focus your hunt on a particular user, jump to the User Activity section. Alternatively, to further refine our hunt we need to select a logon session to view in more detail. Select a session from the list below to continue. Sessions that occurred at the time an alert was raised for this host, or where the user has an abnormal ratio of failed to successful login attempts, are highlighted.
logon_sessions_df = None
try:
print("Clustering logon sessions...")
logon_sessions_df = cluster_syslog_logons_df(logon_events)
except Exception as err:
print(f"Error clustering logons: {err}")
if logon_sessions_df is not None:
logon_sessions_df["Alerts during session?"] = np.nan
# check if any alerts occur during logon window.
logon_sessions_df['Start (UTC)'] = [(time - dt.timedelta(seconds=5)) for time in logon_sessions_df['Start']]
logon_sessions_df['End (UTC)'] = [(time + dt.timedelta(seconds=5)) for time in logon_sessions_df['End']]
for TimeGenerated in related_alerts['TimeGenerated']:
logon_sessions_df.loc[(TimeGenerated >= logon_sessions_df['Start (UTC)']) & (TimeGenerated <= logon_sessions_df['End (UTC)']), "Alerts during session?"] = "Yes"
logon_sessions_df.loc[logon_sessions_df['User'] == 'root', "Root?"] = "Yes"
logon_sessions_df.replace(np.nan, "No", inplace=True)
ratios = []
for _, row in logon_sessions_df.iterrows():
suc_fail = logon_events.apply(lambda x: True if x['User'] == row['User'] and x["LogonResult"] == 'Success' else(
False if x['User'] == row['User'] and x["LogonResult"] == 'Failure' else None), axis=1)
numofsucess = len(suc_fail[suc_fail == True].index)
numoffail = len(suc_fail[suc_fail == False].index)
if numoffail == 0:
ratio = 1
else:
ratio = numofsucess/numoffail
ratios.append(ratio)
logon_sessions_df["Successful to failed logon ratio"] = ratios
def color_cells(val):
if isinstance(val, str):
color = 'yellow' if val == "Yes" else 'white'
elif isinstance(val, float):
color = 'yellow' if val > 0.5 else 'white'
else:
color = 'white'
return 'background-color: %s' % color
display(logon_sessions_df[['User','Start (UTC)', 'End (UTC)', 'Alerts during session?', 'Successful to failed logon ratio', 'Root?']]
.style.applymap(color_cells).hide_index())
logon_items = (
logon_sessions_df[['User','Start (UTC)', 'End (UTC)']]
.to_string(header=False, index=False, index_names=False)
.split('\n')
)
logon_sessions_df["Key"] = logon_items
logon_sessions_df.set_index('Key', inplace=True)
logon_dict = logon_sessions_df[['User','Start (UTC)', 'End (UTC)']].to_dict('index')
logon_selection = nbwidgets.SelectItem(description='Select logon session to investigate: ',
item_dict=logon_dict , width='80%', auto_display=True)
else:
md("No logon sessions during this timeframe")
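The two session flags computed above can be read as small, independent checks: whether any alert falls inside the session window padded by five seconds on each side, and the ratio of successful to failed logons for the session's user. The sketch below restates them in plain Python. The function names and sample timestamps are hypothetical, and the ratio defaults to 1 when there are no failures, as in the cell above.

```python
from datetime import datetime, timedelta


def alert_during_session(alert_times, start, end, pad_seconds=5):
    """True if any alert time falls within the padded session window."""
    pad = timedelta(seconds=pad_seconds)
    return any(start - pad <= alert <= end + pad for alert in alert_times)


def success_failure_ratio(results):
    """Ratio of 'Success' to 'Failure' results; 1.0 when nothing failed."""
    successes = sum(1 for result in results if result == "Success")
    failures = sum(1 for result in results if result == "Failure")
    return 1.0 if failures == 0 else successes / failures


session_start = datetime(2023, 5, 10, 9, 0, 0)
session_end = datetime(2023, 5, 10, 9, 30, 0)
alerts = [datetime(2023, 5, 10, 9, 30, 3), datetime(2023, 5, 10, 11, 0, 0)]

flagged = alert_during_session(alerts, session_start, session_end)
ratio = success_failure_ratio(
    ["Success", "Failure", "Failure", "Failure", "Success"]
)
print(flagged, round(ratio, 2))
```

In the notebook's table, float ratios above 0.5 are the ones highlighted.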
def view_syslog(selected_facility):
    return [syslog_events.query('Facility == @selected_facility')]


# Produce a summary of user modification actions taken
# NOTE: the "def" line for this function is missing from the source;
# the name count_users_for_action is an assumption.
def count_users_for_action(x):
    if "Add" in x:
        return len(add_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist())
    elif "Modify" in x:
        return len(mod_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist())
    elif "Delete" in x:
        return len(del_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist())
    else:
        return ""
crn_tl_data = {}
user_tl_data = {}
sudo_tl_data = {}
sudo_sessions = None
tooltip_cols = ['SyslogMessage']
if logon_sessions_df is not None:
#Collect data based on the session selected for investigation
invest_sess = {'StartTimeUtc': logon_selection.value.get('Start (UTC)'), 'EndTimeUtc': logon_selection.value.get(
'End (UTC)'), 'Account': logon_selection.value.get('User'), 'Host': hostname}
session = entities.HostLogonSession(invest_sess)
syslog_events = qry_prov.LinuxSyslog.all_syslog(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host)
sudo_events = qry_prov.LinuxSyslog.sudo_activity(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host, user=session.Account)
if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty:
try:
sudo_sessions = cluster_syslog_logons_df(logon_events=sudo_events)
except MsticpyException:
pass
# Display summary of cron activity in session
cron_events = qry_prov.LinuxSyslog.cron_activity(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host)
if not isinstance(cron_events, pd.DataFrame) or cron_events.empty:
md(f'<h3> No Cron activity for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}</h3>')
else:
cron_events['CMD'].replace('', np.nan, inplace=True)
crn_tl_data = {"Cron Executions": {"data": cron_events[['TimeGenerated', 'CMD', 'CronUser', 'SyslogMessage']].dropna(), "source_columns": tooltip_cols, "color": "Blue"},
"Cron Edits": {"data": cron_events.loc[cron_events['SyslogMessage'].str.contains('EDIT')], "source_columns": tooltip_cols, "color": "Green"}}
md('<h2> Most common commands run by cron:</h2>')
md('This shows how often each cron job was executed within the specified time window')
cron_commands = (cron_events[['EventTime', 'CMD']]
.groupby(['CMD']).count()
.dropna()
.style
.set_table_attributes('width=900px, text-align=center')
.background_gradient(cmap='Reds', low=0.5, high=1)
.format("{0:0>1.0f}"))
display(cron_commands)
# Display summary of user and group creations, deletions and modifications during the session
user_activity = qry_prov.LinuxSyslog.user_group_activity(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host)
if not isinstance(user_activity, pd.DataFrame) or user_activity.empty:
md(f'<h3>No user or group modifications for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}</h3>')
else:
add_events = user_activity[user_activity['UserGroupAction'].str.contains(
'Add')]
del_events = user_activity[user_activity['UserGroupAction'].str.contains(
'Delete')]
mod_events = user_activity[user_activity['UserGroupAction'].str.contains(
'Modify')]
user_activity['Count'] = user_activity.groupby('UserGroupAction')['UserGroupAction'].transform('count')
if add_events.empty and del_events.empty and mod_events.empty:
md('<h2> Users and groups added or deleted:</h2>')
md(f'No users or groups were added or deleted on {host_entity.HostName} between {query_times.start} and {query_times.end}')
user_tl_data = {}
else:
md("<h2>Users added, modified or deleted</h2>")
display(user_activity[['UserGroupAction','Count']].drop_duplicates().style.hide_index())
account_actions = pd.DataFrame({"User Additions": [add_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()],
"User Modifications": [mod_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()],
"User Deletions": [del_events.replace("", np.nan).dropna(subset=['User'])['User'].unique().tolist()]})
display(account_actions.style.hide_index())
user_tl_data = {"User adds": {"data": add_events, "source_columns": tooltip_cols, "color": "Orange"},
"User deletes": {"data": del_events, "source_columns": tooltip_cols, "color": "Red"},
"User modifications": {"data": mod_events, "source_columns": tooltip_cols, "color": "Grey"}}
# Display sudo activity during session
if not isinstance(sudo_sessions, pd.DataFrame) or sudo_sessions.empty:
md(f"<h3>No Sudo sessions for {session.Host} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}</h3>")
sudo_tl_data = {}
else:
sudo_start = sudo_events[sudo_events["SyslogMessage"].str.contains(
"pam_unix.+session opened")].rename(columns={"Sudoer": "User"})
sudo_tl_data = {"Host logons": {"data": remote_logons, "source_columns": tooltip_cols, "color": "Cyan"},
"Sudo sessions": {"data": sudo_start, "source_columns": tooltip_cols, "color": "Purple"}}
try:
risky_actions = cmd_line.risky_cmd_line(events=sudo_events, log_type="Syslog")
suspicious_events = cmd_speed(
cmd_events=sudo_events, time=60, events=2, cmd_field="Command")
except Exception:
risky_actions = None
suspicious_events = None
if risky_actions is None and suspicious_events is None:
pass
else:
risky_sessions = risky_sudo_sessions(
risky_actions=risky_actions, sudo_sessions=sudo_sessions, suspicious_actions=suspicious_events)
for key in risky_sessions:
if key in sudo_sessions:
sudo_sessions[f"{key} - {risky_sessions[key]}"] = sudo_sessions.pop(
key)
if isinstance(sudo_events, pd.DataFrame):
sudo_events_val = sudo_events[['EventTime', 'CommandCall']][sudo_events['CommandCall']!=""].dropna(how='any', subset=['CommandCall'])
if sudo_events_val.empty:
md(f"No successful sudo activity for {hostname} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}")
else:
sudo_events.replace("", np.nan, inplace=True)
md('<h2> Frequency of sudo commands</h2>')
md('This shows how many times each command has been run with sudo. /bin/bash is usually associated with the use of "sudo -i"')
sudo_commands = (sudo_events[['EventTime', 'CommandCall']]
.groupby(['CommandCall'])
.count()
.dropna()
.style
.set_table_attributes('width=900px, text-align=center')
.background_gradient(cmap='Reds', low=.5, high=1)
.format("{0:0>3.0f}"))
display(sudo_commands)
else:
md(f"No successful sudo activity for {hostname} between {logon_selection.value.get('Start (UTC)')} and {logon_selection.value.get('End (UTC)')}")
# Display a timeline of all activity during session
crn_tl_data.update(user_tl_data)
crn_tl_data.update(sudo_tl_data)
if crn_tl_data:
md('<h2> Session Timeline.</h2>')
nbdisplay.display_timeline(
data=crn_tl_data, title='Session Timeline', height=300)
else:
md("No logon sessions during this timeframe")
Use this syslog message data to further investigate suspicious activity during the session.
if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty:
# Return syslog data and present it to the user for investigation
session_syslog = qry_prov.LinuxSyslog.all_syslog(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=session.Host)
if session_syslog.empty:
display(HTML(
f' No syslog for {session.Host} between {session.StartTimeUtc} and {session.EndTimeUtc}'))
def view_sudo(selected_cmd):
return [sudo_events.query('CommandCall == @selected_cmd')[
['TimeGenerated', 'SyslogMessage', 'Sudoer', 'SudoTo', 'Command', 'CommandCall']]]
# Show syslog messages associated with selected sudo command
items = sudo_events['CommandCall'].dropna().unique().tolist()
if items:
md("<h3>View all messages associated with a sudo command</h3>")
display(nbwidgets.SelectItem(item_list=items, action=view_sudo))
else:
md("No logon sessions during this timeframe")
if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty:
# Display syslog messages from the session with the facility selected
items = syslog_events['Facility'].dropna().unique().tolist()
md("<h3>View all messages associated with a syslog facility</h3>")
display(nbwidgets.SelectItem(item_list=items, action=view_syslog))
else:
md("No logon sessions during this timeframe")
if isinstance(logon_sessions_df, pd.DataFrame) and not logon_sessions_df.empty:
display(HTML("<h3>Process Trees from session</h3>"))
print("Building process tree, this may take some time...")
# Find the table with auditd data in
regex = r'.*audit.*_cl?'
matches = (re.match(regex, key, re.IGNORECASE) for key in qry_prov.schema)
# Keep the first table name that matches, or None if no table matches
audit_table = next((match.group(0) for match in matches if match is not None), None)
# Retrieve auditd data
if audit_table:
audit_data = qry_prov.LinuxAudit.auditd_all(
start=session.StartTimeUtc, end=session.EndTimeUtc, host_name=hostname
)
if isinstance(audit_data, pd.DataFrame) and not audit_data.empty:
audit_events = auditdextract.extract_events_to_df(
data=audit_data
)
process_tree = auditdextract.generate_process_tree(audit_data=audit_events)
process_tree.mp_process_tree.plot()
else:
display(HTML("No auditd data available to build process tree"))
else:
display(HTML("No auditd data available to build process tree"))
else:
md("No logon sessions during this timeframe")
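The process tree depends on locating a custom log table whose name contains "audit" and ends in `_CL`. As a minimal standalone sketch of that lookup, the function below returns the first table name matching the same pattern. `find_audit_table` and the sample table names are hypothetical; in the notebook the names come from `qry_prov.schema`.

```python
import re
from typing import Iterable, Optional

# Same pattern as the notebook cell, written as a raw string
AUDIT_TABLE_PATTERN = re.compile(r".*audit.*_cl?", re.IGNORECASE)


def find_audit_table(table_names: Iterable[str]) -> Optional[str]:
    """Return the first table name that looks like an auditd table."""
    for name in table_names:
        match = AUDIT_TABLE_PATTERN.match(name)
        if match:
            return match.group(0)
    return None


tables = ["Syslog", "Heartbeat", "AuditLog_CL", "SecurityAlert"]
print(find_audit_table(tables))
```

Returning on the first match means that later, non-matching table names cannot overwrite a table that was already found.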
Click here to start a process/application focused hunt or continue with session based hunt below by selecting a sudo session to investigate.
Sudo activity is often required by an attacker to conduct actions on target, and more granular data is available for sudo sessions, allowing for deeper hunting within these sessions.
if logon_sessions_df is not None and sudo_sessions is not None:
sudo_items = sudo_sessions[['User','Start', 'End']].to_string(header=False,
index=False,
index_names=False).split('\n')
sudo_sessions["Key"] = sudo_items
sudo_sessions.set_index('Key', inplace=True)
sudo_dict = sudo_sessions[['User','Start', 'End']].to_dict('index')
sudo_selection = nbwidgets.SelectItem(description='Select sudo session to investigate: ',
item_dict=sudo_dict, width='100%', height='300px', auto_display=True)
else:
sudo_selection = None
md("No logon sessions during this timeframe")
#Collect data associated with the sudo session selected
sudo_events = None
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity
def ti_check_sev(severity, threshold):
severity = TISeverity.parse(severity)
threshold = TISeverity.parse(threshold)
return severity.value >= threshold.value
if sudo_selection:
sudo_sess = {'StartTimeUtc': sudo_selection.value.get('Start'), 'EndTimeUtc': sudo_selection.value.get(
'End'), 'Account': sudo_selection.value.get('User'), 'Host': hostname}
sudo_session = entities.HostLogonSession(sudo_sess)
sudo_events = qry_prov.LinuxSyslog.sudo_activity(start=sudo_session.StartTimeUtc.round(
'-1s') - pd.Timedelta(seconds=1), end=(sudo_session.EndTimeUtc.round('1s')+ pd.Timedelta(seconds=1)), host_name=sudo_session.Host)
if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty:
display(sudo_events.replace('', np.nan).dropna(axis=0, subset=['Command'])[
['TimeGenerated', 'Command', 'CommandCall', 'SyslogMessage']])
# Extract IOCs from the data
ioc_extractor = iocextract.IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else 'Linux'
print('Extracting IoCs.......')
ioc_df = ioc_extractor.extract(data=sudo_events,
columns=['SyslogMessage'],
os_family=os_family,
ioc_types=['ipv4', 'ipv6', 'dns', 'url',
'md5_hash', 'sha1_hash', 'sha256_hash'])
if len(ioc_df) > 0:
ioc_count = len(
ioc_df[["IoCType", "Observable"]].drop_duplicates())
md(f"Found {ioc_count} IOCs")
#Lookup the extracted IOCs in TI feed
ti_resps = tilookup.lookup_iocs(data=ioc_df[["IoCType", "Observable"]].drop_duplicates(
).reset_index(), obs_col='Observable', ioc_type_col='IoCType')
i = 0
ti_hits = []
ti_resps.reset_index(drop=True, inplace=True)
while i < len(ti_resps):
if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1):
ti_hits.append(ti_resps['Ioc'][i])
i += 1
else:
i += 1
md(f"Found {len(ti_hits)} IoCs in Threat Intelligence")
for ioc in ti_hits:
md(f"Messages containing IoC found in TI feed: {ioc}")
display(sudo_events[sudo_events['SyslogMessage'].str.contains(
ioc, regex=False)][['TimeGenerated', 'SyslogMessage']])
else:
md("No IoC patterns found in Syslog Messages.")
else:
md('No sudo messages for this session')
else:
md("No Sudo session to investigate")
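The cell above extracts indicators from the sudo messages, looks them up in threat intelligence, and then shows the messages containing each hit. The sketch below is a deliberately simplified stand-in for the extraction and matching steps, limited to IPv4 addresses and using only the standard library. It is not how `IoCExtract` is implemented, and the function names and sample messages are hypothetical.

```python
import re
from ipaddress import ip_address

# Candidate dotted quads; validity is checked separately with ip_address()
IPV4_CANDIDATE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def extract_ipv4(messages):
    """Return unique, valid IPv4 addresses in order of first appearance."""
    found = []
    for message in messages:
        for candidate in IPV4_CANDIDATE.findall(message):
            try:
                ip_address(candidate)
            except ValueError:
                continue  # e.g. an octet above 255
            if candidate not in found:
                found.append(candidate)
    return found


def messages_containing(messages, ioc):
    """Literal substring match, so dots in an IoC are not regex wildcards."""
    return [message for message in messages if ioc in message]


sample_messages = [
    "sudo: bob : COMMAND=/usr/bin/curl http://203.0.113.7/x.sh",
    "sudo: bob : COMMAND=/bin/ping 999.1.1.1",
    "pam_unix(sudo:session): session opened for user root",
]
iocs = extract_ipv4(sample_messages)
print(iocs)
print(messages_containing(sample_messages, iocs[0]))
```

Matching the indicator as a literal string matters because indicators such as IP addresses and URLs contain characters that have special meaning in regular expressions.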
Hypothesis: That an attacker has gained access to the host and is using a user account to conduct actions on the host.
This section provides an overview of activity by user within our hunting time frame, to allow for the identification of anomalous activity by a user. This hunt can be driven by investigation of suspected users or run as a hunt across all users seen on the host.
# Get list of users with logon or sudo sessions on host
logon_events = qry_prov.LinuxSyslog.user_logon(query_times, host_name=hostname)
users = logon_events['User'].replace('', np.nan).dropna().unique().tolist()
all_users = list(users)
if isinstance(sudo_events, pd.DataFrame) and not sudo_events.empty:
sudoers = sudo_events['Sudoer'].replace(
'', np.nan).dropna().unique().tolist()
all_users.extend(x for x in sudoers if x not in all_users)
# Pick Users
if not logon_events.empty:
user_select = nbwidgets.SelectItem(description='Select user to investigate: ',
item_list=all_users, width='75%', auto_display=True)
else:
md("There was no user activity in the timeframe specified.")
user_select = None
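The user list offered for selection combines accounts seen in logon events with accounts seen as sudoers, without duplicates or empty names. A minimal plain-Python sketch of that merge follows. `merge_users` is a hypothetical helper and the account names are sample data.

```python
def merge_users(logon_users, sudoers):
    """Combine two user lists, dropping blanks and keeping first-seen order."""
    merged = []
    for name in list(logon_users) + list(sudoers):
        # Skip empty strings and missing values, then de-duplicate
        if name and name not in merged:
            merged.append(name)
    return merged


all_users_example = merge_users(
    ["alice", "", "bob", "alice"],
    ["bob", "carol", None],
)
print(all_users_example)
```

Preserving first-seen order keeps accounts with logon events at the top of the selection list, ahead of accounts seen only through sudo.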
folium_user_map = FoliumMap()
def view_sudo(cmd):
return [user_sudo_hold.query('CommandCall == @cmd')[
['TimeGenerated', 'HostName', 'Command', 'CommandCall', 'SyslogMessage']]]
user_sudo_hold = None
if user_select is not None:
# Get all syslog relating to these users
username = user_select.value
user_events = all_syslog_data[all_syslog_data['SyslogMessage'].str.contains(username, regex=False)]
logon_sessions = cluster_syslog_logons_df(logon_events)
# Display all logons associated with the user
md(f"<h1> User Logon Activity for {username}</h1>")
user_logon_events = logon_events[logon_events['User'] == username]
try:
user_logon_sessions = cluster_syslog_logons_df(user_logon_events)
except Exception:
user_logon_sessions = None
user_remote_logons = (
user_logon_events[user_logon_events['LogonResult'] == 'Success']
)
user_failed_logons = (
user_logon_events[user_logon_events['LogonResult'] == 'Failure']
)
if not user_remote_logons.empty:
for _, row in logon_sessions_df.iterrows():
end = row['End']
user_sudo_events = qry_prov.LinuxSyslog.sudo_activity(start=user_remote_logons.sort_values(
by='TimeGenerated')['TimeGenerated'].iloc[0], end=end, host_name=hostname, user=username)
else:
user_sudo_events = None
if user_logon_sessions is None and user_remote_logons.empty and user_failed_logons.empty:
pass
else:
display(HTML(
f"{len(user_remote_logons)} successful logons and {len(user_failed_logons)} failed logons for {username}"))
display(Markdown('### Timeline of host logon attempts.'))
tooltip_cols = ['SyslogMessage']
dfs = {"User Logons" :user_remote_logons, "Failed Logons": user_failed_logons, "Sudo Events" :user_sudo_events}
user_tl_data = {}
for k,v in dfs.items():
if v is not None and not v.empty:
user_tl_data.update({k :{"data":v,"source_columns":tooltip_cols}})
nbdisplay.display_timeline(
data=user_tl_data, title="User logon timeline", height=300)
all_user_df = pd.DataFrame(dict(successful= user_remote_logons['ProcessName'].value_counts(), failed = user_failed_logons['ProcessName'].value_counts())).fillna(0)
processes = all_user_df.index.values.tolist()
results = all_user_df.columns.values.tolist()
user_fail_sucess_data = {'processes' :processes,
'sucess' : all_user_df['successful'].values.tolist(),
'failure': all_user_df['failed'].values.tolist()}
palette = viridis(2)
x = [ (process, result) for process in processes for result in results ]
counts = sum(zip(user_fail_sucess_data['sucess'], user_fail_sucess_data['failure']), ())
source = ColumnDataSource(data=dict(x=x, counts=counts))
b = figure(x_range=FactorRange(*x), plot_height=350, plot_width=450, title="Failed and Successful logon attempts by process",
toolbar_location=None, tools="", y_minor_ticks=2)
b.vbar(x='x', top='counts', width=0.9, source=source, line_color="white",
fill_color=factor_cmap('x', palette=palette, factors=results, start=1, end=2))
b.y_range.start = 0
b.x_range.range_padding = 0.1
b.xaxis.major_label_orientation = 1
b.xgrid.grid_line_color = None
user_logons = pd.DataFrame({"Successful Logons" : [int(all_user_df['successful'].sum())],
"Failed Logons" : [int(all_user_df['failed'].sum())]}).T
user_logon_data = pd.value_counts(user_logon_events['LogonResult'].values, sort=True).head(10).reset_index(name='value').rename(columns={'User':'Count'})
user_logon_data = user_logon_data[user_logon_data['index']!="Unknown"].copy()
user_logon_data['angle'] = user_logon_data['value']/user_logon_data['value'].sum() * 2*pi
user_logon_data['color'] = viridis(len(user_logon_data))
p = figure(plot_height=350, plot_width=450, title="Relative Frequencies of Logon Results", toolbar_location=None, tools="hover", tooltips="@index: @value")
p.axis.visible = False
p.xgrid.visible = False
p.ygrid.visible = False
p.wedge(x=0, y=1, radius=0.5, start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), line_color="white", fill_color='color', legend='index', source=user_logon_data)
show(Row(p,b))
user_ip_list = [convert_to_ip_entities(i)[0] for i in user_remote_logons['SourceIP']]
user_ip_fail_list = [convert_to_ip_entities(i)[0] for i in user_failed_logons['SourceIP']]
user_location = get_map_center(user_ip_list + user_ip_fail_list)
user_folium_map = FoliumMap(location=user_location, zoom_start=1.4)
# Map logon locations to allow for identification of anomalous locations
if len(user_ip_fail_list) > 0:
md('<h3>Map of Originating Location of Logon Attempts</h3>')
icon_props = {'color': 'red'}
user_folium_map.add_ip_cluster(ip_entities=user_ip_fail_list, **icon_props)
if len(user_ip_list) > 0:
icon_props = {'color': 'green'}
user_folium_map.add_ip_cluster(ip_entities=user_ip_list, **icon_props)
display(user_folium_map.folium_map)
md('<p style="color:red">Warning: the folium mapping library '
'does not display correctly in some browsers.</p><br>'
'If you see a blank image please retry with a different browser.')
#Display sudo activity of the user
if not isinstance(user_sudo_events, pd.DataFrame) or user_sudo_events.empty:
md(f"<h3>No successful sudo activity for {username}</h3>")
else:
user_sudo_hold = user_sudo_events
user_sudo_commands = (user_sudo_events[['EventTime', 'CommandCall']].replace('', np.nan).groupby(['CommandCall']).count().dropna().style.set_table_attributes('width=900px, text-align=center').background_gradient(cmap='Reds', low=.5, high=1).format("{0:0>3.0f}"))
display(user_sudo_commands)
md("Select a sudo command to investigate in more detail")
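# Build the command list from this user's sudo events rather than relying on an earlier cell
items = user_sudo_events['CommandCall'].dropna().unique().tolist()
display(nbwidgets.SelectItem(item_list=items, action=view_sudo))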
else:
md("No user session selected")
# If the user has sudo activity, extract any IOCs from the logs and look them up in TI feeds
if not isinstance(user_sudo_hold, pd.DataFrame) or user_sudo_hold.empty:
md("No sudo message data")
else:
# Extract IOCs
ioc_extractor = iocextract.IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else 'Linux'
print('Extracting IoCs.......')
ioc_df = ioc_extractor.extract(data=user_sudo_hold,
columns=['SyslogMessage'],
ioc_types=['ipv4', 'ipv6', 'dns', 'url', 'md5_hash', 'sha1_hash', 'sha256_hash'])
if len(ioc_df) > 0:
ioc_count = len(ioc_df[["IoCType", "Observable"]].drop_duplicates())
md(f"Found {ioc_count} IOCs")
ti_resps = tilookup.lookup_iocs(data=ioc_df[["IoCType", "Observable"]].drop_duplicates(
).reset_index(), obs_col='Observable', ioc_type_col='IoCType')
i = 0
ti_hits = []
ti_resps.reset_index(drop=True, inplace=True)
while i < len(ti_resps):
if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1):
ti_hits.append(ti_resps['Ioc'][i])
i += 1
else:
i += 1
md(f"Found {len(ti_hits)} IoCs in Threat Intelligence")
for ioc in ti_hits:
md(f"Messages containing IoC found in TI feed: {ioc}")
display(user_sudo_hold[user_sudo_hold['SyslogMessage'].str.contains(
ioc, regex=False)][['TimeGenerated', 'SyslogMessage']])
else:
md("No IoC patterns found in Syslog Message.")
Hypothesis: That an attacker has compromised an application running on the host and is using the application's process to conduct actions on the host.
This section provides an overview of activity by application within our hunting time frame, so that anomalous activity by an application can be identified. This hunt can be driven by investigation of suspected applications or run as a hunt across all applications seen on the host.
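One way to spot anomalous application activity is to look at log volume per fixed time bucket, which is what the 5-minute resampling in the cells below does with pandas. The following standalone sketch shows the same idea with the standard library only; the helper name `bucket_counts` and the sample timestamps are assumptions for illustration, and unlike a pandas resample it does not emit empty buckets.

```python
from collections import Counter
from datetime import datetime


def bucket_counts(timestamps, minutes=5):
    """Count events per fixed-width bucket by flooring each timestamp.

    `minutes` should divide 60 evenly for the buckets to line up on the hour.
    """
    buckets = Counter()
    for ts in timestamps:
        floored = ts.replace(
            minute=ts.minute - ts.minute % minutes, second=0, microsecond=0
        )
        buckets[floored] += 1
    return dict(sorted(buckets.items()))


# Illustrative timestamps for one application's log messages
timestamps = [
    datetime(2021, 1, 1, 10, 1),
    datetime(2021, 1, 1, 10, 4, 30),
    datetime(2021, 1, 1, 10, 7),
]
volume = bucket_counts(timestamps)
for bucket_start, count in volume.items():
    print(bucket_start.isoformat(), count)
```

A sudden spike or an unexpected gap in these counts is a reasonable starting point for closer inspection of the application's messages.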
# Get list of Applications
apps = all_syslog_data['ProcessName'].replace('', np.nan).dropna().unique().tolist()
system_apps = ['sudo', 'CRON', 'systemd-resolved', 'snapd',
'50-motd-news', 'systemd-logind', 'dbus-daemon', 'crontab']
if len(host_entity.Applications) > 0:
installed_apps = []
installed_apps.extend(x for x in apps if x not in system_apps)
# Pick Applications
app_select = nbwidgets.SelectItem(description='Select application to investigate: ',
item_list=installed_apps, width='75%', auto_display=True)
else:
display(HTML("No applications other than standard OS applications present"))
# Get all syslog relating to these Applications
app = app_select.value
app_data = all_syslog_data[all_syslog_data['ProcessName'] == app].copy()
# App log volume over time
if isinstance(app_data, pd.DataFrame) and not app_data.empty:
app_data_volume = app_data.set_index(
"TimeGenerated").resample('5T').count()
app_data_volume.reset_index(level=0, inplace=True)
app_data_volume.rename(columns={"TenantId" : "NoOfLogMessages"}, inplace=True)
nbdisplay.display_timeline_values(data=app_data_volume, y='NoOfLogMessages', source_columns=['NoOfLogMessages'], title=f"{app} log volume over time")
app_high_sev = app_data[app_data['SeverityLevel'].isin(
['emerg', 'alert', 'crit', 'err', 'warning'])]
if isinstance(app_high_sev, pd.DataFrame) and not app_high_sev.empty:
app_hs_volume = app_high_sev.set_index(
"TimeGenerated").resample('5T').count()
app_hs_volume.reset_index(level=0, inplace=True)
app_hs_volume.rename(columns={"TenantId" : "NoOfLogMessages"}, inplace=True)
nbdisplay.display_timeline_values(data=app_hs_volume, y='NoOfLogMessages', source_columns=['NoOfLogMessages'], title=f"{app} high severity log volume over time")
risky_messages = risky_cmd_line(events=app_data, log_type="Syslog", cmd_field="SyslogMessage")
if risky_messages:
print(risky_messages)
Because of the large volume of data involved, you may wish to make your query window smaller.
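Narrowing the window amounts to choosing an origin time and a number of hours before and after it. The sketch below shows that arithmetic in isolation; `query_window` is a hypothetical helper and its default offsets are assumptions, not the behaviour of the `QueryTime` widget used in the next cell.

```python
from datetime import datetime, timedelta


def query_window(origin, before_hours=2, after_hours=1):
    """Return (start, end) for a window around `origin`.

    The default offsets are illustrative assumptions only.
    """
    start = origin - timedelta(hours=before_hours)
    end = origin + timedelta(hours=after_hours)
    return start, end


# Illustrative origin time, e.g. the TimeGenerated of a selected alert
origin = datetime(2021, 3, 1, 12, 0)
start, end = query_window(origin, before_hours=2, after_hours=1)
print(start.isoformat(), end.isoformat())
```

Centering the window on an alert time, when one is selected, keeps the auditd query focused on the period most likely to contain relevant process activity.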
if rel_alert_select is None or rel_alert_select.selected_alert is None:
start = query_times.start
else:
start = rel_alert_select.selected_alert['TimeGenerated']
# Set new investigation time windows based on the selected alert
proc_invest_times = nbwidgets.QueryTime(units='hours',
max_before=6, max_after=3, before=2, origin_time=start)
proc_invest_times.display()
audit_table = None
app_audit_data = None
app = app_select.value
process_tree_data = None
regex = r'.*audit.*_cl?'
# Find the table with auditd data in and collect the data
matches = ((re.match(regex, key, re.IGNORECASE)) for key in qry_prov.schema)
for match in matches:
if match is not None:
audit_table = match.group(0)
#Check if the amount of data expected to be returned is a reasonable size, if not prompt before continuing
if audit_table is not None:
if isinstance(app_audit_data, pd.DataFrame):
pass
else:
print('Collecting audit data, please wait this may take some time....')
app_audit_query_count = f"""{audit_table}
| where TimeGenerated >= datetime({proc_invest_times.start})
| where TimeGenerated <= datetime({proc_invest_times.end})
| where Computer == '{hostname}'
| summarize count()
"""
count_check = qry_prov.exec_query(query=app_audit_query_count)
if not count_check.empty and count_check['count_'].iloc[0] > 100000:
size = count_check['count_'].iloc[0]
print(
f"You are returning a very large dataset ({size} rows).",
"It is recommended that you consider scoping the size\n",
"of your query down.\n",
"Are you sure you want to proceed?"
)
response = (input("Y/N") or "N")
if (
(count_check['count_'].iloc[0] < 100000)
or (count_check['count_'].iloc[0] > 100000
and response.casefold().startswith("y"))
):
print("querying audit data...")
audit_data = qry_prov.LinuxAudit.auditd_all(
start=proc_invest_times.start, end=proc_invest_times.end, host_name=hostname
)
if isinstance(audit_data, pd.DataFrame) and not audit_data.empty:
print("building process tree...")
audit_events = auditdextract.extract_events_to_df(
data=audit_data
)
process_tree_data = auditdextract.generate_process_tree(audit_data=audit_events)
plot_lim = 1000
if len(process_tree_data) > plot_lim:
md_warn(f"More than {plot_lim} processes to plot, limiting to top {plot_lim}.")
process_tree_data[:plot_lim].mp_process_tree.plot(legend_col="exe")
else:
process_tree_data.mp_process_tree.plot(legend_col="exe")
# len() gives the row count; DataFrame.size would be rows x columns
size = len(audit_events)
print(f"Collected {size} rows of data")
else:
md("No audit events available")
else:
print("Resize query window")
else:
md("No audit events available")
md(f"<h3>Process tree for {app}</h3>")
if process_tree_data is not None:
process_tree_df = process_tree_data[process_tree_data["exe"].str.contains(app, na=False)].copy()
if not process_tree_df.empty:
app_roots = process_tree_df.apply(lambda x: ptree.get_root(process_tree_data, x), axis=1)
trees = []
for root in app_roots["source_index"].unique():
trees.append(process_tree_data[process_tree_data["path"].str.startswith(root)])
app_proc_trees = pd.concat(trees)
app_proc_trees.mp_process_tree.plot(legend_col="exe", show_table=True)
else:
display(f"No process tree data available for {app}")
process_tree = None
else:
md("No data available to build process tree")
These logs are associated with the process being investigated and include IOCs that appear in our TI feeds.
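The underlying pattern is: extract candidate observables from each message, check them against threat intelligence, then show the messages that contain a hit. The following standalone sketch illustrates that flow for IPv4 addresses only. It is a simplified stand-in, not the behaviour of `IoCExtract` or `TILookup`; the helper names, the `known_bad` set, and the sample messages are all assumptions for this example.

```python
import ipaddress
import re

# Loose candidate pattern; real validation is done by the ipaddress module
IPV4_CANDIDATE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def extract_ipv4(messages):
    """Return the set of valid IPv4 addresses found in the messages."""
    found = set()
    for msg in messages:
        for candidate in IPV4_CANDIDATE.findall(msg):
            try:
                found.add(str(ipaddress.IPv4Address(candidate)))
            except ipaddress.AddressValueError:
                continue  # e.g. 999.1.1.1 matches the pattern but is not an address
    return found


def messages_with_hits(messages, known_bad):
    """Map each observable present in `known_bad` to the messages containing it."""
    hits = extract_ipv4(messages) & set(known_bad)
    # Plain substring test, so characters such as '.' are matched literally
    return {ioc: [m for m in messages if ioc in m] for ioc in sorted(hits)}


# Illustrative messages using documentation-range addresses
messages = [
    "Accepted password for root from 203.0.113.7 port 22",
    "connect to 198.51.100.2 failed",
    "version 999.1.1.1 noted",
]
known_bad = {"203.0.113.7"}  # hypothetical TI feed content
hits = messages_with_hits(messages, known_bad)
print(hits)
```

Matching the observable as a literal string avoids treating characters in IP addresses or URLs as regular-expression syntax.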
# Extract IOCs from syslog associated with the selected process
ioc_extractor = iocextract.IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else 'Linux'
md('Extracting IoCs...')
ioc_df = ioc_extractor.extract(data=app_data,
columns=['SyslogMessage'],
ioc_types=['ipv4', 'ipv6', 'dns', 'url',
'md5_hash', 'sha1_hash', 'sha256_hash'])
if process_tree_data is not None and not process_tree_data.empty:
app_process_tree = app_proc_trees.dropna(subset=['cmdline'])
audit_ioc_df = ioc_extractor.extract(data=app_process_tree,
columns=['cmdline'],
ioc_types=['ipv4', 'ipv6', 'dns', 'url',
'md5_hash', 'sha1_hash', 'sha256_hash'])
# DataFrame.append was removed in pandas 2.0; pd.concat works across versions
ioc_df = pd.concat([ioc_df, audit_ioc_df])
# Look up IOCs in TI feeds
if len(ioc_df) > 0:
ioc_count = len(ioc_df[["IoCType", "Observable"]].drop_duplicates())
md(f"Found {ioc_count} IOCs")
md("Looking up threat intel...")
ti_resps = tilookup.lookup_iocs(data=ioc_df[[
"IoCType", "Observable"]].drop_duplicates().reset_index(drop=True), obs_col='Observable', ioc_type_col='IoCType')
i = 0
ti_hits = []
ti_resps.reset_index(drop=True, inplace=True)
while i < len(ti_resps):
if ti_resps['Result'][i] == True and ti_check_sev(ti_resps['Severity'][i], 1):
ti_hits.append(ti_resps['Ioc'][i])
i += 1
else:
i += 1
display(HTML(f"Found {len(ti_hits)} IoCs in Threat Intelligence"))
for ioc in ti_hits:
display(HTML(f"Messages containing IoC found in TI feed: {ioc}"))
display(app_data[app_data['SyslogMessage'].str.contains(
ioc, regex=False)][['TimeGenerated', 'SyslogMessage']])
else:
md("<h3>No IoC patterns found in Syslog Message.</h3>")
Hypothesis: That an attacker is remotely communicating with the host in order to compromise the host or for C2 or data exfiltration purposes after compromising the host.
This section provides an overview of network activity to and from the host during the hunting time frame, so that anomalous network traffic can be identified. If you wish to investigate a specific IP in detail, it is recommended that you use the IP Explorer Notebook.
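A simple first view of network activity is the total number of allowed flows per direction and protocol. The sketch below computes that from plain dictionaries whose keys mirror the `FlowDirection`, `L7Protocol`, `AllowedInFlows` and `AllowedOutFlows` columns used later in this section; the helper name `total_flows` and the sample rows (including the direction codes) are assumptions for illustration.

```python
from collections import defaultdict


def total_flows(rows):
    """Sum allowed inbound and outbound flows per (direction, protocol)."""
    totals = defaultdict(int)
    for row in rows:
        key = (row["FlowDirection"], row["L7Protocol"])
        totals[key] += row["AllowedInFlows"] + row["AllowedOutFlows"]
    return dict(totals)


# Illustrative rows, not real flow log data
rows = [
    {"FlowDirection": "I", "L7Protocol": "ssh", "AllowedInFlows": 3, "AllowedOutFlows": 0},
    {"FlowDirection": "O", "L7Protocol": "https", "AllowedInFlows": 0, "AllowedOutFlows": 5},
    {"FlowDirection": "O", "L7Protocol": "https", "AllowedInFlows": 0, "AllowedOutFlows": 2},
]
totals = total_flows(rows)
for (direction, protocol), count in sorted(totals.items()):
    print(direction, protocol, count)
```

Unexpected protocols, or a direction with far more flows than usual for the host, are candidates for follow-up.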
# Get list of IPs from Syslog and Azure Network Data
ioc_extractor = iocextract.IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else 'Linux'
print('Finding IP Addresses this may take a few minutes.......')
syslog_ips = ioc_extractor.extract(data=all_syslog_data,
columns=['SyslogMessage'],
ioc_types=['ipv4', 'ipv6'])
if 'AzureNetworkAnalytics_CL' not in qry_prov.schema:
az_net_comms_df = None
az_ips = None
else:
if hasattr(host_entity, 'private_ips') and hasattr(host_entity, 'public_ips'):
all_host_ips = host_entity.private_ips + \
host_entity.public_ips + [host_entity.IPAddress]
else:
all_host_ips = [host_entity.IPAddress]
host_ips = {'\'{}\''.format(i.Address) for i in all_host_ips}
host_ip_list = ','.join(host_ips)
az_ip_where = f"""| where (VMIPAddress in ({host_ip_list}) or SrcIP in ({host_ip_list}) or DestIP in ({host_ip_list})) and (AllowedOutFlows > 0 or AllowedInFlows > 0)"""
az_net_comms_df = qry_prov.AzureNetwork.az_net_analytics(
start=query_times.start, end=query_times.end, host_name=hostname, where_clause=az_ip_where)
if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
az_ips = az_net_comms_df.query("PublicIPs != @host_entity.IPAddress")
else:
az_ips = None
if len(syslog_ips):
IPs = syslog_ips[['IoCType', 'Observable']].drop_duplicates('Observable')
display(f"Found {len(IPs)} IP Addresses associated with the host")
else:
md("### No IoC patterns found in Syslog Message.")
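if az_ips is not None:
# Concatenate the two series; '+' would add them element-wise instead of combining them
ips = pd.concat([az_ips['PublicIPs'].drop_duplicates(),
syslog_ips['Observable'].drop_duplicates()], ignore_index=True)
else:
ips = syslog_ips['Observable'].drop_duplicates()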
if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
az_net_comms_df['TotalAllowedFlows'] = az_net_comms_df['AllowedOutFlows'] + \
az_net_comms_df['AllowedInFlows']
sns.catplot(x="L7Protocol", y="TotalAllowedFlows",
col="FlowDirection", data=az_net_comms_df)
sns.relplot(x="FlowStartTime", y="TotalAllowedFlows",
col="FlowDirection", kind="line",
hue="L7Protocol", data=az_net_comms_df).set_xticklabels(rotation=50)
nbdisplay.display_timeline(data=az_net_comms_df.query('AllowedOutFlows > 0'),
overlay_data=az_net_comms_df.query(
'AllowedInFlows > 0'),
title='Network Flows (out=blue, in=green)',
time_column='FlowStartTime',
source_columns=[
'FlowType', 'AllExtIPs', 'L7Protocol', 'FlowDirection'],
height=300)
else:
md('<h3>No Azure network data for specified time range.</h3>')
Choose from the list of Selected ASNs for the IPs you wish to check on. Then select the IP(s) that you wish to check against Threat Intelligence data. The Source list is populated with all ASNs found in the syslog and network flow data.
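The ASN summary can be thought of in two steps: classify each address locally (private, non-global, or not an IP at all) and only send global addresses to a WHOIS lookup, then count addresses per resulting label. The sketch below illustrates this with an injected lookup function so that it runs without network access; `classify`, `summarize_by_asn`, the `fake_whois` table and its ASN names are all hypothetical.

```python
from collections import Counter
from ipaddress import ip_address


def classify(ip_text):
    """Return a label for addresses that need no WHOIS lookup, else None."""
    try:
        ip = ip_address(ip_text)
    except ValueError:
        return "Not an IP Address"
    if ip.is_private:
        return "private address"
    if not ip.is_global:
        return "other address"
    return None  # global address: a WHOIS lookup is needed


def summarize_by_asn(ips, lookup):
    """Count unique addresses per label, most common first."""
    counts = Counter()
    for ip_text in set(ips):
        label = classify(ip_text) or lookup(ip_text)
        counts[label] += 1
    return counts.most_common()


# Hypothetical lookup table standing in for a real WHOIS query
fake_whois = {
    "8.8.8.8": "EXAMPLE-ASN-A",
    "8.8.4.4": "EXAMPLE-ASN-A",
    "1.1.1.1": "EXAMPLE-ASN-B",
}
ips = ["10.0.0.5", "192.168.1.1", "8.8.8.8", "8.8.4.4", "1.1.1.1", "not-an-ip", "8.8.8.8"]
summary = summarize_by_asn(ips, lambda ip: fake_whois.get(ip, "unknown"))
for label, count in summary:
    print(label, count)
```

Passing the lookup in as a function also makes it easy to cache results, as the `lru_cache` decorator does for the WHOIS helper in the next cell.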
#Lookup each IP in whois data and extract the ASN
@lru_cache(maxsize=1024)
def whois_desc(ip_lookup, progress=False):
try:
ip = ip_address(ip_lookup)
except ValueError:
return "Not an IP Address"
if ip.is_private:
return "private address"
if not ip.is_global:
return "other address"
whois = IPWhois(ip)
whois_result = whois.lookup_whois()
if progress:
print(".", end="")
return whois_result["asn_description"]
# Summarise network data by ASN
ASN_List = []
print("WhoIs Lookups")
ASNs = ips.apply(lambda x: whois_desc(x, True))
IP_ASN = pd.DataFrame(dict(IPs=ips, ASN=ASNs)).reset_index()
x = IP_ASN.groupby(["ASN"]).count().drop(
'index', axis=1).sort_values('IPs', ascending=False)
display(x)
ASN_List = x.index
# Select an ASN to investigate in more detail
selection = widgets.SelectMultiple(
options=ASN_List,
width=900,
description='Select ASN to investigate',
disabled=False
)
display(selection)
# For every IP associated with the selected ASN look them up in TI feeds
ip_invest_list = None
ip_selection = None
for ASN in selection.value:
if ip_invest_list is None:
ip_invest_list = (IP_ASN[IP_ASN["ASN"] == ASN]['IPs'].tolist())
else:
ip_invest_list += (IP_ASN[IP_ASN["ASN"] == ASN]['IPs'].tolist())
if ip_invest_list is not None:
ioc_ip_list = []
if len(ip_invest_list) > 0:
ti_resps = tilookup.lookup_iocs(data=ip_invest_list, providers=["OTX"])
i = 0
ti_hits = []
while i < len(ti_resps):
if ti_resps['Details'][i]['pulse_count'] > 0:
ti_hits.append(ti_resps['Ioc'][i])
i += 1
else:
i += 1
display(HTML(f"Found {len(ti_hits)} IoCs in Threat Intelligence"))
for ioc in ti_hits:
ioc_ip_list.append(ioc)
#Show IPs found in TI feeds for further investigation
if len(ioc_ip_list) > 0:
display(HTML("Select an IP which appeared in TI to investigate further"))
ip_selection = nbwidgets.SelectItem(description='Select IP Address to investigate: ', item_list = ioc_ip_list, width='95%', auto_display=True)
else:
md("No IPs to investigate")
# Get all syslog for the IPs
if ip_selection is not None:
display(HTML("Syslog data associated with this IP Address"))
sys_hits = all_syslog_data[all_syslog_data['SyslogMessage'].str.contains(
ip_selection.value, regex=False)]
display(sys_hits)
os_family = host_entity.OSType if host_entity.OSType else 'Linux'
display(HTML("TI result for this IP Address"))
display(ti_resps[ti_resps['Ioc'] == ip_selection.value])
else:
md("No IP address selected")
msticpyconfig.yaml configuration file
You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory, or you can set an environment variable (MSTICPYCONFIG) pointing to its location.
To configure this file see the ConfiguringNotebookEnvironment notebook.
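To check which configuration file would be picked up, a small helper along the following lines can be useful. This is a sketch of the rule described above, not msticpy's own resolution code: the function name `find_msticpy_config` is hypothetical, and giving the environment variable precedence over the current directory is an assumption.

```python
import os
from pathlib import Path


def find_msticpy_config(env=None, cwd=None):
    """Return the path where msticpyconfig.yaml is expected to be found.

    Assumption: the MSTICPYCONFIG environment variable, when set,
    takes precedence over the current directory.
    """
    env = os.environ if env is None else env
    cwd = Path.cwd() if cwd is None else Path(cwd)
    env_path = env.get("MSTICPYCONFIG")
    if env_path:
        return Path(env_path)
    return cwd / "msticpyconfig.yaml"


# With no arguments this inspects the real environment and working directory
config_path = find_msticpy_config()
print(config_path, "exists" if config_path.is_file() else "not found")
```

If the file is reported as not found, TI and GeoIP lookups in this notebook are unlikely to work until the configuration is created.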