Notebook Version: 2.1
Python Version: Python 3.10 - SDK v2
Platforms Supported: Azure Machine Learning Notebooks
Data Source Required: Log Analytics tables
Gain insight into the possible root cause of an alert by searching for related anomalies on the corresponding entities around the alert’s time. This notebook provides valuable leads for an alert’s investigation: it lists all suspicious increases in event counts or in their properties around the time of the alert, and links to the corresponding raw records in Log Analytics so the investigator can focus on and interpret them.
You may need to select the Python 3.10 - SDK v2 kernel on Azure Machine Learning Notebooks.
%pip install azure-monitor-query azure-mgmt-loganalytics
# Loading Python libraries
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus
from azure.identity import AzureCliCredential, DefaultAzureCredential
from datetime import timezone
import sys
import timeit
import datetime as dt
import pandas as pd
import copy
import base64
import json
from IPython.display import display, HTML, Markdown
from cryptography.fernet import Fernet
The following cell contains the classes and functions used by this notebook; the code is hidden to unclutter the notebook. Please RUN the cell. You may view the code by clicking 'input hidden'.
# Classes will be used in this notebook
class AnomalyQueries():
""" KQLs for anomaly lookup """
QUERIES = {}
QUERIES['LISTTABLES'] = b'dW5pb24gd2l0aHNvdXJjZSA9IFNlbnRpbmVsVGFibGVOYW1lICogfCBkaXN0aW5jdCBTZW50aW5lbFRhYmxlTmFtZSB8IHNvcnQgYnkgU2VudGluZWxUYWJsZU5hbWUgYXNjIA=='
QUERIES['ISCATCOLUMN'] = b'e3RhYmxlfSB8IGdldHNjaGVtYSB8IHdoZXJlIENvbHVtblR5cGUgaW4gKCdpbnQnLCAnbG9uZycsICdzdHJpbmcnKSB8IHByb2plY3QgQ29sdW1uTmFtZQ=='
QUERIES['ISCATHEURISTIC'] = b'e3RhYmxlfSB8IHdoZXJlIGluZ2VzdGlvbl90aW1lKCkgPiBhZ28oMWQpIHwgdGFrZSB0b2ludCgxZTgpIHwgc3VtbWFyaXplIGRjID0gZGNvdW50KHtjb2x1bW59KSwgY291bnQoKSB8IHdoZXJlIGRjPCAxMDAwIGFuZCBkYyA+IDEgfCBwcm9qZWN0IHJhdGlvID0gdG9kb3VibGUoZGMpIC8gY291bnRfIHwgd2hlcmUgcmF0aW88IDFlLTIg'
QUERIES['TIMESERIESANOMALYDETECTION'] = b'bGV0IGZ1bGxEYWlseUNvdW50ID0gbWF0ZXJpYWxpemUoIHt0YWJsZX0gfCBleHRlbmQgVGltZUNyZWF0ZWQgPSBUaW1lR2VuZXJhdGVkIHwgd2hlcmUgVGltZUNyZWF0ZWQgPiBkYXRldGltZSgne21pblRpbWVzdGFtcH0nKSBhbmQgVGltZUNyZWF0ZWQ8ZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9JykgfCB3aGVyZSB7ZW50Q29sdW1ufSBoYXMgJ3txRW50aXR5fScgfCBtYWtlLXNlcmllcyBjb3VudCgpIGRlZmF1bHQgPSAwIG9uIFRpbWVDcmVhdGVkIGZyb20gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JykgdG8gZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9Jykgc3RlcCAxZCBieSB7Y29sdW1ufSk7IGZ1bGxEYWlseUNvdW50IHwgZXh0ZW5kKGFub21hbGllcywgYW5vbWFseVNjb3JlLCBleHBlY3RlZENvdW50KSA9IHNlcmllc19kZWNvbXBvc2VfYW5vbWFsaWVzKGNvdW50XywxLjUsLTEsJ2F2ZycsMSkgfCB3aGVyZSBhbm9tYWx5U2NvcmVbLTFdID4gMS41IHwgd2hlcmUgdG9pbnQoY291bnRfWy0xXSkgPiB0b2RvdWJsZShleHBlY3RlZENvdW50Wy0xXSkgfCBtdi1hcHBseSBhbm9tYWxpZXMgdG8gdHlwZW9mKGxvbmcpIG9uIChzdW1tYXJpemUgdG90QW5vbWFsaWVzPXN1bShhbm9tYWxpZXMpKSB8IHdoZXJlIHRvdEFub21hbGllcyA8IDUgfCBwcm9qZWN0IHFFbnRpdHkgPSAne3FFbnRpdHl9JywgcVRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7cVRpbWVzdGFtcH0nKSwgbWluVGltZXN0YW1wID0gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JyksIG1heFRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7bWF4VGltZXN0YW1wfScpLCBkZWx0YSA9IHRvdGltZXNwYW4oe2RlbHRhfSksIFRhYmxlID0gJ3t0YWJsZX0nLCBlbnRDb2wgPSAne2VudENvbHVtbn0nLCBjb2xOYW1lID0gJ3tjb2x1bW59JywgY29sVmFsID0gdG9zdHJpbmcoe2NvbHVtbn0pLCBjb2xUeXBlID0gZ2V0dHlwZSh7Y29sdW1ufSksIGV4cGVjdGVkQ291bnQgPSBleHBlY3RlZENvdW50Wy0xXSwgYWN0dWFsQ291bnQgPSBjb3VudF9bLTFdLCBhbm9tYWx5U2NvcmUgPSBhbm9tYWx5U2NvcmVbLTFd'
QUERIES['TIMEWINDOWQUERY'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCBleHRlbmQgaW5nZXN0aW9uX3RpbWUoKSB8IHdoZXJlICRJbmdlc3Rpb25UaW1lID4gaW5kRGF0ZSArIHtmfXtkZWx0YX0gYW5kICRJbmdlc3Rpb25UaW1lPGluZERhdGUgKyB7dH17ZGVsdGF9IHwgd2hlcmUge2VudENvbHVtbn0gaGFzICd7cUVudGl0eX0nIHwgcHJvamVjdCBpbmcgPSRJbmdlc3Rpb25UaW1lIHwgdGFrZSAxIA=='
QUERIES['ISENTITYINTABLE'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCB3aGVyZSBpbmdlc3Rpb25fdGltZSgpIGJldHdlZW4oKGluZERhdGUgLTFoKSAuLiAoaW5kRGF0ZSArIDFoKSkgfCBzZWFyY2ggJ3txRW50aXR5fScgfCB0YWtlIDE='
@staticmethod
def get_query(name):
""" get KQL """
en_query = AnomalyQueries.QUERIES[name]
query = base64.b64decode(en_query).decode('utf-8')
return query
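# Illustrative aside (not part of the original flow): the KQL templates above are stored
# base64-encoded; decoding one shows the raw query that will be sent to Log Analytics.
print(AnomalyQueries.get_query('LISTTABLES'))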
class AnomalyFinder():
"""
This class provides process flow functions for anomaly lookup.
Method - run is the main entry point.
"""
def __init__(self, workspace_id, la_data_client):
self.workspace_id = workspace_id
self.la_data_client = la_data_client
self.anomaly = ''
def query_table_list(self):
""" Get a list of data tables from Log Analytics for the user """
query = AnomalyQueries.get_query('LISTTABLES')
return self.query_loganalytics(query)
def query_loganalytics(self, query):
""" This method will call Log Analytics through LA client """
start_time = dt.datetime.now(timezone.utc) - dt.timedelta(days=30)
end_time = dt.datetime.now(timezone.utc)
result = self.la_data_client.query_workspace(
workspace_id=self.workspace_id,
query=query,
timespan=(start_time, end_time))
df = pd.DataFrame(data=result.tables[0].rows, columns=result.tables[0].columns)
return df
@staticmethod
def construct_related_queries(df_anomalies):
""" This method constructs query for user to repo and can be saves for future references """
if df_anomalies.shape[0] == 0:
return None
queries = ''
for tbl in df_anomalies.Table.unique():
cur_table_anomalies = df_anomalies.loc[df_anomalies.Table == tbl, :]
query = """{tbl} \
| where TimeGenerated > datetime({maxTimestamp})-14d and TimeGenerated < datetime({maxTimestamp}) \
| where {entCol} has "{qEntity}" \
| where """.format(**{
'tbl': tbl,
'qTimestamp': cur_table_anomalies.qTimestamp.iloc[0],
'maxTimestamp': cur_table_anomalies.maxTimestamp.iloc[0],
'entCol': cur_table_anomalies.entCol.iloc[0],
'qEntity': cur_table_anomalies.qEntity.iloc[0]
})
for j, row in cur_table_anomalies.iterrows(): # pylint: disable=unused-variable
query += " {col} == to{colType}(\"{colVal}\") or".format(
col=row.colName,
colType=(row.colType) if 'colType' in row.keys() else 'string',
colVal=row.colVal.replace('"', '')
)
query = query[:-2] # drop the last or
query += " | take 1000; " # limit the output size
query = query.replace("\\", "\\\\")
queries += query
return queries
def get_timewindow(self, q_entity, q_timestamp, ent_col, tbl):
""" find the relevant time window for analysis """
win_start = 0
min_timestamp = None
delta = None
max_timestamp = None
long_min_timestamp = None
time_window_query_template = AnomalyQueries.get_query('TIMEWINDOWQUERY')
for from_hour in range(-30, 0, 1):
kql_time_range_d = time_window_query_template.format(
table=tbl,
qDate=q_timestamp,
entColumn=ent_col,
qEntity=q_entity,
f=from_hour,
t=from_hour+1,
delta='d')
df_time_range = self.query_loganalytics(kql_time_range_d)
if df_time_range.shape[0] > 0:
win_start = from_hour
break
dt_q_timestamp = pd.to_datetime(q_timestamp)
ind2now = dt.datetime.utcnow() - dt_q_timestamp
if win_start < -3:
if ind2now > dt.timedelta(days=1):
delta = '1d'
max_timestamp = dt_q_timestamp + dt.timedelta(days=1)
else:
delta = '1d'
max_timestamp = dt.datetime.utcnow()
long_min_timestamp = max_timestamp + dt.timedelta(days=win_start)
min_timestamp = max_timestamp + dt.timedelta(days=max([-6, win_start]))
elif win_start < 0: # switch to hours
win_start_hour = -5
for from_hour in range(-3*24, -5, 1):
kql_time_range_h = time_window_query_template.format(
table=tbl,
qDate=q_timestamp,
entColumn=ent_col,
qEntity=q_entity,
f=from_hour,
t=from_hour+1,
delta='h')
df_time_range = self.query_loganalytics(kql_time_range_h)
if df_time_range.shape[0] > 0:
win_start_hour = from_hour
break
if win_start_hour < -5:
if ind2now > dt.timedelta(hours=1):
delta = '1h'
max_timestamp = dt_q_timestamp + dt.timedelta(hours=1)
else:
delta = '1h'
max_timestamp = dt.datetime.utcnow()
min_timestamp = max_timestamp + dt.timedelta(hours=win_start_hour)
long_min_timestamp = min_timestamp
return min_timestamp, delta, max_timestamp, long_min_timestamp
def run(self, q_timestamp, q_entity, tables):
""" Main function for Anomaly Lookup """
progress_bar = WidgetViewHelper.define_int_progress_bar()
display(progress_bar) # pylint: disable=undefined-variable
# list tables if not given
if not tables:
kql_list_tables = AnomalyQueries.get_query('LISTTABLES')
tables = self.query_loganalytics(kql_list_tables)
tables = tables.SentinelTableName.tolist()
progress_bar.value += 1
# find the column in which the query entity appears in each table
# - assumption: the entity appears in just one column
tables2search = []
is_entity_in_table_template = AnomalyQueries.get_query('ISENTITYINTABLE')
for tbl in tables:
kql_entity_in_table = is_entity_in_table_template.format(
table=tbl,
qDate=q_timestamp,
qEntity=q_entity)
ent_in_table = self.query_loganalytics(kql_entity_in_table)
if ent_in_table.shape[0] > 0:
ent_col = [col for col in ent_in_table.select_dtypes('object').columns[1:] if
ent_in_table.loc[0, col] is not None
and ent_in_table.loc[:, col].str.contains(q_entity, case=False).all()]
if ent_col:
ent_col = ent_col[0]
tables2search.append({'table': tbl, 'entCol': ent_col})
progress_bar.value += 2
# for each table, find the time window to query on
for tbl in tables2search:
tbl['minTimestamp'], tbl['delta'], tbl['maxTimestamp'], tbl['longMinTimestamp'] = \
self.get_timewindow(q_entity, q_timestamp, tbl['entCol'], tbl['table'])
progress_bar.value += 1
# identify all the categorical columns per table on which we will find anomalies
categorical_cols = []
is_cat_column_template = AnomalyQueries.get_query('ISCATCOLUMN')
is_cat_heuristic_template = AnomalyQueries.get_query('ISCATHEURISTIC')
for tbl in tables2search:
kql_is_cat_column = is_cat_column_template.format(table=tbl['table'])
df_cols = self.query_loganalytics(kql_is_cat_column)
for col in df_cols.ColumnName:
kql_is_cat_heuristic = is_cat_heuristic_template.format(
table=tbl['table'],
column=col)
df_is_cat = self.query_loganalytics(kql_is_cat_heuristic)
if df_is_cat.shape[0] > 0:
cat_col_info = copy.deepcopy(tbl)
cat_col_info['col'] = col
categorical_cols.append(cat_col_info)
progress_bar.value += 2
anomalies_list = []
time_series_anomaly_detection_template = \
AnomalyQueries.get_query('TIMESERIESANOMALYDETECTION')
for col_info in categorical_cols:
max_timestamp = col_info['maxTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')
long_min_timestamp = col_info['longMinTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')
kql_time_series_anomaly_detection = time_series_anomaly_detection_template.format(
table=col_info['table'],
column=col_info['col'],
entColumn=col_info['entCol'],
qEntity=q_entity,
minTimestamp=long_min_timestamp,
maxTimestamp=max_timestamp,
qTimestamp=q_timestamp,
delta=col_info['delta'])
cur_anomalies = self.query_loganalytics(kql_time_series_anomaly_detection)
anomalies_list.append(cur_anomalies)
progress_bar.value += 2
if anomalies_list:
anomalies = pd.concat(anomalies_list, axis=0)
else:
anomalies = pd.DataFrame()
progress_bar.value += 2
queries = AnomalyFinder.construct_related_queries(anomalies)
progress_bar.close()
self.anomaly = str(anomalies.to_json(orient='records'))
return anomalies, queries
class WidgetViewHelper():
""" This classes provides helper methods for UI controls and components. """
def __init__(self):
self.variable = None
@staticmethod
def select_table(anomaly_lookup):
""" Select data tables """
table_list = anomaly_lookup.query_table_list()
tables = list(table_list["SentinelTableName"])
return widgets.Select(options=tables,
rows=len(tables),
#value=[],
description='Tables:')
@staticmethod
def define_int_progress_bar():
""" Define a progress bar """
return widgets.IntProgress(value=0,
min=0,
max=10,
step=1,
description='Loading:',
bar_style='success',
orientation='horizontal',
position='top')
@staticmethod
# pylint: disable=line-too-long
def copy_to_clipboard(url, text_body, label_text):
""" Copy text to Clipboard """
html_str = (
"""<!DOCTYPE html>
<html><body style="height:20px">
<input id="sentinel_text_for_copy" type="text" readonly style="font-weight: bold; border: none; max-height:10px; width:1px;" size = '"""
+ str(len(text_body))
+ """' value='"""
+ text_body
+ """'>
<button style="border: 2px solid #4CAF50;" onclick="sentinel_copy()">""" + label_text + """</button>
<script>
function sentinel_copy() {
var copyText = document.getElementById("sentinel_text_for_copy");
copyText.select();
document.execCommand("copy");
}
</script>
</body></html>"""
)
return html_str
@staticmethod
def construct_url_for_log_analytics_logs(tenant_domain,
subscription_id,
resource_group,
workspace_name):
""" Generate URL for LA logs """
return 'https://portal.azure.com/#blade/Microsoft_Azure_Security_Insights/MainMenuBlade/7/subscriptionId/{0}/resourceGroup/{1}/workspaceName/{2}'.format(subscription_id, resource_group, workspace_name)
@staticmethod
# pylint: disable=undefined-variable
def display_html(inner_html):
""" Display HTML """
display(HTML(inner_html))
@staticmethod
def pick_start_and_end_date():
""" Pick dates """
start_date = widgets.DatePicker(description='Pick a start date', disabled=False)
end_date = widgets.DatePicker(description='Pick an end date', disabled=False)
# pylint: disable=undefined-variable
display(start_date)
# pylint: disable=undefined-variable
display(end_date)
return start_date, end_date
@staticmethod
def select_multiple_items(label, item_name):
""" Select multiple items """
label_item = widgets.Label(value=label)
items = widgets.Textarea(value='', placeholder='One per line: \n 0x7ae3 \n 0x7ae6', description=item_name, disabled=False, rows=5)
display(label_item)
display(items)
return items
# Functions will be used in this notebook
def read_config_values(file_path):
"This loads pre-generated parameters for Sentinel Workspace"
with open(file_path) as json_file:
if json_file:
json_config = json.load(json_file)
return (json_config["tenant_id"],
json_config["subscription_id"],
json_config["resource_group"],
json_config["workspace_id"],
json_config["workspace_name"],
json_config["user_alias"],
json_config["user_object_id"])
return None
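# For reference, config.json is expected to contain the keys read above. A minimal sketch
# (all values below are placeholders; the real file is generated for your workspace):
# {
#     "tenant_id": "<tenant GUID>",
#     "subscription_id": "<subscription GUID>",
#     "resource_group": "<resource group name>",
#     "workspace_id": "<Log Analytics workspace GUID>",
#     "workspace_name": "<workspace name>",
#     "user_alias": "<alias>",
#     "user_object_id": "<user object GUID>"
# }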
def has_valid_token():
"Check to see if there is a valid AAD token"
try:
error = "Please run 'az login'"
expired = "ERROR: AADSTS70043: The refresh token has expired or is invalid"
failed = "failed"
validator = !az account get-access-token
if any(expired in item for item in validator.get_list()):
return '**The refresh token has expired. <br> Please continue your login process. Then: <br> 1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking "Compute" on left menu, then select the instance, clicking "Restart"; <br> 2. Otherwise, you may just restart the kernel from top menu. <br> Finally, close and re-load the notebook, then re-run cells one by one from the top.**'
elif any(error in item for item in validator.get_list()) or any(failed in item for item in validator.get_list()):
return "Please run 'az login' to setup account"
else:
return None
except:
return "Please login"
# Calling the above function to populate Microsoft Sentinel workspace parameters
# The file config.json was generated by the system; however, you may modify the values or set the variables manually
tenant_id, subscription_id, resource_group, workspace_id, workspace_name, user_alias, user_object_id = read_config_values('config.json');
# Azure CLI is used to get a device code for logging in to Azure; you need to copy the code and open the DeviceLogin site.
# You may add [--tenant $tenant_id] to the command
if has_valid_token() is not None:
!echo -e '\e[42m'
!az login --tenant $tenant_id --use-device-code
resource_uri = "https://api.loganalytics.io"
la_client = LogAnalyticsManagementClient(AzureCliCredential(), subscription_id = subscription_id)
credential = DefaultAzureCredential()
la_data_client = LogsQueryClient(credential)
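# Optional sanity check (a minimal sketch, not required by the rest of the notebook): run a
# trivial query to confirm the credential and workspace_id are valid before the longer lookups below.
sanity_check = la_data_client.query_workspace(workspace_id=workspace_id, query='print now()', timespan=dt.timedelta(hours=1))
print('Log Analytics connectivity check status:', sanity_check.status)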
# Entity inputs
import ipywidgets as widgets
#DateTime format: 2024-06-04T07:05:20.000
q_timestamp = widgets.Text(value='2024-06-07',description='DateTime: ')
display(q_timestamp)
#Entity format: user
q_entity = widgets.Text(value='user',description='Entity: ')
display(q_entity)
# Select tables
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
anomaly_lookup = AnomalyFinder(workspace_id, la_data_client)
selected_table = WidgetViewHelper.select_table(anomaly_lookup)
display(selected_table)
# Query data: this action may take a few minutes or more, please be patient.
start = timeit.default_timer()
anomalies, queries = anomaly_lookup.run(q_timestamp.value, q_entity.value, [selected_table.value])
print('======= Task completed ===========')
print('Elapsed time: ', timeit.default_timer() - start, ' seconds')
if anomalies is not None:
print(str(len(anomalies)) + ' records found.')
else:
print('0 records found.')
# Display query result in DataFrame
if anomalies is not None and len(anomalies) > 0:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
sorted_anomalies = anomalies.sort_values(by=['anomalyScore'], ascending=False)
display(sorted_anomalies)
# Save results to a csv file in the current file system
if anomalies is not None and len(anomalies) > 0:
anomalies.to_csv('anomaly_lookup.csv')
# ML Clustering based on anomalyScore
if anomalies is not None and len(anomalies) > 10:
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
anomaly_score_set = anomalies.iloc[:, [12]].copy()  # column 12 is anomalyScore
kmeans = KMeans(n_clusters=3).fit(anomaly_score_set)
centroids = kmeans.cluster_centers_
print(centroids)
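# Optional visualization (a minimal sketch; it only runs when the clustering above did):
# plot the anomaly scores coloured by their KMeans cluster assignment.
if anomalies is not None and len(anomalies) > 10:
    import matplotlib.pyplot as plt
    scores = anomaly_score_set['anomalyScore'].astype(float)
    plt.scatter(range(len(scores)), scores, c=kmeans.labels_)
    plt.xlabel('record index')
    plt.ylabel('anomalyScore')
    plt.title('Anomaly scores by KMeans cluster')
    plt.show()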
# Display Top anomaly scores
if anomalies is not None and len(anomalies) > 10 and anomaly_score_set is not None:
top_anomalies = anomaly_score_set.loc[anomaly_score_set['anomalyScore'].astype(float) > 5]
print(top_anomalies)
# You can also go to Azure Log Analytics for further analysis
if queries is not None:
url = WidgetViewHelper.construct_url_for_log_analytics_logs(tenant_id, subscription_id, resource_group, workspace_name)
print('======= Click the URL to go to Log Analytics =======')
print(url)
if len(queries) > 2000:
print('======= Copy the queries to go to Log Analytics =======')
print(queries)
else:
WidgetViewHelper.display_html(WidgetViewHelper.copy_to_clipboard(url, queries, 'Add queries to clipboard then paste to Logs'))
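# Optional follow-up (a minimal sketch): persist the generated KQL text locally so it can be
# pasted into Log Analytics later; the file name here is just an example.
if queries is not None:
    with open('anomaly_lookup_queries.kql', 'w') as f:
        f.write(queries)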