Notebook Version: 1.0
Python Version: Python 3.8 - AzureML
Required Packages: No
Platforms Supported: Azure Machine Learning Notebooks, Spark Version 3.1
Data Source Required: No
This notebook provides step-by-step instructions to set up an Azure ML and Azure Synapse Analytics environment for your big data analytics scenarios that leverage the Azure Synapse Spark engine. It covers:
Configuring your Azure Synapse workspace,
creating a new Azure Synapse Spark pool,
configuring your Azure Machine Learning workspace, and creating a new linked service to link Azure Synapse with the Azure Machine Learning workspace.
Additionally, the notebook provides the steps to export your data from a Log Analytics workspace to an Azure Data Lake Storage Gen2 account that you can use for big data analytics.
***Downloading additional Python modules may be needed.***
***Please run the cells sequentially to avoid errors. Please do not use "Run all cells".***
# Load Python libraries that will be used in this notebook
from azure.mgmt.resource import ResourceManagementClient
from azure.loganalytics.models import QueryBody
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.loganalytics import LogAnalyticsDataClient
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration, Datastore
from azureml.core.compute import SynapseCompute, ComputeTarget
from azure.identity import AzureCliCredential
import time
import json
import os
import pandas as pd
import ipywidgets
from IPython.display import display, HTML, Markdown
from urllib.parse import urlparse
# Functions will be used in this notebook
def read_config_values(file_path):
    """Load the pre-generated Microsoft Sentinel workspace parameters.

    Args:
        file_path: Path to the JSON config file (for example ``config.json``).

    Returns:
        A 7-tuple ``(tenant_id, subscription_id, resource_group, workspace_id,
        workspace_name, user_alias, user_object_id)`` read from the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If one of the expected keys is missing.
    """
    # JSON text is UTF-8 by specification; do not depend on the platform default.
    with open(file_path, encoding="utf-8") as json_file:
        json_config = json.load(json_file)
    keys = ("tenant_id",
            "subscription_id",
            "resource_group",
            "workspace_id",
            "workspace_name",
            "user_alias",
            "user_object_id")
    # Callers unpack exactly these seven values, in this order.
    return tuple(json_config[key] for key in keys)
def has_valid_token():
"Check to see if there is a valid AAD token"
try:
error = "ERROR: Please run 'az login' to setup account."
expired = "ERROR: AADSTS70043: The refresh token has expired or is invalid"
validator = !az account get-access-token
if any(expired in item for item in validator.get_list()):
return '**The refresh token has expired. <br> Please continue your login process. Then: <br> 1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking "Compute" on left menu, then select the instance, clicking "Restart"; <br> 2. Otherwise, you may just restart the kernel from top menu. <br> Finally, close and re-load the notebook, then re-run cells one by one from the top.**'
elif any(error in item for item in validator.get_list()):
return "Please run 'az login' to setup account"
else:
return None
except:
return "Please login"
def convert_slist_to_dataframe(text, grep_text, grep_field_inx, remove_head, remove_tail):
    """Convert shell-escape output (``IPython.utils.text.SList``) to a pandas DataFrame.

    Keeps the lines of ``text`` that match ``grep_text`` in whitespace-separated
    field ``grep_field_inx``. Each kept line is trimmed as
    ``line[remove_head:][:remove_tail]``. For example, with ``remove_head=13``
    and ``remove_tail=-2`` the line ``    "name": "ws1",`` becomes ``ws1``.

    Args:
        text: Object with an SList-style ``grep(pattern, field=...)`` method.
        grep_text: Pattern passed to ``text.grep``.
        grep_field_inx: Field index to grep in; also used as the column label.
        remove_head: Number of leading characters to drop from each line.
        remove_tail: Slice end applied after ``remove_head`` (e.g. ``-2``).

    Returns:
        A DataFrame whose column ``grep_field_inx`` holds the trimmed strings.
        Returns an empty DataFrame when nothing matches or ``text`` cannot be
        grepped; callers treat ``len(df) == 0`` as "nothing found".
    """
    try:
        matches = list(text.grep(grep_text, field=grep_field_inx))
    except Exception:
        # Best effort: unusable CLI output is reported to callers as "no results".
        return pd.DataFrame()
    if not matches:
        return pd.DataFrame()
    trimmed = pd.Series(matches).str[remove_head:].str[:remove_tail]
    # Label the column with grep_field_inx so df[grep_field_inx] works for any field index.
    return pd.DataFrame({grep_field_inx: trimmed})
def process_la_result(result):
    """Convert a LogAnalyticsDataClient query result into a pandas DataFrame.

    Only the first table of the response is used. Its ``columns`` entries
    supply the column names and its ``rows`` supply the data.

    Args:
        result: Query response object exposing ``as_dict()`` with a
            ``'tables'`` list (as returned by ``LogAnalyticsDataClient.query``).

    Returns:
        A DataFrame with one column per result column, named after the query's
        columns, and one row per result row. A query that returned no rows
        yields an empty DataFrame that still carries the column names.
    """
    first_table = result.as_dict()['tables'][0]
    column_names = [column['name'] for column in first_table['columns']]
    # Supplying columns= up front keeps the names even when there are no rows,
    # so downstream attribute access (e.g. .SentinelTableName) still works.
    return pd.DataFrame(first_table.get('rows') or [], columns=column_names)
def set_continuation_flag(flag):
    """Return ``flag`` unchanged, printing a notice when it compares equal to False."""
    # Equality rather than identity: 0 and 0.0 also trigger the notice.
    flag_is_false = flag == False  # noqa: E712
    if flag_is_false:
        print("continuation flag is false.")
    return flag
def validate_input(regex, text):
    """Validate user input against a regular expression.

    The whole of ``text`` must match ``regex``, compared case-insensitively.
    A short message is printed whenever validation fails.

    Args:
        regex: Pattern that the entire input must match.
        text: The user's input; ``None`` is reported as missing input.

    Returns:
        True if ``text`` fully matches ``regex``, otherwise False.
    """
    import re
    pattern = re.compile(regex, re.I)
    if text is None:
        print("No Input found.")
        return False
    if pattern.fullmatch(text) is None:
        print("Input validation failed.")
        return False
    return True
# Populate the Microsoft Sentinel workspace parameters via read_config_values.
# config.json is generated by the system; you may edit its values or set these variables manually.
(tenant_id, subscription_id, resource_group, workspace_id,
 workspace_name, user_alias, user_object_id) = read_config_values('config.json')
print("Current Microsoft Sentinel Workspace: " + workspace_name)
# Azure CLI is used to get device code to login into Azure, you need to copy the code and open the DeviceLogin site.
# You may add [--tenant $tenant_id] to the command
if has_valid_token() != None:
message = '**The refresh token has expired. <br> Please continue your login process. Then: <br> 1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking "Compute" on left menu, then select the instance, clicking "Restart"; <br> 2. Otherwise, you may just restart the kernel from top menu. <br> Finally, close and re-load the notebook, then re-run cells one by one from the top.**'
display(Markdown(message))
!echo -e '\e[42m'
!az login --tenant $tenant_id --use-device-code
# Initializing Azure Storage and Azure Resource Python clients
resource_client = ResourceManagementClient(AzureCliCredential(), subscription_id = subscription_id)
# Set continuation_flag
if resource_client == None:
continuation_flag = set_continuation_flag(False)
else:
continuation_flag = set_continuation_flag(True)
!az account set --subscription $subscription_id
print('Successfully signed in.')
In this section, you first select an Azure resource group, then select an Azure Synapse workspace.
# 1. Select the Azure resource group that contains the Synapse workspace
if continuation_flag:
    group_list = resource_client.resource_groups.list()
    synapse_group_names = sorted(group.name for group in group_list)
    synapse_group_dropdown = ipywidgets.Dropdown(options=synapse_group_names, description='Groups:')
    display(synapse_group_dropdown)
# 2. Select an Azure Synapse workspace
if continuation_flag and synapse_group_dropdown.value != None:
response = !az synapse workspace list --subscription $subscription_id --resource-group $synapse_group_dropdown.value
if response!= None:
name_list = convert_slist_to_dataframe(response, '"name', 0, 13, -2)
if len(name_list) > 0:
synapse_workspace_dropdown = ipywidgets.Dropdown(options=name_list[0], description='Synapse WS:')
display(synapse_workspace_dropdown)
else:
print("No workspace found, please select one Resource Group with Synapse workspace.")
else:
continuation_flag = False
print("Please create Azure Synapse Analytics Workspace before proceeding to next.")
else:
continuation_flag = False
print("Need to have a Azure Resource Group to proceed.")
In this section, you will create a Spark pool if you don't have one yet.
new_spark_pool_name = input("New Spark pool name:")
# 1. !!PROCEED THIS ONLY WHEN YOU WANT TO: Create an Azure Synapse Spark Pool!!
if continuation_flag and validate_input(r"[A-Za-z0-9]{1,15}", new_spark_pool_name):
!az synapse spark pool create --name $new_spark_pool_name --subscription $subscription_id \
--workspace-name $synapse_workspace_dropdown.value \
--resource-group $synapse_group_dropdown.value \
--spark-version 3.1 --node-count 3 --node-size Small --debug
print('====== Task completed. ======')
elif continuation_flag:
print("Please enter a valid Spark pool name.")
Run the cell below to select a Spark pool that you want to use from the Spark pool list.
# 2. List Azure Synapse Spark Pool
if continuation_flag and synapse_workspace_dropdown.value != None:
response_pool = !az synapse spark pool list --resource-group $synapse_group_dropdown.value --workspace-name $synapse_workspace_dropdown.value --subscription $subscription_id
if response_pool!= None and len(response_pool.grep("ERROR: AADSTS70043")) == 0:
pool_list = convert_slist_to_dataframe(response_pool, '"name', 0, 13, -2)
if len(pool_list) > 0:
spark_pool_dropdown = ipywidgets.Dropdown(options=pool_list[0], description='Spark Pools:')
display(spark_pool_dropdown)
else:
print("First make sure you have logged into the system.")
else:
continuation_flag = False
print("Need to have a Azure Spnapse Workspace to proceed.")
In this section, you will create a linked service to link the selected Azure ML workspace to the selected Azure Synapse workspace. You need to be an owner of the selected Synapse workspace to proceed. You can then attach a Spark pool to the linked service.
# Select the Azure resource group that holds the Azure ML workspace
if continuation_flag:
    aml_group_list = resource_client.resource_groups.list()
    aml_group_names = sorted(group.name for group in aml_group_list)
    aml_group_dropdown = ipywidgets.Dropdown(options=aml_group_names, description='Groups:')
    display(aml_group_dropdown)
# Select the Azure ML workspace in the chosen resource group
if not (continuation_flag and aml_group_dropdown.value is not None):
    continuation_flag = False
    print("Need to have a Azure Resource Group to proceed.")
else:
    aml_workspace_result = Workspace.list(subscription_id=subscription_id, resource_group=aml_group_dropdown.value)
    if aml_workspace_result is not None:
        # The result's keys are the workspace names offered in the dropdown.
        aml_workspace_names = sorted(aml_workspace_result.keys())
        aml_workspace_dropdown = ipywidgets.Dropdown(options=aml_workspace_names, description='AML WS:')
        display(aml_workspace_dropdown)
# Get Linked services for selected AML workspace
if continuation_flag and aml_workspace_dropdown.value is not None:
    has_linked_service = False
    aml_workspace = Workspace.get(name=aml_workspace_dropdown.value, subscription_id=subscription_id, resource_group=aml_group_dropdown.value)
    aml_synapse_linked_service_list = LinkedService.list(aml_workspace)
    # `or []` tolerates a None result as well as an empty list.
    for ls_name in [ls.name for ls in aml_synapse_linked_service_list or []]:
        display(ls_name)
        has_linked_service = True
    if not has_linked_service:
        # No linked service yet is normal on a first run; the next cell creates one,
        # so continuation_flag is deliberately left unchanged here.
        print("No linked service")
** EXECUTE THE FOLLOWING CELL ONLY WHEN YOU WANT TO: Create a new AML - Synapse linked service! ** ** Owner role of the Synapse workspace is required to create a linked service. **
linked_service_name = input('Linked service name:')
# !!PROCEED THIS ONLY WHEN YOU WANT TO: Create new linked service!!
# input() returns '' rather than None when nothing is typed, so require a non-blank name.
if (continuation_flag and aml_workspace is not None
        and synapse_workspace_dropdown.value is not None and linked_service_name.strip()):
    # Synapse Link Service Configuration
    synapse_link_config = SynapseWorkspaceLinkedServiceConfiguration(subscription_id = aml_workspace.subscription_id, resource_group = synapse_group_dropdown.value, name= synapse_workspace_dropdown.value)
    # Link workspaces and register Synapse workspace in Azure Machine Learning
    linked_service = LinkedService.register(workspace = aml_workspace, name = linked_service_name, linked_service_config = synapse_link_config)
elif continuation_flag:
    print("Please enter a valid linked service name.")
** EXECUTE THE FOLLOWING CELL ONLY WHEN YOU WANT TO: Attach the selected Spark pool to the newly created linked service! **
synapse_compute_name = input('Synapse compute name:')
# !!PROCEED THIS ONLY WHEN YOU WANT TO: Attach the selected Spark pool to the above linked service
# input() returns '' rather than None when nothing is typed, so require a non-blank name.
if (continuation_flag and aml_workspace is not None and synapse_workspace_dropdown.value is not None
        and linked_service is not None and spark_pool_dropdown.value is not None
        and synapse_compute_name.strip()):
    spark_attach_config = SynapseCompute.attach_configuration(linked_service, type='SynapseSpark', pool_name=spark_pool_dropdown.value)
    synapse_compute = ComputeTarget.attach(workspace = aml_workspace, name= synapse_compute_name, attach_configuration= spark_attach_config)
    # Block until the attach operation finishes.
    synapse_compute.wait_for_completion()
elif continuation_flag:
    print("Please select a linked service and a Spark pool, and enter a valid Synapse compute name.")
In this section, you can export Microsoft Sentinel data in Log Analytics to a selected ADLS Gen2 storage.
# 1. Initialize Azure LogAnalyticsDataClient, which is used to access Microsoft Sentinel log data in Azure Log Analytics.
# You may need to change resource_uri for various cloud environments.
# NOTE(review): get_client_from_cli_profile and get_azure_cli_credentials are not imported anywhere
# in this notebook, so this cell raises NameError as written. They presumably come from the legacy
# azure-common package (azure.common.client_factory / azure.common.credentials) - confirm and add
# the imports to the first code cell.
resource_uri = "https://api.loganalytics.io"
# la_client is not used by any later cell; only la_data_client is.
la_client = get_client_from_cli_profile(LogAnalyticsManagementClient, subscription_id = subscription_id)
# Credentials are requested for resource_uri; the second returned value is discarded.
creds, _ = get_azure_cli_credentials(resource=resource_uri)
la_data_client = LogAnalyticsDataClient(creds)
# 2. Get all tables available using Kusto query language. If you need to know more about KQL, please check out the link provided at the introductory section.
tables_result = None
table_list = None
all_tables_query = "union withsource = SentinelTableName * | distinct SentinelTableName | sort by SentinelTableName asc"
if la_data_client is not None:
    tables_result = la_data_client.query(workspace_id, QueryBody(query=all_tables_query))
if tables_result is not None:
    table_list = process_la_result(tables_result)
    # Guard against a result that lacks the SentinelTableName column (e.g. an empty result).
    if 'SentinelTableName' in table_list.columns:
        tables = sorted(table_list['SentinelTableName'].tolist())
    else:
        tables = []
    # `rows` is the SelectMultiple trait that sets how many options are visible at once.
    table_dropdown = ipywidgets.SelectMultiple(options=tables, rows=5, description='Tables:')
    display(table_dropdown)
# 3. List AzureBlobFS Storage URL in Synapse linked service
if continuation_flag and synapse_workspace_dropdown.value != None:
synapse_linked_service_response = !az synapse linked-service list --workspace-name $synapse_workspace_dropdown.value
sls_list = convert_slist_to_dataframe(synapse_linked_service_response, '"url', 0, 14, -1)
if len(sls_list) > 0:
synapse_linked_service_dropdown = ipywidgets.Dropdown(options=sls_list[0], description='ADLS URL:')
display(synapse_linked_service_dropdown)
else:
continuation_flag = False
print("Please create Azure Synapse linked service for storage before proceeding to next.")
else:
continuation_flag = False
print("Need to have a Azure Synapse workspace to proceed.")
# 4. Set target ADLS Gen2 storage as data export destination
# Start from a clean state so a value left over from an earlier run is never reused silently.
adls_gen2_name = None
adls_resource_id = None
if continuation_flag and synapse_linked_service_dropdown.value is not None:
    # The account name is the first label of the URL's host, e.g. https://<account>.dfs.core.windows.net
    adls_gen2_name = urlparse(synapse_linked_service_dropdown.value).netloc.split('.')[0]
if continuation_flag and not adls_gen2_name:
    # You may set ADLS Gen2 manually here:
    adls_gen2_name = ""
if continuation_flag and synapse_group_dropdown.value is not None and adls_gen2_name:
    # NOTE(review): assumes the storage account is in the Synapse workspace's resource group - confirm.
    adls_resource_id = '/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.Storage/storageAccounts/{2}'.format(subscription_id, synapse_group_dropdown.value, adls_gen2_name)
else:
    continuation_flag = False
    print("Need to have a resource group and an ADLS Gen2 account to continue.")
# 5. List all data export rules
# Keep in mind that you cannot a destination that is already defined in a rule. Destination (resource id) must be unique across export rules in your workspace!!
if continuation_flag:
export_response = !az monitor log-analytics workspace data-export list --resource-group $resource_group --workspace-name $workspace_name
if export_response != None:
export_list = convert_slist_to_dataframe(export_response, '"resourceId', 0, 19, -2)
if len(export_list) > 0:
data_export_dropdown = ipywidgets.Dropdown(options=export_list[0], description='Data Exports:')
display(data_export_dropdown)
else:
print("No data export rule was found")
else:
print("No data export rule was found, you may create one in the following step.")
** EXECUTE THE FOLLOWING CELL ONLY WHEN YOU WANT TO: Export data tables from Log Analytics to the selected Azure Data Lake Storage Gen 2! **
export_name=input('Export name:')
# 6. !!PROCEED THIS ONLY WHEN YOU WANT TO: Export data from Log Analytics to Azure Data Lake Storage Gen 2
if continuation_flag and adls_resource_id != None and table_dropdown.value != None and export_name != None:
tables = " ".join(table_dropdown.value)
!az monitor log-analytics workspace data-export create --resource-group $resource_group --workspace-name $workspace_name \
--name $export_name --tables $tables --destination $adls_resource_id
These are optional steps.
# List Log Analytics data export rules
if continuation_flag:
export_response = !az monitor log-analytics workspace data-export list --resource-group $resource_group --workspace-name $workspace_name
if export_response != None:
export_rule_list = convert_slist_to_dataframe(export_response, '"name', 0, 13, -2)
if len(export_rule_list) > 0:
export_rule_dropdown = ipywidgets.Dropdown(options=export_rule_list[0], description='Export Rules:')
display(export_rule_dropdown)
** EXECUTE THE FOLLOWING CELL ONLY WHEN YOU WANT TO: Delete a data export rule by name! **
# 2-b. Delete a Log Analytics data export rule
if continuation_flag and export_rule_dropdown.value != None:
result = !az monitor log-analytics workspace data-export delete --resource-group $resource_group --workspace-name $workspace_name --name $export_rule_dropdown.value --yes
print(result)