Chapter 60

Capstone - Complete Analytics System

Intermediate 30 min read 5 sections 10 code examples
0 of 60 chapters completed (0%)

Transitions - the moments when possession changes hands - are among the most dangerous phases of play. Counter-attacks from turnovers create high-quality scoring chances, while defensive transitions can leave teams exposed. Mastering transition analysis is essential for modern tactical understanding.

Understanding Transitions

Transitions occur whenever possession changes. We can classify them by how they start, where they occur, and how they develop.

Counter-Attacks

Fast attacks after winning the ball, exploiting disorganized defenses.

  • Direct counter (immediate attack)
  • Fast break (quick but controlled)
  • Second phase counter
Defensive Transitions

The phase immediately after losing possession.

  • Counterpress (immediate regain attempt)
  • Recovery run
  • Tactical foul
Set-Play Transitions

Transitions from dead-ball situations.

  • Quick free kicks
  • Throw-in attacks
  • Goal kick counters
transition_identification.py
import pandas as pd
import numpy as np

def identify_transitions(events):
    """
    Identify and classify transition sequences.

    A transition starts at every event whose ``team`` differs from the team
    of the immediately preceding event, after ordering by ``index``.

    Parameters
    ----------
    events : pandas.DataFrame
        Needs the columns ``index``, ``team``, ``type``, ``pass_outcome``
        and ``location``. A ``location`` value that is a list is read as
        ``[x, ...]``; any other value is treated as missing.

    Returns
    -------
    dict
        ``'events'``: sorted copy of the input with ``possession_change``
        (bool) and ``transition_id`` (running count of changes) added.
        ``'transitions'``: the rows that start a transition, with
        ``prev_type``, ``prev_outcome``, ``prev_x``, ``turnover_type``,
        ``turnover_zone`` and ``start_x`` added.
    """
    events = events.sort_values('index').copy()

    # Find possession changes. The first event has no predecessor, so it
    # never counts as a change.
    prev_team = events['team'].shift(1)
    events['possession_change'] = (
        (events['team'] != prev_team) & prev_team.notna()
    ).to_numpy()
    events['transition_id'] = events['possession_change'].cumsum()

    def first_coordinate(loc):
        return loc[0] if isinstance(loc, list) else np.nan

    # Select transition starts positionally so the previous-event lookups
    # below do not rely on the frame's index labels being unique.
    starts = events['possession_change'].to_numpy()
    transitions = events[starts].copy()

    # Previous event info (the event that lost the ball)
    transitions['prev_type'] = events['type'].shift(1).to_numpy()[starts]
    transitions['prev_outcome'] = (
        events['pass_outcome'].shift(1).to_numpy()[starts]
    )
    transitions['prev_x'] = (
        events['location'].shift(1)
        .apply(first_coordinate)
        .astype(float)
        .to_numpy()[starts]
    )

    # Classify turnover type from the previous event
    def classify_turnover(row):
        if row['prev_type'] == 'Pass' and pd.notna(row['prev_outcome']):
            return 'Interception/Failed Pass'
        elif row['prev_type'] == 'Dribble':
            return 'Tackle/Failed Dribble'
        elif row['prev_type'] == 'Shot':
            return 'Shot Recovery'
        elif row['prev_type'] == 'Clearance':
            return 'Clearance Won'
        else:
            return 'Other'

    transitions['turnover_type'] = transitions.apply(classify_turnover, axis=1)

    # Turnover zone. include_lowest=True keeps x == 0 in the first bin;
    # without it pd.cut leaves that boundary value unlabelled (NaN).
    transitions['turnover_zone'] = pd.cut(
        transitions['prev_x'],
        bins=[0, 40, 80, 120],
        labels=['Deep Turnover', 'Middle Turnover', 'High Turnover'],
        include_lowest=True
    )

    # Start location of the new possession
    transitions['start_x'] = transitions['location'].apply(first_coordinate)

    return {
        'events': events,
        'transitions': transitions
    }

# Build the transition tables; `events` must already be defined (it is not
# created in this snippet).
transition_data = identify_transitions(events)

def analyze_transition_outcomes(events, transitions):
    """
    Analyze outcomes of transition sequences.

    Parameters
    ----------
    events : pandas.DataFrame
        Events carrying ``transition_id``, ``team``, ``type``, ``location``,
        ``minute``, ``second``, ``shot_statsbomb_xg`` and ``shot_outcome``.
    transitions : pandas.DataFrame
        One row per transition with ``transition_id``, ``turnover_type``
        and ``turnover_zone``.

    Returns
    -------
    pandas.DataFrame
        One row per sequence of at least two events, holding sequence
        metrics plus the matching ``turnover_type`` / ``turnover_zone``
        (NaN where ``transitions`` has no row for the id). An empty frame
        with the same columns is returned when no sequence qualifies.
    """
    metric_columns = [
        'transition_id', 'team', 'start_x', 'max_x', 'sequence_length',
        'duration_seconds', 'ended_in_shot', 'xg', 'ended_in_goal',
        'entered_box', 'x_progression', 'progression_speed',
    ]
    results = []

    # A single groupby pass replaces one full boolean scan per transition.
    # sort=False keeps ids in order of first appearance.
    for trans_id, seq in events.groupby('transition_id', sort=False):
        if len(seq) < 2:
            continue

        # Extract locations
        seq_x = seq['location'].apply(
            lambda loc: loc[0] if isinstance(loc, list) else np.nan
        )
        seq_y = seq['location'].apply(
            lambda loc: loc[1] if isinstance(loc, list) else np.nan
        )

        # Timing: first to last event of the sequence, in match seconds
        start_time = seq['minute'].iloc[0] * 60 + seq['second'].iloc[0]
        end_time = seq['minute'].iloc[-1] * 60 + seq['second'].iloc[-1]
        duration = end_time - start_time

        is_shot = seq['type'] == 'Shot'
        x_progression = seq_x.max() - seq_x.iloc[0]

        results.append({
            'transition_id': trans_id,
            'team': seq['team'].iloc[0],
            'start_x': seq_x.iloc[0],
            'max_x': seq_x.max(),
            'sequence_length': len(seq),
            'duration_seconds': duration,

            # Outcomes
            'ended_in_shot': is_shot.any(),
            'xg': seq['shot_statsbomb_xg'].sum(),
            'ended_in_goal': (is_shot & (seq['shot_outcome'] == 'Goal')).any(),
            'entered_box': (
                (seq_x > 102) & (seq_y > 18) & (seq_y < 62)
            ).any(),

            # Speed; the duration is floored at 1 s to avoid dividing by zero
            'x_progression': x_progression,
            'progression_speed': x_progression / max(duration, 1),
        })

    if not results:
        # Nothing to merge: return the expected schema instead of failing
        # on a missing 'transition_id' column.
        return pd.DataFrame(
            columns=metric_columns + ['turnover_type', 'turnover_zone']
        )

    results_df = pd.DataFrame(results, columns=metric_columns)

    # Merge with transition info
    trans_info = transitions[['transition_id', 'turnover_type', 'turnover_zone']]
    results_df = results_df.merge(trans_info, on='transition_id', how='left')

    return results_df

transition_outcomes = analyze_transition_outcomes(
    transition_data['events'], transition_data['transitions']
)

# Per turnover zone: number of sequences, shot/goal rates (as percentages),
# mean xG and mean progression speed.
zone_summary = (
    transition_outcomes
    .groupby('turnover_zone')
    .agg(
        count=('transition_id', 'count'),
        shot_rate=('ended_in_shot', 'mean'),
        avg_xg=('xg', 'mean'),
        goal_rate=('ended_in_goal', 'mean'),
        avg_speed=('progression_speed', 'mean'),
    )
    .reset_index()
    .rename(columns={'turnover_zone': 'zone'})
)
for rate_column in ('shot_rate', 'goal_rate'):
    zone_summary[rate_column] = zone_summary[rate_column] * 100

print(zone_summary)
library(tidyverse)
library(StatsBombR)

# Identify transition sequences
#
# A transition starts at every event whose team differs from the team of the
# immediately preceding event (events ordered by `index`).
#
# events: data frame with index, team.name, type.name, pass.outcome.name,
#         dribble.outcome.name, location.x and location.y columns.
# Returns a list with
#   events      - the ordered events plus possession_change and transition_id
#   transitions - the first event of every transition plus turnover_type,
#                 turnover_zone, first_action, start_x and start_y
identify_transitions <- function(events) {

  helper_cols <- c(
    ".prev_team", ".prev_type", ".prev_pass_outcome",
    ".prev_dribble_outcome", ".prev_x"
  )

  # Previous-event details are taken on the full ordered event stream.
  # Calling lag() after filtering to transition rows would read the previous
  # *transition* instead of the event that actually lost the ball.
  events_sorted <- events %>%
    arrange(index) %>%
    mutate(
      .prev_team = lag(team.name),
      .prev_type = lag(type.name),
      .prev_pass_outcome = lag(pass.outcome.name),
      .prev_dribble_outcome = lag(dribble.outcome.name),
      .prev_x = lag(location.x),
      # NA teams never count as a change, so cumsum() cannot turn NA
      possession_change = !is.na(.prev_team) & !is.na(team.name) &
        team.name != .prev_team,
      transition_id = cumsum(possession_change)
    )

  # Classify each transition
  transitions <- events_sorted %>%
    filter(possession_change) %>%
    mutate(
      # How possession was won
      turnover_type = case_when(
        .prev_type == "Pass" & !is.na(.prev_pass_outcome) ~
          "Interception/Failed Pass",
        .prev_type == "Dribble" & .prev_dribble_outcome != "Complete" ~
          "Tackle/Failed Dribble",
        .prev_type == "Shot" ~ "Shot Recovery",
        .prev_type == "Clearance" ~ "Clearance Won",
        .prev_type %in% c("Foul Won", "Foul Committed") ~ "Foul",
        TRUE ~ "Other"
      ),

      # Where possession was won; unknown when the previous event has no
      # recorded location
      turnover_zone = case_when(
        is.na(.prev_x) ~ NA_character_,
        .prev_x > 80 ~ "High Turnover",
        .prev_x > 40 ~ "Middle Turnover",
        TRUE ~ "Deep Turnover"
      ),

      # Initial action after winning
      first_action = type.name,
      start_x = location.x,
      start_y = location.y
    ) %>%
    select(-all_of(helper_cols))

  list(
    events = events_sorted %>% select(-all_of(helper_cols)),
    transitions = transitions
  )
}

# Build the transition tables; `events` must already exist in the session
# (it is not created in this snippet).
transition_data <- identify_transitions(events)

# Analyze transition outcomes
#
# events:      events carrying transition_id, team.name, type.name,
#              location.x/y, minute, second, shot.statsbomb_xg and
#              shot.outcome.name.
# transitions: one row per transition with transition_id, turnover_type and
#              turnover_zone.
# Returns one row per transition sequence of at least two events.
analyze_transition_outcomes <- function(events, transitions) {

  # For each transition, track the sequence
  transition_sequences <- events %>%
    group_by(transition_id) %>%
    summarise(
      team = first(team.name),
      start_x = first(location.x),
      start_y = first(location.y),
      # NA rather than -Inf (plus a warning) when no event has a location
      max_x = if (all(is.na(location.x))) {
        NA_real_
      } else {
        as.numeric(max(location.x, na.rm = TRUE))
      },
      sequence_length = n(),
      duration_seconds = max(minute * 60 + second) - min(minute * 60 + second),

      # Outcomes. na.rm = TRUE makes events without a location or shot
      # outcome count as "no" instead of turning the whole flag into NA.
      ended_in_shot = any(type.name == "Shot", na.rm = TRUE),
      xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      ended_in_goal = any(
        type.name == "Shot" & shot.outcome.name == "Goal",
        na.rm = TRUE
      ),
      entered_box = any(
        location.x > 102 & location.y > 18 & location.y < 62,
        na.rm = TRUE
      ),

      # Speed metrics; duration is floored at 1 s to avoid dividing by zero
      x_progression = max_x - start_x,
      progression_speed = x_progression / max(duration_seconds, 1),

      .groups = "drop"
    ) %>%
    filter(sequence_length >= 2)  # At least 2 actions

  # Join with transition details
  transition_sequences %>%
    left_join(
      transitions %>% select(transition_id, turnover_type, turnover_zone),
      by = "transition_id"
    )
}

transition_outcomes <- analyze_transition_outcomes(
  events = transition_data$events,
  transitions = transition_data$transitions
)

# Per turnover zone: sequence count, shot and goal rates (percent), mean xG
# and mean progression speed
zone_summary <- transition_outcomes %>%
  group_by(turnover_zone) %>%
  summarise(
    n = n(),
    shot_rate = 100 * mean(ended_in_shot),
    avg_xg = mean(xg),
    goal_rate = 100 * mean(ended_in_goal),
    avg_speed = mean(progression_speed),
    .groups = "drop"
  )

print(zone_summary)

Counter-Attack Analysis

Counter-attacks are the most dangerous transitions. We analyze their speed, directness, and effectiveness to understand what makes successful counters.

counter_attacks.py
def identify_counter_attacks(transition_outcomes):
    """
    Identify and classify counter-attacks.

    A sequence counts as a counter-attack when it starts in the team's own
    half (``start_x < 60``), lasts at most 15 seconds and gains at least
    25 units in x.

    Parameters
    ----------
    transition_outcomes : pandas.DataFrame
        Needs ``start_x``, ``duration_seconds``, ``x_progression``,
        ``ended_in_shot`` and ``entered_box``.

    Returns
    -------
    pandas.DataFrame
        Copy of the qualifying rows with ``counter_type`` (by duration:
        0-6 s, 6-10 s, 10-15 s) and ``success`` (shot taken or box entered).
    """
    # Counter-attack criteria
    counters = transition_outcomes[
        (transition_outcomes['start_x'] < 60) &  # Won in own half
        (transition_outcomes['duration_seconds'] <= 15) &  # Fast
        (transition_outcomes['x_progression'] >= 25)  # Significant progression
    ].copy()

    # Classify counter type. include_lowest=True keeps a 0-second sequence
    # in the first bin; without it pd.cut leaves it unlabelled (NaN).
    counters['counter_type'] = pd.cut(
        counters['duration_seconds'],
        bins=[0, 6, 10, 15],
        labels=['Direct Counter', 'Fast Break', 'Quick Attack'],
        include_lowest=True
    )

    counters['success'] = (
        counters['ended_in_shot'] | counters['entered_box']
    )

    return counters

counter_attacks = identify_counter_attacks(transition_outcomes)

# Per-team totals for counter-attacks, plus the share that ended in a shot
counter_effectiveness = (
    counter_attacks
    .groupby('team')
    .agg(
        total_counters=('transition_id', 'count'),
        shots=('ended_in_shot', 'sum'),
        xg=('xg', 'sum'),
        goals=('ended_in_goal', 'sum'),
        avg_speed=('progression_speed', 'mean'),
    )
    .reset_index()
)
counter_effectiveness = counter_effectiveness.assign(
    shot_rate=lambda team_rows: (
        team_rows['shots'] / team_rows['total_counters'] * 100
    )
)

print(counter_effectiveness.sort_values('xg', ascending=False))

def plot_counter_speed_vs_xg(counters):
    """
    Visualize relationship between counter speed and xG.

    Draws one point per counter-attack: x is ``duration_seconds``, y is
    ``xg``, marker area scales with ``x_progression`` and colour marks
    whether the sequence ended in a goal. Shows the figure; returns None.
    """
    # Imported here because this snippet does not import matplotlib at
    # module level.
    import matplotlib.pyplot as plt
    from matplotlib.lines import Line2D

    fig, ax = plt.subplots(figsize=(10, 6))

    # Color by goal scored
    colors = counters['ended_in_goal'].map({True: 'red', False: 'steelblue'})

    ax.scatter(
        counters['duration_seconds'],
        counters['xg'],
        c=colors,
        s=counters['x_progression'] * 2,
        alpha=0.6
    )

    ax.set_xlabel('Duration (seconds)')
    ax.set_ylabel('xG Generated')
    ax.set_title('Counter-Attack Speed vs. xG')

    # Legend built from proxy markers, since colours are passed per point
    legend_elements = [
        Line2D([0], [0], marker='o', color='w', markerfacecolor='red',
               markersize=10, label='Goal'),
        Line2D([0], [0], marker='o', color='w', markerfacecolor='steelblue',
               markersize=10, label='No Goal')
    ]
    ax.legend(handles=legend_elements)

    plt.tight_layout()
    plt.show()

# Draw the speed-vs-xG scatter for the counter-attacks identified above
plot_counter_speed_vs_xg(counter_attacks)
# Define and analyze counter-attacks
#
# transition_outcomes: one row per transition sequence with start_x,
#   duration_seconds, x_progression, ended_in_shot and entered_box.
# Returns the qualifying rows with counter_type and success added.
identify_counter_attacks <- function(transition_outcomes) {

  # A sequence qualifies when it
  # - starts in the team's own half (start_x < 60),
  # - lasts at most 15 seconds, and
  # - gains at least 25 units in x.
  qualifying <- transition_outcomes %>%
    filter(start_x < 60 & duration_seconds <= 15 & x_progression >= 25)

  qualifying %>%
    mutate(
      # Label by how quickly the sequence played out
      counter_type = case_when(
        duration_seconds <= 6 ~ "Direct Counter",
        duration_seconds <= 10 ~ "Fast Break",
        TRUE ~ "Quick Attack"
      ),
      # Success: a shot was taken or the box was entered
      success = ended_in_shot | entered_box
    )
}

counter_attacks <- identify_counter_attacks(transition_outcomes)

# Per-team counter-attack output, highest xG first
counter_effectiveness <- counter_attacks %>%
  group_by(team) %>%
  summarise(
    total_counters = n(),
    direct_counters = sum(counter_type == "Direct Counter"),
    shots_from_counters = sum(ended_in_shot),
    xg_from_counters = sum(xg),
    goals_from_counters = sum(ended_in_goal),
    counter_shot_rate = 100 * mean(ended_in_shot),
    avg_counter_speed = mean(progression_speed),
    .groups = "drop"
  ) %>%
  arrange(desc(xg_from_counters))

print(counter_effectiveness)

# Visualize counter-attack paths
#
# events:      events carrying transition_id, type.name, location.x/y and
#              the pass/carry end-location columns.
# counter_ids: transition ids to draw.
# Returns a ggplot object: one arrow per pass, carry or shot (coloured by
# transition) with shots additionally marked by red triangles.
visualize_counters <- function(events, counter_ids) {

  # The pitch helpers come from ggsoccer, which this file never attaches;
  # qualify them and fail with a clear message when the package is missing.
  if (!requireNamespace("ggsoccer", quietly = TRUE)) {
    stop("Package 'ggsoccer' is required to draw the pitch.", call. = FALSE)
  }

  counter_events <- events %>%
    filter(transition_id %in% counter_ids) %>%
    filter(type.name %in% c("Pass", "Carry", "Shot"))

  ggplot(counter_events) +
    ggsoccer::annotate_pitch(dimensions = ggsoccer::pitch_statsbomb) +
    geom_segment(
      aes(
        x = location.x, y = location.y,
        # Shots have no end location here, so they fall back to their own
        # position (a zero-length segment)
        xend = coalesce(pass.end_location.x, carry.end_location.x, location.x),
        yend = coalesce(pass.end_location.y, carry.end_location.y, location.y),
        color = factor(transition_id)
      ),
      arrow = arrow(length = unit(0.1, "cm")),
      alpha = 0.7
    ) +
    geom_point(
      data = counter_events %>% filter(type.name == "Shot"),
      aes(x = location.x, y = location.y),
      color = "red", size = 4, shape = 17
    ) +
    labs(title = "Counter-Attack Paths") +
    ggsoccer::theme_pitch() +
    theme(legend.position = "none") +
    coord_flip()
}

# Visualize top 5 highest xG counters
# with_ties = FALSE caps the selection at exactly five rows; by default
# slice_max() keeps every row tied with the fifth value, which can be many
# when several counters share the same xG (for example 0).
top_counters <- counter_attacks %>%
  slice_max(xg, n = 5, with_ties = FALSE) %>%
  pull(transition_id)

visualize_counters(transition_data$events, top_counters)

# Counter speed analysis
ggplot(counter_attacks, aes(x = duration_seconds, y = xg)) +
  geom_point(aes(color = ended_in_goal, size = x_progression), alpha = 0.6) +
  geom_smooth(method = "loess", se = FALSE, color = "black") +
  scale_color_manual(values = c("FALSE" = "steelblue", "TRUE" = "red")) +
  labs(
    title = "Counter-Attack Speed vs. xG Generated",
    x = "Duration (seconds)", y = "xG",
    color = "Goal Scored", size = "Progression (m)"
  ) +
  theme_minimal()

Defensive Transition Analysis

Analyzing how teams defend during transitions reveals vulnerabilities and the effectiveness of counterpressing strategies.

defensive_transitions.py
def analyze_defensive_transitions(events, team_name):
    """
    Analyze vulnerability during defensive transitions.

    A possession loss is an event by ``team_name`` that is a pass with a
    recorded ``pass_outcome``, a dribble whose ``dribble_outcome`` is not
    ``'Complete'``, or a ``'Dispossessed'`` event.

    Parameters
    ----------
    events : pandas.DataFrame
        Needs ``team``, ``type``, ``pass_outcome``, ``dribble_outcome`` and
        ``location``. When ``index`` and a possession column
        (``possession_id`` or ``possession``) are also present, what the
        opponent did next is attached to every loss.
    team_name : str
        Team whose losses are analysed.

    Returns
    -------
    dict
        ``'losses'``: the loss events with ``loss_x`` and ``loss_zone``
        (plus ``opp_shot``, ``opp_xg``, ``opp_goal`` when trackable).
        ``'zone_vulnerability'``: per zone, ``losses`` (plus
        ``conceded_shots``, ``conceded_xg``, ``conceded_goals`` when
        trackable).
    """
    # Find possession losses
    own_event = events['team'] == team_name
    failed_pass = (events['type'] == 'Pass') & events['pass_outcome'].notna()
    failed_dribble = (
        (events['type'] == 'Dribble') &
        (events['dribble_outcome'] != 'Complete')
    )
    dispossessed = events['type'] == 'Dispossessed'

    losses = events[
        own_event & (failed_pass | failed_dribble | dispossessed)
    ].copy()
    losses['loss_x'] = losses['location'].apply(
        lambda loc: loc[0] if isinstance(loc, list) else np.nan
    ).astype(float)

    # include_lowest=True keeps x == 0 in the first bin instead of NaN
    losses['loss_zone'] = pd.cut(
        losses['loss_x'],
        bins=[0, 40, 80, 120],
        labels=['Defensive Third Loss', 'Middle Third Loss', 'Final Third Loss'],
        include_lowest=True
    )

    # Analyze what happened after each loss, when the data allows it
    possession_col = next(
        (col for col in ('possession_id', 'possession') if col in events.columns),
        None
    )
    track_outcomes = possession_col is not None and 'index' in events.columns

    if track_outcomes:
        # NOTE(review): `index` is compared directly, so this presumably
        # expects events from a single match - confirm for multi-match data.
        opp_events = events[events['team'] != team_name].sort_values('index')

        def analyze_post_loss(loss_index):
            """Return (shot, xg, goal) for the opponent's next possession."""
            subsequent = opp_events[opp_events['index'] > loss_index]
            if len(subsequent) == 0:
                return False, 0.0, False

            # Look at the possession of the first later opponent event
            poss_id = subsequent[possession_col].iloc[0]
            poss = subsequent[subsequent[possession_col] == poss_id]
            is_shot = poss['type'] == 'Shot'

            xg = 0.0
            if 'shot_statsbomb_xg' in poss.columns:
                xg = float(poss['shot_statsbomb_xg'].sum())
            goal = False
            if 'shot_outcome' in poss.columns:
                goal = bool((is_shot & (poss['shot_outcome'] == 'Goal')).any())
            return bool(is_shot.any()), xg, goal

        outcomes = [analyze_post_loss(idx) for idx in losses['index']]
        losses['opp_shot'] = np.array([o[0] for o in outcomes], dtype=bool)
        losses['opp_xg'] = np.array([o[1] for o in outcomes], dtype=float)
        losses['opp_goal'] = np.array([o[2] for o in outcomes], dtype=bool)

    # Calculate vulnerability by zone; observed=False keeps empty zones
    by_zone = losses.groupby('loss_zone', observed=False)
    zone_vulnerability = by_zone.size().reset_index(name='losses')

    if track_outcomes:
        conceded = by_zone.agg(
            conceded_shots=('opp_shot', 'sum'),
            conceded_xg=('opp_xg', 'sum'),
            conceded_goals=('opp_goal', 'sum'),
        ).reset_index()
        zone_vulnerability = zone_vulnerability.merge(
            conceded, on='loss_zone', how='left'
        )

    return {
        'losses': losses,
        'zone_vulnerability': zone_vulnerability
    }

# Run the loss analysis for one team and show the per-zone table
defensive_trans = analyze_defensive_transitions(events, 'Liverpool')

print("Losses by Zone:")
print(defensive_trans['zone_vulnerability'])

def analyze_counterpress(events, team_name):
    """
    Analyze counterpressing effectiveness.

    A counterpress counts as a regain when the next event (ordered by
    ``index``) is performed by the same team.

    Parameters
    ----------
    events : pandas.DataFrame
        Needs ``index`` and ``team``; ``counterpress`` is optional and
        treated as all-False when absent. Missing flags count as False.
    team_name : str
        Team whose counterpresses are analysed.

    Returns
    -------
    dict or None
        ``attempts``, ``regains`` and ``regain_rate`` (percent), or None
        when the team has no counterpress events.
    """
    events_sorted = events.sort_values('index')

    if 'counterpress' in events_sorted.columns:
        # eq(True) treats NaN as False without a fillna on an object column
        is_counterpress = events_sorted['counterpress'].eq(True)
    else:
        # Share the frame's index so the mask below stays aligned
        is_counterpress = pd.Series(False, index=events_sorted.index)

    attempt_mask = (
        (events_sorted['team'] == team_name).to_numpy() &
        is_counterpress.to_numpy()
    )
    attempts = int(attempt_mask.sum())

    if attempts == 0:
        return None

    # Next action by the same team = successful regain. The last event has
    # no successor and therefore never counts.
    next_team_matches = (
        events_sorted['team'].shift(-1) == team_name
    ).to_numpy()
    regains = int((attempt_mask & next_team_matches).sum())

    return {
        'attempts': attempts,
        'regains': regains,
        'regain_rate': regains / attempts * 100
    }

# analyze_counterpress returns None when the team has no counterpress
# events, so only print when there is something to report.
counterpress_stats = analyze_counterpress(events, 'Liverpool')
if counterpress_stats:
    print(f"\nCounterpress Stats:")
    print(f"Attempts: {counterpress_stats['attempts']}")
    print(f"Regain Rate: {counterpress_stats['regain_rate']:.1f}%")
# Analyze defensive transition vulnerability
#
# events:    events with index, possession_id, team.name, type.name,
#            pass.outcome.name, dribble.outcome.name, location.x/y, minute,
#            second, shot.statsbomb_xg and shot.outcome.name.
# team_name: team whose possession losses are analysed.
# Returns a list with
#   losses             - one row per loss plus what the opponent did next
#   zone_vulnerability - conceded shots/xG/goals per loss zone
#   recovery           - quick and overall recovery rates
#
# NOTE(review): losses are matched to later opponent events by `index`, so
# this presumably expects events from a single match - confirm before using
# it on multi-match data.
analyze_defensive_transitions <- function(events, team_name) {

  # Find possessions lost by team
  possession_losses <- events %>%
    filter(
      team.name == team_name,
      (type.name == "Pass" & !is.na(pass.outcome.name)) |
        (type.name == "Dribble" & dribble.outcome.name != "Complete") |
        type.name == "Dispossessed"
    ) %>%
    mutate(
      loss_x = location.x,
      loss_y = location.y,
      loss_zone = case_when(
        loss_x > 80 ~ "Final Third Loss",
        loss_x > 40 ~ "Middle Third Loss",
        TRUE ~ "Defensive Third Loss"
      ),
      loss_index = index
    )

  opponent_events <- events %>%
    filter(team.name != team_name) %>%
    arrange(index)

  # One summary row per possession in which the opponent acted
  opponent_possessions <- opponent_events %>%
    group_by(possession_id) %>%
    summarise(
      opp_shot = any(type.name == "Shot", na.rm = TRUE),
      opp_xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      opp_goal = any(
        type.name == "Shot" & shot.outcome.name == "Goal",
        na.rm = TRUE
      ),
      # Convert to defensive perspective; NA when no event has a location
      opp_max_x = if (all(is.na(location.x))) {
        NA_real_
      } else {
        as.numeric(max(120 - location.x, na.rm = TRUE))
      },
      sequence_duration = max(minute * 60 + second) - min(minute * 60 + second),
      .groups = "drop"
    ) %>%
    rename(next_opp_possession_id = possession_id)

  # What happened after each loss: take the possession of the first opponent
  # event that follows the loss. Joining on the loss's own possession_id
  # would attach opponent events from the possession in which the ball was
  # lost, not the opponent possession that follows it.
  # findInterval() counts opponent events at or before the loss, so +1 is the
  # first later one; past the end the lookup yields NA (no later possession).
  first_later_opp_event <- findInterval(
    possession_losses$index,
    opponent_events$index
  ) + 1L

  loss_outcomes <- possession_losses %>%
    mutate(
      next_opp_possession_id =
        opponent_events$possession_id[first_later_opp_event]
    ) %>%
    left_join(opponent_possessions, by = "next_opp_possession_id")

  # Vulnerability by loss zone
  zone_vulnerability <- loss_outcomes %>%
    group_by(loss_zone) %>%
    summarise(
      losses = n(),
      conceded_shots = sum(opp_shot, na.rm = TRUE),
      conceded_xg = sum(opp_xg, na.rm = TRUE),
      conceded_goals = sum(opp_goal, na.rm = TRUE),
      shot_conceded_rate = mean(opp_shot, na.rm = TRUE) * 100,
      avg_xg_conceded = mean(opp_xg, na.rm = TRUE),
      .groups = "drop"
    )

  # Recovery time analysis: a loss with no later opponent possession gets
  # the sentinel 999 and so never counts as a quick recovery
  recovery_analysis <- loss_outcomes %>%
    mutate(
      quick_recovery = coalesce(sequence_duration, 999) <= 5,
      recovered = !opp_shot
    ) %>%
    summarise(
      total_losses = n(),
      quick_recovery_rate = mean(quick_recovery, na.rm = TRUE) * 100,
      overall_recovery_rate = mean(recovered, na.rm = TRUE) * 100
    )

  list(
    losses = loss_outcomes,
    zone_vulnerability = zone_vulnerability,
    recovery = recovery_analysis
  )
}

defensive_transitions <- analyze_defensive_transitions(events, "Liverpool")

# Visualize vulnerability
ggplot(defensive_transitions$zone_vulnerability,
       aes(x = loss_zone, y = shot_conceded_rate, fill = loss_zone)) +
  geom_bar(stat = "identity") +
  geom_text(aes(label = paste0(round(shot_conceded_rate, 1), "%")),
            vjust = -0.5) +
  scale_fill_brewer(palette = "Reds") +
  labs(
    title = "Defensive Transition Vulnerability by Zone",
    subtitle = "Shot conceded rate after losing possession",
    x = "", y = "Shot Conceded Rate (%)"
  ) +
  theme_minimal() +
  theme(legend.position = "none")

# Counterpressing effectiveness
# The team of the following event is looked up on the full ordered event
# stream before filtering. Calling lead() after the filter would only ever
# see the next Liverpool counterpress row, making almost every attempt look
# like a regain.
counterpress_effectiveness <- events %>%
  arrange(index) %>%
  mutate(next_team = lead(team.name)) %>%
  filter(team.name == "Liverpool", counterpress == TRUE) %>%
  summarise(
    counterpress_attempts = n(),
    regains = sum(next_team == "Liverpool", na.rm = TRUE),
    regain_rate = regains / counterpress_attempts * 100
  )

print(counterpress_effectiveness)

Building a Transition Danger Model

We can build a model to predict how dangerous a transition will be based on where and how possession was won.

transition_danger_model.py
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.preprocessing import LabelEncoder

def build_transition_model(transition_data):
    """
    Build model to predict transition danger.

    Fits a gradient-boosting classifier on per-transition rows and prints a
    classification report and ROC-AUC for a 30% hold-out split.

    Parameters
    ----------
    transition_data : pandas.DataFrame
        Needs ``turnover_zone``, ``turnover_type``, ``xg``,
        ``ended_in_goal``, ``start_x`` and ``sequence_length``.

    Returns
    -------
    dict
        ``'model'``: the fitted classifier.
        ``'importance'``: DataFrame of feature importances, highest first.
        ``'encoders'``: the fitted label encoders for zone and type.
    """
    # Prepare data: rows without a turnover zone are dropped
    model_data = transition_data.dropna(subset=['turnover_zone']).copy()

    # Target: high danger transition (xG above 0.1, or a goal was scored)
    model_data['high_danger'] = (
        (model_data['xg'] > 0.1) | (model_data['ended_in_goal'])
    ).astype(int)

    # Features
    model_data['start_x_normalized'] = model_data['start_x'] / 120
    model_data['sequence_length_capped'] = model_data['sequence_length'].clip(upper=15)

    # Encode categoricals as integer codes (values are cast to str first,
    # so a missing turnover_type is encoded as its string form)
    le_zone = LabelEncoder()
    le_type = LabelEncoder()

    model_data['zone_encoded'] = le_zone.fit_transform(
        model_data['turnover_zone'].astype(str)
    )
    model_data['type_encoded'] = le_type.fit_transform(
        model_data['turnover_type'].astype(str)
    )

    # Feature matrix
    features = ['start_x_normalized', 'zone_encoded', 'type_encoded',
                'sequence_length_capped']

    # Remaining missing feature values are replaced by 0
    X = model_data[features].fillna(0)
    y = model_data['high_danger']

    # Split: 70/30, stratified on the target, fixed seed
    # NOTE(review): stratify=y presumably needs at least two rows of each
    # class - confirm behaviour on very small or one-class inputs.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42, stratify=y
    )

    # Train model
    model = GradientBoostingClassifier(
        n_estimators=100,
        max_depth=4,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Evaluate on the hold-out rows
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    print("Classification Report:")
    print(classification_report(y_test, y_pred))
    print(f"ROC-AUC: {roc_auc_score(y_test, y_pred_proba):.3f}")

    # Feature importance, most important first
    importance = pd.DataFrame({
        'feature': features,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)

    return {
        'model': model,
        'importance': importance,
        'encoders': {'zone': le_zone, 'type': le_type}
    }

# matplotlib is not imported anywhere in this snippet, so bring `plt` into
# scope before plotting.
import matplotlib.pyplot as plt

transition_model = build_transition_model(transition_outcomes)

# Plot importance as a horizontal bar chart
fig, ax = plt.subplots(figsize=(8, 5))
ax.barh(transition_model['importance']['feature'],
        transition_model['importance']['importance'],
        color='coral')
ax.set_xlabel('Importance')
ax.set_title('Transition Danger Model - Feature Importance')
plt.tight_layout()
plt.show()
library(randomForest)
library(caret)

# Build transition danger model
#
# transition_data: one row per transition with turnover_zone, turnover_type,
#   xg, ended_in_goal, start_x and sequence_length.
# Fits a random forest on a 70% training split, prints the confusion matrix
# for the remaining rows and returns a list with
#   model      - the fitted randomForest
#   importance - features ordered by MeanDecreaseGini
#   accuracy   - hold-out accuracy
build_transition_model <- function(transition_data) {

  # Prepare features
  model_data <- transition_data %>%
    filter(!is.na(turnover_zone)) %>%
    mutate(
      # Target: high danger transition (xG above 0.1, or a goal was scored).
      # Levels are fixed so both classes exist whatever the data contains.
      high_danger = factor(
        ifelse(xg > 0.1 | ended_in_goal, "Yes", "No"),
        levels = c("No", "Yes")
      ),

      # Features
      start_x_normalized = start_x / 120,
      turnover_type_encoded = as.factor(turnover_type),
      turnover_zone_encoded = as.factor(turnover_zone),
      sequence_length_capped = pmin(sequence_length, 15)
    ) %>%
    filter(!is.na(high_danger))

  # Split data
  set.seed(42)
  train_idx <- createDataPartition(model_data$high_danger, p = 0.7, list = FALSE)
  train <- model_data[train_idx, ]
  test <- model_data[-train_idx, ]

  # Train model
  danger_model <- randomForest(
    high_danger ~ start_x_normalized + turnover_zone_encoded +
                  turnover_type_encoded + sequence_length_capped,
    data = train,
    ntree = 200,
    mtry = 2
  )

  # Evaluate. caret treats the first factor level ("No") as the positive
  # class unless told otherwise, which would report sensitivity, precision
  # and recall for the non-dangerous class.
  predictions <- predict(danger_model, test)
  conf_matrix <- confusionMatrix(
    predictions,
    test$high_danger,
    positive = "Yes"
  )

  print(conf_matrix)

  # Feature importance
  importance_df <- importance(danger_model) %>%
    as.data.frame() %>%
    rownames_to_column("feature") %>%
    arrange(desc(MeanDecreaseGini))

  list(
    model = danger_model,
    importance = importance_df,
    accuracy = conf_matrix$overall['Accuracy']
  )
}

transition_model <- build_transition_model(transition_outcomes)

# Visualize feature importance: horizontal bars, most important at the top
transition_model$importance %>%
  mutate(feature = reorder(feature, MeanDecreaseGini)) %>%
  ggplot(aes(x = feature, y = MeanDecreaseGini)) +
  geom_col(fill = "coral") +
  coord_flip() +
  labs(
    title = "Transition Danger Model - Feature Importance",
    x = "", y = "Importance"
  ) +
  theme_minimal()

# Apply model to predict danger of new transitions
#
# model:          fitted classifier whose predict() method supports
#                 type = "prob" and has a "Yes" class.
# new_transition: data frame of transitions carrying the model's features.
# Returns the predicted probability of the "Yes" class for every row.
predict_transition_danger <- function(model, new_transition) {
  class_probabilities <- predict(model, new_transition, type = "prob")
  class_probabilities[, "Yes"]
}

Player Transition Contributions

Individual players contribute differently to transitions. Some excel at winning the ball, others at carrying forward, and some at finishing counter-attacks.

player_transitions.py
def analyze_player_transitions(events, team_name):
    """
    Analyze individual player contributions to transitions.

    Counts, per player and position of ``team_name``: ball wins,
    progressive carries (more than 10 units gained in x), completed
    progressive passes (more than 15 units) and shots from counter or
    throw-in play patterns. Returns one row per player/position sorted by
    the sum of those four counts, highest first.
    """
    player_keys = ['player', 'position']
    own_events = events[events['team'] == team_name]

    def x_of(loc):
        return loc[0] if isinstance(loc, list) else np.nan

    def with_progression(frame, end_column):
        # Copy so the caller's slice is left untouched
        frame = frame.copy()
        frame['start_x'] = frame['location'].apply(x_of)
        frame['end_x'] = frame[end_column].apply(x_of)
        frame['progression'] = frame['end_x'] - frame['start_x']
        return frame

    # Ball winners
    winning_actions = ['Interception', 'Duel', 'Tackle', 'Ball Recovery']
    won_ball = own_events[own_events['type'].isin(winning_actions)]
    ball_winners = (
        won_ball.groupby(player_keys).size().reset_index(name='ball_wins')
    )

    # Progressive carriers
    carries = with_progression(
        own_events[own_events['type'].isin(['Carry', 'Dribble'])],
        'carry_end_location'
    )
    long_carries = carries[carries['progression'] > 10]
    progressive_carriers = (
        long_carries
        .groupby(player_keys)
        .agg({'id': 'count', 'progression': 'mean'})
        .reset_index()
        .rename(columns={
            'id': 'progressive_carries',
            'progression': 'avg_carry_distance'
        })
    )

    # Progressive passers (a missing pass_outcome marks a completed pass)
    passes = with_progression(
        own_events[own_events['type'] == 'Pass'],
        'pass_end_location'
    )
    forward_completed = passes[
        (passes['progression'] > 15) & (passes['pass_outcome'].isna())
    ]
    progressive_passers = (
        forward_completed
        .groupby(player_keys)
        .size()
        .reset_index(name='progressive_passes')
    )

    # Transition finishers
    transition_patterns = ['From Counter', 'From Throw In']
    transition_shots = own_events[
        (own_events['type'] == 'Shot') &
        (own_events['play_pattern'].isin(transition_patterns))
    ]
    transition_finishers = (
        transition_shots
        .groupby(player_keys)
        .agg({
            'id': 'count',
            'shot_statsbomb_xg': 'sum',
            'shot_outcome': lambda x: (x == 'Goal').sum()
        })
        .reset_index()
        .rename(columns={
            'id': 'transition_shots',
            'shot_statsbomb_xg': 'transition_xg',
            'shot_outcome': 'transition_goals'
        })
    )

    # Combine all tables, keeping players that appear in any of them
    combined = ball_winners
    for table in (progressive_carriers, progressive_passers, transition_finishers):
        combined = combined.merge(table, on=player_keys, how='outer')

    combined = combined.fillna(0)
    combined['total_contribution'] = (
        combined['ball_wins'] +
        combined['progressive_carries'] +
        combined['progressive_passes'] +
        combined['transition_shots']
    )

    return combined.sort_values('total_contribution', ascending=False)

# Rank one team's players by transition contribution and show the top ten
player_transitions = analyze_player_transitions(events, 'Liverpool')
print(player_transitions.head(10))
# Analyze player transition contributions
#
# events:    events with team.name, player.name, position.name, type.name,
#            location.x, carry.end_location.x, pass.end_location.x,
#            pass.outcome.name, pass.technique.name, dribble.outcome.name,
#            play_pattern.name, shot.statsbomb_xg and shot.outcome.name.
# team_name: team whose players are analysed.
# Returns one row per player/position with ball-winning, carrying, passing
# and finishing counts, ordered by transition_contribution (highest first).
analyze_player_transitions <- function(events, team_name) {

  player_keys <- c("player.name", "position.name")

  team_events <- events %>%
    filter(team.name == team_name)

  # Ball winners (who initiates transitions)
  ball_winners <- team_events %>%
    filter(
      type.name %in% c("Interception", "Duel", "Tackle") |
      (type.name == "Ball Recovery" & !is.na(location.x))
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      ball_wins = n(),
      high_ball_wins = sum(location.x > 60, na.rm = TRUE),
      .groups = "drop"
    )

  # Transition carriers (who progresses the ball)
  transition_carriers <- team_events %>%
    filter(
      type.name %in% c("Carry", "Dribble"),
      (carry.end_location.x - location.x) > 10 |
      (dribble.outcome.name == "Complete" & (location.x > 40 & location.x < 80))
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      progressive_carries = n(),
      # Rows without a carry end location contribute a distance of 0
      avg_carry_distance = mean(
        coalesce(carry.end_location.x, location.x) - location.x,
        na.rm = TRUE
      ),
      .groups = "drop"
    )

  # Transition passers
  transition_passers <- team_events %>%
    filter(
      type.name == "Pass",
      is.na(pass.outcome.name),  # Successful
      (pass.end_location.x - location.x) > 15  # Progressive
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      progressive_passes = n(),
      through_balls = sum(pass.technique.name == "Through Ball", na.rm = TRUE),
      .groups = "drop"
    )

  # Transition finishers (shots in transition sequences)
  transition_finishers <- team_events %>%
    filter(
      type.name == "Shot",
      play_pattern.name %in% c("From Counter", "From Throw In")
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      transition_shots = n(),
      transition_xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      # na.rm = TRUE: one shot with a missing outcome would otherwise make
      # the whole count NA, which replace_na() below would turn into 0
      transition_goals = sum(shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    )

  # Combine all metrics, keeping players that appear in any table
  ball_winners %>%
    full_join(transition_carriers, by = player_keys) %>%
    full_join(transition_passers, by = player_keys) %>%
    full_join(transition_finishers, by = player_keys) %>%
    replace_na(list(
      ball_wins = 0, high_ball_wins = 0,
      progressive_carries = 0, avg_carry_distance = 0,
      progressive_passes = 0, through_balls = 0,
      transition_shots = 0, transition_xg = 0, transition_goals = 0
    )) %>%
    mutate(
      transition_contribution = ball_wins + progressive_carries +
                                progressive_passes + transition_shots
    ) %>%
    arrange(desc(transition_contribution))
}

player_transitions <- analyze_player_transitions(events, "Liverpool")
player_transitions %>%
  head(10) %>%
  print()

# Visualize player transition profiles for the eight highest-ranked players
top_players <- player_transitions %>%
  slice_head(n = 8)

top_players %>%
  pivot_longer(
    cols = c(ball_wins, progressive_carries, progressive_passes, transition_shots),
    names_to = "metric",
    values_to = "value"
  ) %>%
  ggplot(aes(x = player.name, y = value, fill = metric)) +
  geom_col(position = "stack") +
  coord_flip() +
  labs(
    title = "Player Transition Contributions",
    x = "", y = "Actions", fill = "Type"
  ) +
  theme_minimal()

Practice Exercises

Exercise 27.1: League-Wide Counter-Attack Analysis System

Task: Build a comprehensive counter-attack analysis system that evaluates all teams in a league on their transition effectiveness, both offensively and defensively.

Requirements:

  • Identify counter-attack sequences from play pattern data
  • Calculate offensive metrics: counter-attacks per match, xG from counters, conversion rate
  • Calculate defensive vulnerability: counters conceded, xG conceded from counters
  • Create transition balance score (counter xG generated vs conceded)
  • Rank teams by counter-attack effectiveness and vulnerability
  • Visualize findings with scatter plots and rankings

counter_attack_analysis
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsbombpy import sb

# ============================================
# LEAGUE COUNTER-ATTACK ANALYSIS SYSTEM
# ============================================

def identify_counter_attacks(events):
    """Return the events whose play pattern is 'From Counter'.

    The result is a new frame (the input is not modified) holding only the
    matching rows, plus a ``sequence_id`` column of the form
    ``"<match_id>_<possession>"`` so possessions from different matches
    never share an identifier.
    """
    is_counter = events['play_pattern'] == 'From Counter'
    match_part = events.loc[is_counter, 'match_id'].astype(str)
    possession_part = events.loc[is_counter, 'possession'].astype(str)
    return events.loc[is_counter].assign(
        sequence_id=match_part + '_' + possession_part
    )

def calculate_team_counter_metrics(events, matches):
    """Calculate counter-attack metrics for all teams.

    Offensive numbers are credited to the team in possession during a
    'From Counter' sequence; defensive numbers are charged to that team's
    opponent in the same fixture. Per-match rates divide by the number of
    fixtures each team has in ``events``, not by a league-wide constant.

    Parameters
    ----------
    events : pandas.DataFrame
        Event rows with at least ``match_id``, ``possession``,
        ``possession_team``, ``play_pattern``, ``type``,
        ``shot_statsbomb_xg`` and ``shot_outcome``.
    matches : pandas.DataFrame
        One row per fixture with ``match_id``, ``home_team`` and
        ``away_team``.

    Returns
    -------
    pandas.DataFrame
        One row per team with offensive, defensive and combined metrics,
        sorted by ``overall_effectiveness`` (descending). Metrics a team has
        no data for are 0.
    """
    counter_events = identify_counter_attacks(events)

    # Only fixtures that actually have event data count towards "per match"
    # rates; the caller may have loaded events for a subset of the fixtures.
    covered = matches[matches['match_id'].isin(events['match_id'].unique())]
    covered = covered.drop_duplicates('match_id')
    matches_played = pd.concat(
        [covered['home_team'], covered['away_team']]
    ).value_counts()

    # The defending side of a counter is whichever fixture team is NOT in
    # possession. Unknown fixtures give NaN, which groupby drops.
    home_side = counter_events['match_id'].map(
        covered.set_index('match_id')['home_team']
    )
    away_side = counter_events['match_id'].map(
        covered.set_index('match_id')['away_team']
    )
    attacker_is_home = (counter_events['possession_team'] == home_side).to_numpy()
    counter_events['defending_team'] = np.where(
        attacker_is_home, away_side.to_numpy(), home_side.to_numpy()
    )

    def summarise(team_column, names):
        """Aggregate counter sequences/shots/xG/goals per ``team_column``."""
        grouped = counter_events.groupby(team_column).agg(
            sequences=('sequence_id', 'nunique'),
            shots=('type', lambda s: (s == 'Shot').sum()),
            xg=('shot_statsbomb_xg', 'sum'),
            goals=('shot_outcome', lambda s: (s == 'Goal').sum()),
        )
        return grouped.reset_index().rename(columns={team_column: 'team', **names})

    # Offensive metrics: grouped by the team that owns the counter possession,
    # so defenders' actions inside the sequence are not credited to them.
    offensive = summarise('possession_team', {
        'sequences': 'counter_sequences',
        'shots': 'counter_shots',
        'xg': 'counter_xg',
        'goals': 'counter_goals',
    })
    offensive['counters_per_match'] = (
        offensive['counter_sequences'] / offensive['team'].map(matches_played)
    )
    offensive['xg_per_counter'] = offensive['counter_xg'] / offensive['counter_sequences']
    offensive['counter_conversion'] = (
        offensive['counter_goals'] / offensive['counter_shots'] * 100
    ).fillna(0)

    # Defensive vulnerability: the same sequences, charged to the opponent.
    defensive = summarise('defending_team', {
        'sequences': 'counters_conceded',
        'shots': 'shots_conceded',
        'xg': 'xg_conceded',
        'goals': 'goals_conceded',
    })
    defensive['conceded_per_match'] = (
        defensive['counters_conceded'] / defensive['team'].map(matches_played)
    )

    # Combine; a team missing on one side gets zeros for that side.
    combined = offensive.merge(defensive, on='team', how='outer').fillna(0)

    combined['transition_balance'] = combined['counter_xg'] - combined['xg_conceded']
    combined['transition_ratio'] = np.where(
        combined['xg_conceded'] > 0,
        combined['counter_xg'] / combined['xg_conceded'],
        np.inf
    )
    combined['overall_effectiveness'] = (
        combined['counters_per_match'] * 20 +
        combined['xg_per_counter'] * 50 -
        combined['conceded_per_match'] * 15 -
        # replace(0, 1) avoids 0/0 for teams that conceded no counters
        (combined['xg_conceded'] / combined['counters_conceded'].replace(0, 1)) * 40
    )

    return combined.sort_values('overall_effectiveness', ascending=False)

def visualize_counter_analysis(metrics):
    """Draw the counter-attack balance scatter and the effectiveness ranking.

    Left panel: counter xG generated (x) against counter xG conceded (y), one
    point per team, coloured by ``transition_balance`` and sized by
    ``counter_sequences``, with a dashed y = x reference line.
    Right panel: horizontal bars of ``overall_effectiveness`` for the first
    15 rows of ``metrics``, green when positive and red otherwise.
    """
    fig, (balance_ax, ranking_ax) = plt.subplots(1, 2, figsize=(16, 7))

    # Scatter plot
    generated = metrics['counter_xg']
    conceded = metrics['xg_conceded']
    points = balance_ax.scatter(
        generated, conceded,
        c=metrics['transition_balance'], cmap='RdYlGn',
        s=metrics['counter_sequences'] * 10, alpha=0.7
    )

    # Reference diagonal reaching the largest value on either axis
    diagonal_end = metrics[['counter_xg', 'xg_conceded']].max().max()
    balance_ax.plot([0, diagonal_end], [0, diagonal_end], 'k--', alpha=0.5)

    # Label every point with the first ten characters of the team name
    for team, x_pos, y_pos in zip(metrics['team'], generated, conceded):
        balance_ax.annotate(team[:10], (x_pos, y_pos), fontsize=8, alpha=0.7)

    balance_ax.set_xlabel('Counter xG Generated')
    balance_ax.set_ylabel('Counter xG Conceded')
    balance_ax.set_title('Counter-Attack Balance')
    plt.colorbar(points, ax=balance_ax, label='Balance')

    # Rankings
    leaders = metrics.head(15)
    bar_colors = [
        '#1a9850' if score > 0 else '#d73027'
        for score in leaders['overall_effectiveness']
    ]
    ranking_ax.barh(leaders['team'], leaders['overall_effectiveness'], color=bar_colors)
    ranking_ax.set_xlabel('Effectiveness Score')
    ranking_ax.set_title('Transition Effectiveness Rankings')
    ranking_ax.invert_yaxis()  # first row of the table at the top

    plt.tight_layout()
    plt.show()

def generate_counter_report(metrics):
    """Print the league counter-attack report to stdout.

    Three top-five tables are printed: teams by counter xG generated, teams
    by counter xG conceded, and teams by transition balance.
    """
    banner = "=" * 65
    rule = "-" * 55

    # (heading, column to rank by, columns to display)
    sections = [
        ("TOP 5 COUNTER-ATTACKING TEAMS (by xG generated):", 'counter_xg',
         ['team', 'counter_sequences', 'counter_xg', 'counter_goals', 'xg_per_counter']),
        ("\n\nTOP 5 MOST VULNERABLE TEAMS:", 'xg_conceded',
         ['team', 'counters_conceded', 'xg_conceded', 'goals_conceded']),
        ("\n\nBEST TRANSITION BALANCE:", 'transition_balance',
         ['team', 'counter_xg', 'xg_conceded', 'transition_balance']),
    ]

    print("\n" + banner)
    print("LEAGUE COUNTER-ATTACK ANALYSIS")
    print(banner + "\n")

    for heading, rank_column, shown_columns in sections:
        print(heading)
        print(rule)
        top_five = metrics.nlargest(5, rank_column)
        print(top_five[shown_columns].to_string(index=False))

# Main execution
# Fetch the fixture list, then download events for the first 50 fixtures only
# and stack them into a single frame.
matches = sb.matches(competition_id=11, season_id=90)
match_ids = matches['match_id'].head(50)
events = pd.concat([sb.events(match_id=match_id) for match_id in match_ids])

# Compute the team table once and feed it to both outputs.
metrics = calculate_team_counter_metrics(events, matches)
generate_counter_report(metrics)
visualize_counter_analysis(metrics)
library(tidyverse)
library(StatsBombR)

# ============================================
# LEAGUE COUNTER-ATTACK ANALYSIS SYSTEM
# ============================================

# Identify counter-attack sequences
#
# Keeps only the events whose `play_pattern.name` is "From Counter" and adds
# `sequence_id` ("<match_id>_<possession>") so possessions from different
# matches never share an identifier.
identify_counter_attacks <- function(events) {
  counter_events <- filter(events, play_pattern.name == "From Counter")
  mutate(counter_events, sequence_id = paste(match_id, possession, sep = "_"))
}

# Calculate team counter-attack metrics
#
# Offensive numbers are credited to the team in possession during a
# "From Counter" sequence; defensive numbers are charged to that team's
# opponent in the same fixture. Per-match rates divide by the number of
# fixtures each team has in `events`.
#
# events:  event rows with match_id, possession, possession_team.name,
#          play_pattern.name, type.name, shot.statsbomb_xg, shot.outcome.name
# matches: one row per fixture with match_id, home_team.home_team_name and
#          away_team.away_team_name
#
# Returns one row per team, sorted by overall_effectiveness (descending).
# Metrics a team has no data for are 0 rather than NA.
calculate_team_counter_metrics <- function(events, matches) {

  # Fixtures that actually have event data, one row per match
  fixtures <- matches %>%
    filter(match_id %in% unique(events$match_id)) %>%
    distinct(match_id, .keep_all = TRUE) %>%
    transmute(
      match_id,
      fixture_home = home_team.home_team_name,
      fixture_away = away_team.away_team_name
    )

  # Matches covered per team (home and away appearances together)
  matches_played <- fixtures %>%
    pivot_longer(
      c(fixture_home, fixture_away),
      names_to = "side", values_to = "team.name"
    ) %>%
    count(team.name, name = "n_matches")

  # The defending side is whichever fixture team is NOT in possession;
  # it is NA when the fixture is unknown.
  counter_events <- identify_counter_attacks(events) %>%
    left_join(fixtures, by = "match_id") %>%
    mutate(
      defending_team.name = if_else(
        possession_team.name == fixture_home, fixture_away, fixture_home
      )
    )

  # Offensive metrics: grouped by the team that owns the counter possession,
  # so defenders' actions inside the sequence are not credited to them.
  offensive <- counter_events %>%
    group_by(possession_team.name) %>%
    summarise(
      counter_sequences = n_distinct(sequence_id),
      counter_shots = sum(type.name == "Shot"),
      counter_xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      counter_goals = sum(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    ) %>%
    rename(team.name = possession_team.name)

  # Defensive vulnerability: the same sequences, charged to the opponent
  defensive <- counter_events %>%
    filter(!is.na(defending_team.name)) %>%
    group_by(defending_team.name) %>%
    summarise(
      counters_conceded = n_distinct(sequence_id),
      shots_conceded = sum(type.name == "Shot"),
      xg_conceded = sum(shot.statsbomb_xg, na.rm = TRUE),
      goals_conceded = sum(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    ) %>%
    rename(team.name = defending_team.name)

  # Combine offensive and defensive. Counts missing on one side become 0
  # before any rate is derived, so no rate is left NA.
  combined <- offensive %>%
    full_join(defensive, by = "team.name") %>%
    left_join(matches_played, by = "team.name") %>%
    replace_na(list(
      counter_sequences = 0, counter_shots = 0, counter_xg = 0, counter_goals = 0,
      counters_conceded = 0, shots_conceded = 0, xg_conceded = 0, goals_conceded = 0
    )) %>%
    mutate(
      counters_per_match = coalesce(counter_sequences / n_matches, 0),
      xg_per_counter = if_else(counter_sequences > 0, counter_xg / counter_sequences, 0),
      counter_conversion = if_else(counter_shots > 0, counter_goals / counter_shots * 100, 0),
      conceded_per_match = coalesce(counters_conceded / n_matches, 0),
      xg_conceded_per_counter = if_else(
        counters_conceded > 0, xg_conceded / counters_conceded, 0
      ),
      transition_balance = counter_xg - xg_conceded,
      transition_ratio = ifelse(xg_conceded > 0, counter_xg / xg_conceded, Inf),
      overall_effectiveness = (
        (counters_per_match * 20) +
        (xg_per_counter * 50) -
        (conceded_per_match * 15) -
        (xg_conceded_per_counter * 40)
      )
    ) %>%
    select(-n_matches) %>%
    arrange(desc(overall_effectiveness))

  combined
}

# Visualize counter-attack analysis
#
# Builds two ggplot objects from the team metrics table and returns them as
# list(scatter = ..., rankings = ...); nothing is printed here.
visualize_counter_analysis <- function(metrics) {

  # Scatter: counter xG generated (x) against counter xG conceded (y), with a
  # dashed y = x reference line
  balance_plot <- ggplot(metrics, aes(x = counter_xg, y = xg_conceded)) +
    geom_point(aes(size = counter_sequences, color = transition_balance), alpha = 0.7) +
    geom_text(aes(label = team.name), vjust = -0.8, size = 3, check_overlap = TRUE) +
    geom_abline(intercept = 0, slope = 1, linetype = "dashed", alpha = 0.5) +
    scale_color_gradient2(low = "red", mid = "gray", high = "green", midpoint = 0) +
    labs(
      title = "Counter-Attack Balance",
      subtitle = "Above line = vulnerable, Below line = effective",
      x = "Counter xG Generated", y = "Counter xG Conceded",
      color = "Balance", size = "Counters"
    ) +
    theme_minimal()

  # Rankings bar chart: first 15 rows of the table, green when the score is
  # positive and red otherwise
  top_fifteen <- head(metrics, 15)
  ranking_plot <- ggplot(
    top_fifteen,
    aes(x = reorder(team.name, overall_effectiveness), y = overall_effectiveness)
  ) +
    geom_col(aes(fill = overall_effectiveness > 0)) +
    coord_flip() +
    scale_fill_manual(values = c("FALSE" = "#d73027", "TRUE" = "#1a9850"), guide = "none") +
    labs(title = "Transition Effectiveness Rankings", x = "", y = "Effectiveness Score") +
    theme_minimal()

  list(scatter = balance_plot, rankings = ranking_plot)
}

# Generate comprehensive report
#
# Prints three top-five tables to the console: teams by counter xG generated,
# teams by counter xG conceded, and teams by transition balance.
generate_counter_report <- function(metrics) {

  banner <- strrep("=", 65)
  rule <- strrep("-", 51)

  # Print a heading and a rule, then the first five rows of an already-ranked
  # table restricted to the columns passed through `...`
  show_top_five <- function(heading, ranked, ...) {
    cat(heading)
    cat(rule, "\n", sep = "")
    print(select(head(ranked, 5), ...))
  }

  cat("\n", banner, "\n", sep = "")
  cat("LEAGUE COUNTER-ATTACK ANALYSIS\n")
  cat(banner, "\n\n", sep = "")

  show_top_five(
    "TOP 5 COUNTER-ATTACKING TEAMS (by xG generated):\n",
    arrange(metrics, desc(counter_xg)),
    team.name, counter_sequences, counter_xg, counter_goals, xg_per_counter
  )

  show_top_five(
    "\n\nTOP 5 MOST VULNERABLE TEAMS (by xG conceded on counters):\n",
    arrange(metrics, desc(xg_conceded)),
    team.name, counters_conceded, xg_conceded, goals_conceded
  )

  show_top_five(
    "\n\nBEST TRANSITION BALANCE:\n",
    arrange(metrics, desc(transition_balance)),
    team.name, counter_xg, xg_conceded, transition_balance
  )
}

# Main execution
# Select competition 11, season 90 - the same selection the Python version of
# this exercise loads - so one season's fixtures are analysed together.
comps <- FreeCompetitions()
matches <- FreeMatches(
  Competitions = comps %>% filter(competition_id == 11, season_id == 90)
)
# free_allevents() takes `MatchesDF` and `Parallel`; any other argument name
# is rejected as an unused argument.
events <- free_allevents(MatchesDF = matches, Parallel = TRUE)
events <- allclean(events)

metrics <- calculate_team_counter_metrics(events, matches)
generate_counter_report(metrics)
plots <- visualize_counter_analysis(metrics)
print(plots$scatter)
print(plots$rankings)
Exercise 27.2: Optimal Pressing Zone Analysis

Task: Build a pressing zone effectiveness analyzer that identifies where a team should focus their pressing to generate the most dangerous transitions.

Requirements:

  • Divide the pitch into zones (grid or thirds-based)
  • Track ball recovery events and their locations
  • Calculate what happens after recoveries in each zone (xG generated, shots, goals)
  • Identify optimal pressing zones with highest transition danger
  • Create heatmap visualization of pressing effectiveness
  • Generate tactical recommendations

pressing_zone_analysis
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from mplsoccer import Pitch
from statsbombpy import sb

# ============================================
# OPTIMAL PRESSING ZONE ANALYSIS
# ============================================

def analyze_recovery_outcomes(events, team_name, grid_size=20):
    """Analyze what happens after ball recoveries in each zone.

    A recovery is a 'Ball Recovery', 'Interception' or 'Tackle' event, or a
    'Duel' whose ``duel_outcome`` is 'Won' or 'Success', made by
    ``team_name`` at a known location. For each one, the team's own events
    among the next 30 event indices of the same match are inspected.

    Parameters
    ----------
    events : pandas.DataFrame
        Event rows with ``match_id``, ``index``, ``team``, ``type``,
        ``location``, ``shot_statsbomb_xg`` and ``shot_outcome``
        (``duel_outcome`` is used when present).
    team_name : str
        Team whose recoveries are analysed.
    grid_size : int, default 20
        Side length of the square grid cells, in pitch units.

    Returns
    -------
    pandas.DataFrame
        One row per recovery with columns ``grid_x``, ``grid_y``,
        ``recovery_zone``, ``recovery_x``, ``led_to_shot``, ``shot_xg`` and
        ``led_to_goal``. The columns are present even when no recovery is
        found, so downstream grouping still works.
    """
    schema = {
        'grid_x': float, 'grid_y': float, 'recovery_zone': object,
        'recovery_x': float, 'led_to_shot': bool, 'shot_xg': float,
        'led_to_goal': bool,
    }

    team_events = events[events['team'] == team_name].copy()

    # Recovery events
    recovery_types = ['Ball Recovery', 'Interception', 'Tackle']
    is_recovery = team_events['type'].isin(recovery_types)
    if 'duel_outcome' in team_events.columns:
        is_recovery = is_recovery | (
            (team_events['type'] == 'Duel') &
            team_events['duel_outcome'].isin(['Won', 'Success'])
        )
    recoveries = team_events[is_recovery].copy()

    recoveries['x'] = [
        loc[0] if isinstance(loc, list) else np.nan for loc in recoveries['location']
    ]
    recoveries['y'] = [
        loc[1] if isinstance(loc, list) else np.nan for loc in recoveries['location']
    ]
    recoveries = recoveries.dropna(subset=['x', 'y'])

    if recoveries.empty:
        # Typed, zero-row frame instead of a column-less one
        return pd.DataFrame(
            {name: pd.Series(dtype=dtype) for name, dtype in schema.items()}
        )

    recoveries['grid_x'] = (recoveries['x'] // grid_size) * grid_size
    recoveries['grid_y'] = (recoveries['y'] // grid_size) * grid_size

    def classify_zone(x):
        if x > 80:
            return 'High'
        elif x > 40:
            return 'Mid'
        return 'Low'

    recoveries['recovery_zone'] = recoveries['x'].apply(classify_zone)

    # Split the team's events by match once instead of re-filtering the whole
    # event frame for every recovery.
    events_by_match = {
        match_id: frame for match_id, frame in team_events.groupby('match_id')
    }
    no_events = team_events.iloc[0:0]

    outcomes = []
    for _, recovery in recoveries.iterrows():
        match_events = events_by_match.get(recovery['match_id'], no_events)
        # Team events within the next 30 event indices after the recovery
        subsequent = match_events[
            (match_events['index'] > recovery['index']) &
            (match_events['index'] <= recovery['index'] + 30)
        ]

        outcomes.append({
            'grid_x': recovery['grid_x'],
            'grid_y': recovery['grid_y'],
            'recovery_zone': recovery['recovery_zone'],
            'recovery_x': recovery['x'],
            'led_to_shot': (subsequent['type'] == 'Shot').any(),
            'shot_xg': subsequent['shot_statsbomb_xg'].sum(),
            'led_to_goal': (
                (subsequent['type'] == 'Shot') &
                (subsequent['shot_outcome'] == 'Goal')
            ).any()
        })

    return pd.DataFrame(outcomes, columns=list(schema))

def calculate_zone_effectiveness(recovery_outcomes):
    """Calculate effectiveness metrics by zone.

    Parameters
    ----------
    recovery_outcomes : pandas.DataFrame
        One row per recovery with ``grid_x``, ``grid_y``, ``recovery_zone``,
        ``led_to_shot``, ``shot_xg`` and ``led_to_goal``.

    Returns
    -------
    dict
        ``'grid'``: per grid cell statistics for cells with at least five
        recoveries, sorted by ``effectiveness_score`` (descending).
        ``'zones'``: per ``recovery_zone`` summary with rates in percent.
    """
    # Named aggregation ties each output column to its source explicitly
    # instead of renaming flattened columns by position.
    grid_stats = (
        recovery_outcomes
        .groupby(['grid_x', 'grid_y'])
        .agg(
            n_recoveries=('led_to_shot', 'count'),
            shots_generated=('led_to_shot', 'sum'),
            shot_rate=('led_to_shot', 'mean'),
            total_xg=('shot_xg', 'sum'),
            avg_xg_per_recovery=('shot_xg', 'mean'),
            goals=('led_to_goal', 'sum'),
        )
        .reset_index()
    )
    grid_stats['shot_rate'] *= 100

    # .copy() so the score column is added to an independent frame, not to a
    # filtered view of the full table.
    grid_stats = grid_stats[grid_stats['n_recoveries'] >= 5].copy()
    grid_stats['effectiveness_score'] = (
        grid_stats['shot_rate'] * 0.3 +
        grid_stats['avg_xg_per_recovery'] * 100 * 0.4 +
        grid_stats['avg_xg_per_recovery'] * 30 * 0.3
    )

    zone_summary = (
        recovery_outcomes
        .groupby('recovery_zone')
        .agg(
            n_recoveries=('led_to_shot', 'count'),
            shot_rate=('led_to_shot', 'mean'),
            avg_xg=('shot_xg', 'mean'),
            goal_rate=('led_to_goal', 'mean'),
        )
        .reset_index()
        .rename(columns={'recovery_zone': 'zone'})
    )
    zone_summary['shot_rate'] *= 100
    zone_summary['goal_rate'] *= 100

    return {'grid': grid_stats.sort_values('effectiveness_score', ascending=False),
            'zones': zone_summary}

def visualize_pressing_zones(zone_stats, team_name, grid_size=20):
    """Create pressing zone effectiveness heatmap.

    Each grid cell is drawn as a marker at the cell centre, coloured by
    ``effectiveness_score``, sized by ``n_recoveries`` and labelled with its
    shot rate.

    Parameters
    ----------
    zone_stats : pandas.DataFrame
        Per-cell table with ``grid_x``, ``grid_y``, ``effectiveness_score``,
        ``n_recoveries`` and ``shot_rate``.
    team_name : str
        Used in the plot title.
    grid_size : int, default 20
        Side length of the grid cells. Must match the value used when the
        cells were built, otherwise markers are not centred on their cells.
    """
    pitch = Pitch(pitch_type='statsbomb', line_color='white', pitch_color='#1a472a')
    fig, ax = pitch.draw(figsize=(14, 9))

    # grid_x / grid_y are cell origins; shift by half a cell to reach centres
    half_cell = grid_size / 2
    centre_x = zone_stats['grid_x'] + half_cell
    centre_y = zone_stats['grid_y'] + half_cell

    scatter = ax.scatter(
        centre_x,
        centre_y,
        c=zone_stats['effectiveness_score'],
        s=zone_stats['n_recoveries'] * 20,
        cmap='plasma',
        alpha=0.7
    )

    for x_pos, y_pos, shot_rate in zip(centre_x, centre_y, zone_stats['shot_rate']):
        ax.text(x_pos, y_pos, f"{shot_rate:.0f}%",
                ha='center', va='center', fontsize=9, color='white')

    # Zone lines
    ax.axvline(x=40, color='white', linestyle='--', alpha=0.5)
    ax.axvline(x=80, color='white', linestyle='--', alpha=0.5)

    plt.colorbar(scatter, ax=ax, label='Effectiveness Score')
    ax.set_title(f'{team_name} - Pressing Zone Effectiveness\n(Numbers = shot generation rate)',
                 fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

def generate_pressing_recommendations(zone_stats, zone_summary):
    """Generate tactical pressing recommendations.

    Prints the zone summary, the five best grid cells and a recommendation
    derived from the best cell's position.

    Parameters
    ----------
    zone_stats : pandas.DataFrame
        Per-cell table sorted best-first, with ``grid_x`` holding each cell's
        origin (lower x edge).
    zone_summary : pandas.DataFrame
        Per-zone summary table, printed as-is.
    """
    print("\n" + "=" * 65)
    print("PRESSING ZONE TACTICAL ANALYSIS")
    print("=" * 65 + "\n")

    print("ZONE-LEVEL SUMMARY:")
    print("-" * 55)
    print(zone_summary.to_string(index=False))

    print("\n\nTOP 5 PRESSING ZONES:")
    print("-" * 55)
    print(zone_stats.head(5)[
        ['grid_x', 'grid_y', 'n_recoveries', 'shot_rate', 'avg_xg_per_recovery', 'effectiveness_score']
    ].to_string(index=False))

    print("\n\nTACTICAL RECOMMENDATIONS:")
    print("-" * 55)

    if zone_stats.empty:
        # No cell survived the minimum-sample filter; nothing to rank
        print("  No zone had enough recoveries to make a recommendation")
        return

    best = zone_stats.iloc[0]
    # grid_x is the cell's lower edge, so a cell starting AT 80 (or 40)
    # already lies in the higher third; hence >= rather than >.
    if best['grid_x'] >= 80:
        print("  1. HIGH PRESS recommended - best returns from final third recoveries")
    elif best['grid_x'] >= 40:
        print("  1. MID-BLOCK with pressing triggers - middle third most effective")
    else:
        print("  1. LOW BLOCK counter-attack - winning ball deep creates danger")

# Main execution
# Single-match run: load the events, score every grid cell, then draw the
# heatmap and print the recommendations from the same two tables.
team_name = 'Liverpool'
events = sb.events(match_id=3773585)

recovery_outcomes = analyze_recovery_outcomes(events, team_name, grid_size=20)
effectiveness = calculate_zone_effectiveness(recovery_outcomes)
grid_stats, zone_summary = effectiveness['grid'], effectiveness['zones']

visualize_pressing_zones(grid_stats, team_name)
generate_pressing_recommendations(grid_stats, zone_summary)
library(tidyverse)
library(StatsBombR)
library(viridis)

# ============================================
# OPTIMAL PRESSING ZONE ANALYSIS
# ============================================

# Identify ball recoveries and subsequent outcomes
#
# A recovery is a "Ball Recovery", "Interception" or "Tackle" event, or a
# "Duel" whose outcome is "Won"/"Success", made by `team_name` at a known
# location. For each one, the team's own events among the next 30 event
# indices of the same match are inspected.
#
# events:    event rows with match_id, index, team.name, type.name,
#            duel.outcome.name, location.x, location.y, shot.statsbomb_xg and
#            shot.outcome.name
# team_name: team whose recoveries are analysed
# grid_size: side length of the square grid cells, in pitch units
#
# Returns a tibble with one row per recovery. The columns are present even
# when no recovery is found, so downstream grouping still works.
analyze_recovery_outcomes <- function(events, team_name, grid_size = 20) {

  team_events <- events %>% filter(team.name == team_name)

  # Ball recovery events
  recoveries <- team_events %>%
    filter(
      type.name %in% c("Ball Recovery", "Interception", "Tackle") |
      (type.name == "Duel" & duel.outcome.name %in% c("Won", "Success"))
    ) %>%
    filter(!is.na(location.x), !is.na(location.y)) %>%
    mutate(
      recovery_index = index,
      recovery_match = match_id,
      recovery_x = location.x,
      recovery_y = location.y,
      grid_x = floor(location.x / grid_size) * grid_size,
      grid_y = floor(location.y / grid_size) * grid_size,
      recovery_zone = case_when(
        location.x > 80 ~ "High",
        location.x > 40 ~ "Mid",
        TRUE ~ "Low"
      )
    )

  if (nrow(recoveries) == 0) {
    # Typed zero-row result instead of a column-less tibble
    return(tibble(
      grid_x = numeric(), grid_y = numeric(), recovery_zone = character(),
      recovery_x = numeric(), recovery_y = numeric(),
      led_to_shot = logical(), shot_xg = numeric(), led_to_goal = logical(),
      sequence_length = integer(), max_progression = numeric()
    ))
  }

  # Track what happens in the 30 event indices after each recovery.
  # seq_len() rather than 1:nrow(), which would iterate c(1, 0) on zero rows.
  map_dfr(seq_len(nrow(recoveries)), function(i) {
    recovery <- recoveries[i, ]

    # Team events that follow the recovery; team_events is already filtered
    # to `team_name`, so the team test is not repeated here
    subsequent <- team_events %>%
      filter(
        match_id == recovery$recovery_match,
        index > recovery$recovery_index,
        index <= recovery$recovery_index + 30
      )

    # Known x positions only; max() of an empty vector would give -Inf
    later_x <- subsequent$location.x[!is.na(subsequent$location.x)]

    # Check outcomes
    tibble(
      grid_x = recovery$grid_x,
      grid_y = recovery$grid_y,
      recovery_zone = recovery$recovery_zone,
      recovery_x = recovery$recovery_x,
      recovery_y = recovery$recovery_y,
      led_to_shot = any(subsequent$type.name == "Shot"),
      shot_xg = sum(subsequent$shot.statsbomb_xg, na.rm = TRUE),
      led_to_goal = any(subsequent$type.name == "Shot" &
                        subsequent$shot.outcome.name == "Goal", na.rm = TRUE),
      sequence_length = nrow(subsequent),
      max_progression = if (length(later_x) > 0) {
        max(later_x) - recovery$recovery_x
      } else {
        0
      }
    )
  })
}

# Calculate zone effectiveness
#
# Returns list(grid = ..., zones = ...):
#   grid  - per grid cell statistics for cells with at least five recoveries,
#           sorted by effectiveness_score (descending)
#   zones - per recovery_zone summary with rates in percent
calculate_zone_effectiveness <- function(recovery_outcomes) {

  min_recoveries <- 5  # minimum sample for a cell to be ranked

  per_cell <- summarise(
    group_by(recovery_outcomes, grid_x, grid_y),
    n_recoveries = n(),
    shots_generated = sum(led_to_shot),
    total_xg = sum(shot_xg),
    goals = sum(led_to_goal),
    shot_rate = mean(led_to_shot) * 100,
    avg_xg_per_recovery = mean(shot_xg),
    avg_progression = mean(max_progression),
    .groups = "drop"
  )

  # Weighted blend of shot rate, xG per recovery (scaled by 100) and average
  # progression
  ranked_cells <- per_cell %>%
    filter(n_recoveries >= min_recoveries) %>%
    mutate(
      effectiveness_score = (shot_rate * 0.3) +
                            (avg_xg_per_recovery * 100 * 0.4) +
                            (avg_progression * 0.3)
    ) %>%
    arrange(desc(effectiveness_score))

  # Zone-level summary
  per_zone <- summarise(
    group_by(recovery_outcomes, recovery_zone),
    n_recoveries = n(),
    shot_rate = mean(led_to_shot) * 100,
    avg_xg = mean(shot_xg),
    goal_rate = mean(led_to_goal) * 100,
    .groups = "drop"
  )

  list(grid = ranked_cells, zones = per_zone)
}

# Visualize pressing effectiveness
#
# Draws one tile per grid cell, filled by effectiveness_score and labelled
# with the cell's shot rate.
#
# zone_stats: per-cell table with grid_x, grid_y, effectiveness_score and
#             shot_rate
# team_name:  used in the plot title
# grid_size:  side length of the grid cells; must match the value used when
#             the cells were built
visualize_pressing_zones <- function(zone_stats, team_name, grid_size = 20) {

  # grid_x / grid_y are cell origins; shift by half a cell to reach centres
  half_cell <- grid_size / 2

  ggplot(zone_stats, aes(x = grid_x + half_cell, y = grid_y + half_cell)) +
    # Pitch background
    annotate("rect", xmin = 0, xmax = 120, ymin = 0, ymax = 80,
             fill = "#228B22", alpha = 0.3) +
    # Zone lines
    annotate("segment", x = 40, xend = 40, y = 0, yend = 80,
             color = "white", linetype = "dashed") +
    annotate("segment", x = 80, xend = 80, y = 0, yend = 80,
             color = "white", linetype = "dashed") +
    # Effectiveness tiles. Width/height are fixed to the cell size; left to
    # the default they follow the spacing of whichever cells happen to be
    # present, so sparse data would draw tiles of the wrong size.
    geom_tile(aes(fill = effectiveness_score),
              width = grid_size, height = grid_size, alpha = 0.8) +
    geom_text(aes(label = paste0(round(shot_rate, 0), "%")),
              color = "white", size = 3) +
    scale_fill_viridis(option = "plasma", name = "Effectiveness") +
    labs(
      title = paste(team_name, "- Pressing Zone Effectiveness"),
      subtitle = "Numbers show shot generation rate from recoveries",
      x = "Pitch Length", y = "Pitch Width"
    ) +
    coord_fixed(xlim = c(0, 120), ylim = c(0, 80)) +
    theme_void() +
    theme(
      plot.title = element_text(hjust = 0.5, face = "bold"),
      legend.position = "right"
    )
}

# Generate tactical recommendations
#
# Prints the zone summary, the five best grid cells and two recommendations:
# which third to press in, and whether wide or central cells pay off more.
#
# zone_stats:   per-cell table sorted best-first; grid_x / grid_y hold each
#               cell's origin (lower edge)
# zone_summary: per-zone summary table, printed as-is
# grid_size:    side length of the grid cells, used to locate cell centres
generate_pressing_recommendations <- function(zone_stats, zone_summary, grid_size = 20) {

  cat("\n", rep("=", 65), "\n", sep = "")
  cat("PRESSING ZONE TACTICAL ANALYSIS\n")
  cat(rep("=", 65), "\n\n", sep = "")

  cat("ZONE-LEVEL SUMMARY:\n")
  cat("-", rep("-", 50), "\n", sep = "")
  print(zone_summary)

  cat("\n\nTOP 5 PRESSING ZONES (by effectiveness):\n")
  cat("-", rep("-", 50), "\n", sep = "")
  zone_stats %>%
    head(5) %>%
    select(grid_x, grid_y, n_recoveries, shot_rate, avg_xg_per_recovery, effectiveness_score) %>%
    print()

  cat("\n\nTACTICAL RECOMMENDATIONS:\n")
  cat("-", rep("-", 50), "\n", sep = "")

  if (nrow(zone_stats) == 0) {
    # No cell survived the minimum-sample filter; nothing to rank
    cat("  No zone had enough recoveries to make a recommendation\n")
    return(invisible(NULL))
  }

  # Find best zone
  best_zone <- zone_stats %>% slice(1)

  # grid_x is the cell's lower edge, so a cell starting AT 80 (or 40) already
  # lies in the higher third; hence >= rather than >.
  if (best_zone$grid_x >= 80) {
    cat("  1. HIGH PRESS recommended - best returns from pressing in final third\n")
  } else if (best_zone$grid_x >= 40) {
    cat("  1. MID-BLOCK with pressing triggers - best returns from middle third\n")
  } else {
    cat("  1. LOW BLOCK counter-attack style - winning ball deep creates danger\n")
  }

  # Check wide vs central. Cells are classified by their centre so the two
  # rows either side of the pitch's long axis are treated alike.
  by_channel <- zone_stats %>%
    mutate(centre_y = grid_y + grid_size / 2)

  wide_scores <- by_channel %>%
    filter(centre_y < 30 | centre_y > 50) %>%
    pull(effectiveness_score)

  central_scores <- by_channel %>%
    filter(centre_y >= 30 & centre_y <= 50) %>%
    pull(effectiveness_score)

  if (length(wide_scores) == 0 || length(central_scores) == 0) {
    # mean() of an empty vector is NaN, which `if` cannot evaluate
    cat("  2. Not enough zones to compare WIDE and CENTRAL pressing\n")
  } else if (mean(wide_scores, na.rm = TRUE) > mean(central_scores, na.rm = TRUE)) {
    cat("  2. Focus pressing on WIDE areas - better transition opportunities\n")
  } else {
    cat("  2. Focus pressing CENTRALLY - better transition opportunities\n")
  }
}

# Main execution
# NOTE(review): `match_id` is not defined in this snippet, and get.matchFree()
# is documented as taking a row of the matches table rather than a bare id -
# confirm what is in scope at this point.
events <- get.matchFree(match_id)
# analyze_recovery_outcomes() reads location.x / location.y, so the raw
# events are passed through allclean() first, as the league-wide example does.
events <- allclean(events)
team_name <- "Liverpool"

recovery_outcomes <- analyze_recovery_outcomes(events, team_name, grid_size = 20)
zone_effectiveness <- calculate_zone_effectiveness(recovery_outcomes)

print(visualize_pressing_zones(zone_effectiveness$grid, team_name))
generate_pressing_recommendations(zone_effectiveness$grid, zone_effectiveness$zones)
Exercise 27.3: Player Transition Profile System

Task: Build a player transition profiling system that identifies each player's role and contribution to transitions (ball winner, carrier, passer, finisher).

Requirements:

  • Calculate ball-winning metrics (recoveries, interceptions, tackles won)
  • Measure transition carrying ability (progressive carries, distance covered)
  • Evaluate transition passing (progressive passes, through balls, key passes in transitions)
  • Track transition finishing (shots, xG, goals from transitions)
  • Create composite transition contribution score
  • Generate radar charts comparing player profiles

player_transition_profiles
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from math import pi
from statsbombpy import sb

# ============================================
# PLAYER TRANSITION PROFILE SYSTEM
# ============================================

def calculate_player_transition_profiles(events, team_name):
    """Calculate transition profiles for all players.

    Builds four per-player tables (ball winning, carrying, passing,
    finishing) from ``team_name``'s events, joins them on player and
    position, and derives a score per role plus an overall contribution.

    Parameters
    ----------
    events : pandas.DataFrame
        Event rows with ``id``, ``team``, ``player``, ``position``, ``type``
        and ``location``. The columns ``carry_end_location``,
        ``pass_end_location``, ``pass_outcome``, ``pass_shot_assist``,
        ``pass_goal_assist``, ``shot_statsbomb_xg`` and ``shot_outcome`` are
        used when present and treated as all-missing otherwise.
    team_name : str
        Team whose players are profiled.

    Returns
    -------
    pandas.DataFrame
        One row per (player, position), sorted by
        ``transition_contribution`` (descending), with a ``dominant_role``
        column naming the highest of the four role scores.
    """
    team = events[events['team'] == team_name].copy()
    keys = ['player', 'position']

    def get_x(loc):
        return loc[0] if isinstance(loc, list) else np.nan

    def column_or(frame, column, default):
        """Return ``frame[column]``, or a constant Series when it is absent."""
        if column in frame.columns:
            return frame[column]
        return pd.Series(default, index=frame.index)

    # Ball Winning (.copy() so the helper column lands on an independent frame)
    ball_win_events = team[
        team['type'].isin(['Ball Recovery', 'Interception', 'Tackle'])
    ].copy()
    ball_win_events['is_high'] = ball_win_events['location'].apply(get_x) > 60

    ball_winning = (
        ball_win_events
        .groupby(keys)
        .agg(ball_wins=('id', 'count'), high_ball_wins=('is_high', 'sum'))
        .reset_index()
    )

    # Carrying
    carries = team[team['type'].isin(['Carry', 'Dribble'])].copy()
    carries['start_x'] = carries['location'].apply(get_x)
    carries['end_x'] = column_or(carries, 'carry_end_location', np.nan).apply(get_x)
    carries['progression'] = carries['end_x'] - carries['start_x']

    carrying = (
        carries[carries['progression'] > 10]
        .groupby(keys)
        .agg(
            progressive_carries=('id', 'count'),
            total_progression=('progression', 'sum'),
            avg_carry_distance=('progression', 'mean'),
        )
        .reset_index()
    )

    # Passing
    passes = team[team['type'] == 'Pass'].copy()
    passes['start_x'] = passes['location'].apply(get_x)
    passes['end_x'] = column_or(passes, 'pass_end_location', np.nan).apply(get_x)
    passes['progression'] = passes['end_x'] - passes['start_x']
    # A pass with no recorded outcome is treated as completed
    passes['is_progressive'] = (
        (passes['progression'] > 10) &
        column_or(passes, 'pass_outcome', np.nan).isna()
    )
    # Flag columns hold True or missing; count only the explicit True values
    passes['is_key_pass'] = column_or(passes, 'pass_shot_assist', False).eq(True)
    passes['is_assist'] = column_or(passes, 'pass_goal_assist', False).eq(True)

    passing = (
        passes
        .groupby(keys)
        .agg(
            total_passes=('id', 'count'),
            progressive_passes=('is_progressive', 'sum'),
            key_passes=('is_key_pass', 'sum'),
            assists=('is_assist', 'sum'),
        )
        .reset_index()
    )

    # Finishing
    shots = team[team['type'] == 'Shot'].copy()
    shots['xg_value'] = column_or(shots, 'shot_statsbomb_xg', 0.0)
    shots['is_goal'] = column_or(shots, 'shot_outcome', np.nan) == 'Goal'

    finishing = (
        shots
        .groupby(keys)
        .agg(shots=('id', 'count'), xg=('xg_value', 'sum'), goals=('is_goal', 'sum'))
        .reset_index()
    )

    # Combine; a player missing from one table gets zeros for it
    profiles = (
        ball_winning
        .merge(carrying, on=keys, how='outer')
        .merge(passing, on=keys, how='outer')
        .merge(finishing, on=keys, how='outer')
        .fillna(0)
    )

    # Composite scores
    profiles['ball_winning_score'] = profiles['ball_wins'] + profiles['high_ball_wins'] * 0.5
    profiles['carrying_score'] = profiles['progressive_carries'] * 2 + profiles['avg_carry_distance'] * 0.1
    profiles['passing_score'] = profiles['progressive_passes'] + profiles['key_passes'] * 2
    profiles['finishing_score'] = profiles['shots'] + profiles['xg'] * 10 + profiles['goals'] * 5

    profiles['transition_contribution'] = (
        profiles['ball_winning_score'] + profiles['carrying_score'] +
        profiles['passing_score'] + profiles['finishing_score']
    )

    # Dominant role: the highest of the four scores; ties go to the role
    # listed first
    role_by_score = {
        'ball_winning_score': 'Ball Winner',
        'carrying_score': 'Carrier',
        'passing_score': 'Passer',
        'finishing_score': 'Finisher',
    }
    if profiles.empty:
        profiles['dominant_role'] = pd.Series(dtype=object)
    else:
        profiles['dominant_role'] = (
            profiles[list(role_by_score)].idxmax(axis=1).map(role_by_score)
        )

    return profiles.sort_values('transition_contribution', ascending=False)

def create_player_radar(profiles, player_names):
    """Create radar chart comparing player profiles.

    Each of the four role scores is rescaled to 0-100 against the largest
    value in ``profiles`` and drawn as one polygon per player.

    Parameters
    ----------
    profiles : pandas.DataFrame
        Table with a ``player`` column and the four ``*_score`` columns.
        It is not modified.
    player_names : list of str
        Players to draw; names not found in ``profiles`` are skipped.
    """
    metrics = ['ball_winning_score', 'carrying_score', 'passing_score', 'finishing_score']
    labels = ['Ball Winning', 'Carrying', 'Passing', 'Finishing']

    # Normalize metrics 0-100 on a private copy, so the caller's frame does
    # not gain helper columns as a side effect
    normalised = profiles[['player'] + metrics].copy()
    for col in metrics:
        max_val = normalised[col].max()
        normalised[col] = normalised[col] / max_val * 100 if max_val > 0 else 0

    num_vars = len(metrics)
    angles = [n / float(num_vars) * 2 * pi for n in range(num_vars)]
    angles += angles[:1]  # repeat the first angle to close the polygon

    fig, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))
    colors = ['#E41A1C', '#377EB8', '#4DAF4A', '#984EA3']

    for idx, player in enumerate(player_names):
        player_data = normalised[normalised['player'] == player]
        if len(player_data) == 0:
            continue

        # A player can own several rows (one per position); draw the first,
        # so the value vector always has one entry per metric
        values = player_data[metrics].iloc[0].tolist()
        values += values[:1]

        # Cycle the palette so more than four players do not overrun it
        color = colors[idx % len(colors)]
        ax.plot(angles, values, 'o-', linewidth=2, label=player, color=color)
        ax.fill(angles, values, alpha=0.25, color=color)

    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels, size=12)
    ax.set_ylim(0, 100)
    ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
    ax.set_title('Transition Profile Comparison', size=14, fontweight='bold', y=1.08)

    plt.tight_layout()
    plt.show()

def generate_profile_report(profiles):
    """Print a plain-text summary of the player transition profiles.

    The report has two parts: the first ten rows of ``profiles`` with their
    role and score columns, followed by, for each of the four roles, the
    three rows with the largest ``transition_contribution`` among players
    whose ``dominant_role`` is that role.

    Args:
        profiles: DataFrame holding ``player``, ``dominant_role``,
            ``transition_contribution`` and the four ``*_score`` columns.

    Returns:
        None. Everything is written to standard output.
    """
    banner = "=" * 70
    rule = "-" * 60
    overview_columns = [
        'player', 'dominant_role', 'transition_contribution',
        'ball_winning_score', 'carrying_score', 'passing_score', 'finishing_score',
    ]
    role_columns = ['player', 'transition_contribution']

    # Header
    print("\n" + banner)
    print("PLAYER TRANSITION PROFILE ANALYSIS")
    print(banner + "\n")

    # Overall ranking: rows are taken in the order the caller supplied them.
    print("TOP 10 OVERALL TRANSITION CONTRIBUTORS:")
    print(rule)
    leading_rows = profiles.head(10)
    print(leading_rows[overview_columns].to_string(index=False))

    # Per-role ranking
    print("\n\nTOP PLAYERS BY ROLE:")
    print(rule)

    for role in ('Ball Winner', 'Carrier', 'Passer', 'Finisher'):
        print(f"\n{role}s:")
        in_role = profiles[profiles['dominant_role'] == role]
        best_in_role = in_role.nlargest(3, 'transition_contribution')
        print(best_in_role[role_columns].to_string(index=False))

# Main execution
# Fetch the event data for match 3773585 (`sb` is presumably the statsbombpy
# module imported earlier in the file - TODO confirm).
events = sb.events(match_id=3773585)
# Build the transition profiles for the team named 'Liverpool'.
profiles = calculate_player_transition_profiles(events, 'Liverpool')

# Print the text report first, then draw the comparison chart.
generate_profile_report(profiles)
# `profiles` is returned sorted by transition_contribution (descending), so
# head(4) selects the four highest overall contributors for the radar chart.
create_player_radar(profiles, profiles.head(4)['player'].tolist())
library(tidyverse)
library(StatsBombR)
library(fmsb)

# ============================================
# PLAYER TRANSITION PROFILE SYSTEM
# ============================================

#' Build per-player transition profiles for one team
#'
#' Aggregates four groups of events (ball winning, carrying, passing and
#' finishing) per `player.name` / `position.name`, combines them with full
#' joins, and derives composite scores, a total contribution and a dominant
#' role for every row.
#'
#' @param events Event data frame with at least `team.name`, `player.name`,
#'   `position.name`, `type.name`, `play_pattern.name`, `location.x` and
#'   `pass.end_location.x`. Outcome/attribute columns (`tackle.outcome.name`,
#'   `duel.outcome.name`, `carry.end_location.x`, `pass.outcome.name`,
#'   `pass.technique.name`, `pass.shot_assist`, `pass.goal_assist`,
#'   `shot.statsbomb_xg`, `shot.outcome.name`) are used when present and
#'   treated as all-`NA` when absent.
#' @param team_name Value of `team.name` to keep.
#' @return A tibble with one row per player/position combination, holding the
#'   raw counts, `ball_winning_score`, `carrying_score`, `passing_score`,
#'   `finishing_score`, `transition_contribution` and `dominant_role`,
#'   arranged by descending `transition_contribution`.
calculate_player_transition_profiles <- function(events, team_name) {

  # Attribute columns can be missing from an event frame when no event in the
  # match carries that attribute. Referencing a missing column inside
  # mutate()/summarise() is an error, so add any absent one as all-NA first.
  optional_defaults <- list(
    tackle.outcome.name = NA_character_,
    duel.outcome.name = NA_character_,
    carry.end_location.x = NA_real_,
    pass.outcome.name = NA_character_,
    pass.technique.name = NA_character_,
    pass.shot_assist = NA,
    pass.goal_assist = NA,
    shot.statsbomb_xg = NA_real_,
    shot.outcome.name = NA_character_
  )
  absent <- setdiff(names(optional_defaults), names(events))

  team_events <- events %>%
    filter(team.name == team_name) %>%
    mutate(!!!optional_defaults[absent])

  # 1. Ball Winning
  ball_winning <- team_events %>%
    filter(type.name %in% c("Ball Recovery", "Interception", "Tackle", "Duel")) %>%
    mutate(
      # %in% never returns NA, so a missing outcome counts as "not won".
      won = case_when(
        type.name %in% c("Ball Recovery", "Interception") ~ TRUE,
        type.name == "Tackle" & tackle.outcome.name %in% c("Won", "Success") ~ TRUE,
        type.name == "Duel" & duel.outcome.name %in% c("Won", "Success") ~ TRUE,
        TRUE ~ FALSE
      ),
      high_recovery = !is.na(location.x) & location.x > 60
    ) %>%
    filter(won) %>%
    group_by(player.name, position.name) %>%
    summarise(
      ball_wins = n(),
      high_ball_wins = sum(high_recovery),
      .groups = "drop"
    )

  # 2. Transition Carrying
  carrying <- team_events %>%
    filter(type.name %in% c("Carry", "Dribble")) %>%
    mutate(
      # Events without a carry end location get zero progression.
      end_x = coalesce(carry.end_location.x, location.x),
      progression = end_x - location.x,
      is_progressive = progression > 10
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      total_carries = n(),
      progressive_carries = sum(is_progressive, na.rm = TRUE),
      total_progression = sum(progression[is_progressive], na.rm = TRUE),
      # NaN when a player has no progressive carry; replaced by 0 below.
      avg_carry_distance = mean(progression[is_progressive], na.rm = TRUE),
      .groups = "drop"
    )

  # 3. Transition Passing
  passing <- team_events %>%
    filter(type.name == "Pass") %>%
    mutate(
      # A pass with no recorded outcome is treated as successful.
      is_successful = is.na(pass.outcome.name),
      progression = pass.end_location.x - location.x,
      is_progressive = progression > 10 & is_successful,
      is_through_ball = pass.technique.name == "Through Ball",
      is_key_pass = pass.shot_assist == TRUE | pass.goal_assist == TRUE
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      total_passes = n(),
      progressive_passes = sum(is_progressive, na.rm = TRUE),
      through_balls = sum(is_through_ball & is_successful, na.rm = TRUE),
      key_passes = sum(is_key_pass, na.rm = TRUE),
      .groups = "drop"
    )

  # 4. Transition Finishing
  finishing <- team_events %>%
    filter(
      type.name == "Shot",
      play_pattern.name %in% c("From Counter", "From Throw In", "Regular Play")
    ) %>%
    group_by(player.name, position.name) %>%
    summarise(
      shots = n(),
      xg = sum(shot.statsbomb_xg, na.rm = TRUE),
      goals = sum(shot.outcome.name == "Goal", na.rm = TRUE),
      .groups = "drop"
    )

  # Combine all metrics. The full joins keep players that appear in only some
  # of the four tables; every count they lack is set to 0 rather than NA.
  ball_winning %>%
    full_join(carrying, by = c("player.name", "position.name")) %>%
    full_join(passing, by = c("player.name", "position.name")) %>%
    full_join(finishing, by = c("player.name", "position.name")) %>%
    replace_na(list(
      ball_wins = 0, high_ball_wins = 0,
      total_carries = 0, progressive_carries = 0,
      total_progression = 0, avg_carry_distance = 0,
      total_passes = 0, progressive_passes = 0,
      through_balls = 0, key_passes = 0,
      shots = 0, xg = 0, goals = 0
    )) %>%
    mutate(
      # Composite scores
      ball_winning_score = (ball_wins + high_ball_wins * 0.5),
      carrying_score = (progressive_carries * 2 + avg_carry_distance * 0.1),
      passing_score = (progressive_passes + through_balls * 3 + key_passes * 2),
      finishing_score = (shots + xg * 10 + goals * 5),

      # Total contribution
      transition_contribution = ball_winning_score + carrying_score +
                                passing_score + finishing_score,

      # Dominant role: on ties the first matching branch wins, i.e. the order
      # Ball Winner > Carrier > Passer > Finisher.
      dominant_role = case_when(
        ball_winning_score >= pmax(carrying_score, passing_score, finishing_score) ~ "Ball Winner",
        carrying_score >= pmax(ball_winning_score, passing_score, finishing_score) ~ "Carrier",
        passing_score >= pmax(ball_winning_score, carrying_score, finishing_score) ~ "Passer",
        TRUE ~ "Finisher"
      )
    ) %>%
    arrange(desc(transition_contribution))
}

#' Normalize the composite scores for radar charts
#'
#' Min-max rescales `ball_winning_score`, `carrying_score`, `passing_score`
#' and `finishing_score` to the range 0-100, each column independently.
#'
#' @param profiles Data frame containing the four `*_score` columns.
#' @return `profiles` with the four score columns replaced by their rescaled
#'   values. `NA` entries stay `NA`. A column whose non-missing values are all
#'   equal is mapped to 0, because it carries no spread to display.
normalize_for_radar <- function(profiles) {

  rescale_0_100 <- function(x) {
    # Nothing to scale for an empty or all-missing column.
    if (all(is.na(x))) {
      return(x)
    }
    bounds <- range(x, na.rm = TRUE)
    span <- bounds[2] - bounds[1]
    # A zero (or non-finite) span would turn the division into NaN.
    if (!is.finite(span) || span == 0) {
      return(ifelse(is.na(x), NA_real_, 0))
    }
    (x - bounds[1]) / span * 100
  }

  profiles %>%
    mutate(
      across(
        c(ball_winning_score, carrying_score, passing_score, finishing_score),
        rescale_0_100
      )
    )
}

#' Create radar chart comparison
#'
#' Draws one polygon per requested player over the four composite scores
#' using `radarchart()`, and adds a legend.
#'
#' @param profiles Data frame with `player.name` and the four `*_score`
#'   columns. The axes are fixed to 0-100, so scores are expected to be on
#'   that scale already (e.g. the output of `normalize_for_radar()`).
#' @param player_names Character vector of players to draw. Names that are
#'   not in `profiles` are skipped; polygons, colours and legend entries
#'   follow the order of this vector.
#' @return The value of `legend()`, invisibly `NULL` (with a warning) when no
#'   requested player is found.
create_player_radar <- function(profiles, player_names) {

  radar_data <- profiles %>%
    filter(player.name %in% player_names) %>%
    # A player can occupy several rows (one per position); keep the first so
    # row names stay unique and each player gets a single polygon.
    distinct(player.name, .keep_all = TRUE) %>%
    # Plot in the order the caller asked for, not in `profiles` order.
    arrange(match(player.name, player_names)) %>%
    select(player.name, ball_winning_score, carrying_score,
           passing_score, finishing_score)

  if (nrow(radar_data) == 0) {
    warning("None of the requested players were found in `profiles`; ",
            "nothing to plot.", call. = FALSE)
    return(invisible(NULL))
  }

  # Legend and colours are derived from the rows actually plotted, so they
  # cannot drift out of step with the polygons.
  plotted_players <- radar_data$player.name

  # Prepare for fmsb
  radar_matrix <- radar_data %>%
    select(-player.name) %>%
    as.data.frame()

  rownames(radar_matrix) <- plotted_players
  colnames(radar_matrix) <- c("Ball Winning", "Carrying", "Passing", "Finishing")

  # Add max/min rows: radarchart() reads the axis limits from rows 1 and 2.
  radar_matrix <- rbind(
    rep(100, 4),
    rep(0, 4),
    radar_matrix
  )

  # Plot; the palette is recycled when more than four players are drawn.
  colors <- rep_len(
    c("#E41A1C", "#377EB8", "#4DAF4A", "#984EA3"),
    length(plotted_players)
  )

  # Restore the caller's graphics margins when the function exits.
  old_par <- par(mar = c(1, 1, 2, 1))
  on.exit(par(old_par), add = TRUE)

  radarchart(
    radar_matrix,
    pcol = colors,
    pfcol = alpha(colors, 0.3),
    plwd = 2,
    plty = 1,
    cglcol = "grey",
    cglty = 1,
    vlcex = 0.9,
    title = "Transition Profile Comparison"
  )

  legend("bottomright",
         legend = plotted_players,
         col = colors,
         lty = 1,
         lwd = 2,
         bty = "n")
}

#' Generate profile report
#'
#' Prints a two-part text report to the console: the first ten rows of
#' `profiles` with role and score columns, then for each role the three rows
#' with the highest `transition_contribution` whose `dominant_role` matches.
#'
#' @param profiles Data frame with `player.name`, `dominant_role`,
#'   `transition_contribution` and the four `*_score` columns.
#' @return Invisibly `NULL`; called for its printed output.
generate_profile_report <- function(profiles) {

  heavy_rule <- strrep("=", 70)
  light_rule <- strrep("-", 56)

  # Header
  cat("\n", heavy_rule, "\n", sep = "")
  cat("PLAYER TRANSITION PROFILE ANALYSIS\n")
  cat(heavy_rule, "\n\n", sep = "")

  # Overall ranking, in the row order supplied by the caller
  cat("TOP 10 OVERALL TRANSITION CONTRIBUTORS:\n")
  cat(light_rule, "\n", sep = "")
  overall_top <- select(
    head(profiles, 10),
    player.name, dominant_role, transition_contribution,
    ball_winning_score, carrying_score, passing_score, finishing_score
  )
  print(overall_top)

  # Per-role ranking
  cat("\n\nTOP PLAYERS BY ROLE:\n")
  cat(light_rule, "\n", sep = "")

  for (role in c("Ball Winner", "Carrier", "Passer", "Finisher")) {
    cat("\n", role, "s:\n", sep = "")
    role_ranked <- profiles %>%
      filter(dominant_role == role) %>%
      arrange(desc(transition_contribution))
    print(select(head(role_ranked, 3), player.name, transition_contribution))
  }
}

# Main execution
# NOTE(review): `match_id` is not assigned anywhere in this snippet; it is
# presumably defined earlier in the file - confirm before running. Also verify
# that it has the form get.matchFree() expects (TODO confirm against the
# StatsBombR documentation).
events <- allclean(get.matchFree(match_id))
profiles <- calculate_player_transition_profiles(events, "Liverpool")
# Rescaled copy of the four composite scores (0-100) for the radar chart axes.
profiles_normalized <- normalize_for_radar(profiles)

# The report prints the raw scores; the radar chart uses the normalized ones.
generate_profile_report(profiles)
# `profiles` is arranged by descending transition_contribution, so
# head(..., 4) picks the player names of the four highest-ranked rows.
create_player_radar(profiles_normalized, head(profiles$player.name, 4))

Summary

Key Takeaways
  • Transitions are the moments when possession changes and defenses are most vulnerable
  • Counter-attacks generate high-quality chances - speed and directness are key success factors
  • Defensive transitions reveal vulnerability patterns based on where possession is lost
  • Counterpressing is a key defensive transition strategy that can be measured by regain rate
  • Player transition profiles identify specialists in ball-winning, carrying, passing, and finishing