ianhelle@microsoft.com
This notebook is a quick and dirty Sigma to Log Analytics converter. It uses the modules from the sigmac package to do the conversion.
Only a subset of the Sigma rules are convertible currently. Failure to convert could be for one or more of these reasons:
The sigmac tool is downloadable as a package from PyPI, but since we are downloading the rules from the repo, we also copy and import the package from the repo source.
After conversion you can use an interactive browser to step through the rules and view (and copy/save) the KQL equivalents. You can also take the conversion results and use them in another way (e.g. bulk save to files).
This notebook is somewhat experimental and is offered as-is, without any guarantees.
import requests
# Download the repo ZIP
sigma_git_url = 'https://github.com/Neo23x0/sigma/archive/master.zip'
r = requests.get(sigma_git_url)
# Fail here with a clear HTTP error instead of later, when the response body
# is handed to zipfile and an error page would surface as a bad-archive error.
r.raise_for_status()
from ipywidgets import widgets, Layout
import os
from pathlib import Path
# Default extraction target: a "sigma" folder under the current directory.
def_path = Path(os.getcwd()) / "sigma"
# Text box so the extraction folder can be edited before the archive is
# unpacked; the trailing bare expression renders the widget in the notebook.
path_wgt = widgets.Text(
    value=str(def_path),
    description='Path to extract to zipped repo files: ',
    layout=Layout(width='50%'),
    style={'description_width': 'initial'},
)
path_wgt
import zipfile
import io
# Wrap the downloaded bytes in a file-like object so zipfile can read the
# archive directly from memory.
repo_zip = io.BytesIO(r.content)
# The context manager closes the archive as soon as extraction has finished.
with zipfile.ZipFile(repo_zip, mode='r') as zip_archive:
    zip_archive.extractall(path=path_wgt.value)
# Location of the rules relative to the extraction folder.
# NOTE(review): assumes the archive's top-level folder is "sigma-master".
RULES_REL_PATH = 'sigma-master/rules'
rules_root = Path(path_wgt.value) / RULES_REL_PATH
You should see a folder with folders such as application, apt, windows...
# List the contents of the extracted rules folder.
%ls {rules_root}
# Read the Sigma YAML file paths into a dict and make
# a copy for the target Kql queries
from pathlib import Path
from collections import defaultdict
import copy
def get_rule_files(rules_root):
    """
    Collect the ``*.yml`` files found anywhere below `rules_root`.

    Parameters
    ----------
    rules_root : str or Path
        Folder to search recursively. May be relative, contain ``..`` or
        pass through symlinks.

    Returns
    -------
    defaultdict
        Two-level mapping ``{category: {file_name: file_path}}``.
        ``category`` is the file's parent folder relative to `rules_root`
        with the path parts joined by "." (an empty string for files directly
        in `rules_root`). ``file_path`` is the resolved absolute path.
    """
    # Resolve the root once and use that same path for both the search and
    # relative_to(): rglob() on a resolved root yields resolved paths, which
    # cannot be made relative to an unresolved root.
    root = Path(rules_root).resolve()
    file_dict = defaultdict(dict)
    for file in root.rglob("*.yml"):
        rel_path = file.relative_to(root)
        path_key = '.'.join(rel_path.parent.parts)
        file_dict[path_key][rel_path.name] = file
    return file_dict
# {category: {file name: file path}} for every rule file under rules_root.
sigma_dict = get_rule_files(rules_root)
# Independent copy with the same keys; the conversion loop overwrites its
# values with the converted results.
kql_dict = copy.deepcopy(sigma_dict)
# Add downloaded sigmac tool to sys.path and import Sigmac functions
import os
import sys
# Build the tools path from the folder chosen in the extraction widget, so
# the import still works when the repo was not extracted to ./sigma.
module_path = os.path.abspath(
    os.path.join(path_wgt.value, 'sigma-master', 'tools'))
if module_path not in sys.path:
    sys.path.append(module_path)
from sigma.parser.collection import SigmaCollectionParser
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
from sigma.filter import SigmaRuleFilter
import sigma.backends.discovery as backends
from sigma.backends.base import BackendOptions
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
# Sigma to Log Analytics Conversion
import yaml
# Sigma field-mapping configuration (YAML text) handed to SigmaConfiguration
# when rules are converted.
# NOTE(review): YAML nests by indentation — confirm that the three entries
# under `fieldmappings:` are indented in the actual notebook.
_LA_MAPPINGS = '''
fieldmappings:
Image: NewProcessName
ParentImage: ParentProcessName
ParentCommandLine: NO_MAPPING
'''
# Sentinel stored instead of a query when a rule cannot be converted.
NOT_CONVERTIBLE = 'Not convertible'
def sigma_to_la(file_path):
    """
    Convert one Sigma rule file with the sigmac 'ala' backend.

    Parameters
    ----------
    file_path : str or Path
        Path of the Sigma YAML rule file.

    Returns
    -------
    tuple
        ``(sigma_txt, kql_result)``: the raw rule text and either the
        converted query (prefixed with a ``//`` comment header built from the
        rule's properties) or NOT_CONVERTIBLE when the backend raises
        NotImplementedError or NotSupportedError.
    """
    with open(file_path, 'r') as rule_file:
        try:
            config_chain = SigmaConfigurationChain()
            config_chain.append(SigmaConfiguration(_LA_MAPPINGS))
            backend_options = BackendOptions(None, None)
            ala_backend = backends.getBackend('ala')(config_chain, backend_options)
            rule_parser = SigmaCollectionParser(rule_file, config_chain, None)
            # A file may produce several query fragments; concatenate them.
            kql_result = ''.join(rule_parser.generate(ala_backend))
        except (NotImplementedError, NotSupportedError):
            kql_result = NOT_CONVERTIBLE
        # The parser has consumed the file; rewind to read the raw rule text.
        rule_file.seek(0, 0)
        sigma_txt = rule_file.read()

    if kql_result == NOT_CONVERTIBLE:
        return sigma_txt, kql_result

    try:
        header = "\n".join(get_sigma_properties(sigma_txt))
        kql_result = header + "\n" + kql_result
    except Exception as err:
        # Best effort: a header failure is reported but the query is kept.
        print("exception reading sigma YAML: ", err)
        print(sigma_txt, kql_result, sep='\n')
    return sigma_txt, kql_result
# Sigma rule properties emitted by get_sigma_properties(), in this order.
sigma_keys = ['title', 'description', 'tags', 'status',
              'author', 'logsource', 'falsepositives', 'level']
def get_sigma_properties(sigma_rule):
    """
    Yield one comment line per property listed in `sigma_keys`.

    Parameters
    ----------
    sigma_rule : str
        YAML text of a Sigma rule. Only the first YAML document is used.

    Yields
    ------
    str
        The line produced by get_property() for each key, in `sigma_keys`
        order.
    """
    yaml_documents = yaml.load_all(sigma_rule, Loader=yaml.SafeLoader)
    first_document = next(yaml_documents)
    yield from (get_property(key, first_document) for key in sigma_keys)
def get_property(name, sigma_rule_dict):
    """
    Format one Sigma rule property as a single-line KQL comment.

    Parameters
    ----------
    name : str
        Property to look up in the rule.
    sigma_rule_dict : dict
        Parsed Sigma rule.

    Returns
    -------
    str
        ``// <name>: <value>``. A missing property is shown as ``na``; a dict
        value is flattened to space-separated ``key: value`` pairs.
    """
    sig_prop = sigma_rule_dict.get(name, 'na')
    if isinstance(sig_prop, dict):
        sig_prop = ' '.join(f"{k}: {v}" for k, v in sig_prop.items())
    # Keep the value on one line: only the text up to the first newline is
    # covered by the leading "//", so any further lines of a multi-line value
    # would end up as uncommented text inside the query.
    sig_prop = ' '.join(str(sig_prop).splitlines())
    return f"// {name}: {sig_prop}"
# KQL clause templates that add_filter_clauses() splices into a query.
# The {start}, {end} and {host_name} placeholders are left in place for a
# later str.format() call.
# NOTE(review): {host_name} is inserted unquoted and no visible code supplies
# a `host_name` value when the query is formatted — confirm before relying on
# the host filter.
_KQL_FILTERS = {
    'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ',
    'host': ' | where Computer has {host_name} '
}
def insert_at(source, insert, find_sub):
    """
    Insert `insert` into `source` just before the first `find_sub`.

    Parameters
    ----------
    source : str
        Text to insert into.
    insert : str
        Text to add.
    find_sub : str
        Marker to search for.

    Returns
    -------
    str
        `source` with `insert` placed before the first occurrence of
        `find_sub`, or appended at the end when `find_sub` is not found.
    """
    index = source.find(find_sub)
    if index == -1:
        return source + insert
    head, tail = source[:index], source[index:]
    return head + insert + tail
def add_filter_clauses(source, **kwargs):
    """
    Add the optional host and date filter clauses to a query.

    Parameters
    ----------
    source : str
        Query text.
    **kwargs
        ``host`` and ``date``: when truthy, the matching `_KQL_FILTERS`
        template is inserted before the first "|" in the query (or appended
        when there is none). Host is inserted first, then date.

    Returns
    -------
    str
        The updated query. Literal braces in `source` are doubled, and a
        warning comment is prepended, so they survive a later str.format().
    """
    if any(brace in source for brace in "{}"):
        source = ("// Warning: embedded braces in source. Please edit if necessary.\n"
                  + source)
        source = source.replace('{', '{{').replace('}', '}}')
    for filter_name in ('host', 'date'):
        if kwargs.get(filter_name, False):
            source = insert_at(source, _KQL_FILTERS[filter_name], '|')
    return source
# Run the conversion
# conv_counter maps category -> (number of rules, number converted).
conv_counter = {}
for categ, sources in sigma_dict.items():
    src_converted = 0
    for file_name, file_path in sources.items():
        # These are notebook-global names. They are deliberately not called
        # `sigma`/`kql`: a later cell imports a module named `kql`, and
        # re-running this loop would replace that module with a string.
        sigma_txt, kql_txt = sigma_to_la(file_path)
        kql_dict[categ][file_name] = (sigma_txt, kql_txt)
        if kql_txt != NOT_CONVERTIBLE:
            src_converted += 1
    conv_counter[categ] = (len(sources), src_converted)

print("Conversion statistics")
print("-" * len("Conversion statistics"))
print('\n'.join([f'{categ}: rules: {counter[0]}, converted: {counter[1]}'
                 for categ, counter in conv_counter.items()]))
from ipywidgets import widgets, Layout
# Browser Functions
def on_cat_value_change(change):
    """
    Repopulate the query list when a different category is selected.

    Parameters
    ----------
    change : dict
        Widget change notification; ``change['new']`` is the newly selected
        category key.
    """
    selected_category = change['new']
    queries_w.options = kql_dict[selected_category].keys()
    # Select the first query of the new category.
    queries_w.value = queries_w.options[0]
def on_query_value_change(change):
    """
    Refresh both text areas for the currently selected rule.

    Does nothing while the "View query" checkbox is cleared. `change` is
    accepted because this is used as a widget observer, but it is not read.
    """
    if not view_qry_check.value:
        return
    # Each entry is indexed as [0] = Sigma rule text, [1] = converted query.
    entry = kql_dict[sub_cats_w.value][queries_w.value]
    kql_text = entry[1]
    if "Not convertible" not in kql_text:
        kql_text = add_filter_clauses(kql_text,
                                      date=add_date_filter_check.value,
                                      host=add_host_filter_check.value)
    # Put every pipe stage on its own line for display.
    query_text_w.value = kql_text.replace('|', '\n|')
    orig_text_w.value = entry[0]
def on_view_query_value_change(change):
    """
    Re-render the query text and show or hide both text areas.

    Used as the observer for the view/date/host checkboxes; `change` is not
    read.
    """
    if view_qry_check.value:
        visibility = 'visible'
    else:
        visibility = 'hidden'
    on_query_value_change(None)
    for text_area in (query_text_w, orig_text_w):
        text_area.layout.visibility = visibility
# Function defs for ExecuteQuery cell below
def click_exec_hqry(b):
    """
    Button click handler: run the query shown in the KQL text area.

    Parameters
    ----------
    b : widget
        The clicked button (not used).
    """
    # Fill the query's {placeholder} fields from the time-range widget.
    # NOTE(review): str.format raises KeyError for any placeholder that
    # query_params does not provide (e.g. {host_name}) — confirm which keys
    # QueryTime.query_params supplies.
    query_text = query_text_w.value.format(**qry_wgt.query_params)
    disp_results(query_text)
def disp_results(query_text):
    """
    Run `query_text` and show the result in the output widget.

    The result is also stored in the notebook-global `qry_results` so it can
    be used from later cells.

    Parameters
    ----------
    query_text : str
        Query to execute.
    """
    global qry_results
    out_wgt.clear_output()
    with out_wgt:
        print("Running query...", end=' ')
        qry_results = execute_kql_query(query_text)
        if qry_results is None:
            # No query text was supplied; execute_kql_query has already
            # printed a message and there is nothing to count or display.
            return
        print(f'done. {len(qry_results)} rows returned.')
        display(qry_results)
# Button that triggers click_exec_hqry().
exec_hqry_button = widgets.Button(description="Execute query..")
# Output area that disp_results() clears and prints into.
out_wgt = widgets.Output() #layout=Layout(width='100%', height='200px', visiblity='visible'))
exec_hqry_button.on_click(click_exec_hqry)
# Browser widget setup
categories = sorted(kql_dict.keys())

# Category and query pickers; the query list starts with the first category.
sub_cats_w = widgets.Select(options=categories,
                            description='Category : ',
                            layout=Layout(width='30%', height='120px'),
                            style={'description_width': 'initial'})
queries_w = widgets.Select(options=kql_dict[categories[0]].keys(),
                           description='Query : ',
                           layout=Layout(width='30%', height='120px'),
                           style={'description_width': 'initial'})

# Text areas for the converted query and the original rule. Both start
# hidden; `visibility` is the Layout trait name, so it can be set directly in
# the constructor.
query_text_w = widgets.Textarea(
    value='',
    description='Kql Query:',
    layout=Layout(width='100%', height='300px', visibility='hidden'),
    disabled=False)
orig_text_w = widgets.Textarea(
    value='',
    description='Sigma Query:',
    layout=Layout(width='100%', height='250px', visibility='hidden'),
    disabled=False)

sub_cats_w.observe(on_cat_value_change, names='value')
queries_w.observe(on_query_value_change, names='value')

# Display options; every checkbox re-renders the query text when toggled.
view_qry_check = widgets.Checkbox(description="View query", value=True)
add_date_filter_check = widgets.Checkbox(description="Add date filter", value=False)
add_host_filter_check = widgets.Checkbox(description="Add host filter", value=False)
view_qry_check.observe(on_view_query_value_change, names='value')
add_date_filter_check.observe(on_view_query_value_change, names='value')
add_host_filter_check.observe(on_view_query_value_change, names='value')

vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check])
hbox = widgets.HBox([sub_cats_w, queries_w, vbox_opts])
vbox = widgets.VBox([hbox, orig_text_w, query_text_w])
# Populate the text areas and apply the initial visibility before display.
on_view_query_value_change(None)
display(vbox)
Execute query
button to run the currently displayed query.
Notes:
from msticpy.nbtools.nbwidgets import QueryTime
# Time-range selector; click_exec_hqry() reads its query_params when it
# formats the query.
qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10)
# Execute button stacked above the output area. This rebinds the `vbox` name
# that the browser cell also uses.
vbox = widgets.VBox([exec_hqry_button, out_wgt])
display(vbox)
qry_wgt.display()
def clean_kql_comments(query_string):
    """
    Strip ``//`` comments from a query and join it into a single line.

    Parameters
    ----------
    query_string : str
        Query text, possibly spanning several lines.

    Returns
    -------
    str
        The query with every ``//`` comment (through end of line) removed,
        all newlines removed and surrounding whitespace stripped.
    """
    import re
    # No count/flags arguments: every comment must be removed, because the
    # newlines are dropped next and a surviving "//" would then comment out
    # the rest of the query. `[^\n]*` also matches a bare "//".
    # NOTE: a "//" inside a string literal (e.g. a URL) is removed as well.
    without_comments = re.sub(r'//[^\n]*', '', query_string)
    return without_comments.replace('\n', '').strip()
def execute_kql_query(query_string):
    """
    Run a query through the ``kql`` cell magic.

    Parameters
    ----------
    query_string : str
        Query text; comments are stripped before it is submitted.

    Returns
    -------
    DataFrame, list or None
        The result converted with ``to_dataframe()`` when the magic returns a
        result whose completion status code is 0; an empty list otherwise;
        None (after printing a message) when the query is empty or blank.
    """
    if not (query_string and query_string.strip()):
        print('No query supplied')
        return None

    kql_response = get_ipython().run_cell_magic(
        'kql', line='', cell=clean_kql_comments(query_string))
    query_succeeded = (
        kql_response is not None
        and kql_response.completion_query_info['StatusCode'] == 0
    )
    return kql_response.to_dataframe() if query_succeeded else []
import os
from msticpy.nbtools.wsconfig import WorkspaceConfig
from msticpy.nbtools import kql, GetEnvironmentKey
ws_config_file = 'config.json'
try:
    ws_config = WorkspaceConfig(ws_config_file)
    print('Found config file')
    for cf_item in ['tenant_id', 'subscription_id', 'resource_group',
                    'workspace_id', 'workspace_name']:
        print(cf_item, ws_config[cf_item])
except Exception:
    # Best effort: without a usable config file the workspace ID is taken
    # from the widget below. `Exception` (not a bare except) so that
    # KeyboardInterrupt/SystemExit are not swallowed.
    ws_config = None

# Widget for the workspace ID, pre-filled from the config file when one was
# loaded.
ws_id = GetEnvironmentKey(env_var='WORKSPACE_ID',
                          prompt='Log Analytics Workspace Id:')
if ws_config:
    ws_id.value = ws_config['workspace_id']
ws_id.display()
# Choose the workspace ID: use `select_ws` if that name exists, otherwise
# fall back to the `ws_id` widget, otherwise None.
# NOTE(review): `select_ws` is not defined in the visible part of this
# notebook — confirm where it comes from.
try:
    WORKSPACE_ID = select_ws.value
except NameError:
    try:
        WORKSPACE_ID = ws_id.value
    except NameError:
        WORKSPACE_ID = None
if not WORKSPACE_ID:
    raise ValueError('No workspace selected.')
# Presumably loads the kql cell magic that the next line and
# execute_kql_query() use — confirm against msticpy.
kql.load_kql_magic()
%kql loganalytics://code().workspace(WORKSPACE_ID)
# Text box for the output folder; defaults to the extraction path with
# "_kql_out" appended. The trailing bare expression renders the widget.
path_save_wgt = widgets.Text(value=str(def_path) + "_kql_out",
                             description='Path to save KQL files: ',
                             layout=Layout(width='50%'),
                             style={'description_width': 'initial'})
path_save_wgt
# Write each converted query to <root>/<category>/<rule name>.kql
root = Path(path_save_wgt.value)
# parents=True: the chosen folder may be nested under folders that do not
# exist yet.
root.mkdir(parents=True, exist_ok=True)
for categ, kql_files in kql_dict.items():
    sub_dir = root.joinpath(categ)
    for file_name, contents in kql_files.items():
        # contents is indexed as [0] = Sigma rule text, [1] = converted query.
        kql_txt = contents[1]
        if kql_txt == NOT_CONVERTIBLE:
            continue
        # Created lazily so categories with no converted rule leave no
        # empty folder behind.
        sub_dir.mkdir(exist_ok=True)
        # with_suffix swaps only the final extension; str.replace would also
        # rewrite a ".yml" that appears elsewhere in the file name.
        file_path = sub_dir.joinpath(Path(file_name).with_suffix('.kql').name)
        with open(file_path, 'w') as output_file:
            output_file.write(kql_txt)
        print(f"Saved {file_path}")