Notebook Version: 1.0
Python Version: Python 3.7 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, numpy, matplotlib, networkx, ipywidgets, ipython, scikit_learn, dnspython, ipwhois, folium, holoviews
Data Sources Required:
Log Analytics
(Optional)
This notebook assists defenders in hunting for SolarWinds post-compromise Tactics, Techniques and Procedures (TTPs) across different environments, using both on-premises and cloud data sources.
You can read more about the attack in below technical blogs
References :
How to use:
Run the cells in this Notebook in order, at various points in the Notebook flow you will be prompted to enter or select options relevant to the scope of your triage.
This notebook presumes you have Microsoft Sentinel Workspace settings and Threat Intelligence providers configured in a config file. If you do not have this in place, please refer to https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html#configuration-file and https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to set this up.
The next cell:
This should complete without errors. If you encounter errors or warnings look at the following two notebooks:
If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment
notebook and in these documents:
from pathlib import Path
from IPython.display import display, HTML

# Minimum versions this notebook was written for. Neither constant is
# referenced again in the visible notebook code.
REQ_PYTHON_VER = (3, 6)
REQ_MSTICPY_VER = (1, 0, 0)

display(HTML("<h3>Starting Notebook setup...</h3>"))

# If not using Azure Notebooks, install msticpy with
# %pip install msticpy
from msticpy.nbtools import nbinit

# Extra packages handed to init_notebook in addition to its defaults.
additional_packages = ["tldextract", "IPWhois", "python-whois"]

# Later cells use names such as pd, os, widgets, md, WorkspaceConfig,
# QueryProvider and TILookup without importing them - presumably
# init_notebook injects them into the namespace passed here (confirm against
# the msticpy documentation).
nbinit.init_notebook(namespace=globals(), additional_packages=additional_packages);
from datetime import date
from functools import lru_cache
from msticpy.nbtools.foliummap import get_map_center, get_center_ip_entities
from pandas import json_normalize
from ruamel.yaml import YAML
from tqdm.notebook import tqdm
import IPython
import csv
import dns
import glob
import io
import json
import re
import requests
import tldextract
import whois
import zipfile
from bokeh.plotting import figure
# Build the workspace configuration. When no Microsoft Sentinel Workspace
# could be loaded from the config file, ask the user for the Workspace and
# Tenant IDs instead.
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()

# Query provider that the later cells use to run KQL (qry_prov.exec_query).
qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")

# Threat Intelligence lookup object.
ti = TILookup()
msticpyconfig.yaml configuration file
You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory, or you can set an environment variable (MSTICPYCONFIG) pointing to its location.
To configure this file see the ConfigureNotebookEnvironment notebook
# Authenticate to Microsoft Sentinel workspace
# Connects the query provider using the workspace configuration (ws_config)
# built in the earlier setup cell; later cells call qry_prov.exec_query.
qry_prov.connect(ws_config)
def get_solarwinds_queries_from_github(git_url, outputdir, timeout=60):
    """Download a repository ZIP and extract only the SolarWinds-related queries.

    The archive is downloaded into memory and only the members whose path
    starts with one of the entries in ``wanted_members`` are written to disk.

    Args:
        git_url: URL of the ZIP archive to download.
        outputdir: Directory the selected archive members are extracted into.
        timeout: Seconds passed to ``requests.get`` as its ``timeout`` so a
            stalled connection cannot hang the notebook indefinitely.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
        zipfile.BadZipFile: If the downloaded content is not a ZIP archive.
    """
    # Detections and Hunting Queries used by this notebook. Built once, outside
    # the loop; str.startswith accepts a tuple of candidate prefixes.
    wanted_members = (
        "Azure-Sentinel-master/Detections/DeviceEvents/SolarWinds_TEARDROP_Process-IOCs.yaml",
        "Azure-Sentinel-master/Detections/DeviceNetworkEvents/SolarWinds_SUNBURST_Network-IOCs.yaml",
        "Azure-Sentinel-master/Detections/DeviceProcessEvents/SolarWinds_SUNBURST_Process-IOCs.yaml",
        "Azure-Sentinel-master/Detections/DeviceFileEvents/SolarWinds_SUNBURST_&_SUPERNOVA_File-IOCs.yaml",
        "Azure-Sentinel-master/Detections/AuditLogs/MaliciousOAuthApp_O365AttackToolkit.yaml",
        "Azure-Sentinel-master/Detections/AuditLogs/MaliciousOAuthApp_PwnAuth.yaml",
        "Azure-Sentinel-master/Detections/AuditLogs/UseraddedtoPrivilgedGroups.yaml",
        "Azure-Sentinel-master/Detections/AuditLogs/ADFSDomainTrustMods.yaml",
        "Azure-Sentinel-master/Detections/AuditLogs/RareApplicationConsent.yaml",
        "Azure-Sentinel-master/Detections/SigninLogs/AzureAADPowerShellAnomaly.yaml",
        "Azure-Sentinel-master/Detections/OfficeActivity/MailItemsAccessedTimeSeries.yaml",
        "Azure-Sentinel-master/Detections/SecurityEvent/RDP_RareConnection.yaml",
        "Azure-Sentinel-master/Detections/SecurityEvent/RDP_Nesting.yaml",
        "Azure-Sentinel-master/Detections/SecurityEvent/UserCreatedAddedToBuiltinAdmins_1d.yaml",
        "Azure-Sentinel-master/Hunting Queries/SecurityEvent/HostsWithNewLogons.yaml",
        "Azure-Sentinel-master/Hunting Queries/MultipleDataSources/TrackingPrivAccounts.yaml",
        "Azure-Sentinel-master/Hunting Queries/SecurityEvent/ProcessEntropy.yaml",
        "Azure-Sentinel-master/Hunting Queries/SecurityEvent/RareProcbyServiceAccount.yaml",
        "Azure-Sentinel-master/Hunting Queries/SecurityEvent/uncommon_processes.yaml",
    )

    response = requests.get(git_url, timeout=timeout)
    # Fail here with a clear HTTP error rather than later with BadZipFile when
    # the body is an error page instead of an archive.
    response.raise_for_status()

    # The context manager closes the archive even if an extraction fails.
    with zipfile.ZipFile(io.BytesIO(response.content), mode="r") as archive:
        for member in archive.namelist():
            if member.startswith(wanted_members):
                archive.extract(member, path=outputdir)

    print("Downloaded and Extracted Files successfully")
# Default extraction target: the current working directory.
def_path = Path(os.getcwd())

# Text box that lets the user override where the repo files are extracted.
path_wgt = widgets.Text(
    value=str(def_path),
    description="Path to extract to zipped repo files: ",
    layout=widgets.Layout(width="50%"),
    style={"description_width": "initial"},
)

# Bare expression so the notebook renders the widget as the cell output.
path_wgt
# Download the Microsoft Sentinel Github repo as ZIP
# The archive is extracted under the directory currently entered in path_wgt.
azsentinel_git_url = 'https://github.com/Azure/Azure-Sentinel/archive/master.zip'
get_solarwinds_queries_from_github(git_url=azsentinel_git_url, outputdir=path_wgt.value)
QUERIES_PATH = 'Azure-Sentinel-master'
sentinel_root = Path(path_wgt.value) / QUERIES_PATH
display(HTML("<h3>Listings under Detections...</h2>"))
%ls '{sentinel_root}/Detections/'
display(HTML("<h3>Listings under Hunting Queries...</h2>"))
%ls '{sentinel_root}/Hunting Queries/'
def parse_yaml(parent_dir, child_dir):
    """Load every ``*.yaml`` file under ``parent_dir/child_dir`` into a DataFrame.

    Each file is parsed with a safe YAML loader and flattened with
    ``json_normalize``; the per-file frames are then concatenated.

    Args:
        parent_dir: Root folder of the extracted repository. It is also the
            prefix replaced by the GitHub base URL to build ``DetectionURL``.
        child_dir: Sub-folder to scan. ``"Detections"`` is tagged as
            ``"Analytics"`` and ``"Hunting Queries"`` as ``"Hunting"`` in the
            ``DetectionType`` column; other values get no ``DetectionType``.

    Returns:
        A DataFrame built from the parsed files, with the added columns
        ``DetectionURL``, ``DetectionType`` (for the two known folders) and
        ``DetectionService``. When no YAML file is found the frame has no rows.
    """
    sentinel_repourl = "https://github.com/Azure/Azure-Sentinel/blob/master"

    # Collect the list of files recursively under the folder
    yaml_queries = glob.glob(f"{parent_dir}/{child_dir}/**/*.yaml", recursive=True)

    # Parse and load yaml
    parsed_yaml = YAML(typ="safe")

    # Collect the per-file frames and concatenate once. DataFrame.append was
    # removed in pandas 2.0 and re-copied the whole frame on every iteration.
    frames = []
    for query_file in yaml_queries:
        with open(query_file, "r", encoding="utf-8", errors="ignore") as f:
            parsed_yaml_df = json_normalize(parsed_yaml.load(f))
        # glob returns OS-native separators for the matched part of the path;
        # normalise them so the URL is valid when run on Windows.
        parsed_yaml_df["DetectionURL"] = query_file.replace(
            parent_dir, sentinel_repourl
        ).replace("\\", "/")
        frames.append(parsed_yaml_df)

    # pd.concat raises on an empty list, so fall back to an empty frame.
    if frames:
        df = pd.concat(frames, ignore_index=True, sort=True)
    else:
        df = pd.DataFrame()

    if child_dir == "Detections":
        df["DetectionType"] = "Analytics"
    elif child_dir == "Hunting Queries":
        df["DetectionType"] = "Hunting"
    df["DetectionService"] = "Microsoft Sentinel Community Github"
    return df
# Parse the extracted Detections and Hunting Queries folders.
base_dir = f"{path_wgt.value}/Azure-Sentinel-master"
detections_df = parse_yaml(parent_dir=base_dir, child_dir="Detections")
hunting_df = parse_yaml(parent_dir=base_dir, child_dir="Hunting Queries")

# Combine both sets into one catalogue with a fresh 0..n-1 index.
frames = [detections_df, hunting_df]
sentinel_github_df = pd.concat(frames).reset_index(drop=True).copy()

# Encode spaces in the URLs (e.g. the "Hunting Queries" folder name).
detection_urls = sentinel_github_df["DetectionURL"]
sentinel_github_df["DetectionURL"] = detection_urls.str.replace(" ", "%20", regex=True)
sentinel_github_df["IngestedDate"] = date.today()

# Displaying basic statistics of yaml files
display(HTML("<h3>Microsoft Sentinel Github Stats...</h3>"))
stats_text = f"""Total Queries in Microsoft Sentinel Github:: {len(sentinel_github_df)}
No of Detections :: {len(detections_df)}
No of Hunting Queries:: {len(hunting_df)}
"""
print(stats_text)
display(sentinel_github_df.head())
# Each cell below picks one rule's KQL text out of the parsed catalogue by its
# "name" value, runs it through qry_prov and leaves the result as the cell
# output. The lookup raises KeyError if no catalogue row has that name.

# TEARDROP memory-only dropper
rule_mask = sentinel_github_df["name"] == "TEARDROP memory-only dropper"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
teardrop_df = qry_prov.exec_query(query)
teardrop_df

# SUNBURST and SUPERNOVA backdoor hashes
rule_mask = sentinel_github_df["name"] == "SUNBURST and SUPERNOVA backdoor hashes"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
backdoor_df = qry_prov.exec_query(query)
backdoor_df

# SUNBURST network beacons
rule_mask = sentinel_github_df["name"] == "SUNBURST network beacons"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
nwbeacon_df = qry_prov.exec_query(query)
nwbeacon_df

# SUNBURST suspicious SolarWinds child processes
rule_mask = sentinel_github_df["name"] == "SUNBURST suspicious SolarWinds child processes"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
swprocess_df = qry_prov.exec_query(query)
swprocess_df
To hunt for similar TTPs used in this attack, a good place to start is to build an inventory of the machines that have SolarWinds Orion components. Organizations might already have a software inventory management system that indicates the hosts where the SolarWinds application is installed. Alternatively, Microsoft Sentinel can be used to run a simple query that gathers similar details. The query below pulls the hosts with a SolarWinds process running in the last 30 days, based on process execution/Sysmon logs from hosts onboarded either to Sentinel directly or via Microsoft Defender for Endpoint (MDE).
# Gather Solarwinds details based on Process execution Logs from diverse data sources
# The query unions three sources over the last 30 days (SecurityEvent 4688,
# DeviceProcessEvents, Sysmon EventID 1) and summarises, per machine, the first
# and last time seen plus the set of matching process paths.
# The literals below are plain strings: they contain no placeholders, so an
# f-string prefix is not needed.
# NOTE(review): the SecurityEvent leg compares EventID with the string '4688'
# while the Sysmon leg compares EventID with the number 1 - confirm the
# intended type.
solarwinds_query = """
let timeframe = 30d;
(union isfuzzy=true
(
SecurityEvent
| where TimeGenerated >= ago(timeframe)
| where EventID == '4688'
| where tolower(NewProcessName) has 'solarwinds'
| extend MachineName = Computer , Process = NewProcessName
),
(
DeviceProcessEvents
| where TimeGenerated >= ago(timeframe)
| where tolower(InitiatingProcessFolderPath) has 'solarwinds'
| extend MachineName = DeviceName , Process = InitiatingProcessFolderPath
),
(
Event
| where TimeGenerated >= ago(timeframe)
| where Source == "Microsoft-Windows-Sysmon"
| where EventID == 1
| extend Image = EventDetail.[4].["#text"]
| where tolower(Image) has 'solarwinds'
| extend MachineName = Computer , Process = Image
)
)
| summarize StartTime = min(TimeGenerated), EndTime = max(TimeGenerated), make_set(Process) by MachineName
"""
print("Collecting data...")
solarwinds_assets = qry_prov.exec_query(solarwinds_query)
md('Solarwinds Server Details Gathered', styles=["bold", "large"])
display(solarwinds_assets)
# Each cell below picks one rule's KQL text out of the parsed catalogue by its
# "name" value, runs it through qry_prov and leaves the result as the cell
# output. The lookup raises KeyError if no catalogue row has that name.

# Hosts with new logons
rule_mask = sentinel_github_df["name"] == "Hosts with new logons"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
hostswithnewlogons_df = qry_prov.exec_query(query)
hostswithnewlogons_df

# Rare RDP Connections
rule_mask = sentinel_github_df["name"] == "Rare RDP Connections"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
rarerdp_df = qry_prov.exec_query(query)
rarerdp_df

# RDP Nesting
rule_mask = sentinel_github_df["name"] == "RDP Nesting"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
rdpnest_df = qry_prov.exec_query(query)
rdpnest_df

# Rare application consent
rule_mask = sentinel_github_df["name"] == "Rare application consent"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
rareappconsent_df = qry_prov.exec_query(query)
rareappconsent_df

# Rare processes run by Service accounts
rule_mask = sentinel_github_df["name"] == "Rare processes run by Service accounts"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
serviceaccproc_df = qry_prov.exec_query(query)
serviceaccproc_df
If you have suspicious netblocks that are not expected in your organization (e.g. VPS netblocks or proxy servers) and that you want to monitor, define them in IP_Data and run the query below, which uses the ipv4_lookup KQL function.
# Successful sign-ins (ResultType == 0) from the last 360 days whose IPAddress
# matches a network in IP_Data. Rows are grouped per IPAddress and
# UserPrincipalName and kept only when the group shows exactly the two MFA
# detail strings compared below (in either order).
# NOTE(review): IP_Data is referenced but not defined inside this query text;
# presumably it has to be defined (with a `network` column) before this runs -
# confirm.
query = """
SigninLogs
| where TimeGenerated > ago(360d)
| where ResultType == 0
| extend additionalDetails = tostring(Status.additionalDetails)
| evaluate ipv4_lookup(IP_Data, IPAddress, network, return_unmatched = false)
| summarize make_set(additionalDetails), min(TimeGenerated), max(TimeGenerated) by IPAddress, UserPrincipalName
| where array_length(set_additionalDetails) == 2
| where (set_additionalDetails[1] == "MFA requirement satisfied by claim in the token" and set_additionalDetails[0] == "MFA requirement satisfied by claim provided by external provider") or (set_additionalDetails[0] == "MFA requirement satisfied by claim in the token" and set_additionalDetails[1] == "MFA requirement satisfied by claim provided by external provider")
| project IPAddress, UserPrincipalName, min_TimeGenerated, max_TimeGenerated
"""
print("Collecting data...")
suspmfalogons_df = qry_prov.exec_query(query)
# Bare expression: shown as the cell output in the notebook.
suspmfalogons_df
# Each cell below picks one rule's KQL text out of the parsed catalogue by its
# "name" value, runs it through qry_prov and leaves the result as the cell
# output. The lookup raises KeyError if no catalogue row has that name.

# New user created and added to the built-in administrators group
rule_mask = (
    sentinel_github_df["name"]
    == "New user created and added to the built-in administrators group"
)
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
newaccprivgroups_df = qry_prov.exec_query(query)
newaccprivgroups_df

# User added to Azure Active Directory Privileged Groups
rule_mask = (
    sentinel_github_df["name"]
    == "User added to Azure Active Directory Privileged Groups"
)
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
aadprivgroups_df = qry_prov.exec_query(query)
aadprivgroups_df
# Run the "Uncommon processes - bottom 5%" hunting query from the catalogue.
# The lookup raises KeyError if no catalogue row has that name.
query = sentinel_github_df.loc[
    sentinel_github_df["name"] == "Uncommon processes - bottom 5%"
]["query"].reset_index(drop=True)[0]
print("Collecting data...")
# Keep the result under its own name. `domain_trust_df` belongs to the
# "Modified domain federation trust settings" cell, which would otherwise
# overwrite these results.
uncommon_procs_df = qry_prov.exec_query(query)
uncommon_procs_df
# Rare processes run by Service accounts.
# This repeats the lookup done in an earlier cell and rebinds
# serviceaccproc_df with a fresh result.
rule_mask = sentinel_github_df["name"] == "Rare processes run by Service accounts"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
serviceaccproc_df = qry_prov.exec_query(query)
serviceaccproc_df
Check out KQL queries published by Sentinel Community at
# Modified domain federation trust settings.
# Pick the rule's KQL text out of the catalogue by name (KeyError if absent),
# run it and show the result as the cell output.
rule_mask = sentinel_github_df["name"] == "Modified domain federation trust settings"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
domain_trust_df = qry_prov.exec_query(query)
domain_trust_df
# Inline KQL (not taken from the catalogue): AuditLogs entries from the last
# hour for service principal / "Certificates and secrets management"
# operations that succeeded, where a key credential with KeyUsage "Verify" was
# added or its KeyIdentifier changed. Initiating user or app, IP address and
# User-Agent are extracted, and only targets of type "Application" are kept.
query = """
let auditLookback = 1h;
AuditLogs
| where TimeGenerated > ago(auditLookback)
| where OperationName has_any ("Add service principal", "Certificates and secrets management") // captures "Add service principal", "Add service principal credentials", and "Update application – Certificates and secrets management" events
| extend targetDisplayName = TargetResources[0].displayName
| extend targetId = TargetResources[0].id
| extend targetType = TargetResources[0].type
| extend keyEvents = TargetResources[0].modifiedProperties
| where keyEvents has "KeyIdentifier=" and keyEvents has "KeyUsage=Verify"
| where Result =~ "success"
| mv-expand keyEvents
| where keyEvents.displayName =~ "KeyDescription"
| parse keyEvents.newValue with * "KeyIdentifier=" keyIdentifier:string ",KeyType=" keyType:string ",KeyUsage=" keyUsage:string ",DisplayName=" keyDisplayName:string "]" *
| parse keyEvents.oldValue with * "KeyIdentifier=" keyIdentifierOld:string "," *
| where keyEvents.oldValue == "[]" or keyIdentifier != keyIdentifierOld
| where keyUsage == "Verify"
| extend UserAgent = iff(AdditionalDetails[0].key == "User-Agent",AdditionalDetails[0].value,"")
| extend InitiatingUser = iff(isnotempty(InitiatedBy.user.userPrincipalName),InitiatedBy.user.userPrincipalName, InitiatedBy.app.displayName)
| extend InitiatingIpAddress = iff(isnotempty(InitiatedBy.user.ipAddress), InitiatedBy.user.ipAddress, InitiatedBy.app.ipAddress)
//
// Adding the below filter for detection-quality events; Microsoft Sentinel users can comment out this line and tune additional service principal events for their environment
| where targetType =~ "Application"
"""
print("Collecting data...")
newkeycreds_df = qry_prov.exec_query(query)
# Bare expression: shown as the cell output in the notebook.
newkeycreds_df
# Each cell below picks one rule's KQL text out of the parsed catalogue, runs
# it through qry_prov and leaves the result as the cell output. The lookup
# raises KeyError if no catalogue row matches.

# Suspicious application consent similar to O365 Attack Toolkit
rule_mask = (
    sentinel_github_df["name"]
    == "Suspicious application consent similar to O365 Attack Toolkit"
)
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
o365toolkit_df = qry_prov.exec_query(query)
o365toolkit_df

# Suspicious application consent similar to PwnAuth
rule_mask = (
    sentinel_github_df["name"] == "Suspicious application consent similar to PwnAuth"
)
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
o365pwnauth_df = qry_prov.exec_query(query)
o365pwnauth_df

# Azure Active Directory PowerShell accessing non-AAD resources
rule_mask = (
    sentinel_github_df["name"]
    == "Azure Active Directory PowerShell accessing non-AAD resources"
)
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
adpowershell_df = qry_prov.exec_query(query)
adpowershell_df

# This rule is selected by its "id" value rather than by name.
rule_mask = sentinel_github_df["id"] == "b4ceb583-4c44-4555-8ecf-39f572e827ba"
query = sentinel_github_df.loc[rule_mask, "query"].reset_index(drop=True)[0]
print("Collecting data...")
timeseriesmail_df = qry_prov.exec_query(query)
timeseriesmail_df
Read the Get Started notebook.
View sample notebooks in the Sample-Notebooks folder
View How-tos and Troubleshooting in the HowTos folder
https://msticpy.readthedocs.io/en/latest/GettingStarted.html
https://msticnb.readthedocs.io/en/latest/notebooklets.html#current-notebooklets
Behavior:Win32/Solorigate.C!dha threat description - Microsoft Security Intelligence
FalconFriday – Fireeye Red Team Tool Countermeasures KQL Queries
Microsoft Sentinel SolarWinds Post Compromise Hunting Workbook
[https://security.microsoft.com/threatanalytics3/2b74f636-146e-48dd-94f6-5cb5132467ca/overview]