import os
from datetime import datetime
from pathlib import Path
import logging
import nbformat
from azure.mgmt.resource import ResourceManagementClient
from azure.storage.fileshare import ShareFileClient, ShareServiceClient
from azure.storage.queue import QueueServiceClient
import papermill as pm
from msticpy.common.azure_auth import az_connect
from msticpy.common.keyvault_client import BHKeyVaultClient
from msticpy.data.azure_sentinel import AzureSentinel
from azure.common.exceptions import CloudError
# Open a log file that tracks notebook executions.
path = Path.cwd().joinpath("notebook_execution.log")
# BUGFIX: basicConfig defaults to WARNING, which silently discarded every
# logging.info() call in this script - set the level to INFO explicitly.
logging.basicConfig(filename=path, level=logging.INFO)
# Populate with details relating to your environment
# Tenant ID
ten_id = "YOUR TENANT ID"
# The name of the Key Vault containing AFS key
vault_name = "YOUR KV NAME"
# The secret name that the AFS key is stored in
kv_sec_name = "YOUR SECRET NAME"
# Subscription ID of the Azure Sentinel Workspace to get incidents from
subscriptionId = "YOUR SUBSCRIPTION ID"
# The name of the Resource Group of the Azure Sentinel Workspace to get incidents from
resourceGroupName = "YOUR RG NAME"
# The name of the Azure Sentinel Workspace to get incidents from
workspaceName = "YOUR WORKSPACE NAME"
# The name of the Azure Sentinel Workspace ID to get incidents from
ws_id = "YOUR WORKSPACE ID"
# The name of the Azure Storage Queue account used (if used)
q_account = "YOUR QUEUE ACCOUNT"
# The name of the Azure Storage Queue used (if used)
q_name = "YOUR QUEUE NAME"
# Details of the Azure Machine Learning workspace to be used (sub_id = Subscription ID, RG = Resource Group name, AMLWorkspace = AML Workspace Name)
AML_details = {
    "sub_id": "YOUR SUB ID",
    "RG": "YOUR RG NAME",
    "AMLWorkspace": "YOUR WORKSPACE NAME",
    "ten_id": ten_id,
}
# Authenticate to Azure using Azure CLI or Managed Identity
creds = az_connect(["cli", "msi"])
# Acquire a management-plane token up front so auth failures surface early.
token = creds.modern.get_token("https://management.azure.com/.default")
def get_api_headers():
    """Return an authorization header dict with a freshly acquired token.

    A new token is requested on every call so long-running executions do
    not send an expired bearer token to the management API.
    """
    # BUGFIX: the docstring previously sat *after* the first statement, so it
    # was a bare string expression, not a docstring. The local also shadowed
    # the module-level `token`; use a distinct name.
    api_token = creds.modern.get_token("https://management.azure.com/.default")
    return {
        "Authorization": f"Bearer {api_token.token}",
        "Content-Type": "application/json",
    }
# Access key vault and get Azure Storage access key
kv_c = BHKeyVaultClient(tenant_id=ten_id, vault_name=vault_name)
# This key is later used as the credential for the AML file share clients.
afs_cred = kv_c.get_secret(kv_sec_name)
# Connect to Azure Sentinel
azs = AzureSentinel()
azs.connect()
logging.info(f"{datetime.now()} - Successfully connected to Azure Sentinel")
# Get recent Incidents from API; fall back to an empty list on failure so
# the rest of the script still runs (and rewrites the archive) cleanly.
try:
    incidents = azs.get_incidents(
        sub_id=subscriptionId, res_grp=resourceGroupName, ws_name=workspaceName
    )
    # Incident GUIDs live in the returned DataFrame's "name" column.
    incident_ids = incidents["name"].tolist()
except CloudError:
    # BUGFIX: corrected "retreive" -> "retrieve" in the log message.
    logging.info(f"{datetime.now()} - Unable to retrieve incidents")
    incident_ids = []
# If using Queue method get incidents from queue - uncomment following cells to use this method
# queue_service_client = QueueServiceClient(
# account_url=q_account, credential=creds.modern, api_version="2019-07-07"
# )
# q_client = queue_service_client.get_queue_client(q_name)
# messages = q_client.receive_messages()
# logging.info(f"{datetime.now()} - Getting incidents from queue")
# incident_ids = [message["content"] for message in messages]
# q_client.clear_messages()
# Use a local archive to avoid processing of incidents more than once.
try:
    with open("incident_archive", "r") as input_file:
        incident_archive = input_file.read().splitlines()
except FileNotFoundError:
    incident_archive = []
out_files = []
# The archive is rewritten from scratch each run (mode "w" truncates it);
# the handle is held in a context manager so it is flushed and closed even
# if notebook execution raises unexpectedly.
with open("incident_archive", "w") as incident_file:
    # For each incident, if it has not already been processed then run the
    # incident triage notebook with that ID.
    for incident_id in incident_ids:
        if incident_id in incident_archive:
            # BUGFIX: previously-seen IDs were only logged here and never
            # written back, so truncating the archive dropped them and they
            # would be re-processed on the next run. Preserve them.
            incident_file.write(incident_id + "\n")
            logging.info(
                f"{datetime.now()} - Incident {incident_id} has already been processed - skipping"
            )
            continue
        logging.info(f"{datetime.now()} - Processing incident {incident_id}")
        (Path.cwd() / 'out').mkdir(parents=True, exist_ok=True)
        out_path = Path.cwd().joinpath(f"out/{incident_id}.ipynb")
        # If execution error occurs continue but record this in the log
        try:
            pm.execute_notebook(
                "AutomatedNotebooks-IncidentTriage.ipynb",
                str(out_path),
                parameters={
                    "incident_id": incident_id,
                    "ten_id": ten_id,
                    "ws_id": ws_id,
                }
            )
            out_files.append(out_path)
        except pm.PapermillExecutionError:
            logging.info(
                f"{datetime.now()} - Unable to process incident {incident_id} - skipping"
            )
        # Once processed (even if execution failed, so it is not retried
        # forever) add incident to archive.
        incident_file.write(incident_id + "\n")
# Function to move a notebook from local path to Azure File Storage
def move_to_afs(path, incident_id):
    """Upload an executed notebook to the AML file share and link it to its incident.

    Parameters
    ----------
    path : Path
        Local path of the executed triage notebook.
    incident_id : str
        GUID of the Azure Sentinel incident the notebook relates to.
    """
    # Use a distinct name for the content - the original shadowed the file
    # handle with its own read() result.
    with open(path) as nb_file:
        nb_content = nb_file.read()
    account = get_storage_acct()
    notebook_name = path.name
    share_name = get_share(account)
    file_client = ShareFileClient(
        account_url=f"{account}.file.core.windows.net",
        share_name=share_name,
        file_path=f"Users/TriageNbs/{notebook_name}",
        credential=afs_cred,
    )
    file_client.upload_file(nb_content)
    # BUGFIX: the generated link previously pointed at Users/{name} (the file
    # is uploaded to Users/TriageNbs/{name}) and hard-coded the workspace name
    # "AzureMLWorkspace" instead of using AML_details["AMLWorkspace"].
    link = (
        f"https://ml.azure.com/fileexplorerAzNB?wsid=/subscriptions/{AML_details['sub_id']}"
        f"/resourcegroups/{AML_details['RG']}/workspaces/{AML_details['AMLWorkspace']}"
        f"&tid={AML_details['ten_id']}"
        f"&activeFilePath=Users/TriageNbs/{notebook_name}"
    )
    write_to_incident(incident_id, link)
    update_incident(incident_id)
# Function to find the Azure Storage Account used by Azure ML
def get_storage_acct():
    """Look up the name of the storage account backing the configured AML workspace."""
    resource_client = ResourceManagementClient(creds.legacy, AML_details["sub_id"])
    workspace = resource_client.resources.get(
        AML_details["RG"],
        "",
        "Microsoft.MachineLearningServices/workspaces",
        "",
        AML_details["AMLWorkspace"],
        "2021-01-01",
    )
    # The property holds the storage account's full resource ID; the account
    # name is the final path segment.
    storage_id = workspace.properties["storageAccount"]
    return storage_id.rsplit("/", 1)[-1]
# Function to get the correct file share to store notebook in
def get_share(account):
    """Return the name of the first file share starting with 'code-' (None if absent)."""
    service_client = ShareServiceClient(f"{account}.file.core.windows.net", afs_cred)
    return next(
        (
            share["name"]
            for share in service_client.list_shares()
            if share["name"].startswith("code-")
        ),
        None,
    )
# Function to write a comment to the Azure Sentinel Incident that contains a link to the notebook
def write_to_incident(incidentId, path):
    """Post an HTML comment on the Sentinel incident linking to the notebook in AML."""
    link_html = f"<a href='{path}'>View incident triage notebook in AML</a>"
    logging.info(
        f"{datetime.now()} - Adding link to notebook for incident: {incidentId}"
    )
    azs.post_comment(
        comment=link_html,
        incident_id=incidentId,
        sub_id=subscriptionId,
        res_grp=resourceGroupName,
        ws_name=workspaceName,
    )
# Function to update incident severity to High if triage determines a risk
def update_incident(incidentId):
    """Escalate the incident: severity to High, status back to New."""
    logging.info(f"{datetime.now()} - Updating severity for {incidentId}")
    escalation = {"severity": "High", "status": "New"}
    azs.update_incident(
        incident_id=incidentId,
        update_items=escalation,
        sub_id=subscriptionId,
        res_grp=resourceGroupName,
        ws_name=workspaceName,
    )
# For each processed incident check if there was valuable output in the notebook and process it
if out_files:
    for path in out_files:
        # Notebooks are named "<incident_id>.ipynb"; stem drops the extension.
        incident_id = path.stem
        try:
            # as_version=2 converts to the legacy v2 layout, where cells are
            # grouped under "worksheets" (matches the access pattern below).
            nb = nbformat.read(path, as_version=2)
            for cell in nb["worksheets"][0]["cells"]:
                # BUGFIX: use .get() - cells commonly lack a "tags" metadata
                # key, and non-code cells lack "outputs"; direct indexing
                # raised KeyError, which the FileNotFoundError handler below
                # does not catch.
                tags = cell["metadata"].get("tags", [])
                if "output" in tags and cell.get("outputs"):
                    logging.info(
                        f"{datetime.now()} - Storing notebook for {incident_id} in AML"
                    )
                    move_to_afs(path, incident_id)
                    break
            # Remove the local copy once inspected (and uploaded if flagged).
            os.remove(str(path))
        except FileNotFoundError:
            continue