Capstone - Building a Complete Expected Goals (xG) Analytics System
While many analysts use pre-built xG values from data providers, building your own Expected Goals model provides deep understanding of shot quality factors and allows customization for specific leagues, competitions, or analysis needs.
Learning Objectives
- Understand the fundamentals of xG model construction
- Engineer features from shot data
- Train and evaluate logistic regression and gradient boosting models
- Handle class imbalance in goal prediction
- Interpret and validate your xG model
Prerequisites
This chapter assumes familiarity with basic machine learning concepts covered in earlier chapters. You should understand classification, train/test splits, and evaluation metrics.
Preparing Shot Data
The first step in building an xG model is preparing a clean dataset of shots with relevant features.
import pandas as pd
import numpy as np
from statsbombpy import sb
def load_shot_data(competition_id, season_ids):
    """
    Load all shot events for one competition across multiple seasons.

    Parameters
    ----------
    competition_id : int
        StatsBomb competition id.
    season_ids : iterable of int
        StatsBomb season ids whose matches should be pulled.

    Returns
    -------
    pandas.DataFrame
        One row per shot with a ``season_id`` column added; empty
        DataFrame if nothing could be loaded.
    """
    all_shots = []
    for season_id in season_ids:
        matches = sb.matches(competition_id=competition_id, season_id=season_id)
        for match_id in matches['match_id']:
            try:
                events = sb.events(match_id=match_id)
            except Exception as exc:
                # Skip matches whose event feed fails to download, but say
                # so instead of a bare `except:` that silently swallows
                # every error type (including KeyboardInterrupt).
                print(f"Skipping match {match_id}: {exc}")
                continue
            # .copy() avoids pandas SettingWithCopy issues when adding a
            # column to a filtered slice of the events frame.
            shots = events[events['type'] == 'Shot'].copy()
            shots['season_id'] = season_id
            all_shots.append(shots)
    if not all_shots:
        # pd.concat raises ValueError on an empty list.
        return pd.DataFrame()
    return pd.concat(all_shots, ignore_index=True)
# Load Premier League shots (competition 2; season ids 90/44/42 -- TODO
# confirm these ids against sb.competitions() for the data release in use).
shots = load_shot_data(2, [90, 44, 42])
# Basic cleaning: drop shots with no recorded location
shots_clean = shots.dropna(subset=['location']).copy()
# Extract location components; StatsBomb stores location as [x, y]
# (grid appears to be 120x80 given the constants below -- units are yards)
shots_clean['shot_x'] = shots_clean['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
shots_clean['shot_y'] = shots_clean['location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
)
# Target variable: 1 if the shot was a goal, else 0
shots_clean['is_goal'] = (shots_clean['shot_outcome'] == 'Goal').astype(int)
# Straight-line distance to the goal centre (goal at x=120, y=40)
shots_clean['distance_to_goal'] = np.sqrt(
(120 - shots_clean['shot_x'])**2 +
(40 - shots_clean['shot_y'])**2
)
# Angle off the goal centre line (radians; 0 = straight in front)
shots_clean['angle_to_goal'] = np.arctan2(
np.abs(shots_clean['shot_y'] - 40),
120 - shots_clean['shot_x']
)
# Shot characteristic flags
shots_clean['is_penalty'] = shots_clean['shot_type'] == 'Penalty'
shots_clean['is_header'] = shots_clean['shot_body_part'] == 'Head'
shots_clean['is_foot'] = shots_clean['shot_body_part'].isin(['Right Foot', 'Left Foot'])
# StatsBomb qualifier columns are NaN when the flag is absent
shots_clean['first_time'] = shots_clean['shot_first_time'].fillna(False)
shots_clean['open_goal'] = shots_clean['shot_open_goal'].fillna(False)
print(f"Total shots: {len(shots_clean)}")
print(f"Goals: {shots_clean['is_goal'].sum()}")
print(f"Conversion rate: {shots_clean['is_goal'].mean()*100:.2f}%")

library(tidyverse)
library(StatsBombR)

# Load all shots for one competition across several seasons.
# competition_id: StatsBomb competition id; season_ids: vector of season ids.
# Returns one row per shot with a season_id column added.
load_shot_data <- function(competition_id, season_ids) {
  # The original referenced an undefined global `Competitions`; fetch the
  # competition index once (loop-invariant) via FreeCompetitions().
  competitions <- FreeCompetitions()
  map_dfr(season_ids, function(season) {
    matches <- competitions %>%
      filter(competition.competition_id == competition_id,
             season.season_id == season) %>%
      FreeMatches()
    events <- map_dfr(matches$match_id, ~get.matchFree(.x))
    events %>%
      filter(type.name == "Shot") %>%
      mutate(season_id = season)
  })
}
# Load Premier League shots (competition 2) for three seasons
shots <- load_shot_data(2, c(90, 44, 42)) # Multiple seasons
# Basic cleaning: drop shots without coordinates, then derive the
# modelling columns in one mutate pipeline
shots_clean <- shots %>%
filter(!is.na(location.x), !is.na(location.y)) %>%
mutate(
# Target variable: 1 if the shot was a goal, else 0
is_goal = as.integer(shot.outcome.name == "Goal"),
# Shot location on the StatsBomb 120x80 grid (units are yards)
shot_x = location.x,
shot_y = location.y,
# Straight-line distance to the goal centre (goal at x=120, y=40)
distance_to_goal = sqrt((120 - shot_x)^2 + (40 - shot_y)^2),
# Angle off the goal centre line (radians; 0 = straight in front)
angle_to_goal = atan2(
abs(shot_y - 40),
120 - shot_x
),
# Shot type and body part (flattened StatsBombR column names)
shot_type = shot.type.name,
body_part = shot.body_part.name,
# Situation the shot arose from
play_pattern = play_pattern.name,
is_penalty = shot_type == "Penalty",
is_header = body_part == "Head",
is_foot = body_part %in% c("Right Foot", "Left Foot"),
# Qualifier flags are NA when absent, so coalesce to FALSE
first_time = coalesce(shot.first_time, FALSE),
follows_dribble = coalesce(shot.follows_dribble, FALSE),
open_goal = coalesce(shot.open_goal, FALSE)
)
cat("Total shots:", nrow(shots_clean), "\n")
cat("Goals:", sum(shots_clean$is_goal), "\n")
cat("Conversion rate:", round(mean(shots_clean$is_goal) * 100, 2), "%\n")Feature Engineering
The quality of your xG model depends heavily on the features you create. Good features capture the key factors that determine shot quality.
| Feature Category | Features | Impact on xG |
|---|---|---|
| Location | Distance, angle, x/y coordinates | High - primary determinant |
| Shot Type | Header, penalty, free kick, foot | High - headers are harder |
| Situation | Open play, set piece, counter | Medium |
| Execution | First time, following dribble | Medium |
| Context | Open goal, big chance indicator | High when present |
def engineer_xg_features(shots):
    """
    Create xG-model features from shot coordinates and context.

    Expects columns shot_x, shot_y (StatsBomb 120x80 grid, attacking the
    goal at x=120, y=40), play_pattern and shot_body_part.  Returns a
    copy of the input with the feature columns added.
    """
    shots = shots.copy()
    # Distance and angle to the goal centre (120, 40)
    shots['distance_to_goal'] = np.sqrt(
        (120 - shots['shot_x'])**2 + (40 - shots['shot_y'])**2
    )
    shots['angle_to_goal'] = np.arctan2(
        np.abs(shots['shot_y'] - 40),
        120 - shots['shot_x']
    )
    shots['angle_degrees'] = np.degrees(shots['angle_to_goal'])
    # Goal visibility.  On the 120x80 StatsBomb grid the posts sit at
    # y=36 and y=44 (the original 36.34/43.66 applied the metric goal
    # half-width to a yard-based pitch).  Both post bearings are measured
    # the same way -- atan2(post_y - shot_y, dx) -- so their difference is
    # the angle the goal mouth subtends.  The original mixed the two sign
    # conventions, which made goal_angle collapse to ~0 for central shots.
    dx = 120 - shots['shot_x']
    shots['left_post_angle'] = np.arctan2(36 - shots['shot_y'], dx)
    shots['right_post_angle'] = np.arctan2(44 - shots['shot_y'], dx)
    shots['goal_angle'] = np.abs(
        shots['right_post_angle'] - shots['left_post_angle']
    )
    # Zone features (StatsBomb box coordinates)
    shots['in_six_yard_box'] = (
        (shots['shot_x'] >= 114) &
        (shots['shot_y'] >= 30) &
        (shots['shot_y'] <= 50)
    ).astype(int)
    shots['in_penalty_box'] = (
        (shots['shot_x'] >= 102) &
        (shots['shot_y'] >= 18) &
        (shots['shot_y'] <= 62)
    ).astype(int)
    shots['central_shot'] = (np.abs(shots['shot_y'] - 40) < 10).astype(int)
    # Situation encoding from the play pattern
    shots['shot_type_counter'] = (
        shots['play_pattern'] == 'From Counter'
    ).astype(int)
    shots['shot_type_set_piece'] = (
        shots['play_pattern'].isin(['From Corner', 'From Free Kick'])
    ).astype(int)
    # Body part indicators
    shots['is_header'] = (shots['shot_body_part'] == 'Head').astype(int)
    shots['is_right_foot'] = (shots['shot_body_part'] == 'Right Foot').astype(int)
    shots['is_left_foot'] = (shots['shot_body_part'] == 'Left Foot').astype(int)
    # Interaction features
    shots['distance_x_angle'] = shots['distance_to_goal'] * shots['angle_to_goal']
    shots['close_header'] = (shots['in_penalty_box'] & shots['is_header']).astype(int)
    # Log transform: diminishing effect of distance
    shots['log_distance'] = np.log(shots['distance_to_goal'] + 1)
    # Squared terms let linear models capture non-linearity
    shots['distance_squared'] = shots['distance_to_goal'] ** 2
    shots['angle_squared'] = shots['angle_to_goal'] ** 2
    return shots
shots_features = engineer_xg_features(shots_clean)
# Visualize feature relationships with two side-by-side panels
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Left panel: distance distributions for goals vs non-goals
goals = shots_features[shots_features['is_goal'] == 1]
no_goals = shots_features[shots_features['is_goal'] == 0]
# density=True so the two classes are comparable despite class imbalance
axes[0].hist(no_goals['distance_to_goal'], bins=30, alpha=0.5,
label='No Goal', color='gray', density=True)
axes[0].hist(goals['distance_to_goal'], bins=30, alpha=0.5,
label='Goal', color='red', density=True)
# NOTE(review): label says metres but StatsBomb coordinates are yards -- confirm
axes[0].set_xlabel('Distance to Goal (m)')
axes[0].set_ylabel('Density')
axes[0].set_title('Shot Distance Distribution by Outcome')
axes[0].legend()
# Right panel: conversion rate per 5-unit distance band
shots_features['distance_bin'] = pd.cut(
shots_features['distance_to_goal'],
bins=range(0, 55, 5)
)
conv_by_dist = (
shots_features
.groupby('distance_bin')
.agg({'is_goal': ['sum', 'count']})
.reset_index()
)
conv_by_dist.columns = ['distance_bin', 'goals', 'shots']
conv_by_dist['conversion'] = conv_by_dist['goals'] / conv_by_dist['shots'] * 100
axes[1].bar(range(len(conv_by_dist)), conv_by_dist['conversion'], color='steelblue')
axes[1].set_xticks(range(len(conv_by_dist)))
axes[1].set_xticklabels([str(x) for x in conv_by_dist['distance_bin']], rotation=45)
axes[1].set_xlabel('Distance (m)')
axes[1].set_ylabel('Conversion %')
axes[1].set_title('Conversion Rate by Distance')
plt.tight_layout()
plt.show()# Advanced feature engineering
# Create xG-model features from shot coordinates and context.
# Expects shot_x/shot_y (StatsBomb 120x80 grid, goal at x=120, y=40),
# play_pattern and body_part; returns the frame with features added.
engineer_xg_features <- function(shots) {
  shots %>%
    mutate(
      # Distance and angle to the goal centre (120, 40)
      distance_to_goal = sqrt((120 - shot_x)^2 + (40 - shot_y)^2),
      angle_to_goal = atan2(abs(shot_y - 40), 120 - shot_x),
      angle_degrees = angle_to_goal * 180 / pi,
      # Goal visibility.  On the 120x80 StatsBomb grid the posts are at
      # y = 36 and y = 44.  Both bearings are measured the same way
      # (atan2(post_y - shot_y, dx)) so their difference is the angle the
      # goal mouth subtends; the original mixed sign conventions and used
      # metric post positions (36.34/43.66), which collapsed the angle to
      # ~0 for central shots.
      left_post_angle = atan2(36 - shot_y, 120 - shot_x),
      right_post_angle = atan2(44 - shot_y, 120 - shot_x),
      goal_angle = abs(right_post_angle - left_post_angle),
      # Zone-based features (StatsBomb box coordinates)
      in_six_yard_box = shot_x >= 114 & shot_y >= 30 & shot_y <= 50,
      in_penalty_box = shot_x >= 102 & shot_y >= 18 & shot_y <= 62,
      central_shot = abs(shot_y - 40) < 10,
      # Situation encoding from the play pattern
      shot_type_open_play = play_pattern == "Regular Play" | play_pattern == "From Counter",
      shot_type_set_piece = play_pattern %in% c("From Corner", "From Free Kick"),
      shot_type_counter = play_pattern == "From Counter",
      # Body part indicators
      is_head = body_part == "Head",
      is_right_foot = body_part == "Right Foot",
      is_left_foot = body_part == "Left Foot",
      # Interaction features
      distance_x_angle = distance_to_goal * angle_to_goal,
      close_header = in_penalty_box & is_head,
      # Log transform: diminishing effect of distance
      log_distance = log(distance_to_goal + 1),
      # Squared terms let the linear model capture non-linearity
      distance_squared = distance_to_goal^2,
      angle_squared = angle_to_goal^2
    )
}
shots_features <- engineer_xg_features(shots_clean)
# Density of shot distance, split by outcome (density comparable despite
# class imbalance between goals and non-goals)
ggplot(shots_features, aes(x = distance_to_goal, fill = factor(is_goal))) +
geom_density(alpha = 0.5) +
scale_fill_manual(values = c("0" = "gray", "1" = "red"),
labels = c("No Goal", "Goal")) +
labs(
title = "Shot Distance Distribution by Outcome",
# NOTE(review): label says metres but StatsBomb units are yards -- confirm
x = "Distance to Goal (m)", fill = "Outcome"
) +
theme_minimal()
# Conversion rate per 5-unit distance band
shots_features %>%
mutate(distance_bin = cut(distance_to_goal, breaks = seq(0, 50, 5))) %>%
group_by(distance_bin) %>%
summarise(
shots = n(),
goals = sum(is_goal),
conversion = goals / shots * 100
) %>%
ggplot(aes(x = distance_bin, y = conversion)) +
geom_bar(stat = "identity", fill = "steelblue") +
geom_text(aes(label = paste0(round(conversion, 1), "%")), vjust = -0.5, size = 3) +
labs(title = "Conversion Rate by Distance", x = "Distance (m)", y = "Conversion %") +
theme_minimal() +
theme(axis.text.x = element_text(angle = 45, hjust = 1))Training the xG Model
We'll train multiple models and compare their performance. Logistic regression provides interpretability while gradient boosting often achieves better accuracy.
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import log_loss, roc_auc_score, brier_score_loss
import xgboost as xgb
def prepare_model_data(shots):
    """
    Build the model matrix, target vector and filtered shot frame.

    Penalties are excluded (they get a fixed xG in a separate step).
    Returns (X, y, shots_no_pens): feature DataFrame with NaNs filled
    with 0, the 0/1 goal target, and the penalty-free shot frame.
    """
    feature_cols = [
        'distance_to_goal', 'angle_to_goal', 'goal_angle',
        'in_six_yard_box', 'in_penalty_box', 'central_shot',
        'is_header', 'is_right_foot', 'is_left_foot',
        'shot_type_counter', 'shot_type_set_piece',
        'close_header', 'log_distance'
    ]
    # Keep everything not explicitly flagged as a penalty (NaN flags
    # count as non-penalties, matching the `!= True` comparison).
    non_penalty_mask = shots['is_penalty'] != True
    shots_no_pens = shots[non_penalty_mask].copy()
    # Remaining NaNs are treated as "feature absent"
    features = shots_no_pens[feature_cols].fillna(0)
    target = shots_no_pens['is_goal']
    return features, target, shots_no_pens
X, y, shots_model = prepare_model_data(shots_features)
# Stratified split keeps the goal rate equal in train and test,
# which matters given the heavy class imbalance of shot data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set: {len(X_train)} shots, {y_train.sum()} goals")
print(f"Test set: {len(X_test)} shots, {y_test.sum()} goals")
# 1. Logistic Regression baseline (interpretable coefficients)
print("\nTraining Logistic Regression...")
logit_model = LogisticRegression(max_iter=1000, random_state=42)
logit_model.fit(X_train, y_train)
# predict_proba column 1 = probability of the positive class (goal)
logit_preds = logit_model.predict_proba(X_test)[:, 1]
# 2. XGBoost (usually better accuracy, less interpretable)
print("Training XGBoost...")
xgb_model = xgb.XGBClassifier(
n_estimators=200,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42,
# NOTE(review): use_label_encoder was deprecated and then removed in
# xgboost >= 2.0 -- confirm the pinned xgboost version accepts it
use_label_encoder=False,
eval_metric='logloss'
)
# NOTE(review): early_stopping_rounds as a fit() kwarg was deprecated in
# xgboost 1.6 and moved to the constructor in 2.x -- confirm version
xgb_model.fit(
X_train, y_train,
eval_set=[(X_test, y_test)],
early_stopping_rounds=20,
verbose=False
)
xgb_preds = xgb_model.predict_proba(X_test)[:, 1]
# Compare models on probabilistic metrics (log loss / Brier) and ranking (AUC)
print("\n=== Model Comparison ===")
print(f"Log Loss - Logistic: {log_loss(y_test, logit_preds):.4f}")
print(f"Log Loss - XGBoost: {log_loss(y_test, xgb_preds):.4f}")
print(f"AUC - Logistic: {roc_auc_score(y_test, logit_preds):.4f}")
print(f"AUC - XGBoost: {roc_auc_score(y_test, xgb_preds):.4f}")
print(f"Brier Score - Logistic: {brier_score_loss(y_test, logit_preds):.4f}")
print(f"Brier Score - XGBoost: {brier_score_loss(y_test, xgb_preds):.4f}")library(caret)
library(xgboost)
library(pROC)
# Build the model matrix, target vector and filtered shot frame.
# Penalties are excluded (they get a fixed xG later); remaining NAs in
# the features are treated as "feature absent" and replaced with 0.
prepare_model_data <- function(shots) {
  feature_cols <- c(
    "distance_to_goal", "angle_to_goal", "goal_angle",
    "in_six_yard_box", "in_penalty_box", "central_shot",
    "is_head", "is_right_foot", "is_left_foot",
    "shot_type_counter", "shot_type_set_piece",
    "first_time", "open_goal",
    "distance_x_angle", "log_distance"
  )
  # Penalties are modelled separately with a fixed conversion rate
  non_penalties <- shots %>%
    filter(!is_penalty)
  feature_matrix <- non_penalties %>%
    select(all_of(feature_cols)) %>%
    mutate(across(everything(), ~replace_na(., 0)))
  list(X = feature_matrix, y = non_penalties$is_goal, data = non_penalties)
}
model_data <- prepare_model_data(shots_features)
# Train/test split (caret keeps the class balance similar in both parts)
set.seed(42)
train_idx <- createDataPartition(model_data$y, p = 0.8, list = FALSE)
X_train <- model_data$X[train_idx, ]
X_test <- model_data$X[-train_idx, ]
y_train <- model_data$y[train_idx]
y_test <- model_data$y[-train_idx]
# 1. Logistic Regression baseline (interpretable coefficients)
cat("Training Logistic Regression...\n")
logit_model <- glm(
y_train ~ .,
data = cbind(X_train, y_train = y_train),
family = binomial(link = "logit")
)
# type = "response" returns probabilities rather than log-odds
logit_preds <- predict(logit_model, newdata = X_test, type = "response")
# 2. XGBoost (usually better accuracy, less interpretable)
cat("Training XGBoost...\n")
dtrain <- xgb.DMatrix(as.matrix(X_train), label = y_train)
dtest <- xgb.DMatrix(as.matrix(X_test), label = y_test)
xgb_params <- list(
objective = "binary:logistic",
eval_metric = "logloss",
max_depth = 6,
eta = 0.1,
subsample = 0.8,
colsample_bytree = 0.8
)
# Early stopping on the held-out fold prevents overfitting the 200 rounds
xgb_model <- xgb.train(
params = xgb_params,
data = dtrain,
nrounds = 200,
watchlist = list(train = dtrain, test = dtest),
early_stopping_rounds = 20,
verbose = 0
)
xgb_preds <- predict(xgb_model, dtest)
# Compare models
cat("\n=== Model Comparison ===\n")
# Manual log loss.  NOTE(review): log() of a prediction exactly 0 or 1
# yields -Inf; glm probabilities can reach machine 0/1 -- consider clipping
logit_logloss <- -mean(y_test * log(logit_preds) + (1 - y_test) * log(1 - logit_preds))
xgb_logloss <- -mean(y_test * log(xgb_preds) + (1 - y_test) * log(1 - xgb_preds))
cat("Log Loss - Logistic:", round(logit_logloss, 4), "\n")
cat("Log Loss - XGBoost:", round(xgb_logloss, 4), "\n")
# ROC AUC (ranking quality, threshold-free)
logit_auc <- auc(roc(y_test, logit_preds))
xgb_auc <- auc(roc(y_test, xgb_preds))
cat("AUC - Logistic:", round(logit_auc, 4), "\n")
cat("AUC - XGBoost:", round(xgb_auc, 4), "\n")Model Evaluation & Calibration
A good xG model should be well-calibrated - when we predict 0.2 xG, about 20% of those shots should be goals.
def evaluate_calibration(predictions, actuals, n_bins=10):
    """
    Bin predictions and compare mean predicted probability with the
    observed goal rate per bin.

    Parameters
    ----------
    predictions : array-like of float
        Predicted goal probabilities (xG values).
    actuals : array-like of int
        1 if the shot was a goal, else 0.
    n_bins : int
        Number of equal-width bins over the range of ``predictions``.

    Returns
    -------
    (pandas.DataFrame, float)
        Per-bin table (bin, avg_predicted, avg_actual, n) and the mean
        absolute gap between predicted and observed rates.
    """
    df = pd.DataFrame({
        'predicted': predictions,
        'actual': actuals
    })
    df['bin'] = pd.cut(df['predicted'], bins=n_bins)
    # Named aggregation: the original passed the grouping column 'bin'
    # inside the agg dict, which raises KeyError on modern pandas because
    # the grouper is excluded from the aggregated columns.
    # observed=False keeps the historical behaviour of including empty
    # categorical bins (and silences the pandas FutureWarning).
    calibration = (
        df.groupby('bin', observed=False)
        .agg(
            avg_predicted=('predicted', 'mean'),
            avg_actual=('actual', 'mean'),
            n=('actual', 'size'),
        )
        .reset_index()
    )
    # Mean absolute calibration gap; empty bins yield NaN rows, which the
    # NaN-skipping pandas mean ignores.
    calibration_error = np.abs(
        calibration['avg_predicted'] - calibration['avg_actual']
    ).mean()
    return calibration, calibration_error
xgb_calibration, cal_error = evaluate_calibration(xgb_preds, y_test)
# Calibration curve: each point is one probability bin; marker size
# reflects how many shots fell in the bin
fig, ax = plt.subplots(figsize=(8, 8))
ax.scatter(xgb_calibration['avg_predicted'], xgb_calibration['avg_actual'],
s=xgb_calibration['n']*2, alpha=0.7, color='steelblue')
# A perfectly calibrated model lies on the diagonal
ax.plot([0, 1], [0, 1], 'r--', label='Perfect calibration')
ax.set_xlabel('Predicted xG')
ax.set_ylabel('Actual Conversion Rate')
ax.set_title(f'xG Model Calibration Curve\nCalibration Error: {cal_error:.4f}')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.legend()
plt.tight_layout()
plt.show()
# Brier score vs. a constant baseline that always predicts the
# training-set conversion rate (lower is better)
brier = brier_score_loss(y_test, xgb_preds)
baseline_brier = brier_score_loss(y_test, [y_train.mean()] * len(y_test))
print(f"\nBrier Score: {brier:.4f}")
print(f"Baseline Brier: {baseline_brier:.4f}")
print(f"Improvement: {(1 - brier/baseline_brier)*100:.2f}%")
# Gain-based feature importances from the fitted booster
importance = pd.DataFrame({
'feature': X.columns,
'importance': xgb_model.feature_importances_
}).sort_values('importance', ascending=False)
fig, ax = plt.subplots(figsize=(10, 6))
# [::-1] so the largest bar ends up at the top of the horizontal chart
ax.barh(importance['feature'][:10][::-1], importance['importance'][:10][::-1],
color='forestgreen')
ax.set_xlabel('Importance')
ax.set_title('Top 10 Feature Importance')
plt.tight_layout()
plt.show()# Calibration analysis
# Compare mean predicted xG with the observed conversion rate per
# probability bin.  Bins are fixed-width over [0, 1].
# Returns list(data = per-bin tibble, error = mean absolute gap).
evaluate_calibration <- function(predictions, actuals, n_bins = 10) {
  bin_edges <- seq(0, 1, length.out = n_bins + 1)
  per_bin <- tibble(predicted = predictions, actual = actuals) %>%
    mutate(
      bin = cut(predicted, breaks = bin_edges, include.lowest = TRUE)
    ) %>%
    group_by(bin) %>%
    summarise(
      n = n(),
      avg_predicted = mean(predicted),
      avg_actual = mean(actual),
      .groups = "drop"
    )
  list(
    data = per_bin,
    error = mean(abs(per_bin$avg_predicted - per_bin$avg_actual))
  )
}
# Evaluate XGBoost calibration on the held-out fold
xgb_calibration <- evaluate_calibration(xgb_preds, y_test)
# Calibration curve: point size = shots in the bin; the dashed diagonal
# is perfect calibration
ggplot(xgb_calibration$data, aes(x = avg_predicted, y = avg_actual)) +
geom_point(aes(size = n), color = "steelblue") +
geom_abline(intercept = 0, slope = 1, linetype = "dashed", color = "red") +
geom_smooth(method = "loess", se = FALSE, color = "blue") +
scale_size_continuous(range = c(2, 10)) +
labs(
title = "xG Model Calibration Curve",
subtitle = paste("Calibration Error:", round(xgb_calibration$error, 4)),
x = "Predicted xG", y = "Actual Conversion Rate",
size = "Sample Size"
) +
coord_equal() +
xlim(0, 1) + ylim(0, 1) +
theme_minimal()
# Brier Score = mean squared error of the probabilities (lower is better)
brier_score <- mean((xgb_preds - y_test)^2)
cat("Brier Score:", round(brier_score, 4), "\n")
# Baseline: always predict the training-set conversion rate
baseline_pred <- mean(y_train)
baseline_brier <- mean((baseline_pred - y_test)^2)
cat("Baseline Brier Score:", round(baseline_brier, 4), "\n")
cat("Improvement:", round((1 - brier_score/baseline_brier) * 100, 2), "%\n")
# Gain-based feature importance (xgb.importance returns a data.table,
# so importance[1:10] selects the top ten rows)
importance <- xgb.importance(model = xgb_model)
ggplot(importance[1:10], aes(x = reorder(Feature, Gain), y = Gain)) +
geom_bar(stat = "identity", fill = "forestgreen") +
coord_flip() +
labs(
title = "Top 10 Feature Importance",
x = "", y = "Gain"
) +
theme_minimal()Interpreting Your xG Model
Understanding what your model has learned helps validate its logic and communicate insights to stakeholders.
# Turn fitted logistic-regression coefficients into a readable table
def interpret_logit_model(model, feature_names):
    """
    Summarise a fitted binary LogisticRegression.

    Returns a DataFrame with one row per feature: the raw coefficient,
    its odds ratio (exp(coef)) and a direction label, sorted by absolute
    coefficient size so the strongest effects come first.
    """
    raw_coefs = model.coef_[0]
    table = pd.DataFrame({
        'feature': feature_names,
        'coefficient': raw_coefs,
        'odds_ratio': np.exp(raw_coefs),
    })
    table = table.sort_values('coefficient', key=abs, ascending=False)
    table['direction'] = np.where(
        table['coefficient'] > 0, 'Increases xG', 'Decreases xG'
    )
    return table
logit_interpretation = interpret_logit_model(logit_model, X.columns)
print(logit_interpretation)
# SHAP values explain each prediction as a sum of per-feature contributions
import shap
explainer = shap.TreeExplainer(xgb_model)
shap_values = explainer.shap_values(X_test)
# Beeswarm summary of SHAP values across the test set
# (show=False so tight_layout/show control rendering below)
shap.summary_plot(shap_values, X_test, show=False)
plt.tight_layout()
plt.show()
# Partial dependence: average predicted xG as one feature varies
from sklearn.inspection import PartialDependenceDisplay
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
PartialDependenceDisplay.from_estimator(
xgb_model, X_test, ['distance_to_goal'], ax=axes[0]
)
axes[0].set_title('Partial Dependence: Distance to Goal')
PartialDependenceDisplay.from_estimator(
xgb_model, X_test, ['angle_to_goal'], ax=axes[1]
)
axes[1].set_title('Partial Dependence: Angle to Goal')
plt.tight_layout()
plt.show()# Logistic regression coefficients interpretation
# Summarise glm coefficients as a table with odds ratios and an effect
# direction label, ordered by absolute effect size (intercept excluded).
interpret_logit_model <- function(model) {
  summary(model)$coefficients %>%
    as.data.frame() %>%
    rownames_to_column("feature") %>%
    filter(feature != "(Intercept)") %>%
    mutate(
      odds_ratio = exp(Estimate),
      direction = ifelse(Estimate > 0, "Increases xG", "Decreases xG")
    ) %>%
    arrange(desc(abs(Estimate)))
}
logit_interpretation <- interpret_logit_model(logit_model)
# Coefficient chart: bars show effect on the log-odds of scoring
ggplot(logit_interpretation, aes(x = reorder(feature, Estimate), y = Estimate)) +
geom_bar(stat = "identity", aes(fill = direction)) +
coord_flip() +
scale_fill_manual(values = c("Increases xG" = "forestgreen", "Decreases xG" = "coral")) +
labs(
title = "Logistic Regression Coefficients",
subtitle = "Effect on log-odds of scoring",
x = "", y = "Coefficient",
fill = "Direction"
) +
theme_minimal()
# Partial dependence: average predicted xG as one feature varies
library(pdp)
# Distance effect (train must match the matrix the booster was fit on)
distance_pd <- partial(xgb_model, pred.var = "distance_to_goal",
train = as.matrix(X_train))
ggplot(distance_pd, aes(x = distance_to_goal, y = yhat)) +
geom_line(color = "steelblue", linewidth = 1.5) +
labs(
title = "Partial Dependence: Distance to Goal",
x = "Distance (m)", y = "Predicted xG"
) +
theme_minimal()
# Angle effect
angle_pd <- partial(xgb_model, pred.var = "angle_to_goal",
train = as.matrix(X_train))
ggplot(angle_pd, aes(x = angle_to_goal, y = yhat)) +
geom_line(color = "coral", linewidth = 1.5) +
labs(
title = "Partial Dependence: Angle to Goal",
x = "Angle (radians)", y = "Predicted xG"
) +
theme_minimal()Applying Your xG Model
Once trained, your xG model can be used to analyze matches, evaluate players, and generate insights.
def apply_xg_to_match(match_events, xg_model, feature_cols, penalty_xg=0.76):
    """
    Apply a trained xG model to the events of a single match.

    Parameters
    ----------
    match_events : pandas.DataFrame
        Raw StatsBomb-style event data for one match.
    xg_model : fitted classifier exposing predict_proba
    feature_cols : list of str
        Feature columns the model was trained on.
    penalty_xg : float, default 0.76
        Fixed xG assigned to penalties (the historical conversion rate
        used by this chapter); penalties bypass the location-based model.

    Returns
    -------
    dict with 'shots' (per-shot frame including 'custom_xg') and
    'summary' (per-team totals compared against the provider's xG).
    """
    shots = match_events[match_events['type'] == 'Shot'].copy()
    shots = engineer_xg_features(shots)
    # Split penalties from model-scored shots.  NaN penalty flags count
    # as non-penalties, matching the original == True / != True pair.
    is_pen = shots['is_penalty'] == True
    penalties = shots[is_pen].copy()
    non_penalties = shots[~is_pen].copy()
    # Model probability of the positive class = custom xG
    if len(non_penalties) > 0:
        X_new = non_penalties[feature_cols].fillna(0)
        non_penalties['custom_xg'] = xg_model.predict_proba(X_new)[:, 1]
    if len(penalties) > 0:
        penalties['custom_xg'] = penalty_xg
    # Restore event order via the StatsBomb 'index' column
    all_shots = pd.concat([non_penalties, penalties]).sort_values('index')
    # Per-team totals; named aggregation replaces the agg-dict + rename pair
    summary = (
        all_shots
        .groupby('team')
        .agg(
            shots=('id', 'count'),
            goals=('is_goal', 'sum'),
            custom_xg=('custom_xg', 'sum'),
            provider_xg=('shot_statsbomb_xg', 'sum'),
        )
        .reset_index()
    )
    return {
        'shots': all_shots,
        'summary': summary
    }
# Example usage on a single match's events
feature_cols = list(X.columns)
match_xg = apply_xg_to_match(events, xgb_model, feature_cols)
print(match_xg['summary'])
# Scatter our xG against the provider's per shot; goals in red
fig, ax = plt.subplots(figsize=(8, 8))
shots_data = match_xg['shots']
colors = shots_data['is_goal'].map({0: 'steelblue', 1: 'red'})
ax.scatter(shots_data['shot_statsbomb_xg'], shots_data['custom_xg'],
c=colors, s=50, alpha=0.7)
# Diagonal = perfect agreement with the provider
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('Provider xG')
ax.set_ylabel('Custom xG')
ax.set_title('Custom xG vs Provider xG')
plt.tight_layout()
plt.show()# Apply xG model to new match
# Apply a trained xG model to one match's events.
# penalty_xg: fixed xG for penalties (historical conversion rate ~0.76,
# now a parameter with a backward-compatible default) since the
# location-based model is not trained on them.
# Returns list(shots = per-shot frame with custom_xg, summary = team totals).
apply_xg_to_match <- function(match_events, xg_model, feature_cols,
                              penalty_xg = 0.76) {
  # Extract shots and build the model features
  shots <- match_events %>%
    filter(type.name == "Shot") %>%
    engineer_xg_features()
  # Penalties bypass the model and receive the fixed value
  penalties <- shots %>% filter(is_penalty)
  non_penalties <- shots %>% filter(!is_penalty)
  if (nrow(non_penalties) > 0) {
    X_new <- non_penalties %>%
      select(all_of(feature_cols)) %>%
      mutate(across(everything(), ~replace_na(., 0)))
    non_penalties$custom_xg <- predict(xg_model, as.matrix(X_new))
  }
  if (nrow(penalties) > 0) {
    penalties$custom_xg <- penalty_xg
  }
  # Restore event order via the StatsBomb index column
  all_shots <- bind_rows(non_penalties, penalties) %>%
    arrange(index)
  # Team totals, with the provider's xG for comparison
  match_summary <- all_shots %>%
    group_by(team.name) %>%
    summarise(
      shots = n(),
      goals = sum(is_goal),
      custom_xg = sum(custom_xg),
      provider_xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      .groups = "drop"
    )
  list(shots = all_shots, summary = match_summary)
}
# Example usage on a single match's events
match_xg <- apply_xg_to_match(events, xgb_model, names(X_train))
print(match_xg$summary)
# Per-shot comparison of our model against the provider's xG
comparison <- match_xg$shots %>%
select(player.name, minute, custom_xg, provider_xg = shot.statsbomb_xg, is_goal) %>%
mutate(
difference = custom_xg - provider_xg
)
# Diagonal = perfect agreement with the provider; goals shown in red
ggplot(comparison, aes(x = provider_xg, y = custom_xg)) +
geom_point(aes(color = factor(is_goal)), size = 3, alpha = 0.7) +
geom_abline(intercept = 0, slope = 1, linetype = "dashed") +
scale_color_manual(values = c("0" = "steelblue", "1" = "red"),
labels = c("No Goal", "Goal")) +
labs(
title = "Custom xG vs Provider xG",
x = "Provider xG", y = "Custom xG", color = "Outcome"
) +
coord_equal() +
theme_minimal()Practice Exercises
Exercise 28.1: Complete xG Model with Advanced Feature Engineering
Task: Build a comprehensive xG model from scratch with advanced feature engineering, multiple model comparison, and calibration analysis.
Requirements:
- Engineer at least 10 features including distance, angle, body part, and game state
- Compare logistic regression, random forest, and gradient boosting models
- Create calibration plots and calculate Brier score for each model
- Analyze feature importance and interpret results
- Validate against provider xG values
import pandas as pd
import numpy as np
from statsbombpy import sb
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, brier_score_loss
# calibration_curve lives in sklearn.calibration, not sklearn.metrics;
# importing it from sklearn.metrics raises an ImportError.
from sklearn.calibration import calibration_curve
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# Load a sample of shots from two competitions (capped at 20 matches
# each to keep the exercise runtime reasonable)
competitions = sb.competitions()
target_comps = competitions[competitions['competition_name'].isin(['La Liga', 'Premier League'])]
all_shots = []
for _, comp in target_comps.head(2).iterrows():
matches = sb.matches(competition_id=comp['competition_id'],
season_id=comp['season_id'])
for _, match in matches.head(20).iterrows():
events = sb.events(match_id=match['match_id'])
# .copy() avoids SettingWithCopy issues on the filtered slice
shots = events[events['type'] == 'Shot'].copy()
if len(shots) > 0:
all_shots.append(shots)
shots_df = pd.concat(all_shots, ignore_index=True)
print(f"Total shots loaded: {len(shots_df)}")
# Feature engineering
def engineer_features(df):
    """Build the modeling feature table from raw StatsBomb shot events.

    Parameters
    ----------
    df : pandas.DataFrame
        Shot events with StatsBomb columns: ``location``, ``shot_body_part``,
        ``shot_type``, ``minute``, ``period``, ``under_pressure``,
        ``shot_first_time``, ``shot_outcome``, ``shot_statsbomb_xg``.

    Returns
    -------
    pandas.DataFrame
        One row per shot with geometric, body-part, shot-type, game-state and
        context features, the binary ``is_goal`` target, and the provider's
        ``provider_xg`` benchmark. The input index is preserved.
    """
    def _coord(axis):
        # Pull one coordinate out of the [x, y] location list; NaN when absent.
        return df['location'].apply(
            lambda loc: loc[axis] if isinstance(loc, list) else np.nan
        )

    x = _coord(0)
    y = _coord(1)
    columns = {
        'x': x,
        'y': y,
        # Geometry relative to the goal centre at (120, 40)
        'distance': np.sqrt((120 - x) ** 2 + (40 - y) ** 2),
        'angle': np.arctan2(np.abs(y - 40), 120 - x) * 180 / np.pi,
        # Body part indicators
        'is_header': (df['shot_body_part'] == 'Head').astype(int),
        'is_right_foot': (df['shot_body_part'] == 'Right Foot').astype(int),
        'is_left_foot': (df['shot_body_part'] == 'Left Foot').astype(int),
        # Shot type indicators
        'is_open_play': (df['shot_type'] == 'Open Play').astype(int),
        'is_free_kick': (df['shot_type'] == 'Free Kick').astype(int),
        'is_corner': (df['shot_type'] == 'Corner').astype(int),
        # Game state
        'minute_normalized': df['minute'] / 90,
        'is_first_half': (df['period'] == 1).astype(int),
        # Context flags: missing means the flag was not set
        'under_pressure': df['under_pressure'].fillna(False).astype(int),
        'first_time': df['shot_first_time'].fillna(False).astype(int),
        # Target variable and provider benchmark
        'is_goal': (df['shot_outcome'] == 'Goal').astype(int),
        'provider_xg': df['shot_statsbomb_xg'],
    }
    return pd.DataFrame(columns)
# Engineer features and clean data
shots_features = engineer_features(shots_df)
# Penalties are excluded: they are near-fixed-probability set pieces that
# distort an open-play xG model. NOTE(review): the boolean mask comes from
# shots_df but indexes shots_features — this relies on both sharing the same
# index, which holds because engineer_features preserves df's index.
shots_features = shots_features[shots_df['shot_type'] != 'Penalty']  # Exclude penalties
shots_clean = shots_features.dropna()
print(f"Shots for modeling: {len(shots_clean)}")
print(f"Goal conversion rate: {shots_clean['is_goal'].mean()*100:.2f}%")

# Define feature columns (coordinates, target, and provider xG are excluded)
feature_cols = ['distance', 'angle', 'is_header', 'is_right_foot', 'is_left_foot',
                'is_open_play', 'is_free_kick', 'is_corner', 'minute_normalized',
                'is_first_half', 'under_pressure', 'first_time']
X = shots_clean[feature_cols]
y = shots_clean['is_goal']
provider_xg = shots_clean['provider_xg']

# Split data; stratify keeps the goal rate equal across the splits, and the
# provider xG column is split alongside so it can be benchmarked on the same
# held-out shots.
X_train, X_test, y_train, y_test, provider_train, provider_test = train_test_split(
    X, y, provider_xg, test_size=0.2, random_state=42, stratify=y
)

# Scale features (fit on train only to avoid leakage). Only the linear model
# uses the scaled copies; the tree models below use the raw features.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Model 1: Logistic Regression
print("\n=== LOGISTIC REGRESSION ===")
lr_model = LogisticRegression(max_iter=1000, random_state=42)
lr_model.fit(X_train_scaled, y_train)
lr_pred = lr_model.predict_proba(X_test_scaled)[:, 1]
lr_auc = roc_auc_score(y_test, lr_pred)
lr_brier = brier_score_loss(y_test, lr_pred)
print(f"AUC: {lr_auc:.4f}")
print(f"Brier Score: {lr_brier:.4f}")

# Feature importance: coefficients are comparable because inputs were scaled.
lr_importance = pd.DataFrame({
    'feature': feature_cols,
    'coefficient': lr_model.coef_[0]
}).sort_values('coefficient', key=abs, ascending=False)
print("\nTop Features (Logistic Regression):")
print(lr_importance.head())

# Model 2: Random Forest (unscaled features; trees are scale-invariant)
print("\n=== RANDOM FOREST ===")
rf_model = RandomForestClassifier(n_estimators=100, max_depth=6, random_state=42)
rf_model.fit(X_train, y_train)
rf_pred = rf_model.predict_proba(X_test)[:, 1]
rf_auc = roc_auc_score(y_test, rf_pred)
rf_brier = brier_score_loss(y_test, rf_pred)
print(f"AUC: {rf_auc:.4f}")
print(f"Brier Score: {rf_brier:.4f}")

# Model 3: Gradient Boosting
print("\n=== GRADIENT BOOSTING ===")
gb_model = GradientBoostingClassifier(n_estimators=100, max_depth=4,
                                      learning_rate=0.1, random_state=42)
gb_model.fit(X_train, y_train)
gb_pred = gb_model.predict_proba(X_test)[:, 1]
gb_auc = roc_auc_score(y_test, gb_pred)
gb_brier = brier_score_loss(y_test, gb_pred)
print(f"AUC: {gb_auc:.4f}")
print(f"Brier Score: {gb_brier:.4f}")

# Feature importance for Gradient Boosting
gb_importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': gb_model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nTop Features (Gradient Boosting):")
print(gb_importance.head())

# Model Comparison: AUC measures ranking ability; Brier score measures
# probability accuracy (lower is better).
print("\n=== MODEL COMPARISON ===")
provider_auc = roc_auc_score(y_test, provider_test)
provider_brier = brier_score_loss(y_test, provider_test)
comparison = pd.DataFrame({
    'Model': ['Logistic Regression', 'Random Forest', 'Gradient Boosting', 'Provider xG'],
    'AUC': [lr_auc, rf_auc, gb_auc, provider_auc],
    'Brier Score': [lr_brier, rf_brier, gb_brier, provider_brier]
})
print(comparison.to_string(index=False))
# Calibration Plots
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
models = [
('Logistic Regression', lr_pred, 'blue'),
('Gradient Boosting', gb_pred, 'green'),
('Provider xG', provider_test.values, 'purple')
]
for ax, (name, pred, color) in zip(axes.flatten()[:3], models):
prob_true, prob_pred = calibration_curve(y_test, pred, n_bins=10)
ax.plot(prob_pred, prob_true, 'o-', color=color, label=name)
ax.plot([0, 1], [0, 1], 'r--', label='Perfect calibration')
ax.set_xlabel('Predicted xG')
ax.set_ylabel('Actual Goal Rate')
ax.set_title(f'{name} Calibration')
ax.legend()
ax.set_xlim(0, 0.6)
ax.set_ylim(0, 0.6)
# All models comparison
ax = axes[1, 1]
for name, pred, color in models:
prob_true, prob_pred = calibration_curve(y_test, pred, n_bins=10)
ax.plot(prob_pred, prob_true, 'o-', color=color, label=name)
ax.plot([0, 1], [0, 1], 'r--', label='Perfect')
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('All Models Comparison')
ax.legend()
ax.set_xlim(0, 0.6)
ax.set_ylim(0, 0.6)
plt.tight_layout()
plt.savefig('xg_model_comparison.png', dpi=150, bbox_inches='tight')
plt.show()
print("\nAnalysis complete! Best model determined by lowest Brier score.")

library(StatsBombR)
library(tidyverse)
library(caret)
library(xgboost)
library(pROC)
# Load shot data from multiple competitions (StatsBomb free data)
competitions <- FreeCompetitions() %>%
  filter(competition_name %in% c("La Liga", "Premier League"))
matches <- FreeMatches(competitions)
events <- StatsBombFreeEvents(MatchesDF = matches)

# Extract shots and engineer features
shots <- events %>%
  filter(type.name == "Shot") %>%
  mutate(
    # Basic geometric features (goal centre at x = 120, y = 40)
    x = location.x,
    y = location.y,
    distance = sqrt((120 - x)^2 + (40 - y)^2),
    angle = atan2(abs(y - 40), 120 - x) * 180 / pi,
    # Body part features (0/1 indicators)
    is_header = ifelse(shot.body_part.name == "Head", 1, 0),
    is_right_foot = ifelse(shot.body_part.name == "Right Foot", 1, 0),
    is_left_foot = ifelse(shot.body_part.name == "Left Foot", 1, 0),
    # Shot type features
    is_open_play = ifelse(shot.type.name == "Open Play", 1, 0),
    is_free_kick = ifelse(shot.type.name == "Free Kick", 1, 0),
    is_corner = ifelse(shot.type.name == "Corner", 1, 0),
    # Game state features
    minute_normalized = minute / 90,
    is_first_half = ifelse(period == 1, 1, 0),
    # Shot context: presence flags encoded via !is.na() (non-missing = set)
    under_pressure = ifelse(!is.na(under_pressure), 1, 0),
    first_time = ifelse(!is.na(shot.first_time), 1, 0),
    # Target variable
    is_goal = ifelse(shot.outcome.name == "Goal", 1, 0),
    # Provider xG for comparison
    provider_xg = shot.statsbomb_xg
  ) %>%
  # Drop shots without usable coordinates; exclude penalties
  filter(!is.na(distance), shot.type.name != "Penalty") %>%
  select(is_goal, distance, angle, is_header, is_right_foot, is_left_foot,
         is_open_play, is_free_kick, is_corner, minute_normalized,
         is_first_half, under_pressure, first_time, provider_xg)

# Remove rows with NA
shots_clean <- shots %>% drop_na()
cat("Total shots for modeling:", nrow(shots_clean), "\n")
cat("Goal conversion rate:", round(mean(shots_clean$is_goal) * 100, 2), "%\n")

# Split data (80/20), seeded for reproducibility
set.seed(42)
train_idx <- createDataPartition(shots_clean$is_goal, p = 0.8, list = FALSE)
train_data <- shots_clean[train_idx, ]
test_data <- shots_clean[-train_idx, ]

# Feature matrix (exclude provider_xg and target)
features <- c("distance", "angle", "is_header", "is_right_foot", "is_left_foot",
              "is_open_play", "is_free_kick", "is_corner", "minute_normalized",
              "is_first_half", "under_pressure", "first_time")
X_train <- train_data[, features]
y_train <- train_data$is_goal
X_test <- test_data[, features]
y_test <- test_data$is_goal
# Model 1: Logistic Regression (baseline with interpretable coefficients)
cat("\n=== LOGISTIC REGRESSION ===\n")
lr_model <- glm(is_goal ~ ., data = cbind(X_train, is_goal = y_train),
                family = binomial())
lr_pred <- predict(lr_model, X_test, type = "response")
lr_auc <- auc(roc(y_test, lr_pred))
# Brier score = mean squared error of the predicted probabilities (lower is better)
lr_brier <- mean((lr_pred - y_test)^2)
cat("AUC:", round(lr_auc, 4), "\n")
cat("Brier Score:", round(lr_brier, 4), "\n")

# Feature importance for logistic regression: rank raw coefficients by
# absolute size ([-1] drops the intercept). NOTE(review): the features are
# not standardised here, so coefficient magnitudes are scale-dependent.
lr_coefs <- data.frame(
  feature = names(coef(lr_model))[-1],
  coefficient = coef(lr_model)[-1]
) %>% arrange(desc(abs(coefficient)))
cat("\nTop Features (Logistic Regression):\n")
print(head(lr_coefs, 5))
# Model 2: Random Forest
cat("\n=== RANDOM FOREST ===\n")
# caret with classProbs = TRUE requires class levels that are valid R variable
# names; "0"/"1" are not, so train() would abort before fitting anything.
# Recode the outcome to labelled levels with "goal" as the second (positive)
# class, and read that class's probability column after prediction.
rf_outcome <- factor(ifelse(y_train == 1, "goal", "no_goal"),
                     levels = c("no_goal", "goal"))
rf_model <- train(
  x = X_train, y = rf_outcome,
  method = "rf",
  trControl = trainControl(method = "cv", number = 5, classProbs = TRUE),
  tuneGrid = data.frame(mtry = c(3, 4, 5))
)
# Probability of the positive class ("goal")
rf_pred <- predict(rf_model, X_test, type = "prob")[, "goal"]
rf_auc <- auc(roc(y_test, rf_pred))
rf_brier <- mean((rf_pred - y_test)^2)
cat("AUC:", round(rf_auc, 4), "\n")
cat("Brier Score:", round(rf_brier, 4), "\n")
# Model 3: XGBoost
cat("\n=== XGBOOST ===\n")
dtrain <- xgb.DMatrix(as.matrix(X_train), label = y_train)
dtest <- xgb.DMatrix(as.matrix(X_test), label = y_test)
xgb_params <- list(
  objective = "binary:logistic",  # outputs goal probabilities
  eval_metric = "auc",
  max_depth = 4,
  eta = 0.1,                      # learning rate
  subsample = 0.8
)
# NOTE(review): early stopping monitors the held-out test fold itself, so the
# test AUC reported below is mildly optimistic; a separate validation split
# would be cleaner.
xgb_model <- xgb.train(
  params = xgb_params,
  data = dtrain,
  nrounds = 100,
  watchlist = list(test = dtest),
  early_stopping_rounds = 10,
  verbose = 0
)
xgb_pred <- predict(xgb_model, dtest)
xgb_auc <- auc(roc(y_test, xgb_pred))
xgb_brier <- mean((xgb_pred - y_test)^2)
cat("AUC:", round(xgb_auc, 4), "\n")
cat("Brier Score:", round(xgb_brier, 4), "\n")

# Feature importance for XGBoost
xgb_importance <- xgb.importance(feature_names = features, model = xgb_model)
cat("\nTop Features (XGBoost):\n")
print(head(xgb_importance, 5))

# Model Comparison Summary: AUC (ranking) and Brier (probability accuracy)
# for each model plus the provider's own xG on the same held-out shots
cat("\n=== MODEL COMPARISON ===\n")
comparison <- data.frame(
  Model = c("Logistic Regression", "Random Forest", "XGBoost", "Provider xG"),
  AUC = c(lr_auc, rf_auc, xgb_auc, auc(roc(y_test, test_data$provider_xg))),
  Brier = c(lr_brier, rf_brier, xgb_brier, mean((test_data$provider_xg - y_test)^2))
)
print(comparison)
# Calibration Plot
par(mfrow = c(2, 2))
# Function to create calibration data
# Bin predictions into equal-width probability bands and summarise each band.
# Returns a data.frame with the mean prediction (`predicted`), the observed
# outcome rate (`actual`), and the number of shots (`count`) per band; bands
# with no shots produce NA rows and are dropped.
create_calibration <- function(pred, actual, n_bins = 10) {
  edges <- seq(0, 1, length.out = n_bins + 1)
  band <- cut(pred, breaks = edges, include.lowest = TRUE)
  band_summary <- data.frame(
    predicted = tapply(pred, band, mean),
    actual = tapply(actual, band, mean),
    count = tapply(actual, band, length)
  )
  band_summary %>% drop_na()
}
# Logistic Regression Calibration (points on the dashed diagonal = perfect)
cal_lr <- create_calibration(lr_pred, y_test)
plot(cal_lr$predicted, cal_lr$actual, xlim = c(0, 0.6), ylim = c(0, 0.6),
     xlab = "Predicted xG", ylab = "Actual Goal Rate",
     main = "Logistic Regression Calibration", pch = 19, col = "blue")
abline(0, 1, lty = 2, col = "red")

# XGBoost Calibration
cal_xgb <- create_calibration(xgb_pred, y_test)
plot(cal_xgb$predicted, cal_xgb$actual, xlim = c(0, 0.6), ylim = c(0, 0.6),
     xlab = "Predicted xG", ylab = "Actual Goal Rate",
     main = "XGBoost Calibration", pch = 19, col = "green")
abline(0, 1, lty = 2, col = "red")

# Provider xG Calibration
cal_provider <- create_calibration(test_data$provider_xg, y_test)
plot(cal_provider$predicted, cal_provider$actual, xlim = c(0, 0.6), ylim = c(0, 0.6),
     xlab = "Predicted xG", ylab = "Actual Goal Rate",
     main = "Provider xG Calibration", pch = 19, col = "purple")
abline(0, 1, lty = 2, col = "red")

# All models overlaid in the fourth panel for direct comparison
plot(1, type = "n", xlim = c(0, 0.6), ylim = c(0, 0.6),
     xlab = "Predicted", ylab = "Actual", main = "All Models Comparison")
points(cal_lr$predicted, cal_lr$actual, pch = 19, col = "blue")
points(cal_xgb$predicted, cal_xgb$actual, pch = 17, col = "green")
points(cal_provider$predicted, cal_provider$actual, pch = 15, col = "purple")
abline(0, 1, lty = 2, col = "red")
legend("topleft", legend = c("LogReg", "XGBoost", "Provider"),
       col = c("blue", "green", "purple"), pch = c(19, 17, 15), cex = 0.8)
par(mfrow = c(1, 1))

Exercise 28.2: Body Part-Specific xG Models
Task: Headers have fundamentally different characteristics than foot shots. Build separate models for headers and foot shots, then create an ensemble that routes shots to the appropriate model.
Requirements:
- Create separate training datasets for headers vs foot shots
- Engineer header-specific features (jump height proxy, defensive positioning)
- Train optimized models for each shot type
- Build a routing mechanism to select the appropriate model
- Compare ensemble performance against a single unified model
import pandas as pd
import numpy as np
from statsbombpy import sb
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, brier_score_loss
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# Load shot data: first listed La Liga season, capped at the first 50 matches
# to keep the download manageable
competitions = sb.competitions()
la_liga = competitions[competitions['competition_name'] == 'La Liga'].iloc[0]
matches = sb.matches(competition_id=la_liga['competition_id'],
                     season_id=la_liga['season_id'])
all_shots = []
for _, match in matches.head(50).iterrows():
    events = sb.events(match_id=match['match_id'])
    shots = events[events['type'] == 'Shot'].copy()
    if len(shots) > 0:
        all_shots.append(shots)
shots_df = pd.concat(all_shots, ignore_index=True)
# Exclude penalties before modeling
shots_df = shots_df[shots_df['shot_type'] != 'Penalty']
# Engineer base features
def engineer_base_features(df):
    """Derive the location, geometry, and context features shared by all
    body-part-specific shot models.

    Parameters
    ----------
    df : pandas.DataFrame
        StatsBomb shot events with ``location``, ``shot_body_part``,
        ``shot_outcome``, ``under_pressure``, ``shot_first_time``,
        ``shot_type``, ``minute``, and ``shot_statsbomb_xg`` columns.

    Returns
    -------
    pandas.DataFrame
        Boolean indicator columns are left as booleans here; callers cast to
        float/int before model fitting. The ``provider_xg`` column carries the
        data provider's own xG for benchmarking.
    """
    # Normalise missing locations to a NaN pair so coordinate extraction
    # is uniform.
    coords = df['location'].apply(
        lambda loc: loc if isinstance(loc, list) else [np.nan, np.nan]
    )
    x = coords.apply(lambda c: c[0])
    y = coords.apply(lambda c: c[1])

    features = pd.DataFrame({'x': x, 'y': y})
    # Geometry relative to the goal centre at (120, 40)
    features['distance'] = np.hypot(120 - x, 40 - y)
    features['angle'] = np.degrees(np.arctan2(np.abs(y - 40), 120 - x))
    features['is_header'] = df['shot_body_part'] == 'Head'
    features['is_goal'] = df['shot_outcome'] == 'Goal'
    # Presence flags: missing means the flag was not set
    features['under_pressure'] = df['under_pressure'].fillna(False)
    features['first_time'] = df['shot_first_time'].fillna(False)
    features['is_open_play'] = df['shot_type'] == 'Open Play'
    features['is_corner'] = df['shot_type'] == 'Corner'
    features['is_free_kick'] = df['shot_type'] == 'Free Kick'
    features['minute_norm'] = df['minute'] / 90
    features['provider_xg'] = df['shot_statsbomb_xg']
    return features
shots_features = engineer_base_features(shots_df).dropna()

# Split into headers and foot shots — the two populations convert at
# different rates, which motivates training separate specialist models
headers = shots_features[shots_features['is_header'] == True].copy()
foot_shots = shots_features[shots_features['is_header'] == False].copy()
print(f"Total shots: {len(shots_features)}")
print(f"Headers: {len(headers)} ({headers['is_goal'].mean()*100:.2f}% conversion)")
print(f"Foot shots: {len(foot_shots)} ({foot_shots['is_goal'].mean()*100:.2f}% conversion)")

# Engineer header-specific features (booleans; cast to float at fit time)
headers['from_corner'] = headers['is_corner']
headers['central_position'] = (np.abs(headers['y'] - 40) < 10)
headers['close_range'] = (headers['distance'] < 12)
headers['six_yard_box'] = (headers['x'] > 114) & (np.abs(headers['y'] - 40) < 10)

# Engineer foot shot-specific features
foot_shots['is_right_foot_zone'] = (foot_shots['y'] < 40)
foot_shots['likely_one_on_one'] = (foot_shots['distance'] < 15) & foot_shots['is_open_play']
foot_shots['long_range'] = (foot_shots['distance'] > 20)
foot_shots['tight_angle'] = (foot_shots['angle'] > 30)

# Train header model
print("\n=== HEADER MODEL ===")
header_features = ['distance', 'angle', 'under_pressure', 'first_time',
                   'from_corner', 'central_position', 'close_range', 'six_yard_box']
X_header = headers[header_features].astype(float)
y_header = headers['is_goal'].astype(int)
X_header_train, X_header_test, y_header_train, y_header_test = train_test_split(
    X_header, y_header, test_size=0.2, random_state=42, stratify=y_header
)
# max_depth=3 here vs 4 for the foot model below
header_model = GradientBoostingClassifier(n_estimators=100, max_depth=3,
                                          learning_rate=0.1, random_state=42)
header_model.fit(X_header_train, y_header_train)
header_pred = header_model.predict_proba(X_header_test)[:, 1]
header_auc = roc_auc_score(y_header_test, header_pred)
header_brier = brier_score_loss(y_header_test, header_pred)
print(f"Header Model AUC: {header_auc:.4f}")
print(f"Header Model Brier: {header_brier:.4f}")

# Train foot shot model
print("\n=== FOOT SHOT MODEL ===")
foot_features = ['distance', 'angle', 'under_pressure', 'first_time',
                 'is_open_play', 'is_right_foot_zone', 'likely_one_on_one',
                 'long_range', 'tight_angle']
X_foot = foot_shots[foot_features].astype(float)
y_foot = foot_shots['is_goal'].astype(int)
X_foot_train, X_foot_test, y_foot_train, y_foot_test = train_test_split(
    X_foot, y_foot, test_size=0.2, random_state=42, stratify=y_foot
)
foot_model = GradientBoostingClassifier(n_estimators=100, max_depth=4,
                                        learning_rate=0.1, random_state=42)
foot_model.fit(X_foot_train, y_foot_train)
foot_pred = foot_model.predict_proba(X_foot_test)[:, 1]
foot_auc = roc_auc_score(y_foot_test, foot_pred)
foot_brier = brier_score_loss(y_foot_test, foot_pred)
print(f"Foot Shot Model AUC: {foot_auc:.4f}")
print(f"Foot Shot Model Brier: {foot_brier:.4f}")

# Train unified model for comparison (single model, is_header as a feature)
print("\n=== UNIFIED MODEL ===")
unified_features = ['distance', 'angle', 'under_pressure', 'first_time',
                    'is_open_play', 'is_corner', 'is_header']
X_unified = shots_features[unified_features].astype(float)
y_unified = shots_features['is_goal'].astype(int)
X_unified_train, X_unified_test, y_unified_train, y_unified_test = train_test_split(
    X_unified, y_unified, test_size=0.2, random_state=42, stratify=y_unified
)
# Store test indices so the ensemble can be scored on the same held-out shots
test_indices = X_unified_test.index
unified_model = GradientBoostingClassifier(n_estimators=100, max_depth=4,
                                           learning_rate=0.1, random_state=42)
unified_model.fit(X_unified_train, y_unified_train)
unified_pred = unified_model.predict_proba(X_unified_test)[:, 1]
unified_auc = roc_auc_score(y_unified_test, unified_pred)
unified_brier = brier_score_loss(y_unified_test, unified_pred)
print(f"Unified Model AUC: {unified_auc:.4f}")
print(f"Unified Model Brier: {unified_brier:.4f}")
# Create ensemble prediction
print("\n=== ENSEMBLE MODEL ===")
def ensemble_predict(row, header_model, foot_model, header_features, foot_features):
    """Route one shot to the header or foot specialist and return its xG.

    Parameters
    ----------
    row : pandas.Series
        A single shot with the base columns (``is_header``, ``distance``,
        ``angle``, ``under_pressure``, ``first_time``, ``is_corner``,
        ``is_open_play``, ``x``, ``y``); the model-specific derived features
        are recomputed here.
    header_model, foot_model : fitted classifiers with ``predict_proba``.
    header_features, foot_features : list of str
        Column orders the respective models were trained on.

    Returns
    -------
    float
        Predicted probability of a goal from the routed model.
    """
    # Features shared by both specialist models.
    values = {
        'distance': row['distance'],
        'angle': row['angle'],
        'under_pressure': row['under_pressure'],
        'first_time': row['first_time'],
    }
    if row['is_header']:
        # Header-specific derived features (mirror the training pipeline).
        values.update({
            'from_corner': row['is_corner'],
            'central_position': abs(row['y'] - 40) < 10,
            'close_range': row['distance'] < 12,
            'six_yard_box': row['x'] > 114 and abs(row['y'] - 40) < 10,
        })
        model, columns = header_model, header_features
    else:
        # Foot-shot-specific derived features.
        values.update({
            'is_open_play': row['is_open_play'],
            'is_right_foot_zone': row['y'] < 40,
            'likely_one_on_one': row['distance'] < 15 and row['is_open_play'],
            'long_range': row['distance'] > 20,
            'tight_angle': row['angle'] > 30,
        })
        model, columns = foot_model, foot_features
    frame = pd.DataFrame([values])[columns].astype(float)
    return model.predict_proba(frame)[0, 1]
# Apply ensemble to test set: the unified split's held-out rows are re-fetched
# from shots_features (same index) so the row-wise router sees all base columns.
# NOTE(review): the header/foot models were trained on their own independent
# splits, so some of these unified-test rows may overlap their training data;
# the ensemble metrics are therefore slightly optimistic.
test_data = shots_features.loc[test_indices].copy()
ensemble_preds = test_data.apply(
    lambda row: ensemble_predict(row, header_model, foot_model,
                                 header_features, foot_features), axis=1
)
ensemble_auc = roc_auc_score(y_unified_test, ensemble_preds)
ensemble_brier = brier_score_loss(y_unified_test, ensemble_preds)
print(f"Ensemble Model AUC: {ensemble_auc:.4f}")
print(f"Ensemble Model Brier: {ensemble_brier:.4f}")

# Final Comparison (header/foot rows are scored on their own test splits)
print("\n=== FINAL COMPARISON ===")
results = pd.DataFrame({
    'Model': ['Unified Model', 'Ensemble (Header + Foot)',
              'Header-Only Test', 'Foot-Only Test'],
    'AUC': [unified_auc, ensemble_auc, header_auc, foot_auc],
    'Brier Score': [unified_brier, ensemble_brier, header_brier, foot_brier]
})
print(results.to_string(index=False))

# Visualization: conversion rate by 5 m distance band for each body part
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Header conversion by distance
dist_bins = pd.cut(headers['distance'], bins=range(0, 35, 5))
header_conv = headers.groupby(dist_bins)['is_goal'].mean() * 100
axes[0].bar(range(len(header_conv)), header_conv.values, color='coral')
axes[0].set_xticks(range(len(header_conv)))
# Tick labels assume one group per consecutive 5 m bin starting at 0
axes[0].set_xticklabels([f'{i*5}-{(i+1)*5}' for i in range(len(header_conv))])
axes[0].set_xlabel('Distance (m)')
axes[0].set_ylabel('Conversion %')
axes[0].set_title('Header Conversion by Distance')

# Foot shot conversion by distance
dist_bins = pd.cut(foot_shots['distance'], bins=range(0, 45, 5))
foot_conv = foot_shots.groupby(dist_bins)['is_goal'].mean() * 100
axes[1].bar(range(len(foot_conv)), foot_conv.values, color='steelblue')
axes[1].set_xticks(range(len(foot_conv)))
axes[1].set_xticklabels([f'{i*5}-{(i+1)*5}' for i in range(len(foot_conv))])
axes[1].set_xlabel('Distance (m)')
axes[1].set_ylabel('Conversion %')
axes[1].set_title('Foot Shot Conversion by Distance')

plt.tight_layout()
plt.savefig('body_part_xg_analysis.png', dpi=150, bbox_inches='tight')
plt.show()
print("\nConclusion: Ensemble model routes shots to specialized models for better performance.")

library(StatsBombR)
library(tidyverse)
library(caret)
library(pROC)
# Load shot data (StatsBomb free La Liga data)
competitions <- FreeCompetitions() %>%
  filter(competition_name == "La Liga")
matches <- FreeMatches(competitions)
events <- StatsBombFreeEvents(MatchesDF = matches)

# Extract and engineer features for all shots (penalties excluded)
shots <- events %>%
  filter(type.name == "Shot", shot.type.name != "Penalty") %>%
  mutate(
    x = location.x,
    y = location.y,
    # Geometry relative to the goal centre at (120, 40)
    distance = sqrt((120 - x)^2 + (40 - y)^2),
    angle = atan2(abs(y - 40), 120 - x) * 180 / pi,
    is_header = shot.body_part.name == "Head",
    is_goal = shot.outcome.name == "Goal",
    # Presence flags: non-missing means the flag was set
    under_pressure = !is.na(under_pressure),
    first_time = !is.na(shot.first_time),
    is_open_play = shot.type.name == "Open Play",
    is_corner = shot.type.name == "Corner",
    is_free_kick = shot.type.name == "Free Kick",
    minute_norm = minute / 90,
    provider_xg = shot.statsbomb_xg
  ) %>%
  filter(!is.na(distance)) %>%
  select(x, y, distance, angle, is_header, is_goal, under_pressure,
         first_time, is_open_play, is_corner, is_free_kick, minute_norm, provider_xg)

# Split into headers and foot shots — the populations convert differently,
# which motivates the specialist models below
headers <- shots %>% filter(is_header == TRUE)
foot_shots <- shots %>% filter(is_header == FALSE)
cat("Total shots:", nrow(shots), "\n")
cat("Headers:", nrow(headers), "(", round(mean(headers$is_goal)*100, 2), "% conversion)\n")
cat("Foot shots:", nrow(foot_shots), "(", round(mean(foot_shots$is_goal)*100, 2), "% conversion)\n")

# Engineer header-specific features
headers <- headers %>%
  mutate(
    # Headers from corners are often contested
    from_corner = is_corner,
    # Central position advantage for headers
    central_position = abs(y - 40) < 10,
    # Close range header (more effective)
    close_range = distance < 12,
    # Very close headers (high probability)
    six_yard_box = x > 114 & abs(y - 40) < 10
  )

# Engineer foot shot-specific features
foot_shots <- foot_shots %>%
  mutate(
    # Strong foot vs weak foot (proxy: right foot more common)
    is_right_foot_zone = y < 40, # Right side favors right foot
    # One-on-one situations (close range, open play)
    likely_one_on_one = distance < 15 & is_open_play,
    # Long range (outside box)
    long_range = distance > 20,
    # Tight angle shots
    tight_angle = angle > 30
  )
# Train header model
cat("\n=== HEADER MODEL ===\n")
header_features <- c("distance", "angle", "under_pressure", "first_time",
                     "from_corner", "central_position", "close_range", "six_yard_box")
set.seed(42)
header_train_idx <- createDataPartition(headers$is_goal, p = 0.8, list = FALSE)
header_train <- headers[header_train_idx, ]
header_test <- headers[-header_train_idx, ]
# caret with classProbs = TRUE requires outcome levels that are valid R
# variable names. factor() on a logical yields levels "FALSE"/"TRUE"
# (reserved words), which makes train() abort — recode to "no_goal"/"goal"
# with "goal" as the second (positive) level.
header_outcome <- factor(ifelse(header_train$is_goal, "goal", "no_goal"),
                         levels = c("no_goal", "goal"))
header_model <- train(
  x = header_train[, header_features],
  y = header_outcome,
  method = "gbm",
  trControl = trainControl(method = "cv", number = 5, classProbs = TRUE),
  tuneGrid = expand.grid(
    n.trees = 100,
    interaction.depth = 3,
    shrinkage = 0.1,
    n.minobsinnode = 10
  ),
  verbose = FALSE
)
# Probability of the positive class ("goal")
header_pred <- predict(header_model, header_test[, header_features], type = "prob")[, "goal"]
header_auc <- auc(roc(header_test$is_goal, header_pred))
# Brier score against the logical outcome coerced to 0/1
header_brier <- mean((header_pred - as.numeric(header_test$is_goal))^2)
cat("Header Model AUC:", round(header_auc, 4), "\n")
cat("Header Model Brier:", round(header_brier, 4), "\n")
# Train foot shot model
cat("\n=== FOOT SHOT MODEL ===\n")
foot_features <- c("distance", "angle", "under_pressure", "first_time",
                   "is_open_play", "is_right_foot_zone", "likely_one_on_one",
                   "long_range", "tight_angle")
# Seed before partitioning (the header and unified splits are seeded, this one
# was not — without it the foot split is irreproducible).
set.seed(42)
foot_train_idx <- createDataPartition(foot_shots$is_goal, p = 0.8, list = FALSE)
foot_train <- foot_shots[foot_train_idx, ]
foot_test <- foot_shots[-foot_train_idx, ]
# classProbs = TRUE needs valid-name outcome levels, so the logical target is
# recoded with "goal" as the second (positive) level.
foot_outcome <- factor(ifelse(foot_train$is_goal, "goal", "no_goal"),
                       levels = c("no_goal", "goal"))
foot_model <- train(
  x = foot_train[, foot_features],
  y = foot_outcome,
  method = "gbm",
  trControl = trainControl(method = "cv", number = 5, classProbs = TRUE),
  tuneGrid = expand.grid(
    n.trees = 100,
    interaction.depth = 4,
    shrinkage = 0.1,
    n.minobsinnode = 10
  ),
  verbose = FALSE
)
# Probability of the positive class ("goal")
foot_pred <- predict(foot_model, foot_test[, foot_features], type = "prob")[, "goal"]
foot_auc <- auc(roc(foot_test$is_goal, foot_pred))
foot_brier <- mean((foot_pred - as.numeric(foot_test$is_goal))^2)
cat("Foot Shot Model AUC:", round(foot_auc, 4), "\n")
cat("Foot Shot Model Brier:", round(foot_brier, 4), "\n")
# Train unified model for comparison (single model, is_header as a predictor)
cat("\n=== UNIFIED MODEL ===\n")
unified_features <- c("distance", "angle", "under_pressure", "first_time",
                      "is_open_play", "is_corner", "is_header")
set.seed(42)
unified_train_idx <- createDataPartition(shots$is_goal, p = 0.8, list = FALSE)
unified_train <- shots[unified_train_idx, ]
unified_test <- shots[-unified_train_idx, ]
# caret with classProbs = TRUE requires outcome levels that are valid R
# variable names; a factor built from a logical has levels "FALSE"/"TRUE"
# (reserved words) and train() aborts. Recode with "goal" as the second
# (positive) level.
unified_outcome <- factor(ifelse(unified_train$is_goal, "goal", "no_goal"),
                          levels = c("no_goal", "goal"))
unified_model <- train(
  x = unified_train[, unified_features],
  y = unified_outcome,
  method = "gbm",
  trControl = trainControl(method = "cv", number = 5, classProbs = TRUE),
  tuneGrid = expand.grid(
    n.trees = 100,
    interaction.depth = 4,
    shrinkage = 0.1,
    n.minobsinnode = 10
  ),
  verbose = FALSE
)
# Probability of the positive class ("goal")
unified_pred <- predict(unified_model, unified_test[, unified_features], type = "prob")[, "goal"]
unified_auc <- auc(roc(unified_test$is_goal, unified_pred))
unified_brier <- mean((unified_pred - as.numeric(unified_test$is_goal))^2)
cat("Unified Model AUC:", round(unified_auc, 4), "\n")
cat("Unified Model Brier:", round(unified_brier, 4), "\n")
# Create ensemble prediction function
# Route one shot to the specialised header/foot model and return its predicted
# goal probability. shot_data is a single-row data frame carrying the base
# columns (x, y, distance, angle, under_pressure, first_time, is_header,
# is_corner, is_open_play); the model-specific derived features are computed
# here to mirror the training pipeline.
ensemble_predict <- function(shot_data, header_model, foot_model) {
  if (shot_data$is_header) {
    shot_data$from_corner <- shot_data$is_corner
    shot_data$central_position <- abs(shot_data$y - 40) < 10
    shot_data$close_range <- shot_data$distance < 12
    shot_data$six_yard_box <- shot_data$x > 114 & abs(shot_data$y - 40) < 10
    features <- shot_data[, c("distance", "angle", "under_pressure", "first_time",
                              "from_corner", "central_position", "close_range",
                              "six_yard_box")]
    model <- header_model
  } else {
    shot_data$is_right_foot_zone <- shot_data$y < 40
    shot_data$likely_one_on_one <- shot_data$distance < 15 & shot_data$is_open_play
    shot_data$long_range <- shot_data$distance > 20
    shot_data$tight_angle <- shot_data$angle > 30
    features <- shot_data[, c("distance", "angle", "under_pressure", "first_time",
                              "is_open_play", "is_right_foot_zone",
                              "likely_one_on_one", "long_range", "tight_angle")]
    model <- foot_model
  }
  # Index the positive class by position (second factor level) rather than by a
  # hard-coded level name: the original [, "TRUE"] breaks whenever the outcome
  # factor is labelled differently (and caret's classProbs requirement forbids
  # the "TRUE"/"FALSE" labels it assumed).
  predict(model, features, type = "prob")[, 2]
}
# Apply ensemble to test set, one shot at a time via the routing function.
# NOTE(review): the header/foot models were trained on their own partitions,
# so some unified-test rows may overlap their training data; the ensemble
# metrics are therefore slightly optimistic.
cat("\n=== ENSEMBLE MODEL ===\n")
ensemble_preds <- sapply(1:nrow(unified_test), function(i) {
  ensemble_predict(unified_test[i, ], header_model, foot_model)
})
ensemble_auc <- auc(roc(unified_test$is_goal, ensemble_preds))
ensemble_brier <- mean((ensemble_preds - as.numeric(unified_test$is_goal))^2)
cat("Ensemble Model AUC:", round(ensemble_auc, 4), "\n")
cat("Ensemble Model Brier:", round(ensemble_brier, 4), "\n")

# Final Comparison (header/foot rows are scored on their own test splits)
cat("\n=== FINAL COMPARISON ===\n")
results <- data.frame(
  Model = c("Unified Model", "Ensemble (Header + Foot)",
            "Header-Only Test", "Foot-Only Test"),
  AUC = c(unified_auc, ensemble_auc, header_auc, foot_auc),
  Brier = c(unified_brier, ensemble_brier, header_brier, foot_brier)
)
print(results)

# Visualize body part conversion by 5 m distance band, side by side
par(mfrow = c(1, 2))
# Headers
header_bins <- cut(headers$distance, breaks = seq(0, 30, by = 5))
header_conv <- tapply(headers$is_goal, header_bins, mean)
barplot(header_conv * 100, main = "Header Conversion by Distance",
        xlab = "Distance (m)", ylab = "Conversion %", col = "coral")
# Foot shots
foot_bins <- cut(foot_shots$distance, breaks = seq(0, 40, by = 5))
foot_conv <- tapply(foot_shots$is_goal, foot_bins, mean)
barplot(foot_conv * 100, main = "Foot Shot Conversion by Distance",
        xlab = "Distance (m)", ylab = "Conversion %", col = "steelblue")
par(mfrow = c(1, 1))

Exercise 28.3: Post-Shot xG (PSxG) Model with Shot Placement
Task: Build a Post-Shot xG model that incorporates where the shot was placed in the goal frame to assess goalkeeper performance and finishing quality.
Requirements:
- Extract shot end location (where the shot was aimed in the goal)
- Engineer shot placement features (height, horizontal position, velocity proxy)
- Train a PSxG model that predicts goal probability given shot placement
- Calculate Goals Saved Above Expected (GSAx) for goalkeepers
- Identify overperforming and underperforming finishers
import pandas as pd
import numpy as np
from statsbombpy import sb
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, brier_score_loss
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# Load shot data: first listed La Liga season, capped at the first 50 matches
competitions = sb.competitions()
la_liga = competitions[competitions['competition_name'] == 'La Liga'].iloc[0]
matches = sb.matches(competition_id=la_liga['competition_id'],
                     season_id=la_liga['season_id'])
all_shots = []
for _, match in matches.head(50).iterrows():
    events = sb.events(match_id=match['match_id'])
    shots = events[events['type'] == 'Shot'].copy()
    if len(shots) > 0:
        all_shots.append(shots)
shots_df = pd.concat(all_shots, ignore_index=True)
# Exclude penalties before modeling
shots_df = shots_df[shots_df['shot_type'] != 'Penalty']
# Extract shot end locations and engineer features
def extract_psxg_features(df):
    """Engineer pre- and post-shot features for a PSxG model.

    Parameters
    ----------
    df : pd.DataFrame
        StatsBomb shot events. Columns read: location, shot_end_location,
        shot_body_part, under_pressure, shot_outcome, shot_statsbomb_xg,
        player, team.

    Returns
    -------
    pd.DataFrame
        One row per shot with location/placement features, binary targets
        (is_goal, is_on_target), the provider's pre-shot xG, and
        player/team identifiers for downstream aggregation.
    """
    features = pd.DataFrame()
    # Shot origin in StatsBomb pitch units (x in [0, 120], y in [0, 80]);
    # the attacked goal is centred at (120, 40).
    features['x'] = df['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
    features['y'] = df['location'].apply(lambda loc: loc[1] if isinstance(loc, list) else np.nan)
    features['distance'] = np.sqrt((120 - features['x'])**2 + (40 - features['y'])**2)
    features['angle'] = np.arctan2(np.abs(features['y'] - 40), 120 - features['x']) * 180 / np.pi
    # Shot end location is [x, y] or [x, y, z] in the same pitch units;
    # z is only present for elevated shots.
    features['end_y'] = df['shot_end_location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) and len(loc) >= 2 else np.nan
    )
    features['end_z'] = df['shot_end_location'].apply(
        lambda loc: loc[2] if isinstance(loc, list) and len(loc) >= 3 else np.nan
    )
    # Normalize to the goal frame. StatsBomb goal coordinates are in PITCH
    # UNITS, not metres: posts at y = 36 and y = 44 (8 units wide), crossbar
    # at z = 2.67. The previous constants (36.6 / 7.32 / 2.44) mixed metres
    # with pitch units, so e.g. a dead-centre shot normalized to ~0.46.
    features['goal_y_normalized'] = (features['end_y'] - 36.0) / 8.0
    features['goal_z_normalized'] = features['end_z'] / 2.67
    # Clip so marginal codings just outside the frame stay in [0, 1].
    features['goal_y_normalized'] = features['goal_y_normalized'].clip(0, 1)
    features['goal_z_normalized'] = features['goal_z_normalized'].clip(0, 1)
    # Placement difficulty: distance from the centre of the goal mouth.
    features['dist_from_center'] = np.sqrt(
        (features['goal_y_normalized'] - 0.5)**2 +
        (features['goal_z_normalized'] - 0.5)**2
    )
    # Corners of the frame (high/low near either post) are hardest to save.
    features['is_corner_shot'] = (
        ((features['goal_y_normalized'] < 0.15) | (features['goal_y_normalized'] > 0.85)) &
        ((features['goal_z_normalized'] > 0.7) | (features['goal_z_normalized'] < 0.15))
    ).astype(int)
    # Near post relative to the shooter's side of the pitch.
    features['near_post'] = (
        ((features['y'] < 40) & (features['goal_y_normalized'] < 0.3)) |
        ((features['y'] >= 40) & (features['goal_y_normalized'] > 0.7))
    ).astype(int)
    # Context features.
    features['is_header'] = (df['shot_body_part'] == 'Head').astype(int)
    features['under_pressure'] = df['under_pressure'].fillna(False).astype(int)
    # Targets plus the provider pre-shot xG for model comparison.
    features['is_goal'] = (df['shot_outcome'] == 'Goal').astype(int)
    features['is_on_target'] = df['shot_outcome'].isin(['Goal', 'Saved', 'Saved to Post']).astype(int)
    features['pre_shot_xg'] = df['shot_statsbomb_xg']
    # Identifiers for the finisher/goalkeeper aggregations downstream.
    features['player_name'] = df['player']
    features['team_name'] = df['team']
    return features
# --- Feature engineering and modelling dataset -----------------------------
shots_features = extract_psxg_features(shots_df)

# PSxG is only defined for on-target shots with a recorded end location.
placement_known = (
    shots_features['goal_y_normalized'].notna()
    & shots_features['goal_z_normalized'].notna()
)
on_target = shots_features[(shots_features['is_on_target'] == 1) & placement_known].copy()
print(f"On-target shots with placement data: {len(on_target)}")
print(f"Goal conversion on target: {on_target['is_goal'].mean()*100:.2f}%")

# Predictors fed to the PSxG model.
psxg_features = ['distance', 'angle', 'goal_y_normalized', 'goal_z_normalized',
                 'dist_from_center', 'is_corner_shot', 'near_post',
                 'is_header', 'under_pressure']

# Keep only rows with every predictor, the target, and the baseline xG.
model_data = on_target.dropna(subset=psxg_features + ['is_goal', 'pre_shot_xg'])
print(f"Shots for modeling: {len(model_data)}")

# Stratified 80/20 split keeps the goal rate equal across train and test.
X = model_data[psxg_features]
y = model_data['is_goal']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# --- Train the PSxG model and compare it to pre-shot xG --------------------
psxg_model = GradientBoostingClassifier(
    n_estimators=150, max_depth=4, learning_rate=0.1, random_state=42
)
psxg_model.fit(X_train, y_train)

# Held-out PSxG predictions plus the provider's pre-shot xG for the same shots.
psxg_pred = psxg_model.predict_proba(X_test)[:, 1]
pre_xg_test = model_data.loc[X_test.index, 'pre_shot_xg']

# Discrimination (AUC) and calibration (Brier) for both probability sources.
psxg_auc = roc_auc_score(y_test, psxg_pred)
psxg_brier = brier_score_loss(y_test, psxg_pred)
pre_xg_auc = roc_auc_score(y_test, pre_xg_test)
pre_xg_brier = brier_score_loss(y_test, pre_xg_test)
print("\n=== MODEL COMPARISON ===")
for label, auc_val, brier_val in (("PSxG Model", psxg_auc, psxg_brier),
                                  ("Pre-shot xG", pre_xg_auc, pre_xg_brier)):
    print(f"{label} AUC: {auc_val:.4f}")
    print(f"{label} Brier: {brier_val:.4f}")

# Score every shot (train and test alike) for the descriptive analyses below.
model_data['psxg'] = psxg_model.predict_proba(model_data[psxg_features])[:, 1]
# --- Finishing quality: actual goals vs PSxG -------------------------------
print("\n=== TOP FINISHERS (Goals Above PSxG) ===")
finisher_analysis = (
    model_data
    .groupby('player_name')
    .agg(shots_on_target=('is_goal', 'count'),
         goals=('is_goal', 'sum'),
         total_psxg=('psxg', 'sum'),
         total_pre_xg=('pre_shot_xg', 'sum'))
    .reset_index()
)
# Positive goals_minus_psxg = scoring more than shot placement alone implies.
finisher_analysis['goals_minus_psxg'] = (
    finisher_analysis['goals'] - finisher_analysis['total_psxg']
)
finisher_analysis['conversion_rate'] = (
    finisher_analysis['goals'] / finisher_analysis['shots_on_target'] * 100
)
# Require a minimal sample before ranking finishers.
finisher_analysis = finisher_analysis[finisher_analysis['shots_on_target'] >= 10]
finisher_analysis = finisher_analysis.sort_values('goals_minus_psxg', ascending=False)
print("\nBest Finishers (overperforming PSxG):")
print(finisher_analysis.head(10).to_string(index=False))
print("\nWorst Finishers (underperforming PSxG):")
print(finisher_analysis.tail(10).to_string(index=False))
# --- Goalkeeper proxy analysis ---------------------------------------------
# NOTE(review): 'team_name' comes from the event's team field, i.e. the
# SHOOTING team, so this groups shots a team took, not shots its keeper
# faced. A real GSAx needs shots joined to the opposing goalkeeper via
# lineups — treat these numbers as a placeholder and verify before use.
print("\n=== GOALKEEPER ANALYSIS (by opponent team) ===")
gk_analysis = (
    model_data
    .groupby('team_name')
    .agg(shots_faced=('is_goal', 'count'),
         goals_conceded=('is_goal', 'sum'),
         total_psxg_faced=('psxg', 'sum'))
    .reset_index()
    .rename(columns={'team_name': 'team'})
)
# GSAx > 0: fewer goals conceded than PSxG expected (good goalkeeping).
gk_analysis['gsax'] = gk_analysis['total_psxg_faced'] - gk_analysis['goals_conceded']
gk_analysis['save_pct'] = (
    (gk_analysis['shots_faced'] - gk_analysis['goals_conceded'])
    / gk_analysis['shots_faced'] * 100
)
gk_analysis = gk_analysis[gk_analysis['shots_faced'] >= 15]
gk_analysis = gk_analysis.sort_values('gsax', ascending=False)
print("\nBest Goalkeeping (highest GSAx):")
print(gk_analysis.head(5).to_string(index=False))
# Visualization
# Two panels: where on-target shots ended up on the goal frame (left) and
# PSxG vs the provider's pre-shot xG on the held-out shots (right).
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Shot placement on goal frame
ax1 = axes[0]
goals = model_data[model_data['is_goal'] == 1]
saves = model_data[model_data['is_goal'] == 0]
ax1.scatter(saves['goal_y_normalized'], saves['goal_z_normalized'],
            c='blue', alpha=0.3, s=30, label='Save')
ax1.scatter(goals['goal_y_normalized'], goals['goal_z_normalized'],
            c='red', alpha=0.6, s=50, label='Goal')
# Draw goal frame
# The normalized frame spans [0, 1] on both axes (posts, ground, crossbar).
ax1.axhline(y=0, color='black', linewidth=2)
ax1.axhline(y=1, color='black', linewidth=2)
ax1.axvline(x=0, color='black', linewidth=2)
ax1.axvline(x=1, color='black', linewidth=2)
ax1.set_xlim(-0.1, 1.1)
ax1.set_ylim(-0.1, 1.1)
ax1.set_xlabel('Goal Width (Left Post to Right Post)')
ax1.set_ylabel('Goal Height (Ground to Crossbar)')
ax1.set_title('Shot Placement (On Target Shots)')
ax1.legend()
# PSxG vs Pre-shot xG
# Points above the dashed diagonal: placement made the shot harder to save
# than the pre-shot situation suggested (and vice versa below it).
ax2 = axes[1]
test_goals = y_test == 1
ax2.scatter(pre_xg_test[~test_goals], psxg_pred[~test_goals],
            c='blue', alpha=0.4, s=30, label='No Goal')
ax2.scatter(pre_xg_test[test_goals], psxg_pred[test_goals],
            c='red', alpha=0.6, s=50, label='Goal')
ax2.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Equal')
ax2.set_xlabel('Pre-shot xG')
ax2.set_ylabel('Post-shot xG (PSxG)')
ax2.set_title('PSxG vs Pre-shot xG')
ax2.legend()
ax2.set_xlim(0, 0.8)
ax2.set_ylim(0, 1)
plt.tight_layout()
plt.savefig('psxg_analysis.png', dpi=150, bbox_inches='tight')
plt.show()
# Shot placement conversion heatmap
# Bin the goal frame into a 5 (width) x 3 (height) grid and show the raw
# goal conversion rate per cell.
fig, ax = plt.subplots(figsize=(8, 6))
# Create bins for heatmap
# NOTE(review): pd.cut uses equal-width bins over the observed range, so
# edge cells can be thinly populated and their conversion rates noisy.
y_bins = pd.cut(model_data['goal_y_normalized'], bins=5, labels=['Far Left', 'Left', 'Center', 'Right', 'Far Right'])
z_bins = pd.cut(model_data['goal_z_normalized'], bins=3, labels=['Low', 'Mid', 'High'])
placement_matrix = model_data.groupby([z_bins, y_bins])['is_goal'].mean().unstack() * 100
# vmin/vmax pin the colour scale to 30-90% so colours are comparable
# across runs; row 0 ('Low') is drawn at the top (imshow default origin).
im = ax.imshow(placement_matrix.values, cmap='RdYlGn', aspect='auto', vmin=30, vmax=90)
ax.set_xticks(range(5))
ax.set_xticklabels(['Far Left', 'Left', 'Center', 'Right', 'Far Right'])
ax.set_yticks(range(3))
ax.set_yticklabels(['Low', 'Mid', 'High'])
ax.set_xlabel('Horizontal Position')
ax.set_ylabel('Vertical Position')
ax.set_title('Goal Conversion Rate by Shot Placement (%)')
# Add text annotations
# Empty cells yield NaN means and are left unlabeled.
for i in range(3):
    for j in range(5):
        val = placement_matrix.values[i, j]
        if not np.isnan(val):
            ax.text(j, i, f'{val:.1f}%', ha='center', va='center',
                    color='white' if val > 60 else 'black', fontweight='bold')
plt.colorbar(im, label='Conversion %')
plt.tight_layout()
plt.savefig('shot_placement_heatmap.png', dpi=150, bbox_inches='tight')
plt.show()
print("\nPSxG analysis complete! Use for goalkeeper evaluation (GSAx) and finishing quality.")
library(StatsBombR)
library(tidyverse)
library(caret)
library(pROC)
# Load the free La Liga data: competitions -> matches -> events
competitions <- FreeCompetitions() %>%
  filter(competition_name == "La Liga")
matches <- FreeMatches(competitions)
events <- StatsBombFreeEvents(MatchesDF = matches)
# Extract shots and engineer basic pre-shot features.
# Coordinates are StatsBomb pitch units (x in 0-120, y in 0-80), with the
# attacked goal centred at (120, 40).
shots <- events %>%
  filter(type.name == "Shot") %>%
  mutate(
    # Shot end location is [x, y] or [x, y, z]; pull y/z when present.
    # Use a scalar if/else returning NA_real_ so map_dbl() always gets a
    # double — ifelse(cond, .x[[2]], NA) yields a logical NA, which relies
    # on implicit coercion and is a vectorized function misused on scalars.
    end_y = map_dbl(shot.end_location, ~ if (length(.x) >= 2) .x[[2]] else NA_real_),
    end_z = map_dbl(shot.end_location, ~ if (length(.x) >= 3) .x[[3]] else NA_real_),
    # Basic shot geometry relative to the goal centre
    x = location.x,
    y = location.y,
    distance = sqrt((120 - x)^2 + (40 - y)^2),
    angle = atan2(abs(y - 40), 120 - x) * 180 / pi,
    # Shot outcome targets
    is_goal = shot.outcome.name == "Goal",
    is_on_target = shot.outcome.name %in% c("Goal", "Saved", "Saved to Post"),
    # Provider pre-shot xG for model comparison
    pre_shot_xg = shot.statsbomb_xg,
    # Context
    is_header = shot.body_part.name == "Head",
    under_pressure = !is.na(under_pressure),
    # Player and team info
    player_name = player.name,
    team_name = team.name,
    # Placeholder only: the open event data is not joined to lineups here,
    # so shots are not linked to a specific opposing keeper.
    goalkeeper_name = ifelse(!is.na(shot.outcome.name),
                             "Opposition GK", NA)
  ) %>%
  # Penalties have near-fixed conversion; exclude from the model
  filter(shot.type.name != "Penalty")
# Keep on-target shots with a recorded end location and normalize the
# placement to the goal frame.
on_target_shots <- shots %>%
  filter(is_on_target, !is.na(end_y), !is.na(end_z)) %>%
  mutate(
    # StatsBomb goal-frame coordinates are in PITCH UNITS, not metres:
    # posts at y = 36 and y = 44 (8 units wide), crossbar at z = 2.67.
    # The earlier constants (36.6 / 7.32 / 2.44) mixed metres with pitch
    # units, so e.g. a dead-centre shot normalized to ~0.46 instead of 0.5.
    goal_y_normalized = (end_y - 36) / 8, # 0 = left post, 1 = right post
    goal_z_normalized = end_z / 2.67,     # 0 = ground, 1 = crossbar
    # Coarse placement bands used by the heatmap summary later
    shot_height = case_when(
      goal_z_normalized < 0.33 ~ "low",
      goal_z_normalized < 0.66 ~ "mid",
      TRUE ~ "high"
    ),
    shot_side = case_when(
      goal_y_normalized < 0.33 ~ "left",
      goal_y_normalized < 0.66 ~ "center",
      TRUE ~ "right"
    ),
    # Distance from center of goal (harder for keeper)
    dist_from_center = sqrt((goal_y_normalized - 0.5)^2 + (goal_z_normalized - 0.5)^2),
    # Corner shots (hardest to save)
    is_corner_shot = (goal_y_normalized < 0.15 | goal_y_normalized > 0.85) &
      (goal_z_normalized > 0.7 | goal_z_normalized < 0.15),
    # Near post vs far post, relative to the shooter's side of the pitch
    near_post = (y < 40 & goal_y_normalized < 0.3) | (y >= 40 & goal_y_normalized > 0.7)
  )
cat("On-target shots with placement data:", nrow(on_target_shots), "\n")
cat("Goal conversion on target:", round(mean(on_target_shots$is_goal) * 100, 2), "%\n")
# Predictors fed to the PSxG model
psxg_features <- c("distance", "angle", "goal_y_normalized", "goal_z_normalized",
                   "dist_from_center", "is_corner_shot", "near_post",
                   "is_header", "under_pressure")
# Modelling frame: predictors plus target, baseline xG, and identifiers,
# dropping rows that lack the core geometry or placement values.
geometry_cols <- c("distance", "angle", "goal_y_normalized", "goal_z_normalized")
model_data <- on_target_shots %>%
  select(all_of(psxg_features), is_goal, pre_shot_xg, player_name, team_name) %>%
  drop_na(all_of(geometry_cols))
cat("Shots for modeling:", nrow(model_data), "\n")
# Split data (80/20; createDataPartition stratifies on the outcome)
set.seed(42)
train_idx <- createDataPartition(model_data$is_goal, p = 0.8, list = FALSE)
train_data <- model_data[train_idx, ]
test_data <- model_data[-train_idx, ]
# caret::train with classProbs = TRUE requires factor levels that are valid
# R variable names; "TRUE"/"FALSE" are reserved words, so the original
# factor(train_data$is_goal) made train() error. Recode the outcome to
# "no_goal"/"goal" and read the "goal" probability column below.
train_outcome <- factor(ifelse(train_data$is_goal, "goal", "no_goal"),
                        levels = c("no_goal", "goal"))
# Train PSxG model (5-fold CV over a single fixed gbm parameter set)
psxg_model <- train(
  x = train_data[, psxg_features],
  y = train_outcome,
  method = "gbm",
  trControl = trainControl(method = "cv", number = 5, classProbs = TRUE),
  tuneGrid = expand.grid(
    n.trees = 150,
    interaction.depth = 4,
    shrinkage = 0.1,
    n.minobsinnode = 10
  ),
  verbose = FALSE
)
# PSxG = P(goal | placement) on the held-out shots
test_data$psxg <- predict(psxg_model, test_data[, psxg_features],
                          type = "prob")[, "goal"]
# Model evaluation: discrimination (AUC) and calibration (Brier score);
# as.numeric() turns the logical outcome into 0/1 for the Brier term
psxg_auc <- auc(roc(test_data$is_goal, test_data$psxg))
psxg_brier <- mean((test_data$psxg - as.numeric(test_data$is_goal))^2)
pre_xg_auc <- auc(roc(test_data$is_goal, test_data$pre_shot_xg))
pre_xg_brier <- mean((test_data$pre_shot_xg - as.numeric(test_data$is_goal))^2)
cat("\n=== MODEL COMPARISON ===\n")
cat("PSxG Model AUC:", round(psxg_auc, 4), "\n")
cat("PSxG Model Brier:", round(psxg_brier, 4), "\n")
cat("Pre-shot xG AUC:", round(pre_xg_auc, 4), "\n")
cat("Pre-shot xG Brier:", round(pre_xg_brier, 4), "\n")
# Apply to the full dataset (train and test alike) for the descriptive
# player/team analyses below
full_predictions <- on_target_shots %>%
  select(all_of(psxg_features), is_goal, pre_shot_xg, player_name, team_name) %>%
  drop_na()
full_predictions$psxg <- predict(psxg_model, full_predictions[, psxg_features],
                                 type = "prob")[, "goal"]
# --- Finishing quality: goals scored relative to PSxG ---
cat("\n=== TOP FINISHERS (Goals Above PSxG) ===\n")
finisher_analysis <- full_predictions %>%
  group_by(player_name) %>%
  summarise(
    shots_on_target = n(),
    goals = sum(is_goal),
    total_psxg = sum(psxg),
    total_pre_xg = sum(pre_shot_xg),
    .groups = "drop"
  ) %>%
  mutate(
    # Positive values = scoring more than shot placement alone implies
    goals_minus_psxg = goals - total_psxg,
    conversion_rate = goals / shots_on_target * 100
  ) %>%
  # Require a minimal sample before ranking finishers
  filter(shots_on_target >= 10) %>%
  arrange(desc(goals_minus_psxg))
cat("\nBest Finishers (overperforming PSxG):\n")
print(head(finisher_analysis, 10))
cat("\nWorst Finishers (underperforming PSxG):\n")
print(tail(finisher_analysis, 10))
# --- Simulated goalkeeper analysis (GSAx) ---
# NOTE(review): team_name is the SHOOTING team from the event feed, so
# this aggregates shots a team took, not shots its keeper faced. Real
# GSAx requires joining shots to the opposing goalkeeper via lineups.
cat("\n=== GOALKEEPER ANALYSIS (Simulated) ===\n")
gk_analysis <- full_predictions %>%
  mutate(opponent_team = team_name) %>% # placeholder — needs actual GK data
  group_by(opponent_team) %>%
  summarise(
    shots_faced = n(),
    goals_conceded = sum(is_goal),
    total_psxg_faced = sum(psxg),
    .groups = "drop"
  ) %>%
  mutate(
    # GSAx > 0: fewer goals conceded than PSxG expected
    gsax = total_psxg_faced - goals_conceded,
    save_percentage = (shots_faced - goals_conceded) / shots_faced * 100
  ) %>%
  filter(shots_faced >= 15) %>%
  arrange(desc(gsax))
cat("\nBest Goalkeeping (by team, highest GSAx):\n")
print(head(gk_analysis, 5))
# --- Conversion rate by coarse placement band ---
cat("\n=== GENERATING VISUALIZATIONS ===\n")
# Raw goal conversion per height/side cell of the goal frame
placement_conv <- on_target_shots %>%
  group_by(shot_height, shot_side) %>%
  summarise(
    shots = n(),
    goals = sum(is_goal),
    .groups = "drop"
  ) %>%
  mutate(conversion = goals / shots * 100)
print(placement_conv)
# Plot goal frame with conversion rates
# Two base-graphics panels: placement scatter on the normalized goal frame
# (left) and PSxG vs pre-shot xG on the held-out test shots (right).
par(mfrow = c(1, 2))
# Shot placement heatmap
# Empty plot spanning the normalized frame [0, 1] x [0, 1]
plot(1, type = "n", xlim = c(0, 1), ylim = c(0, 1),
     xlab = "Goal Width (Left to Right)", ylab = "Goal Height",
     main = "Shot Placement (On Target)")
# Color by outcome: semi-transparent blue = saved, red = scored
goals <- on_target_shots %>% filter(is_goal)
saves <- on_target_shots %>% filter(!is_goal)
points(saves$goal_y_normalized, saves$goal_z_normalized,
       col = rgb(0, 0, 1, 0.3), pch = 19, cex = 0.8)
points(goals$goal_y_normalized, goals$goal_z_normalized,
       col = rgb(1, 0, 0, 0.5), pch = 19, cex = 1)
legend("topright", legend = c("Goal", "Save"),
       col = c("red", "blue"), pch = 19, cex = 0.8)
# PSxG vs Pre-shot xG
# Dashed y = x reference: points above it were harder to save than the
# pre-shot chance quality suggested
plot(test_data$pre_shot_xg, test_data$psxg,
     xlab = "Pre-shot xG", ylab = "Post-shot xG (PSxG)",
     main = "PSxG vs Pre-shot xG",
     col = ifelse(test_data$is_goal, "red", "blue"),
     pch = 19, cex = 0.7)
abline(0, 1, lty = 2, col = "gray")
legend("bottomright", legend = c("Goal", "No Goal"),
       col = c("red", "blue"), pch = 19, cex = 0.8)
# Restore the default single-panel layout
par(mfrow = c(1, 1))
cat("\nPSxG model complete! Use for goalkeeper and finisher evaluation.\n")
Summary
Key Takeaways
- Feature engineering is crucial - distance, angle, and goal visibility are primary determinants of shot quality
- Multiple models should be compared - logistic regression for interpretability, gradient boosting for accuracy
- Calibration matters as much as accuracy - xG should match actual conversion rates
- Penalties should be handled separately with a fixed xG value around 0.76
- Validation against provider xG helps identify model strengths and weaknesses