#!/usr/bin/env python
# coding: utf-8

# # Credential Scan on Azure Blob Storage
#
# __Notebook Version:__ 1.0
# __Python Version:__ Python 3.8 - AzureML
# __Required Packages:__ No
# __Platforms Supported:__ Azure Machine Learning Notebooks
#
# __Data Source Required:__ No
#
# ### Description
# This notebook provides step-by-step instructions and sample code to detect credentials leaked into Azure Blob Storage using Azure SDK for Python.
# *** No need to download and install any other Python modules. ***
# *** Please run the cells sequentially to avoid errors. Please do not use "run all cells". ***
#
# ## Table of Contents
# 1. Warm-up
# 2. Authentication to Azure Storage
# 3. Scan Azure Blob for Leaking Credentials

# ## 1. Warm-up

# In[ ]:


# If you need to know what Python modules are available, you may run this:
# help("modules")


# In[ ]:


# Load Python libraries that will be used in this notebook
from azure.common.client_factory import get_client_from_cli_profile
from azure.common.credentials import get_azure_cli_credentials
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
from azure.mgmt.resource import ResourceManagementClient
import json
import os
import csv
import ipywidgets
from IPython.display import display, HTML, Markdown
import re


# In[ ]:


# Functions will be used in this notebook
def read_config_values(file_path):
    """Load the pre-generated Sentinel workspace parameters from a JSON file.

    Args:
        file_path: Path of the JSON configuration file.

    Returns:
        A 7-tuple ``(tenant_id, subscription_id, resource_group,
        workspace_id, workspace_name, user_alias, user_object_id)``.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If one of the expected keys is missing.
    """
    config_keys = (
        "tenant_id",
        "subscription_id",
        "resource_group",
        "workspace_id",
        "workspace_name",
        "user_alias",
        "user_object_id",
    )
    with open(file_path) as json_file:
        json_config = json.load(json_file)
    return tuple(json_config[key] for key in config_keys)


def has_valid_token():
    """Check to see if there is a valid AAD token.

    Returns:
        True when a token could be retrieved through the Azure CLI
        credentials, False otherwise. Every failure path returns False
        (never None), so callers may test the result with ``not`` or
        ``== False`` alike.
    """
    try:
        credentials, _ = get_azure_cli_credentials()
        creds = credentials._get_cred(resource=None)
        # Only the side effect matters: this raises when no usable token exists.
        creds._token_retriever()[2]
    except Exception as ex:
        error_text = str(ex)
        if "Please run 'az login' to setup account" in error_text:
            print("Please sign in first.")
        elif "AADSTS70043: The refresh token has expired" in error_text:
            # NOTE(review): the line breaks inside this literal were lost in
            # extraction; they are written as <br> so the Markdown renders as
            # one bold paragraph -- confirm against the original notebook.
            message = (
                "**The refresh token has expired. <br> "
                "Please continue your login process. Then: <br> "
                "1. If you plan to run multiple notebooks on the same compute instance today, "
                "you may restart the compute instance by clicking 'Compute' on left menu, "
                "then select the instance, clicking 'Restart'; <br> "
                "2. Otherwise, you may just restart the kernel from top menu. <br> "
                "Finally, close and re-load the notebook, then re-run cells one by one from the top.**"
            )
            display(Markdown(message))
        else:
            # Any other failure used to fall through and return None.
            print("Please restart the kernel, and run 'az login'.")
        return False

    print("Successfully signed in.")
    return True


def get_file_content(blob):
    """Decode the downloaded blob as text.

    UTF-8 is tried first; on UnicodeDecodeError the content is decoded
    as UTF-16 instead.
    """
    try:
        return blob.content_as_text(max_concurrency=1, encoding='UTF-8')
    except UnicodeDecodeError:
        return blob.content_as_text(max_concurrency=1, encoding='UTF-16')


def get_regex_list():
    """Return the list of RegEx pattern strings used for the credential scan."""
    regex_list = [
        "(?i)(ida:password|IssuerSecret|(api|client|app(lication)?)[_\\- ]?(key|secret)[^,a-z]|\\.azuredatabricks\\.net).{0,10}(dapi)?[a-z0-9/+]{22}",
        "(?i)(x-api-(key|token).{0,10}[a-z0-9/+]{40}|v1\\.[a-z0-9/+]{40}[^a-z0-9/+])",
        "(?-i:)\\WAIza(?i)[a-z0-9_\\\\\\-]{35}\\W",
        "(?i)(\\Wsig\\W|Secret(Value)?|IssuerSecret|(\\Wsas|primary|secondary|management|Shared(Access(Policy)?)?).?Key|\\.azure\\-devices\\.net|\\.(core|servicebus|redis\\.cache|accesscontrol|mediaservices)\\.(windows\\.net|chinacloudapi\\.cn|cloudapi\\.de|usgovcloudapi\\.net)|New\\-AzureRedisCache).{0,100}([a-z0-9/+]{43}=)",
        "(?i)visualstudio\\.com.{1,100}\\W(?-i:)[a-z2-7]{52}\\W",
        "(?i)se=2021.+sig=[a-z0-9%]{43,63}%3d",
        "(?i)(x-functions-key|ApiKey|Code=|\\.azurewebsites\\.net/api/).{0,100}[a-z0-9/\\+]{54}={2}",
        "(?i)code=[a-z0-9%]{54,74}(%3d){2}",
        "(?i)(userpwd|publishingpassword).{0,100}[a-z0-9/\\+]{60}\\W",
        "(?i)[^a-z0-9/\\+][a-z0-9/\\+]{86}==",
        "(?-i:)\\-{5}BEGIN( ([DR]SA|EC|OPENSSH|PGP))? PRIVATE KEY( BLOCK)?\\-{5}",
        "(?i)(app(lication)?|client)[_\\- ]?(key(url)?|secret)([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})[^\\-]",
        "(?i)refresh[_\\-]?token([\\s=:>]{1,10}|[\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2})(\"data:text/plain,.+\"|[a-z0-9/+=_.-]{20,200})",
        "(?i)AccessToken(Secret)?([\\s\"':=|>\\]]{3,15}|[\"'=:\\(]{2}|[\\s=:>]{1,10})[a-z0-9/+=_.-]{20,200}",
        "(?i)[a-z0-9]{3,5}://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)snmp(\\-server)?\\.exe.{0,100}(priv|community)",
        "(?i)(ConvertTo\\-?SecureString\\s*((\\(|\\Wstring)\\s*)?['\"]+)",
        "(?i)(Consumer|api)[_\\- ]?(Secret|Key)([\\s=:>]{1,10}|[\\s\"':=|>,\\]]{3,15}|[\"'=:\\(]{2})[^\\s]{5,}",
        "(?i)authorization[,\\[:= \"']+([dbaohmnsv])",
        "(?i)-u\\s+.{2,100}-p\\s+[^\\-/]",
        "(?i)(amqp|ssh|(ht|f)tps?)://[^%:\\s\"'/][^:\\s\"'/\\$]+[^:\\s\"'/\\$%]:([^%\\s\"'/][^@\\s\"'/]{0,100}[^%\\s\"'/])@[\\$a-z0-9:\\.\\-_%\\?=/]+",
        "(?i)(\\Waws|amazon)?.{0,5}(secret|access.?key).{0,10}\\W[a-z0-9/\\+]{40}",
        "(?-i:)(eyJ0eXAiOiJKV1Qi|eyJhbGci)",
        "(?i)@(\\.(on)?)?microsoft\\.com[ -~\\s]{1,100}?(\\w?pass\\w?)",
        "(?i)net(\\.exe)?.{1,5}(user\\s+|share\\s+/user:|user-?secrets? set)\\s+[a-z0-9]",
        "(?i)xox[pbar]\\-[a-z0-9]",
        "(?i)[\":\\s=]((x?corp|extranet(test)?|ntdev)(\\.microsoft\\.com)?|corp|redmond|europe|middleeast|northamerica|southpacific|southamerica|fareast|africa|exchange|extranet(test)?|partners|parttest|ntdev|ntwksta)\\W.{0,100}(password|\\Wpwd|\\Wpass|\\Wpw\\W|userpass)",
        "(?i)(sign_in|SharePointOnlineAuthenticatedContext|(User|Exchange)Credentials?|password)[ -~\\s]{0,100}?@([a-z0-9.]+\\.(on)?)?microsoft\\.com['\"]?",
        "(?i)(\\.database\\.azure\\.com|\\.database(\\.secure)?\\.windows\\.net|\\.cloudapp\\.net|\\.database\\.usgovcloudapi\\.net|\\.database\\.chinacloudapi\\.cn|\\.database.cloudapi.de).{0,100}(DB_PASS|(sql|service)?password|\\Wpwd\\W)",
        "(?i)(secret(.?key)?|password)[\"']?\\s*[:=]\\s*[\"'][^\\s]+?[\"']",
        "(?i)[^a-z\\$](DB_USER|user id|uid|(sql)?user(name)?|service\\s?account)\\s*[^\\w\\s,]([ -~\\s]{2,120}?|[ -~]{2,30}?)([^a-z\\s\\$]|\\s)\\s*(DB_PASS|(sql|service)?password|pwd)",
        "(?i)(password|secret(key)?)[ \\t]*[=:]+[ \\t]*([^:\\s\"';,<]{2,200})",
    ]
    return regex_list


def set_continuation_flag(flag):
    """Return ``flag`` unchanged, printing a notice when it is falsy."""
    if not flag:
        print("continuation flag is false.")
    return flag


def convert_result_to_string(result_row):
    """Flatten one ``re.findall`` result into a comma-separated string.

    A plain string is returned as-is; a tuple of groups is joined with
    commas after dropping empty groups. Any other type yields None.
    """
    if isinstance(result_row, str):
        return result_row
    if isinstance(result_row, tuple):
        return ','.join(m for m in result_row if len(m) > 0)
    return None


def export_csv(file_name, data_list):
    """Write each comma-separated string of ``data_list`` as one CSV row.

    Args:
        file_name: Path of the CSV file to create or overwrite.
        data_list: Iterable of strings; each one is split on ',' into fields.
    """
    # newline='' lets the csv module control line endings itself; without it
    # extra blank rows appear on platforms that translate '\n' to '\r\n'.
    with open(file_name, 'w', newline='') as f:
        w = csv.writer(f, delimiter=',')
        w.writerows(x.split(',') for x in data_list)


# In[ ]:


# Calling the above function to populate Sentinel workspace parameters
# The file, config.json, was generated by the system, however, you may modify the values, or manually set the variables
tenant_id, subscription_id, resource_group, workspace_id, workspace_name, user_alias, user_object_id = read_config_values('config.json')


# ## 2. Authentication to Azure Storage

# In[ ]:


# Azure CLI is used to get device code to login into Azure, you need to copy the code and open the DeviceLogin site.
# You may add [--tenant $tenant_id] to the command
# `not` (rather than `== False`) also triggers the login when the check
# returns None instead of an explicit False.
if not has_valid_token():
    get_ipython().system('az login --tenant $tenant_id --use-device-code')

# Initializing Azure Storage and Azure Resource Python clients
storage_client = get_client_from_cli_profile(StorageManagementClient, subscription_id=subscription_id)
resource_client = get_client_from_cli_profile(ResourceManagementClient, subscription_id=subscription_id)

# Set continuation_flag: later cells only run while it stays True
continuation_flag = set_continuation_flag(resource_client is not None)


# In[ ]:


# If you encounter error like: "got an unexpected keyword argument 'user_agent'" at the above cell, you may run the following command as a temporarily work-around to continue:
# Please uncomment the following line and run it:
# !pip install --upgrade azure-cli
# Then re-run the cell above


# In[ ]:


# Select Azure Resource Group
if continuation_flag:
    group_list = resource_client.resource_groups.list()
    group_dropdown = ipywidgets.Dropdown(
        options=sorted(g.name for g in group_list),
        description='Groups:',
    )
    display(group_dropdown)


# In[ ]:


# Select Azure Storage Account
if continuation_flag and group_dropdown.value is not None:
    # Only storage accounts of the selected resource group are offered.
    resource_list = resource_client.resources.list_by_resource_group(
        group_dropdown.value,
        filter="resourceType eq 'Microsoft.Storage/storageAccounts'",
    )
    storage_account_dropdown = ipywidgets.Dropdown(
        options=sorted(r.name for r in resource_list),
        description='Accounts:',
    )
    display(storage_account_dropdown)
else:
    continuation_flag = set_continuation_flag(False)


# ## 3.
# Scan Azure Blob for Leaking Credentials

# In[ ]:


# Select a blob container for a specified Azure Storage account
if continuation_flag and storage_account_dropdown.value is not None:
    storage_keys = storage_client.storage_accounts.list_keys(group_dropdown.value, storage_account_dropdown.value)
    if storage_keys is not None:
        # The account key named 'key1' is used as the blob service credential.
        storage_key = {v.key_name: v.value for v in storage_keys.keys}['key1']
        blob_service_client = BlobServiceClient(
            account_url="https://{0}.blob.core.windows.net".format(storage_account_dropdown.value),
            credential=storage_key
        )
        if blob_service_client is not None:
            container_list = blob_service_client.list_containers()
            container_dropdown = ipywidgets.Dropdown(
                options=sorted(r.name for r in container_list),
                description='Containers:',
            )
            display(container_dropdown)
        else:
            continuation_flag = set_continuation_flag(False)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)


# In[ ]:


# Select a blob from a specified blob container
if continuation_flag and container_dropdown.value is not None:
    container_client = blob_service_client.get_container_client(container_dropdown.value)
    if container_client is not None:
        blob_list = container_client.list_blobs()
        blob_dropdown = ipywidgets.Dropdown(
            options=sorted(r.name for r in blob_list),
            description='Blobs:',
        )
        display(blob_dropdown)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)


# In[ ]:


# Get blob content
if continuation_flag and blob_dropdown.value is not None:
    selected_blob = container_client.download_blob(blob_dropdown.value)
    if selected_blob is not None:
        content = get_file_content(selected_blob)
    else:
        continuation_flag = set_continuation_flag(False)
else:
    continuation_flag = set_continuation_flag(False)


# In[ ]:


# Run Regex strings on the file content
import warnings

# Silences warnings globally for the rest of the session, including the
# DeprecationWarning older Python versions emit for inline flags that are
# not at the start of a pattern.
warnings.filterwarnings('ignore')
result_list = []
if continuation_flag and content is not None:
    has_leaking = False
    for regex in get_regex_list():
        try:
            # Compile once and reuse the compiled pattern for the search.
            pattern = re.compile(regex)
        except re.error as regex_error:
            # Python 3.11+ rejects global inline flags that are not at the
            # start of the expression; skip such a pattern instead of
            # aborting the whole scan.
            print("Skipping invalid regex ({0}):\n{1}".format(regex_error, regex))
            continue
        results = pattern.findall(content)
        if results:
            print("================================================")
            print("MATCHED REGEX:\n" + regex)
            print("------------------------------------------------")
            print("FILE: " + blob_dropdown.value + "\n")
            #print(content)
            print("---------------MATCHED CONTENT -----------------")
            for result in results:
                print(str(result))
                result_list.append(convert_result_to_string(result))
            print("================================================")
            has_leaking = True
    if not has_leaking:
        print('No leaking data found')
else:
    continuation_flag = set_continuation_flag(False)


# In[ ]:


# Save results to a csv file in the current file system
if continuation_flag and result_list:
    export_csv("credscan_blob.csv", result_list)
else:
    print("No data")