Windows Host Explorer

 Details... **Notebook Version:** 1.0
**Python Version:** Python 3.6 (including Python 3.6 - AzureML)
**Required Packages**: kqlmagic, msticpy, pandas, numpy, matplotlib, bokeh, networkx, ipywidgets, ipython, scikit_learn, dnspython, ipwhois, folium, maxminddb_geolite2
**Data Sources Required**: - Log Analytics - SecurityAlert, SecurityEvent (EventIDs 4688 and 4624/25), AzureNetworkAnalytics_CL, Heartbeat - (Optional) - VirusTotal, AlienVault OTX, IBM XForce, Open Page Rank, (all require accounts and API keys)

Brings together a series of queries and visualizations to help you determine the security state of the Windows host or virtual machine that you are investigating.

Contents


Notebook initialization

The next cell:

  • Checks for the correct Python version
  • Checks versions and optionally installs required packages
  • Imports the required packages into the notebook
  • Sets a number of configuration options.

This should complete without errors. If you encounter errors or warnings look at the following two notebooks:

If you are running in the Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:

You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup. There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:

In [ ]:
from pathlib import Path
from IPython.display import display, HTML

REQ_PYTHON_VER = "3.6"
REQ_MSTICPY_VER = "1.0.0"
REQ_MP_EXTRAS = ["ml"]

display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
    from utils.nb_check import check_versions
    check_versions(REQ_PYTHON_VER, REQ_MSTICPY_VER, REQ_MP_EXTRAS)

# If not using Azure Notebooks, install msticpy with
# !pip install msticpy

from msticpy.nbtools import nbinit
nbinit.init_notebook(
    namespace=globals(),
    extra_imports=["ipwhois, IPWhois"]
);

Get WorkspaceId and Authenticate to Azure Sentinel

Details... If you are using user/device authentication, run the following cell. - Click the 'Copy code to clipboard and authenticate' button. - This will pop up an Azure Active Directory authentication dialog (in a new tab or browser window). The device code will have been copied to the clipboard. - Select the text box and paste (Ctrl-V/Cmd-V) the copied value. - You should then be redirected to a user authentication page where you should authenticate with a user account that has permission to query your Log Analytics workspace. Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret: ``` %kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret) ``` instead of ``` %kql loganalytics://code().workspace(WORKSPACE_ID) ``` **Note:** you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a ```popup schema``` button. To find your Workspace Id go to [Log Analytics](https://ms.portal.azure.com/#blade/HubsExtension/Resources/resourceType/Microsoft.OperationalInsights%2Fworkspaces). Look at the workspace properties to find the ID.
In [ ]:
# See if we have an Azure Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs

ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()
    
qry_prov = QueryProvider(data_environment="AzureSentinel")
In [ ]:
# Authenticate to Azure Sentinel workspace
qry_prov.connect(ws_config)
table_index = qry_prov.schema_tables

Authentication and Configuration Problems


Click for details about configuring your authentication parameters The notebook is expecting your Azure Sentinel Tenant ID and Workspace ID to be configured in one of the following places: - `config.json` in the current folder - `msticpyconfig.yaml` in the current folder or location specified by `MSTICPYCONFIG` environment variable. For help with setting up your `config.json` file (if this hasn't been done automatically) see the [`ConfiguringNotebookEnvironment`](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb) notebook in the root folder of your Azure-Sentinel-Notebooks project. This shows you how to obtain your Workspace and Subscription IDs from the Azure Sentinel Portal. You can use the SubscriptionID to find your Tenant ID). To view the current `config.json` run the following in a code cell. ```%pfile config.json``` For help with setting up your `msticpyconfig.yaml` see the [Setup](#Setup) section at the end of this notebook and the [ConfigureNotebookEnvironment notebook](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb)

Contents

Search for a Host name and query host properties

In [ ]:
host_text = widgets.Text(
    description="Enter the Host name to search for:", **WIDGET_DEFAULTS
)
display(host_text)
In [ ]:
query_times = nbwidgets.QueryTime(units="day", max_before=20, before=5, max_after=1)
query_times.display()
In [ ]:
# Get single event - try process creation
if "SecurityEvent" not in table_index:
    raise ValueError("No Windows event log data available in the workspace")
host_name = None
matching_hosts_df = qry_prov.WindowsSecurity.list_host_processes(
    query_times, host_name=host_text.value.strip(), add_query_items="| distinct Computer"
)
if len(matching_hosts_df) > 1:
    print(f"Multiple matches for '{host_text.value}'. Please select a host from the list.")
    choose_host = nbwidgets.SelectItem(
        item_list=list(matching_hosts_df["Computer"].values),
        description="Select the host.",
        auto_display=True,
    )
elif not matching_hosts_df.empty:
    host_name = matching_hosts_df["Computer"].iloc[0]
    print(f"Unique host found: {host_name}")
else:
    print(f"Host not found: {host_text.value}")
In [ ]:
if not host_name:
    host_name = choose_host.value

host_entity = None
if not matching_hosts_df.empty:
    host_entity = entities.Host(src_event=matching_hosts_df[matching_hosts_df["Computer"] == host_name].iloc[0])
if not host_entity:
    raise LookupError(f"Could not find Windows events the name {host_name}")

def populate_heartbeat_details(host_hb_df, host_entity=None):
    if not host_hb_df.empty:
        host_hb = host_hb_df.iloc[0]
        if not host_entity:
            host_entity = entities.Host(host_hb["Computer"])
        host_entity.SourceComputerId = host_hb["SourceComputerId"]
        host_entity.OSType = host_hb["OSType"]
        host_entity.OSMajorVersion = host_hb["OSMajorVersion"]
        host_entity.OSMinorVersion = host_hb["OSMinorVersion"]
        host_entity.ComputerEnvironment = host_hb["ComputerEnvironment"]
        host_entity.ResourceId = host_hb["ResourceId"]
        host_entity.OmsSolutions = [
            sol.strip() for sol in host_hb["Solutions"].split(",")
        ]
        host_entity.VMUUID = host_hb["VMUUID"]

        ip_entity = entities.IpAddress()
        ip_entity.Address = host_hb["ComputerIP"]
        geoloc_entity = entities.GeoLocation()
        geoloc_entity.CountryName = host_hb["RemoteIPCountry"]
        geoloc_entity.Longitude = host_hb["RemoteIPLongitude"]
        geoloc_entity.Latitude = host_hb["RemoteIPLatitude"]
        ip_entity.Location = geoloc_entity
        host_entity.IPAddress = ip_entity  # TODO change to graph edge
    return host_entity

def convert_to_ip_entities(ip_str):
    
    ip_entities = []
    if ip_str:
        if "," in ip_str:
            addrs = ip_str.split(",")
        elif " " in ip_str:
            addrs = ip_str.split(" ")
        else:
            addrs = [ip_str]
        for addr in addrs:
            ip_entity = entities.IpAddress()
            ip_entity.Address = addr.strip()
            if iplocation:
                iplocation.lookup_ip(ip_entity=ip_entity)
            ip_entities.append(ip_entity)
    return ip_entities

# Add this information to our inv_host_entity
def populate_host_aznet_ips(az_net_df, host_entity):
    retrieved_address = []
    if len(az_net_df) == 1:
        host_entity.private_ips = convert_to_ip_entities(
            az_net_df["PrivateIPAddresses"].iloc[0]
        )
        host_entity.public_ips = convert_to_ip_entities(
            az_net_df["PublicIPAddresses"].iloc[0]
        )
        retrieved_address = [ip.Address for ip in host_entity.public_ips]
    else:
        if "private_ips" not in host_entity:
            host_entity.private_ips = []
        if "public_ips" not in host_entity:
            host_entity.public_ips = []


iplocation = None
try:
    iplocation = GeoLiteLookup()
except:
    md(
        "Warning - could not create GeoLiteLookup"
        + " - location details will not be retrieved",
        "red, bold"
        )

# Try to get an OMS Heartbeat for this computer
if "Heartbeat" in table_index:
    print(f"Looking for {host_name} in OMS Heartbeat data...")
    host_hb_df = qry_prov.Network.get_heartbeat_for_host(host_name=host_name)
    host_entity = populate_heartbeat_details(host_hb_df, host_entity)

if "AzureNetworkAnalytics_CL" in table_index:
    print(f"Looking for {host_name} IP addresses in network flows...")
    az_net_df = qry_prov.Network.get_ips_for_host(host_name=host_name)
    populate_host_aznet_ips(az_net_df, host_entity)

md("Host Details", "bold")
display(host_entity)

Contents

Related Alerts

Look for any related alerts around this time.

In [ ]:
ra_query_times = nbwidgets.QueryTime(
    units="day",
    origin_time=query_times.origin_time,
    max_before=28,
    max_after=5,
    before=5,
    auto_display=True,
)
In [ ]:
related_alerts = qry_prov.SecurityAlert.list_related_alerts(
    ra_query_times, host_name=host_entity.HostName
)

def print_related_alerts(alertDict, entityType, entityName):
    if len(alertDict) > 0:
        md(
            f"<h3>Found {len(alertDict)} different alert types related to this {entityType} (`{entityName}`)</h3>"
        )
        for (k, v) in alertDict.items():
            print(f"- {k}, # Alerts: {v}")
    else:
        print(f"No alerts for {entityType} entity `{entityName}`")


if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
    host_alert_items = (
        related_alerts[["AlertName", "TimeGenerated"]]
        .groupby("AlertName")
        .TimeGenerated.agg("count")
        .to_dict()
    )
    print_related_alerts(host_alert_items, "host", host_entity.HostName)
    if len(host_alert_items) > 1:
        nbdisplay.display_timeline(
            data=related_alerts, title="Alerts", source_columns=["AlertName"], height=200
        )
    elif len(host_alert_items) == 1:
        md("Only one alert found - view the details in the browser below.")
else:
    md("No related alerts found.")

Select an Alert to view details

In [ ]:
def disp_full_alert(alert):
    global related_alert
    related_alert = SecurityAlert(alert)
    return nbdisplay.format_alert(related_alert, show_entities=True)

recenter_wgt = widgets.Checkbox(
    value=True,
    description='Center subsequent query times round selected Alert?',
    disabled=False,
    **WIDGET_DEFAULTS
)
if related_alerts is not None and not related_alerts.empty:
    related_alerts["CompromisedEntity"] = related_alerts["Computer"]
    md("<h3>Click on alert to view details.</h3>")
    display(recenter_wgt)
    rel_alert_select = nbwidgets.SelectAlert(
        alerts=related_alerts,
        action=disp_full_alert,
    )
    rel_alert_select.display()

Contents

Host Logons

This section looks at successful and failed logons on the host.

In [ ]:
# set the origin time to the time of our alert
origin_time = (related_alert.TimeGenerated 
               if recenter_wgt.value 
               else query_times.origin_time)
logon_query_times = nbwidgets.QueryTime(
    units="day",
    origin_time=origin_time,
    before=5,
    after=1,
    max_before=20,
    max_after=20,
)
logon_query_times.display()

Successful Logons - Timeline and LogonType breakdown

In [ ]:
host_logons = qry_prov.WindowsSecurity.list_host_logons(
    logon_query_times, host_name=host_entity.HostName
)

if host_logons is not None and not host_logons.empty:
    md("<h3>Logon timeline.</h3>")
    tooltip_cols = [
        "TargetUserName",
        "TargetDomainName",
        "SubjectUserName",
        "SubjectDomainName",
        "LogonType",
        "IpAddress",
    ]
    nbdisplay.display_timeline(
        data=host_logons,
        group_by="TargetUserName",
        source_columns=tooltip_cols,
        legend="right", yaxis=True
    )

    md("<h3>Counts of logon events by logon type.</h3>")
    md("Min counts for each logon type highlighted.")
    logon_by_type = (
        host_logons[["Account", "LogonType", "EventID"]]
        .astype({'LogonType': 'int32'})
        .merge(right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
               left_on="LogonType", right_index=True)
        .drop(columns="LogonType")
        .groupby(["Account", "LogonTypeDesc"])
        .count()
        .unstack()
        .rename(columns={"EventID": "LogonCount"})
        .fillna(0)
        .style
        .background_gradient(cmap="viridis", low=0.5, high=0)
        .format("{0:0>3.0f}")
    )
    display(logon_by_type)
else:
    md("No logon events found for host.")

Contents

Failed Logons

In [ ]:
failedLogons = qry_prov.WindowsSecurity.list_host_logon_failures(
    logon_query_times, host_name=host_entity.HostName
)
if failedLogons.empty:
    print("No logon failures recorded for this host between ",
          f" {logon_query_times.start} and {logon_query_times.end}"
        )
else:
    nbdisplay.display_timeline(
        data=host_logons.query('TargetLogonId != "0x3e7"'),
        overlay_data=failedLogons,
        alert=related_alert,
        title="Logons (blue=user-success, green=failed)",
        source_columns=tooltip_cols,
        height=200,
    )
    display(
        failedLogons
        .astype({'LogonType': 'int32'})
        .merge(
            right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
            left_on="LogonType", right_index=True
        )
        [['Account', 'EventID', 'TimeGenerated',
          'Computer', 'SubjectUserName', 'SubjectDomainName',
          'TargetUserName', 'TargetDomainName',
          'LogonTypeDesc','IpAddress', 'WorkstationName'
         ]]
    )

Accounts With Failed And Successful Logons

This query joins failed and successful logons for the same account name. Multiple logon failures followed by a sucessful logon might indicate attempts to guess or probe the user password.

In [ ]:
if not failedLogons.empty:
    combined = pd.concat([failedLogons,
                          host_logons[host_logons["TargetUserName"]
                                      .isin(failedLogons["TargetUserName"]
                                            .drop_duplicates())]])
    display(combined.head())
    combined["LogonStatus"] = combined.apply(lambda x: "Failed" if x.EventID == 4625 else "Success", axis=1)
    nbdisplay.display_timeline(data=combined,
                               group_by="LogonStatus",
                               source_columns=["TargetUserName", "LogonType", "SubjectUserName", "TargetLogonId"],
                               legend="inline",
                               yaxis=True,
                               height=200)
    display(combined.sort_values("TimeGenerated"))
else:
    md(f"No logon failures recorded for this host between {logon_query_times.start} and {logon_query_times.end}") 

Contents

Other Security Events

It's often useful to look at what other events were being logged at the time of the attack.

We show events here grouped by Account. Things to look for are:

  • Unexpected events that change system security such as the addition of accounts or services
  • Event types that occur for only a single account - especially if there are a lot of event types only executed by a single account.
In [ ]:
md(f"Collecting Windows Event Logs for {host_entity.HostName}, this may take a few minutes...")

all_events_df = qry_prov.WindowsSecurity.list_host_events(
    logon_query_times,
    host_name=host_entity.HostName,
    add_query_items="| where EventID != 4688 and EventID != 4624",
    split_query_by="1D"
)

if not isinstance(all_events_df, pd.DataFrame):
    raise ValueError("Event query failed. Please retry with a smaller time range.")

# Create a pivot of Event vs. Account
win_events_acc = all_events_df[["Account", "Activity", "TimeGenerated"]].copy()
win_events_acc = win_events_acc.replace("-\\-", "No Account").replace(
    {"Account": ""}, value="No Account"
)
win_events_acc["Account"] = win_events_acc.apply(lambda x: x.Account.split("\\")[-1], axis=1)
event_pivot = (
    pd.pivot_table(
        win_events_acc,
        values="TimeGenerated",
        index=["Activity"],
        columns=["Account"],
        aggfunc="count",
    )
    .fillna(0)
    .reset_index()
)

md("Yellow highlights indicate account with highest event count")
with pd.option_context('display.precision', 0):
    display(
        event_pivot.style
        .applymap(lambda x: "color: white" if x == 0 else "")
        .applymap(
            lambda x: "background-color: lightblue"
            if not isinstance(x, str) and x > 0
            else ""
        )
        .set_properties(subset=["Activity"], **{"width": "400px", "text-align": "left"})
        .highlight_max(axis=1)
        .hide_index()
    )

Parse Event Data for Selected Events

For events that you want to look at in more detail you can parse out the full EventData field (containing all fields of the original event). The parse_event_data function below does that - transforming the EventData XML into a dictionary of property/value pairs). The expand_event_properties function takes this dictionary and transforms into columns in the output DataFrame.


 More details... You can do this for multiple event types in a single pass but, dependng on the schema of each event you may end up with a lot of sparsely populated columns. E.g. suppose EventID 1 has EventData fields A, B and C and EventID 2 has fields A, D, E. If you parse both IDs you'll will end up with a DataFrame with columns A, B, C, D and E with contents populated only for the rows that with corresponding data. We recommend that you process batches of related event types (e.g. all user account change events) that have similar sets of fields to keep the output DataFrame manageable.
In [ ]:
# Function to convert EventData XML into dictionary and
# populate columns into DataFrame from previous query result
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ParseError

SCHEMA = "http://schemas.microsoft.com/win/2004/08/events/event"


def parse_event_data(row):
    try:
        xdoc = ET.fromstring(row.EventData)
        col_dict = {
            elem.attrib["Name"]: elem.text for elem in xdoc.findall(f"{{{SCHEMA}}}Data")
        }
        reassigned = set()
        for k, v in col_dict.items():
            if k in row and not row[k]:
                row[k] = v
                reassigned.add(k)
        if reassigned:
            # print('Reassigned: ', ', '.join(reassigned))
            for k in reassigned:
                col_dict.pop(k)
        return col_dict
    except (ParseError, TypeError):
        return None


# Parse event properties into a dictionary
all_events_df["EventProperties"] = all_events_df.apply(parse_event_data, axis=1)

# For a specific event ID you can explode the EventProperties values into their own columns
# using this function. You can do this for the whole data set but it will result
# in a lot of sparse columns in the output data frame
def expand_event_properties(input_df):
    exp_df = input_df.apply(lambda x: pd.Series(x.EventProperties), axis=1)
    return (
        exp_df.drop(set(input_df.columns).intersection(exp_df.columns), axis=1)
        .merge(
            input_df.drop("EventProperties", axis=1),
            how="inner",
            left_index=True,
            right_index=True,
        )
        .replace("", np.nan)  # these 3 lines get rid of blank columns
        .dropna(axis=1, how="all")
        .fillna("")
    )


display(
    expand_event_properties(all_events_df[all_events_df["EventID"] == 4698]).head(2)
)

Account Change Events - Timeline

Here we want to focus on a some specific subcategories of events. Attackers commonly try to add or change user accounts and group memberships. We also include events related to addition or change of scheduled tasks and Windows services.

In [ ]:
# Get a full list of Windows Security Events
import pkgutil
import os
w_evt = pkgutil.get_data("msticpy", f"resources{os.sep}WinSecurityEvent.json")
win_event_df = pd.read_json(w_evt)

# Create criteria for events that we're interested in
acct_sel = win_event_df["subcategory"] == "User Account Management"
group_sel = win_event_df["subcategory"] == "Security Group Management"
schtask_sel = (win_event_df["subcategory"] == "Other Object Access Events") & (
    win_event_df["description"].str.contains("scheduled task")
)

event_list = win_event_df[acct_sel | group_sel | schtask_sel]["event_id"].to_list()
# Add Service install event
event_list.append(7045)

# Plot events on a timeline
p = nbdisplay.display_timeline(
    data=all_events_df[all_events_df["EventID"].isin(event_list)],
    group_by="Activity",
    source_columns=["Account"],
    legend="right",
)

Show Details of Selected Events

From the above data - pick which event types you want to view (by default, all are selected). The second cell will display the event types selected.

In [ ]:
# populate actual events IDs to select from
recorded_events = (all_events_df['EventID']
                   [all_events_df["EventID"]
                    .isin(event_list)].drop_duplicates().values)
event_subset = win_event_df[win_event_df["event_id"].isin(event_list)
                            & win_event_df["event_id"].isin(recorded_events)]
items = list(event_subset.apply(lambda x: (x.full_desc, x.event_id), axis=1).values)
ss = nbwidgets.SelectSubset(
    items,
    default_selected=items
)
In [ ]:
drop_cols = [
    "TenantId",
    "EventProperties",
    "SourceSystem",
    "EventSourceName",
    "Channel",
    "EventID",
    "Task",
    "Level",
    "EventData",
    "EventOriginId",
    "MG",
    "TimeCollected",
    "ManagementGroupName",
    "Type",
]

display(
    all_events_df[all_events_df["EventID"].isin(ss.selected_values)]
    .replace(to_replace="", value=np.NAN)
    .dropna(axis=1, how="all")
    .drop(columns=drop_cols)
)

Contents

Examine Logon Sessions

Looking at characteristics and activity of individual logon sessions is an effective way of spottting clusters of attacker activity.

The biggest problem is deciding which logon sessions are the ones to look at. We may already have some indicators of sessions that we want to examine from earlier sections:

  • Accounts that experienced a series of failed logons followed by successful logons see
  • Accounts that triggered unexpected events see

In this section we use clustering to collapse repetive logons and show details of the distinct logon patterns

Browse logon account details

In [ ]:
from msticpy.sectools.eventcluster import (
    dbcluster_events,
    add_process_features,
    char_ord_score,
)

if host_logons is None or host_logons.empty:
    md("No host logons recorded. This section cannot be run.")
    raise ValueError("aborted")

# Set up clustering features and run DBScan clustering
logon_features = host_logons.copy()
logon_features["AccountNum"] = host_logons.apply(
    lambda x: char_ord_score(x.Account), axis=1
)
logon_features["TargetUserNum"] = host_logons.apply(
    lambda x: char_ord_score(x.TargetUserName), axis=1
)
logon_features["LogonHour"] = host_logons.apply(lambda x: x.TimeGenerated.hour, axis=1)

# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_logons, _, _) = dbcluster_events(
    data=logon_features,
    time_column="TimeGenerated",
    cluster_columns=["AccountNum", "LogonType", "TargetUserNum"],
    max_cluster_distance=0.0001,
)
md(f"Number of input events: {len(host_logons)}")
md(f"Number of clustered events: {len(clus_logons)}")

md("<h3>Relative frequencies by account pattern</h3>")
plt.rcParams["figure.figsize"] = (12, 4)
clus_logons.sort_values("Account").plot.barh(x="Account", y="ClusterSize");

View distinct host logon patterns

In [ ]:
import re

# Build a list of distinct logon patterns from the clustered data
dist_logons = clus_logons.sort_values("TimeGenerated")[
    ["TargetUserName", "TimeGenerated", "LastEventTime", "LogonType", "ClusterSize"]
]
dist_logons = dist_logons.apply(
    lambda x: (
        f"{x.TargetUserName}:    "
        f"(logontype {x.LogonType})   "
        f"timerange: {x.TimeGenerated} - {x.LastEventTime}    "
        f"count: {x.ClusterSize}"
    ),
    axis=1,
)
# Convert to dict, flipping keys/values
dist_logons = {v: k for k, v in dist_logons.to_dict().items()}


def get_selected_logon_cluster(selected_item):
    acct = clus_logons.loc[selected_item]["TargetUserName"]
    logon_type = clus_logons.loc[selected_item]["LogonType"]
    return host_logons.query("TargetUserName == @acct and LogonType == @logon_type")


def show_logon(idx):
    return nbdisplay.format_logon(pd.DataFrame(clus_logons.loc[idx]).T)


logon_wgt = nbwidgets.SelectItem(
    description="Select logon cluster to examine",
    item_dict=dist_logons,
    action=show_logon,
    height="200px",
    width="100%",
    auto_display=True,
)

Analyze Processes Patterns for logon sessions

In this section we look at the types of processes run in each logon session. For each process (and process characteristics such as command line structure) we measure its rarity compared to other processes on the same host. We then calculate the mean rarity of all processes in a logon session and display the results ordered by rarity. One is the highest possible score and would indicate all processes in the session have a unique execution pattern.

Note: The next section retrieves processes for time period around the logons for the user ID selected in the previous session. If you want to view a broader time boundary please adjust the query time boundaries in below.

In [ ]:
# set the origin time to start at the first logon in our set
# end end 2hrs after the last
start_time = host_logons["TimeGenerated"].min()
end_time = host_logons["TimeGenerated"].max()
time_diff = int((end_time - start_time).total_seconds() / (60 * 60) + 2)
proc_query_times = nbwidgets.QueryTime(
    units="hours",
    origin_time=start_time,
    before=1,
    after=time_diff + 1,
    max_before=20,
    max_after=time_diff + 20,
)
proc_query_times.display()

Compute the relative rarity of processes in each session

This should be a good guide to which sessions are the more interesting to look at.

Note Clustering lots (1000s) of events will take a little time to compute.

In [ ]:
from msticpy.sectools.eventcluster import dbcluster_events, add_process_features
from collections import Counter

print("Getting process events...", end="")
processes_on_host = qry_prov.WindowsSecurity.list_host_processes(
    proc_query_times, host_name=host_entity.HostName
)
print(f"done. {len(processes_on_host)} events")
print("Clustering. Please wait...", end="")
feature_procs = add_process_features(input_frame=processes_on_host, path_separator="\\")

feature_procs["accountNum"] = feature_procs.apply(
    lambda x: char_ord_score(x.Account), axis=1
)
# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_events, dbcluster, x_data) = dbcluster_events(
    data=feature_procs,
    cluster_columns=[
        "commandlineTokensFull",
        "pathScore",
        "accountNum",
        "isSystemSession",
    ],
    max_cluster_distance=0.0001,
)
print("done")
print("Number of input events:", len(feature_procs))
print("Number of clustered events:", len(clus_events))

# Join the clustered results back to the original process frame
procs_with_cluster = feature_procs.merge(
    clus_events[
        [
            "commandlineTokensFull",
            "accountNum",
            "pathScore",
            "isSystemSession",
            "ClusterSize",
        ]
    ],
    on=["commandlineTokensFull", "accountNum", "pathScore", "isSystemSession"],
)
# Rarity = inverse of cluster size
procs_with_cluster["Rarity"] = 1 / procs_with_cluster["ClusterSize"]
# count the number of processes for each logon ID
lgn_proc_count = (
    pd.concat(
        [
            processes_on_host.groupby("TargetLogonId")["TargetLogonId"].count(),
            processes_on_host.groupby("SubjectLogonId")["SubjectLogonId"].count(),
        ]
    ).sum(level=0)
).to_dict()

# Display the results
md("Sessions ordered by process rarity", 'bold')
md("Higher score indicates higher number of unusual processes")
process_rarity = (procs_with_cluster.groupby(["SubjectUserName", "SubjectLogonId"])
    .agg({"Rarity": "mean", "TimeGenerated": "count"})
    .rename(columns={"TimeGenerated": "ProcessCount"})
    .reset_index())
display(
    process_rarity
    .sort_values("Rarity", ascending=False)
    .style.bar(subset=["Rarity"], color="#d65f5f")
)

Overview of session timelines for sessions with higher rarity score

In [ ]:
# Display process timeline for 75% percentile rarest scores
rare_sess = process_rarity[process_rarity["Rarity"]
                           > process_rarity["Rarity"].quantile(.25)]
rare_sessions = processes_on_host[(processes_on_host["SubjectLogonId"]
                                   .isin(rare_sess["SubjectLogonId"]))
                                  & (processes_on_host["SubjectUserName"]
                                     .isin(rare_sess["SubjectUserName"]))]

md("Timeline of sessions with higher process rarity", "large")
md("Multiple sessions (y-axis) may be are shown for each account.")
md("You will likely need to zoom in to see the individual session processes.")

nbdisplay.display_timeline(
    data=rare_sessions,
    group_by="SubjectLogonId",
    source_columns=["SubjectUserName", "SubjectLogonId", "NewProcessName", "CommandLine"],
    legend="right",
    yaxis=True
);

View the processes for these Sessions

In [ ]:
def view_logon_sess(logon_id=""):
    global selected_logon
    selected_logon = host_logons[host_logons["TargetLogonId"] == logon_id]

    if all_procs.value:
        sess_procs = processes_on_host.query(
            "TargetLogonId == @logon_id | SubjectLogonId == @logon_id"
        )
    else:
        sess_procs = procs_with_cluster.query("SubjectLogonId == @logon_id")[
            ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
        ].drop_duplicates()
    return sess_procs

sessions = list(process_rarity
                .sort_values("Rarity", ascending=False)
                .apply(lambda x: (f"{x.SubjectLogonId}  {x.SubjectUserName}   Rarity={x.Rarity}",
                                  x.SubjectLogonId), 
                       axis=1))
all_procs = widgets.Checkbox(
    value=False,
    description="View All Processes (Show clustered only if not checked)",
    **WIDGET_DEFAULTS,
)
display(all_procs)
logon_wgt = nbwidgets.SelectItem(
    description="Select logon session to examine",
    item_dict={label: val for label, val in sessions},
    height="300px",
    width="100%",
    auto_display=True,
    action=view_logon_sess,
)

Browse All Sessions (Optional)

If the previous section did not reveal anything interesting you can opt to browse all logon sessions.

Otherwise, skip to the Check Commandline for IoCs section

To do this you need to first pick an account + logon type (in the following cell) then pick a particular session that you want to view in the subsequent cell. Use the rarity score from the previous graph to guide you.

Step 1 - Select a logon ID and Type

In [ ]:
logon_wgt2 = nbwidgets.SelectItem(
    description="Select logon cluster to examine",
    item_dict=dist_logons,
    height="200px",
    width="100%",
    auto_display=True,
)
all_procs = widgets.Checkbox(
    value=False,
    description="View All Processes (Clustered only if not checked)",
    **WIDGET_DEFAULTS,
)
display(all_procs)

Step 2 - Pick a logon session to view its processes

In [ ]:
selected_logon_cluster = get_selected_logon_cluster(logon_wgt2.value)

selected_tgt_logon = selected_logon_cluster["TargetUserName"].iat[0]
system_logon = selected_tgt_logon.lower() == "system" or selected_tgt_logon.endswith(
    "$"
)

if system_logon:
    md(
            '<h3><p style="color:red">Warning: the selected '
            "account name appears to be a system account.</p></h1><br>"
            "<i>It is difficult to accurately associate processes "
            "with the specific logon sessions.<br>"
            "Showing clustered events for entire time selection."
    )
    display(
        clus_events.sort_values("TimeGenerated")[
            [
                "TimeGenerated",
                "LastEventTime",
                "NewProcessName",
                "CommandLine",
                "ClusterSize",
                "commandlineTokensFull",
                "pathScore",
                "isSystemSession",
            ]
        ]
    )

# Display a pick list for logon instances
sel_1 = host_logons["TargetLogonId"].isin(lgn_proc_count)
sel_2 = host_logons["TargetUserName"] == selected_tgt_logon
items = (
    host_logons[sel_1 & sel_2]
    .sort_values("TimeGenerated")
    .apply(
        lambda x: (
            f"{x.TargetUserName}:    "
            f"(logontype={x.LogonType})   "
            f"(timestamp={x.TimeGenerated})    "
            f"logonid={x.TargetLogonId}"
        ),
        axis=1,
    )
    .values.tolist()
)
if not items:
    items = ["No processes for logon"]
sess_w = widgets.Select(
    options=items, description="Select logon instance to examine", **WIDGET_DEFAULTS
)

import re

logon_list_regex = r"""
(?P<acct>[^:]+):\s+
\(logontype=(?P<logon_type>[^)]+)\)\s+
\(timestamp=(?P<time>[^)]+)\)\s+
logonid=(?P<logonid>[0-9a-fx)]+)
"""


def get_selected_logon(selected_item):
    acct_match = re.search(logon_list_regex, selected_item, re.VERBOSE)
    if acct_match:
        acct = acct_match["acct"]
        logon_type = int(acct_match["logon_type"])
        time_stamp = pd.to_datetime(acct_match["time"])
        logon_id = acct_match["logonid"]
        return host_logons.query(
            "TargetUserName == @acct and LogonType == @logon_type"
            " and TargetLogonId == @logon_id"
        )


def view_logon_sess(x=""):
    global selected_logon
    selected_logon = get_selected_logon(x)
    if selected_logon is not None:
        logonId = selected_logon["TargetLogonId"].iloc[0]
        if all_procs.value:
            sess_procs = processes_on_host.query(
                "TargetLogonId == @logonId | SubjectLogonId == @logonId"
            )
        else:
            sess_procs = procs_with_cluster.query("SubjectLogonId == @logonId")[
                ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
            ].drop_duplicates()
        display(sess_procs)


widgets.interactive(view_logon_sess, x=sess_w)

Contents

Check for IOCs in Commandline for selected session

This section looks for Indicators of Compromise (IoC) within the data sets passed to it.

The input data for this comes from the session you picked in the following sections:

To change which section is used - go back to the desired section and pick a session or re-run the entire cell.

Extract IoCs

In [ ]:
# Use this to search all process commandlines
# ioc_df = ioc_extractor.extract(
#     data=procs_with_cluster,
#     columns=["CommandLine"],
#     os_family=os_family,
#     ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
# )
# ioc_df[["IoCType", "Observable"]].drop_duplicates()
In [ ]:
selected_tgt_logon = selected_logon["TargetUserName"].iat[0]
system_logon = (selected_tgt_logon.lower() == "system" 
                or selected_tgt_logon.endswith("$"))
if not system_logon:
    logonId = selected_logon["TargetLogonId"].iloc[0]
    sess_procs = processes_on_host.query(
        "TargetLogonId == @logonId | SubjectLogonId == @logonId"
    )
else:
    sess_procs = clus_events

ioc_extractor = IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else "Windows"

ioc_df = ioc_extractor.extract(
    data=sess_procs,
    columns=["CommandLine"],
    ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
)
if len(ioc_df):
    md("<h3>IoC patterns found in process set.</h3>")
    display(ioc_df[["IoCType", "Observable"]].drop_duplicates())
else:
    md("<h3>No IoC patterns found in process tree.</h3>")

If any Base64 encoded strings, decode and search for IoCs in the results.

 Details... This section looks for base64 encoded strings within the data - this is a common way of hiding attacker intent. It attempts to decode any strings that look like base64. Additionally, if the base64 decode operation returns any items that look like a base64 encoded string or file, a gzipped binary sequence, a zipped or tar archive, it will attempt to extract the contents before searching for potentially interesting IoC observables within the decoded data. For simple strings the Base64 decoded output is straightforward. However for nested encodings this can get a little complex and difficult to represent in a tabular format. **Columns** - reference - The index of the row item in dotted notation in depth.seq pairs (e.g. 1.2.2.3 would be the 3 item at depth 3 that is a child of the 2nd item found at depth 1). This may not always be an accurate notation - it is mainly use to allow you to associate an individual row with the reference value contained in the full_decoded_string column of the topmost item). - original_string - the original string before decoding. - file_name - filename, if any (only if this is an item in zip or tar file). - file_type - a guess at the file type (this is currently elementary and only includes a few file types). - input_bytes - the decoded bytes as a Python bytes string. - decoded_string - the decoded string if it can be decoded as a UTF-8 or UTF-16 string. Note: binary sequences may often successfully decode as UTF-16 strings but, in these cases, the decodings are meaningless. - encoding_type - encoding type (UTF-8 or UTF-16) if a decoding was possible, otherwise 'binary'. - file_hashes - collection of file hashes for any decoded item. - md5 - md5 hash as a separate column. - sha1 - sha1 hash as a separate column. - sha256 - sha256 hash as a separate column. - printable_bytes - printable version of input_bytes as a string of \xNN values - src_index - the index of the row in the input dataframe from which the data came. - full_decoded_string - the full decoded string with any decoded replacements. This is only really useful for top-level items, since nested items will only show the 'full' string representing the child fragment.
In [ ]:
dec_df = base64.unpack_items(data=sess_procs, column="CommandLine")
if dec_df is not None and len(dec_df) > 0:
    display(HTML("<h3>Decoded base 64 command lines</h3>"))
    display(HTML("Decoded values and hashes of decoded values shown below."))
    display(
        HTML(
            "Warning - some binary patterns may be decodable as unicode strings. "
            'In these cases you should ignore the "decoded_string" column '
            'and treat the encoded item as a binary - using the "printable_bytes" '
            "column or treat the decoded_string as a binary (bytes) value."
        )
    )

    display(
        dec_df[
            [
                "full_decoded_string",
                "decoded_string",
                "original_string",
                "printable_bytes",
                "file_hashes",
            ]
        ]
    )

    ioc_dec_df = ioc_extractor.extract(data=dec_df, columns=["full_decoded_string"])
    if len(ioc_dec_df):
        display(HTML("<h3>IoC patterns found in events with base64 decoded data</h3>"))
        display(ioc_dec_df)
        ioc_df = ioc_df.append(ioc_dec_df, ignore_index=True)
else:
    print("No base64 encodings found.")

Contents

Threat Intel Lookup

This section takes the output from the IoC Extraction from commandlines and submits it to Threat Intelligence services to see if any of the IoC are known threats.

Please take a moment to review the Selected list below and remove any items that are obviously not items that you want to lookup (e.g. myadmintool.ps is almost certainly a PowerShell script but is also a valid match for a legal DNS domain).

If you have not used msticpy threat intelligence lookups before you will need to supply API keys for the TI Providers that you want to use. See the documentation on configuring Configuring TI Proiders

Then reload provider settings:

mylookup = TILookup()
mylookup.reload_provider_settings()
In [ ]:
ti_lookup = TILookup()
if not ti_lookup.provider_status:
    md_warn("No configured TI providers were found - you will need to add some to msticpyconfig.yaml")
    md(
        "To configure TI providers please see the help on "
        "<a href='https://msticpy.readthedocs.io/en/latest/"
        "data_acquisition/TIProviders.html#configuration-file' "
        "target='_blank' rel='noopener noreferrer'>"
        "Configuring TI Providers</a>"
    )

items = list(ioc_df["Observable"].values)
ioc_ss = None
if items:
    ioc_ss = nbwidgets.SelectSubset(
        items,
        default_selected=items
    )
else:
    md_warn("No IoCs in input data.")
In [ ]:
if ioc_ss is not None:
    iocs_to_check = (
        ioc_df[ioc_df["Observable"]
        .isin(ioc_ss.selected_items)]
        [["IoCType", "Observable"]]
        .drop_duplicates()
    )

    ti_lookup.lookup_iocs(data=iocs_to_check, obs_col="Observable", ioc_type_col="IoCType")
else:
    md("No IoCs to lookup")

Contents

Network Check Communications with Other Hosts

In [ ]:
ip_q_times = nbwidgets.QueryTime(
    label="Set time bounds for network queries",
    units="day",
    max_before=28,
    before=2,
    after=5,
    max_after=28,
    origin_time=proc_query_times.origin_time
)
ip_q_times.display()

Query Flows by Host IP Addresses

In [ ]:
if "AzureNetworkAnalytics_CL" not in table_index:
    md_warn("AzureNetworkAnalytics_CL table is not available.")
    md("No network flow data available. "
       + "Please skip the remainder of this section.",
      "blue, bold")
    az_net_comms_df = None
else:
    all_host_ips = (
        host_entity.private_ips + host_entity.public_ips + [host_entity.IPAddress]
    )
    host_ips = [i.Address for i in all_host_ips]

    az_net_comms_df = qry_prov.Network.list_azure_network_flows_by_ip(
        ip_q_times, ip_address_list=host_ips
    )

    if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
        az_net_comms_df['TotalAllowedFlows'] = az_net_comms_df['AllowedOutFlows'] + az_net_comms_df['AllowedInFlows']
        nbdisplay.display_timeline(
            data=az_net_comms_df,
            group_by="L7Protocol",
            title="Network Flows by Protocol",
            time_column="FlowStartTime",
            source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
            height=300,
            legend="right",
            yaxis=True
        )
        nbdisplay.display_timeline(
            data=az_net_comms_df,
            group_by="FlowDirection",
            title="Network Flows by Direction",
            time_column="FlowStartTime",
            source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
            height=300,
            legend="right",
            yaxis=True
        )
    else:
        print("No network data for specified time range.")
In [ ]:
if az_net_comms_df is not None and not az_net_comms_df.empty:
    flow_plot = nbdisplay.display_timeline_values(
        data=az_net_comms_df,
        group_by="L7Protocol",
        source_columns=["FlowType", 
                        "AllExtIPs", 
                        "L7Protocol", 
                        "FlowDirection", 
                        "TotalAllowedFlows"],
        time_column="FlowStartTime",
        title="Network flows by Layer 7 Protocol",
        y="TotalAllowedFlows",
        ref_event=related_alert,
        legend="right",
        height=500,
        kind=["vbar", "circle"]
    );
else:
    md("No network data for specified time range.")

Flow Summary

In [ ]:
if az_net_comms_df is not None and not az_net_comms_df.empty:
    cols = [
        "VMName",
        "VMIPAddress",
        "PublicIPs",
        "SrcIP",
        "DestIP",
        "L4Protocol",
        "L7Protocol",
        "DestPort",
        "FlowDirection",
        "AllExtIPs",
        "TotalAllowedFlows",
    ]
    flow_index = az_net_comms_df[cols].copy()

    def get_source_ip(row):
        if row.FlowDirection == "O":
            return row.VMIPAddress if row.VMIPAddress else row.SrcIP
        else:
            return row.AllExtIPs if row.AllExtIPs else row.DestIP

    def get_dest_ip(row):
        if row.FlowDirection == "O":
            return row.AllExtIPs if row.AllExtIPs else row.DestIP
        else:
            return row.VMIPAddress if row.VMIPAddress else row.SrcIP

    flow_index["source"] = flow_index.apply(get_source_ip, axis=1)
    flow_index["dest"] = flow_index.apply(get_dest_ip, axis=1)
    md(f"{len(flow_index)} events in flow index")

    # Uncomment to view flow_index results
#     with warnings.catch_warnings():
#         warnings.simplefilter("ignore")
#         display(
#             flow_index[
#                 ["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]
#             ]
#             .groupby(["source", "dest", "L7Protocol", "FlowDirection"])
#             .sum()
#             .reset_index()
#             .style.bar(subset=["TotalAllowedFlows"], color="#d65f5f")
#         )
else:
    md_warn("No network data for specified time range.")
    md(
        "No AzureNetworkAnalytics data could be retrieved - "
        "the remaining sections of this notebook are not runnable."
    )
In [ ]:
# WHOIS lookup function
from ipaddress import ip_address
from msticpy.sectools.ip_utils import get_whois_info


# Add ASN informatio from Whois
flows_df = (
    flow_index[["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]]
    .groupby(["source", "dest", "L7Protocol", "FlowDirection"])
    .sum()
    .reset_index()
)

num_ips = len(flows_df["source"].unique()) + len(flows_df["dest"].unique())
print(f"Performing WhoIs lookups for {num_ips} IPs ", end="")
#flows_df = flows_df.assign(DestASN="", DestASNFull="", SourceASN="", SourceASNFull="")
flows_df["DestASN"] = flows_df.apply(lambda x: get_whois_info(x.dest, True), axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: get_whois_info(x.source, True), axis=1)
print("done")

# Split the tuple returned by get_whois_info into separate columns
flows_df["DestASNFull"] = flows_df.apply(lambda x: x.DestASN[1], axis=1)
flows_df["DestASN"] = flows_df.apply(lambda x: x.DestASN[0], axis=1)
flows_df["SourceASNFull"] = flows_df.apply(lambda x: x.SourceASN[1], axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: x.SourceASN[0], axis=1)

our_host_asns = [get_whois_info(ip.Address)[0] for ip in host_entity.public_ips]
md(f"Host {host_entity.HostName} ASNs:", "bold")
md(str(our_host_asns))

flow_sum_df = flows_df.groupby(["DestASN", "SourceASN"]).agg(
    TotalAllowedFlows=pd.NamedAgg(column="TotalAllowedFlows", aggfunc="sum"),
    L7Protocols=pd.NamedAgg(column="L7Protocol", aggfunc=lambda x: x.unique().tolist()),
    source_ips=pd.NamedAgg(column="source", aggfunc=lambda x: x.unique().tolist()),
    dest_ips=pd.NamedAgg(column="dest", aggfunc=lambda x: x.unique().tolist()),
).reset_index()
display(flow_sum_df)

Choose ASNs/IPs to Check for Threat Intel Reports

Choose from the list of Selected ASNs for the IPs you wish to check on. The Source list is been pre-populated with all ASNs found in the network flow summary.

As an example, we've populated the Selected list with the ASNs that have the lowest number of flows to and from the host. We also remove the ASN that matches the ASN of the host we are investigating.

Please edit this list, using flow summary data above as a guide and leaving only ASNs that you are suspicious about. Typicially these would be ones with relatively low TotalAllowedFlows and possibly with unusual L7Protocols.

If you have not used msticpy threat intelligence lookups before you will need to supply API keys for the TI Providers that you want to use. Please see the section on configuring Configuring TI Proiders

Then reload provider settings:

mylookup = TILookup()
mylookup.reload_provider_settings()
In [ ]:
all_asns = list(flow_sum_df["DestASN"].unique()) + list(flow_sum_df["SourceASN"].unique())
all_asns = set(all_asns) - set(["private address"])

# Select the ASNs in the 25th percentile (lowest number of flows)
quant_25pc = flow_sum_df["TotalAllowedFlows"].quantile(q=[0.25]).iat[0]
quant_25pc_df = flow_sum_df[flow_sum_df["TotalAllowedFlows"] <= quant_25pc]
other_asns = list(quant_25pc_df["DestASN"].unique()) + list(quant_25pc_df["SourceASN"].unique())
other_asns = set(other_asns) - set(our_host_asns)
md("Choose IPs from Selected ASNs to look up for Threat Intel.", "bold")
sel_asn = nbwidgets.SelectSubset(source_items=all_asns, default_selected=other_asns)
In [ ]:
from itertools import chain
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity

def ti_check_ser_sev(severity, threshold):
    threshold = TISeverity.parse(threshold)
    return severity.apply(lambda x: TISeverity.parse(x) >= threshold)


dest_ips = set(chain.from_iterable(flow_sum_df[flow_sum_df["DestASN"].isin(sel_asn.selected_items)]["dest_ips"]))
src_ips = set(chain.from_iterable(flow_sum_df[flow_sum_df["SourceASN"].isin(sel_asn.selected_items)]["source_ips"]))
selected_ips = dest_ips | src_ips
md(f"{len(selected_ips)} unique IPs in selected ASNs")

# Add the IoCType to save cost of inferring each item
md("Looking up TI...")
selected_ip_dict = {ip: "ipv4" for ip in selected_ips}
ti_results = ti_lookup.lookup_iocs(data=selected_ip_dict)

md(f"{len(ti_results)} results received.")

ti_results_pos = ti_results[ti_check_ser_sev(ti_results["Severity"], 1)]
print(f"{len(ti_results_pos)} positive results found.")

if not ti_results_pos.empty:
    src_pos = flows_df.merge(ti_results_pos, left_on="source", right_on="Ioc")
    dest_pos = flows_df.merge(ti_results_pos, left_on="dest", right_on="Ioc")
    ti_ip_results = pd.concat([src_pos, dest_pos])
    md_warn("Positive Threat Intel Results found for the following flows")
    md("Please examine these IP flows using the IP Explorer notebook.", "bold, large")
    display(ti_ip_results)

GeoIP Map of External IPs

In [ ]:
def format_ip_entity(row, ip_col):
    ip_entity = entities.IpAddress(Address=row[ip_col])
    iplocation.lookup_ip(ip_entity=ip_entity)
    ip_entity.AdditionalData["protocol"] = row.L7Protocol
    if "severity" in row:
        ip_entity.AdditionalData["threat severity"] = row["severity"]
    if "Details" in row:
        ip_entity.AdditionalData["threat details"] = row["Details"]
    return ip_entity

# from msticpy.nbtools.foliummap import FoliumMap
folium_map = FoliumMap(zoom_start=4)
if az_net_comms_df is None or az_net_comms_df.empty:
    print("No network flow data available.")
else:
    # Get the flow records for all flows not in the TI results
    selected_out = flows_df[flows_df["DestASN"].isin(sel_asn.selected_items)]
    selected_out = selected_out[~selected_out["dest"].isin(ti_ip_results["Ioc"])]
    if selected_out.empty:
        ips_out = []
    else:
        ips_out = list(selected_out.apply(lambda x: format_ip_entity(x, "dest"), axis=1))
    
    selected_in = flows_df[flows_df["SourceASN"].isin(sel_asn.selected_items)]
    selected_in = selected_in[~selected_in["source"].isin(ti_ip_results["Ioc"])]
    if selected_in.empty:
        ips_in = []
    else:
        ips_in = list(selected_in.apply(lambda x: format_ip_entity(x, "source"), axis=1))

    ips_threats = list(ti_ip_results.apply(lambda x: format_ip_entity(x, "Ioc"), axis=1))

    display(HTML("<h3>External IP Addresses communicating with host</h3>"))
    display(HTML("Numbered circles indicate multiple items - click to expand"))
    display(HTML("Location markers: <br>Blue = outbound, Purple = inbound, Green = Host, Red = Threats"))

    icon_props = {"color": "green"}
    for ips in host_entity.public_ips:
        ips.AdditionalData["host"] = host_entity.HostName
    folium_map.add_ip_cluster(ip_entities=host_entity.public_ips, **icon_props)
    icon_props = {"color": "blue"}
    folium_map.add_ip_cluster(ip_entities=ips_out, **icon_props)
    icon_props = {"color": "purple"}
    folium_map.add_ip_cluster(ip_entities=ips_in, **icon_props)
    icon_props = {"color": "red"}
    folium_map.add_ip_cluster(ip_entities=ips_threats, **icon_props)
    folium_map.center_map()
    display(folium_map)
In [ ]:
 
In [ ]:
 

Contents

Appendices

Available DataFrames

In [ ]:
print("List of current DataFrames in Notebook")
print("-" * 50)
current_vars = list(locals().keys())
for var_name in current_vars:
    if isinstance(locals()[var_name], pd.DataFrame) and not var_name.startswith("_"):
        print(var_name)

Saving Data to Excel

To save the contents of a pandas DataFrame to an Excel spreadsheet use the following syntax

 writer = pd.ExcelWriter('myWorksheet.xlsx')
 my_data_frame.to_excel(writer,'Sheet1')
 writer.save()

Configuration

msticpyconfig.yaml configuration File

You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml file. This is read from the current directory or you can set an environment variable (MSTICPYCONFIG) pointing to its location.

To configure this file see the ConfigureNotebookEnvironment notebook