import json
import boto3
from snowflake.snowpark.session import Session
def get_secret(secret_name, region_name="us-east-1"):
    """Fetch a secret from AWS Secrets Manager and return it parsed from JSON.

    Args:
        secret_name: Identifier passed to Secrets Manager as ``SecretId``.
        region_name: AWS region used for the Secrets Manager client.

    Returns:
        The result of ``json.loads`` applied to the response's
        ``SecretString`` field.
    """
    secrets_client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    response = secrets_client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])
# Read the Snowflake credentials from the "wysde" secret and assemble the
# connection settings; database and schema are fixed for this notebook.
creds = get_secret("wysde")
connection_parameters = dict(
    account=creds["SNOWFLAKE_ACCOUNT"],
    user=creds["SNOWFLAKE_USERNAME"],
    password=creds["SNOWFLAKE_PASSWORD"],
    warehouse=creds["SNOWFLAKE_WAREHOUSE"],
    role=creds["SNOWFLAKE_ROLE"],
    database="sparsh",
    schema="madcow",
)
# Install needed packages
# Be sure to check that these packages also exist in the Anaconda repo for Snowflake
!pip install scipy numpy seaborn pandas statsmodels
!mkdir -p seeds
!cp -r ../data/*.csv seeds
!dbt seed
10:41:41 Running with dbt=1.3.2 10:41:41 Partial parse save file not found. Starting full parse. 10:41:43 [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources. There are 1 unused configuration paths: - models.fifapark.example 10:41:43 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 303 macros, 0 operations, 3 seed files, 0 sources, 0 exposures, 0 metrics 10:41:43 10:41:48 Concurrency: 1 threads (target='dev') 10:41:48 10:41:48 1 of 3 START seed file MADCOW.goalscorers ...................................... [RUN] 10:43:04 1 of 3 OK loaded seed file MADCOW.goalscorers .................................. [INSERT 41008 in 75.78s] 10:43:04 2 of 3 START seed file MADCOW.results .......................................... [RUN] 10:44:33 2 of 3 OK loaded seed file MADCOW.results ...................................... [INSERT 44353 in 89.37s] 10:44:33 3 of 3 START seed file MADCOW.shootouts ........................................ [RUN] 10:44:37 3 of 3 OK loaded seed file MADCOW.shootouts .................................... [INSERT 547 in 3.77s] 10:44:37 10:44:37 Finished running 3 seeds in 0 hours 2 minutes and 53.44 seconds (173.44s). 10:44:37 10:44:37 Completed successfully 10:44:37 10:44:37 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
# Open a Snowpark session configured from the connection_parameters dict.
session = Session.builder.configs(connection_parameters).create()
import pandas as pd
import numpy as np
import seaborn
import statsmodels.api as sm
import statsmodels.formula.api as smf
from scipy.stats import poisson
from snowflake.snowpark.functions import udf
# Fetch the FIFA match results stored in Snowflake to use as training data.
# Only games dated after 2000-01-01 are used, for validity reasons and to keep the model small.
df = session.sql("SELECT * FROM RESULTS WHERE DATE > '2000-01-01'").to_pandas()

# Keep the team and score columns; the scores appear to be stored as strings, so cast them to floats.
df = df[['HOME_TEAM', 'AWAY_TEAM', 'HOME_SCORE', 'AWAY_SCORE']].astype(
    {'HOME_SCORE': float, 'AWAY_SCORE': float})
# Work on a random 10% sample of the matches.
df = df.sample(frac=0.1)

# Reshape to one row per team per match: goals scored, the opponent, and a home flag.
home_rows = (
    df[['HOME_TEAM', 'AWAY_TEAM', 'HOME_SCORE']]
    .assign(home=1)
    .rename(columns={'HOME_TEAM': 'team', 'AWAY_TEAM': 'opponent', 'HOME_SCORE': 'goals'})
)
away_rows = (
    df[['AWAY_TEAM', 'HOME_TEAM', 'AWAY_SCORE']]
    .assign(home=0)
    .rename(columns={'AWAY_TEAM': 'team', 'HOME_TEAM': 'opponent', 'AWAY_SCORE': 'goals'})
)
goal_model_data = pd.concat([home_rows, away_rows])

# Fit a Poisson regression of goals on home advantage, team and opponent.
model = smf.glm(
    formula="goals ~ home + team + opponent",
    data=goal_model_data,
    family=sm.families.Poisson(),
).fit()
# Prepare the Snowflake stage used as the UDF's stage_location below.
session.sql('create or replace stage MODELSTAGE').collect() # NOTE: "create or replace" recreates MODELSTAGE even when it already exists.
[Row(status='Stage area MODELSTAGE successfully created.')]
# Register the match simulator as a Snowflake Python UDF via Snowpark's @udf decorator.
# The packages the UDF needs are declared in the same decorator call.
# This registration only needs to be run once.
@udf(name='predict_result',
     is_permanent=True,
     stage_location='@MODELSTAGE',
     replace=True,
     session=session,
     packages=["numpy", "pandas", "scipy", "statsmodels"])
def simulate_match(homeTeam: str, awayTeam: str) -> float:
    """Return the modelled probability that ``homeTeam`` beats ``awayTeam``.

    The fitted ``model`` predicts the mean goals for each side; each mean
    drives a Poisson distribution over 0-9 goals, and the probabilities of
    all score lines where the home side scores more are summed.

    Args:
        homeTeam: Team name used as ``team`` for the home prediction.
        awayTeam: Team name used as ``team`` for the away prediction.

    Returns:
        Sum of the strictly-lower-triangle entries of the home x away
        score-probability matrix.
    """
    home_fixture = pd.DataFrame(
        data={'team': homeTeam, 'opponent': awayTeam, 'home': 1}, index=[1])
    away_fixture = pd.DataFrame(
        data={'team': awayTeam, 'opponent': homeTeam, 'home': 0}, index=[1])
    home_goals_avg = model.predict(home_fixture).values[0]
    away_goals_avg = model.predict(away_fixture).values[0]

    # Probability of each side scoring exactly 0..9 goals.
    goal_counts = range(0, 10)
    home_pmf = np.array([poisson.pmf(k, home_goals_avg) for k in goal_counts])
    away_pmf = np.array([poisson.pmf(k, away_goals_avg) for k in goal_counts])

    # Rows index home goals, columns index away goals; entries below the
    # diagonal are the score lines in which the home side wins.
    score_matrix = np.outer(home_pmf, away_pmf)
    return np.sum(np.tril(score_matrix, -1))
!mkdir -p models
%%writefile models/fifa_predictions.py
def model(dbt, session):
    """dbt Python model: fit the Poisson goal model and upload it to @MODELSTAGE.

    Args:
        dbt: dbt context object; used here for ``dbt.config`` and ``dbt.ref``.
        session: Snowpark session used to run SQL and upload the model file.

    Returns:
        The ``goal_model_data`` pandas DataFrame read from the upstream
        model (a dbt Python model must return a DataFrame).
    """
    # The %%writefile cell saves only this function to the model file, so the
    # libraries it uses are imported here rather than relying on notebook-level
    # imports (which would otherwise surface as NameError at run time).
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    from joblib import dump

    dbt.config(
        materialized="table",
        packages=["pandas", "numpy", "statsmodels", "scipy", "joblib"],
    )
    goal_model_data = dbt.ref("goal_model_data").to_pandas()

    # Named fitted_model so the local does not shadow this function's own name.
    fitted_model = smf.glm(
        formula="goals ~ home + team + opponent",
        data=goal_model_data,
        family=sm.families.Poisson(),
    ).fit()

    # Create the stage only when it is missing: "create or replace" would
    # recreate MODELSTAGE on every run and discard whatever is already staged
    # there (the notebook registers its permanent UDF on the same stage).
    session.sql('create stage if not exists MODELSTAGE').collect()

    # Write to the absolute /tmp directory; the previous relative 'tmp/' path
    # pointed at a directory that nothing creates.
    model_file = '/tmp/model.joblib'
    dump(fitted_model, model_file)
    session.file.put(model_file, "@MODELSTAGE", overwrite=True)

    return goal_model_data  # dbt Python models must always return a DataFrame
Predict 1st-round winners using the existing RESULTS data: loop through the schedule and populate the PREDICT_FOR_HOME_WIN column with the value returned by the previously built function.
Note: predicting over the full data set can take up to 20 minutes with an X-SMALL warehouse.
%%sql
-- Preview the three seeded tables.
SELECT * FROM SHOOTOUTS LIMIT 10
SELECT * FROM GOALSCORERS LIMIT 10
SELECT * FROM RESULTS LIMIT 10
-- Build a 10-row working copy of RESULTS to run the predictions against.
CREATE TABLE RESULTS_SMALL AS
SELECT * FROM RESULTS LIMIT 10
-- Column that receives the home-win probability returned by predict_result.
-- It must be a floating-point type: SMALLINT has no fractional digits, so a
-- probability between 0 and 1 would lose its fractional part when stored,
-- and the downstream "> 0.5" comparison would see only whole numbers.
ALTER TABLE RESULTS_SMALL
ADD PREDICT_FOR_HOME_WIN FLOAT;
-- Scripting block: for every (home_team, away_team) pair read from
-- RESULTS_SMALL, call the predict_result UDF and store its value in
-- PREDICT_FOR_HOME_WIN on the rows with that same pair.
DECLARE
c1 CURSOR FOR SELECT home_team, away_team FROM RESULTS_SMALL;
home_team VARCHAR;
away_team VARCHAR;
BEGIN
FOR record IN c1 DO
-- Copy the cursor row into variables so they can be bound as :home_team / :away_team.
home_team := record.home_team;
away_team := record.away_team;
-- One UPDATE per cursor row; it touches every row whose team pair matches.
UPDATE RESULTS_SMALL f SET predict_for_home_win = prediction.p
FROM (SELECT predict_result(:home_team, :away_team) p ) AS prediction
WHERE f.home_team = :home_team AND f.away_team = :away_team ;
END FOR;
END;
-- Inspect the table after the loop has run.
SELECT * FROM RESULTS_SMALL LIMIT 10
%%sql
-- 3. View listing the predicted winner of each group-stage game: the home
--    team when PREDICT_FOR_HOME_WIN exceeds 0.5, otherwise the away team.
CREATE OR REPLACE VIEW A1_WINNERS_GROUP_STAGE (
    HOME_TEAM,
    AWAY_TEAM,
    WINNER
) AS (
    SELECT
        games.HOME_TEAM,
        games.AWAY_TEAM,
        CASE
            WHEN games.PREDICT_FOR_HOME_WIN > 0.5 THEN games.HOME_TEAM
            ELSE games.AWAY_TEAM
        END AS WINNER
    FROM RESULTS_SMALL AS games
);
SELECT * FROM RESULTS_SMALL LIMIT 10
%%sql
-- 4. Create view to show Round of 16 game pairs
-- The view is built from a chain of CTEs over A1_WINNERS_GROUP_STAGE and RESULTS_SMALL.
CREATE OR REPLACE VIEW A2_GAME_PAIRS_ROUND_OF_16(
HOME_TEAM,
AWAY_TEAM
) as (
-- BRACKET_WINNERS: distinct (home, away, winner) triples from the winners view
-- that also have a matching game in RESULTS_SMALL (team names compared after TRIM).
WITH BRACKET_WINNERS AS (
SELECT
DISTINCT
winners_group_stage.HOME_TEAM as W_HOME,
winners_group_stage.AWAY_TEAM as W_AWAY,
winners_group_stage.WINNER
FROM A1_WINNERS_GROUP_STAGE winners_group_stage
INNER JOIN RESULTS_SMALL fifa_schedule ON TRIM(fifa_schedule.HOME_TEAM) = TRIM(winners_group_stage.HOME_TEAM) AND TRIM(fifa_schedule.AWAY_TEAM) = TRIM(winners_group_stage.AWAY_TEAM)
GROUP BY W_HOME, W_AWAY, WINNER
-- BRACKET_QUALIFIERS: number of wins per winning team, ranked by win count (most wins first).
), BRACKET_QUALIFIERS AS (
SELECT
WINNER,
COUNT(WINNER) AS WIN_COUNT,
RANK() OVER (ORDER BY COUNT(WINNER) DESC) AS rank
FROM BRACKET_WINNERS
GROUP BY WINNER
-- BRACKET_RANKS: keep only ranks below 3; rank 1 stays 1 and every other kept rank becomes 2.
), BRACKET_RANKS AS (
SELECT
bracket_qualifiers.WINNER AS COUNTRY
, bracket_qualifiers.WIN_COUNT AS WINS
, CASE WHEN rank = 1 THEN
1
ELSE 2
END AS RANK_IN_BRACKET
FROM BRACKET_QUALIFIERS bracket_qualifiers
WHERE RANK < 3
-- BRACKET_RANK_TO_COUNTRY_SCHEDULE: country plus its bracket rank with spaces stripped via REPLACE
-- (presumably this turns the numeric rank into text for the joins below -- TODO confirm).
), BRACKET_RANK_TO_COUNTRY_SCHEDULE AS (
SELECT
COUNTRY
, REPLACE(RANK_IN_BRACKET, ' ', '') AS RANK_IN_BRACKET
FROM BRACKET_RANKS
ORDER BY RANK_IN_BRACKET
-- KNOCKOUT_GAMES: every column of RESULTS_SMALL, unchanged.
), KNOCKOUT_GAMES AS (SELECT
*
FROM RESULTS_SMALL
) SELECT
bg_home.country AS HOME_TEAM
, bg_away.country AS AWAY_TEAM
FROM KNOCKOUT_GAMES kg
-- NOTE(review): these joins compare RANK_IN_BRACKET with kg.HOME_TEAM / kg.AWAY_TEAM, so they
-- only match if those columns hold bracket-rank placeholders rather than team names.
-- RESULTS_SMALL is a copy of RESULTS rows, so this may return no rows -- confirm the intended schedule table.
INNER JOIN BRACKET_RANK_TO_COUNTRY_SCHEDULE bg_home ON bg_home.RANK_IN_BRACKET = kg.HOME_TEAM
INNER JOIN BRACKET_RANK_TO_COUNTRY_SCHEDULE bg_away ON bg_away.RANK_IN_BRACKET = kg.AWAY_TEAM
) ;
-- Preview the resulting game pairs.
SELECT * FROM A2_GAME_PAIRS_ROUND_OF_16 LIMIT 10;