Overview:
This application is designed to identify anomalies or outliers in datasets, leveraging the strengths of advanced Python scripting for data analysis combined with the interpretative power of a locally hosted large language model (LLM). By separating the data analysis and narrative explanation processes, the app effectively addresses the limitations commonly associated with LLMs handling raw tabular data.
At its core, the app first runs Python routines that sift through the dataset and flag statistical outliers against predefined criteria. Once these outliers are identified, the relevant insights are distilled into a structured format and passed to the LLM. The LLM then acts not as a data analyst but as an interpreter, generating clear, detailed narratives that explain the significance of these outliers in a context meaningful to business leaders.
Key Features:
Benefits for Businesses and Leadership Teams:
The Data Anomaly Detection and Explanation App is a powerful tool that empowers businesses to harness the full potential of their data. By streamlining the process of data analysis and report generation, it enhances the overall accessibility of data-driven insights for strategic business use. The application demystifies complex datasets, enabling leadership teams to take confident, informed actions based on robust, AI-enhanced analytics. It not only saves time and resources but also facilitates effective communication and collaboration, ultimately driving business growth and competitive advantage.
import numpy as np
import datetime as dt
from typing import Optional
import inflect
import requests
import json
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import pandas as pd
# Description: This script contains helper functions for generating narratives from data
def pluralize(word):
    """Return the plural form of a word (e.g. 'region' -> 'regions')."""
    engine = inflect.engine()
    return engine.plural(word)
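A quick sanity check of this helper, assuming inflect is installed:

# Example: singular dimension names become plural for the narrative templates
print(pluralize("region"))    # -> "regions"
print(pluralize("category"))  # -> "categories"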
def get_descriptive_outliers(
df,
dimension,
measure,
total=None,
summarization="sum",
coverage=0.5,
coverage_limit=5):
"""
Returns descriptive outliers based on a given dataframe, dimension, and measure.
"""
    table = (
        df.groupby(dimension)[measure]
        .agg("sum" if summarization == "sum" else
             pd.Series.nunique if summarization == "count" else
             "mean")
        .reset_index()
        .sort_values(by=measure, ascending=False)
    )
if summarization in ["sum", "count"]:
if total is None:
total = table[measure].sum().round(2)
table = (
table.assign(
share=lambda x: x[measure]/total)
.assign(cum_share=lambda x: x['share'].cumsum())
.loc[lambda x: (x['cum_share'] >= coverage).shift(fill_value=False).cumsum() == 0]
.iloc[:coverage_limit]
)
if df.shape[0] == 1 and table['cum_share'].iloc[0] == 1:
return None
elif summarization == 'average':
if total is None:
total = table[measure].mean().round(2)
        table = (table
                 .assign(share=lambda x: x[measure]/total - 1)  # signed deviation from the mean, so direction is preserved
                 .assign(abs_share=lambda x: x['share'].abs())
                 .sort_values(by='abs_share', ascending=False)
                 .assign(cum_share=lambda x: x['share'].abs().cumsum()/(x['share'].max() - x['share'].min()))
                 .loc[lambda x: (x['cum_share'] >= coverage*2).shift(fill_value=False).cumsum() == 0]
                 .iloc[:coverage_limit]
                 )
n_outliers = table.shape[0]
outlier_levels = table[dimension].astype(str).values.tolist()
outlier_values = table[measure].round(1).values.tolist()
    # Multiply before rounding to avoid floating-point artifacts like '47.00000000000001%'
    outlier_values_p = (table["share"] * 100).round(1).astype(str).add("%").values.tolist()
output = {
"n_outliers": n_outliers,
"outlier_levels": outlier_levels,
"outlier_values": outlier_values,
"outlier_values_p": outlier_values_p
}
return output
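To see the shape of the result, here is an illustrative call on a small hypothetical frame (the column names and values are invented for the example):

# Toy data: one dominant region should surface as the descriptive outlier
toy = pd.DataFrame({
    "Region": ["North", "South", "East", "West"],
    "Sales": [500.0, 300.0, 150.0, 50.0],
})
print(get_descriptive_outliers(toy, dimension="Region", measure="Sales"))
# -> {'n_outliers': 1, 'outlier_levels': ['North'],
#     'outlier_values': [500.0], 'outlier_values_p': ['50.0%']}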
def narrate_descriptive(
df,
measure = None,
dimensions = None,
summarization = 'sum',
coverage = 0.5,
coverage_limit = 5,
narration_depth = 2,
template_total = 'Total {measure} across all {pluralize(dimension_one)} is {total}.',
template_average = 'Average {measure} across all {pluralize(dimension_one)} is {total}.',
template_outlier = 'Outlying {dimension} by {measure} is {outlier_insight}.',
template_outlier_multiple = 'Outlying {pluralize(dimension)} by {measure} are {outlier_insight}.',
template_outlier_l2 = 'In {level_l1}, significant {level_l2} by {measure} is {outlier_insight}.',
template_outlier_l2_multiple = 'In {level_l1}, significant {pluralize(level_l2)} by {measure} are {outlier_insight}.',
return_data = False,
simplify = False
):
"""
This function generates a narrative report based on a given data frame and parameters.
"""
# Assert data frame
    if not isinstance(df, pd.DataFrame):
        raise ValueError("'df' must be a pandas DataFrame")
    if measure is None:
        # Default to the first numeric column
        measure = df.select_dtypes(include='number').columns[0]
    if dimensions is None:
        # Default to all object/categorical columns
        dimensions = df.select_dtypes(include=['object', 'category']).columns.tolist()
dimension_one = dimensions[0]
if summarization == 'sum':
total_raw = df[measure].sum().round(2)
elif summarization == 'average':
total_raw = df[measure].mean().round(2)
elif summarization == 'count':
total_raw = df[measure].count()
total = total_raw
    # Render the template as an f-string so it can reference local variables
    # such as {measure}, {total} and {pluralize(dimension_one)}
    narrative_total = eval(f"f'{template_total}'")
narrative = {
f'Total {measure}': narrative_total
}
variables = {
f'Total {measure}': {
'narrative_total': narrative_total,
'template_total': template_total,
'measure': measure,
'dimension_one': dimension_one,
'total': total,
'total_raw': total_raw
}
}
# High-Level Narrative
for dimension in dimensions:
output = get_descriptive_outliers(
df = df,
dimension=dimension,
measure=measure,
            # the overall total is only needed for 'average'; passing it for
            # 'sum'/'count' would distort the per-dimension shares
total = None if summarization in ["sum", "count"] else total_raw,
summarization = summarization,
coverage = coverage,
coverage_limit = coverage_limit
)
if output is None:
continue
        # Unpack the outlier summary into locals referenced by the templates
n_outliers = output['n_outliers']
outlier_levels = output['outlier_levels']
outlier_values = output['outlier_values']
outlier_values_p = output['outlier_values_p']
        if summarization == 'average':
            outlier_insight = ', '.join(
                f"{level} ({value}, {value_p} vs average {measure})"
                for level, value, value_p in zip(outlier_levels, outlier_values, outlier_values_p)
            )
        else:
            outlier_insight = ', '.join(
                f"{level} ({value}, {value_p})"
                for level, value, value_p in zip(outlier_levels, outlier_values, outlier_values_p)
            )
if n_outliers > 1:
template_outlier_final = template_outlier_multiple
template_selected = "multiple"
else:
template_outlier_final = template_outlier
template_selected = "single"
narrative_outlier_final = {
f'{dimension} by {measure}': eval(f"f'{template_outlier_final}'")
}
narrative.update(narrative_outlier_final)
        # Record the rendered narrative and its inputs; only the template key
        # differs between the single and multiple cases
        template_key = ('template_outlier' if template_selected == 'single'
                        else 'template_outlier_multiple')
        variables_l1 = {
            f'{dimension} by {measure}': {
                'narrative_outlier_final': narrative_outlier_final,
                template_key: template_outlier_final,
                'dimension': dimension,
                'measure': measure,
                'outlier_insight': outlier_insight,
                'n_outliers': n_outliers,
                'outlier_levels': outlier_levels,
                'outlier_values': outlier_values,
                'outlier_values_p': outlier_values_p
            }
        }
variables.update(variables_l1)
# Output
    if return_data:
        return variables
    if simplify:
        narrative = list(narrative.values())
    return narrative
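A minimal end-to-end sketch of the narrator on hypothetical data (column names invented for the example):

# Toy data: narrate_descriptive totals the measure and names the outlier
toy = pd.DataFrame({
    "Region": ["North", "North", "South", "East"],
    "Sales": [400.0, 100.0, 300.0, 200.0],
})
for sentence in narrate_descriptive(toy, measure="Sales", dimensions=["Region"], simplify=True):
    print(sentence)
# -> Total Sales across all Regions is 1000.0.
# -> Outlying Region by Sales is North (500.0, 50.0%).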
# Description: This script contains helper functions for obtaining the frequency of the data
def get_frequency(df, date_field=None):
"""
Get Date Frequency of the Data
"""
if not isinstance(df, pd.DataFrame):
raise ValueError("'df' must be a pandas DataFrame")
if len(df) == 0:
raise ValueError("'df' must have at least 1 row")
    if date_field is None:
        date_fields = df.select_dtypes(include=[np.datetime64]).columns
        if date_fields.empty:
            raise ValueError("No date field detected in 'df'")
        elif len(date_fields) > 1:
            raise ValueError("Multiple date fields detected in 'df', please specify 'date_field'")
        else:
            date_field = date_fields[0]
else:
if date_field not in df.columns:
raise ValueError("'date_field' must be present in the supplied data frame")
elif not np.issubdtype(df[date_field].dtype, np.datetime64):
raise ValueError("'date_field' must be of datetime type")
df = df.rename(columns={date_field: "date_field"})
    # Sort before differencing so gaps reflect consecutive dates, then take
    # the most common gap (in days) as the estimated frequency
    est_frequency = df["date_field"].sort_values().diff().dt.days.abs().value_counts().idxmax()
if est_frequency > 300:
frequency = "year"
elif est_frequency > 35:
frequency = "quarter"
elif est_frequency > 8:
frequency = "month"
elif est_frequency > 3:
frequency = "week"
else:
frequency = "day"
return frequency
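For instance, on a hypothetical frame of month-start dates the estimator should report a monthly cadence:

# Twelve month-start dates: the typical gap is ~30 days, i.e. "month"
dates = pd.DataFrame({"Date": pd.date_range("2023-01-01", periods=12, freq="MS")})
print(get_frequency(dates))  # -> "month"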
def get_py_date(df: pd.DataFrame, frequency: Optional[str] = None):
"""
Calculate the prior year date based on the maximum date in the DataFrame and the given frequency.
"""
# Table must be a pandas DataFrame and have at least one row
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if df.shape[0] == 0:
raise ValueError("df must have at least one row, execution is stopped")
    date_cols = df.select_dtypes(include=[np.datetime64]).columns
    if date_cols.empty:
        raise ValueError("Data frame must contain one date column")
    date_field = date_cols[0]
# Calculating frequency if not available
if frequency is None:
frequency = get_frequency(df)
max_date = df[date_field].max()
max_year = max_date.year
if frequency == "week":
df_weekly = (
df[[date_field]]
.drop_duplicates()
.sort_values(by=date_field)
.assign(week=lambda x: x[date_field].dt.isocalendar().week,
year=lambda x: x[date_field].dt.year)
)
max_week = df_weekly.loc[df_weekly[date_field] == max_date, "week"].iat[0]
py_date = df_weekly.loc[(df_weekly["year"] == max_year - 1) & (df_weekly["week"] == max_week), date_field].values
py_date = pd.to_datetime(py_date)
if len(py_date) == 0:
py_date = max_date - pd.DateOffset(years=1)
else:
py_date = py_date[0]
else:
py_date = max_date - pd.DateOffset(years=1)
return py_date.date()
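As a quick illustration on hypothetical daily data, the prior-year date is simply the latest date shifted back one year:

# Daily data: non-weekly frequencies fall back to a one-year offset
daily = pd.DataFrame({"Date": pd.date_range("2022-01-01", "2023-06-30", freq="D")})
print(get_py_date(daily))  # -> 2022-06-30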
def ytd_volume(
df,
measure = None,
date = None,
summarization = "sum",
current_year = None,
cy_date = None):
"""
Calculate the year-to-date (YTD) volume of a given measure in a pandas DataFrame.
"""
# Table must be a pandas DataFrame and have at least one row
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if df.shape[0] == 0:
raise ValueError("df must have at least one row, execution is stopped")
# Summarization Assertion
    if summarization not in ["sum", "count", "average"]:
        raise ValueError("summarization must be one of: 'sum', 'count' or 'average'.")
# Measure, Date and Dimensions Assertion
if measure is not None:
if measure not in df.columns:
raise ValueError("measure must a column in the dataset")
else:
# If measure isn't supplied get the first numerical column from it
measure = df.select_dtypes(include=[np.number]).columns[0]
# Get Date
if date is not None:
if date not in df.columns:
raise ValueError("date must a column in the dataset")
df[date] = pd.to_datetime(df[date])
if not pd.api.types.is_datetime64_any_dtype(df[date]):
raise ValueError("'date' must be a date column in the dataset")
    else:
        # Fall back to the first datetime column available
        date_cols = df.select_dtypes(include=[np.datetime64]).columns
        if date_cols.empty:
            raise ValueError("No date column found in the dataset")
        date = date_cols[0]
# Current Year's Date
if cy_date is None:
cy_date = df[date].max()
else:
cy_date = pd.to_datetime(cy_date)
    # Current year assertion
    if current_year is not None:
        try:
            current_year = int(current_year)
        except (TypeError, ValueError):
            raise ValueError("current_year must be numeric or convertible to numeric, like 2022 or '2022'")
    else:
        current_year = cy_date.year
    # pandas' string aggregators expect 'mean', so map 'average' accordingly
    agg_func = "mean" if summarization == "average" else summarization
    cy_volume = (df.assign(year=df[date].dt.year)
                 .query('year == @current_year and `{0}` <= @cy_date'.format(date))
                 .agg({measure: agg_func})
                 .squeeze()
                 )
    return cy_volume.round(2)
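A small worked example on hypothetical data: with a 2023-06-01 anchor, only the 2023 rows up to that date are summed:

# YTD volume: 150.0 + 250.0 from the 2023 rows
toy = pd.DataFrame({
    "Date": pd.to_datetime(["2022-03-01", "2022-06-01", "2023-03-01", "2023-06-01"]),
    "Sales": [100.0, 200.0, 150.0, 250.0],
})
print(ytd_volume(toy, measure="Sales", date="Date"))  # -> 400.0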
def pytd_volume(
df,
measure = None,
date = None,
summarization = "sum",
current_year = None,
py_date = None):
"""
Calculate the previous year-to-date (PYTD) volume of a given measure in a pandas DataFrame.
"""
# Table must be a pandas DataFrame and have at least one row
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if df.shape[0] == 0:
raise ValueError("df must have at least one row, execution is stopped")
# Summarization Assertion
    if summarization not in ["sum", "count", "average"]:
        raise ValueError("summarization must be one of: 'sum', 'count' or 'average'.")
# Measure, Date and Dimensions Assertion
if measure is not None:
if measure not in df.columns:
raise ValueError("measure must a column in the dataset")
else:
# If measure isn't supplied get the first numerical column from it
measure = df.select_dtypes(include=[np.number]).columns[0]
# Get Date
if date is not None:
if date not in df.columns:
raise ValueError("date must a column in the dataset")
df[date] = pd.to_datetime(df[date])
if not pd.api.types.is_datetime64_any_dtype(df[date]):
raise ValueError("'date' must be a date column in the dataset")
    else:
        # Fall back to the first datetime column available
        date_cols = df.select_dtypes(include=[np.datetime64]).columns
        if date_cols.empty:
            raise ValueError("No date column found in the dataset")
        date = date_cols[0]
    # Prior Year's Date
    if py_date is None:
        # get_py_date returns a datetime.date; normalize to a Timestamp so the
        # comparison inside .query() works
        py_date = pd.to_datetime(get_py_date(df))
    else:
        py_date = pd.to_datetime(py_date)
    # Previous year assertion (fixes a NameError: previous_year was undefined
    # whenever current_year disagreed with py_date)
    if current_year is not None:
        try:
            previous_year = int(current_year) - 1
        except (TypeError, ValueError):
            raise ValueError("current_year must be numeric or convertible to numeric, like 2022 or '2022'")
    else:
        previous_year = py_date.year
    # pandas' string aggregators expect 'mean', so map 'average' accordingly
    agg_func = "mean" if summarization == "average" else summarization
    py_volume = (df.assign(year=df[date].dt.year)
                 .query('year == @previous_year and `{0}` <= @py_date'.format(date))
                 .agg({measure: agg_func})
                 .squeeze()
                 )
    return py_volume.round(2)
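The prior-year counterpart on the same kind of hypothetical frame: the anchor shifts back one year, so only the 2022 rows through June count:

# PYTD volume: 100.0 + 200.0 from the 2022 rows up to 2022-06-01
toy = pd.DataFrame({
    "Date": pd.to_datetime(["2022-03-01", "2022-06-01", "2022-09-01", "2023-03-01", "2023-06-01"]),
    "Sales": [100.0, 200.0, 300.0, 150.0, 250.0],
})
print(pytd_volume(toy, measure="Sales", date="Date"))  # -> 300.0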
# Description: This script contains helper functions for identifying outliers in the data
def get_trend_outliers(
df,
dimension,
measure,
total = None,
summarization = "sum",
coverage = 0.5,
coverage_limit = 5):
"""
Returns trend outliers based on a given dataframe, dimension, and measure.
"""
grouped = df.groupby(dimension)
# Define a function to apply to each group
def process_group(group, ytd_volume, pytd_volume, summarization, measure):
curr_volume = ytd_volume(group, summarization=summarization, measure=measure)
prev_volume = pytd_volume(group, summarization=summarization, measure=measure)
        change = curr_volume - prev_volume
        # Guard against a zero prior-year base when computing percentage change
        change_p = f"{round(change / prev_volume * 100, 2)}%" if prev_volume else "n/a"
abs_change = abs(change)
trend = "increase" if change > 0 else "decrease"
output = pd.Series({
"curr_volume": curr_volume,
"prev_volume": prev_volume,
"change": change,
"change_p": change_p,
"abs_change": abs_change,
"trend": trend,
})
return output
    # Apply the function to each group; infer_objects() coerces the numeric
    # columns back from object dtype so .round()/.sort_values() behave
    table = grouped.apply(process_group, ytd_volume, pytd_volume, summarization, measure).infer_objects()
    # Reset the index and sort by absolute change
    table = table.reset_index().sort_values(by="abs_change", ascending=False)
if summarization in ["sum", "count"]:
if total is None:
total = df[measure].sum().round(2)
        table = (
            table.assign(share=lambda x: x['abs_change'] / x['abs_change'].sum())
            .assign(cum_share=lambda x: x['share'].cumsum())
            .assign(lag_cum_share=lambda x: x['cum_share'].shift(fill_value=0))
        ).reset_index(drop=True)
        table = table[table['lag_cum_share'] < coverage].head(coverage_limit)
if df.shape[0] == 1 and table['cum_share'].iloc[0] == 1:
return None
    elif summarization == 'average':
        # The grouped table carries YTD volumes under 'curr_volume'; the raw
        # measure column no longer exists at this point
        if total is None:
            total = table['curr_volume'].mean().round(2)
        table = (table
                 .assign(share=lambda x: x['curr_volume']/total - 1)
                 .assign(abs_share=lambda x: x['share'].abs())
                 .sort_values(by='abs_share', ascending=False)
                 .assign(cum_share=lambda x: x['share'].abs().cumsum()/(x['share'].max() - x['share'].min()))
                 .loc[lambda x: (x['cum_share'] >= coverage*2).shift(fill_value=False).cumsum() == 0]
                 .iloc[:coverage_limit]
                 )
n_outliers = table.shape[0]
outlier_levels = table[dimension].astype(str).values.tolist()
    # Report the current YTD volume per level; 'curr_volume' replaces the raw measure here
    outlier_values = table['curr_volume'].round(1).values.tolist()
    outlier_values_p = (table["share"] * 100).round(1).astype(str).add("%").values.tolist()
output = {
"n_outliers": n_outliers,
"outlier_levels": outlier_levels,
"outlier_values": outlier_values,
"outlier_values_p": outlier_values_p
}
return output
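An illustrative run on hypothetical two-year data; the region with the larger year-over-year swing surfaces as the trend outlier:

# North moves +80 YoY vs South's +10, so North dominates the change share
toy = pd.DataFrame({
    "Date": pd.to_datetime(["2022-02-01", "2022-02-01", "2023-02-01", "2023-02-01"]),
    "Region": ["North", "South", "North", "South"],
    "Sales": [100.0, 200.0, 180.0, 210.0],
})
print(get_trend_outliers(toy, dimension="Region", measure="Sales"))
# -> {'n_outliers': 1, 'outlier_levels': ['North'],
#     'outlier_values': [180.0], 'outlier_values_p': ['88.9%']}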
# Monthly Trend Analysis
def monthly_trend(df, measure, dimensions, date_column='Date', summarization='sum', top_n=3):
    # Ensure the date column is in datetime format; work on a copy so the
    # caller's DataFrame is not mutated
    df = df.copy()
    df[date_column] = pd.to_datetime(df[date_column])
    df['Year'] = df[date_column].dt.year
    df['Month'] = df[date_column].dt.month
# Group by dimensions, Year, and Month
group_by_columns = dimensions + ['Year', 'Month']
    # Use pandas' string aggregators (passing np.sum/np.mean to .agg is deprecated)
    if summarization == 'sum':
        summary_func = 'sum'
    elif summarization == 'average':
        summary_func = 'mean'
    elif summarization == 'count':
        summary_func = 'count'
    else:
        raise ValueError("Invalid summarization method. Choose from 'sum', 'average', 'count'.")
summary = df.groupby(group_by_columns)[measure].agg(summary_func).reset_index(name='Measure')
summary.sort_values(by=group_by_columns, inplace=True)
summary['Change'] = summary.groupby(dimensions)['Measure'].diff().fillna(0)
summary['Percentage_Change'] = summary.groupby(dimensions)['Measure'].pct_change().fillna(0) * 100
narrative = {}
for dimension in dimensions:
dimension_summary = summary.groupby([dimension, 'Year', 'Month']).agg({'Change': 'sum', 'Percentage_Change': 'mean'}).reset_index()
# Top N positive changes
top_increases = dimension_summary.nlargest(top_n, 'Change')
# Top N negative changes
top_decreases = dimension_summary.nsmallest(top_n, 'Change')
narrative[dimension + ' Increases'] = '; '.join([f"{row[dimension]}, {row['Year']}-{row['Month']:02d}: Change={row['Change']:.2f}, ({row['Percentage_Change']:.2f}%)" for index, row in top_increases.iterrows()])
narrative[dimension + ' Decreases'] = '; '.join([f"{row[dimension]}, {row['Year']}-{row['Month']:02d}: Change={row['Change']:.2f}, ({row['Percentage_Change']:.2f}%)" for index, row in top_decreases.iterrows()])
# Overall summary
total_increase_months = summary[summary['Change'] > 0].shape[0]
total_decrease_months = summary[summary['Change'] < 0].shape[0]
narrative['Overall Summary'] = f"Total months with increase: {total_increase_months}, Total months with decrease: {total_decrease_months}"
return narrative
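A compact run on hypothetical monthly data shows the dictionary this returns:

# Two regions, three months each: diffs are computed within each region
toy = pd.DataFrame({
    "Date": ["2023-01-15", "2023-02-15", "2023-03-15", "2023-01-20", "2023-02-20", "2023-03-20"],
    "Region": ["North", "North", "North", "South", "South", "South"],
    "Sales": [100.0, 150.0, 120.0, 80.0, 60.0, 90.0],
})
result = monthly_trend(toy, measure="Sales", dimensions=["Region"], date_column="Date")
print(result["Overall Summary"])
# -> Total months with increase: 2, Total months with decrease: 2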
# Configure the OpenAI API client with the base URL for Ollama
from openai import OpenAI
openai_api_key = 'ollama'  # Ollama ignores the key, but the client requires a non-empty value
# Instantiate the OpenAI client with the base_url for Ollama
client = OpenAI(
api_key=openai_api_key,
base_url='http://localhost:11434/v1' # Ollama's base URL
)
def gpt_get_completions(
prompt,
openai_api_key=openai_api_key
):
# Set up the OpenAI API client with the messages and model information
chat_completion = client.chat.completions.create(
messages=[
{"role": "system", "content": "You are an expert business intelligence data analyst."},
{"role": "user", "content": prompt}
],
model="llama3:latest", # Replace with your desired model - llama2:latest is the default
)
# Return the content of the first message in the completions
return chat_completion.choices[0].message.content
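Assuming an Ollama server is running locally and the model has been pulled (e.g. `ollama pull llama3`), the wrapper can be exercised directly:

# Smoke test of the completion wrapper against the local Ollama endpoint
answer = gpt_get_completions("In one sentence, what is an outlier in a dataset?")
print(answer)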
def enhance_narrative(
narrative,
openai_api_key=openai_api_key
):
prompt = f'Provide a detailed overview of the information provided to executive leadership and make sure to highlight any outliers or anomalies in the data: "{narrative}"'
output = gpt_get_completions(prompt, openai_api_key=openai_api_key)
    return output
def translate_narrative(
narrative,
language,
openai_api_key=openai_api_key
):
prompt = f'Using professional language translate the following text to {language}: "{narrative}"'
output = gpt_get_completions(prompt, openai_api_key=openai_api_key)
    return output
def summarize_narrative(
narrative,
openai_api_key=openai_api_key
):
prompt = f'Summarize the following narrative to make it shorter: "{narrative}"'
output = gpt_get_completions(prompt, openai_api_key=openai_api_key)
    return output
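The same local model backs translation and summarization; a hypothetical one-line narrative is enough to try both:

# Hypothetical narrative string, reused for both helpers
narrative = "Total Sales across all Regions is 1000.0. Outlying Region by Sales is North (500.0, 50.0%)."
print(summarize_narrative(narrative))
print(translate_narrative(narrative, language="Spanish"))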
# Read the dataset into a pandas DataFrame
filename = 'sales.csv'
def read_data():
return pd.read_csv(filename, encoding='latin-1', keep_default_na=False)
data = read_data()
data.head()
|   | Order ID | Date      | Region        | Product  | Store | Promotion | Price | Quantity | Sales   |
|---|----------|-----------|---------------|----------|-------|-----------|-------|----------|---------|
| 0 | QR10276  | 1/30/2019 | Europe        | Tools    | Big   | 1         | 89.6  | 28       | 2007.04 |
| 1 | QR15245  | 7/26/2020 | Asia          | Tools    | Big   | 0         | 268.5 | 7        | 1879.50 |
| 2 | QR13938  | 3/5/2020  | Europe        | Home     | Big   | 0         | 223.3 | 18       | 4019.40 |
| 3 | QR15934  | 10/8/2020 | South America | Clothing | Big   | 0         | 210.4 | 27       | 5680.80 |
| 4 | QR10963  | 4/15/2019 | Europe        | Tools    | Big   | 0         | 193.1 | 22       | 4248.20 |
# Set up narratives
sales_narrative = narrate_descriptive(data, measure='Sales', dimensions=['Region', 'Product'], return_data=False, coverage=.5)
price_narrative = narrate_descriptive(data, measure='Price', dimensions=['Region', 'Product'], return_data=False, coverage=.5)
trend_narrative = monthly_trend(data, measure='Sales', dimensions=['Region', 'Product'], date_column='Date')
# Enhance the narratives using locally hosted Ollama LLM model
sales_results = enhance_narrative(sales_narrative)
price_results = enhance_narrative(price_narrative)
trend_results = enhance_narrative(trend_narrative)
# Print the enhanced narratives
print("Sales Narrative:")
print(sales_results)
print("\nPrice Narrative:")
print(price_results)
print("\nTrend Narrative:")
print(trend_results)
Sales Narrative: To executive leadership, I am pleased to present the key findings from our recent analysis of sales data across all regions and products.
Total Sales: Our analysis reveals that total sales across all regions reached an impressive $38,790,478.42. This is a significant milestone for the company, indicating strong demand for our products globally.
Regional Outliers: Upon examining regional sales data, we have identified two outliers: North America and Europe. These regions account for a significant proportion of overall sales, with North America contributing 47% ($18,073,636.4) and Europe contributing 35% ($13,555,127.7). This suggests that our products are highly popular in these regions, which is an important insight for our market strategy.
Product Outliers: Our analysis also reveals two outliers among product categories: Food & Beverage (40%, $15,543,697) and Electronics (22%, $8,609,628.8). These high-performing products could be leveraged to drive growth and expansion into new markets. The dominance of Food & Beverage and Electronics in terms of sales suggests that these products are highly appealing to customers.
Key Insights: Based on our analysis, I would like to highlight the following key insights for executive leadership:
In conclusion, the data presents a compelling story about the strengths of our business. By focusing on high-performing regions and products, we can optimize our growth strategy, drive innovation, and increase revenue.
Price Narrative: Executive Summary:
Our analysis of the company's sales data reveals several key insights that can inform business decisions and drive growth.
Total Sales: The total price across all regions is $17,310.10, indicating a strong overall performance.
Regional Outliers: Two regions stand out as significant outliers in terms of their total sales: North America ($7,390.31, 43.0%) and Europe ($5,569.86, 32.0%). These regions account for nearly three-quarters of the company's total sales, suggesting a strong presence in these markets.
Product Outliers: Similarly, two products are notable outliers in terms of their sales: Food & Beverage ($6,099.01.5, 35.0%) and Electronics ($34,428.6, 20.0%). These products contribute significantly to the company's overall revenue, indicating a strong focus on these categories.
Key Takeaways:
Recommendations:
By leveraging these insights, executive leadership can inform data-driven decisions that drive business growth and profitability.
Trend Narrative: Executive Leadership Overview:
The provided data highlights significant changes in regional and product sales trends over the past two years. The key findings are as follows:
Regional Increases:
Regional Decreases:
Product Increases:
Product Decreases:
Overall Summary:
Key Observations:
Outliers or anomalies in the data:
Recommendations:
By highlighting these key findings, outliers, and recommendations, we can inform executive leadership of the data's implications and guide business decisions to optimize regional and product performance.