Notebook Version: 1.0
Python Version: Python 3.6 - AzureML
Required Packages: No
Platforms Supported: Azure Machine Learning Notebooks
Data Source Required: Data Explorer data tables
This notebook provides step-by-step instructions and sample code to detect credentials leaked into Azure Data Explorer tables, using the Azure SDK for Python and KQL.
*** You need to download and install the Python modules for Azure Data Explorer. ***
*** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***
Need to know more about KQL? See "Getting started with Kusto Query Language".
# If you need to know what Python modules are available, you may run this:
# help("modules")
# During the installation, there may be some incompatibility errors; you can safely ignore them.
!pip install azure-kusto-data
!pip install azure-mgmt-kusto
# Load Python libraries that will be used in this notebook
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import get_azure_cli_credentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.kusto import KustoManagementClient
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
import pandas as pd
import json
import ipywidgets
from IPython.display import display, HTML, Markdown
import warnings
# Functions that will be used in this notebook
def read_config_values(file_path):
    """Load the pre-generated Sentinel workspace parameters from a JSON file.

    Args:
        file_path: Path of the JSON configuration file (e.g. ``config.json``).

    Returns:
        A 7-tuple ``(tenant_id, subscription_id, resource_group, workspace_id,
        workspace_name, user_alias, user_object_id)`` taken from the file.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``).
        KeyError: If one of the expected keys is missing.
    """
    expected_keys = (
        "tenant_id",
        "subscription_id",
        "resource_group",
        "workspace_id",
        "workspace_name",
        "user_alias",
        "user_object_id",
    )
    # Read as UTF-8 explicitly instead of relying on the platform default encoding.
    # An open file object is always truthy, so no "is it open?" check is needed:
    # open() either succeeds or raises.
    with open(file_path, encoding="utf-8") as json_file:
        json_config = json.load(json_file)
    return tuple(json_config[key] for key in expected_keys)
def has_valid_token():
    """Check whether a valid AAD token can be obtained from the Azure CLI profile.

    Returns:
        bool: True when the token was retrieved; False for every failure.
        The function never returns None, so callers may test the result with
        either ``== False`` or ``not``.

    Notes:
        Only ``Exception`` subclasses are handled. KeyboardInterrupt and
        SystemExit propagate so the user can still interrupt the cell.
    """
    try:
        credentials, _ = get_azure_cli_credentials()
        creds = credentials._get_cred(resource=None)
        # The value itself is not needed: the lookup raises when retrieval fails.
        creds._token_retriever()[2]
    except Exception as ex:
        error_text = str(ex)
        if "Please run 'az login' to setup account" in error_text:
            print(error_text)
        elif "AADSTS70043: The refresh token has expired" in error_text:
            message = "**The refresh token has expired. <br> Please continue your login process. Then: <br> 1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking 'Compute' on left menu, then select the instance, clicking 'Restart'; <br> 2. Otherwise, you may just restart the kernel from top menu. <br> Finally, close and re-load the notebook, then re-run cells one by one from the top.**"
            display(Markdown(message))
        else:
            # Any other failure: previously this path fell through and returned None.
            print("Please restart the kernel, and run 'az login'.")
        return False
    else:
        print("Successfully signed in.")
        return True
def get_credscan_kql_where_clause(column_name):
    """Build the KQL ``| where`` clause that applies the credential-scan regexes.

    Each pattern is rendered as ``<column_name> matches regex "<pattern>"``.
    Consecutive conditions are joined with ``or``, except the condition at
    index 27, which is joined to its predecessor with ``and``.

    Args:
        column_name: KQL expression to test, inserted verbatim (a column
            reference or ``*``).

    Returns:
        str: The clause, starting with `` | where `` and ending with a space.
    """
    credscan_patterns = [
        r"(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        r"(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        r"(?-i)\\WAIza(?i)[a-z0-9_\\\\\\-]{35}\\W",
        r"(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        r"(?i)visualstudio\\.com.{1,100}\\W(?-i)[a-z2-7]{52}\\W",
        r"(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        r"(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        r"(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        r"(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        r"(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        r"(?-i)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        r"(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        r"(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        r"(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        r"(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        r"(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        r"(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        r"(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        r"(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        r"(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        r"(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        r"(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        r"(?-i)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        r"(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        r"(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        r"(?i)xox[pbar]\\-[a-z0-9]",
        r"(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        r"(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        r"(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        r"(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        r"(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        r"(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]
    # Position of the one condition that is AND-ed (not OR-ed) with the one before it.
    and_joined_position = 27

    fragments = []
    for position, pattern in enumerate(credscan_patterns):
        if position > 0:
            joiner = " and " if position == and_joined_position else " or "
            fragments.append(joiner)
        fragments.append(" " + column_name + " matches regex \"" + pattern + "\"")

    # The regexes contain braces, so they are passed as a format argument,
    # never used as the format template.
    return " | where {0} ".format("".join(fragments))
def set_continuation_flag(flag):
    """Return ``flag`` unchanged, printing a notice when it compares equal to False.

    Args:
        flag: The value to assign to the notebook's continuation flag.

    Returns:
        The same ``flag`` object that was passed in.
    """
    # Equality (not truthiness) is intentional: None, for example, prints nothing.
    if not (flag == False):
        return flag
    print("continuation flag is false.")
    return flag
# Populate the Sentinel workspace parameters by calling read_config_values.
# config.json is generated by the system; you may edit its values, or assign these variables by hand instead.
(
    tenant_id,
    subscription_id,
    resource_group,
    workspace_id,
    workspace_name,
    user_alias,
    user_object_id,
) = read_config_values('config.json')
# Azure CLI is used to get device code to login into Azure, you need to copy the code and open the DeviceLogin site.
# You may add [--tenant $tenant_id] to the command
if has_valid_token() == False:
!az login --tenant $tenant_id --use-device-code
kusto_client = get_client_from_cli_profile(KustoManagementClient, subscription_id = subscription_id)
resource_client = get_client_from_cli_profile(ResourceManagementClient, subscription_id = subscription_id)
# Set continuation_flag
if resource_client == None:
continuation_flag = set_continuation_flag(False)
else:
continuation_flag = set_continuation_flag(True)
# If you encounter error like: "got an unexpected keyword argument 'user_agent'" at the above cell, you may run the following command as a temporarily work-around to continue:
# Please uncomment the following line and run it:
# !pip install --upgrade azure-cli
# Then re-run the cell above
# Select Azure Resource Group: list the groups of the subscription and offer them, sorted by name, in a dropdown
if continuation_flag:
    group_list = resource_client.resource_groups.list()
    group_names = [group.name for group in group_list]
    group_names.sort()
    group_dropdown = ipywidgets.Dropdown(description='Groups:', options=group_names)
    display(group_dropdown)
# Select a Kusto cluster in the chosen resource group
if continuation_flag and group_dropdown.value is not None:
    cluster_list = list(kusto_client.clusters.list_by_resource_group(group_dropdown.value))
    # list() never returns None, so test for emptiness: with no clusters there is nothing to select.
    if cluster_list:
        cluster_dropdown = ipywidgets.Dropdown(options=sorted(c.name for c in cluster_list), description='Clusters:')
        display(cluster_dropdown)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)
# Select a database in the chosen cluster
if continuation_flag and cluster_dropdown.value is not None:
    # Materialize the listing so that "no databases" can be detected; a `!= None` test on the result is always true.
    database_list = list(kusto_client.databases.list_by_cluster(resource_group_name=group_dropdown.value, cluster_name=cluster_dropdown.value))
    if database_list:
        database_dropdown = ipywidgets.Dropdown(options=sorted(d.name for d in database_list), description='Databases:')
        display(database_dropdown)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)
# Initialize Kusto data client and create table list
if continuation_flag and database_dropdown.value is not None:
    # Look up the URI of the cluster that was picked in the cluster dropdown.
    cluster_uris = [c.uri for c in cluster_list if c.name == cluster_dropdown.value]
    kusto_data_client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(cluster_uris[0]))
    try:
        # Use the part after the first "/" of the selected name
        # (presumably the value looks like "<cluster>/<database>" - verify against your data).
        db_name = database_dropdown.value.split("/")[1]
        tables = kusto_data_client.execute_mgmt(db_name, ".show tables details")
        table_list = dataframe_from_result_table(tables.primary_results[0]).TableName.tolist()
    except Exception as ex:
        # Report the failure instead of swallowing it silently; KeyboardInterrupt is no longer caught.
        print("Unable to list tables: {0}".format(ex))
        table_list = []
    # tolist() never returns None, so test for emptiness instead.
    if table_list:
        table_dropdown = ipywidgets.Dropdown(options=sorted(table_list), description='Tables:')
        display(table_dropdown)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)
# Select a column of the selected table to scan
# However, PLEASE NOTE: you may safely ignore this cell if you decide to scan all columns of the selected table!
if continuation_flag and table_dropdown.value is not None:
    warnings.filterwarnings('ignore')
    all_columns_query = "{0} | getschema | project ColumnName | order by ColumnName asc".format(table_dropdown.value)
    columns_result = kusto_data_client.execute_query(database=db_name, query=all_columns_query)
    if columns_result is not None:
        column_list = dataframe_from_result_table(columns_result.primary_results[0]).ColumnName.tolist()
    else:
        column_list = []
    # Always (re)create the dropdown together with column_list: the scan cell reads
    # column_dropdown.value whenever column_list exists, so the two must be defined as a pair.
    # NOTE(review): a dropdown with no options is expected to have value None - confirm with ipywidgets docs.
    column_dropdown = ipywidgets.Dropdown(options=sorted(column_list), description='Columns:')
    display(column_dropdown)
else:
    continuation_flag = set_continuation_flag(False)
# This cell will run the Credential Scanner regexes against the selected table
# You may adjust the query based on your needs.
# To look at the query, you may run: print(query)
if continuation_flag and table_dropdown.value is not None:
    # Guard on the name that is actually dereferenced: when the column cell was skipped,
    # column_dropdown does not exist and every column ("*") is scanned.
    if 'column_dropdown' in globals() and column_dropdown.value is not None:
        column_name = "tostring(['{0}'])".format(column_dropdown.value)
    else:
        column_name = "*"
    table_name = table_dropdown.value
    kql_where_clause = get_credscan_kql_where_clause(column_name)
    query = "{0} {1}".format(table_name, kql_where_clause)
    #print("Query: " + query)
    # Run query
    result = kusto_data_client.execute_query(database=db_name, query=query)
    # Display result
    final_result = dataframe_from_result_table(result.primary_results[0])
    if final_result.empty:
        print("No leaked credentials found")
    else:
        display(final_result)
else:
    continuation_flag = set_continuation_flag(False)
# Write the scan results to credscan_adx.csv in the current directory, but only when there is something to save
if continuation_flag and not final_result.empty:
    final_result.to_csv('credscan_adx.csv')