#!/usr/bin/env python # coding: utf-8 # Open In Colab #

CFG Data Science Interview Task - Hadi Sotudeh

# I make use of `scikit-learn` library in Python. # # First, I need to import the relevant libraries, download the dataset, and have them available in Google Colab. #
#
# It is highly recommended to enable `GPU` on this notebook for quicker computations. # ## **Install & import libraries and download datasets** # In[ ]: get_ipython().run_cell_magic('capture', '', '# Remove previous files\n! rm ShotData.csv *.joblib\n! rm *.py && rm -rf sample_data && rm -rf sample-data\n\n# Download the datasets (ShotData.csv and metrica matches)\n! wget https://www.dropbox.com/s/zpijv8epzrbzifr/ShotData.csv\n! git clone https://github.com/metrica-sports/sample-data\n\n# Clone required functions to read metrica data from LaurieOnTracking repository (Friends of Tracking)\n! git clone https://github.com/Friends-of-Tracking-Data-FoTD/LaurieOnTracking\n! mv LaurieOnTracking/Metrica_IO.py ./Metrica_IO.py && mv LaurieOnTracking/Metrica_Viz.py ./Metrica_Viz.py\n! rm -rf LaurieOnTracking\n') # In[ ]: get_ipython().run_cell_magic('capture', '', '\n# Import Libraries\n# General libraries\nfrom tqdm import tqdm_notebook as tqdm\nimport matplotlib.pyplot as plt\nimport pandas as pd\nimport matplotlib\nimport numpy as np\nimport itertools\nimport warnings\nimport math\nimport os\n\n# Machine learning libraries\nfrom sklearn.metrics import plot_roc_curve, roc_auc_score, brier_score_loss\nfrom sklearn.model_selection import GridSearchCV\nfrom sklearn.ensemble import RandomForestClassifier\nfrom sklearn.linear_model import LogisticRegression\nfrom sklearn.preprocessing import OneHotEncoder\nfrom sklearn.model_selection import train_test_split\nfrom sklearn.calibration import calibration_curve\nfrom sklearn.dummy import DummyClassifier\nfrom sklearn.pipeline import Pipeline\nfrom lightgbm import LGBMClassifier\nfrom xgboost import XGBClassifier\nimport joblib\n\n# Model interpretation library\nfrom sklearn.inspection import plot_partial_dependence\n\n# Metrica functions\nimport Metrica_IO as mio\nimport Metrica_Viz as mviz\n') # ## **Set Global Parameters** # In[ ]: # Show plots inside the jupyter notebook get_ipython().run_line_magic('matplotlib', 'inline') # Pandas settings to show more columns are rows in the jupyter notebook pd.set_option('display.max_rows', 500) pd.set_option('display.max_columns', 50000) # Increase font size of the plots plt.rcParams.update({'font.size': 18}) # Don't show warnings warnings.filterwarnings('ignore') # Dataset file path dataset = "ShotData.csv" # Target variable to predict dep_var = "outcome" # Hyper-paramter tuning variables cv = 5 seed = 42 scoring = 'roc_auc' ######### This part is copied from https://github.com/Friends-of-Tracking-Data-FoTD/LaurieOnTracking/blob/master/Tutorial1_GettingStarted.py # Set up initial path to data DATADIR = './sample-data/data/' game_id = 2 # let's look at sample match 2 home_team = "Home" away_team = "Away" pitch_x = 105. pitch_y = 68. meters_per_yard = 0.9144 # unit conversion from yards to meters goal_line_width = 8*meters_per_yard # #**Step 1 - Build an Expected Goal (xG) model** # ## Load ShotData and Perform Data Cleaning & Feature Engineering # In[ ]: # Keep only relevant features and the label (dep_var) cols_to_keep = ["position_x","position_y","play_type","BodyPart","Number_Intervening_Opponents","Number_Intervening_Teammates","outcome"] # Read the train file df = pd.read_csv(dataset)[cols_to_keep] # Column preprocessing # Binarize dep_var (goal and non-goal), binarize body type (foot and header) df["BodyPart"] = df["BodyPart"].apply(lambda r: "Foot" if r in ["Left", "Right"] else r) # goal = 1 and else 0 # Increase data quality, e.g., play_type has typo (Direct corner and Direct Corner) that need to be mapped to the same value df["play_type"] = df["play_type"].apply(lambda r: "Direct corner" if r in ["Direct corner", "Direct Corner"] else r) # Calculate extra relevant features such as distance and angle # Euclidean distance to center of the goal sqrt(x^2+y^2) df["distance"] = np.sqrt(df["position_x"]**2 + df["position_y"]**2) # Shot angle ot the goal # The angle is arctan(goal_line_width*x/(x^2+y^2-(goal_line_width*/2)^2)), which is in radius and I convert it into degrees next df["angle"] = np.arctan(goal_line_width*df["position_x"]/(df["position_x"]**2+df["position_y"]**2-(goal_line_width/2)**2)) df["angle"] = np.rad2deg(df["angle"].apply(lambda alpha: np.pi + alpha if alpha < 0 else alpha)) # Rounding the dataframe (2 decimals) df = df.round(2) # Perform one-hot encoding without multicollinearity columns_to_encode = ["play_type","BodyPart"] for col in columns_to_encode: # Prevent multicollinearity one_hot = pd.get_dummies(df[col], drop_first=True) # Drop the column as it is now encoded df = df.drop(col,axis = 1) # Join the encoded df df = df.join(one_hot) # Move outcome to the last column features = [x for x in df.columns if x not in ["position_x","position_y",dep_var]] df = df[features + [dep_var]] df.head() # Check basic dataset statistics: # In[ ]: is_empty = "No" if df.isnull().sum().all(): is_empty = "Yes" print(f"Is there any empty values in the dataset? {is_empty}\n") print(f"There are {df.shape[0]} instances in the dataset.") # Binarize outcome (goals vs. non-goals) and name it `goal`(0 or 1) and print the binarized distribution. # In[ ]: df["outcome"] = df["outcome"].apply(lambda r: 1 if r in ["Goal", "owngoal"] else 0) # goal = 1 and else 0 # Change column name from outcome to goal dep_var = "goal" df = df.rename(columns={"outcome": "goal"}) distributions = df[dep_var].value_counts(normalize=True) print(f"Non-Goal percentage is {round(100*distributions[0],2)}% and the Goal percentage is {round(100*distributions[1],2)}%") # Create Train and validation sets with (80%, 20%) # In[ ]: # Selecting correspondnig columns for training and test stes X = df[features].values y = df[dep_var].values # Spliting train and test sets. 80% for the training and 20% for the test set. xs, valid_xs, y, valid_y = train_test_split(X, y, test_size=0.20, random_state=seed, shuffle=True) # ## Define the evaluation metric, train ML Models, and choose the best one # Define [AUC ROC](https://en.wikipedia.org/wiki/Receiver_operating_characteristic) as the evaluation metric because the dataset is imbalanced. # In[ ]: def calc_auc_roc(y, prob_pred): return roc_auc_score(y, prob_pred) # Now, it is time to do the hyper-parameter tuning # In[ ]: ## Hyperparameters lr_hyperparameters = { 'lr__C': [0.01, 0.1, 1.0, 10, 100] } rf_hyperparameters = { 'rf__n_estimators': np.arange(20,100,10), 'rf__max_features': np.arange(0.5,1.0,0.1), 'rf__max_depth': np.arange(1,20,5), } xgb_hyperparameters = { 'xgb__max_depth': np.arange(2,12,2), # the maximum depth of each tree 'xgb__learning_rate': [0.1,0.3], # the training step for each iteration 'xgb__n_estimators': np.arange(1,80,10), } lgbm_hyperparameters = { 'lgbm__n_estimators': np.arange(10,140,20), 'lgbm__min_data_in_leaf': np.arange(100,1000,100), 'lgbm__max_depth': np.arange(2,10,2), } hyperparameters = { 'lr': lr_hyperparameters, 'rf': rf_hyperparameters, 'lgbm': lgbm_hyperparameters, 'xgb': xgb_hyperparameters, } # Pipeline of ML classiferis' pipielines pipelines = { 'bl': Pipeline([('bl', DummyClassifier(strategy='most_frequent'))]), # base line 'lr': Pipeline([('lr',LogisticRegression(random_state=seed, n_jobs=-1, penalty='l2'))]), 'rf': Pipeline([('rf', RandomForestClassifier(random_state=seed, n_jobs=-1, oob_score=True))]), 'xgb': Pipeline([('xgb', XGBClassifier(random_state=seed, n_jobs=-1))]), 'lgbm': Pipeline([('lgbm', LGBMClassifier(random_state=seed, n_jobs=-1))]), } # In[ ]: # Start the training process results = [] model_names = {"bl":"Baseline", "lr": "Logistic Regression" , "rf":"Random Forest", "xgb": "XGBoost", "lgbm": "Light Gradient Boosting"} fig, ax = plt.subplots(figsize=(8, 8)) ax.plot([0, 1], [0, 1], "k:", label="Perfectly calibrated") for key, pipeline in tqdm(pipelines.items()): if key == 'bl': model = pipeline else: model = GridSearchCV(pipeline, hyperparameters[key], cv=cv, scoring=scoring, n_jobs=-1) model.fit(xs,y) if hasattr(model,'best_estimator_'): best = model.best_estimator_.named_steps[key] else: best = model result = {} result["model"] = model_names[key] train_prob_pred = best.predict_proba(xs)[:,1] result["training (auc roc)"] = calc_auc_roc(y, train_prob_pred) validation_prob_pred = best.predict_proba(valid_xs)[:,1] result["validation (auc roc)"] = calc_auc_roc(valid_y, validation_prob_pred) fraction_of_positives, mean_predicted_value = calibration_curve(valid_y, validation_prob_pred, n_bins=10) result["Brier score"] = brier_score_loss(valid_y, validation_prob_pred) results.append(result) # Do not log calibration information for baseline fitter, as I already have done it. if key == 'bl': continue # Plot the calibration plot ax.plot(mean_predicted_value, fraction_of_positives, "s-", label="%s" % (model_names[key], )) # Save the model joblib.dump(best, f'{model_names[key]}.joblib') results_df = pd.DataFrame(results).round(3) display(results_df) ax.set_xlabel("Mean predicted value") ax.set_ylabel("Fraction of positives") ax.set_ylim([-0.05, 1.05]) ax.legend(loc="lower right") ax.set_title('Calibration plot (reliability curve)') plt.tight_layout() plt.savefig('calibartion_plot.png', bbox_inches='tight') plt.show() # Select the best model and print its tunned parameters. # In[ ]: best_model_name = "Logistic Regression" selected_model = joblib.load(f"{best_model_name}.joblib") print(f"selected model is {best_model_name}.\n") print("Its parameters are:") selected_model.get_params() # In[ ]: fig, ax = plt.subplots(figsize=(7, 7)) ax.set_title('AUC ROC Curve of the Logistic Regression Model') plot_roc_curve(selected_model, valid_xs, valid_y, ax=ax); # ## Interpret the model for knowledge discovery # Print the coefficients and intercept of the logistic regression model # In[ ]: intercept = round(selected_model.intercept_[0],2) print(f"The model intercept is {intercept}\n") coefficients = [round(c,2) for c in selected_model.coef_[0]] print("The model coefficients are:") pd.DataFrame(coefficients, features, columns=['coef']).sort_values(by='coef', ascending=False) # 1-dimensional partial dependence plots # In[ ]: explore_cols = ["angle", "distance", "Number_Intervening_Opponents","Number_Intervening_Teammates"] valid_xs_df = pd.DataFrame(valid_xs, columns = features) for index, col in enumerate(explore_cols): fig,ax = plt.subplots(figsize=(12, 4)) plot_partial_dependence(selected_model, valid_xs_df, [col], grid_resolution=20, ax=ax); # 2-dimensional partial dependence plots # # In[ ]: paired_features = [ ("angle","distance"), ("Number_Intervening_Opponents","Number_Intervening_Teammates"), ("angle","Number_Intervening_Opponents"), ("distance","Number_Intervening_Opponents"), ] for index, pair in enumerate(paired_features): fig,ax = plt.subplots(figsize=(8, 8)) plot_partial_dependence(selected_model, valid_xs_df, [pair], grid_resolution=20, ax=ax); # # **Step 2** # ## Functions needed to enrich the Metrica Shots based on xG model features (Step 1) # # In[ ]: def calc_distance(ball, event_team): """Euclidean distance to center of the target goal""" ball_x, ball_y = ball if event_team == home_team: return np.sqrt((ball_x - (-pitch_x/2))**2 + (ball_y)**2) else: return np.sqrt((ball_x - (pitch_x/2))**2 + (ball_y)**2) def calc_angle(ball, event_team): """Shot angle to the goalposts in degrees""" ball_x, ball_y = ball x_distance_to_goal = abs(pitch_x/2 - ball_x) y_distance_to_goal = abs(ball_y) if event_team == home_team: x_distance_to_goal = abs(ball_x + pitch_x/2) angle = np.arctan(goal_line_width*x_distance_to_goal/(x_distance_to_goal**2+y_distance_to_goal**2-(goal_line_width/2)**2)) if angle < 0: angle = np.pi + angle return np.rad2deg(angle) def calc_playType(event_pre_Type, event_pre_Subtype): '''Calculate the play type of a shot''' # Possible playTypes from xg model ['Open Play', 'Direct freekick', 'Penalty', 'Direct corner'] play_type = "Open Play" if event_pre_Type == "SET PIECE": if event_pre_Subtype in ["FREE KICK","FREE KICK-RETAKEN"]: play_type = "Direct freekick" elif event_pre_Subtype == "CORNER KICK": play_type = "Direct corner" elif event_pre_Subtype == "PENALTY": play_type = "Penalty" return play_type def calc_BodyPart(event_Subtype): '''Calculate the body type involved in a shot''' # Assuming all shots are with either head or foot if "HEAD" in event_Subtype: return "Head" else: return "Foot" def calc_trianlge_area(point1, point2, point3): """Calculate a triangle area with three given points (x,y)""" x1, y1 = point1 x2, y2 = point2 x3, y3 = point3 return abs((x1*(y2-y3)+(x2*(y3-y1))+(x3*(y1-y2)))/2) def calc_Number_Intervening(ball, event_player, event_team, frame, frame_team_name): """Calculate whether a player lies inside the triangle between the shooter and the goalposts""" number_intervening_players = 0 goalpost1 = () goalpost2 = () if event_team == home_team: goalpost1 = (-pitch_x/2, goal_line_width/2) goalpost2 = (-pitch_x/2, -goal_line_width/2) else: goalpost1 = (pitch_x/2, goal_line_width/2) goalpost2 = (pitch_x/2, -goal_line_width/2) shooter_triangle_area = calc_trianlge_area(ball, goalpost1, goalpost2) id_range = range(1,15) if frame_team_name == away_team: id_range = range(15,27) cols = [[f"{frame_team_name}_{id}_x",f"{frame_team_name}_{id}_y"] for id in id_range] # If calculating teammates obscuring the goal, remove the action taker player from the list if event_team == frame_team_name: cols.remove([f"{frame_team_name}_{event_player}_x",f"{frame_team_name}_{event_player}_y"]) for col in cols: playerPoint = tuple(frame[col]) if np.isnan(playerPoint[0]): continue player_triangles_areas = calc_trianlge_area(playerPoint, goalpost1, goalpost2) + calc_trianlge_area(playerPoint, goalpost1, ball) + calc_trianlge_area(playerPoint, goalpost2, ball) if round(shooter_triangle_area) == round(player_triangles_areas): number_intervening_players = number_intervening_players + 1 return number_intervening_players def calc_goal(event_Subtype): '''Calculate whether the outcome is goal or not''' if 'GOAL' in event_Subtype: return 1 else: return 0 def calc_xG(instance): '''Predict the goal-scoring probability of a shot by applying the trained model''' x = instance[features].values.reshape(1,-1) return round(selected_model.predict_proba(x)[:,1][0],2) # ## Find the shot frames and enrich them with defined functions to predict xG values. # In[ ]: # Read the event data events = mio.read_event_data(DATADIR,game_id) # Bit of housekeeping: unit conversion from metric data units to meters events = mio.to_metric_coordinates(events) #### TRACKING DATA #### # READING IN TRACKING DATA tracking_home = mio.tracking_data(DATADIR,game_id,'Home') tracking_away = mio.tracking_data(DATADIR,game_id,'Away') # Convert positions from metrica units to meters tracking_home = mio.to_metric_coordinates(tracking_home) tracking_away = mio.to_metric_coordinates(tracking_away) # Reverse direction of play in the second half # So that home team is always attacking from right->left # And the away team is always attacking from left->right tracking_home,tracking_away,events = mio.to_single_playing_direction(tracking_home,tracking_away,events) # Add a column event 'Minute' to the data frame events['Minute'] = events['Start Time [s]']/60. # Add information abou the previous action events["pre_Type"] = events["Type"].shift(1) events["pre_Subtype"] = events["Subtype"].shift(1) # Create the dataframe required by the xG model xg_features = ['Number_Intervening_Opponents', 'Number_Intervening_Teammates', 'distance', 'angle', 'Direct freekick', 'Open Play', 'Penalty', 'Head', 'Other'] instances = [] for index, event in events[events.Type=="SHOT"].iterrows(): # Extract required information event_team = event.Team event_player = event.From.replace("Player","").strip() event_time_seconds = event["Start Time [s]"] event_time_minutes = event.Minute event_Subtype = event.Subtype event_pre_Type = event.pre_Type event_pre_Subtype = event.pre_Subtype frame_number = event["Start Frame"] # get the event frame number frame_home = tracking_home.loc[frame_number] frame_away = tracking_away.loc[frame_number] ball = (frame_home["ball_x"], frame_home["ball_y"]) instance = {} instance["Frame"] = frame_number instance["team"] = event_team instance["Start Time [s]"] = event_time_seconds instance["Minute"] = event_time_minutes if event_team == home_team: instance["Number_Intervening_Opponents"] = calc_Number_Intervening(ball, event_player, event_team, frame_away, away_team) instance["Number_Intervening_Teammates"] = calc_Number_Intervening(ball, event_player, event_team, frame_home, home_team) elif event_team == away_team: instance["Number_Intervening_Opponents"] = calc_Number_Intervening(ball, event_player, event_team, frame_home, home_team) instance["Number_Intervening_Teammates"] = calc_Number_Intervening(ball, event_player, event_team, frame_away, away_team) instance["distance"] = calc_distance(ball, event_team) instance["angle"] = calc_angle(ball, event_team) instance["BodyPart"] = calc_BodyPart(event_Subtype) instance["play_type"] = calc_playType(event_pre_Type, event_pre_Subtype) instance[dep_var] = calc_goal(event_Subtype) instances.append(instance) instances_df = pd.DataFrame(instances) instances_df = instances_df.round(2) # Perform one-hot encoding columns_to_encode = ["play_type","BodyPart"] for col in columns_to_encode: # Prevent multicollinearity one_hot = pd.get_dummies(instances_df[col]) # Drop the column as it is now encoded instances_df = instances_df.drop(col,axis = 1) # Join the encoded df instances_df = instances_df.join(one_hot) # There us no 'Other' in metrica body part data instances_df["Other"] = 0 # Move outcome to the last column instances_df = instances_df[[x for x in instances_df.columns if x not in [dep_var]] + [dep_var]] ## Output Expected Goals instances_df['xG'] = instances_df.apply(lambda instance: calc_xG(instance), axis=1) display(instances_df.head()) # In[ ]: instances_df.groupby("team")[dep_var, "xG"].sum() # In[ ]: # Binning xG def calc_xg_quality(xg): if xg < 0.07: return "poor" elif 0.07 <= xg < 0.15: return "fair" elif 0.15 <= xg < 0.30: return "good" elif 0.30 <= xg: return "very good" instances_df["xG_quality"] = instances_df["xG"].apply(lambda xg: calc_xg_quality(xg)) instances_df.groupby("team")["xG_quality"].value_counts() # In[ ]: # inspird from plot_events in Metrica_Viz def plot_xG(events, MarkerSize, color, figax=None, field_dimen = (105.0,68.0)): """ plot_events( events ) Plots xG shots on a football pitch. event data can be a single or several rows of a data frame. All distances should be in meters. Parameters ----------- events: row (i.e. instant) of the home team tracking data frame MarkerSize: marker size color: color of indicator fig,ax: Can be used to pass in the (fig,ax) objects of a previously generated pitch. Set to (fig,ax) to use an existing figure, or None (the default) to generate a new pitch plot, field_dimen: tuple containing the length and width of the pitch in meters. Default is (105,68) Returrns ----------- fig,ax : figure and aixs objects (so that other data can be plotted onto the pitch) """ if figax is None: # create new pitch fig,ax = mviz.plot_pitch( field_dimen = field_dimen ) else: # overlay on a previously generated pitch fig,ax = figax for i,row in events.iterrows(): ax.plot( row['Start X'], row['Start Y'], color+'o', alpha=0.5, MarkerSize = MarkerSize) return fig,ax # Plot the shots with their xG values # In[ ]: figax = None for index, instance in instances_df.iterrows(): frame_id = instance["Frame"] team = instance["team"] xG = instance["xG"] color = 'r' if team == away_team: color = 'b' MarkerSize= math.ceil(xG*20) + 10 fig,ax = plot_xG(events[(events.Type=="SHOT")&(events["Start Frame"]==frame_id)].iloc[-1:], MarkerSize=MarkerSize, color = color, figax=figax) figax = (fig,ax) # ## Explore Cases # In[ ]: instances_df.sort_values(by=["xG"]) # In[ ]: # Plot a shot taken by the Home team frame_id = 12202 fig,ax = mviz.plot_events(events[(events.Type=="SHOT")&(events["Start Frame"]==frame_id)].iloc[-1:], indicators = ['Marker','Arrow'], annotate=False, field_dimen = (105.0,68.0)) fig,ax = mviz.plot_frame(tracking_home.loc[frame_id], tracking_away.loc[frame_id], field_dimen = (105.0,68.0), figax = (fig,ax) ) # In[ ]: # Plot a shot taken by the Away team frame_id = 115009 fig,ax = mviz.plot_events(events[(events.Type=="SHOT")&(events["Start Frame"]==frame_id)].iloc[-1:], indicators = ['Marker','Arrow'], annotate=False, field_dimen = (105.0,68.0)) fig,ax = mviz.plot_frame(tracking_home.loc[frame_id], tracking_away.loc[frame_id], field_dimen = (105.0,68.0), figax = (fig,ax) ) # ## THE END!