Notebook Version: 1.0
Python Version: Python 3.8 - AzureML
Required Packages: No
Platforms Supported: Azure Machine Learning Notebooks
Data Source Required: No
This notebook provides step-by-step instructions and sample code to detect credentials leaked into Azure Blob Storage, using the Azure SDK for Python.
***No need to download and install any other Python modules.***
***Please run the cells sequentially to avoid errors. Please do not use "run all cells".***
# If you need to know what Python modules are available, you may run this:
# help("modules")
# Load Python libraries that will be used in this notebook
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import get_azure_cli_credentials
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
from azure.mgmt.resource import ResourceManagementClient
import json
import os
import csv
import ipywidgets
from IPython.display import display, HTML, Markdown
import re
# Functions that will be used in this notebook
def read_config_values(file_path):
    """Load the pre-generated Sentinel workspace parameters from a JSON file.

    Args:
        file_path: Path to a JSON file holding the keys ``tenant_id``,
            ``subscription_id``, ``resource_group``, ``workspace_id``,
            ``workspace_name``, ``user_alias`` and ``user_object_id``.

    Returns:
        A 7-tuple with the values stored under those keys, in that order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
        KeyError: If one of the expected keys is missing.
    """
    # Read as UTF-8 explicitly instead of relying on the platform default
    # encoding, so non-ASCII values load the same way on every machine.
    with open(file_path, encoding="utf-8") as json_file:
        json_config = json.load(json_file)

    # The former ``if json_file:`` guard and trailing ``return None`` were
    # dead code: an open file object is always truthy.
    return (
        json_config["tenant_id"],
        json_config["subscription_id"],
        json_config["resource_group"],
        json_config["workspace_id"],
        json_config["workspace_name"],
        json_config["user_alias"],
        json_config["user_object_id"],
    )
def has_valid_token():
    """Check whether a valid AAD token can be obtained from the Azure CLI profile.

    Prints (or displays) a hint describing the outcome.

    Returns:
        True when the token retrieval succeeded, False for every failure.
        The function always returns a real bool: previously an unrecognised
        error fell through and returned None, which the caller's
        ``== False`` comparison did not treat as "not signed in".
    """
    try:
        credentials, _ = get_azure_cli_credentials()
        creds = credentials._get_cred(resource=None)
        # Only the act of retrieving the token matters here; the value
        # itself is not used.
        _ = creds._token_retriever()[2]
    except Exception as ex:
        error_text = str(ex)
        if "Please run 'az login' to setup account" in error_text:
            print("Please sign in first.")
        elif "AADSTS70043: The refresh token has expired" in error_text:
            message = "**The refresh token has expired. <br> Please continue your login process. Then: <br> 1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking 'Compute' on left menu, then select the instance, clicking 'Restart'; <br> 2. Otherwise, you may just restart the kernel from top menu. <br> Finally, close and re-load the notebook, then re-run cells one by one from the top.**"
            display(Markdown(message))
        else:
            # Any other failure. This message used to sit behind a bare
            # ``except:`` that was only reachable for KeyboardInterrupt /
            # SystemExit; those now propagate normally.
            print("Please restart the kernel, and run 'az login'.")
        return False
    else:
        print("Successfully signed in.")
        return True
def get_file_content(blob):
    """Return the text of a downloaded blob.

    The blob is decoded as UTF-8 first; if that raises UnicodeDecodeError
    the decode is retried as UTF-16.

    Args:
        blob: Object exposing ``content_as_text(max_concurrency=, encoding=)``.

    Returns:
        The decoded text returned by ``content_as_text``.
    """
    try:
        return blob.content_as_text(max_concurrency=1, encoding='UTF-8')
    except UnicodeDecodeError:
        # Second and last attempt; a failure here propagates to the caller.
        return blob.content_as_text(max_concurrency=1, encoding='UTF-16')
def get_regex_list():
    """Return the list of regular-expression strings used by the credential scan.

    Every entry is a pattern string intended to be passed to the ``re``
    module. All patterns keep any global ``(?i)`` flag at the very start of
    the expression: a global inline flag elsewhere is deprecated from
    Python 3.6 and is an error from Python 3.11.

    Returns:
        A new list of pattern strings.
    """
    regex_list = [
        "(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        "(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        # Was "(?-i:)\\WAIza(?i)[...]": the mid-pattern global (?i) already made
        # the whole pattern case-insensitive (with a DeprecationWarning) and
        # fails to compile on Python 3.11+. Same matching, flag moved to front.
        "(?i)\\WAIza[a-z0-9_\\\\\\-]{35}\\W",
        "(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        "(?i)visualstudio\\.com.{1,100}\\W(?-i:)[a-z2-7]{52}\\W",
        # NOTE(review): the year 2021 is hard-coded, so only values containing
        # "se=2021" match - confirm whether other years should be covered.
        "(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        "(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        "(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        "(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        "(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        "(?-i:)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        "(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        "(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        "(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        "(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        "(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        "(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        "(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        "(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        "(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        "(?-i:)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        "(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        "(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        "(?i)xox[pbar]\\-[a-z0-9]",
        "(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        "(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        "(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        "(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        "(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        "(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]
    return regex_list
def set_continuation_flag(flag):
    """Echo *flag* back to the caller, announcing when it equals False.

    Args:
        flag: The continuation state to report and return.

    Returns:
        The *flag* argument, unchanged.
    """
    # Deliberately an equality test rather than ``not flag``: a value such
    # as None does not compare equal to False and therefore stays silent.
    stop_requested = (flag == False)  # noqa: E712
    if stop_requested:
        print("continuation flag is false.")
    return flag
def convert_result_to_string(result_row):
    """Turn one regex match result into a single comma-separated string.

    Args:
        result_row: A match as produced by ``re.findall`` - either a string
            or a tuple of group strings.

    Returns:
        The string itself; for a tuple, its non-empty members joined with
        ``','``; for any other type, ``str(result_row)``. The last case
        previously returned None implicitly, which broke the later
        ``split(',')`` in the CSV export.
    """
    if isinstance(result_row, str):
        return result_row
    if isinstance(result_row, tuple):
        # Unmatched groups come back as empty strings; leave them out.
        return ','.join(group for group in result_row if group)
    return str(result_row)
def export_csv(file_name, data_list):
    """Write the scan results to a CSV file, one result per row.

    Args:
        file_name: Path of the CSV file to create or overwrite.
        data_list: Iterable of strings; each string is split on ``','`` and
            the pieces become the columns of one row.

    Raises:
        OSError: If the file cannot be opened for writing.
    """
    # newline='' is what the csv module requires so that it controls line
    # endings itself (otherwise blank rows appear on Windows). UTF-8 keeps
    # non-ASCII matches writable regardless of the platform default.
    with open(file_name, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, delimiter=',')
        writer.writerows(row.split(',') for row in data_list)
# Calling the above function to populate Sentinel workspace parameters
# The file config.json was generated by the system; however, you may modify its values or set the variables manually.
# Unpack the seven workspace parameters returned by read_config_values.
(
    tenant_id,
    subscription_id,
    resource_group,
    workspace_id,
    workspace_name,
    user_alias,
    user_object_id,
) = read_config_values('config.json')
# Azure CLI is used to get a device code for logging in to Azure; copy the code and open the DeviceLogin site.
# The command below already passes [--tenant $tenant_id]; you may remove that option if it is not needed.
if has_valid_token() == False:
!az login --tenant $tenant_id --use-device-code
# Initializing Azure Storage and Azure Resource Python clients
storage_client = get_client_from_cli_profile(StorageManagementClient, subscription_id=subscription_id)
resource_client = get_client_from_cli_profile(ResourceManagementClient, subscription_id=subscription_id)

# Set continuation_flag: the following cells only run while it is true.
# None is tested by identity, and the if/else collapses into a single call.
continuation_flag = set_continuation_flag(resource_client is not None)
# If you encounter an error like "got an unexpected keyword argument 'user_agent'" in the cell above, you may run the following command as a temporary workaround to continue:
# Please uncomment the following line and run it:
# !pip install --upgrade azure-cli
# Then re-run the cell above
# Select Azure Resource Group
if continuation_flag:
    # List the resource groups and offer their names, sorted, in a dropdown.
    group_list = resource_client.resource_groups.list()
    group_names = sorted(group.name for group in group_list)
    group_dropdown = ipywidgets.Dropdown(options=group_names, description='Groups:')
    display(group_dropdown)
# Select Azure Storage Account
if continuation_flag and group_dropdown.value is not None:
    # Only storage accounts of the selected resource group are listed.
    resource_list = resource_client.resources.list_by_resource_group(
        group_dropdown.value,
        filter="resourceType eq 'Microsoft.Storage/storageAccounts'",
    )
    storage_account_dropdown = ipywidgets.Dropdown(
        options=sorted(r.name for r in resource_list),
        description='Accounts:',
    )
    display(storage_account_dropdown)
else:
    # No group selected (or an earlier step failed): stop the later cells.
    continuation_flag = set_continuation_flag(False)
# Select a blob container for a specified Azure Storage account
# Tracks whether the container dropdown could be built; every failure path
# below ends in the same single set_continuation_flag(False) call.
_container_dropdown_ready = False

if continuation_flag and storage_account_dropdown.value is not None:
    storage_keys = storage_client.storage_accounts.list_keys(
        group_dropdown.value, storage_account_dropdown.value
    )
    key_entries = storage_keys.keys if storage_keys is not None else []
    # Pick the value of the key named 'key1'; a missing entry now stops the
    # workflow through the continuation flag instead of raising KeyError.
    storage_key = next(
        (entry.value for entry in key_entries if entry.key_name == 'key1'),
        None,
    )
    if storage_key is not None:
        blob_service_client = BlobServiceClient(
            account_url="https://{0}.blob.core.windows.net".format(storage_account_dropdown.value),
            credential=storage_key,
        )
        container_list = blob_service_client.list_containers()
        container_dropdown = ipywidgets.Dropdown(
            options=sorted(c.name for c in container_list),
            description='Containers:',
        )
        display(container_dropdown)
        _container_dropdown_ready = True

if not _container_dropdown_ready:
    continuation_flag = set_continuation_flag(False)
# Select a blob from a specified blob container
if continuation_flag and container_dropdown.value is not None:
    container_client = blob_service_client.get_container_client(container_dropdown.value)
    if container_client is not None:
        # Offer the names of all blobs in the selected container, sorted.
        blob_list = container_client.list_blobs()
        blob_dropdown = ipywidgets.Dropdown(
            options=sorted(b.name for b in blob_list),
            description='Blobs:',
        )
        display(blob_dropdown)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    # No container selected (or an earlier step failed): stop the later cells.
    continuation_flag = set_continuation_flag(False)
# Get blob content
if continuation_flag and blob_dropdown.value is not None:
    selected_blob = container_client.download_blob(blob_dropdown.value)
    if selected_blob is not None:
        # Decode the downloaded blob to text for the regex scan.
        content = get_file_content(selected_blob)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    # No blob selected (or an earlier step failed): stop the later cells.
    continuation_flag = set_continuation_flag(False)
# Run Regex strings on the file content
import warnings

# Silences all warnings from here on (kept from the original cell).
warnings.filterwarnings('ignore')

# Matched strings, one entry per match, later exported to CSV.
result_list = []

if continuation_flag and content is not None:
    has_leaking = False
    regex_list = get_regex_list()
    for regex in regex_list:
        # Compile once and use the compiled object. A pattern that this
        # Python version rejects is reported and skipped, so one bad entry
        # no longer aborts the whole scan.
        try:
            pattern = re.compile(regex)
        except re.error as err:
            print("Skipping regex that failed to compile (" + str(err) + "):\n" + regex)
            continue

        results = pattern.findall(content)
        if not results:
            continue

        has_leaking = True
        print("================================================")
        print("MATCHED REGEX:\n" + regex)
        print("------------------------------------------------")
        print("FILE: " + blob_dropdown.value + "\n")
        print("---------------MATCHED CONTENT -----------------")
        for result in results:
            print(str(result))
            result_list.append(convert_result_to_string(result))
        print("================================================")

    if not has_leaking:
        print('No leaking data found')
else:
    continuation_flag = set_continuation_flag(False)
# Save results to a csv file in the current file system
# Export only when the workflow is still active and something was matched.
should_export = continuation_flag and len(result_list) > 0
if not should_export:
    print("No data")
else:
    export_csv("credscan_blob.csv", result_list)