Chapter 60

Capstone - Complete Analytics System

Intermediate 30 min read 5 sections 10 code examples
0 of 60 chapters completed (0%)

Possession has long been considered a key indicator of team dominance, but raw possession percentage tells only part of the story. Modern possession analytics focuses on the quality, location, and purpose of possession to understand which teams truly control matches.

Beyond Possession Percentage

Raw possession percentage is one of the most misleading statistics in football. A team can have 70% possession while creating fewer chances than their opponent. Quality metrics reveal the true value of possession.

Possession Quality Metrics
  • Final Third Possession %
  • Box Entries per possession
  • Progressive Passes per sequence
  • Shots per possession sequence
  • xG per Possession
Low-Value Possession Signs
  • High possession in own third
  • Low progressive pass rate
  • Few box entries
  • Backward/sideways passing dominance
  • Low shots per possession
possession_quality.py
import pandas as pd
import numpy as np
from statsbombpy import sb

# Load match data
events = sb.events(match_id=3773585)

def identify_possession_sequences(events):
    """
    Identify distinct possession sequences.
    """
    events = events.sort_values('index').copy()

    # New possession when team changes
    events['new_possession'] = events['team'] != events['team'].shift(1)
    events['possession_id'] = events['new_possession'].cumsum()

    # Add sequence metadata
    sequence_info = (
        events
        .groupby('possession_id')
        .agg({
            'team': 'first',
            'index': 'count',
            'minute': ['min', 'max'],
            'second': ['min', 'max']
        })
        .reset_index()
    )
    sequence_info.columns = [
        'possession_id', 'possession_team', 'sequence_length',
        'start_min', 'end_min', 'start_sec', 'end_sec'
    ]

    events = events.merge(
        sequence_info[['possession_id', 'possession_team', 'sequence_length']],
        on='possession_id'
    )

    return events

events_with_sequences = identify_possession_sequences(events)

def calculate_possession_quality(events, team_name):
    """
    Calculate possession quality metrics for a team.
    """
    team_events = events[events['possession_team'] == team_name]

    # Extract locations
    team_events['x'] = team_events['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )
    team_events['y'] = team_events['location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) else np.nan
    )

    # Calculate per-possession metrics
    possession_metrics = []

    for poss_id in team_events['possession_id'].unique():
        poss = team_events[team_events['possession_id'] == poss_id]

        metrics = {
            'possession_id': poss_id,
            'sequence_length': len(poss),
            'start_x': poss['x'].iloc[0] if len(poss) > 0 else np.nan,
            'max_x': poss['x'].max(),

            # Territory
            'entered_final_third': (poss['x'] > 80).any(),
            'entered_box': ((poss['x'] > 102) & (poss['y'] > 18) & (poss['y'] < 62)).any(),

            # Passes
            'total_passes': (poss['type'] == 'Pass').sum(),
            'successful_passes': (
                (poss['type'] == 'Pass') &
                (poss['pass_outcome'].isna())
            ).sum(),

            # Outcomes
            'ended_in_shot': (poss['type'] == 'Shot').any(),
            'xg_generated': poss['shot_statsbomb_xg'].sum(),
            'ended_in_goal': (
                (poss['type'] == 'Shot') &
                (poss['shot_outcome'] == 'Goal')
            ).any()
        }

        possession_metrics.append(metrics)

    poss_df = pd.DataFrame(possession_metrics)

    # Aggregate summary
    summary = {
        'total_possessions': len(poss_df),
        'avg_sequence_length': poss_df['sequence_length'].mean(),
        'final_third_entry_rate': poss_df['entered_final_third'].mean() * 100,
        'box_entry_rate': poss_df['entered_box'].mean() * 100,
        'pass_completion': (
            poss_df['successful_passes'].sum() /
            poss_df['total_passes'].sum() * 100
        ),
        'shot_rate': poss_df['ended_in_shot'].mean() * 100,
        'xg_per_possession': poss_df['xg_generated'].mean(),
        'goal_rate': poss_df['ended_in_goal'].mean() * 100
    }

    return {
        'sequences': poss_df,
        'summary': summary
    }

# Calculate for both teams
liverpool_quality = calculate_possession_quality(events_with_sequences, 'Liverpool')
city_quality = calculate_possession_quality(events_with_sequences, 'Manchester City')

print("Liverpool Possession Quality:")
print(pd.Series(liverpool_quality['summary']))
print("\nManchester City Possession Quality:")
print(pd.Series(city_quality['summary']))
library(tidyverse)
library(StatsBombR)

# Load match data
events <- get.matchFree(match_id)

# Calculate possession sequences
identify_possession_sequences <- function(events) {
  events %>%
    arrange(index) %>%
    mutate(
      # New possession when team changes
      new_possession = team.name != lag(team.name) | is.na(lag(team.name)),
      possession_id = cumsum(new_possession)
    ) %>%
    group_by(possession_id) %>%
    mutate(
      possession_team = first(team.name),
      sequence_length = n(),
      sequence_duration = max(minute * 60 + second) - min(minute * 60 + second)
    ) %>%
    ungroup()
}

events_with_sequences <- identify_possession_sequences(events)

# Calculate possession quality metrics
calculate_possession_quality <- function(events, team_name) {

  team_possessions <- events %>%
    filter(possession_team == team_name) %>%
    group_by(possession_id) %>%
    summarise(
      # Basic metrics
      sequence_length = max(sequence_length),
      duration_seconds = max(sequence_duration),

      # Location metrics
      start_x = first(location.x),
      end_x = last(location.x),
      max_x = max(location.x, na.rm = TRUE),
      entered_final_third = any(location.x > 80),
      entered_box = any(location.x > 102 & location.y > 18 & location.y < 62),

      # Pass metrics
      total_passes = sum(type.name == "Pass"),
      successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
      progressive_passes = sum(
        type.name == "Pass" &
        (120 - location.x) - (120 - pass.end_location.x) > 10,
        na.rm = TRUE
      ),

      # Outcome metrics
      ended_in_shot = any(type.name == "Shot"),
      xg_generated = sum(shot.statsbomb_xg, na.rm = TRUE),
      ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal"),

      .groups = "drop"
    )

  # Aggregate quality metrics
  quality_summary <- team_possessions %>%
    summarise(
      total_possessions = n(),
      avg_sequence_length = mean(sequence_length),
      avg_duration = mean(duration_seconds),

      # Territorial
      final_third_entry_rate = mean(entered_final_third) * 100,
      box_entry_rate = mean(entered_box) * 100,

      # Passing quality
      pass_completion = sum(successful_passes) / sum(total_passes) * 100,
      progressive_pass_rate = sum(progressive_passes) / sum(total_passes) * 100,

      # Outcome quality
      shot_rate = mean(ended_in_shot) * 100,
      xg_per_possession = mean(xg_generated),
      goal_rate = mean(ended_in_goal) * 100
    )

  return(list(
    sequences = team_possessions,
    summary = quality_summary
  ))
}

# Calculate for both teams
home_quality <- calculate_possession_quality(events_with_sequences, "Liverpool")
away_quality <- calculate_possession_quality(events_with_sequences, "Manchester City")

# Compare quality
comparison <- bind_rows(
  home_quality$summary %>% mutate(team = "Liverpool"),
  away_quality$summary %>% mutate(team = "Manchester City")
)

print(comparison)

Field Tilt & Territorial Dominance

Field tilt measures where on the pitch the action is happening. A team can have lower possession but higher field tilt if they're winning the ball high and keeping play in the opponent's half.

field_tilt.py
def calculate_field_tilt(events, team_name):
    """
    Calculate field tilt and territorial dominance.
    """
    # Get all touches
    touch_types = ['Pass', 'Carry', 'Dribble', 'Shot', 'Ball Receipt']
    all_touches = events[events['type'].isin(touch_types)].copy()

    all_touches['x'] = all_touches['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )

    team_touches = all_touches[all_touches['team'] == team_name]
    opp_touches = all_touches[all_touches['team'] != team_name]

    # Field tilt calculation
    team_final_third = (team_touches['x'] > 80).sum()
    opp_defensive_third = (opp_touches['x'] < 40).sum()

    field_tilt = (
        team_final_third / (team_final_third + opp_defensive_third) * 100
        if (team_final_third + opp_defensive_third) > 0 else 50
    )

    # Zone breakdown
    team_touches['zone'] = pd.cut(
        team_touches['x'],
        bins=[0, 40, 80, 120],
        labels=['Defensive Third', 'Middle Third', 'Final Third']
    )
    zone_breakdown = team_touches['zone'].value_counts(normalize=True) * 100

    # Timeline
    all_touches['minute_bucket'] = (all_touches['minute'] // 5) * 5
    all_touches['in_opp_half'] = all_touches['x'] > 60

    timeline = (
        all_touches
        .groupby(['minute_bucket', 'team'])
        .agg({
            'id': 'count',
            'in_opp_half': 'sum'
        })
        .reset_index()
        .rename(columns={'id': 'touches', 'in_opp_half': 'touches_in_opp_half'})
    )

    return {
        'field_tilt': field_tilt,
        'zone_breakdown': zone_breakdown,
        'timeline': timeline
    }

liverpool_tilt = calculate_field_tilt(events, 'Liverpool')
city_tilt = calculate_field_tilt(events, 'Manchester City')

print(f"Liverpool Field Tilt: {liverpool_tilt['field_tilt']:.1f}%")
print(f"Man City Field Tilt: {city_tilt['field_tilt']:.1f}%")

def plot_field_tilt_timeline(timeline):
    """
    Visualize territorial control over time.
    """
    fig, ax = plt.subplots(figsize=(12, 6))

    # Pivot for stacked area
    pivot = timeline.pivot(
        index='minute_bucket',
        columns='team',
        values='touches_in_opp_half'
    ).fillna(0)

    # Normalize to percentages
    pivot_pct = pivot.div(pivot.sum(axis=1), axis=0) * 100

    pivot_pct.plot(kind='area', stacked=True, ax=ax, alpha=0.7,
                   color=['#C8102E', '#6CABDD'])

    ax.set_xlabel('Match Minute')
    ax.set_ylabel('Share of Touches in Opponent Half (%)')
    ax.set_title('Territorial Control Over Time')
    ax.legend(title='Team')
    plt.tight_layout()
    plt.show()

plot_field_tilt_timeline(liverpool_tilt['timeline'])
# Calculate field tilt metrics
calculate_field_tilt <- function(events, team_name) {

  # All touches by both teams
  all_touches <- events %>%
    filter(type.name %in% c("Pass", "Carry", "Dribble", "Shot", "Ball Receipt*"))

  # Field tilt = % of touches in opponent's third
  team_touches <- all_touches %>% filter(team.name == team_name)
  opp_touches <- all_touches %>% filter(team.name != team_name)

  # Calculate by zone
  team_zone_touches <- team_touches %>%
    mutate(
      zone = case_when(
        location.x > 80 ~ "Final Third",
        location.x > 40 ~ "Middle Third",
        TRUE ~ "Defensive Third"
      )
    ) %>%
    count(zone) %>%
    mutate(pct = n / sum(n) * 100)

  # Field tilt calculation
  # Touches in final third / (touches in final third + opp touches in their final third)
  team_final_third <- sum(team_touches$location.x > 80, na.rm = TRUE)
  opp_defensive_third <- sum(opp_touches$location.x < 40, na.rm = TRUE)

  field_tilt <- team_final_third / (team_final_third + opp_defensive_third) * 100

  # Territorial control by minute
  territorial_timeline <- events %>%
    mutate(
      minute_bucket = floor(minute / 5) * 5,
      in_opp_half = location.x > 60
    ) %>%
    group_by(minute_bucket, team.name) %>%
    summarise(
      touches = n(),
      touches_in_opp_half = sum(in_opp_half, na.rm = TRUE),
      .groups = "drop"
    )

  return(list(
    field_tilt = field_tilt,
    zone_breakdown = team_zone_touches,
    timeline = territorial_timeline
  ))
}

# Calculate field tilt
liverpool_tilt <- calculate_field_tilt(events, "Liverpool")
city_tilt <- calculate_field_tilt(events, "Manchester City")

cat("Liverpool Field Tilt:", round(liverpool_tilt$field_tilt, 1), "%\n")
cat("Man City Field Tilt:", round(city_tilt$field_tilt, 1), "%\n")

# Visualize territorial control over time
ggplot(liverpool_tilt$timeline, aes(x = minute_bucket, y = touches_in_opp_half,
                                     fill = team.name)) +
  geom_area(position = "fill", alpha = 0.7) +
  scale_fill_manual(values = c("Liverpool" = "#C8102E", "Manchester City" = "#6CABDD")) +
  labs(
    title = "Territorial Control Over Time",
    x = "Match Minute", y = "Share of Touches in Opponent Half",
    fill = "Team"
  ) +
  theme_minimal()

# Heatmap of touch locations
library(ggsoccer)

ggplot(events %>% filter(team.name == "Liverpool"), aes(x = location.x, y = location.y)) +
  annotate_pitch(dimensions = pitch_statsbomb) +
  stat_density_2d(aes(fill = ..level..), geom = "polygon", alpha = 0.6) +
  scale_fill_viridis_c(option = "plasma") +
  labs(title = "Liverpool Touch Heatmap") +
  theme_pitch() +
  coord_flip()

Ball Retention & Recycling

Ball retention measures a team's ability to keep possession under pressure and recycle the ball to create new attacking opportunities.

ball_retention.py
def analyze_ball_retention(events, team_name):
    """
    Analyze ball retention and recycling patterns.
    """
    team_events = events[events['team'] == team_name].copy()

    team_events['x'] = team_events['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )

    # Identify possession losses
    loss_conditions = (
        ((team_events['type'] == 'Pass') & (team_events['pass_outcome'].notna())) |
        ((team_events['type'] == 'Dribble') & (team_events['dribble_outcome'] != 'Complete')) |
        (team_events['type'] == 'Dispossessed') |
        (team_events['type'] == 'Miscontrol')
    )

    losses = team_events[loss_conditions].copy()
    losses['zone'] = pd.cut(
        losses['x'],
        bins=[0, 40, 80, 120],
        labels=['Defensive Third', 'Middle Third', 'Final Third']
    )

    # Retention by zone
    all_actions = team_events[team_events['type'].isin(['Pass', 'Dribble', 'Carry'])].copy()
    all_actions['zone'] = pd.cut(
        all_actions['x'],
        bins=[0, 40, 80, 120],
        labels=['Defensive Third', 'Middle Third', 'Final Third']
    )

    # Count losses per zone
    zone_losses = losses.groupby('zone').size()
    zone_total = all_actions.groupby('zone').size()

    retention_by_zone = pd.DataFrame({
        'total_actions': zone_total,
        'losses': zone_losses
    }).fillna(0)
    retention_by_zone['retention_rate'] = (
        (1 - retention_by_zone['losses'] / retention_by_zone['total_actions']) * 100
    )

    # Retention under pressure
    under_pressure = team_events[team_events.get('under_pressure', False) == True]

    if len(under_pressure) > 0:
        successful_under_pressure = (
            ((under_pressure['type'] == 'Pass') & (under_pressure['pass_outcome'].isna())) |
            ((under_pressure['type'] == 'Dribble') & (under_pressure['dribble_outcome'] == 'Complete'))
        ).sum()

        pressure_retention = successful_under_pressure / len(under_pressure) * 100
    else:
        pressure_retention = 0

    return {
        'losses': losses,
        'retention_by_zone': retention_by_zone,
        'retention_under_pressure': pressure_retention
    }

retention = analyze_ball_retention(events, 'Liverpool')

print("Retention by Zone:")
print(retention['retention_by_zone'])
print(f"\nRetention Under Pressure: {retention['retention_under_pressure']:.1f}%")

def plot_retention_by_zone(retention_data):
    """
    Visualize retention rates by zone.
    """
    fig, ax = plt.subplots(figsize=(8, 6))

    zones = retention_data['retention_by_zone'].index
    rates = retention_data['retention_by_zone']['retention_rate']

    colors = ['#90EE90', '#32CD32', '#228B22']
    bars = ax.bar(zones, rates, color=colors)

    # Add labels
    for bar, rate in zip(bars, rates):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                f'{rate:.1f}%', ha='center', va='bottom')

    ax.set_ylabel('Retention Rate (%)')
    ax.set_title('Ball Retention Rate by Zone')
    ax.set_ylim(0, 100)
    plt.tight_layout()
    plt.show()

plot_retention_by_zone(retention)
# Analyze ball retention patterns
analyze_ball_retention <- function(events, team_name) {

  team_events <- events %>%
    filter(team.name == team_name)

  # Possession loss events
  possession_losses <- team_events %>%
    filter(
      (type.name == "Pass" & !is.na(pass.outcome.name)) |
      (type.name == "Dribble" & dribble.outcome.name != "Complete") |
      type.name == "Dispossessed" |
      type.name == "Miscontrol"
    ) %>%
    mutate(
      loss_zone = case_when(
        location.x > 80 ~ "Final Third",
        location.x > 40 ~ "Middle Third",
        TRUE ~ "Defensive Third"
      ),
      loss_type = case_when(
        type.name == "Pass" ~ "Failed Pass",
        type.name == "Dribble" ~ "Failed Dribble",
        type.name == "Dispossessed" ~ "Dispossessed",
        type.name == "Miscontrol" ~ "Miscontrol",
        TRUE ~ "Other"
      )
    )

  # Retention rate by zone
  all_actions <- team_events %>%
    filter(type.name %in% c("Pass", "Dribble", "Carry")) %>%
    mutate(
      zone = case_when(
        location.x > 80 ~ "Final Third",
        location.x > 40 ~ "Middle Third",
        TRUE ~ "Defensive Third"
      )
    )

  retention_by_zone <- all_actions %>%
    group_by(zone) %>%
    summarise(
      total_actions = n(),
      losses = sum(
        (type.name == "Pass" & !is.na(pass.outcome.name)) |
        (type.name == "Dribble" & dribble.outcome.name != "Complete")
      ),
      retention_rate = (1 - losses / total_actions) * 100
    )

  # Retention under pressure
  under_pressure <- team_events %>%
    filter(under_pressure == TRUE)

  pressure_retention <- under_pressure %>%
    summarise(
      actions_under_pressure = n(),
      successful = sum(
        (type.name == "Pass" & is.na(pass.outcome.name)) |
        (type.name == "Dribble" & dribble.outcome.name == "Complete")
      ),
      retention_under_pressure = successful / actions_under_pressure * 100
    )

  # Ball recycling patterns
  recycling <- events_with_sequences %>%
    filter(possession_team == team_name) %>%
    group_by(possession_id) %>%
    mutate(
      direction = case_when(
        type.name == "Pass" &
        pass.end_location.x < location.x ~ "Backward",
        type.name == "Pass" &
        abs(pass.end_location.y - location.y) >
        abs(pass.end_location.x - location.x) ~ "Sideways",
        TRUE ~ "Forward"
      )
    ) %>%
    summarise(
      forward_pct = mean(direction == "Forward", na.rm = TRUE) * 100,
      sideways_pct = mean(direction == "Sideways", na.rm = TRUE) * 100,
      backward_pct = mean(direction == "Backward", na.rm = TRUE) * 100
    )

  return(list(
    losses = possession_losses,
    retention_by_zone = retention_by_zone,
    under_pressure = pressure_retention,
    recycling_patterns = recycling
  ))
}

retention_analysis <- analyze_ball_retention(events, "Liverpool")

# Visualize retention by zone
ggplot(retention_analysis$retention_by_zone,
       aes(x = zone, y = retention_rate, fill = zone)) +
  geom_bar(stat = "identity") +
  geom_text(aes(label = paste0(round(retention_rate, 1), "%")),
            vjust = -0.5) +
  scale_fill_brewer(palette = "Greens") +
  labs(
    title = "Ball Retention Rate by Zone",
    x = "Zone", y = "Retention Rate (%)"
  ) +
  theme_minimal() +
  theme(legend.position = "none")

Progressive Possession Sequences

Progressive possession sequences move the ball toward goal in a meaningful way. Analyzing these sequences reveals how teams build attacks and which players drive progression.

progressive_sequences.py
def analyze_progressive_sequences(events, team_name):
    """
    Analyze progressive possession sequences.
    """
    team_sequences = events[events['possession_team'] == team_name].copy()

    # Extract locations
    team_sequences['x'] = team_sequences['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )
    team_sequences['y'] = team_sequences['location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) else np.nan
    )
    team_sequences['end_x'] = team_sequences['pass_end_location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )

    # Calculate per-sequence metrics
    sequence_metrics = []

    for poss_id in team_sequences['possession_id'].unique():
        seq = team_sequences[team_sequences['possession_id'] == poss_id]

        if len(seq) == 0:
            continue

        # Progressive passes
        passes = seq[seq['type'] == 'Pass']
        if len(passes) > 0:
            prog_passes = ((passes['end_x'] - passes['x']) > 10).sum()
        else:
            prog_passes = 0

        metrics = {
            'possession_id': poss_id,
            'start_x': seq['x'].iloc[0],
            'end_x': seq['x'].iloc[-1],
            'max_x': seq['x'].max(),
            'sequence_length': len(seq),
            'progressive_passes': prog_passes,
            'entered_final_third': (seq['x'] > 80).any(),
            'entered_box': ((seq['x'] > 102) & (seq['y'] > 18) & (seq['y'] < 62)).any(),
            'ended_in_shot': (seq['type'] == 'Shot').any(),
            'xg': seq['shot_statsbomb_xg'].sum()
        }

        metrics['net_progression'] = metrics['end_x'] - metrics['start_x']
        metrics['is_progressive'] = metrics['net_progression'] > 20

        sequence_metrics.append(metrics)

    seq_df = pd.DataFrame(sequence_metrics)

    # Classify quality
    def classify_quality(row):
        if row['ended_in_shot'] and row['xg'] > 0.1:
            return 'High Quality Chance'
        elif row['ended_in_shot']:
            return 'Shot Created'
        elif row['entered_box']:
            return 'Box Entry'
        elif row['entered_final_third']:
            return 'Final Third Entry'
        elif row['is_progressive']:
            return 'Progressive'
        else:
            return 'Non-Progressive'

    seq_df['sequence_quality'] = seq_df.apply(classify_quality, axis=1)

    # Summary
    summary = {
        'total_sequences': len(seq_df),
        'progressive_sequences': seq_df['is_progressive'].sum(),
        'progressive_rate': seq_df['is_progressive'].mean() * 100,
        'avg_progression': seq_df['net_progression'].mean(),
        'box_entry_rate': seq_df['entered_box'].mean() * 100,
        'shot_rate': seq_df['ended_in_shot'].mean() * 100
    }

    quality_breakdown = seq_df['sequence_quality'].value_counts()

    return {
        'sequences': seq_df,
        'summary': summary,
        'quality_breakdown': quality_breakdown
    }

progressive = analyze_progressive_sequences(events_with_sequences, 'Liverpool')

print("Progressive Sequence Summary:")
print(pd.Series(progressive['summary']))

print("\nQuality Breakdown:")
print(progressive['quality_breakdown'])
# Analyze progressive possession sequences
analyze_progressive_sequences <- function(events, team_name) {

  team_sequences <- events_with_sequences %>%
    filter(possession_team == team_name) %>%
    group_by(possession_id) %>%
    summarise(
      start_x = first(location.x),
      end_x = last(location.x),
      max_x = max(location.x, na.rm = TRUE),
      sequence_length = n(),

      # Progressive actions
      progressive_passes = sum(
        type.name == "Pass" &
        (pass.end_location.x - location.x) > 10,
        na.rm = TRUE
      ),
      progressive_carries = sum(
        type.name == "Carry" &
        (carry.end_location.x - location.x) > 10,
        na.rm = TRUE
      ),

      # Key events
      entered_final_third = any(location.x > 80),
      entered_box = any(location.x > 102 & location.y > 18 & location.y < 62),
      ended_in_shot = any(type.name == "Shot"),
      xg = sum(shot.statsbomb_xg, na.rm = TRUE),

      .groups = "drop"
    ) %>%
    mutate(
      net_progression = end_x - start_x,
      is_progressive = net_progression > 20,
      progressive_actions = progressive_passes + progressive_carries
    )

  # Classify sequence quality
  team_sequences <- team_sequences %>%
    mutate(
      sequence_quality = case_when(
        ended_in_shot & xg > 0.1 ~ "High Quality Chance",
        ended_in_shot ~ "Shot Created",
        entered_box ~ "Box Entry",
        entered_final_third ~ "Final Third Entry",
        is_progressive ~ "Progressive",
        TRUE ~ "Non-Progressive"
      )
    )

  # Summary statistics
  summary <- team_sequences %>%
    summarise(
      total_sequences = n(),
      progressive_sequences = sum(is_progressive),
      progressive_rate = mean(is_progressive) * 100,
      avg_progression = mean(net_progression),
      box_entry_rate = mean(entered_box) * 100,
      shot_sequence_rate = mean(ended_in_shot) * 100,
      avg_xg_per_sequence = mean(xg)
    )

  # Quality breakdown
  quality_breakdown <- team_sequences %>%
    count(sequence_quality) %>%
    mutate(pct = n / sum(n) * 100)

  return(list(
    sequences = team_sequences,
    summary = summary,
    quality_breakdown = quality_breakdown
  ))
}

progressive_analysis <- analyze_progressive_sequences(events, "Liverpool")

# Visualize sequence quality
ggplot(progressive_analysis$quality_breakdown,
       aes(x = reorder(sequence_quality, n), y = n, fill = sequence_quality)) +
  geom_bar(stat = "identity") +
  coord_flip() +
  labs(
    title = "Possession Sequence Quality Breakdown",
    x = "", y = "Count"
  ) +
  theme_minimal() +
  theme(legend.position = "none")

# Progressive players
progressive_players <- events %>%
  filter(team.name == "Liverpool") %>%
  filter(
    (type.name == "Pass" & (pass.end_location.x - location.x) > 10) |
    (type.name == "Carry" & (carry.end_location.x - location.x) > 10)
  ) %>%
  group_by(player.name, position.name) %>%
  summarise(
    progressive_actions = n(),
    .groups = "drop"
  ) %>%
  arrange(desc(progressive_actions))

print(head(progressive_players, 10))

Possession Value Models

Possession value models assign a value to each possession location based on the probability of scoring from that position. This helps quantify the quality of possession beyond simple territory metrics.

possession_value.py
def build_possession_value_model(events, grid_size=10):
    """
    Build a simple possession value model based on shot probability.
    """
    events = events.copy()

    # Extract locations
    events['x'] = events['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )
    events['y'] = events['location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) else np.nan
    )

    # Create grid cells
    events['grid_x'] = (events['x'] // grid_size) * grid_size
    events['grid_y'] = (events['y'] // grid_size) * grid_size

    # Calculate shot probability per grid cell
    grid_stats = (
        events
        .groupby(['possession_id', 'grid_x', 'grid_y'])
        .agg({
            'type': lambda x: (x == 'Shot').any(),
            'shot_statsbomb_xg': 'max'
        })
        .reset_index()
        .rename(columns={'type': 'ended_in_shot'})
    )

    pv_model = (
        grid_stats
        .groupby(['grid_x', 'grid_y'])
        .agg({
            'ended_in_shot': ['count', 'mean'],
            'shot_statsbomb_xg': 'mean'
        })
        .reset_index()
    )
    pv_model.columns = ['grid_x', 'grid_y', 'n', 'shot_probability', 'avg_xg']

    # Filter for minimum sample size
    pv_model = pv_model[pv_model['n'] >= 10]

    # Calculate possession value
    pv_model['possession_value'] = (
        pv_model['shot_probability'] * pv_model['avg_xg'].fillna(0.1)
    )

    return pv_model

pv_model = build_possession_value_model(season_events)

def plot_possession_value_grid(pv_model):
    """
    Visualize possession value across the pitch.
    """
    from mplsoccer import Pitch

    pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a1a1a')
    fig, ax = pitch.draw(figsize=(12, 8))

    scatter = ax.scatter(
        pv_model['grid_x'] + 5,  # Center of grid cell
        pv_model['grid_y'] + 5,
        c=pv_model['possession_value'],
        s=200,
        cmap='plasma',
        alpha=0.7
    )

    plt.colorbar(scatter, ax=ax, label='Possession Value')
    ax.set_title('Possession Value by Location', fontsize=14, color='white')
    plt.tight_layout()
    plt.show()

plot_possession_value_grid(pv_model)

def calculate_pv_added(events, pv_model):
    """
    Calculate possession value added by each action.
    """
    events = events.copy()

    # Grid locations
    events['x'] = events['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )
    events['grid_x'] = (events['x'] // 10) * 10
    events['grid_y'] = (events['location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) else np.nan
    ) // 10) * 10

    # End locations
    events['end_x'] = events['pass_end_location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    )
    events['end_grid_x'] = (events['end_x'] // 10) * 10
    events['end_grid_y'] = (events['pass_end_location'].apply(
        lambda loc: loc[1] if isinstance(loc, list) else np.nan
    ) // 10) * 10

    # Merge with PV model
    events = events.merge(
        pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
            columns={'possession_value': 'start_pv'}
        ),
        on=['grid_x', 'grid_y'],
        how='left'
    )

    events = events.merge(
        pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
            columns={'grid_x': 'end_grid_x', 'grid_y': 'end_grid_y',
                     'possession_value': 'end_pv'}
        ),
        on=['end_grid_x', 'end_grid_y'],
        how='left'
    )

    events['pv_added'] = events['end_pv'] - events['start_pv']

    # Aggregate by player
    player_pv = (
        events[events['type'].isin(['Pass', 'Carry', 'Dribble'])]
        .groupby(['player', 'team', 'position'])
        .agg({
            'id': 'count',
            'pv_added': ['sum', 'mean']
        })
        .reset_index()
    )
    player_pv.columns = ['player', 'team', 'position', 'actions',
                          'total_pv_added', 'avg_pv_added']

    return player_pv.sort_values('total_pv_added', ascending=False)

player_pv = calculate_pv_added(events, pv_model)
print(player_pv.head(10))
# Build simple possession value model
build_possession_value_model <- function(events) {

  # Create grid of pitch locations
  grid_size <- 10  # 10x10 meter squares

  # For each location, calculate probability of shot within next N actions
  shot_probability <- events %>%
    mutate(
      grid_x = floor(location.x / grid_size) * grid_size,
      grid_y = floor(location.y / grid_size) * grid_size
    ) %>%
    group_by(possession_id, grid_x, grid_y) %>%
    summarise(
      sequence_ended_in_shot = any(type.name == "Shot"),
      sequence_xg = max(shot.statsbomb_xg, na.rm = TRUE),
      .groups = "drop"
    ) %>%
    group_by(grid_x, grid_y) %>%
    summarise(
      n = n(),
      shot_probability = mean(sequence_ended_in_shot, na.rm = TRUE),
      avg_xg = mean(sequence_xg, na.rm = TRUE),
      possession_value = shot_probability * coalesce(avg_xg, 0.1),
      .groups = "drop"
    ) %>%
    filter(n >= 10)  # Minimum sample size

  return(shot_probability)
}

# Build PV model from season data
pv_model <- build_possession_value_model(season_events)

# Visualize possession value grid
ggplot(pv_model, aes(x = grid_x, y = grid_y, fill = possession_value)) +
  annotate_pitch(dimensions = pitch_statsbomb, colour = "white") +
  geom_tile(alpha = 0.7) +
  scale_fill_viridis_c(option = "plasma", name = "PV") +
  labs(title = "Possession Value by Location") +
  theme_pitch() +
  coord_flip()

# Calculate possession value added per action
calculate_pv_added <- function(events, pv_model) {

  events_with_pv <- events %>%
    mutate(
      grid_x = floor(location.x / 10) * 10,
      grid_y = floor(location.y / 10) * 10,
      end_grid_x = floor(coalesce(pass.end_location.x, carry.end_location.x, location.x) / 10) * 10,
      end_grid_y = floor(coalesce(pass.end_location.y, carry.end_location.y, location.y) / 10) * 10
    ) %>%
    left_join(pv_model %>% select(grid_x, grid_y, start_pv = possession_value),
              by = c("grid_x", "grid_y")) %>%
    left_join(pv_model %>% select(end_grid_x = grid_x, end_grid_y = grid_y, end_pv = possession_value),
              by = c("end_grid_x", "end_grid_y")) %>%
    mutate(
      pv_added = end_pv - start_pv
    )

  # Aggregate by player
  player_pv <- events_with_pv %>%
    filter(type.name %in% c("Pass", "Carry", "Dribble")) %>%
    group_by(player.name, team.name, position.name) %>%
    summarise(
      actions = n(),
      total_pv_added = sum(pv_added, na.rm = TRUE),
      avg_pv_added = mean(pv_added, na.rm = TRUE),
      .groups = "drop"
    ) %>%
    arrange(desc(total_pv_added))

  return(player_pv)
}

player_pv <- calculate_pv_added(events, pv_model)
print(head(player_pv, 10))

Practice Exercises

Exercise 26.1: Complete Possession Quality Analysis System

Task: Build a comprehensive possession quality analysis system that calculates multi-dimensional possession metrics for all teams in a league, identifies possession style clusters, and generates visual comparisons.

Requirements:

  • Calculate possession percentage and quality metrics (xG/possession, box entries, progressive passes) for each team
  • Create a possession efficiency score combining quantity and quality
  • Identify teams with high possession but low quality vs low possession but high quality
  • Cluster teams by possession style using k-means
  • Generate scatter plots showing possession % vs quality metrics
  • Create detailed team comparison reports

possession_quality_system
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from statsbombpy import sb

# ============================================
# POSSESSION QUALITY ANALYSIS SYSTEM
# ============================================

def identify_possession_sequences(events):
    """Identify distinct possession sequences."""
    events = events.sort_values('index').copy()
    events['new_possession'] = events['team'] != events['team'].shift(1)
    events['possession_id'] = events['new_possession'].cumsum()

    seq_info = (
        events.groupby('possession_id')
        .agg({'team': 'first', 'index': 'count'})
        .reset_index()
        .rename(columns={'team': 'possession_team', 'index': 'sequence_length'})
    )

    events = events.merge(seq_info, on='possession_id')
    return events

def calculate_team_possession_quality(events, team_name):
    """Calculate possession quality metrics for a team."""
    def get_x(loc):
        return loc[0] if isinstance(loc, list) else np.nan

    def get_y(loc):
        return loc[1] if isinstance(loc, list) else np.nan

    events = events.copy()
    events['x'] = events['location'].apply(get_x)
    events['y'] = events['location'].apply(get_y)

    if 'pass_end_location' in events.columns:
        events['end_x'] = events['pass_end_location'].apply(get_x)

    team_poss = events[events['possession_team'] == team_name]

    possession_metrics = []
    for poss_id in team_poss['possession_id'].unique():
        poss = team_poss[team_poss['possession_id'] == poss_id]

        metrics = {
            'possession_id': poss_id,
            'sequence_length': len(poss),
            'max_x': poss['x'].max(),
            'entered_final_third': (poss['x'] > 80).any(),
            'entered_box': ((poss['x'] > 102) & (poss['y'] > 18) & (poss['y'] < 62)).any(),
            'total_passes': (poss['type'] == 'Pass').sum(),
            'successful_passes': ((poss['type'] == 'Pass') & (poss['pass_outcome'].isna())).sum(),
            'ended_in_shot': (poss['type'] == 'Shot').any(),
            'xg_generated': poss['shot_statsbomb_xg'].sum() if 'shot_statsbomb_xg' in poss.columns else 0,
            'ended_in_goal': ((poss['type'] == 'Shot') & (poss['shot_outcome'] == 'Goal')).any()
        }

        # Progressive passes
        if 'end_x' in poss.columns:
            poss_passes = poss[poss['type'] == 'Pass']
            metrics['progressive_passes'] = (
                ((120 - poss_passes['x']) - (120 - poss_passes['end_x']) > 10).sum()
            )
        else:
            metrics['progressive_passes'] = 0

        possession_metrics.append(metrics)

    poss_df = pd.DataFrame(possession_metrics)

    # Possession percentage
    all_poss = events.groupby('possession_team')['possession_id'].nunique()
    team_poss_pct = all_poss.get(team_name, 0) / all_poss.sum() * 100

    # Quality summary
    quality = {
        'team': team_name,
        'total_possessions': len(poss_df),
        'possession_pct': team_poss_pct,
        'avg_sequence_length': poss_df['sequence_length'].mean(),
        'final_third_entry_rate': poss_df['entered_final_third'].mean() * 100,
        'box_entry_rate': poss_df['entered_box'].mean() * 100,
        'pass_completion': poss_df['successful_passes'].sum() / max(poss_df['total_passes'].sum(), 1) * 100,
        'progressive_pass_rate': poss_df['progressive_passes'].sum() / max(poss_df['total_passes'].sum(), 1) * 100,
        'shot_rate': poss_df['ended_in_shot'].mean() * 100,
        'xg_per_possession': poss_df['xg_generated'].mean(),
        'goal_rate': poss_df['ended_in_goal'].mean() * 100
    }

    # Quality score
    quality['quality_score'] = (
        (quality['final_third_entry_rate'] / 50) * 25 +
        (quality['box_entry_rate'] / 20) * 25 +
        (quality['progressive_pass_rate'] / 15) * 25 +
        (quality['xg_per_possession'] / 0.05) * 25
    )

    # Efficiency score
    quality['efficiency_score'] = quality['quality_score'] / (quality['possession_pct'] / 50)

    return quality

def calculate_league_possession(events):
    """Calculate possession metrics for all teams."""
    events_seq = identify_possession_sequences(events)
    teams = events_seq['team'].unique()

    league_data = [calculate_team_possession_quality(events_seq, team) for team in teams if team]
    return pd.DataFrame(league_data)

def classify_possession_style(league_df):
    """Classify teams by possession style."""
    def get_category(row):
        if row['possession_pct'] >= 55 and row['quality_score'] >= 50:
            return 'Dominant'
        elif row['possession_pct'] >= 55 and row['quality_score'] < 50:
            return 'Sterile Possession'
        elif row['possession_pct'] < 45 and row['quality_score'] >= 50:
            return 'Efficient Counter'
        elif row['possession_pct'] < 45 and row['quality_score'] < 50:
            return 'Low Quality'
        return 'Balanced'

    league_df['possession_category'] = league_df.apply(get_category, axis=1)
    return league_df

def cluster_possession_styles(league_df):
    """Cluster teams by possession style using K-means."""
    features = ['possession_pct', 'final_third_entry_rate', 'box_entry_rate',
                'progressive_pass_rate', 'xg_per_possession', 'shot_rate']

    X = league_df[features].fillna(0)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    kmeans = KMeans(n_clusters=4, random_state=42, n_init=25)
    league_df['cluster'] = kmeans.fit_predict(X_scaled)

    return league_df

def visualize_possession_quality(league_df):
    """Create possession quality visualizations."""
    fig, axes = plt.subplots(1, 2, figsize=(16, 7))

    # Scatter plot
    colors = {'Dominant': 'green', 'Sterile Possession': 'red',
              'Efficient Counter': 'blue', 'Low Quality': 'gray', 'Balanced': 'orange'}

    for cat in league_df['possession_category'].unique():
        subset = league_df[league_df['possession_category'] == cat]
        axes[0].scatter(subset['possession_pct'], subset['quality_score'],
                        c=colors.get(cat, 'gray'), label=cat, s=100, alpha=0.7)

    axes[0].axvline(x=50, linestyle='--', alpha=0.5, color='gray')
    axes[0].axhline(y=50, linestyle='--', alpha=0.5, color='gray')
    axes[0].set_xlabel('Possession %')
    axes[0].set_ylabel('Quality Score')
    axes[0].set_title('Possession Quantity vs Quality')
    axes[0].legend()

    # Efficiency bar chart
    sorted_df = league_df.sort_values('efficiency_score', ascending=True)
    bars = axes[1].barh(sorted_df['team'], sorted_df['efficiency_score'],
                        color=[colors.get(c, 'gray') for c in sorted_df['possession_category']])
    axes[1].set_xlabel('Efficiency Score')
    axes[1].set_title('Possession Efficiency (Quality / Quantity)')

    plt.tight_layout()
    plt.show()

def generate_possession_report(league_df):
    """Generate comprehensive possession report."""
    print("\n" + "=" * 65)
    print("LEAGUE POSSESSION QUALITY REPORT")
    print("=" * 65 + "\n")

    print("POSSESSION STYLE DISTRIBUTION:")
    print(league_df['possession_category'].value_counts())

    print("\n\nTOP 5 QUALITY TEAMS:")
    print("-" * 55)
    top_quality = league_df.nlargest(5, 'quality_score')[
        ['team', 'possession_pct', 'quality_score', 'xg_per_possession', 'possession_category']
    ]
    print(top_quality.to_string(index=False))

    print("\n\nTOP 5 MOST EFFICIENT TEAMS:")
    print("-" * 55)
    top_eff = league_df.nlargest(5, 'efficiency_score')[
        ['team', 'possession_pct', 'quality_score', 'efficiency_score', 'possession_category']
    ]
    print(top_eff.to_string(index=False))

    print("\n\nSTERILE POSSESSION TEAMS:")
    print("-" * 55)
    sterile = league_df[league_df['possession_category'] == 'Sterile Possession'][
        ['team', 'possession_pct', 'quality_score', 'box_entry_rate', 'xg_per_possession']
    ]
    print(sterile.to_string(index=False) if len(sterile) > 0 else "  None found")

    print("\n\nEFFICIENT COUNTER TEAMS:")
    print("-" * 55)
    efficient = league_df[league_df['possession_category'] == 'Efficient Counter'][
        ['team', 'possession_pct', 'quality_score', 'box_entry_rate', 'xg_per_possession']
    ]
    print(efficient.to_string(index=False) if len(efficient) > 0 else "  None found")

# Main execution
matches = sb.matches(competition_id=11, season_id=90)
events = pd.concat([sb.events(match_id=m) for m in matches['match_id'].head(10)])

league_possession = calculate_league_possession(events)
league_possession = classify_possession_style(league_possession)
league_possession = cluster_possession_styles(league_possession)

generate_possession_report(league_possession)
visualize_possession_quality(league_possession)
library(tidyverse)
library(StatsBombR)
library(scales)

# ============================================
# POSSESSION QUALITY ANALYSIS SYSTEM
# ============================================

# Calculate possession sequences for a match
identify_possession_sequences <- function(events) {
  events %>%
    arrange(index) %>%
    mutate(
      new_possession = team.name != lag(team.name) | is.na(lag(team.name)),
      possession_id = cumsum(new_possession)
    ) %>%
    group_by(possession_id) %>%
    mutate(
      possession_team = first(team.name),
      sequence_length = n(),
      sequence_duration = max(minute * 60 + second) - min(minute * 60 + second)
    ) %>%
    ungroup()
}

# Calculate possession quality for a team
calculate_team_possession_quality <- function(events, team_name) {

  team_possessions <- events %>%
    filter(possession_team == team_name) %>%
    group_by(possession_id) %>%
    summarise(
      sequence_length = max(sequence_length),
      duration_seconds = max(sequence_duration),
      start_x = first(location.x),
      max_x = max(location.x, na.rm = TRUE),
      entered_final_third = any(location.x > 80, na.rm = TRUE),
      entered_box = any(location.x > 102 & location.y > 18 & location.y < 62, na.rm = TRUE),
      total_passes = sum(type.name == "Pass"),
      successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
      progressive_passes = sum(
        type.name == "Pass" &
        !is.na(pass.end_location.x) &
        (120 - location.x) - (120 - pass.end_location.x) > 10,
        na.rm = TRUE
      ),
      ended_in_shot = any(type.name == "Shot"),
      xg_generated = sum(shot.statsbomb_xg, na.rm = TRUE),
      ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    )

  # Calculate possession percentage
  all_possessions <- events %>%
    distinct(possession_id, possession_team) %>%
    count(possession_team) %>%
    mutate(pct = n / sum(n) * 100)

  team_poss_pct <- all_possessions %>%
    filter(possession_team == team_name) %>%
    pull(pct)

  # Quality summary
  quality <- team_possessions %>%
    summarise(
      total_possessions = n(),
      possession_pct = team_poss_pct,
      avg_sequence_length = mean(sequence_length, na.rm = TRUE),
      avg_duration = mean(duration_seconds, na.rm = TRUE),
      final_third_entry_rate = mean(entered_final_third, na.rm = TRUE) * 100,
      box_entry_rate = mean(entered_box, na.rm = TRUE) * 100,
      pass_completion = sum(successful_passes, na.rm = TRUE) / sum(total_passes, na.rm = TRUE) * 100,
      progressive_pass_rate = sum(progressive_passes, na.rm = TRUE) / sum(total_passes, na.rm = TRUE) * 100,
      shot_rate = mean(ended_in_shot, na.rm = TRUE) * 100,
      xg_per_possession = mean(xg_generated, na.rm = TRUE),
      goal_rate = mean(ended_in_goal, na.rm = TRUE) * 100
    ) %>%
    mutate(
      team = team_name,
      # Quality score combining multiple factors
      quality_score = (
        (final_third_entry_rate / 50) * 25 +
        (box_entry_rate / 20) * 25 +
        (progressive_pass_rate / 15) * 25 +
        (xg_per_possession / 0.05) * 25
      ),
      # Efficiency score: quality relative to possession
      efficiency_score = quality_score / (possession_pct / 50)
    )

  return(quality)
}

# Process full season data
calculate_league_possession <- function(all_events) {

  # Process by match
  all_events_seq <- identify_possession_sequences(all_events)

  # Get all teams
  teams <- unique(all_events_seq$team.name)

  # Calculate for each team
  league_possession <- map_dfr(teams, function(team) {
    calculate_team_possession_quality(all_events_seq, team)
  })

  return(league_possession)
}

# Classify possession style
classify_possession_style <- function(league_data) {

  league_data %>%
    mutate(
      possession_category = case_when(
        possession_pct >= 55 & quality_score >= 50 ~ "Dominant",
        possession_pct >= 55 & quality_score < 50 ~ "Sterile Possession",
        possession_pct < 45 & quality_score >= 50 ~ "Efficient Counter",
        possession_pct < 45 & quality_score < 50 ~ "Low Quality",
        TRUE ~ "Balanced"
      ),
      style = case_when(
        progressive_pass_rate > 12 & shot_rate > 15 ~ "Direct",
        progressive_pass_rate < 8 & pass_completion > 85 ~ "Patient Build-up",
        box_entry_rate > 15 & final_third_entry_rate > 40 ~ "Box Focused",
        TRUE ~ "Mixed"
      )
    )
}

# Cluster teams by possession style
cluster_possession_styles <- function(league_data) {

  # Select features for clustering
  cluster_features <- league_data %>%
    select(team, possession_pct, final_third_entry_rate, box_entry_rate,
           progressive_pass_rate, xg_per_possession, shot_rate)

  # Scale features
  scaled_features <- cluster_features %>%
    select(-team) %>%
    scale()

  # K-means clustering
  set.seed(42)
  kmeans_result <- kmeans(scaled_features, centers = 4, nstart = 25)

  league_data$cluster <- kmeans_result$cluster

  # Name clusters
  cluster_names <- league_data %>%
    group_by(cluster) %>%
    summarise(
      avg_poss = mean(possession_pct),
      avg_quality = mean(quality_score),
      .groups = "drop"
    ) %>%
    arrange(desc(avg_poss), desc(avg_quality)) %>%
    mutate(cluster_name = c("High Poss / High Quality",
                            "High Poss / Low Quality",
                            "Low Poss / High Quality",
                            "Low Poss / Low Quality")[row_number()])

  league_data <- league_data %>%
    left_join(cluster_names %>% select(cluster, cluster_name), by = "cluster")

  return(league_data)
}

# Visualize possession quality
visualize_possession_quality <- function(league_data) {

  # Main scatter plot
  p1 <- ggplot(league_data, aes(x = possession_pct, y = quality_score)) +
    geom_point(aes(color = possession_category, size = xg_per_possession),
               alpha = 0.7) +
    geom_text(aes(label = team), vjust = -0.8, size = 3, check_overlap = TRUE) +
    geom_vline(xintercept = 50, linetype = "dashed", alpha = 0.5) +
    geom_hline(yintercept = 50, linetype = "dashed", alpha = 0.5) +
    scale_color_brewer(palette = "Set1") +
    labs(
      title = "Possession Quantity vs Quality",
      x = "Possession %", y = "Quality Score",
      color = "Category", size = "xG/Poss"
    ) +
    theme_minimal() +
    theme(legend.position = "right")

  # Efficiency comparison
  p2 <- ggplot(league_data, aes(x = reorder(team, efficiency_score),
                                 y = efficiency_score)) +
    geom_bar(stat = "identity", aes(fill = possession_category)) +
    coord_flip() +
    labs(
      title = "Possession Efficiency (Quality / Quantity)",
      x = "", y = "Efficiency Score"
    ) +
    theme_minimal()

  return(list(quality_plot = p1, efficiency_plot = p2))
}

# Generate comprehensive report
generate_possession_report <- function(league_data) {

  cat("\n", rep("=", 65), "\n", sep = "")
  cat("LEAGUE POSSESSION QUALITY REPORT\n")
  cat(rep("=", 65), "\n\n", sep = "")

  cat("POSSESSION STYLE DISTRIBUTION:\n")
  print(table(league_data$possession_category))

  cat("\n\nTOP 5 QUALITY TEAMS (regardless of possession %):\n")
  cat("-", rep("-", 50), "\n", sep = "")
  league_data %>%
    arrange(desc(quality_score)) %>%
    head(5) %>%
    select(team, possession_pct, quality_score, xg_per_possession, possession_category) %>%
    print()

  cat("\n\nTOP 5 MOST EFFICIENT TEAMS:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  league_data %>%
    arrange(desc(efficiency_score)) %>%
    head(5) %>%
    select(team, possession_pct, quality_score, efficiency_score, possession_category) %>%
    print()

  cat("\n\nSTERILE POSSESSION TEAMS (high possession, low quality):\n")
  cat("-", rep("-", 50), "\n", sep = "")
  league_data %>%
    filter(possession_category == "Sterile Possession") %>%
    select(team, possession_pct, quality_score, box_entry_rate, xg_per_possession) %>%
    print()

  cat("\n\nEFFICIENT COUNTER TEAMS (low possession, high quality):\n")
  cat("-", rep("-", 50), "\n", sep = "")
  league_data %>%
    filter(possession_category == "Efficient Counter") %>%
    select(team, possession_pct, quality_score, box_entry_rate, xg_per_possession) %>%
    print()
}

# Main execution
# Load season data
comps <- FreeCompetitions()
matches <- FreeMatches(Competitions = comps %>% filter(competition_id == 11))
events <- free_allevents(MatchesDF = matches, Atea = TRUE)
events <- allclean(events)

# Run analysis
league_possession <- calculate_league_possession(events)
league_possession <- classify_possession_style(league_possession)
league_possession <- cluster_possession_styles(league_possession)

# Generate outputs
generate_possession_report(league_possession)
plots <- visualize_possession_quality(league_possession)
print(plots$quality_plot)
print(plots$efficiency_plot)
Exercise 26.2: Field Tilt and Territorial Control Dashboard

Task: Build a comprehensive territorial control analysis system that calculates field tilt metrics, tracks territorial momentum over time, and visualizes spatial dominance patterns.

Requirements:

  • Calculate field tilt (% of final third touches vs opponent defensive third)
  • Track territorial control in 5-minute intervals throughout matches
  • Create heat maps showing touch density by zone for each team
  • Identify momentum shifts when territorial control changes
  • Calculate correlation between field tilt and xG generated
  • Generate visual dashboard with multiple territorial views

territorial_control_dashboard
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mplsoccer import Pitch
from statsbombpy import sb

# ============================================
# FIELD TILT & TERRITORIAL CONTROL DASHBOARD
# ============================================

def calculate_field_tilt_metrics(events):
    """Calculate comprehensive field tilt metrics."""
    events = events.dropna(subset=['location']).copy()
    events['x'] = events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
    events['y'] = events['location'].apply(lambda loc: loc[1] if isinstance(loc, list) else np.nan)

    def get_zone(x):
        if x > 80:
            return 'final_third'
        elif x > 40:
            return 'middle_third'
        return 'defensive_third'

    events['zone'] = events['x'].apply(get_zone)
    teams = events['team'].unique()

    metrics = []
    for team in teams:
        if pd.isna(team):
            continue

        team_events = events[events['team'] == team]
        total = len(team_events)

        team_final = len(team_events[team_events['zone'] == 'final_third'])
        opp_defensive = len(events[(events['team'] != team) & (events['zone'] == 'defensive_third')])

        field_tilt = team_final / (team_final + opp_defensive) * 100 if (team_final + opp_defensive) > 0 else 50

        metrics.append({
            'team': team,
            'field_tilt': field_tilt,
            'final_third_pct': (team_events['zone'] == 'final_third').mean() * 100,
            'middle_third_pct': (team_events['zone'] == 'middle_third').mean() * 100,
            'defensive_third_pct': (team_events['zone'] == 'defensive_third').mean() * 100,
            'total_touches': total
        })

    return pd.DataFrame(metrics)

def calculate_territorial_timeline(events, interval_minutes=5):
    """Track territorial control over time."""
    events = events.dropna(subset=['location']).copy()
    events['x'] = events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
    events['time_bucket'] = (events['minute'] // interval_minutes) * interval_minutes
    events['in_opp_half'] = events['x'] > 60
    events['in_final_third'] = events['x'] > 80

    timeline = (
        events.groupby(['time_bucket', 'team'])
        .agg({
            'id': 'count',
            'in_opp_half': 'sum',
            'in_final_third': 'sum',
            'x': 'mean'
        })
        .reset_index()
        .rename(columns={'id': 'touches', 'in_opp_half': 'touches_opp_half',
                         'in_final_third': 'touches_final_third', 'x': 'avg_x'})
    )

    # Calculate territorial share
    total_opp_half = timeline.groupby('time_bucket')['touches_opp_half'].transform('sum')
    timeline['territorial_share'] = timeline['touches_opp_half'] / total_opp_half * 100

    return timeline

def identify_momentum_shifts(timeline, threshold=15):
    """Identify significant momentum shifts."""
    teams = timeline['team'].unique()
    team_a = teams[0]

    team_timeline = timeline[timeline['team'] == team_a].copy()
    team_timeline = team_timeline.sort_values('time_bucket')
    team_timeline['tilt_change'] = team_timeline['territorial_share'].diff()

    def classify_shift(change):
        if pd.isna(change):
            return 'Stable'
        elif change > threshold:
            return 'Gained Control'
        elif change < -threshold:
            return 'Lost Control'
        return 'Stable'

    team_timeline['momentum_shift'] = team_timeline['tilt_change'].apply(classify_shift)

    shifts = team_timeline[team_timeline['momentum_shift'] != 'Stable'][
        ['time_bucket', 'territorial_share', 'tilt_change', 'momentum_shift']
    ]

    return shifts

def calculate_tilt_xg_correlation(events, timeline):
    """Calculate correlation between field tilt and xG."""
    xg_by_period = (
        events[events['type'] == 'Shot']
        .dropna(subset=['shot_statsbomb_xg'])
        .assign(time_bucket=lambda x: (x['minute'] // 5) * 5)
        .groupby(['time_bucket', 'team'])
        ['shot_statsbomb_xg'].sum()
        .reset_index()
        .rename(columns={'shot_statsbomb_xg': 'xg'})
    )

    merged = timeline.merge(xg_by_period, on=['time_bucket', 'team'], how='left')
    merged['xg'] = merged['xg'].fillna(0)

    correlation = merged['territorial_share'].corr(merged['xg'])

    return {'correlation': correlation, 'data': merged}

def visualize_territorial_control(timeline):
    """Create territorial control visualizations."""
    fig, axes = plt.subplots(2, 1, figsize=(14, 10))
    teams = timeline['team'].unique()
    colors = {'team_a': '#C8102E', 'team_b': '#6CABDD'}

    # Area chart
    pivot = timeline.pivot(index='time_bucket', columns='team', values='territorial_share').fillna(50)
    pivot.plot(kind='area', stacked=True, ax=axes[0], alpha=0.8,
               color=['#C8102E', '#6CABDD'])
    axes[0].set_xlabel('Match Minute')
    axes[0].set_ylabel('Territorial Share (%)')
    axes[0].set_title('Territorial Control Over Time')
    axes[0].legend(title='Team')

    # Average position timeline
    for i, team in enumerate(teams):
        if pd.isna(team):
            continue
        team_data = timeline[timeline['team'] == team]
        axes[1].plot(team_data['time_bucket'], team_data['avg_x'],
                     label=team, linewidth=2, color=['#C8102E', '#6CABDD'][i])

    axes[1].axhline(y=60, linestyle='--', alpha=0.5, color='gray')
    axes[1].set_xlabel('Match Minute')
    axes[1].set_ylabel('Average X Position')
    axes[1].set_title('Average Touch Position Over Time')
    axes[1].legend()

    plt.tight_layout()
    plt.show()

def create_touch_heatmaps(events):
    """Create side-by-side touch heatmaps."""
    teams = [t for t in events['team'].unique() if pd.notna(t)]

    fig, axes = plt.subplots(1, 2, figsize=(16, 7))

    for i, team in enumerate(teams[:2]):
        pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a472a')
        pitch.draw(ax=axes[i])

        team_events = events[events['team'] == team].copy()
        team_events['x'] = team_events['location'].apply(lambda loc: loc[0] if isinstance(loc, list) else np.nan)
        team_events['y'] = team_events['location'].apply(lambda loc: loc[1] if isinstance(loc, list) else np.nan)

        valid_events = team_events.dropna(subset=['x', 'y'])

        pitch.kdeplot(valid_events['x'], valid_events['y'], ax=axes[i],
                      cmap='plasma', fill=True, levels=50, alpha=0.7)

        axes[i].set_title(f'{team} Touch Heatmap', fontsize=14, fontweight='bold')

    plt.tight_layout()
    plt.show()

def generate_territorial_report(metrics, momentum_shifts, correlation):
    """Generate comprehensive territorial report."""
    print("\n" + "=" * 65)
    print("TERRITORIAL CONTROL DASHBOARD")
    print("=" * 65 + "\n")

    print("FIELD TILT SUMMARY:")
    print("-" * 55)
    print(metrics.to_string(index=False))

    print("\n\nMOMENTUM SHIFTS:")
    print("-" * 55)
    if len(momentum_shifts) > 0:
        print(momentum_shifts.to_string(index=False))
    else:
        print("  No significant momentum shifts detected")

    print("\n\nFIELD TILT vs xG CORRELATION:")
    print("-" * 55)
    print(f"Correlation coefficient: {correlation['correlation']:.3f}")

    if abs(correlation['correlation']) > 0.7:
        interpretation = "Strong"
    elif abs(correlation['correlation']) > 0.4:
        interpretation = "Moderate"
    else:
        interpretation = "Weak"

    print(f"Interpretation: {interpretation} relationship between territorial control and xG")

# Main execution
events = sb.events(match_id=3773585)

metrics = calculate_field_tilt_metrics(events)
timeline = calculate_territorial_timeline(events, interval_minutes=5)
momentum_shifts = identify_momentum_shifts(timeline)
correlation = calculate_tilt_xg_correlation(events, timeline)

generate_territorial_report(metrics, momentum_shifts, correlation)
visualize_territorial_control(timeline)
create_touch_heatmaps(events)
library(tidyverse)
library(StatsBombR)
library(patchwork)

# ============================================
# FIELD TILT & TERRITORIAL CONTROL DASHBOARD
# ============================================

# Calculate comprehensive field tilt metrics
calculate_field_tilt_metrics <- function(events) {

  # Extract coordinates
  events <- events %>%
    filter(!is.na(location.x), !is.na(location.y))

  teams <- unique(events$team.name)
  team_a <- teams[1]
  team_b <- teams[2]

  # Zone definitions
  events <- events %>%
    mutate(
      zone = case_when(
        location.x > 80 ~ "final_third",
        location.x > 40 ~ "middle_third",
        TRUE ~ "defensive_third"
      )
    )

  # Calculate touches by team and zone
  touch_summary <- events %>%
    count(team.name, zone) %>%
    pivot_wider(names_from = zone, values_from = n, values_fill = 0)

  # Calculate field tilt for each team
  calculate_team_tilt <- function(team_name) {
    team_final <- events %>%
      filter(team.name == team_name, zone == "final_third") %>%
      nrow()

    opp_defensive <- events %>%
      filter(team.name != team_name, zone == "defensive_third") %>%
      nrow()

    field_tilt <- team_final / (team_final + opp_defensive) * 100

    # Zone breakdown
    team_touches <- events %>% filter(team.name == team_name)
    total <- nrow(team_touches)

    list(
      team = team_name,
      field_tilt = field_tilt,
      final_third_pct = sum(team_touches$zone == "final_third") / total * 100,
      middle_third_pct = sum(team_touches$zone == "middle_third") / total * 100,
      defensive_third_pct = sum(team_touches$zone == "defensive_third") / total * 100,
      total_touches = total
    )
  }

  metrics <- map_dfr(teams, calculate_team_tilt)
  return(metrics)
}

# Track territorial control over time
calculate_territorial_timeline <- function(events, interval_minutes = 5) {

  events <- events %>%
    filter(!is.na(location.x)) %>%
    mutate(
      time_bucket = floor(minute / interval_minutes) * interval_minutes,
      in_opp_half = location.x > 60,
      in_final_third = location.x > 80
    )

  teams <- unique(events$team.name)

  timeline <- events %>%
    group_by(time_bucket, team.name) %>%
    summarise(
      touches = n(),
      touches_opp_half = sum(in_opp_half, na.rm = TRUE),
      touches_final_third = sum(in_final_third, na.rm = TRUE),
      avg_x = mean(location.x, na.rm = TRUE),
      .groups = "drop"
    )

  # Calculate relative control per period
  timeline_wide <- timeline %>%
    pivot_wider(
      id_cols = time_bucket,
      names_from = team.name,
      values_from = c(touches, touches_opp_half, touches_final_third, avg_x),
      values_fill = 0
    )

  # Add territorial control percentage
  timeline <- timeline %>%
    group_by(time_bucket) %>%
    mutate(
      period_total_touches = sum(touches),
      territorial_share = touches_opp_half / sum(touches_opp_half) * 100
    ) %>%
    ungroup()

  return(timeline)
}

# Identify momentum shifts
identify_momentum_shifts <- function(timeline, threshold = 15) {

  teams <- unique(timeline$team.name)
  team_a <- teams[1]

  team_a_timeline <- timeline %>%
    filter(team.name == team_a) %>%
    arrange(time_bucket) %>%
    mutate(
      tilt_change = territorial_share - lag(territorial_share),
      momentum_shift = case_when(
        tilt_change > threshold ~ "Gained Control",
        tilt_change < -threshold ~ "Lost Control",
        TRUE ~ "Stable"
      )
    )

  shifts <- team_a_timeline %>%
    filter(momentum_shift != "Stable") %>%
    select(time_bucket, territorial_share, tilt_change, momentum_shift)

  return(shifts)
}

# Calculate field tilt vs xG correlation
calculate_tilt_xg_correlation <- function(events, timeline) {

  # Calculate xG per time bucket
  xg_by_period <- events %>%
    filter(type.name == "Shot", !is.na(shot.statsbomb_xg)) %>%
    mutate(time_bucket = floor(minute / 5) * 5) %>%
    group_by(time_bucket, team.name) %>%
    summarise(xg = sum(shot.statsbomb_xg, na.rm = TRUE), .groups = "drop")

  # Join with territorial data
  merged <- timeline %>%
    left_join(xg_by_period, by = c("time_bucket", "team.name")) %>%
    mutate(xg = replace_na(xg, 0))

  # Correlation
  correlation <- cor(merged$territorial_share, merged$xg, use = "complete.obs")

  return(list(
    correlation = correlation,
    data = merged
  ))
}

# Visualize territorial control
visualize_territorial_control <- function(timeline, team_colors = NULL) {

  if (is.null(team_colors)) {
    teams <- unique(timeline$team.name)
    team_colors <- c("#C8102E", "#6CABDD")
    names(team_colors) <- teams
  }

  # Area chart
  p1 <- ggplot(timeline, aes(x = time_bucket, y = territorial_share,
                              fill = team.name)) +
    geom_area(position = "fill", alpha = 0.8) +
    scale_fill_manual(values = team_colors) +
    scale_y_continuous(labels = scales::percent) +
    labs(
      title = "Territorial Control Over Time",
      x = "Match Minute", y = "Share of Opponent Half Touches",
      fill = "Team"
    ) +
    theme_minimal()

  # Average position timeline
  p2 <- ggplot(timeline, aes(x = time_bucket, y = avg_x, color = team.name)) +
    geom_line(linewidth = 1.2) +
    geom_hline(yintercept = 60, linetype = "dashed", alpha = 0.5) +
    scale_color_manual(values = team_colors) +
    labs(
      title = "Average Touch Position Over Time",
      x = "Match Minute", y = "Average X Position",
      color = "Team"
    ) +
    theme_minimal()

  return(p1 / p2)
}

# Create touch heatmaps
create_touch_heatmaps <- function(events) {

  teams <- unique(events$team.name)

  plots <- map(teams, function(team) {
    team_events <- events %>% filter(team.name == team)

    ggplot(team_events, aes(x = location.x, y = location.y)) +
      annotate("rect", xmin = 0, xmax = 120, ymin = 0, ymax = 80,
               fill = "#228B22", alpha = 0.3) +
      stat_density_2d(aes(fill = ..level..), geom = "polygon",
                      alpha = 0.6, bins = 15) +
      scale_fill_viridis_c(option = "plasma") +
      labs(title = paste(team, "Touch Heatmap")) +
      coord_fixed(xlim = c(0, 120), ylim = c(0, 80)) +
      theme_void() +
      theme(legend.position = "none")
  })

  return(plots[[1]] + plots[[2]])
}

# Generate comprehensive report
generate_territorial_report <- function(events, metrics, timeline, momentum_shifts, correlation) {

  cat("\n", rep("=", 65), "\n", sep = "")
  cat("TERRITORIAL CONTROL DASHBOARD\n")
  cat(rep("=", 65), "\n\n", sep = "")

  cat("FIELD TILT SUMMARY:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  print(metrics)

  cat("\n\nMOMENTUM SHIFTS:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  if (nrow(momentum_shifts) > 0) {
    print(momentum_shifts)
  } else {
    cat("  No significant momentum shifts detected\n")
  }

  cat("\n\nFIELD TILT vs xG CORRELATION:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  cat("Correlation coefficient:", round(correlation$correlation, 3), "\n")
  interpretation <- case_when(
    abs(correlation$correlation) > 0.7 ~ "Strong",
    abs(correlation$correlation) > 0.4 ~ "Moderate",
    TRUE ~ "Weak"
  )
  cat("Interpretation:", interpretation, "relationship between territorial control and xG\n")
}

# Main execution
events <- get.matchFree(match_id)

metrics <- calculate_field_tilt_metrics(events)
timeline <- calculate_territorial_timeline(events, interval_minutes = 5)
momentum_shifts <- identify_momentum_shifts(timeline)
correlation <- calculate_tilt_xg_correlation(events, timeline)

generate_territorial_report(events, metrics, timeline, momentum_shifts, correlation)
print(visualize_territorial_control(timeline))
print(create_touch_heatmaps(events))
Exercise 26.3: Possession Value Model and Player Ranking System

Task: Build a complete possession value model from season data that assigns value to pitch locations based on scoring probability, then use it to calculate possession value added for each player action.

Requirements:

  • Create a grid-based possession value model using historical shot data
  • Calculate possession value for each grid cell based on shot probability and expected xG
  • Apply the model to calculate PV added for passes, carries, and dribbles
  • Rank players by total possession value added per 90 minutes
  • Visualize the possession value grid on a pitch
  • Identify which positions and action types generate the most value

possession_value_model
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mplsoccer import Pitch
from statsbombpy import sb

# ============================================
# POSSESSION VALUE MODEL & PLAYER RANKING
# ============================================

def build_pv_model(events, grid_size=10):
    """Build possession value model from event data."""
    events = events.copy()

    # Add possession sequences
    events = events.sort_values(['match_id', 'index'])
    events['new_poss'] = events.groupby('match_id')['team'].apply(
        lambda x: x != x.shift(1)
    ).values
    events['possession_id'] = events.groupby('match_id')['new_poss'].cumsum().values

    # Extract coordinates
    def get_coord(loc, idx):
        return loc[idx] if isinstance(loc, list) and len(loc) > idx else np.nan

    events['x'] = events['location'].apply(lambda loc: get_coord(loc, 0))
    events['y'] = events['location'].apply(lambda loc: get_coord(loc, 1))

    events = events.dropna(subset=['x', 'y'])

    # Grid locations
    events['grid_x'] = (events['x'] // grid_size) * grid_size
    events['grid_y'] = (events['y'] // grid_size) * grid_size

    # Calculate metrics per grid cell
    grid_metrics = (
        events.groupby(['match_id', 'possession_id', 'grid_x', 'grid_y'])
        .agg({
            'type': lambda x: (x == 'Shot').any(),
            'shot_statsbomb_xg': 'max',
            'shot_outcome': lambda x: (x == 'Goal').any()
        })
        .reset_index()
        .rename(columns={'type': 'ended_in_shot', 'shot_outcome': 'ended_in_goal'})
    )

    # Aggregate by grid cell
    pv_model = (
        grid_metrics.groupby(['grid_x', 'grid_y'])
        .agg({
            'ended_in_shot': ['count', 'mean'],
            'shot_statsbomb_xg': 'mean',
            'ended_in_goal': 'mean'
        })
        .reset_index()
    )
    pv_model.columns = ['grid_x', 'grid_y', 'n_possessions', 'shot_probability',
                        'avg_xg_when_shot', 'goal_probability']

    # Filter for minimum sample
    pv_model = pv_model[pv_model['n_possessions'] >= 20]

    # Calculate possession value
    pv_model['possession_value'] = (
        pv_model['shot_probability'] *
        pv_model['avg_xg_when_shot'].fillna(0.08)
    )

    return pv_model

def calculate_pv_added(events, pv_model, grid_size=10):
    """Calculate possession value added for each action."""
    events = events.copy()

    def get_coord(loc, idx):
        return loc[idx] if isinstance(loc, list) and len(loc) > idx else np.nan

    events['x'] = events['location'].apply(lambda loc: get_coord(loc, 0))
    events['y'] = events['location'].apply(lambda loc: get_coord(loc, 1))
    events['grid_x'] = (events['x'] // grid_size) * grid_size
    events['grid_y'] = (events['y'] // grid_size) * grid_size

    # End locations
    events['end_x'] = events['pass_end_location'].apply(lambda loc: get_coord(loc, 0))
    events['end_y'] = events['pass_end_location'].apply(lambda loc: get_coord(loc, 1))

    # Fill carry end locations
    if 'carry_end_location' in events.columns:
        events['end_x'] = events['end_x'].fillna(
            events['carry_end_location'].apply(lambda loc: get_coord(loc, 0))
        )
        events['end_y'] = events['end_y'].fillna(
            events['carry_end_location'].apply(lambda loc: get_coord(loc, 1))
        )

    events['end_x'] = events['end_x'].fillna(events['x'])
    events['end_y'] = events['end_y'].fillna(events['y'])

    events['end_grid_x'] = (events['end_x'] // grid_size) * grid_size
    events['end_grid_y'] = (events['end_y'] // grid_size) * grid_size

    # Merge with PV model
    events = events.merge(
        pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
            columns={'possession_value': 'start_pv'}
        ),
        on=['grid_x', 'grid_y'],
        how='left'
    )

    events = events.merge(
        pv_model[['grid_x', 'grid_y', 'possession_value']].rename(
            columns={'grid_x': 'end_grid_x', 'grid_y': 'end_grid_y',
                     'possession_value': 'end_pv'}
        ),
        on=['end_grid_x', 'end_grid_y'],
        how='left'
    )

    events['pv_added'] = events['end_pv'] - events['start_pv']

    # Nullify for failed actions
    failed_pass = (events['type'] == 'Pass') & events['pass_outcome'].notna()
    failed_dribble = (events['type'] == 'Dribble') & (events['dribble_outcome'] != 'Complete')
    events.loc[failed_pass | failed_dribble, 'pv_added'] = np.nan

    return events

def calculate_player_pv_rankings(events_with_pv):
    """Calculate player rankings by possession value added."""
    actions = events_with_pv[
        events_with_pv['type'].isin(['Pass', 'Carry', 'Dribble']) &
        events_with_pv['pv_added'].notna()
    ]

    # Approximate minutes
    player_minutes = (
        events_with_pv.groupby(['player', 'team', 'position'])
        .agg({'minute': ['min', 'max']})
        .reset_index()
    )
    player_minutes.columns = ['player', 'team', 'position', 'first_event', 'last_event']
    player_minutes['minutes_approx'] = player_minutes['last_event'] - player_minutes['first_event']
    player_minutes = player_minutes[player_minutes['minutes_approx'] > 10]

    # Aggregate by player
    player_pv = (
        actions.groupby(['player', 'team', 'position'])
        .agg({
            'id': 'count',
            'pv_added': ['sum', 'mean', lambda x: (x > 0).sum(), lambda x: (x < 0).sum()]
        })
        .reset_index()
    )
    player_pv.columns = ['player', 'team', 'position', 'total_actions',
                         'total_pv_added', 'avg_pv_per_action',
                         'positive_actions', 'negative_actions']

    player_pv = player_pv.merge(
        player_minutes[['player', 'team', 'position', 'minutes_approx']],
        on=['player', 'team', 'position'],
        how='left'
    )

    player_pv['pv_per_90'] = player_pv['total_pv_added'] / (player_pv['minutes_approx'] / 90)
    player_pv['positive_ratio'] = player_pv['positive_actions'] / player_pv['total_actions'] * 100

    return player_pv.sort_values('total_pv_added', ascending=False)

def analyze_pv_by_category(events_with_pv, player_pv):
    """Analyze PV by position and action type."""
    # By position
    position_pv = (
        player_pv[player_pv['position'].notna()]
        .groupby('position')
        .agg({
            'player': 'count',
            'total_pv_added': 'mean',
            'pv_per_90': 'mean',
            'positive_ratio': 'mean'
        })
        .reset_index()
        .rename(columns={'player': 'n_players', 'total_pv_added': 'avg_total_pv',
                         'pv_per_90': 'avg_pv_per_90', 'positive_ratio': 'avg_positive_ratio'})
        .sort_values('avg_pv_per_90', ascending=False)
    )

    # By action type
    actions = events_with_pv[
        events_with_pv['type'].isin(['Pass', 'Carry', 'Dribble']) &
        events_with_pv['pv_added'].notna()
    ]

    action_pv = (
        actions.groupby('type')
        .agg({
            'id': 'count',
            'pv_added': ['sum', 'mean', lambda x: (x > 0).mean() * 100]
        })
        .reset_index()
    )
    action_pv.columns = ['action_type', 'n_actions', 'total_pv', 'avg_pv', 'positive_pct']

    return {'by_position': position_pv, 'by_action': action_pv}

def visualize_pv_grid(pv_model):
    """Visualize possession value grid on pitch."""
    pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a1a1a')
    fig, ax = pitch.draw(figsize=(14, 9))

    scatter = ax.scatter(
        pv_model['grid_x'] + 5,
        pv_model['grid_y'] + 5,
        c=pv_model['possession_value'],
        s=300,
        cmap='plasma',
        alpha=0.8
    )

    plt.colorbar(scatter, ax=ax, label='Possession Value')
    ax.set_title('Possession Value Model\n(Based on shot probability × expected xG)',
                 fontsize=14, fontweight='bold', color='white')
    plt.tight_layout()
    plt.show()

def generate_pv_report(pv_model, player_pv, category_analysis):
    """Generate comprehensive PV report."""
    print("\n" + "=" * 70)
    print("POSSESSION VALUE ANALYSIS REPORT")
    print("=" * 70 + "\n")

    print("PV MODEL SUMMARY:")
    print("-" * 55)
    print(f"Grid cells with data: {len(pv_model)}")
    print(f"Max possession value: {pv_model['possession_value'].max():.4f}")
    print(f"Avg possession value: {pv_model['possession_value'].mean():.4f}")

    print("\n\nTOP 10 PLAYERS BY TOTAL PV ADDED:")
    print("-" * 55)
    print(player_pv.head(10)[['player', 'position', 'total_pv_added', 'pv_per_90', 'positive_ratio']].to_string(index=False))

    print("\n\nTOP 10 PLAYERS BY PV PER 90:")
    print("-" * 55)
    qualified = player_pv[player_pv['minutes_approx'] >= 45].nlargest(10, 'pv_per_90')
    print(qualified[['player', 'position', 'pv_per_90', 'total_actions', 'positive_ratio']].to_string(index=False))

    print("\n\nPV BY POSITION:")
    print("-" * 55)
    print(category_analysis['by_position'].to_string(index=False))

    print("\n\nPV BY ACTION TYPE:")
    print("-" * 55)
    print(category_analysis['by_action'].to_string(index=False))

# Main execution
matches = sb.matches(competition_id=11, season_id=90)
events = pd.concat([sb.events(match_id=m) for m in matches['match_id'].head(20)])

pv_model = build_pv_model(events, grid_size=10)
events_with_pv = calculate_pv_added(events, pv_model)
player_pv = calculate_player_pv_rankings(events_with_pv)
category_analysis = analyze_pv_by_category(events_with_pv, player_pv)

generate_pv_report(pv_model, player_pv, category_analysis)
visualize_pv_grid(pv_model)
library(tidyverse)
library(StatsBombR)
library(viridis)

# ============================================
# POSSESSION VALUE MODEL & PLAYER RANKING
# ============================================

# Build possession value model
build_pv_model <- function(events, grid_size = 10) {

  # Add possession sequences
  events <- events %>%
    arrange(match_id, index) %>%
    group_by(match_id) %>%
    mutate(
      new_poss = team.name != lag(team.name) | is.na(lag(team.name)),
      possession_id = cumsum(new_poss)
    ) %>%
    ungroup()

  # Create grid locations
  events <- events %>%
    filter(!is.na(location.x), !is.na(location.y)) %>%
    mutate(
      grid_x = floor(location.x / grid_size) * grid_size,
      grid_y = floor(location.y / grid_size) * grid_size
    )

  # For each grid cell, calculate metrics
  grid_metrics <- events %>%
    group_by(match_id, possession_id, grid_x, grid_y) %>%
    summarise(
      ended_in_shot = any(type.name == "Shot"),
      max_xg = max(shot.statsbomb_xg, na.rm = TRUE),
      ended_in_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    ) %>%
    mutate(max_xg = ifelse(is.infinite(max_xg), NA, max_xg))

  # Aggregate by grid cell
  pv_model <- grid_metrics %>%
    group_by(grid_x, grid_y) %>%
    summarise(
      n_possessions = n(),
      shot_probability = mean(ended_in_shot, na.rm = TRUE),
      avg_xg_when_shot = mean(max_xg, na.rm = TRUE),
      goal_probability = mean(ended_in_goal, na.rm = TRUE),
      .groups = "drop"
    ) %>%
    filter(n_possessions >= 20) %>%  # Minimum sample size
    mutate(
      # Possession value = probability of shot * expected xG
      possession_value = shot_probability * coalesce(avg_xg_when_shot, 0.08),
      # Alternative: direct goal probability
      pv_goal_based = goal_probability
    )

  return(pv_model)
}

# Calculate possession value added for each action
calculate_pv_added <- function(events, pv_model, grid_size = 10) {

  events <- events %>%
    filter(!is.na(location.x)) %>%
    mutate(
      grid_x = floor(location.x / grid_size) * grid_size,
      grid_y = floor(location.y / grid_size) * grid_size,
      end_x = coalesce(pass.end_location.x, carry.end_location.x, location.x),
      end_y = coalesce(pass.end_location.y, carry.end_location.y, location.y),
      end_grid_x = floor(end_x / grid_size) * grid_size,
      end_grid_y = floor(end_y / grid_size) * grid_size
    )

  # Join with PV model
  events <- events %>%
    left_join(
      pv_model %>% select(grid_x, grid_y, start_pv = possession_value),
      by = c("grid_x", "grid_y")
    ) %>%
    left_join(
      pv_model %>% select(end_grid_x = grid_x, end_grid_y = grid_y, end_pv = possession_value),
      by = c("end_grid_x", "end_grid_y")
    ) %>%
    mutate(
      # PV added = end value - start value
      pv_added = end_pv - start_pv,
      # Only for successful actions
      pv_added = case_when(
        type.name == "Pass" & !is.na(pass.outcome.name) ~ NA_real_,  # Failed pass
        type.name == "Dribble" & dribble.outcome.name != "Complete" ~ NA_real_,
        TRUE ~ pv_added
      )
    )

  return(events)
}

# Calculate player rankings
calculate_player_pv_rankings <- function(events_with_pv) {

  # Filter to relevant actions
  actions <- events_with_pv %>%
    filter(type.name %in% c("Pass", "Carry", "Dribble")) %>%
    filter(!is.na(pv_added))

  # Calculate minutes played (approximate)
  player_minutes <- events_with_pv %>%
    group_by(player.name, team.name, position.name) %>%
    summarise(
      first_event = min(minute),
      last_event = max(minute),
      minutes_approx = last_event - first_event,
      .groups = "drop"
    ) %>%
    filter(minutes_approx > 10)

  # Aggregate by player
  player_pv <- actions %>%
    group_by(player.name, team.name, position.name) %>%
    summarise(
      total_actions = n(),
      total_pv_added = sum(pv_added, na.rm = TRUE),
      avg_pv_per_action = mean(pv_added, na.rm = TRUE),
      positive_actions = sum(pv_added > 0, na.rm = TRUE),
      negative_actions = sum(pv_added < 0, na.rm = TRUE),

      # Breakdown by action type
      pass_pv = sum(pv_added[type.name == "Pass"], na.rm = TRUE),
      carry_pv = sum(pv_added[type.name == "Carry"], na.rm = TRUE),
      dribble_pv = sum(pv_added[type.name == "Dribble"], na.rm = TRUE),

      .groups = "drop"
    ) %>%
    left_join(player_minutes, by = c("player.name", "team.name", "position.name")) %>%
    mutate(
      pv_per_90 = total_pv_added / (minutes_approx / 90),
      positive_ratio = positive_actions / total_actions * 100
    ) %>%
    arrange(desc(total_pv_added))

  return(player_pv)
}

# Analyze PV by position and action type
analyze_pv_by_category <- function(events_with_pv, player_pv) {

  # By position
  position_pv <- player_pv %>%
    filter(!is.na(position.name)) %>%
    group_by(position.name) %>%
    summarise(
      n_players = n(),
      avg_total_pv = mean(total_pv_added, na.rm = TRUE),
      avg_pv_per_90 = mean(pv_per_90, na.rm = TRUE),
      avg_positive_ratio = mean(positive_ratio, na.rm = TRUE),
      .groups = "drop"
    ) %>%
    arrange(desc(avg_pv_per_90))

  # By action type
  action_pv <- events_with_pv %>%
    filter(type.name %in% c("Pass", "Carry", "Dribble"), !is.na(pv_added)) %>%
    group_by(type.name) %>%
    summarise(
      n_actions = n(),
      total_pv = sum(pv_added, na.rm = TRUE),
      avg_pv = mean(pv_added, na.rm = TRUE),
      positive_pct = mean(pv_added > 0, na.rm = TRUE) * 100,
      .groups = "drop"
    )

  return(list(
    by_position = position_pv,
    by_action = action_pv
  ))
}

# Visualize PV model grid
visualize_pv_grid <- function(pv_model) {

  ggplot(pv_model, aes(x = grid_x + 5, y = grid_y + 5, fill = possession_value)) +
    # Pitch outline
    annotate("rect", xmin = 0, xmax = 120, ymin = 0, ymax = 80,
             fill = NA, color = "white", size = 1) +
    # Center line
    annotate("segment", x = 60, xend = 60, y = 0, yend = 80, color = "white") +
    # Penalty areas
    annotate("rect", xmin = 0, xmax = 18, ymin = 18, ymax = 62,
             fill = NA, color = "white") +
    annotate("rect", xmin = 102, xmax = 120, ymin = 18, ymax = 62,
             fill = NA, color = "white") +
    # PV tiles
    geom_tile(alpha = 0.8) +
    scale_fill_viridis(option = "plasma", name = "Possession\nValue") +
    labs(
      title = "Possession Value Model",
      subtitle = "Value based on probability of shot × expected xG from location"
    ) +
    coord_fixed(xlim = c(0, 120), ylim = c(0, 80)) +
    theme_void() +
    theme(
      plot.title = element_text(hjust = 0.5, face = "bold"),
      plot.subtitle = element_text(hjust = 0.5),
      legend.position = "right"
    )
}

# Generate comprehensive report
generate_pv_report <- function(pv_model, player_pv, category_analysis) {

  cat("\n", rep("=", 70), "\n", sep = "")
  cat("POSSESSION VALUE ANALYSIS REPORT\n")
  cat(rep("=", 70), "\n\n", sep = "")

  cat("PV MODEL SUMMARY:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  cat("Grid cells with data:", nrow(pv_model), "\n")
  cat("Max possession value:", round(max(pv_model$possession_value), 4), "\n")
  cat("Avg possession value:", round(mean(pv_model$possession_value), 4), "\n")

  cat("\n\nTOP 10 PLAYERS BY TOTAL PV ADDED:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  player_pv %>%
    head(10) %>%
    select(player.name, position.name, total_pv_added, pv_per_90, positive_ratio) %>%
    mutate(across(where(is.numeric), ~round(., 3))) %>%
    print()

  cat("\n\nTOP 10 PLAYERS BY PV PER 90:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  player_pv %>%
    filter(minutes_approx >= 45) %>%
    arrange(desc(pv_per_90)) %>%
    head(10) %>%
    select(player.name, position.name, pv_per_90, total_actions, positive_ratio) %>%
    mutate(across(where(is.numeric), ~round(., 3))) %>%
    print()

  cat("\n\nPV BY POSITION:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  print(category_analysis$by_position)

  cat("\n\nPV BY ACTION TYPE:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  print(category_analysis$by_action)
}

# Main execution
# Load season data
comps <- FreeCompetitions()
matches <- FreeMatches(Competitions = comps %>% filter(competition_id == 11))
events <- free_allevents(MatchesDF = matches, Atea = TRUE)
events <- allclean(events)

# Build model and calculate
pv_model <- build_pv_model(events, grid_size = 10)
events_with_pv <- calculate_pv_added(events, pv_model)
player_pv <- calculate_player_pv_rankings(events_with_pv)
category_analysis <- analyze_pv_by_category(events_with_pv, player_pv)

# Generate outputs
generate_pv_report(pv_model, player_pv, category_analysis)
print(visualize_pv_grid(pv_model))

Summary

Key Takeaways
  • Possession percentage alone is misleading - quality metrics like xG per possession and box entry rate matter more
  • Field tilt measures territorial dominance and can differ significantly from raw possession
  • Ball retention varies by zone, with retention under pressure being a key differentiator
  • Progressive sequences move the ball meaningfully toward goal and indicate attacking intent
  • Possession value models assign value to locations based on scoring probability, enabling action-level valuation