Notebook Version: 1.0
Python Version: Python 3.8
Required Packages: azure_sentinel_utilities, fastDamerauLevenshtein, azureml-synapse
Platforms Supported: Azure Synapse Workspace, Azure Sentinel, Azure Log Analytics Workspace, Storage Account, Azure Machine Learning Notebooks connected to Azure Synapse Workspace
Data Source Required: Yes
Data Source: SecurityEvents
Spark Version: 3.1 or above
This notebook demonstrates how to apply custom machine learning algorithms to data in Azure Sentinel. It showcases a Masquerading Process Name anomaly algorithm, which looks for Windows process creation events whose process names are suspiciously similar to those of known normal processes. Masquerading is a very common attack technique: a malicious process adopts a name that differs from a well-known benign one by only a character or two. Because such names are easy to miss on visual inspection, the malicious code can run on your machine unnoticed. Examples of such malicious process names are scvhost.exe and svch0st.exe, both masquerading as the known normal process svchost.exe.
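To make the similarity measure concrete, here is a minimal pure-Python sketch of the normalized Damerau-Levenshtein similarity that the fastDamerauLevenshtein package computes later in this notebook. The `osa_distance` and `similarity` helper names are illustrative, not part of any library; this is a sketch assuming the package's default normalization by the longer string's length.

```python
def osa_distance(a, b):
    # Optimal string alignment (restricted Damerau-Levenshtein) distance:
    # edits are insertions, deletions, substitutions and adjacent transpositions.
    d = [[0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(len(a) + 1):
        d[i][0] = i
    for j in range(len(b) + 1):
        d[0][j] = j
    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,         # deletion
                          d[i][j - 1] + 1,         # insertion
                          d[i - 1][j - 1] + cost)  # substitution
            if i > 1 and j > 1 and a[i - 1] == b[j - 2] and a[i - 2] == b[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + 1)  # transposition
    return d[len(a)][len(b)]

def similarity(a, b):
    # Normalize to [0, 1]: 1.0 means identical strings.
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1.0 - osa_distance(a, b) / longest

print(similarity("svchost.exe", "scvhost.exe"))  # one transposition -> ~0.909
```

A single transposition or substitution in an 11-character name yields a similarity of about 0.91, well above the 0.85 threshold used later, while an unrelated name like notepad.exe scores far lower.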
The data used here is from the SecurityEvents table with EventID = 4688. These correspond to process creation events from Windows machines.
You will have to export this data from your Log Analytics workspace into a storage account. Instructions for the Log Analytics data export mechanism can be found here: LA export mechanism.
Here is a blog post explaining data export.
Data is then loaded from this storage account container and the results are published to your Log Analytics resource.
This notebook can be run either from the AML platform or directly from Synapse. The setup differs based on which you choose: follow whichever of section A or B suits you before running the main PySpark code.
You will need to configure your environment to use a Synapse cluster with your AML workspace. To do so, set up the Synapse compute and attach the necessary packages/wheel files. Then, for the rest of the code, mark each PySpark cell with a %%synapse header so it runs on the Synapse pool.
Steps:
from azureml.core import Workspace, LinkedService
You will have to set up the AML compute attached to your notebook with some packages so that the rest of this code can run properly.
# Run the following line and confirm that 'azureml-synapse' is not installed.
%pip list
# Now run the following line and then restart the kernel/compute so that the package is installed.
%pip install azureml-synapse
# Rerun the following line and confirm that 'azureml-synapse' is installed.
%pip list
Please use the notebook Configurate Azure ML and Azure Synapse Analytics to configure your environment.
That notebook configures an existing Azure Synapse workspace to create and connect to a Spark pool. You can then create a linked service and connect your AML workspace to the Azure Synapse workspace. You can skip step 6, which exports data from Log Analytics to Data Lake Storage Gen2, because you have already set up the data export to the storage account above.
Note: Specify the input parameters in the step below in order to connect the AML workspace to the Synapse workspace using the linked service.
amlworkspace = "<aml workspace name>" # fill in your AML workspace name
subscription_id = "<subscription id>" # fill in your subscription id
resource_group = '<resource group of AML workspace>' # fill in your resource groups for AML workspace
linkedservice = '<linked service name>' # fill in your linked service created to connect to synapse workspace
Authentication to Azure Resources:
In this step we connect the AML workspace to the linked service that is connected to the Azure Synapse workspace.
# Get the aml workspace
aml_workspace = Workspace.get(name=amlworkspace, subscription_id=subscription_id, resource_group=resource_group)
# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)
You will have to set up the Spark pool attached to your notebook with some packages so that the rest of this code can run properly.
Please follow these steps:
a. Create a requirements.txt containing the following line and upload it in the Requirements section:
fastDamerauLevenshtein
b. Download the azure_sentinel_utilities .whl package from the Repo, then upload it under 'Workspace packages' in the left tab of the original blade.
c. Then select this package from there in this tab.
Enter your Synapse Spark compute below. To find the Spark compute, please follow these steps:
synapse_spark_compute = input('Synapse Spark compute:')
# Start spark session
%synapse start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute
You will need to attach the required packages and wheel files to the cluster you intend to use with this notebook. Follow Step 3 above to complete this.
From here on, all the steps are the same for both the AML and Synapse platforms. The main difference: if you set up through AML, prepend each PySpark cell with the %%synapse header; for runs directly on Synapse, omit that header.
One-time: store credentials in Key Vault so the notebook can access them.
Ensure the settings in the cell below are filled in.
%%synapse
from azure_sentinel_utilities.azure_storage import storage_blob_manager
from azure_sentinel_utilities.log_analytics import log_analytics_client
import re
import datetime as dt
import time
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window
from fastDamerauLevenshtein import damerauLevenshtein
import random
import string
These are some customizable variables which are used further in the code.
%%synapse
frequentThreshold = 0.8 # If the percentile of a process's creation count is above this threshold, it is considered a frequent and normal process.
infrequentThreshold = 0.2 # If the percentile of a process's creation count is lower than this threshold, it is considered an infrequent and possibly malicious process.
levenDistThreshold = 0.85 # fastDamerauLevenshtein returns a normalized similarity score (1.0 = identical), so higher values mean more similar names. This threshold helps you select only the very similar processes and remove the noise.
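One consequence of the 0.85 default worth keeping in mind: for two equal-length names that differ by a single edit, the normalized similarity is 1 - 1/n, so a one-character masquerade is flagged only when the compared (extension-stripped) name is at least 7 characters long. A quick check, assuming the package's normalization by the longer string's length (`one_edit_similarity` is an illustrative helper, not notebook code):

```python
def one_edit_similarity(n):
    # Similarity of two length-n names differing by exactly one edit,
    # normalized by the longer length (here both are length n).
    return 1.0 - 1.0 / n

# With levenDistThreshold = 0.85:
print(one_edit_similarity(7))  # ~0.857 -> flagged
print(one_edit_similarity(6))  # ~0.833 -> missed
```

Lowering the threshold catches shorter masqueraded names at the cost of more noise.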
Making Connections to the Storage Account and KeyVaults for user credentials
%%synapse
#Log Analytics WorkSpace (Sentinel) to write the results
workspaceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # wks_guid
workspaceSharedKey = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsSharedKey', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')
workspaceResourceId = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'wsResourceId', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE') # eg: /subscriptions/<sub_guid>/resourcegroups/<rg_name>/providers/microsoft.operationalinsights/work
#extract storage account and key from connection string
connectionString = mssparkutils.credentials.getSecret(keyVault = 'YOUR_KEYVAULT_HERE', keyName = 'saConnectionString', linkedServiceName = 'YOUR_LINKED_SERVICE_HERE')
keyPattern = r'DefaultEndpointsProtocol=(\w+);AccountName=(\w+);AccountKey=([^;]+);'
match = re.match(keyPattern, connectionString)
storageAccount = match.group(2)
storageKey = match.group(3)
print("Storage Account is : ", storageAccount)
# Avoid printing connectionString or storageKey: they are secrets and would persist in the notebook output.
containerName = "am-securityevent" # This name is fixed for security events
basePath = "WorkspaceResourceId={workspaceResourceId}".format(workspaceResourceId=workspaceResourceId)
print("BasePath is : ", basePath)
startTime = dt.datetime.now() - dt.timedelta(days=1)
endTime = dt.datetime.now() - dt.timedelta(days=0)
startTimeStr = startTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("Start Time of Algo run is : ", startTime)
endTimeStr = endTime.strftime("%m/%d/%Y, %I:%M:%S.%f %p")
print("End Time of Algo run is : ", endTime)
This cell defines the helper functions.
%%synapse
def calcDist(one, two):
    # Returns a normalized Damerau-Levenshtein similarity (1.0 = identical).
    # If both names share the same extension, compare only the stems so that
    # a common suffix like ".exe" does not inflate the similarity score.
    if one is not None and two is not None:
        oneDot = one.rfind('.')
        twoDot = two.rfind('.')
        if (oneDot != -1) and (twoDot != -1):
            oneEnd = one[oneDot+1:]
            twoEnd = two[twoDot+1:]
            if oneEnd == twoEnd:
                one = one[:oneDot]
                two = two[:twoDot]
        return damerauLevenshtein(one, two)
    return 0.0

calcDistUdf = F.udf(calcDist, T.FloatType())
def getRandomTimeStamp():
    input_time_format = '%m/%d/%Y, %I:%M:%S.%f %p'
    output_time_format = '%m/%d/%Y, %I:%M:%S.000 %p'
    randAdd = random.random()
    stime = time.mktime(time.strptime(startTimeStr, input_time_format))
    etime = time.mktime(time.strptime(endTimeStr, input_time_format))
    ptime = stime + randAdd * (etime - stime)
    return time.strftime(output_time_format, time.localtime(ptime))

getRandomTimeStampUdf = F.udf(getRandomTimeStamp, T.StringType())
def getListGoodProcs():
    return [
        "svchost", "csrss", "smss", "services",
        "winlogon", "wininit", "lsass",
        "spoolsv", "conhost", "powershell",
        "init", "bash", "avahi-daemon", "gnome-session", "getty",
        "acpid", "dbus-daemon", "dbus-launch", "networkmanager",
        "explorer", "more.com", "mode.com",
        "nbtstat", "netstat", "cscript", "cnscript",
        "tracert", "tracerpt", "systeminfo", "system_info",
        "aitagent", "adtagent", "wininit", "wininst",
        "userinit", "userinst", "dnscmd", "dfscmd",
        "nslookup", "nblookup"
    ]
def getListKnownProcs():
    goodProcs = getListGoodProcs()
    goodProcs = [s + ('.exe' if '.' not in s else '') for s in goodProcs]
    df = spark.createDataFrame(goodProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df
def getSyntheticMaliciousProcs():
    badProcs = []
    goodProcs = getListGoodProcs()
    for process in goodProcs:
        length = len(process)
        randNum = random.randint(1, length-2) # not changing first or last letter because that is easier to spot
        # pick a random lowercase letter and substitute it at the chosen position
        randLetter = random.choice(string.ascii_lowercase)
        temp = list(process)
        temp[randNum] = randLetter
        badProcs = badProcs + ["".join(temp)]
    badProcs = [s + ('.exe' if '.' not in s else '') for s in badProcs]
    badProcs = badProcs + ["scvhost.exe", "svch0st.exe"] # adding known masquerading processes
    df = spark.createDataFrame(badProcs, T.StringType())
    df = df.withColumnRenamed("value", "Process")
    return df
def getSyntheticEvents(typeOfEvent):
    processPath = ""
    numExplode = 1
    if typeOfEvent == "normal":
        processPath = "C:\\Windows\\System32\\"  # backslashes escaped for a literal Windows path
        df = getListKnownProcs()
        numExplode = 20
    elif typeOfEvent == "malicious":
        processPath = "C:\\Windows\\Temp\\"
        df = getSyntheticMaliciousProcs()
    df = df.withColumn("NewProcessName", F.concat(F.lit(processPath), F.col("Process")))
    df = df.withColumn("NumExplode", F.lit(numExplode))
    df = df.withColumn("EventID", F.lit("4688"))
    # replicate each row NumExplode times to simulate repeated process creations
    new_df = df.withColumn('NumExplode', F.expr('explode(array_repeat(NumExplode,int(NumExplode)))')).drop("NumExplode")
    new_df = new_df.withColumn("TimeGenerated", getRandomTimeStampUdf())
    new_df = new_df.withColumn("From", F.lit("Hardcoded"))
    new_df = new_df.select("NewProcessName", "Process", "TimeGenerated", "From")
    return new_df
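The substitution step in getSyntheticMaliciousProcs can be sketched standalone. This variant (`masquerade` is an illustrative helper, not from the notebook) additionally excludes picking the original letter, so the generated name is guaranteed to differ from the input:

```python
import random
import string

def masquerade(name, rng=random.Random(0)):
    # Swap one interior character of the stem, keeping the first and last
    # characters intact because edits there are easier to spot.
    stem, dot, ext = name.partition(".")
    i = rng.randint(1, len(stem) - 2)
    ch = rng.choice([c for c in string.ascii_lowercase if c != stem[i]])
    return stem[:i] + ch + stem[i + 1:] + dot + ext
```

For example, masquerade("svchost.exe") produces a same-length name such as "svehost.exe" that still starts with "s" and ends with "t.exe".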
Next, we define the schema of the input and get the raw customer 4688 events. We are using the following details: EventID, NewProcessName, Process, TimeGenerated.
%%synapse
def security_event_schema():
    return T.StructType([
        T.StructField(name="EventID", dataType=T.StringType(), nullable=True),
        T.StructField(name="NewProcessName", dataType=T.StringType(), nullable=True),
        T.StructField(name="Process", dataType=T.StringType(), nullable=True),
        T.StructField(name="TimeGenerated", dataType=T.StringType(), nullable=True),
    ])
blobManager = storage_blob_manager(connectionString)
raw_df = blobManager.get_raw_df(startTime, endTime, containerName, basePath, security_event_schema(), blobManager.get_blob_service_client(connectionString))
final = raw_df.where(F.col("EventID") == "4688")
final = final.withColumn("Process", F.lower("Process"))
final = final.drop("EventID")
final = final.withColumn("From", F.lit("User"))
final = final.cache()
print("There are ", final.count(), " events of type 4688 to process.")
final.show()
Here we append synthetically created normal and malicious process creation events. This is done to demonstrate the algorithm's performance by ensuring some masquerading process names are caught.
%%synapse
normalEvents = getSyntheticEvents("normal")
potentiallyMaliciousEvents = getSyntheticEvents("malicious")
final = final.union(normalEvents).union(potentiallyMaliciousEvents)
print("Count of SecurityEvents + Synthetically created 4688 events are : ", final.count())
We compare frequent to infrequent processes to decide whether a process is potentially malicious.
The approach is to consider processes occurring above the 'frequentThreshold' percentile as normal and those occurring below the 'infrequentThreshold' percentile as potentially malicious. Those in the middle range are excluded from analysis because they fall into the grey area of being relatively popular yet below the first threshold.
You can customize the values of these thresholds based on your needs.
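The percentile split can be illustrated in plain Python. The `percent_ranks` helper below is hypothetical; it mimics Spark's percent_rank, i.e. (rank - 1) / (n - 1) over rows ordered by count ascending, with ties sharing a rank:

```python
def percent_ranks(counts):
    # counts: {process_name: creation_count}
    # Spark's percent_rank over ascending counts equals, for each row,
    # (number of rows with a strictly smaller count) / (n - 1).
    n = len(counts)
    values = list(counts.values())
    return {
        proc: (sum(v < c for v in values) / (n - 1) if n > 1 else 0.0)
        for proc, c in counts.items()
    }

ranks = percent_ranks({"svchost.exe": 100, "powershell.exe": 50, "scvhost.exe": 1})
frequent = [p for p, r in ranks.items() if r >= 0.8]   # frequentThreshold
infrequent = [p for p, r in ranks.items() if r < 0.2]  # infrequentThreshold
```

With these toy counts, svchost.exe lands in the frequent bucket, scvhost.exe in the infrequent one, and powershell.exe (rank 0.5) falls into the excluded grey area.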
%%synapse
groupProcess = final.groupBy("Process").count().sort(F.desc("count"))
groupProcess = groupProcess.select("Process","count", F.percent_rank().over(Window.partitionBy().orderBy(groupProcess['count'])).alias("percent_rank"))
groupProcess = groupProcess.sort(F.desc("percent_rank"))
frequentProcess = groupProcess.where(F.col("percent_rank") >= frequentThreshold).select("Process")
frequentProcess = frequentProcess.withColumnRenamed("Process", "frequentProcess")
infrequentProcess = groupProcess.where(F.col("percent_rank") < infrequentThreshold).select("Process")
infrequentProcess = infrequentProcess.withColumnRenamed("Process", "infrequentProcess")
print("There are ", frequentProcess.count(), " normal processes in your data")
print("There are ", infrequentProcess.count(), " potentially malicious processes in your data")
print("Examples of some potentially malicious processes: ")
infrequentProcess.show()
Next we compute the Damerau-Levenshtein similarity between the normal and potentially malicious process names to check whether we have any masquerading processes.
%%synapse
compare = frequentProcess.crossJoin(infrequentProcess)
compare = compare.withColumn("Dist", calcDistUdf(F.col("frequentProcess"), F.col("infrequentProcess")))
print("Showing the similarity scores between frequent and infrequent processes")
compare.show()
It is always useful to have the paths from which the processes spawned to judge the maliciousness of a process. This cell finds the paths of all the processes, for context. We also filter based on a threshold value, which you can alter to better fit your criteria.
%%synapse
frequentProcessWhole = compare.join(final, (final.Process == compare.frequentProcess), how = "left").drop("Process")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("NewProcessName", "frequentProcessPath")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("TimeGenerated", "frequentTimeGenerated")
frequentProcessWhole = frequentProcessWhole.withColumnRenamed("From", "frequentFrom")
infrequentProcessWhole = frequentProcessWhole.join(final, (final.Process == frequentProcessWhole.infrequentProcess), how = "left").drop("Process")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("NewProcessName", "infrequentProcessPath")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("TimeGenerated", "infrequentTimeGenerated")
infrequentProcessWhole = infrequentProcessWhole.withColumnRenamed("From", "infrequentFrom")
infrequentProcessWholeFiltered = infrequentProcessWhole.where(F.col("Dist") > levenDistThreshold)
print("Your anomalies: ")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "User") & (F.col("infrequentFrom") == "User"))).show()
print("Hardcoded anomalies examples")
(infrequentProcessWholeFiltered.where((F.col("frequentFrom") == "Hardcoded") | (F.col("infrequentFrom") == "Hardcoded"))).show()
To reduce noise, we extract only the process names and path information of the potentially malicious processes.
%%synapse
print("Showing potential anomalies after removing noise")
(infrequentProcessWholeFiltered.drop("frequentTimeGenerated", "infrequentTimeGenerated").distinct()).show()
Sending results to Log Analytics
%%synapse
def escape_str(s):
    # Escape backslashes (e.g. in Windows paths) so the value stays valid JSON.
    return s.replace('\\', '\\\\')

escape_strUdf = F.udf(escape_str, T.StringType())

def send_results_to_log_analytics(df_to_la):
    # The log type is the name of the event that is being submitted. This will show up under "Custom Logs" as log_type + '_CL'
    log_type = 'MasqueradingProcessNameResult'
    df_to_la = df_to_la.withColumn('timestamp', F.current_timestamp())
    # concatenate columns to form one json record
    json_records = df_to_la.withColumn('json_field', F.concat(F.lit('{'),
        F.lit(' \"TimeStamp\": \"'), F.from_unixtime(F.unix_timestamp(F.col('timestamp')), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), F.lit('\",'),
        F.lit(' \"NormalProcess\": \"'), escape_strUdf(F.col('frequentProcess')), F.lit('\",'),
        F.lit(' \"PotentiallyMaliciousProcess\": \"'), escape_strUdf(F.col('infrequentProcess')), F.lit('\",'),
        F.lit(' \"AnomalyScore\":'), F.col('Dist'),
        F.lit('}')
        )
    )
    # combine json record column to create the array
    json_body = json_records.agg(F.concat_ws(", ", F.collect_list('json_field')).alias('body'))
    first_row = json_body.first()
    if first_row is not None and first_row['body']:
        json_payload = '[' + first_row['body'] + ']'
        payload = json_payload.encode('utf-8')
        return log_analytics_client(workspaceId, workspaceSharedKey).post_data(payload, log_type)
    else:
        return "No json data to send to LA"
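Building the payload by string concatenation requires manual escaping (hence escape_str). An alternative sketch using Python's json module, which handles escaping automatically; `build_payload` and its input row dicts are illustrative, whereas the notebook collects the fields from Spark rows:

```python
import json
import datetime as dt

def build_payload(rows):
    # rows: list of dicts with frequentProcess, infrequentProcess and Dist keys
    now = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
    records = [{
        "TimeStamp": now,
        "NormalProcess": r["frequentProcess"],
        "PotentiallyMaliciousProcess": r["infrequentProcess"],
        "AnomalyScore": r["Dist"],
    } for r in rows]
    # json.dumps escapes backslashes in Windows paths for us
    return json.dumps(records).encode("utf-8")
```

The resulting bytes could be posted the same way as above; the main design win is that quoting and backslash escaping can never silently produce invalid JSON.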
%%synapse
print("Sending results to LogAnalytics")
print("Sending ", infrequentProcessWholeFiltered.count(), " results to Log Analytics")
send_results_to_log_analytics(infrequentProcessWholeFiltered)
print("Done")
# Run the following line if you have been running through AML
%synapse stop