Capstone - Complete Analytics System
Possession has long been considered a key indicator of team dominance, but raw possession percentage tells only part of the story. Modern possession analytics focuses on the quality, location, and purpose of possession to understand which teams truly control matches.
Learning Objectives
- Measure possession quality beyond simple percentages
- Analyze territorial dominance and field tilt
- Calculate possession value and expected threat
- Understand ball retention and recycling patterns
- Evaluate progressive possession sequences
Beyond Possession Percentage
Raw possession percentage is one of the most misleading statistics in football. A team can have 70% possession while creating fewer chances than their opponent. Quality metrics reveal the true value of possession.
Possession Quality Metrics
- Final Third Possession %
- Box Entries per possession
- Progressive Passes per sequence
- Shots per possession sequence
- xG per Possession
Low-Value Possession Signs
- High possession in own third
- Low progressive pass rate
- Few box entries
- Backward/sideways passing dominance
- Low shots per possession
import pandas as pd
import numpy as np
from statsbombpy import sb
# Load match data
events = sb.events(match_id=3773585)
def identify_possession_sequences(events):
"""
Identify distinct possession sequences.
"""
events = events.sort_values('index').copy()
# New possession when team changes
events['new_possession'] = events['team'] != events['team'].shift(1)
events['possession_id'] = events['new_possession'].cumsum()
# Add sequence metadata
sequence_info = (
events
.groupby('possession_id')
.agg({
'team': 'first',
'index': 'count',
'minute': ['min', 'max'],
'second': ['min', 'max']
})
.reset_index()
)
sequence_info.columns = [
'possession_id', 'possession_team', 'sequence_length',
'start_min', 'end_min', 'start_sec', 'end_sec'
]
events = events.merge(
sequence_info[['possession_id', 'possession_team', 'sequence_length']],
on='possession_id'
)
return events
events_with_sequences = identify_possession_sequences(events)
def calculate_possession_quality(events, team_name):
"""
Calculate possession quality metrics for a team.
"""
team_events = events[events['possession_team'] == team_name]
# Extract locations
team_events['x'] = team_events['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
team_events['y'] = team_events['location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
)
# Calculate per-possession metrics
possession_metrics = []
for poss_id in team_events['possession_id'].unique():
poss = team_events[team_events['possession_id'] == poss_id]
metrics = {
'possession_id': poss_id,
'sequence_length': len(poss),
'start_x': poss['x'].iloc[0] if len(poss) > 0 else np.nan,
'max_x': poss['x'].max(),
# Territory
'entered_final_third': (poss['x'] > 80).any(),
'entered_box': ((poss['x'] > 102) & (poss['y'] > 18) & (poss['y'] < 62)).any(),
# Passes
'total_passes': (poss['type'] == 'Pass').sum(),
'successful_passes': (
(poss['type'] == 'Pass') &
(poss['pass_outcome'].isna())
).sum(),
# Outcomes
'ended_in_shot': (poss['type'] == 'Shot').any(),
'xg_generated': poss['shot_statsbomb_xg'].sum(),
'ended_in_goal': (
(poss['type'] == 'Shot') &
(poss['shot_outcome'] == 'Goal')
).any()
}
possession_metrics.append(metrics)
poss_df = pd.DataFrame(possession_metrics)
# Aggregate summary
summary = {
'total_possessions': len(poss_df),
'avg_sequence_length': poss_df['sequence_length'].mean(),
'final_third_entry_rate': poss_df['entered_final_third'].mean() * 100,
'box_entry_rate': poss_df['entered_box'].mean() * 100,
'pass_completion': (
poss_df['successful_passes'].sum() /
poss_df['total_passes'].sum() * 100
),
'shot_rate': poss_df['ended_in_shot'].mean() * 100,
'xg_per_possession': poss_df['xg_generated'].mean(),
'goal_rate': poss_df['ended_in_goal'].mean() * 100
}
return {
'sequences': poss_df,
'summary': summary
}
# Calculate for both teams
liverpool_quality = calculate_possession_quality(events_with_sequences, 'Liverpool')
city_quality = calculate_possession_quality(events_with_sequences, 'Manchester City')
print("Liverpool Possession Quality:")
print(pd.Series(liverpool_quality['summary']))
print("\nManchester City Possession Quality:")
print(pd.Series(city_quality['summary']))library(tidyverse)
library(StatsBombR)
# Load match data
events <- get.matchFree(match_id)
# Calculate possession sequences
identify_possession_sequences <- function(events) {
events %>%
arrange(index) %>%
mutate(
# New possession when team changes
new_possession = team.name != lag(team.name) | is.na(lag(team.name)),
possession_id = cumsum(new_possession)
) %>%
group_by(possession_id) %>%
mutate(
possession_team = first(team.name),
sequence_length = n(),
sequence_duration = max(minute * 60 + second) - min(minute * 60 + second)
) %>%
ungroup()
}
events_with_sequences <- identify_possession_sequences(events)
# Calculate possession quality metrics
calculate_possession_quality <- function(events, team_name) {
team_possessions <- events %>%
filter(possession_team == team_name) %>%
group_by(possession_id) %>%
summarise(
# Basic metrics
sequence_length = max(sequence_length),
duration_seconds = max(sequence_duration),
# Location metrics
start_x = first(location.x),
end_x = last(location.x),
max_x = max(location.x, na.rm = TRUE),
entered_final_third = any(location.x > 80),
entered_box = any(location.x > 102 & location.y > 18 & location.y < 62),
# Pass metrics
total_passes = sum(type.name == "Pass"),
successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
progressive_passes = sum(
type.name == "Pass" &
(120 - location.x) - (120 - pass.end_location.x) > 10,
na.rm = TRUE
),
# Outcome metrics
ended_in_shot = any(type.name == "Shot"),
xg_generated = sum(shot.statsbomb_xg, na.rm = TRUE),
ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal"),
.groups = "drop"
)
# Aggregate quality metrics
quality_summary <- team_possessions %>%
summarise(
total_possessions = n(),
avg_sequence_length = mean(sequence_length),
avg_duration = mean(duration_seconds),
# Territorial
final_third_entry_rate = mean(entered_final_third) * 100,
box_entry_rate = mean(entered_box) * 100,
# Passing quality
pass_completion = sum(successful_passes) / sum(total_passes) * 100,
progressive_pass_rate = sum(progressive_passes) / sum(total_passes) * 100,
# Outcome quality
shot_rate = mean(ended_in_shot) * 100,
xg_per_possession = mean(xg_generated),
goal_rate = mean(ended_in_goal) * 100
)
return(list(
sequences = team_possessions,
summary = quality_summary
))
}
# Calculate for both teams
home_quality <- calculate_possession_quality(events_with_sequences, "Liverpool")
away_quality <- calculate_possession_quality(events_with_sequences, "Manchester City")
# Compare quality
comparison <- bind_rows(
home_quality$summary %>% mutate(team = "Liverpool"),
away_quality$summary %>% mutate(team = "Manchester City")
)
print(comparison)Field Tilt & Territorial Dominance
Field tilt measures where on the pitch the action is happening. A team can have lower possession but higher field tilt if they're winning the ball high and keeping play in the opponent's half.
def calculate_field_tilt(events, team_name):
"""
Calculate field tilt and territorial dominance.
"""
# Get all touches
touch_types = ['Pass', 'Carry', 'Dribble', 'Shot', 'Ball Receipt']
all_touches = events[events['type'].isin(touch_types)].copy()
all_touches['x'] = all_touches['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
team_touches = all_touches[all_touches['team'] == team_name]
opp_touches = all_touches[all_touches['team'] != team_name]
# Field tilt calculation
team_final_third = (team_touches['x'] > 80).sum()
opp_defensive_third = (opp_touches['x'] < 40).sum()
field_tilt = (
team_final_third / (team_final_third + opp_defensive_third) * 100
if (team_final_third + opp_defensive_third) > 0 else 50
)
# Zone breakdown
team_touches['zone'] = pd.cut(
team_touches['x'],
bins=[0, 40, 80, 120],
labels=['Defensive Third', 'Middle Third', 'Final Third']
)
zone_breakdown = team_touches['zone'].value_counts(normalize=True) * 100
# Timeline
all_touches['minute_bucket'] = (all_touches['minute'] // 5) * 5
all_touches['in_opp_half'] = all_touches['x'] > 60
timeline = (
all_touches
.groupby(['minute_bucket', 'team'])
.agg({
'id': 'count',
'in_opp_half': 'sum'
})
.reset_index()
.rename(columns={'id': 'touches', 'in_opp_half': 'touches_in_opp_half'})
)
return {
'field_tilt': field_tilt,
'zone_breakdown': zone_breakdown,
'timeline': timeline
}
liverpool_tilt = calculate_field_tilt(events, 'Liverpool')
city_tilt = calculate_field_tilt(events, 'Manchester City')
print(f"Liverpool Field Tilt: {liverpool_tilt['field_tilt']:.1f}%")
print(f"Man City Field Tilt: {city_tilt['field_tilt']:.1f}%")
def plot_field_tilt_timeline(timeline):
"""
Visualize territorial control over time.
"""
fig, ax = plt.subplots(figsize=(12, 6))
# Pivot for stacked area
pivot = timeline.pivot(
index='minute_bucket',
columns='team',
values='touches_in_opp_half'
).fillna(0)
# Normalize to percentages
pivot_pct = pivot.div(pivot.sum(axis=1), axis=0) * 100
pivot_pct.plot(kind='area', stacked=True, ax=ax, alpha=0.7,
color=['#C8102E', '#6CABDD'])
ax.set_xlabel('Match Minute')
ax.set_ylabel('Share of Touches in Opponent Half (%)')
ax.set_title('Territorial Control Over Time')
ax.legend(title='Team')
plt.tight_layout()
plt.show()
plot_field_tilt_timeline(liverpool_tilt['timeline'])# Calculate field tilt metrics
calculate_field_tilt <- function(events, team_name) {
# All touches by both teams
all_touches <- events %>%
filter(type.name %in% c("Pass", "Carry", "Dribble", "Shot", "Ball Receipt*"))
# Field tilt = % of touches in opponent's third
team_touches <- all_touches %>% filter(team.name == team_name)
opp_touches <- all_touches %>% filter(team.name != team_name)
# Calculate by zone
team_zone_touches <- team_touches %>%
mutate(
zone = case_when(
location.x > 80 ~ "Final Third",
location.x > 40 ~ "Middle Third",
TRUE ~ "Defensive Third"
)
) %>%
count(zone) %>%
mutate(pct = n / sum(n) * 100)
# Field tilt calculation
# Touches in final third / (touches in final third + opp touches in their final third)
team_final_third <- sum(team_touches$location.x > 80, na.rm = TRUE)
opp_defensive_third <- sum(opp_touches$location.x < 40, na.rm = TRUE)
field_tilt <- team_final_third / (team_final_third + opp_defensive_third) * 100
# Territorial control by minute
territorial_timeline <- events %>%
mutate(
minute_bucket = floor(minute / 5) * 5,
in_opp_half = location.x > 60
) %>%
group_by(minute_bucket, team.name) %>%
summarise(
touches = n(),
touches_in_opp_half = sum(in_opp_half, na.rm = TRUE),
.groups = "drop"
)
return(list(
field_tilt = field_tilt,
zone_breakdown = team_zone_touches,
timeline = territorial_timeline
))
}
# Calculate field tilt
liverpool_tilt <- calculate_field_tilt(events, "Liverpool")
city_tilt <- calculate_field_tilt(events, "Manchester City")
cat("Liverpool Field Tilt:", round(liverpool_tilt$field_tilt, 1), "%\n")
cat("Man City Field Tilt:", round(city_tilt$field_tilt, 1), "%\n")
# Visualize territorial control over time
ggplot(liverpool_tilt$timeline, aes(x = minute_bucket, y = touches_in_opp_half,
fill = team.name)) +
geom_area(position = "fill", alpha = 0.7) +
scale_fill_manual(values = c("Liverpool" = "#C8102E", "Manchester City" = "#6CABDD")) +
labs(
title = "Territorial Control Over Time",
x = "Match Minute", y = "Share of Touches in Opponent Half",
fill = "Team"
) +
theme_minimal()
# Heatmap of touch locations
library(ggsoccer)
ggplot(events %>% filter(team.name == "Liverpool"), aes(x = location.x, y = location.y)) +
annotate_pitch(dimensions = pitch_statsbomb) +
stat_density_2d(aes(fill = ..level..), geom = "polygon", alpha = 0.6) +
scale_fill_viridis_c(option = "plasma") +
labs(title = "Liverpool Touch Heatmap") +
theme_pitch() +
coord_flip()Ball Retention & Recycling
Ball retention measures a team's ability to keep possession under pressure and recycle the ball to create new attacking opportunities.
def analyze_ball_retention(events, team_name):
"""
Analyze ball retention and recycling patterns.
"""
team_events = events[events['team'] == team_name].copy()
team_events['x'] = team_events['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
# Identify possession losses
loss_conditions = (
((team_events['type'] == 'Pass') & (team_events['pass_outcome'].notna())) |
((team_events['type'] == 'Dribble') & (team_events['dribble_outcome'] != 'Complete')) |
(team_events['type'] == 'Dispossessed') |
(team_events['type'] == 'Miscontrol')
)
losses = team_events[loss_conditions].copy()
losses['zone'] = pd.cut(
losses['x'],
bins=[0, 40, 80, 120],
labels=['Defensive Third', 'Middle Third', 'Final Third']
)
# Retention by zone
all_actions = team_events[team_events['type'].isin(['Pass', 'Dribble', 'Carry'])].copy()
all_actions['zone'] = pd.cut(
all_actions['x'],
bins=[0, 40, 80, 120],
labels=['Defensive Third', 'Middle Third', 'Final Third']
)
# Count losses per zone
zone_losses = losses.groupby('zone').size()
zone_total = all_actions.groupby('zone').size()
retention_by_zone = pd.DataFrame({
'total_actions': zone_total,
'losses': zone_losses
}).fillna(0)
retention_by_zone['retention_rate'] = (
(1 - retention_by_zone['losses'] / retention_by_zone['total_actions']) * 100
)
# Retention under pressure
under_pressure = team_events[team_events.get('under_pressure', False) == True]
if len(under_pressure) > 0:
successful_under_pressure = (
((under_pressure['type'] == 'Pass') & (under_pressure['pass_outcome'].isna())) |
((under_pressure['type'] == 'Dribble') & (under_pressure['dribble_outcome'] == 'Complete'))
).sum()
pressure_retention = successful_under_pressure / len(under_pressure) * 100
else:
pressure_retention = 0
return {
'losses': losses,
'retention_by_zone': retention_by_zone,
'retention_under_pressure': pressure_retention
}
retention = analyze_ball_retention(events, 'Liverpool')
print("Retention by Zone:")
print(retention['retention_by_zone'])
print(f"\nRetention Under Pressure: {retention['retention_under_pressure']:.1f}%")
def plot_retention_by_zone(retention_data):
"""
Visualize retention rates by zone.
"""
fig, ax = plt.subplots(figsize=(8, 6))
zones = retention_data['retention_by_zone'].index
rates = retention_data['retention_by_zone']['retention_rate']
colors = ['#90EE90', '#32CD32', '#228B22']
bars = ax.bar(zones, rates, color=colors)
# Add labels
for bar, rate in zip(bars, rates):
ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
f'{rate:.1f}%', ha='center', va='bottom')
ax.set_ylabel('Retention Rate (%)')
ax.set_title('Ball Retention Rate by Zone')
ax.set_ylim(0, 100)
plt.tight_layout()
plt.show()
plot_retention_by_zone(retention)# Analyze ball retention patterns
analyze_ball_retention <- function(events, team_name) {
team_events <- events %>%
filter(team.name == team_name)
# Possession loss events
possession_losses <- team_events %>%
filter(
(type.name == "Pass" & !is.na(pass.outcome.name)) |
(type.name == "Dribble" & dribble.outcome.name != "Complete") |
type.name == "Dispossessed" |
type.name == "Miscontrol"
) %>%
mutate(
loss_zone = case_when(
location.x > 80 ~ "Final Third",
location.x > 40 ~ "Middle Third",
TRUE ~ "Defensive Third"
),
loss_type = case_when(
type.name == "Pass" ~ "Failed Pass",
type.name == "Dribble" ~ "Failed Dribble",
type.name == "Dispossessed" ~ "Dispossessed",
type.name == "Miscontrol" ~ "Miscontrol",
TRUE ~ "Other"
)
)
# Retention rate by zone
all_actions <- team_events %>%
filter(type.name %in% c("Pass", "Dribble", "Carry")) %>%
mutate(
zone = case_when(
location.x > 80 ~ "Final Third",
location.x > 40 ~ "Middle Third",
TRUE ~ "Defensive Third"
)
)
retention_by_zone <- all_actions %>%
group_by(zone) %>%
summarise(
total_actions = n(),
losses = sum(
(type.name == "Pass" & !is.na(pass.outcome.name)) |
(type.name == "Dribble" & dribble.outcome.name != "Complete")
),
retention_rate = (1 - losses / total_actions) * 100
)
# Retention under pressure
under_pressure <- team_events %>%
filter(under_pressure == TRUE)
pressure_retention <- under_pressure %>%
summarise(
actions_under_pressure = n(),
successful = sum(
(type.name == "Pass" & is.na(pass.outcome.name)) |
(type.name == "Dribble" & dribble.outcome.name == "Complete")
),
retention_under_pressure = successful / actions_under_pressure * 100
)
# Ball recycling patterns
recycling <- events_with_sequences %>%
filter(possession_team == team_name) %>%
group_by(possession_id) %>%
mutate(
direction = case_when(
type.name == "Pass" &
pass.end_location.x < location.x ~ "Backward",
type.name == "Pass" &
abs(pass.end_location.y - location.y) >
abs(pass.end_location.x - location.x) ~ "Sideways",
TRUE ~ "Forward"
)
) %>%
summarise(
forward_pct = mean(direction == "Forward", na.rm = TRUE) * 100,
sideways_pct = mean(direction == "Sideways", na.rm = TRUE) * 100,
backward_pct = mean(direction == "Backward", na.rm = TRUE) * 100
)
return(list(
losses = possession_losses,
retention_by_zone = retention_by_zone,
under_pressure = pressure_retention,
recycling_patterns = recycling
))
}
retention_analysis <- analyze_ball_retention(events, "Liverpool")
# Visualize retention by zone
ggplot(retention_analysis$retention_by_zone,
aes(x = zone, y = retention_rate, fill = zone)) +
geom_bar(stat = "identity") +
geom_text(aes(label = paste0(round(retention_rate, 1), "%")),
vjust = -0.5) +
scale_fill_brewer(palette = "Greens") +
labs(
title = "Ball Retention Rate by Zone",
x = "Zone", y = "Retention Rate (%)"
) +
theme_minimal() +
theme(legend.position = "none")Progressive Possession Sequences
Progressive possession sequences move the ball toward goal in a meaningful way. Analyzing these sequences reveals how teams build attacks and which players drive progression.
def analyze_progressive_sequences(events, team_name):
"""
Analyze progressive possession sequences.
"""
team_sequences = events[events['possession_team'] == team_name].copy()
# Extract locations
team_sequences['x'] = team_sequences['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
team_sequences['y'] = team_sequences['location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
)
team_sequences['end_x'] = team_sequences['pass_end_location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
# Calculate per-sequence metrics
sequence_metrics = []
for poss_id in team_sequences['possession_id'].unique():
seq = team_sequences[team_sequences['possession_id'] == poss_id]
if len(seq) == 0:
continue
# Progressive passes
passes = seq[seq['type'] == 'Pass']
if len(passes) > 0:
prog_passes = ((passes['end_x'] - passes['x']) > 10).sum()
else:
prog_passes = 0
metrics = {
'possession_id': poss_id,
'start_x': seq['x'].iloc[0],
'end_x': seq['x'].iloc[-1],
'max_x': seq['x'].max(),
'sequence_length': len(seq),
'progressive_passes': prog_passes,
'entered_final_third': (seq['x'] > 80).any(),
'entered_box': ((seq['x'] > 102) & (seq['y'] > 18) & (seq['y'] < 62)).any(),
'ended_in_shot': (seq['type'] == 'Shot').any(),
'xg': seq['shot_statsbomb_xg'].sum()
}
metrics['net_progression'] = metrics['end_x'] - metrics['start_x']
metrics['is_progressive'] = metrics['net_progression'] > 20
sequence_metrics.append(metrics)
seq_df = pd.DataFrame(sequence_metrics)
# Classify quality
def classify_quality(row):
if row['ended_in_shot'] and row['xg'] > 0.1:
return 'High Quality Chance'
elif row['ended_in_shot']:
return 'Shot Created'
elif row['entered_box']:
return 'Box Entry'
elif row['entered_final_third']:
return 'Final Third Entry'
elif row['is_progressive']:
return 'Progressive'
else:
return 'Non-Progressive'
seq_df['sequence_quality'] = seq_df.apply(classify_quality, axis=1)
# Summary
summary = {
'total_sequences': len(seq_df),
'progressive_sequences': seq_df['is_progressive'].sum(),
'progressive_rate': seq_df['is_progressive'].mean() * 100,
'avg_progression': seq_df['net_progression'].mean(),
'box_entry_rate': seq_df['entered_box'].mean() * 100,
'shot_rate': seq_df['ended_in_shot'].mean() * 100
}
quality_breakdown = seq_df['sequence_quality'].value_counts()
return {
'sequences': seq_df,
'summary': summary,
'quality_breakdown': quality_breakdown
}
progressive = analyze_progressive_sequences(events_with_sequences, 'Liverpool')
print("Progressive Sequence Summary:")
print(pd.Series(progressive['summary']))
print("\nQuality Breakdown:")
print(progressive['quality_breakdown'])# Analyze progressive possession sequences
analyze_progressive_sequences <- function(events, team_name) {
team_sequences <- events_with_sequences %>%
filter(possession_team == team_name) %>%
group_by(possession_id) %>%
summarise(
start_x = first(location.x),
end_x = last(location.x),
max_x = max(location.x, na.rm = TRUE),
sequence_length = n(),
# Progressive actions
progressive_passes = sum(
type.name == "Pass" &
(pass.end_location.x - location.x) > 10,
na.rm = TRUE
),
progressive_carries = sum(
type.name == "Carry" &
(carry.end_location.x - location.x) > 10,
na.rm = TRUE
),
# Key events
entered_final_third = any(location.x > 80),
entered_box = any(location.x > 102 & location.y > 18 & location.y < 62),
ended_in_shot = any(type.name == "Shot"),
xg = sum(shot.statsbomb_xg, na.rm = TRUE),
.groups = "drop"
) %>%
mutate(
net_progression = end_x - start_x,
is_progressive = net_progression > 20,
progressive_actions = progressive_passes + progressive_carries
)
# Classify sequence quality
team_sequences <- team_sequences %>%
mutate(
sequence_quality = case_when(
ended_in_shot & xg > 0.1 ~ "High Quality Chance",
ended_in_shot ~ "Shot Created",
entered_box ~ "Box Entry",
entered_final_third ~ "Final Third Entry",
is_progressive ~ "Progressive",
TRUE ~ "Non-Progressive"
)
)
# Summary statistics
summary <- team_sequences %>%
summarise(
total_sequences = n(),
progressive_sequences = sum(is_progressive),
progressive_rate = mean(is_progressive) * 100,
avg_progression = mean(net_progression),
box_entry_rate = mean(entered_box) * 100,
shot_sequence_rate = mean(ended_in_shot) * 100,
avg_xg_per_sequence = mean(xg)
)
# Quality breakdown
quality_breakdown <- team_sequences %>%
count(sequence_quality) %>%
mutate(pct = n / sum(n) * 100)
return(list(
sequences = team_sequences,
summary = summary,
quality_breakdown = quality_breakdown
))
}
progressive_analysis <- analyze_progressive_sequences(events, "Liverpool")
# Visualize sequence quality
ggplot(progressive_analysis$quality_breakdown,
aes(x = reorder(sequence_quality, n), y = n, fill = sequence_quality)) +
geom_bar(stat = "identity") +
coord_flip() +
labs(
title = "Possession Sequence Quality Breakdown",
x = "", y = "Count"
) +
theme_minimal() +
theme(legend.position = "none")
# Progressive players
progressive_players <- events %>%
filter(team.name == "Liverpool") %>%
filter(
(type.name == "Pass" & (pass.end_location.x - location.x) > 10) |
(type.name == "Carry" & (carry.end_location.x - location.x) > 10)
) %>%
group_by(player.name, position.name) %>%
summarise(
progressive_actions = n(),
.groups = "drop"
) %>%
arrange(desc(progressive_actions))
print(head(progressive_players, 10))Possession Value Models
Possession value models assign a value to each possession location based on the probability of scoring from that position. This helps quantify the quality of possession beyond simple territory metrics.
def build_possession_value_model(events, grid_size=10):
"""
Build a simple possession value model based on shot probability.
"""
events = events.copy()
# Extract locations
events['x'] = events['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
events['y'] = events['location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
)
# Create grid cells
events['grid_x'] = (events['x'] // grid_size) * grid_size
events['grid_y'] = (events['y'] // grid_size) * grid_size
# Calculate shot probability per grid cell
grid_stats = (
events
.groupby(['possession_id', 'grid_x', 'grid_y'])
.agg({
'type': lambda x: (x == 'Shot').any(),
'shot_statsbomb_xg': 'max'
})
.reset_index()
.rename(columns={'type': 'ended_in_shot'})
)
pv_model = (
grid_stats
.groupby(['grid_x', 'grid_y'])
.agg({
'ended_in_shot': ['count', 'mean'],
'shot_statsbomb_xg': 'mean'
})
.reset_index()
)
pv_model.columns = ['grid_x', 'grid_y', 'n', 'shot_probability', 'avg_xg']
# Filter for minimum sample size
pv_model = pv_model[pv_model['n'] >= 10]
# Calculate possession value
pv_model['possession_value'] = (
pv_model['shot_probability'] * pv_model['avg_xg'].fillna(0.1)
)
return pv_model
pv_model = build_possession_value_model(season_events)
def plot_possession_value_grid(pv_model):
"""
Visualize possession value across the pitch.
"""
from mplsoccer import Pitch
pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a1a1a')
fig, ax = pitch.draw(figsize=(12, 8))
scatter = ax.scatter(
pv_model['grid_x'] + 5, # Center of grid cell
pv_model['grid_y'] + 5,
c=pv_model['possession_value'],
s=200,
cmap='plasma',
alpha=0.7
)
plt.colorbar(scatter, ax=ax, label='Possession Value')
ax.set_title('Possession Value by Location', fontsize=14, color='white')
plt.tight_layout()
plt.show()
plot_possession_value_grid(pv_model)
def calculate_pv_added(events, pv_model):
"""
Calculate possession value added by each action.
"""
events = events.copy()
# Grid locations
events['x'] = events['location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
events['grid_x'] = (events['x'] // 10) * 10
events['grid_y'] = (events['location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
) // 10) * 10
# End locations
events['end_x'] = events['pass_end_location'].apply(
lambda loc: loc[0] if isinstance(loc, list) else np.nan
)
events['end_grid_x'] = (events['end_x'] // 10) * 10
events['end_grid_y'] = (events['pass_end_location'].apply(
lambda loc: loc[1] if isinstance(loc, list) else np.nan
) // 10) * 10
# Merge with PV model
events = events.merge(
pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
columns={'possession_value': 'start_pv'}
),
on=['grid_x', 'grid_y'],
how='left'
)
events = events.merge(
pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
columns={'grid_x': 'end_grid_x', 'grid_y': 'end_grid_y',
'possession_value': 'end_pv'}
),
on=['end_grid_x', 'end_grid_y'],
how='left'
)
events['pv_added'] = events['end_pv'] - events['start_pv']
# Aggregate by player
player_pv = (
events[events['type'].isin(['Pass', 'Carry', 'Dribble'])]
.groupby(['player', 'team', 'position'])
.agg({
'id': 'count',
'pv_added': ['sum', 'mean']
})
.reset_index()
)
player_pv.columns = ['player', 'team', 'position', 'actions',
'total_pv_added', 'avg_pv_added']
return player_pv.sort_values('total_pv_added', ascending=False)
player_pv = calculate_pv_added(events, pv_model)
print(player_pv.head(10))# Build simple possession value model
build_possession_value_model <- function(events) {
# Create grid of pitch locations
grid_size <- 10 # 10x10 meter squares
# For each location, calculate probability of shot within next N actions
shot_probability <- events %>%
mutate(
grid_x = floor(location.x / grid_size) * grid_size,
grid_y = floor(location.y / grid_size) * grid_size
) %>%
group_by(possession_id, grid_x, grid_y) %>%
summarise(
sequence_ended_in_shot = any(type.name == "Shot"),
sequence_xg = max(shot.statsbomb_xg, na.rm = TRUE),
.groups = "drop"
) %>%
group_by(grid_x, grid_y) %>%
summarise(
n = n(),
shot_probability = mean(sequence_ended_in_shot, na.rm = TRUE),
avg_xg = mean(sequence_xg, na.rm = TRUE),
possession_value = shot_probability * coalesce(avg_xg, 0.1),
.groups = "drop"
) %>%
filter(n >= 10) # Minimum sample size
return(shot_probability)
}
# Build PV model from season data
pv_model <- build_possession_value_model(season_events)
# Visualize possession value grid
ggplot(pv_model, aes(x = grid_x, y = grid_y, fill = possession_value)) +
annotate_pitch(dimensions = pitch_statsbomb, colour = "white") +
geom_tile(alpha = 0.7) +
scale_fill_viridis_c(option = "plasma", name = "PV") +
labs(title = "Possession Value by Location") +
theme_pitch() +
coord_flip()
# Calculate possession value added per action
calculate_pv_added <- function(events, pv_model) {
events_with_pv <- events %>%
mutate(
grid_x = floor(location.x / 10) * 10,
grid_y = floor(location.y / 10) * 10,
end_grid_x = floor(coalesce(pass.end_location.x, carry.end_location.x, location.x) / 10) * 10,
end_grid_y = floor(coalesce(pass.end_location.y, carry.end_location.y, location.y) / 10) * 10
) %>%
left_join(pv_model %>% select(grid_x, grid_y, start_pv = possession_value),
by = c("grid_x", "grid_y")) %>%
left_join(pv_model %>% select(end_grid_x = grid_x, end_grid_y = grid_y, end_pv = possession_value),
by = c("end_grid_x", "end_grid_y")) %>%
mutate(
pv_added = end_pv - start_pv
)
# Aggregate by player
player_pv <- events_with_pv %>%
filter(type.name %in% c("Pass", "Carry", "Dribble")) %>%
group_by(player.name, team.name, position.name) %>%
summarise(
actions = n(),
total_pv_added = sum(pv_added, na.rm = TRUE),
avg_pv_added = mean(pv_added, na.rm = TRUE),
.groups = "drop"
) %>%
arrange(desc(total_pv_added))
return(player_pv)
}
player_pv <- calculate_pv_added(events, pv_model)
print(head(player_pv, 10))Practice Exercises
Exercise 26.1: Complete Possession Quality Analysis System
Task: Build a comprehensive possession quality analysis system that calculates multi-dimensional possession metrics for all teams in a league, identifies possession style clusters, and generates visual comparisons.
Requirements:
- Calculate possession percentage and quality metrics (xG/possession, box entries, progressive passes) for each team
- Create a possession efficiency score combining quantity and quality
- Identify teams with high possession but low quality vs low possession but high quality
- Cluster teams by possession style using k-means
- Generate scatter plots showing possession % vs quality metrics
- Create detailed team comparison reports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from statsbombpy import sb
# ============================================
# POSSESSION QUALITY ANALYSIS SYSTEM
# ============================================
def identify_possession_sequences(events):
"""Identify distinct possession sequences."""
events = events.sort_values('index').copy()
events['new_possession'] = events['team'] != events['team'].shift(1)
events['possession_id'] = events['new_possession'].cumsum()
seq_info = (
events.groupby('possession_id')
.agg({'team': 'first', 'index': 'count'})
.reset_index()
.rename(columns={'team': 'possession_team', 'index': 'sequence_length'})
)
events = events.merge(seq_info, on='possession_id')
return events
def calculate_team_possession_quality(events, team_name):
"""Calculate possession quality metrics for a team."""
def get_x(loc):
return loc[0] if isinstance(loc, list) else np.nan
def get_y(loc):
return loc[1] if isinstance(loc, list) else np.nan
events = events.copy()
events['x'] = events['location'].apply(get_x)
events['y'] = events['location'].apply(get_y)
if 'pass_end_location' in events.columns:
events['end_x'] = events['pass_end_location'].apply(get_x)
team_poss = events[events['possession_team'] == team_name]
possession_metrics = []
for poss_id in team_poss['possession_id'].unique():
poss = team_poss[team_poss['possession_id'] == poss_id]
metrics = {
'possession_id': poss_id,
'sequence_length': len(poss),
'max_x': poss['x'].max(),
'entered_final_third': (poss['x'] > 80).any(),
'entered_box': ((poss['x'] > 102) & (poss['y'] > 18) & (poss['y'] < 62)).any(),
'total_passes': (poss['type'] == 'Pass').sum(),
'successful_passes': ((poss['type'] == 'Pass') & (poss['pass_outcome'].isna())).sum(),
'ended_in_shot': (poss['type'] == 'Shot').any(),
'xg_generated': poss['shot_statsbomb_xg'].sum() if 'shot_statsbomb_xg' in poss.columns else 0,
'ended_in_goal': ((poss['type'] == 'Shot') & (poss['shot_outcome'] == 'Goal')).any()
}
# Progressive passes
if 'end_x' in poss.columns:
poss_passes = poss[poss['type'] == 'Pass']
metrics['progressive_passes'] = (
((120 - poss_passes['x']) - (120 - poss_passes['end_x']) > 10).sum()
)
else:
metrics['progressive_passes'] = 0
possession_metrics.append(metrics)
poss_df = pd.DataFrame(possession_metrics)
# Possession percentage
all_poss = events.groupby('possession_team')['possession_id'].nunique()
team_poss_pct = all_poss.get(team_name, 0) / all_poss.sum() * 100
# Quality summary
quality = {
'team': team_name,
'total_possessions': len(poss_df),
'possession_pct': team_poss_pct,
'avg_sequence_length': poss_df['sequence_length'].mean(),
'final_third_entry_rate': poss_df['entered_final_third'].mean() * 100,
'box_entry_rate': poss_df['entered_box'].mean() * 100,
'pass_completion': poss_df['successful_passes'].sum() / max(poss_df['total_passes'].sum(), 1) * 100,
'progressive_pass_rate': poss_df['progressive_passes'].sum() / max(poss_df['total_passes'].sum(), 1) * 100,
'shot_rate': poss_df['ended_in_shot'].mean() * 100,
'xg_per_possession': poss_df['xg_generated'].mean(),
'goal_rate': poss_df['ended_in_goal'].mean() * 100
}
# Quality score
quality['quality_score'] = (
(quality['final_third_entry_rate'] / 50) * 25 +
(quality['box_entry_rate'] / 20) * 25 +
(quality['progressive_pass_rate'] / 15) * 25 +
(quality['xg_per_possession'] / 0.05) * 25
)
# Efficiency score
quality['efficiency_score'] = quality['quality_score'] / (quality['possession_pct'] / 50)
return quality
def calculate_league_possession(events):
"""Calculate possession metrics for all teams."""
events_seq = identify_possession_sequences(events)
teams = events_seq['team'].unique()
league_data = [calculate_team_possession_quality(events_seq, team) for team in teams if team]
return pd.DataFrame(league_data)
def classify_possession_style(league_df):
"""Classify teams by possession style."""
def get_category(row):
if row['possession_pct'] >= 55 and row['quality_score'] >= 50:
return 'Dominant'
elif row['possession_pct'] >= 55 and row['quality_score'] < 50:
return 'Sterile Possession'
elif row['possession_pct'] < 45 and row['quality_score'] >= 50:
return 'Efficient Counter'
elif row['possession_pct'] < 45 and row['quality_score'] < 50:
return 'Low Quality'
return 'Balanced'
league_df['possession_category'] = league_df.apply(get_category, axis=1)
return league_df
def cluster_possession_styles(league_df):
"""Cluster teams by possession style using K-means."""
features = ['possession_pct', 'final_third_entry_rate', 'box_entry_rate',
'progressive_pass_rate', 'xg_per_possession', 'shot_rate']
X = league_df[features].fillna(0)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
kmeans = KMeans(n_clusters=4, random_state=42, n_init=25)
league_df['cluster'] = kmeans.fit_predict(X_scaled)
return league_df
def visualize_possession_quality(league_df):
"""Create possession quality visualizations."""
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
# Scatter plot
colors = {'Dominant': 'green', 'Sterile Possession': 'red',
'Efficient Counter': 'blue', 'Low Quality': 'gray', 'Balanced': 'orange'}
for cat in league_df['possession_category'].unique():
subset = league_df[league_df['possession_category'] == cat]
axes[0].scatter(subset['possession_pct'], subset['quality_score'],
c=colors.get(cat, 'gray'), label=cat, s=100, alpha=0.7)
axes[0].axvline(x=50, linestyle='--', alpha=0.5, color='gray')
axes[0].axhline(y=50, linestyle='--', alpha=0.5, color='gray')
axes[0].set_xlabel('Possession %')
axes[0].set_ylabel('Quality Score')
axes[0].set_title('Possession Quantity vs Quality')
axes[0].legend()
# Efficiency bar chart
sorted_df = league_df.sort_values('efficiency_score', ascending=True)
bars = axes[1].barh(sorted_df['team'], sorted_df['efficiency_score'],
color=[colors.get(c, 'gray') for c in sorted_df['possession_category']])
axes[1].set_xlabel('Efficiency Score')
axes[1].set_title('Possession Efficiency (Quality / Quantity)')
plt.tight_layout()
plt.show()
def generate_possession_report(league_df):
"""Generate comprehensive possession report."""
print("\n" + "=" * 65)
print("LEAGUE POSSESSION QUALITY REPORT")
print("=" * 65 + "\n")
print("POSSESSION STYLE DISTRIBUTION:")
print(league_df['possession_category'].value_counts())
print("\n\nTOP 5 QUALITY TEAMS:")
print("-" * 55)
top_quality = league_df.nlargest(5, 'quality_score')[
['team', 'possession_pct', 'quality_score', 'xg_per_possession', 'possession_category']
]
print(top_quality.to_string(index=False))
print("\n\nTOP 5 MOST EFFICIENT TEAMS:")
print("-" * 55)
top_eff = league_df.nlargest(5, 'efficiency_score')[
['team', 'possession_pct', 'quality_score', 'efficiency_score', 'possession_category']
]
print(top_eff.to_string(index=False))
print("\n\nSTERILE POSSESSION TEAMS:")
print("-" * 55)
sterile = league_df[league_df['possession_category'] == 'Sterile Possession'][
['team', 'possession_pct', 'quality_score', 'box_entry_rate', 'xg_per_possession']
]
print(sterile.to_string(index=False) if len(sterile) > 0 else " None found")
print("\n\nEFFICIENT COUNTER TEAMS:")
print("-" * 55)
efficient = league_df[league_df['possession_category'] == 'Efficient Counter'][
['team', 'possession_pct', 'quality_score', 'box_entry_rate', 'xg_per_possession']
]
print(efficient.to_string(index=False) if len(efficient) > 0 else " None found")
# Main execution
matches = sb.matches(competition_id=11, season_id=90)
events = pd.concat([sb.events(match_id=m) for m in matches['match_id'].head(10)])
league_possession = calculate_league_possession(events)
league_possession = classify_possession_style(league_possession)
league_possession = cluster_possession_styles(league_possession)
generate_possession_report(league_possession)
visualize_possession_quality(league_possession)library(tidyverse)
library(StatsBombR)
library(scales)
# ============================================
# POSSESSION QUALITY ANALYSIS SYSTEM
# ============================================
# Calculate possession sequences for a match
identify_possession_sequences <- function(events) {
events %>%
arrange(index) %>%
mutate(
new_possession = team.name != lag(team.name) | is.na(lag(team.name)),
possession_id = cumsum(new_possession)
) %>%
group_by(possession_id) %>%
mutate(
possession_team = first(team.name),
sequence_length = n(),
sequence_duration = max(minute * 60 + second) - min(minute * 60 + second)
) %>%
ungroup()
}
# Calculate possession quality for a team
calculate_team_possession_quality <- function(events, team_name) {
team_possessions <- events %>%
filter(possession_team == team_name) %>%
group_by(possession_id) %>%
summarise(
sequence_length = max(sequence_length),
duration_seconds = max(sequence_duration),
start_x = first(location.x),
max_x = max(location.x, na.rm = TRUE),
entered_final_third = any(location.x > 80, na.rm = TRUE),
entered_box = any(location.x > 102 & location.y > 18 & location.y < 62, na.rm = TRUE),
total_passes = sum(type.name == "Pass"),
successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
progressive_passes = sum(
type.name == "Pass" &
!is.na(pass.end_location.x) &
(120 - location.x) - (120 - pass.end_location.x) > 10,
na.rm = TRUE
),
ended_in_shot = any(type.name == "Shot"),
xg_generated = sum(shot.statsbomb_xg, na.rm = TRUE),
ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
.groups = "drop"
)
# Calculate possession percentage
all_possessions <- events %>%
distinct(possession_id, possession_team) %>%
count(possession_team) %>%
mutate(pct = n / sum(n) * 100)
team_poss_pct <- all_possessions %>%
filter(possession_team == team_name) %>%
pull(pct)
# Quality summary
quality <- team_possessions %>%
summarise(
total_possessions = n(),
possession_pct = team_poss_pct,
avg_sequence_length = mean(sequence_length, na.rm = TRUE),
avg_duration = mean(duration_seconds, na.rm = TRUE),
final_third_entry_rate = mean(entered_final_third, na.rm = TRUE) * 100,
box_entry_rate = mean(entered_box, na.rm = TRUE) * 100,
pass_completion = sum(successful_passes, na.rm = TRUE) / sum(total_passes, na.rm = TRUE) * 100,
progressive_pass_rate = sum(progressive_passes, na.rm = TRUE) / sum(total_passes, na.rm = TRUE) * 100,
shot_rate = mean(ended_in_shot, na.rm = TRUE) * 100,
xg_per_possession = mean(xg_generated, na.rm = TRUE),
goal_rate = mean(ended_in_goal, na.rm = TRUE) * 100
) %>%
mutate(
team = team_name,
# Quality score combining multiple factors
quality_score = (
(final_third_entry_rate / 50) * 25 +
(box_entry_rate / 20) * 25 +
(progressive_pass_rate / 15) * 25 +
(xg_per_possession / 0.05) * 25
),
# Efficiency score: quality relative to possession
efficiency_score = quality_score / (possession_pct / 50)
)
return(quality)
}
# Process full season data
calculate_league_possession <- function(all_events) {
# Process by match
all_events_seq <- identify_possession_sequences(all_events)
# Get all teams
teams <- unique(all_events_seq$team.name)
# Calculate for each team
league_possession <- map_dfr(teams, function(team) {
calculate_team_possession_quality(all_events_seq, team)
})
return(league_possession)
}
# Classify possession style
classify_possession_style <- function(league_data) {
league_data %>%
mutate(
possession_category = case_when(
possession_pct >= 55 & quality_score >= 50 ~ "Dominant",
possession_pct >= 55 & quality_score < 50 ~ "Sterile Possession",
possession_pct < 45 & quality_score >= 50 ~ "Efficient Counter",
possession_pct < 45 & quality_score < 50 ~ "Low Quality",
TRUE ~ "Balanced"
),
style = case_when(
progressive_pass_rate > 12 & shot_rate > 15 ~ "Direct",
progressive_pass_rate < 8 & pass_completion > 85 ~ "Patient Build-up",
box_entry_rate > 15 & final_third_entry_rate > 40 ~ "Box Focused",
TRUE ~ "Mixed"
)
)
}
# Cluster teams by possession style
cluster_possession_styles <- function(league_data) {
# Select features for clustering
cluster_features <- league_data %>%
select(team, possession_pct, final_third_entry_rate, box_entry_rate,
progressive_pass_rate, xg_per_possession, shot_rate)
# Scale features
scaled_features <- cluster_features %>%
select(-team) %>%
scale()
# K-means clustering
set.seed(42)
kmeans_result <- kmeans(scaled_features, centers = 4, nstart = 25)
league_data$cluster <- kmeans_result$cluster
# Name clusters
cluster_names <- league_data %>%
group_by(cluster) %>%
summarise(
avg_poss = mean(possession_pct),
avg_quality = mean(quality_score),
.groups = "drop"
) %>%
arrange(desc(avg_poss), desc(avg_quality)) %>%
mutate(cluster_name = c("High Poss / High Quality",
"High Poss / Low Quality",
"Low Poss / High Quality",
"Low Poss / Low Quality")[row_number()])
league_data <- league_data %>%
left_join(cluster_names %>% select(cluster, cluster_name), by = "cluster")
return(league_data)
}
# Visualize possession quality
visualize_possession_quality <- function(league_data) {
# Main scatter plot
p1 <- ggplot(league_data, aes(x = possession_pct, y = quality_score)) +
geom_point(aes(color = possession_category, size = xg_per_possession),
alpha = 0.7) +
geom_text(aes(label = team), vjust = -0.8, size = 3, check_overlap = TRUE) +
geom_vline(xintercept = 50, linetype = "dashed", alpha = 0.5) +
geom_hline(yintercept = 50, linetype = "dashed", alpha = 0.5) +
scale_color_brewer(palette = "Set1") +
labs(
title = "Possession Quantity vs Quality",
x = "Possession %", y = "Quality Score",
color = "Category", size = "xG/Poss"
) +
theme_minimal() +
theme(legend.position = "right")
# Efficiency comparison
p2 <- ggplot(league_data, aes(x = reorder(team, efficiency_score),
y = efficiency_score)) +
geom_bar(stat = "identity", aes(fill = possession_category)) +
coord_flip() +
labs(
title = "Possession Efficiency (Quality / Quantity)",
x = "", y = "Efficiency Score"
) +
theme_minimal()
return(list(quality_plot = p1, efficiency_plot = p2))
}
# Generate comprehensive report
generate_possession_report <- function(league_data) {
cat("\n", rep("=", 65), "\n", sep = "")
cat("LEAGUE POSSESSION QUALITY REPORT\n")
cat(rep("=", 65), "\n\n", sep = "")
cat("POSSESSION STYLE DISTRIBUTION:\n")
print(table(league_data$possession_category))
cat("\n\nTOP 5 QUALITY TEAMS (regardless of possession %):\n")
cat("-", rep("-", 50), "\n", sep = "")
league_data %>%
arrange(desc(quality_score)) %>%
head(5) %>%
select(team, possession_pct, quality_score, xg_per_possession, possession_category) %>%
print()
cat("\n\nTOP 5 MOST EFFICIENT TEAMS:\n")
cat("-", rep("-", 50), "\n", sep = "")
league_data %>%
arrange(desc(efficiency_score)) %>%
head(5) %>%
select(team, possession_pct, quality_score, efficiency_score, possession_category) %>%
print()
cat("\n\nSTERILE POSSESSION TEAMS (high possession, low quality):\n")
cat("-", rep("-", 50), "\n", sep = "")
league_data %>%
filter(possession_category == "Sterile Possession") %>%
select(team, possession_pct, quality_score, box_entry_rate, xg_per_possession) %>%
print()
cat("\n\nEFFICIENT COUNTER TEAMS (low possession, high quality):\n")
cat("-", rep("-", 50), "\n", sep = "")
league_data %>%
filter(possession_category == "Efficient Counter") %>%
select(team, possession_pct, quality_score, box_entry_rate, xg_per_possession) %>%
print()
}
# Main execution
# Load season data
comps <- FreeCompetitions()
matches <- FreeMatches(Competitions = comps %>% filter(competition_id == 11))
events <- free_allevents(MatchesDF = matches, Atea = TRUE)
events <- allclean(events)
# Run analysis
league_possession <- calculate_league_possession(events)
league_possession <- classify_possession_style(league_possession)
league_possession <- cluster_possession_styles(league_possession)
# Generate outputs
generate_possession_report(league_possession)
plots <- visualize_possession_quality(league_possession)
print(plots$quality_plot)
print(plots$efficiency_plot)Exercise 26.2: Field Tilt and Territorial Control Dashboard
Task: Build a comprehensive territorial control analysis system that calculates field tilt metrics, tracks territorial momentum over time, and visualizes spatial dominance patterns.
Requirements:
- Calculate field tilt (% of final third touches vs opponent defensive third)
- Track territorial control in 5-minute intervals throughout matches
- Create heat maps showing touch density by zone for each team
- Identify momentum shifts when territorial control changes
- Calculate correlation between field tilt and xG generated
- Generate visual dashboard with multiple territorial views
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mplsoccer import Pitch
from statsbombpy import sb
# ============================================
# FIELD TILT & TERRITORIAL CONTROL DASHBOARD
# ============================================
def calculate_field_tilt_metrics(events):
"""Calculate comprehensive field tilt metrics."""
events = events.dropna(subset=['location']).copy()
events['x'] = events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
events['y'] = events['location'].apply(lambda loc: loc[1] if isinstance(loc, list) else np.nan)
def get_zone(x):
if x > 80:
return 'final_third'
elif x > 40:
return 'middle_third'
return 'defensive_third'
events['zone'] = events['x'].apply(get_zone)
teams = events['team'].unique()
metrics = []
for team in teams:
if pd.isna(team):
continue
team_events = events[events['team'] == team]
total = len(team_events)
team_final = len(team_events[team_events['zone'] == 'final_third'])
opp_defensive = len(events[(events['team'] != team) & (events['zone'] == 'defensive_third')])
field_tilt = team_final / (team_final + opp_defensive) * 100 if (team_final + opp_defensive) > 0 else 50
metrics.append({
'team': team,
'field_tilt': field_tilt,
'final_third_pct': (team_events['zone'] == 'final_third').mean() * 100,
'middle_third_pct': (team_events['zone'] == 'middle_third').mean() * 100,
'defensive_third_pct': (team_events['zone'] == 'defensive_third').mean() * 100,
'total_touches': total
})
return pd.DataFrame(metrics)
def calculate_territorial_timeline(events, interval_minutes=5):
"""Track territorial control over time."""
events = events.dropna(subset=['location']).copy()
events['x'] = events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
events['time_bucket'] = (events['minute'] // interval_minutes) * interval_minutes
events['in_opp_half'] = events['x'] > 60
events['in_final_third'] = events['x'] > 80
timeline = (
events.groupby(['time_bucket', 'team'])
.agg({
'id': 'count',
'in_opp_half': 'sum',
'in_final_third': 'sum',
'x': 'mean'
})
.reset_index()
.rename(columns={'id': 'touches', 'in_opp_half': 'touches_opp_half',
'in_final_third': 'touches_final_third', 'x': 'avg_x'})
)
# Calculate territorial share
total_opp_half = timeline.groupby('time_bucket')['touches_opp_half'].transform('sum')
timeline['territorial_share'] = timeline['touches_opp_half'] / total_opp_half * 100
return timeline
def identify_momentum_shifts(timeline, threshold=15):
"""Identify significant momentum shifts."""
teams = timeline['team'].unique()
team_a = teams[0]
team_timeline = timeline[timeline['team'] == team_a].copy()
team_timeline = team_timeline.sort_values('time_bucket')
team_timeline['tilt_change'] = team_timeline['territorial_share'].diff()
def classify_shift(change):
if pd.isna(change):
return 'Stable'
elif change > threshold:
return 'Gained Control'
elif change < -threshold:
return 'Lost Control'
return 'Stable'
team_timeline['momentum_shift'] = team_timeline['tilt_change'].apply(classify_shift)
shifts = team_timeline[team_timeline['momentum_shift'] != 'Stable'][
['time_bucket', 'territorial_share', 'tilt_change', 'momentum_shift']
]
return shifts
def calculate_tilt_xg_correlation(events, timeline):
"""Calculate correlation between field tilt and xG."""
xg_by_period = (
events[events['type'] == 'Shot']
.dropna(subset=['shot_statsbomb_xg'])
.assign(time_bucket=lambda x: (x['minute'] // 5) * 5)
.groupby(['time_bucket', 'team'])
['shot_statsbomb_xg'].sum()
.reset_index()
.rename(columns={'shot_statsbomb_xg': 'xg'})
)
merged = timeline.merge(xg_by_period, on=['time_bucket', 'team'], how='left')
merged['xg'] = merged['xg'].fillna(0)
correlation = merged['territorial_share'].corr(merged['xg'])
return {'correlation': correlation, 'data': merged}
def visualize_territorial_control(timeline):
"""Create territorial control visualizations."""
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
teams = timeline['team'].unique()
colors = {'team_a': '#C8102E', 'team_b': '#6CABDD'}
# Area chart
pivot = timeline.pivot(index='time_bucket', columns='team', values='territorial_share').fillna(50)
pivot.plot(kind='area', stacked=True, ax=axes[0], alpha=0.8,
color=['#C8102E', '#6CABDD'])
axes[0].set_xlabel('Match Minute')
axes[0].set_ylabel('Territorial Share (%)')
axes[0].set_title('Territorial Control Over Time')
axes[0].legend(title='Team')
# Average position timeline
for i, team in enumerate(teams):
if pd.isna(team):
continue
team_data = timeline[timeline['team'] == team]
axes[1].plot(team_data['time_bucket'], team_data['avg_x'],
label=team, linewidth=2, color=['#C8102E', '#6CABDD'][i])
axes[1].axhline(y=60, linestyle='--', alpha=0.5, color='gray')
axes[1].set_xlabel('Match Minute')
axes[1].set_ylabel('Average X Position')
axes[1].set_title('Average Touch Position Over Time')
axes[1].legend()
plt.tight_layout()
plt.show()
def create_touch_heatmaps(events):
"""Create side-by-side touch heatmaps."""
teams = [t for t in events['team'].unique() if pd.notna(t)]
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
for i, team in enumerate(teams[:2]):
pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a472a')
pitch.draw(ax=axes[i])
team_events = events[events['team'] == team].copy()
team_events['x'] = team_events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
team_events['y'] = team_events['location'].apply(lambda loc: loc[1] if isinstance(loc, list) else np.nan)
valid_events = team_events.dropna(subset=['x', 'y'])
pitch.kdeplot(valid_events['x'], valid_events['y'], ax=axes[i],
cmap='plasma', fill=True, levels=50, alpha=0.7)
axes[i].set_title(f'{team} Touch Heatmap', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()
def generate_territorial_report(metrics, momentum_shifts, correlation):
"""Generate comprehensive territorial report."""
print("\n" + "=" * 65)
print("TERRITORIAL CONTROL DASHBOARD")
print("=" * 65 + "\n")
print("FIELD TILT SUMMARY:")
print("-" * 55)
print(metrics.to_string(index=False))
print("\n\nMOMENTUM SHIFTS:")
print("-" * 55)
if len(momentum_shifts) > 0:
print(momentum_shifts.to_string(index=False))
else:
print(" No significant momentum shifts detected")
print("\n\nFIELD TILT vs xG CORRELATION:")
print("-" * 55)
print(f"Correlation coefficient: {correlation['correlation']:.3f}")
if abs(correlation['correlation']) > 0.7:
interpretation = "Strong"
elif abs(correlation['correlation']) > 0.4:
interpretation = "Moderate"
else:
interpretation = "Weak"
print(f"Interpretation: {interpretation} relationship between territorial control and xG")
# Main execution
events = sb.events(match_id=3773585)
metrics = calculate_field_tilt_metrics(events)
timeline = calculate_territorial_timeline(events, interval_minutes=5)
momentum_shifts = identify_momentum_shifts(timeline)
correlation = calculate_tilt_xg_correlation(events, timeline)
generate_territorial_report(metrics, momentum_shifts, correlation)
visualize_territorial_control(timeline)
create_touch_heatmaps(events)library(tidyverse)
library(StatsBombR)
library(patchwork)
# ============================================
# FIELD TILT & TERRITORIAL CONTROL DASHBOARD
# ============================================
# Calculate comprehensive field tilt metrics
calculate_field_tilt_metrics <- function(events) {
# Extract coordinates
events <- events %>%
filter(!is.na(location.x), !is.na(location.y))
teams <- unique(events$team.name)
team_a <- teams[1]
team_b <- teams[2]
# Zone definitions
events <- events %>%
mutate(
zone = case_when(
location.x > 80 ~ "final_third",
location.x > 40 ~ "middle_third",
TRUE ~ "defensive_third"
)
)
# Calculate touches by team and zone
touch_summary <- events %>%
count(team.name, zone) %>%
pivot_wider(names_from = zone, values_from = n, values_fill = 0)
# Calculate field tilt for each team
calculate_team_tilt <- function(team_name) {
team_final <- events %>%
filter(team.name == team_name, zone == "final_third") %>%
nrow()
opp_defensive <- events %>%
filter(team.name != team_name, zone == "defensive_third") %>%
nrow()
field_tilt <- team_final / (team_final + opp_defensive) * 100
# Zone breakdown
team_touches <- events %>% filter(team.name == team_name)
total <- nrow(team_touches)
list(
team = team_name,
field_tilt = field_tilt,
final_third_pct = sum(team_touches$zone == "final_third") / total * 100,
middle_third_pct = sum(team_touches$zone == "middle_third") / total * 100,
defensive_third_pct = sum(team_touches$zone == "defensive_third") / total * 100,
total_touches = total
)
}
metrics <- map_dfr(teams, calculate_team_tilt)
return(metrics)
}
# Track territorial control over time
calculate_territorial_timeline <- function(events, interval_minutes = 5) {
events <- events %>%
filter(!is.na(location.x)) %>%
mutate(
time_bucket = floor(minute / interval_minutes) * interval_minutes,
in_opp_half = location.x > 60,
in_final_third = location.x > 80
)
teams <- unique(events$team.name)
timeline <- events %>%
group_by(time_bucket, team.name) %>%
summarise(
touches = n(),
touches_opp_half = sum(in_opp_half, na.rm = TRUE),
touches_final_third = sum(in_final_third, na.rm = TRUE),
avg_x = mean(location.x, na.rm = TRUE),
.groups = "drop"
)
# Calculate relative control per period
timeline_wide <- timeline %>%
pivot_wider(
id_cols = time_bucket,
names_from = team.name,
values_from = c(touches, touches_opp_half, touches_final_third, avg_x),
values_fill = 0
)
# Add territorial control percentage
timeline <- timeline %>%
group_by(time_bucket) %>%
mutate(
period_total_touches = sum(touches),
territorial_share = touches_opp_half / sum(touches_opp_half) * 100
) %>%
ungroup()
return(timeline)
}
# Identify momentum shifts
identify_momentum_shifts <- function(timeline, threshold = 15) {
teams <- unique(timeline$team.name)
team_a <- teams[1]
team_a_timeline <- timeline %>%
filter(team.name == team_a) %>%
arrange(time_bucket) %>%
mutate(
tilt_change = territorial_share - lag(territorial_share),
momentum_shift = case_when(
tilt_change > threshold ~ "Gained Control",
tilt_change < -threshold ~ "Lost Control",
TRUE ~ "Stable"
)
)
shifts <- team_a_timeline %>%
filter(momentum_shift != "Stable") %>%
select(time_bucket, territorial_share, tilt_change, momentum_shift)
return(shifts)
}
# Calculate field tilt vs xG correlation
calculate_tilt_xg_correlation <- function(events, timeline) {
# Calculate xG per time bucket
xg_by_period <- events %>%
filter(type.name == "Shot", !is.na(shot.statsbomb_xg)) %>%
mutate(time_bucket = floor(minute / 5) * 5) %>%
group_by(time_bucket, team.name) %>%
summarise(xg = sum(shot.statsbomb_xg, na.rm = TRUE), .groups = "drop")
# Join with territorial data
merged <- timeline %>%
left_join(xg_by_period, by = c("time_bucket", "team.name")) %>%
mutate(xg = replace_na(xg, 0))
# Correlation
correlation <- cor(merged$territorial_share, merged$xg, use = "complete.obs")
return(list(
correlation = correlation,
data = merged
))
}
# Visualize territorial control
visualize_territorial_control <- function(timeline, team_colors = NULL) {
if (is.null(team_colors)) {
teams <- unique(timeline$team.name)
team_colors <- c("#C8102E", "#6CABDD")
names(team_colors) <- teams
}
# Area chart
p1 <- ggplot(timeline, aes(x = time_bucket, y = territorial_share,
fill = team.name)) +
geom_area(position = "fill", alpha = 0.8) +
scale_fill_manual(values = team_colors) +
scale_y_continuous(labels = scales::percent) +
labs(
title = "Territorial Control Over Time",
x = "Match Minute", y = "Share of Opponent Half Touches",
fill = "Team"
) +
theme_minimal()
# Average position timeline
p2 <- ggplot(timeline, aes(x = time_bucket, y = avg_x, color = team.name)) +
geom_line(linewidth = 1.2) +
geom_hline(yintercept = 60, linetype = "dashed", alpha = 0.5) +
scale_color_manual(values = team_colors) +
labs(
title = "Average Touch Position Over Time",
x = "Match Minute", y = "Average X Position",
color = "Team"
) +
theme_minimal()
return(p1 / p2)
}
# Create touch heatmaps
create_touch_heatmaps <- function(events) {
teams <- unique(events$team.name)
plots <- map(teams, function(team) {
team_events <- events %>% filter(team.name == team)
ggplot(team_events, aes(x = location.x, y = location.y)) +
annotate("rect", xmin = 0, xmax = 120, ymin = 0, ymax = 80,
fill = "#228B22", alpha = 0.3) +
stat_density_2d(aes(fill = ..level..), geom = "polygon",
alpha = 0.6, bins = 15) +
scale_fill_viridis_c(option = "plasma") +
labs(title = paste(team, "Touch Heatmap")) +
coord_fixed(xlim = c(0, 120), ylim = c(0, 80)) +
theme_void() +
theme(legend.position = "none")
})
return(plots[[1]] + plots[[2]])
}
# Generate comprehensive report
generate_territorial_report <- function(events, metrics, timeline, momentum_shifts, correlation) {
cat("\n", rep("=", 65), "\n", sep = "")
cat("TERRITORIAL CONTROL DASHBOARD\n")
cat(rep("=", 65), "\n\n", sep = "")
cat("FIELD TILT SUMMARY:\n")
cat("-", rep("-", 50), "\n", sep = "")
print(metrics)
cat("\n\nMOMENTUM SHIFTS:\n")
cat("-", rep("-", 50), "\n", sep = "")
if (nrow(momentum_shifts) > 0) {
print(momentum_shifts)
} else {
cat(" No significant momentum shifts detected\n")
}
cat("\n\nFIELD TILT vs xG CORRELATION:\n")
cat("-", rep("-", 50), "\n", sep = "")
cat("Correlation coefficient:", round(correlation$correlation, 3), "\n")
interpretation <- case_when(
abs(correlation$correlation) > 0.7 ~ "Strong",
abs(correlation$correlation) > 0.4 ~ "Moderate",
TRUE ~ "Weak"
)
cat("Interpretation:", interpretation, "relationship between territorial control and xG\n")
}
# Main execution
events <- get.matchFree(match_id)
metrics <- calculate_field_tilt_metrics(events)
timeline <- calculate_territorial_timeline(events, interval_minutes = 5)
momentum_shifts <- identify_momentum_shifts(timeline)
correlation <- calculate_tilt_xg_correlation(events, timeline)
generate_territorial_report(events, metrics, timeline, momentum_shifts, correlation)
print(visualize_territorial_control(timeline))
print(create_touch_heatmaps(events))Exercise 26.3: Possession Value Model and Player Ranking System
Task: Build a complete possession value model from season data that assigns value to pitch locations based on scoring probability, then use it to calculate possession value added for each player action.
Requirements:
- Create a grid-based possession value model using historical shot data
- Calculate possession value for each grid cell based on shot probability and expected xG
- Apply the model to calculate PV added for passes, carries, and dribbles
- Rank players by total possession value added per 90 minutes
- Visualize the possession value grid on a pitch
- Identify which positions and action types generate the most value
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mplsoccer import Pitch
from statsbombpy import sb
# ============================================
# POSSESSION VALUE MODEL & PLAYER RANKING
# ============================================
def build_pv_model(events, grid_size=10):
"""Build possession value model from event data."""
events = events.copy()
# Add possession sequences
events = events.sort_values(['match_id', 'index'])
events['new_poss'] = events.groupby('match_id')['team'].apply(
lambda x: x != x.shift(1)
).values
events['possession_id'] = events.groupby('match_id')['new_poss'].cumsum().values
# Extract coordinates
def get_coord(loc, idx):
return loc[idx] if isinstance(loc, list) and len(loc) > idx else np.nan
events['x'] = events['location'].apply(lambda loc: get_coord(loc, 0))
events['y'] = events['location'].apply(lambda loc: get_coord(loc, 1))
events = events.dropna(subset=['x', 'y'])
# Grid locations
events['grid_x'] = (events['x'] // grid_size) * grid_size
events['grid_y'] = (events['y'] // grid_size) * grid_size
# Calculate metrics per grid cell
grid_metrics = (
events.groupby(['match_id', 'possession_id', 'grid_x', 'grid_y'])
.agg({
'type': lambda x: (x == 'Shot').any(),
'shot_statsbomb_xg': 'max',
'shot_outcome': lambda x: (x == 'Goal').any()
})
.reset_index()
.rename(columns={'type': 'ended_in_shot', 'shot_outcome': 'ended_in_goal'})
)
# Aggregate by grid cell
pv_model = (
grid_metrics.groupby(['grid_x', 'grid_y'])
.agg({
'ended_in_shot': ['count', 'mean'],
'shot_statsbomb_xg': 'mean',
'ended_in_goal': 'mean'
})
.reset_index()
)
pv_model.columns = ['grid_x', 'grid_y', 'n_possessions', 'shot_probability',
'avg_xg_when_shot', 'goal_probability']
# Filter for minimum sample
pv_model = pv_model[pv_model['n_possessions'] >= 20]
# Calculate possession value
pv_model['possession_value'] = (
pv_model['shot_probability'] *
pv_model['avg_xg_when_shot'].fillna(0.08)
)
return pv_model
def calculate_pv_added(events, pv_model, grid_size=10):
"""Calculate possession value added for each action."""
events = events.copy()
def get_coord(loc, idx):
return loc[idx] if isinstance(loc, list) and len(loc) > idx else np.nan
events['x'] = events['location'].apply(lambda loc: get_coord(loc, 0))
events['y'] = events['location'].apply(lambda loc: get_coord(loc, 1))
events['grid_x'] = (events['x'] // grid_size) * grid_size
events['grid_y'] = (events['y'] // grid_size) * grid_size
# End locations
events['end_x'] = events['pass_end_location'].apply(lambda loc: get_coord(loc, 0))
events['end_y'] = events['pass_end_location'].apply(lambda loc: get_coord(loc, 1))
# Fill carry end locations
if 'carry_end_location' in events.columns:
events['end_x'] = events['end_x'].fillna(
events['carry_end_location'].apply(lambda loc: get_coord(loc, 0))
)
events['end_y'] = events['end_y'].fillna(
events['carry_end_location'].apply(lambda loc: get_coord(loc, 1))
)
events['end_x'] = events['end_x'].fillna(events['x'])
events['end_y'] = events['end_y'].fillna(events['y'])
events['end_grid_x'] = (events['end_x'] // grid_size) * grid_size
events['end_grid_y'] = (events['end_y'] // grid_size) * grid_size
# Merge with PV model
events = events.merge(
pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
columns={'possession_value': 'start_pv'}
),
on=['grid_x', 'grid_y'],
how='left'
)
events = events.merge(
pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
columns={'grid_x': 'end_grid_x', 'grid_y': 'end_grid_y',
'possession_value': 'end_pv'}
),
on=['end_grid_x', 'end_grid_y'],
how='left'
)
events['pv_added'] = events['end_pv'] - events['start_pv']
# Nullify for failed actions
failed_pass = (events['type'] == 'Pass') & events['pass_outcome'].notna()
failed_dribble = (events['type'] == 'Dribble') & (events['dribble_outcome'] != 'Complete')
events.loc[failed_pass | failed_dribble, 'pv_added'] = np.nan
return events
def calculate_player_pv_rankings(events_with_pv):
"""Calculate player rankings by possession value added."""
actions = events_with_pv[
events_with_pv['type'].isin(['Pass', 'Carry', 'Dribble']) &
events_with_pv['pv_added'].notna()
]
# Approximate minutes
player_minutes = (
events_with_pv.groupby(['player', 'team', 'position'])
.agg({'minute': ['min', 'max']})
.reset_index()
)
player_minutes.columns = ['player', 'team', 'position', 'first_event', 'last_event']
player_minutes['minutes_approx'] = player_minutes['last_event'] - player_minutes['first_event']
player_minutes = player_minutes[player_minutes['minutes_approx'] > 10]
# Aggregate by player
player_pv = (
actions.groupby(['player', 'team', 'position'])
.agg({
'id': 'count',
'pv_added': ['sum', 'mean', lambda x: (x > 0).sum(), lambda x: (x < 0).sum()]
})
.reset_index()
)
player_pv.columns = ['player', 'team', 'position', 'total_actions',
'total_pv_added', 'avg_pv_per_action',
'positive_actions', 'negative_actions']
player_pv = player_pv.merge(
player_minutes[['player', 'team', 'position', 'minutes_approx']],
on=['player', 'team', 'position'],
how='left'
)
player_pv['pv_per_90'] = player_pv['total_pv_added'] / (player_pv['minutes_approx'] / 90)
player_pv['positive_ratio'] = player_pv['positive_actions'] / player_pv['total_actions'] * 100
return player_pv.sort_values('total_pv_added', ascending=False)
def analyze_pv_by_category(events_with_pv, player_pv):
"""Analyze PV by position and action type."""
# By position
position_pv = (
player_pv[player_pv['position'].notna()]
.groupby('position')
.agg({
'player': 'count',
'total_pv_added': 'mean',
'pv_per_90': 'mean',
'positive_ratio': 'mean'
})
.reset_index()
.rename(columns={'player': 'n_players', 'total_pv_added': 'avg_total_pv',
'pv_per_90': 'avg_pv_per_90', 'positive_ratio': 'avg_positive_ratio'})
.sort_values('avg_pv_per_90', ascending=False)
)
# By action type
actions = events_with_pv[
events_with_pv['type'].isin(['Pass', 'Carry', 'Dribble']) &
events_with_pv['pv_added'].notna()
]
action_pv = (
actions.groupby('type')
.agg({
'id': 'count',
'pv_added': ['sum', 'mean', lambda x: (x > 0).mean() * 100]
})
.reset_index()
)
action_pv.columns = ['action_type', 'n_actions', 'total_pv', 'avg_pv', 'positive_pct']
return {'by_position': position_pv, 'by_action': action_pv}
def visualize_pv_grid(pv_model):
"""Visualize possession value grid on pitch."""
pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a1a1a')
fig, ax = pitch.draw(figsize=(14, 9))
scatter = ax.scatter(
pv_model['grid_x'] + 5,
pv_model['grid_y'] + 5,
c=pv_model['possession_value'],
s=300,
cmap='plasma',
alpha=0.8
)
plt.colorbar(scatter, ax=ax, label='Possession Value')
ax.set_title('Possession Value Model\n(Based on shot probability × expected xG)',
fontsize=14, fontweight='bold', color='white')
plt.tight_layout()
plt.show()
def generate_pv_report(pv_model, player_pv, category_analysis):
"""Generate comprehensive PV report."""
print("\n" + "=" * 70)
print("POSSESSION VALUE ANALYSIS REPORT")
print("=" * 70 + "\n")
print("PV MODEL SUMMARY:")
print("-" * 55)
print(f"Grid cells with data: {len(pv_model)}")
print(f"Max possession value: {pv_model['possession_value'].max():.4f}")
print(f"Avg possession value: {pv_model['possession_value'].mean():.4f}")
print("\n\nTOP 10 PLAYERS BY TOTAL PV ADDED:")
print("-" * 55)
print(player_pv.head(10)[['player', 'position', 'total_pv_added', 'pv_per_90', 'positive_ratio']].to_string(index=False))
print("\n\nTOP 10 PLAYERS BY PV PER 90:")
print("-" * 55)
qualified = player_pv[player_pv['minutes_approx'] >= 45].nlargest(10, 'pv_per_90')
print(qualified[['player', 'position', 'pv_per_90', 'total_actions', 'positive_ratio']].to_string(index=False))
print("\n\nPV BY POSITION:")
print("-" * 55)
print(category_analysis['by_position'].to_string(index=False))
print("\n\nPV BY ACTION TYPE:")
print("-" * 55)
print(category_analysis['by_action'].to_string(index=False))
# Main execution
matches = sb.matches(competition_id=11, season_id=90)
events = pd.concat([sb.events(match_id=m) for m in matches['match_id'].head(20)])
pv_model = build_pv_model(events, grid_size=10)
events_with_pv = calculate_pv_added(events, pv_model)
player_pv = calculate_player_pv_rankings(events_with_pv)
category_analysis = analyze_pv_by_category(events_with_pv, player_pv)
generate_pv_report(pv_model, player_pv, category_analysis)
visualize_pv_grid(pv_model)library(tidyverse)
library(StatsBombR)
library(viridis)
# ============================================
# POSSESSION VALUE MODEL & PLAYER RANKING
# ============================================
# Build possession value model
build_pv_model <- function(events, grid_size = 10) {
# Add possession sequences
events <- events %>%
arrange(match_id, index) %>%
group_by(match_id) %>%
mutate(
new_poss = team.name != lag(team.name) | is.na(lag(team.name)),
possession_id = cumsum(new_poss)
) %>%
ungroup()
# Create grid locations
events <- events %>%
filter(!is.na(location.x), !is.na(location.y)) %>%
mutate(
grid_x = floor(location.x / grid_size) * grid_size,
grid_y = floor(location.y / grid_size) * grid_size
)
# For each grid cell, calculate metrics
grid_metrics <- events %>%
group_by(match_id, possession_id, grid_x, grid_y) %>%
summarise(
ended_in_shot = any(type.name == "Shot"),
max_xg = max(shot.statsbomb_xg, na.rm = TRUE),
ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
.groups = "drop"
) %>%
mutate(max_xg = ifelse(is.infinite(max_xg), NA, max_xg))
# Aggregate by grid cell
pv_model <- grid_metrics %>%
group_by(grid_x, grid_y) %>%
summarise(
n_possessions = n(),
shot_probability = mean(ended_in_shot, na.rm = TRUE),
avg_xg_when_shot = mean(max_xg, na.rm = TRUE),
goal_probability = mean(ended_in_goal, na.rm = TRUE),
.groups = "drop"
) %>%
filter(n_possessions >= 20) %>% # Minimum sample size
mutate(
# Possession value = probability of shot * expected xG
possession_value = shot_probability * coalesce(avg_xg_when_shot, 0.08),
# Alternative: direct goal probability
pv_goal_based = goal_probability
)
return(pv_model)
}
# Calculate possession value added for each action
calculate_pv_added <- function(events, pv_model, grid_size = 10) {
events <- events %>%
filter(!is.na(location.x)) %>%
mutate(
grid_x = floor(location.x / grid_size) * grid_size,
grid_y = floor(location.y / grid_size) * grid_size,
end_x = coalesce(pass.end_location.x, carry.end_location.x, location.x),
end_y = coalesce(pass.end_location.y, carry.end_location.y, location.y),
end_grid_x = floor(end_x / grid_size) * grid_size,
end_grid_y = floor(end_y / grid_size) * grid_size
)
# Join with PV model
events <- events %>%
left_join(
pv_model %>% select(grid_x, grid_y, start_pv = possession_value),
by = c("grid_x", "grid_y")
) %>%
left_join(
pv_model %>% select(end_grid_x = grid_x, end_grid_y = grid_y, end_pv = possession_value),
by = c("end_grid_x", "end_grid_y")
) %>%
mutate(
# PV added = end value - start value
pv_added = end_pv - start_pv,
# Only for successful actions
pv_added = case_when(
type.name == "Pass" & !is.na(pass.outcome.name) ~ NA_real_, # Failed pass
type.name == "Dribble" & dribble.outcome.name != "Complete" ~ NA_real_,
TRUE ~ pv_added
)
)
return(events)
}
# Calculate player rankings
calculate_player_pv_rankings <- function(events_with_pv) {
# Filter to relevant actions
actions <- events_with_pv %>%
filter(type.name %in% c("Pass", "Carry", "Dribble")) %>%
filter(!is.na(pv_added))
# Calculate minutes played (approximate)
player_minutes <- events_with_pv %>%
group_by(player.name, team.name, position.name) %>%
summarise(
first_event = min(minute),
last_event = max(minute),
minutes_approx = last_event - first_event,
.groups = "drop"
) %>%
filter(minutes_approx > 10)
# Aggregate by player
player_pv <- actions %>%
group_by(player.name, team.name, position.name) %>%
summarise(
total_actions = n(),
total_pv_added = sum(pv_added, na.rm = TRUE),
avg_pv_per_action = mean(pv_added, na.rm = TRUE),
positive_actions = sum(pv_added > 0, na.rm = TRUE),
negative_actions = sum(pv_added < 0, na.rm = TRUE),
# Breakdown by action type
pass_pv = sum(pv_added[type.name == "Pass"], na.rm = TRUE),
carry_pv = sum(pv_added[type.name == "Carry"], na.rm = TRUE),
dribble_pv = sum(pv_added[type.name == "Dribble"], na.rm = TRUE),
.groups = "drop"
) %>%
left_join(player_minutes, by = c("player.name", "team.name", "position.name")) %>%
mutate(
pv_per_90 = total_pv_added / (minutes_approx / 90),
positive_ratio = positive_actions / total_actions * 100
) %>%
arrange(desc(total_pv_added))
return(player_pv)
}
# Analyze PV by position and action type
analyze_pv_by_category <- function(events_with_pv, player_pv) {
# By position
position_pv <- player_pv %>%
filter(!is.na(position.name)) %>%
group_by(position.name) %>%
summarise(
n_players = n(),
avg_total_pv = mean(total_pv_added, na.rm = TRUE),
avg_pv_per_90 = mean(pv_per_90, na.rm = TRUE),
avg_positive_ratio = mean(positive_ratio, na.rm = TRUE),
.groups = "drop"
) %>%
arrange(desc(avg_pv_per_90))
# By action type
action_pv <- events_with_pv %>%
filter(type.name %in% c("Pass", "Carry", "Dribble"), !is.na(pv_added)) %>%
group_by(type.name) %>%
summarise(
n_actions = n(),
total_pv = sum(pv_added, na.rm = TRUE),
avg_pv = mean(pv_added, na.rm = TRUE),
positive_pct = mean(pv_added > 0, na.rm = TRUE) * 100,
.groups = "drop"
)
return(list(
by_position = position_pv,
by_action = action_pv
))
}
# Visualize PV model grid
visualize_pv_grid <- function(pv_model) {
ggplot(pv_model, aes(x = grid_x + 5, y = grid_y + 5, fill = possession_value)) +
# Pitch outline
annotate("rect", xmin = 0, xmax = 120, ymin = 0, ymax = 80,
fill = NA, color = "white", size = 1) +
# Center line
annotate("segment", x = 60, xend = 60, y = 0, yend = 80, color = "white") +
# Penalty areas
annotate("rect", xmin = 0, xmax = 18, ymin = 18, ymax = 62,
fill = NA, color = "white") +
annotate("rect", xmin = 102, xmax = 120, ymin = 18, ymax = 62,
fill = NA, color = "white") +
# PV tiles
geom_tile(alpha = 0.8) +
scale_fill_viridis(option = "plasma", name = "Possession\nValue") +
labs(
title = "Possession Value Model",
subtitle = "Value based on probability of shot × expected xG from location"
) +
coord_fixed(xlim = c(0, 120), ylim = c(0, 80)) +
theme_void() +
theme(
plot.title = element_text(hjust = 0.5, face = "bold"),
plot.subtitle = element_text(hjust = 0.5),
legend.position = "right"
)
}
# Generate comprehensive report
generate_pv_report <- function(pv_model, player_pv, category_analysis) {
cat("\n", rep("=", 70), "\n", sep = "")
cat("POSSESSION VALUE ANALYSIS REPORT\n")
cat(rep("=", 70), "\n\n", sep = "")
cat("PV MODEL SUMMARY:\n")
cat("-", rep("-", 50), "\n", sep = "")
cat("Grid cells with data:", nrow(pv_model), "\n")
cat("Max possession value:", round(max(pv_model$possession_value), 4), "\n")
cat("Avg possession value:", round(mean(pv_model$possession_value), 4), "\n")
cat("\n\nTOP 10 PLAYERS BY TOTAL PV ADDED:\n")
cat("-", rep("-", 50), "\n", sep = "")
player_pv %>%
head(10) %>%
select(player.name, position.name, total_pv_added, pv_per_90, positive_ratio) %>%
mutate(across(where(is.numeric), ~round(., 3))) %>%
print()
cat("\n\nTOP 10 PLAYERS BY PV PER 90:\n")
cat("-", rep("-", 50), "\n", sep = "")
player_pv %>%
filter(minutes_approx >= 45) %>%
arrange(desc(pv_per_90)) %>%
head(10) %>%
select(player.name, position.name, pv_per_90, total_actions, positive_ratio) %>%
mutate(across(where(is.numeric), ~round(., 3))) %>%
print()
cat("\n\nPV BY POSITION:\n")
cat("-", rep("-", 50), "\n", sep = "")
print(category_analysis$by_position)
cat("\n\nPV BY ACTION TYPE:\n")
cat("-", rep("-", 50), "\n", sep = "")
print(category_analysis$by_action)
}
# Main execution
# Load season data
comps <- FreeCompetitions()
matches <- FreeMatches(Competitions = comps %>% filter(competition_id == 11))
events <- free_allevents(MatchesDF = matches, Atea = TRUE)
events <- allclean(events)
# Build model and calculate
pv_model <- build_pv_model(events, grid_size = 10)
events_with_pv <- calculate_pv_added(events, pv_model)
player_pv <- calculate_player_pv_rankings(events_with_pv)
category_analysis <- analyze_pv_by_category(events_with_pv, player_pv)
# Generate outputs
generate_pv_report(pv_model, player_pv, category_analysis)
print(visualize_pv_grid(pv_model))Summary
Key Takeaways
- Possession percentage alone is misleading - quality metrics like xG per possession and box entry rate matter more
- Field tilt measures territorial dominance and can differ significantly from raw possession
- Ball retention varies by zone, with retention under pressure being a key differentiator
- Progressive sequences move the ball meaningfully toward goal and indicate attacking intent
- Possession value models assign value to locations based on scoring probability, enabling action-level valuation