Chapter 60

Capstone - Complete Analytics System

Intermediate · 30 min read · 5 sections · 10 code examples
Learning Objectives
  • Define and extract possession sequences from event data
  • Analyze buildup patterns and attacking moves
  • Calculate sequence-level metrics (length, duration, territory gained)
  • Identify successful sequence patterns using statistical analysis
  • Apply Markov chains to model possession flow
  • Use sequence mining to discover common tactical patterns
  • Compare team attacking styles through sequence analysis
  • Build sequence value models similar to xGChain

Understanding Sequences in Football

Football is not just a collection of individual events—it's a sequence of connected actions. Understanding how events chain together reveals tactical patterns, buildup preferences, and the pathways to goal-scoring opportunities that isolated metrics miss.

What is a Sequence?

A sequence is a series of consecutive on-ball events by the same team, starting from gaining possession and ending with a shot, turnover, or set piece. Sequences can range from a single touch to elaborate 20+ pass buildups.

  • Direct (1-5 passes per sequence): fast, vertical attacks
  • Build-up (6-12 passes per sequence): patient possession play
  • Extended (13+ passes per sequence): sustained pressure
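The core of sequence extraction is a single trick: flag every row where the team on the ball changes, then take a cumulative sum of those flags to get a sequence id. A minimal sketch with hypothetical events:

```python
import pandas as pd

# Toy event stream: only the team column matters for splitting
events = pd.DataFrame({"team": ["A", "A", "B", "B", "B", "A"]})

# A new sequence starts whenever the team differs from the previous row
events["new_sequence"] = events["team"] != events["team"].shift(1)

# Cumulative sum of the change flags yields consecutive sequence ids
events["sequence_id"] = events["new_sequence"].cumsum()
print(events["sequence_id"].tolist())  # [1, 1, 2, 2, 2, 3]
```

The first row compares against `NaN` and therefore always starts sequence 1; the full extraction code below adds restarts such as "Half Start" and "Kick Off" to the same flag.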
extract_sequences.py
# Python: Extracting Possession Sequences
import pandas as pd
import numpy as np

def extract_sequences(events):
    """Extract and characterize possession sequences."""
    events = events.sort_values("index").copy()

    # Detect possession changes
    events["team_change"] = events["team"] != events["team"].shift(1)
    events["new_sequence"] = (
        events["team_change"] |
        events["type"].isin(["Half Start", "Starting XI", "Kick Off"])
    )
    events["sequence_id"] = events["new_sequence"].cumsum()

    # Summarize each sequence
    def summarize_sequence(group):
        return pd.Series({
            "team": group["team"].iloc[0],
            # First/last event times (taking min/max of minute and second
            # separately could mix times from different events; rows are index-sorted)
            "start_minute": group["minute"].iloc[0],
            "start_second": group["second"].iloc[0],
            "end_minute": group["minute"].iloc[-1],
            "end_second": group["second"].iloc[-1],
            "n_events": len(group),
            "n_passes": (group["type"] == "Pass").sum(),
            "n_carries": (group["type"] == "Carry").sum(),
            "n_dribbles": (group["type"] == "Dribble").sum(),
            "ends_with_shot": (group["type"] == "Shot").any(),
            "ends_with_goal": ((group["type"] == "Shot") &
                              (group["shot_outcome"] == "Goal")).any(),
            "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum(),
            "start_x": group["location_x"].dropna().iloc[0] if len(group["location_x"].dropna()) > 0 else np.nan,
            "end_x": group["location_x"].dropna().iloc[-1] if len(group["location_x"].dropna()) > 0 else np.nan,
            "max_x": group["location_x"].max(),
        })

    sequences = events.groupby("sequence_id").apply(summarize_sequence).reset_index()

    # Derived metrics
    sequences["duration"] = (
        (sequences["end_minute"] * 60 + sequences["end_second"]) -
        (sequences["start_minute"] * 60 + sequences["start_second"])
    )
    sequences["territory_gained"] = sequences["end_x"] - sequences["start_x"]
    sequences["reached_final_third"] = sequences["max_x"] >= 80
    sequences["reached_box"] = sequences["max_x"] >= 102

    # Sequence type classification
    def classify_sequence(n_passes):
        if n_passes <= 2:
            return "Counter/Direct"
        elif n_passes <= 5:
            return "Quick Attack"
        elif n_passes <= 10:
            return "Build-up"
        else:
            return "Extended Build-up"

    sequences["sequence_type"] = sequences["n_passes"].apply(classify_sequence)

    return sequences

sequences = extract_sequences(events)

print("Sequence Type Distribution:")
print(sequences["sequence_type"].value_counts())

print("\nAverage sequence by type:")
print(sequences.groupby("sequence_type").agg({
    "sequence_id": "count",
    "n_passes": "mean",
    "ends_with_shot": "mean",
    "total_xg": "mean"
}).rename(columns={"sequence_id": "count"}))
# R: Extracting Possession Sequences
library(tidyverse)

extract_sequences <- function(events) {
    events <- events %>%
        arrange(index) %>%
        mutate(
            # Detect possession changes
            possession_change = team != lag(team, default = first(team)) |
                               type %in% c("Half Start", "Starting XI", "Kick Off"),
            sequence_id = cumsum(possession_change)
        )

    # Summarize each sequence
    sequences <- events %>%
        group_by(sequence_id, team) %>%
        summarize(
            # Timing
            # First/last event times (rows are index-sorted; taking min/max of
            # minute and second separately could mix times from different events)
            start_minute = first(minute),
            start_second = first(second),
            end_minute = last(minute),
            end_second = last(second),

            # Event counts
            n_events = n(),
            n_passes = sum(type == "Pass"),
            n_carries = sum(type == "Carry"),
            n_dribbles = sum(type == "Dribble"),

            # Outcomes
            ends_with_shot = any(type == "Shot"),
            ends_with_goal = any(type == "Shot" & shot_outcome == "Goal"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),

            # Spatial
            start_x = first(location_x[!is.na(location_x)]),
            end_x = last(location_x[!is.na(location_x)]),
            max_x = max(location_x, na.rm = TRUE),

            .groups = "drop"
        ) %>%
        mutate(
            # Derived metrics
            duration = (end_minute * 60 + end_second) - (start_minute * 60 + start_second),
            territory_gained = end_x - start_x,
            reached_final_third = max_x >= 80,
            reached_box = max_x >= 102,

            # Sequence type classification
            sequence_type = case_when(
                n_passes <= 2 ~ "Counter/Direct",
                n_passes <= 5 ~ "Quick Attack",
                n_passes <= 10 ~ "Build-up",
                TRUE ~ "Extended Build-up"
            )
        )

    return(sequences)
}

sequences <- extract_sequences(events)

# Summary statistics
cat("Sequence Type Distribution:\n")
print(table(sequences$sequence_type))

cat("\nAverage sequence by type:\n")
sequences %>%
    group_by(sequence_type) %>%
    summarize(
        count = n(),
        avg_passes = mean(n_passes),
        shot_rate = mean(ends_with_shot),
        avg_xg = mean(total_xg)
    ) %>%
    print()
Output
Sequence Type Distribution:
Counter/Direct       98
Quick Attack         67
Build-up             45
Extended Build-up    23

Average sequence by type:
                   count  n_passes  ends_with_shot  total_xg
sequence_type
Build-up              45      7.82            0.18     0.024
Counter/Direct        98      1.45            0.08     0.012
Extended Build-up     23     14.35            0.26     0.038
Quick Attack          67      3.67            0.12     0.015

Advanced Sequence Classification

Beyond simple pass counts, we can classify sequences using multiple dimensions including tempo, spatial coverage, and directness. This multi-dimensional approach reveals tactical nuances that single metrics miss.

Tempo Classification
  • Lightning (0-4s): Direct counters, immediate shots
  • Fast (4-10s): Quick transitions, minimal buildup
  • Medium (10-20s): Standard attacking play
  • Patient (20-40s): Possession-oriented buildup
  • Extended (40s+): Sustained possession probing
Spatial Coverage
  • Narrow: Central channel focus, width < 30m
  • Medium: Moderate width usage, 30-45m
  • Wide: Wing involvement, 45-55m
  • Full: 55m+, near-complete pitch width utilized
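The directness dimension used alongside tempo and width below is the ratio of net forward progress to total x-distance travelled; a value near 1 means an almost purely vertical move, a value near 0 means heavy recycling. A worked toy calculation with hypothetical coordinates:

```python
import pandas as pd

# Hypothetical x-coordinates of one sequence (attacking left to right)
xs = pd.Series([30.0, 50.0, 40.0, 70.0])

net_progress = xs.iloc[-1] - xs.iloc[0]  # 70 - 30 = 40.0
total_moved = xs.diff().abs().sum()      # 20 + 10 + 30 = 60.0
directness = net_progress / total_moved
print(round(directness, 3))  # 0.667
```

Under the classification below, 0.667 would fall in the "Balanced"/"Direct" boundary region (> 0.4 gives "Direct").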
sequence_classification.py
# Python: Multi-dimensional Sequence Classification
import pandas as pd
import numpy as np

def classify_sequences_advanced(events):
    """Classify sequences using multiple dimensions."""
    events = events.sort_values("index").copy()

    # Assign sequence ids (possession changes, as in extract_sequences)
    events["sequence_id"] = (
        (events["team"] != events["team"].shift(1)) |
        events["type"].isin(["Half Start", "Starting XI", "Kick Off"])
    ).cumsum()

    def compute_features(group):
        locs_x = group["location_x"].dropna()
        locs_y = group["location_y"].dropna()
        times = group["minute"] * 60 + group["second"]

        duration = times.max() - times.min() if len(times) > 1 else 0
        n_events = len(group)

        # Spatial features
        start_x = locs_x.iloc[0] if len(locs_x) > 0 else 60
        end_x = locs_x.iloc[-1] if len(locs_x) > 0 else 60
        max_x = locs_x.max() if len(locs_x) > 0 else 60
        min_x = locs_x.min() if len(locs_x) > 0 else 60

        # Width
        min_y = locs_y.min() if len(locs_y) > 0 else 40
        max_y = locs_y.max() if len(locs_y) > 0 else 40
        width_used = max_y - min_y

        # Directness
        net_progress = end_x - start_x
        x_changes = locs_x.diff().abs().sum() if len(locs_x) > 1 else 0
        directness = net_progress / x_changes if x_changes > 0 else 0

        # Pass analysis
        passes = group[group["type"] == "Pass"]
        n_passes = len(passes)

        return pd.Series({
            "team": group["team"].iloc[0],
            "duration": duration,
            "n_events": n_events,
            "events_per_second": n_events / duration if duration > 0 else n_events,
            "start_x": start_x,
            "end_x": end_x,
            "max_x": max_x,
            "net_progress": net_progress,
            "directness": directness,
            "width_used": width_used,
            "n_passes": n_passes,
            "ends_with_shot": (group["type"] == "Shot").any(),
            "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum(),
        })

    seq_features = events.groupby("sequence_id").apply(compute_features).reset_index()

    # Multi-dimensional classification
    def tempo_class(d):
        if d <= 4: return "Lightning"
        if d <= 10: return "Fast"
        if d <= 20: return "Medium"
        if d <= 40: return "Patient"
        return "Extended"

    def width_class(w):
        if w < 30: return "Narrow"
        if w < 45: return "Medium"
        if w < 55: return "Wide"
        return "Full"

    def directness_class(d):
        if d > 0.7: return "Very Direct"
        if d > 0.4: return "Direct"
        if d > 0.1: return "Balanced"
        if d > -0.1: return "Recycling"
        return "Retreating"

    seq_features["tempo_class"] = seq_features["duration"].apply(tempo_class)
    seq_features["width_class"] = seq_features["width_used"].apply(width_class)
    seq_features["directness_class"] = seq_features["directness"].apply(directness_class)

    return seq_features

# Apply classification
sequences_classified = classify_sequences_advanced(events)

# Analyze effectiveness
effectiveness = sequences_classified.groupby(["tempo_class", "directness_class"]).agg({
    "sequence_id": "count",
    "ends_with_shot": "mean",
    "total_xg": "mean",
    "max_x": "mean"
}).rename(columns={"sequence_id": "count"}).reset_index()

effectiveness = effectiveness[effectiveness["count"] >= 10].sort_values("total_xg", ascending=False)
print("Effectiveness by Tempo and Directness:")
print(effectiveness)
# R: Multi-dimensional Sequence Classification
library(tidyverse)

classify_sequences_advanced <- function(events) {
    # Assign sequence ids (possession changes, as in extract_sequences)
    events <- events %>%
        arrange(index) %>%
        mutate(
            sequence_id = cumsum(
                team != lag(team, default = first(team)) |
                type %in% c("Half Start", "Starting XI", "Kick Off")
            )
        )

    # Extract detailed sequence features
    sequence_features <- events %>%
        group_by(sequence_id, team) %>%
        summarize(
            # Temporal features
            duration = max(minute * 60 + second) - min(minute * 60 + second),
            n_events = n(),
            events_per_second = ifelse(duration > 0, n_events / duration, n_events),

            # Spatial features
            start_x = first(location_x[!is.na(location_x)]),
            end_x = last(location_x[!is.na(location_x)]),
            max_x = max(location_x, na.rm = TRUE),
            min_x = min(location_x, na.rm = TRUE),
            avg_x = mean(location_x, na.rm = TRUE),

            # Width metrics
            min_y = min(location_y, na.rm = TRUE),
            max_y = max(location_y, na.rm = TRUE),
            width_used = max_y - min_y,

            # Directness metrics
            net_progress = end_x - start_x,
            total_x_distance = sum(abs(diff(location_x[!is.na(location_x)]))),
            directness = ifelse(total_x_distance > 0,
                               net_progress / total_x_distance, 0),

            # Pass breakdown
            n_passes = sum(type == "Pass"),
            n_forward_passes = sum(type == "Pass" &
                                   lead(location_x) > location_x, na.rm = TRUE),
            n_backward_passes = sum(type == "Pass" &
                                    lead(location_x) < location_x - 5, na.rm = TRUE),
            n_progressive_carries = sum(type == "Carry" &
                                        lead(location_x) - location_x > 10, na.rm = TRUE),

            # Outcome
            ends_with_shot = any(type == "Shot"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),

            .groups = "drop"
        ) %>%
        mutate(
            forward_pass_ratio = n_forward_passes / pmax(n_passes, 1),
            backward_pass_ratio = n_backward_passes / pmax(n_passes, 1)
        )

    # Multi-dimensional classification
    sequence_features <- sequence_features %>%
        mutate(
            # Tempo classification
            tempo_class = case_when(
                duration <= 4 ~ "Lightning",
                duration <= 10 ~ "Fast",
                duration <= 20 ~ "Medium",
                duration <= 40 ~ "Patient",
                TRUE ~ "Extended"
            ),

            # Width classification
            width_class = case_when(
                width_used < 30 ~ "Narrow",
                width_used < 45 ~ "Medium",
                width_used < 55 ~ "Wide",
                TRUE ~ "Full"
            ),

            # Directness classification
            directness_class = case_when(
                directness > 0.7 ~ "Very Direct",
                directness > 0.4 ~ "Direct",
                directness > 0.1 ~ "Balanced",
                directness > -0.1 ~ "Recycling",
                TRUE ~ "Retreating"
            ),

            # Combined style classification
            style = paste(tempo_class, width_class, directness_class, sep = "-")
        )

    return(sequence_features)
}

# Apply classification
sequences_classified <- classify_sequences_advanced(events)

# Analyze effectiveness by style
style_effectiveness <- sequences_classified %>%
    group_by(tempo_class, directness_class) %>%
    summarize(
        count = n(),
        shot_rate = mean(ends_with_shot),
        avg_xg = mean(total_xg),
        avg_max_x = mean(max_x, na.rm = TRUE),
        .groups = "drop"
    ) %>%
    filter(count >= 10) %>%
    arrange(desc(avg_xg))

print("Effectiveness by Tempo and Directness:")
print(style_effectiveness)
Output
Effectiveness by Tempo and Directness:
  tempo_class directness_class  count  ends_with_shot  total_xg  max_x
  Lightning   Very Direct          34           0.324     0.052   98.2
  Fast        Very Direct          28           0.286     0.044   95.8
  Fast        Direct               45           0.244     0.038   94.1
  Patient     Direct               23           0.217     0.031   96.3
  Medium      Balanced             78           0.179     0.028   88.6

Clustering Sequences by Style

K-means clustering can discover natural groupings in sequence data, revealing attacking archetypes that may not align with predefined categories.
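The k-selection step in the functions below can be isolated in a minimal sketch: fit K-means for a range of k, score each partition with the mean silhouette coefficient, and keep the k that maximizes it. Synthetic blobs stand in for sequence features:

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

rng = np.random.default_rng(42)
# Three well-separated synthetic blobs in 2-D feature space
data = np.vstack([rng.normal(loc=c, scale=0.3, size=(50, 2)) for c in (0, 5, 10)])

scores = {}
for k in range(2, 7):
    labels = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(data)
    scores[k] = silhouette_score(data, labels)

best_k = max(scores, key=scores.get)
print(best_k)  # 3 for these well-separated blobs
```

The same pattern applies to the standardized sequence features; on real data the silhouette curve is usually flatter, so the "optimal" k should be sanity-checked against the resulting cluster profiles.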

sequence_clustering.py
# Python: K-means Clustering of Sequences
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

def cluster_sequences(sequence_features, n_clusters=6):
    """Cluster sequences by style characteristics."""
    # Select features
    cluster_vars = ["duration", "n_passes", "directness",
                    "width_used", "net_progress"]

    # Prepare data
    cluster_data = sequence_features[["sequence_id"] + cluster_vars].dropna()

    # Standardize
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(cluster_data[cluster_vars])

    # Find optimal k
    silhouette_scores = []
    for k in range(2, 11):
        km = KMeans(n_clusters=k, random_state=42, n_init=25)
        labels = km.fit_predict(scaled_data)
        score = silhouette_score(scaled_data, labels)
        silhouette_scores.append(score)

    optimal_k = np.argmax(silhouette_scores) + 2
    print(f"Optimal clusters: {optimal_k}")

    # Final clustering with the optimal k
    final_km = KMeans(n_clusters=optimal_k, random_state=42, n_init=25)
    cluster_data["cluster"] = final_km.fit_predict(scaled_data)

    # Analyze clusters
    profiles = cluster_data.groupby("cluster").agg({
        "sequence_id": "count",
        "duration": "mean",
        "n_passes": "mean",
        "directness": "mean",
        "width_used": "mean",
        "net_progress": "mean"
    }).rename(columns={"sequence_id": "count"}).reset_index()

    # Name clusters
    def name_cluster(row):
        if row["n_passes"] < 3 and row["directness"] > 0.5:
            return "Counter Attack"
        if row["n_passes"] > 10 and row["width_used"] > 45:
            return "Tiki-Taka"
        if row["n_passes"] > 8 and row["directness"] < 0.2:
            return "Patient Buildup"
        if row["width_used"] > 50 and row["directness"] > 0.3:
            return "Wing Play"
        if row["directness"] > 0.6:
            return "Direct Attack"
        return "Mixed"

    profiles["style_name"] = profiles.apply(name_cluster, axis=1)

    return cluster_data, profiles

cluster_data, profiles = cluster_sequences(sequences_classified)
print(profiles)
# R: K-means Clustering of Sequences
library(tidyverse)
library(cluster)

cluster_sequences <- function(sequence_features, n_clusters = 6) {
    # Select numeric features for clustering
    cluster_vars <- c("duration", "n_passes", "directness",
                      "width_used", "net_progress", "forward_pass_ratio")

    # Prepare data
    cluster_data <- sequence_features %>%
        select(sequence_id, all_of(cluster_vars)) %>%
        drop_na()

    # Standardize
    scaled_data <- scale(cluster_data[, cluster_vars])

    # Find optimal k using silhouette
    silhouette_scores <- sapply(2:10, function(k) {
        km <- kmeans(scaled_data, centers = k, nstart = 25)
        mean(silhouette(km$cluster, dist(scaled_data))[, 3])
    })

    optimal_k <- which.max(silhouette_scores) + 1
    cat("Optimal clusters:", optimal_k, "\n")

    # Final clustering with the optimal k
    final_km <- kmeans(scaled_data, centers = optimal_k, nstart = 25)
    cluster_data$cluster <- factor(final_km$cluster)

    # Analyze clusters
    cluster_profiles <- cluster_data %>%
        group_by(cluster) %>%
        summarize(
            count = n(),
            avg_duration = mean(duration),
            avg_passes = mean(n_passes),
            avg_directness = mean(directness),
            avg_width = mean(width_used),
            avg_progress = mean(net_progress),
            .groups = "drop"
        )

    # Name clusters based on characteristics
    cluster_profiles <- cluster_profiles %>%
        mutate(
            style_name = case_when(
                avg_passes < 3 & avg_directness > 0.5 ~ "Counter Attack",
                avg_passes > 10 & avg_width > 45 ~ "Tiki-Taka",
                avg_passes > 8 & avg_directness < 0.2 ~ "Patient Buildup",
                avg_width > 50 & avg_directness > 0.3 ~ "Wing Play",
                avg_directness > 0.6 ~ "Direct Attack",
                TRUE ~ "Mixed"
            )
        )

    return(list(data = cluster_data, profiles = cluster_profiles))
}

cluster_result <- cluster_sequences(sequences_classified)
print(cluster_result$profiles)
Output
Optimal clusters: 5
   cluster  count  duration  n_passes  directness  width_used  net_progress  style_name
0        0     67      8.32      3.45       0.612       28.4         32.1  Counter Attack
1        1     45     34.56     12.23       0.156       52.3         18.7  Patient Buildup
2        2     52     18.92      7.82       0.423       48.9         28.5  Wing Play
3        3     38     12.45      4.21       0.721       24.6         35.8  Direct Attack
4        4     31     42.18     15.67       0.089       55.2         12.3  Tiki-Taka

Counter-Attack Analysis

Counter-attacks are among the most dangerous attacking sequences. They exploit defensive disorganization following turnovers, requiring specialized detection and analysis.

Counter-Attack Characteristics
  • High tempo: Rapid progression from defense to attack (roughly 12 seconds or less)
  • Directness: Minimal lateral or backward passes
  • Trigger: Often follows a turnover, clearance, or goalkeeper save
  • Space exploitation: Attacking into open areas before defense recovers
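The characteristics above can be distilled into a single predicate. This sketch uses the same thresholds as the detection code below (own-half start, 12 s or less, at least 35 units of territory gained, at most 5 passes), but omits the forward-pass-ratio condition for brevity:

```python
def is_counter_attack(start_x, duration, x_progress, n_passes):
    """Heuristic counter-attack flag for one sequence (simplified)."""
    return (start_x < 60          # possession won in own half
            and duration <= 12    # rapid progression
            and x_progress >= 35  # significant territory gained
            and n_passes <= 5)    # direct, few passes

print(is_counter_attack(start_x=35, duration=8, x_progress=50, n_passes=3))  # True
print(is_counter_attack(start_x=75, duration=8, x_progress=50, n_passes=3))  # False
```

Keeping the rule as one pure function makes the thresholds easy to tune and unit-test before applying them to a full event DataFrame.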
counter_attack_detection.py
# Python: Counter-Attack Detection
import pandas as pd
import numpy as np

def detect_counter_attacks(events):
    """Detect and classify counter-attacking sequences."""
    events = events.sort_values("index").copy()

    # Identify possession changes
    events["team_prev"] = events["team"].shift(1)
    events["possession_change"] = events["team"] != events["team_prev"]
    events["sequence_id"] = events["possession_change"].cumsum()

    # Get trigger info (previous event before possession won)
    events["prev_type"] = events["type"].shift(1)
    events["prev_outcome"] = (
        events["pass_outcome"].shift(1) if "pass_outcome" in events.columns else np.nan
    )
    events["prev_x"] = events["location_x"].shift(1)

    # Calculate sequence characteristics
    def analyze_sequence(group):
        locs_x = group["location_x"].dropna()
        times = group["minute"] * 60 + group["second"]
        passes = group[group["type"] == "Pass"]

        start_x = locs_x.iloc[0] if len(locs_x) > 0 else 60
        end_x = locs_x.iloc[-1] if len(locs_x) > 0 else 60
        max_x = locs_x.max() if len(locs_x) > 0 else 60
        duration = times.max() - times.min() if len(times) > 1 else 0

        # Count forward passes
        n_forward = 0
        for i in range(len(group) - 1):
            if group.iloc[i]["type"] == "Pass":
                curr_x = group.iloc[i]["location_x"]
                next_x = group.iloc[i + 1]["location_x"]
                if pd.notna(curr_x) and pd.notna(next_x) and next_x - curr_x > 5:
                    n_forward += 1

        return pd.Series({
            "team": group["team"].iloc[0],
            "start_x": start_x,
            "end_x": end_x,
            "max_x": max_x,
            "duration": duration,
            "x_progress": end_x - start_x,
            "n_passes": len(passes),
            "n_forward_passes": n_forward,
            "ends_shot": (group["type"] == "Shot").any(),
            "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum(),
            "prev_type": group["prev_type"].iloc[0],
            "prev_outcome": group["prev_outcome"].iloc[0] if "prev_outcome" in group else None,
        })

    seq_chars = events.groupby("sequence_id").apply(analyze_sequence).reset_index()
    seq_chars["pace"] = np.where(
        seq_chars["duration"] > 0,
        seq_chars["x_progress"] / seq_chars["duration"],
        seq_chars["x_progress"]
    )

    # Counter-attack criteria
    counters = seq_chars[
        (seq_chars["start_x"] < 60) &
        (seq_chars["duration"] <= 12) &
        (seq_chars["x_progress"] >= 35) &
        (seq_chars["n_passes"] <= 5) &
        ((seq_chars["n_forward_passes"] >= seq_chars["n_passes"] * 0.6) |
         (seq_chars["n_passes"] <= 2))
    ].copy()

    # Classify counter type
    def classify_counter(row):
        if row["prev_type"] == "Interception":
            return "Interception Counter"
        if row["prev_type"] == "Tackle":
            return "Tackle Counter"
        if row["prev_outcome"] == "Incomplete":
            return "Failed Pass Counter"
        if row["prev_type"] == "Clearance":
            return "Transition Counter"
        if row["prev_type"] in ["Goal Keeper", "Save"]:
            return "GK Distribution"
        return "Other"

    counters["counter_type"] = counters.apply(classify_counter, axis=1)

    return counters

counters = detect_counter_attacks(events)

print("Counter-attack Statistics:")
print(f"Total counters detected: {len(counters)}")
print(f"Shot rate: {counters['ends_shot'].mean():.3f}")
print(f"Average xG: {counters['total_xg'].mean():.4f}")
print()

print("By Trigger Type:")
print(counters.groupby("counter_type").agg({
    "sequence_id": "count",
    "ends_shot": "mean",
    "total_xg": "mean",
    "pace": "mean"
}).rename(columns={"sequence_id": "count"}))
# R: Counter-Attack Detection
library(tidyverse)

detect_counter_attacks <- function(events) {
    # First, identify possession changes
    events <- events %>%
        arrange(index) %>%
        mutate(
            possession_change = team != lag(team, default = first(team)),
            sequence_id = cumsum(possession_change)
        )

    # Classify the trigger event (how possession was won)
    sequence_starts <- events %>%
        group_by(sequence_id) %>%
        slice(1) %>%
        ungroup() %>%
        select(sequence_id, start_type = type, start_x = location_x,
               start_minute = minute, start_second = second)

    # Get previous event to determine turnover type
    prev_events <- events %>%
        mutate(
            prev_type = lag(type),
            prev_outcome = lag(pass_outcome, default = NA),
            prev_x = lag(location_x)
        ) %>%
        group_by(sequence_id) %>%
        slice(1) %>%
        select(sequence_id, prev_type, prev_outcome, prev_x)

    # Calculate sequence characteristics
    sequence_chars <- events %>%
        group_by(sequence_id, team) %>%
        summarize(
            start_x = first(location_x[!is.na(location_x)]),
            end_x = last(location_x[!is.na(location_x)]),
            max_x = max(location_x, na.rm = TRUE),
            start_time = min(minute * 60 + second),
            end_time = max(minute * 60 + second),
            duration = end_time - start_time,
            n_passes = sum(type == "Pass"),
            n_forward_passes = sum(type == "Pass" & !pass_outcome %in% c("Incomplete", "Out") &
                                  lead(location_x, default = 60) - location_x > 5, na.rm = TRUE),
            ends_shot = any(type == "Shot"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),
            .groups = "drop"
        ) %>%
        mutate(
            x_progress = end_x - start_x,
            pace = ifelse(duration > 0, x_progress / duration, x_progress)
        )

    # Join trigger information
    sequence_chars <- sequence_chars %>%
        left_join(prev_events, by = "sequence_id")

    # Counter-attack criteria
    counter_attacks <- sequence_chars %>%
        filter(
            # Started in own half
            start_x < 60,
            # Quick progression
            duration <= 12,
            # Significant ground covered
            x_progress >= 35,
            # Few passes (direct)
            n_passes <= 5,
            # High forward pass ratio
            n_forward_passes >= n_passes * 0.6 | n_passes <= 2
        ) %>%
        mutate(
            counter_type = case_when(
                prev_type == "Interception" ~ "Interception Counter",
                prev_type == "Tackle" ~ "Tackle Counter",
                prev_outcome == "Incomplete" ~ "Failed Pass Counter",
                prev_type == "Clearance" ~ "Transition Counter",
                prev_type %in% c("Goal Keeper", "Save") ~ "GK Distribution",
                TRUE ~ "Other"
            )
        )

    return(counter_attacks)
}

counters <- detect_counter_attacks(events)

# Counter-attack analysis
cat("Counter-attack Statistics:\n")
cat("Total counters detected:", nrow(counters), "\n")
cat("Shot rate:", mean(counters$ends_shot), "\n")
cat("Average xG:", mean(counters$total_xg), "\n\n")

# By trigger type
cat("By Trigger Type:\n")
counters %>%
    group_by(counter_type) %>%
    summarize(
        count = n(),
        shot_rate = mean(ends_shot),
        avg_xg = mean(total_xg),
        avg_pace = mean(pace)
    ) %>%
    print()
Output
Counter-attack Statistics:
Total counters detected: 47
Shot rate: 0.319
Average xG: 0.0412

By Trigger Type:
                     count  ends_shot  total_xg   pace
counter_type
Failed Pass Counter     15      0.333    0.0445   4.23
GK Distribution          8      0.250    0.0312   3.87
Interception Counter    12      0.417    0.0523   4.56
Tackle Counter           7      0.286    0.0389   4.12
Transition Counter       5      0.200    0.0298   3.65

Counter-Pressing and PPDA Analysis

Counter-pressing (gegenpressing) is the defensive counterpart to counter-attacks—immediately pressing after losing possession to regain it quickly. PPDA (Passes Per Defensive Action) measures pressing intensity.
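PPDA itself is a simple ratio: passes the opponent completes in their build-up zone divided by the defending team's defensive actions in that zone, with lower values meaning more intense pressing. A worked example with hypothetical match counts:

```python
# Hypothetical counts for one match, one team
opponent_passes_in_zone = 250   # opponent passes in their own defensive 60%
defensive_actions_in_zone = 25  # our tackles, interceptions, fouls, duels there

# Guard against division by zero for a team that made no defensive actions
ppda = opponent_passes_in_zone / max(defensive_actions_in_zone, 1)
print(ppda)  # 10.0
```

As a rough reference point, high-pressing sides tend to post single-digit PPDA values while passive, low-block sides drift well above 10.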

pressing_analysis.py
# Python: Counter-Pressing and PPDA Analysis
import pandas as pd
import numpy as np

def analyze_pressing(events):
    """Analyze counter-pressing and calculate PPDA."""
    events = events.sort_values("index").copy()

    # Identify possession changes
    events["next_team"] = events["team"].shift(-1)
    events["prev_team"] = events["team"].shift(1)

    events["lost_possession"] = (
        (events["team"] != events["next_team"]) &
        events["type"].isin(["Pass", "Dribble", "Carry"])
    )
    events["won_possession"] = events["team"] != events["prev_team"]

    # Track losses and recoveries
    losses = events[events["lost_possession"]][
        ["index", "team", "location_x", "location_y", "minute", "second"]
    ].copy()
    losses.columns = ["loss_index", "loss_team", "loss_x", "loss_y", "loss_min", "loss_sec"]
    losses["loss_time"] = losses["loss_min"] * 60 + losses["loss_sec"]

    recoveries = events[
        events["won_possession"] |
        events["type"].isin(["Interception", "Tackle", "Ball Recovery"])
    ][["index", "team", "location_x", "location_y", "minute", "second", "type"]].copy()
    recoveries.columns = ["rec_index", "rec_team", "rec_x", "rec_y", "rec_min", "rec_sec", "rec_type"]
    recoveries["rec_time"] = recoveries["rec_min"] * 60 + recoveries["rec_sec"]

    # Match losses to recoveries
    counter_press_data = []
    for _, loss in losses.iterrows():
        team_recoveries = recoveries[
            (recoveries["rec_team"] == loss["loss_team"]) &
            (recoveries["rec_index"] > loss["loss_index"])
        ]

        if len(team_recoveries) > 0:
            recovery = team_recoveries.iloc[0]
            time_diff = recovery["rec_time"] - loss["loss_time"]

            loss_y = loss["loss_y"] if pd.notna(loss["loss_y"]) else 40
            rec_y = recovery["rec_y"] if pd.notna(recovery["rec_y"]) else 40
            distance = np.sqrt(
                (recovery["rec_x"] - loss["loss_x"])**2 +
                (rec_y - loss_y)**2
            )

            is_counter_press = (time_diff <= 5) and (distance <= 25)

            counter_press_data.append({
                "loss_team": loss["loss_team"],
                "loss_x": loss["loss_x"],
                "recovery_time": time_diff,
                "recovery_distance": distance,
                "is_counter_press": is_counter_press
            })

    cp_df = pd.DataFrame(counter_press_data)

    # PPDA calculation: opponent passes in their own defensive 60% of the
    # pitch divided by our defensive actions in the matching zone (our
    # attacking 60%). Lower PPDA = more intense pressing.
    defensive_actions = ["Tackle", "Interception", "Foul Committed", "Duel"]

    teams = events["team"].unique()
    ppda_data = []

    for team in teams:
        # Opponent passes in their own defensive 60% (x < 72 in their
        # team-relative coordinates)
        opponent_passes = len(events[
            (events["team"] != team) &
            (events["type"] == "Pass") &
            (events["location_x"] < 72)
        ])

        # Our defensive actions in the same zone (x > 48 in our coordinates,
        # since coordinates are flipped between teams)
        our_def_actions = len(events[
            (events["team"] == team) &
            (events["type"].isin(defensive_actions)) &
            (events["location_x"] > 48)
        ])

        ppda = opponent_passes / max(our_def_actions, 1)
        ppda_data.append({"team": team, "ppda": ppda})

    return cp_df, pd.DataFrame(ppda_data)

cp_df, ppda_df = analyze_pressing(events)

print("Counter-Press Analysis:")
print(f"Total losses: {len(cp_df)}")
print(f"Counter-presses: {cp_df['is_counter_press'].sum()}")
print(f"Success rate: {cp_df['is_counter_press'].mean():.3f}")
successful_cp = cp_df[cp_df["is_counter_press"]]
if len(successful_cp) > 0:
    print(f"Avg recovery time: {successful_cp['recovery_time'].mean():.2f}s")

print("\nPPDA by Team:")
print(ppda_df)
# R: Counter-Pressing and PPDA Analysis
library(tidyverse)

analyze_pressing <- function(events) {
    # Identify loss of possession events
    events <- events %>%
        arrange(index) %>%
        mutate(
            lost_possession = team != lead(team, default = last(team)) &
                             type %in% c("Pass", "Dribble", "Carry"),
            won_possession = team != lag(team, default = first(team))
        )

    # Track time until possession regained after loss
    counter_press_windows <- events %>%
        filter(lost_possession) %>%
        transmute(loss_index = index, loss_team = team,
                  loss_x = location_x, loss_y = location_y,
                  loss_time = minute * 60 + second)

    # Find when the team next wins possession
    recovery_events <- events %>%
        filter(won_possession | type %in% c("Interception", "Tackle", "Ball Recovery")) %>%
        transmute(recovery_index = index, recovery_team = team,
                  recovery_x = location_x, recovery_y = location_y,
                  recovery_time = minute * 60 + second,
                  recovery_type = type)

    # Calculate counter-press success
    counter_press_analysis <- counter_press_windows %>%
        rowwise() %>%
        mutate(
            # Find the next recovery by the same team
            recovery = list(recovery_events %>%
                filter(recovery_team == loss_team,
                       recovery_index > loss_index) %>%
                slice(1))
        ) %>%
        ungroup() %>%
        unnest(recovery) %>%
        mutate(
            recovery_time_diff = recovery_time - loss_time,
            recovery_distance = sqrt((recovery_x - loss_x)^2 +
                                     (recovery_y - loss_y)^2),
            is_counter_press = recovery_time_diff <= 5 & recovery_distance <= 25
        )

    # PPDA calculation by team
    # PPDA = opponent passes in their own half / our defensive actions in that zone
    defensive_actions <- c("Tackle", "Interception", "Foul Committed", "Duel")

    ppda_by_team <- map_df(unique(events$team), function(t) {
        # Opponent passes in their own half (x < 60 in their frame)
        passes_allowed <- events %>%
            filter(team != t, type == "Pass", location_x < 60) %>%
            nrow()

        # Our defensive actions in the opponent's half (x > 60 in our frame)
        def_actions <- events %>%
            filter(team == t, type %in% defensive_actions, location_x > 60) %>%
            nrow()

        tibble(team = t,
               passes_allowed = passes_allowed,
               defensive_actions = def_actions,
               ppda = passes_allowed / max(def_actions, 1))
    })

    return(list(
        counter_press = counter_press_analysis,
        ppda = ppda_by_team
    ))
}

pressing_analysis <- analyze_pressing(events)

# Counter-press success rate
cp_success <- pressing_analysis$counter_press %>%
    summarize(
        total_losses = n(),
        counter_presses = sum(is_counter_press, na.rm = TRUE),
        success_rate = counter_presses / total_losses,
        avg_recovery_time = mean(recovery_time_diff[is_counter_press], na.rm = TRUE)
    )

cat("Counter-Press Analysis:\n")
print(cp_success)

cat("\nPPDA by Team:\n")
print(pressing_analysis$ppda)
Output
Counter-Press Analysis:
Total losses: 156
Counter-presses: 34
Success rate: 0.218
Avg recovery time: 3.24s

PPDA by Team:
         team   ppda
0   Barcelona   8.45
1  Real Madrid  12.67
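Before moving on, the PPDA arithmetic is worth seeing in isolation. A minimal sketch with invented counts (the team names and numbers are placeholders, not from the match data above):

```python
# Toy PPDA comparison: opponent passes allowed in the pressing zone
# divided by our defensive actions there. Lower = more intense press.
teams = {
    "High Press FC": {"passes_allowed": 90, "def_actions": 15},
    "Low Block United": {"passes_allowed": 140, "def_actions": 10},
}

for name, counts in teams.items():
    # max(..., 1) guards against division by zero for passive teams
    ppda = counts["passes_allowed"] / max(counts["def_actions"], 1)
    print(f"{name}: PPDA = {ppda:.1f}")
```

The pressing side allows far fewer passes per defensive action (6.0 vs 14.0), which is exactly the contrast the real calculation surfaces.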

Analyzing Buildup Patterns

Different teams have distinct buildup patterns. By analyzing the structure of attacking sequences, we can identify tactical preferences and compare team styles.

buildup_patterns.py
# Python: Buildup Pattern Analysis
import pandas as pd
import numpy as np

def analyze_buildup_patterns(events):
    """Analyze team buildup patterns in attacking sequences."""
    # Focus on sequences reaching final third
    sequence_max_x = events.groupby("sequence_id")["location_x"].transform("max")
    attacking_events = events[sequence_max_x >= 80].copy()

    # Categorize passes
    passes = attacking_events[attacking_events["type"] == "Pass"].copy()

    def safe_subtract(end, start):
        if pd.isna(end) or pd.isna(start):
            return 0
        return end - start

    passes["x_progress"] = passes.apply(
        lambda r: safe_subtract(
            r["pass_end_location"][0] if isinstance(r["pass_end_location"], list) else None,
            r["location_x"]
        ), axis=1
    )

    # Pass direction
    passes["pass_direction"] = pd.cut(
        passes["x_progress"],
        bins=[-np.inf, -10, 10, np.inf],
        labels=["Backward", "Lateral", "Forward"]
    )

    # Pass zone
    passes["pass_zone"] = pd.cut(
        passes["location_x"],
        bins=[0, 40, 80, 120],
        labels=["Defensive Third", "Middle Third", "Final Third"]
    )

    # Pass type
    def classify_pass_type(row):
        if row.get("pass_cross"):
            return "Cross"
        if row.get("pass_through_ball"):
            return "Through Ball"
        y = row.get("location_y", 40)
        if y < 20 or y > 60:
            return "Wide"
        return "Central"

    passes["pass_type"] = passes.apply(classify_pass_type, axis=1)

    # Sequence-level buildup profile
    def sequence_profile(group):
        return pd.Series({
            "team": group["team"].iloc[0],
            "total_passes": len(group),
            "pct_forward": (group["pass_direction"] == "Forward").mean(),
            "pct_backward": (group["pass_direction"] == "Backward").mean(),
            "pct_lateral": (group["pass_direction"] == "Lateral").mean(),
            "pct_wide": (group["pass_type"] == "Wide").mean(),
            "pct_central": (group["pass_type"] == "Central").mean(),
            "through_balls": (group["pass_type"] == "Through Ball").sum(),
            "crosses": (group["pass_type"] == "Cross").sum(),
        })

    buildup_profile = passes.groupby("sequence_id").apply(sequence_profile).reset_index()

    # Team-level summary
    team_buildup = buildup_profile.groupby("team").agg({
        "sequence_id": "count",
        "total_passes": "mean",
        "pct_forward": "mean",
        "pct_wide": "mean",
        "through_balls": "mean",
        "crosses": "mean"
    }).rename(columns={"sequence_id": "sequences"}).reset_index()

    return {"sequences": buildup_profile, "teams": team_buildup}

buildup = analyze_buildup_patterns(events)
print(buildup["teams"])
# R: Buildup Pattern Analysis
library(tidyverse)

analyze_buildup_patterns <- function(events) {
    # Focus on sequences that reach the final third
    attacking_sequences <- events %>%
        group_by(sequence_id) %>%
        filter(max(location_x, na.rm = TRUE) >= 80) %>%
        ungroup()

    # Categorize passes within sequences
    pass_patterns <- attacking_sequences %>%
        filter(type == "Pass") %>%
        mutate(
            # Pass direction
            pass_direction = case_when(
                pass_end_x - location_x > 10 ~ "Forward",
                pass_end_x - location_x < -10 ~ "Backward",
                TRUE ~ "Lateral"
            ),
            # Pass zone
            pass_zone = case_when(
                location_x < 40 ~ "Defensive Third",
                location_x < 80 ~ "Middle Third",
                TRUE ~ "Final Third"
            ),
            # Pass type
            pass_type = case_when(
                pass_cross == TRUE ~ "Cross",
                pass_through_ball == TRUE ~ "Through Ball",
                location_y < 20 | location_y > 60 ~ "Wide",
                TRUE ~ "Central"
            )
        )

    # Sequence-level buildup profile
    buildup_profile <- pass_patterns %>%
        group_by(sequence_id, team) %>%
        summarize(
            total_passes = n(),
            pct_forward = mean(pass_direction == "Forward"),
            pct_backward = mean(pass_direction == "Backward"),
            pct_lateral = mean(pass_direction == "Lateral"),
            pct_wide = mean(pass_type == "Wide"),
            pct_central = mean(pass_type == "Central"),
            through_balls = sum(pass_type == "Through Ball"),
            crosses = sum(pass_type == "Cross"),
            .groups = "drop"
        )

    # Team-level summary
    team_buildup <- buildup_profile %>%
        group_by(team) %>%
        summarize(
            sequences = n(),
            avg_passes = mean(total_passes),
            pct_forward = mean(pct_forward),
            pct_wide = mean(pct_wide),
            avg_through_balls = mean(through_balls),
            avg_crosses = mean(crosses),
            .groups = "drop"
        )

    return(list(sequences = buildup_profile, teams = team_buildup))
}

buildup <- analyze_buildup_patterns(events)
print(buildup$teams)
Output
         team  sequences  total_passes  pct_forward  pct_wide  through_balls  crosses
0   Barcelona        112         6.42        0.521     0.312          0.42     0.89
1  Real Madrid         98         5.18        0.487     0.356          0.31     1.12
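One way to turn profiles like these into a style comparison is a single directness score. A rough sketch using the numbers printed above (the score itself is an illustrative invention, not a standard metric):

```python
import pandas as pd

# Team buildup profiles echoing the table above
teams = pd.DataFrame({
    "team": ["Barcelona", "Real Madrid"],
    "avg_passes": [6.42, 5.18],
    "pct_forward": [0.521, 0.487],
})

# Crude directness: forward-pass share scaled by how short sequences are
teams["directness"] = teams["pct_forward"] / teams["avg_passes"]
print(teams.sort_values("directness", ascending=False))
```

Under this toy score Real Madrid ranks as the more direct side: a slightly lower forward share, but over noticeably shorter sequences.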

Visualizing Buildup Paths

buildup_visualization.py
# Python: Visualize Common Buildup Paths
import pandas as pd
import matplotlib.pyplot as plt

def visualize_buildup_flow(events, team_name):
    """Visualize passing flow between zones."""
    # Get team passes
    team_passes = events[
        (events["team"] == team_name) &
        (events["type"] == "Pass")
    ].copy()

    # Define zone assignment function
    def assign_zone(x, y):
        if x < 40:
            prefix = "Def"
        elif x < 80:
            prefix = "Mid"
        else:
            prefix = "Att"

        if y < 26.7:
            suffix = "Right"
        elif y < 53.3:
            suffix = "Center"
        else:
            suffix = "Left"

        return f"{prefix} {suffix}"

    team_passes["zone_from"] = team_passes.apply(
        lambda r: assign_zone(r["location_x"], r["location_y"]), axis=1
    )

    # Get end location
    def get_end_coords(row):
        loc = row.get("pass_end_location")
        if isinstance(loc, list) and len(loc) >= 2:
            return loc[0], loc[1]
        return None, None

    team_passes["end_x"], team_passes["end_y"] = zip(
        *team_passes.apply(get_end_coords, axis=1)
    )
    team_passes = team_passes.dropna(subset=["end_x", "end_y"])

    team_passes["zone_to"] = team_passes.apply(
        lambda r: assign_zone(r["end_x"], r["end_y"]), axis=1
    )

    # Count zone transitions
    zone_flow = team_passes.groupby(["zone_from", "zone_to"]).size().reset_index(name="passes")
    zone_flow = zone_flow[zone_flow["passes"] >= 5]

    # Create chord-style visualization
    fig, ax = plt.subplots(figsize=(14, 10))

    # Define zone positions for visualization
    zone_positions = {
        "Def Left": (20, 67), "Def Center": (20, 40), "Def Right": (20, 13),
        "Mid Left": (60, 67), "Mid Center": (60, 40), "Mid Right": (60, 13),
        "Att Left": (100, 67), "Att Center": (100, 40), "Att Right": (100, 13),
    }

    # Draw pitch background
    ax.set_facecolor("#1a472a")
    ax.set_xlim(0, 120)
    ax.set_ylim(0, 80)

    # Draw zone points
    for zone, (x, y) in zone_positions.items():
        ax.scatter(x, y, s=500, c="white", zorder=10)
        ax.annotate(zone.split()[1], (x, y), ha="center", va="center",
                    fontsize=8, fontweight="bold", zorder=11)

    # Draw flows
    max_passes = zone_flow["passes"].max()
    for _, row in zone_flow.iterrows():
        if row["zone_from"] in zone_positions and row["zone_to"] in zone_positions:
            x1, y1 = zone_positions[row["zone_from"]]
            x2, y2 = zone_positions[row["zone_to"]]
            width = row["passes"] / max_passes * 5

            ax.annotate("", xy=(x2, y2), xytext=(x1, y1),
                        arrowprops=dict(
                            arrowstyle="-|>",
                            connectionstyle="arc3,rad=0.1",
                            lw=width,
                            color="yellow",
                            alpha=0.6
                        ))

    ax.set_aspect("equal")
    ax.set_title(f"{team_name} - Passing Flow Between Zones", fontsize=14)
    ax.axis("off")
    plt.show()

visualize_buildup_flow(events, "Barcelona")
# R: Visualize Common Buildup Paths
library(tidyverse)
library(ggalluvial)

visualize_buildup_flow <- function(events, team_name) {
    # Get team attacking sequences
    team_events <- events %>%
        filter(team == team_name, type == "Pass")

    # Assign zones to passes
    team_events <- team_events %>%
        mutate(
            zone_from = case_when(
                location_x < 40 & location_y < 26.7 ~ "Def Right",
                location_x < 40 & location_y < 53.3 ~ "Def Center",
                location_x < 40 ~ "Def Left",
                location_x < 80 & location_y < 26.7 ~ "Mid Right",
                location_x < 80 & location_y < 53.3 ~ "Mid Center",
                location_x < 80 ~ "Mid Left",
                location_y < 26.7 ~ "Att Right",
                location_y < 53.3 ~ "Att Center",
                TRUE ~ "Att Left"
            ),
            zone_to = case_when(
                pass_end_x < 40 & pass_end_y < 26.7 ~ "Def Right",
                pass_end_x < 40 & pass_end_y < 53.3 ~ "Def Center",
                pass_end_x < 40 ~ "Def Left",
                pass_end_x < 80 & pass_end_y < 26.7 ~ "Mid Right",
                pass_end_x < 80 & pass_end_y < 53.3 ~ "Mid Center",
                pass_end_x < 80 ~ "Mid Left",
                pass_end_y < 26.7 ~ "Att Right",
                pass_end_y < 53.3 ~ "Att Center",
                TRUE ~ "Att Left"
            )
        )

    # Count zone transitions
    zone_flow <- team_events %>%
        count(zone_from, zone_to, name = "passes") %>%
        filter(passes >= 5)  # Minimum threshold

    # Create Sankey diagram
    ggplot(zone_flow,
           aes(axis1 = zone_from, axis2 = zone_to, y = passes)) +
        geom_alluvium(aes(fill = zone_from), width = 0.2) +
        geom_stratum(width = 0.2, fill = "gray80", color = "white") +
        geom_text(stat = "stratum", aes(label = after_stat(stratum)), size = 3) +
        scale_x_discrete(limits = c("From Zone", "To Zone")) +
        labs(title = sprintf("%s - Passing Flow Between Zones", team_name),
             y = "Number of Passes") +
        theme_minimal() +
        theme(legend.position = "none")
}

visualize_buildup_flow(events, "Barcelona")

Markov Chain Models for Possession

Markov chains model possession as a probabilistic process where the probability of the next state depends only on the current state. This allows us to calculate expected goal probabilities from different pitch positions.

markov_chains.py
# Python: Markov Chain Possession Model
import pandas as pd
import numpy as np
from collections import defaultdict

def build_possession_markov(events, n_x_zones=4, n_y_zones=3):
    """Build Markov chain model for possession flow."""
    zone_width = 120 / n_x_zones
    zone_height = 80 / n_y_zones

    # Assign zones
    events = events.dropna(subset=["location_x", "location_y"]).copy()
    events["zone"] = events.apply(
        lambda r: f"Z{int(np.ceil(r['location_x']/zone_width))}_{int(np.ceil(r['location_y']/zone_height))}",
        axis=1
    )

    # Define states
    def get_state(row, next_row):
        if row["type"] == "Shot":
            if row.get("shot_outcome") == "Goal":
                return "GOAL"
            return "SHOT_NO_GOAL"
        if next_row is not None and next_row.get("team") != row["team"]:
            return "TURNOVER"
        return row["zone"]

    # Build transition counts
    events = events.sort_values("index").reset_index(drop=True)
    transitions = defaultdict(lambda: defaultdict(int))

    for i in range(len(events) - 1):
        current = events.iloc[i]
        next_event = events.iloc[i + 1]

        current_state = get_state(current, next_event)
        next_state = get_state(next_event, events.iloc[i + 2] if i + 2 < len(events) else None)

        transitions[current_state][next_state] += 1

    # Build transition matrix
    states = list(set(transitions.keys()) | set(
        s for d in transitions.values() for s in d.keys()
    ))
    n_states = len(states)
    state_idx = {s: i for i, s in enumerate(states)}

    trans_matrix = np.zeros((n_states, n_states))
    for from_state, to_dict in transitions.items():
        for to_state, count in to_dict.items():
            trans_matrix[state_idx[from_state], state_idx[to_state]] = count

    # Normalize rows
    row_sums = trans_matrix.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  # Avoid division by zero
    trans_matrix = trans_matrix / row_sums

    return trans_matrix, states, state_idx

def calculate_goal_probability(trans_matrix, states, state_idx):
    """Calculate probability of reaching GOAL from each zone."""
    n = len(states)
    goal_idx = state_idx.get("GOAL")

    if goal_idx is None:
        return {}

    # Identify absorbing states
    absorbing = ["GOAL", "SHOT_NO_GOAL", "TURNOVER"]
    absorbing_idx = [state_idx[s] for s in absorbing if s in state_idx]
    transient_idx = [i for i in range(n) if i not in absorbing_idx]

    if not transient_idx:
        return {}

    # Extract Q matrix (transient to transient transitions)
    Q = trans_matrix[np.ix_(transient_idx, transient_idx)]

    # Extract R matrix (transient to absorbing transitions)
    R = trans_matrix[np.ix_(transient_idx, absorbing_idx)]

    # Fundamental matrix: N = (I - Q)^-1
    try:
        N = np.linalg.inv(np.eye(len(transient_idx)) - Q)
    except np.linalg.LinAlgError:
        return {}

    # Absorption probabilities: B = N * R
    B = N @ R

    # Get goal probability (column index in absorbing)
    goal_col = absorbing.index("GOAL") if "GOAL" in absorbing else None
    if goal_col is None:
        return {}

    goal_probs = {}
    transient_states = [states[i] for i in transient_idx]
    for i, state in enumerate(transient_states):
        goal_probs[state] = B[i, goal_col]

    return goal_probs

trans_matrix, states, state_idx = build_possession_markov(events)
goal_probs = calculate_goal_probability(trans_matrix, states, state_idx)

print("Goal probability from each zone:")
for zone, prob in sorted(goal_probs.items(), key=lambda x: -x[1])[:10]:
    print(f"  {zone}: {prob:.3f}")
# R: Markov Chain Possession Model
library(tidyverse)
library(markovchain)

build_possession_markov <- function(events) {
    # Create a 4 x 3 zone grid (12 zones)
    zone_width <- 120 / 4
    zone_height <- 80 / 3

    # Assign events to zones
    events <- events %>%
        filter(!is.na(location_x), !is.na(location_y)) %>%
        mutate(
            zone = paste0(
                "Z",
                ceiling(location_x / zone_width),
                "_",
                ceiling(location_y / zone_height)
            )
        )

    # Add absorbing states
    events <- events %>%
        mutate(
            state = case_when(
                type == "Shot" & shot_outcome == "Goal" ~ "GOAL",
                type == "Shot" ~ "SHOT_NO_GOAL",
                lead(team) != team ~ "TURNOVER",
                TRUE ~ zone
            )
        )

    # Count transitions
    transitions <- events %>%
        mutate(next_state = lead(state)) %>%
        filter(!is.na(next_state)) %>%
        count(state, next_state, name = "count")

    # Build transition matrix
    states <- unique(c(transitions$state, transitions$next_state))
    n_states <- length(states)

    trans_matrix <- matrix(0, nrow = n_states, ncol = n_states,
                          dimnames = list(states, states))

    for (i in 1:nrow(transitions)) {
        trans_matrix[transitions$state[i], transitions$next_state[i]] <-
            transitions$count[i]
    }

    # Normalize rows to probabilities; rows with no observed transitions get a
    # self-loop so the matrix remains a valid stochastic matrix
    row_sums <- rowSums(trans_matrix)
    trans_matrix <- trans_matrix / ifelse(row_sums > 0, row_sums, 1)
    diag(trans_matrix)[row_sums == 0] <- 1

    # Create markovchain object
    mc <- new("markovchain",
              states = states,
              transitionMatrix = trans_matrix,
              name = "PossessionMC")

    return(mc)
}

# Build Markov chain
mc <- build_possession_markov(events)

# Calculate steady state (long-run probabilities)
steady_state <- steadyStates(mc)
print("Steady State Probabilities:")
print(head(sort(steady_state[1,], decreasing = TRUE), 10))

# Calculate absorption probabilities (probability of reaching GOAL from each zone)
# by solving the fundamental matrix equation: N = (I - Q)^-1, B = N %*% R
P <- mc@transitionMatrix
absorbing <- intersect(c("GOAL", "SHOT_NO_GOAL", "TURNOVER"), rownames(P))
transient <- setdiff(rownames(P), absorbing)

Q <- P[transient, transient, drop = FALSE]
R <- P[transient, absorbing, drop = FALSE]
N <- solve(diag(length(transient)) - Q)
B <- N %*% R

if ("GOAL" %in% colnames(B)) {
    goal_probs <- sort(B[, "GOAL"], decreasing = TRUE)
    cat("\nGoal probability from each zone:\n")
    print(head(goal_probs, 10))
}
Output
Goal probability from each zone:
  Z4_2: 0.142
  Z4_1: 0.118
  Z4_3: 0.115
  Z3_2: 0.067
  Z3_1: 0.052
  Z3_3: 0.048
  Z2_2: 0.031
  Z2_1: 0.028
  Z2_3: 0.025
  Z1_2: 0.018
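The fundamental-matrix machinery behind these numbers is easy to verify on a toy chain. A self-contained sketch (the transition probabilities are invented for illustration, not taken from the match):

```python
import numpy as np

# Two transient zones (A, B) feeding two absorbing states (GOAL, TURNOVER)
P = np.array([
    [0.50, 0.30, 0.02, 0.18],  # from A
    [0.20, 0.40, 0.10, 0.30],  # from B
    [0.00, 0.00, 1.00, 0.00],  # GOAL absorbs
    [0.00, 0.00, 0.00, 1.00],  # TURNOVER absorbs
])

Q = P[:2, :2]                      # transient -> transient
R = P[:2, 2:]                      # transient -> absorbing
N = np.linalg.inv(np.eye(2) - Q)   # fundamental matrix

B = N @ R                          # absorption probabilities
steps = N.sum(axis=1)              # expected actions before the sequence ends

print("P(goal) from A, B:", B[:, 0].round(3))   # [0.175 0.225]
print("Expected steps   :", steps.round(2))     # [3.75 2.92]
```

Note that each row of B sums to 1: every possession eventually ends in one of the absorbing states, which is a quick sanity check for the real model too.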

Sequence Pattern Mining

Sequence mining discovers frequently occurring patterns in attacking moves. This can reveal tactical signatures and identify successful attacking combinations.

sequence_mining.py
# Python: Sequential Pattern Mining
import pandas as pd
import numpy as np
from collections import defaultdict
from itertools import combinations

def prepare_sequence_data(events):
    """Prepare events as action sequences."""
    events = events.sort_values(["sequence_id", "index"]).copy()

    # Create action labels
    def create_action(row):
        zone = "DefThird" if row["location_x"] < 40 else \
               "MidThird" if row["location_x"] < 80 else "AttThird"
        return f"{row['type']}_{zone}"

    events["action"] = events.apply(create_action, axis=1)

    # Build sequence strings
    sequence_data = events.groupby("sequence_id").agg(
        sequence=("action", lambda x: " -> ".join(x)),
        team=("team", "first"),
        ended_shot=("type", lambda x: (x == "Shot").any()),
        ended_goal=("shot_outcome", lambda x: (x == "Goal").any())
    ).reset_index()

    return sequence_data

def find_frequent_patterns(sequence_data, min_support=0.05, max_length=3):
    """Find frequently occurring action patterns."""
    pattern_sequences = defaultdict(list)

    for idx, row in sequence_data.iterrows():
        actions = row["sequence"].split(" -> ")

        # Extract n-grams of different lengths
        for length in range(2, min(len(actions), max_length) + 1):
            for i in range(len(actions) - length + 1):
                pattern = " -> ".join(actions[i:i + length])
                pattern_sequences[pattern].append(idx)

    # Calculate support and other metrics
    n_sequences = len(sequence_data)
    pattern_stats = []

    for pattern, indices in pattern_sequences.items():
        unique_indices = list(set(indices))
        support = len(unique_indices) / n_sequences

        if support >= min_support:
            pattern_stats.append({
                "pattern": pattern,
                "support": support,
                "count": len(unique_indices),
                "shot_rate": sequence_data.loc[unique_indices, "ended_shot"].mean(),
                "goal_rate": sequence_data.loc[unique_indices, "ended_goal"].mean()
            })

    patterns = pd.DataFrame(pattern_stats).sort_values("support", ascending=False)
    return patterns

seq_data = prepare_sequence_data(events)
patterns = find_frequent_patterns(seq_data, min_support=0.03)

print("Most Common Patterns:")
print(patterns.head(10))
# R: Sequential Pattern Mining
library(tidyverse)
library(arulesSequences)

# Prepare sequences for mining
prepare_sequence_data <- function(events) {
    # Create action strings
    events <- events %>%
        arrange(sequence_id, index) %>%
        mutate(
            action = paste(
                type,
                ifelse(location_x < 40, "DefThird",
                       ifelse(location_x < 80, "MidThird", "AttThird")),
                sep = "_"
            )
        )

    # Convert to sequence format
    sequence_data <- events %>%
        group_by(sequence_id) %>%
        summarize(
            sequence = paste(action, collapse = " -> "),
            team = first(team),
            ended_shot = any(type == "Shot"),
            ended_goal = any(type == "Shot" & shot_outcome == "Goal"),
            .groups = "drop"
        )

    return(sequence_data)
}

# Find frequent subsequences manually
find_frequent_patterns <- function(sequence_data, min_support = 0.05) {
    # Extract all 2-grams and 3-grams
    all_patterns <- list()

    for (i in 1:nrow(sequence_data)) {
        actions <- strsplit(sequence_data$sequence[i], " -> ")[[1]]

        # 2-grams
        if (length(actions) >= 2) {
            for (j in 1:(length(actions) - 1)) {
                pattern <- paste(actions[j:(j+1)], collapse = " -> ")
                all_patterns[[pattern]] <- c(all_patterns[[pattern]], i)
            }
        }

        # 3-grams
        if (length(actions) >= 3) {
            for (j in 1:(length(actions) - 2)) {
                pattern <- paste(actions[j:(j+2)], collapse = " -> ")
                all_patterns[[pattern]] <- c(all_patterns[[pattern]], i)
            }
        }
    }

    # Calculate support
    n_sequences <- nrow(sequence_data)
    pattern_support <- map_df(names(all_patterns), function(p) {
        seq_ids <- all_patterns[[p]]
        tibble(
            pattern = p,
            support = length(unique(seq_ids)) / n_sequences,
            count = length(unique(seq_ids)),
            shot_rate = mean(sequence_data$ended_shot[unique(seq_ids)]),
            goal_rate = mean(sequence_data$ended_goal[unique(seq_ids)])
        )
    }) %>%
        filter(support >= min_support) %>%
        arrange(desc(support))

    return(pattern_support)
}

seq_data <- prepare_sequence_data(events)
patterns <- find_frequent_patterns(seq_data, min_support = 0.03)

print("Most Common Patterns:")
print(head(patterns, 10))
Output
Most Common Patterns:
                                          pattern  support  count  shot_rate  goal_rate
0                Pass_MidThird -> Pass_MidThird    0.312    156      0.103      0.019
1               Pass_DefThird -> Pass_MidThird    0.287    144      0.083      0.014
2                Pass_MidThird -> Pass_AttThird    0.198     99      0.212      0.040
3  Pass_MidThird -> Pass_MidThird -> Pass_AttThird  0.142   71      0.239      0.056
4               Pass_AttThird -> Pass_AttThird    0.123     62      0.274      0.065
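Raw support favors common but unremarkable patterns. A useful follow-up is lift, the pattern's shot rate relative to the overall baseline. A sketch on invented numbers shaped like the table above:

```python
import pandas as pd

# Pattern shot rates vs. an overall baseline (numbers invented for illustration)
patterns = pd.DataFrame({
    "pattern": ["PM -> PM", "PM -> PA", "PA -> PA"],
    "shot_rate": [0.103, 0.212, 0.274],
})
baseline = 0.11  # share of all sequences ending in a shot

# Lift > 1 means the pattern over-indexes on shot-ending sequences
patterns["shot_lift"] = patterns["shot_rate"] / baseline
print(patterns.sort_values("shot_lift", ascending=False))
```

Here midfield-to-midfield circulation lands below 1 while final-third combinations land well above it, which matches the intuition that territory, not volume, drives chances.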

Sequence Value Models

Sequence value models attribute xG to all actions in a possession, not just the final shot. This is the basis for metrics like xGChain and xGBuildup.

sequence_value.py
# Python: Building xGChain-style Metrics
import pandas as pd
import numpy as np

def calculate_sequence_value(events):
    """Calculate xGChain and xGBuildup for players."""
    # Get sequence xG totals
    sequence_xg = events[events["type"] == "Shot"].groupby("sequence_id").agg(
        sequence_xg=("shot_statsbomb_xg", "sum"),
        sequence_goal=("shot_outcome", lambda x: (x == "Goal").any())
    ).reset_index()

    # Join back to all events
    events_with_xg = events.merge(sequence_xg, on="sequence_id", how="left")
    events_with_xg["sequence_xg"] = events_with_xg["sequence_xg"].fillna(0)
    events_with_xg["sequence_goal"] = events_with_xg["sequence_goal"].fillna(False)

    # Calculate xGChain per player
    def player_xgchain(group):
        unique_sequences = group.drop_duplicates(subset=["sequence_id"])
        return pd.Series({
            "sequences_involved": group["sequence_id"].nunique(),
            "xgchain": unique_sequences["sequence_xg"].sum()
        })

    xgchain = events_with_xg.groupby(["player", "team"]).apply(
        player_xgchain
    ).reset_index()

    # Identify assists and shots to exclude for xGBuildup
    events_sorted = events_with_xg.sort_values("index")
    events_sorted["next_type"] = events_sorted["type"].shift(-1)

    assists = events_sorted[
        (events_sorted["type"] == "Pass") &
        (events_sorted["next_type"] == "Shot")
    ]["id"].tolist()

    shots = events_sorted[events_sorted["type"] == "Shot"]["id"].tolist()

    # Calculate xGBuildup (excluding shots and assists)
    buildup_events = events_with_xg[~events_with_xg["id"].isin(assists + shots)]

    def player_xgbuildup(group):
        unique_sequences = group.drop_duplicates(subset=["sequence_id"])
        return unique_sequences["sequence_xg"].sum()

    xgbuildup = buildup_events.groupby(["player", "team"]).apply(
        player_xgbuildup
    ).reset_index(name="xgbuildup")

    # Combine
    player_value = xgchain.merge(xgbuildup, on=["player", "team"], how="left")
    player_value["xgbuildup"] = player_value["xgbuildup"].fillna(0)

    return player_value

player_sequence_value = calculate_sequence_value(events)
print(player_sequence_value.sort_values("xgchain", ascending=False).head(10))
# R: Building xGChain-style Metrics
library(tidyverse)

calculate_sequence_value <- function(events) {
    # Get sequence xG
    sequence_xg <- events %>%
        filter(type == "Shot") %>%
        group_by(sequence_id) %>%
        summarize(
            sequence_xg = sum(shot_xg, na.rm = TRUE),
            sequence_goal = any(shot_outcome == "Goal"),
            .groups = "drop"
        )

    # Join back to all events
    events_with_xg <- events %>%
        left_join(sequence_xg, by = "sequence_id") %>%
        mutate(
            sequence_xg = replace_na(sequence_xg, 0),
            sequence_goal = replace_na(sequence_goal, FALSE)
        )

    # Calculate player xGChain (total xG from sequences they participated in)
    player_xgchain <- events_with_xg %>%
        group_by(player, team) %>%
        summarize(
            sequences_involved = n_distinct(sequence_id),
            xgchain = sum(sequence_xg[!duplicated(sequence_id)]),
            .groups = "drop"
        )

    # Calculate xGBuildup (xGChain excluding shots and assists)
    assists <- events_with_xg %>%
        arrange(index) %>%
        filter(type == "Pass", lead(type) == "Shot") %>%
        pull(id)

    shots <- events_with_xg %>%
        filter(type == "Shot") %>%
        pull(id)

    player_xgbuildup <- events_with_xg %>%
        filter(!id %in% c(assists, shots)) %>%
        group_by(player, team) %>%
        summarize(
            xgbuildup = sum(sequence_xg[!duplicated(sequence_id)]),
            .groups = "drop"
        )

    # Combine metrics
    player_value <- player_xgchain %>%
        left_join(player_xgbuildup, by = c("player", "team")) %>%
        mutate(xgbuildup = replace_na(xgbuildup, 0))

    return(player_value)
}

player_sequence_value <- calculate_sequence_value(events)
print(player_sequence_value %>% arrange(desc(xgchain)) %>% head(10))
Output
           player         team  sequences_involved  xgchain  xgbuildup
0    Lionel Messi    Barcelona                  45     2.34       0.89
1     Luis Suárez    Barcelona                  38     1.87       0.42
2      Neymar Jr    Barcelona                  41     1.65       0.78
3  Andrés Iniesta    Barcelona                  52     1.23       1.15
4  Sergio Busquets    Barcelona                  48     0.98       0.95
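The crediting rules behind these two columns are easiest to see on a single toy possession. A sketch (the players and xG value are illustrative):

```python
# One possession: Busquets pass -> Iniesta pass (assist) -> Messi shot, xG 0.30
sequence_xg = 0.30
involved = ["Busquets", "Iniesta", "Messi"]
shooter, assister = "Messi", "Iniesta"

# xGChain credits every participant with the full sequence xG
xgchain = {p: sequence_xg for p in involved}

# xGBuildup additionally excludes the shooter and the assister
xgbuildup = {p: sequence_xg for p in involved if p not in (shooter, assister)}

print(xgchain)    # all three credited 0.30
print(xgbuildup)  # only Busquets credited
```

This is why deep-lying players like Busquets post xGBuildup close to their xGChain, while finishers like Messi show a large gap between the two.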

Sequence Similarity Analysis

Comparing sequences using similarity measures helps identify recurring patterns, find analogous situations, and discover tactical templates. Edit distance and dynamic time warping are common approaches.

Similarity Approaches
  • Edit Distance: Minimum operations to transform one sequence to another
  • Jaccard Similarity: Overlap of actions between sequences
  • Dynamic Time Warping: Flexible alignment for temporal sequences
  • Embedding Distance: Vector representations learned from data
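Of these approaches, edit distance drives the worked example below; Jaccard similarity is simple enough to sketch directly. A minimal illustration, assuming sequences are already encoded as lists of action codes (PD/PM/PA etc., as in the code that follows):

```python
def jaccard_similarity(seq_a, seq_b):
    """Order-insensitive overlap of action types between two sequences."""
    set_a, set_b = set(seq_a), set(seq_b)
    if not set_a and not set_b:
        return 1.0  # two empty sequences are identical by convention
    return len(set_a & set_b) / len(set_a | set_b)

# Both share PD, PM, PA, S; the second adds a dribble (D)
print(jaccard_similarity(["PD", "PM", "PA", "S"],
                         ["PD", "PM", "PA", "D", "S"]))  # → 0.8
```

Because it ignores order, Jaccard treats PD-PM-PA-S and PA-PM-PD-S as identical, which is why edit distance is usually preferred when the ordering of actions matters.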
sequence_similarity.py
# Python: Sequence Similarity Using Edit Distance
import pandas as pd
import numpy as np
from Levenshtein import distance as levenshtein_distance
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import squareform

def calculate_sequence_similarity(events, max_sequences=500):
    """Calculate pairwise similarity between sequences."""
    events = events.sort_values(["sequence_id", "index"]).copy()

    # Convert to action codes
    def get_action_code(row):
        if row["type"] == "Pass":
            if row["location_x"] < 40:
                return "PD"
            elif row["location_x"] < 80:
                return "PM"
            return "PA"
        elif row["type"] == "Carry":
            return "C"
        elif row["type"] == "Dribble":
            return "D"
        elif row["type"] == "Shot":
            return "S"
        return "O"

    events["action_code"] = events.apply(get_action_code, axis=1)

    # Build action strings
    seq_strings = events.groupby("sequence_id").agg(
        team=("team", "first"),
        action_string=("action_code", lambda x: "-".join(x)),
        n_actions=("action_code", "count"),
        ends_shot=("type", lambda x: (x == "Shot").any()),
        total_xg=("shot_statsbomb_xg", "sum")
    ).reset_index()

    # Filter minimum length
    seq_strings = seq_strings[seq_strings["n_actions"] >= 3]

    # Sample if too large
    if len(seq_strings) > max_sequences:
        seq_strings = seq_strings.sample(n=max_sequences, random_state=42)

    # Calculate pairwise distances
    n_seq = len(seq_strings)
    distance_matrix = np.zeros((n_seq, n_seq))

    strings = seq_strings["action_string"].tolist()
    for i in range(n_seq):
        for j in range(i + 1, n_seq):
            dist = levenshtein_distance(strings[i], strings[j])
            max_len = max(len(strings[i]), len(strings[j]))
            normalized_dist = dist / max_len if max_len > 0 else 0
            distance_matrix[i, j] = normalized_dist
            distance_matrix[j, i] = normalized_dist

    # Convert to similarity
    similarity_matrix = 1 - distance_matrix

    return seq_strings, similarity_matrix

def find_similar_sequences(seq_strings, similarity_matrix, target_idx, top_n=5):
    """Find sequences most similar to target."""
    sims = similarity_matrix[target_idx].copy()
    sims[target_idx] = -1  # Exclude self

    top_indices = np.argsort(sims)[::-1][:top_n]

    similar = seq_strings.iloc[top_indices].copy()
    similar["similarity_to_target"] = sims[top_indices]

    return similar

# Run analysis
seq_strings, sim_matrix = calculate_sequence_similarity(events)

# Find sequences similar to high-xG sequence
goal_seqs = seq_strings[
    seq_strings["ends_shot"] &
    (seq_strings["total_xg"] > 0.5)
].index.tolist()

if goal_seqs:
    target_id = goal_seqs[0]
    target_idx = seq_strings.index.get_loc(target_id)
    print(f"Target sequence: {seq_strings.loc[target_id, 'action_string']}")

    similar = find_similar_sequences(seq_strings.reset_index(drop=True), sim_matrix, target_idx)
    print("Similar sequences found:")
    print(similar[["action_string", "ends_shot", "similarity_to_target"]])
# R: Sequence Similarity Using Edit Distance
library(tidyverse)
library(stringdist)

calculate_sequence_similarity <- function(events, method = "lv") {
    # Convert sequences to action strings
    events <- events %>%
        arrange(sequence_id, index) %>%
        mutate(
            action_code = case_when(
                type == "Pass" & location_x < 40 ~ "PD",
                type == "Pass" & location_x < 80 ~ "PM",
                type == "Pass" ~ "PA",
                type == "Carry" ~ "C",
                type == "Dribble" ~ "D",
                type == "Shot" ~ "S",
                TRUE ~ "O"
            )
        )

    # Build action strings per sequence
    sequence_strings <- events %>%
        group_by(sequence_id, team) %>%
        summarize(
            action_string = paste(action_code, collapse = "-"),
            n_actions = n(),
            ends_shot = any(type == "Shot"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),
            .groups = "drop"
        ) %>%
        filter(n_actions >= 3)  # Minimum sequence length

    # Calculate pairwise distances
    n_seq <- nrow(sequence_strings)
    if (n_seq > 500) {
        # Sample for large datasets
        sequence_strings <- sequence_strings %>%
            sample_n(min(500, n_seq))
        n_seq <- nrow(sequence_strings)
    }

    distance_matrix <- as.matrix(stringdistmatrix(
        sequence_strings$action_string,
        method = method,
        useNames = FALSE
    ))  # as.matrix: stringdistmatrix returns a dist object, not a matrix

    # Normalize by maximum possible distance
    max_lengths <- outer(
        nchar(sequence_strings$action_string),
        nchar(sequence_strings$action_string),
        pmax
    )
    similarity_matrix <- 1 - (distance_matrix / max_lengths)

    return(list(
        sequences = sequence_strings,
        similarity = similarity_matrix
    ))
}

# Find most similar sequences to a successful attack
find_similar_sequences <- function(similarity_result, target_idx, top_n = 5) {
    sims <- similarity_result$similarity[target_idx, ]
    sims[target_idx] <- -1  # Exclude self

    top_indices <- order(sims, decreasing = TRUE)[1:top_n]

    similar <- similarity_result$sequences[top_indices, ] %>%
        mutate(
            similarity_to_target = sims[top_indices]
        )

    return(similar)
}

# Run analysis
sim_result <- calculate_sequence_similarity(events)

# Find sequences similar to a goal-scoring sequence
goal_sequences <- which(sim_result$sequences$ends_shot &
                        sim_result$sequences$total_xg > 0.5)

if (length(goal_sequences) > 0) {
    target <- goal_sequences[1]
    similar <- find_similar_sequences(sim_result, target)

    cat("Target sequence:", sim_result$sequences$action_string[target], "\n")
    cat("Similar sequences found:\n")
    print(similar)
}
Output
Target sequence: PD-PM-PM-PA-PA-D-PA-S
Similar sequences found:
             action_string  ends_shot  similarity_to_target
 42    PD-PM-PM-PA-PA-PA-S       True                 0.875
 78     PD-PM-PA-PA-D-PA-S       True                 0.857
123    PM-PM-PA-PA-PA-PA-S       True                 0.812
 45     PD-PM-PM-PA-PA-D-S      False                 0.800
 91  PD-PM-PM-PA-PA-C-PA-S       True                 0.786

Clustering Similar Sequences

Hierarchical clustering groups similar sequences together, revealing attack archetypes that teams repeatedly use.

sequence_clustering_hierarchical.py
# Python: Hierarchical Clustering of Sequences
import pandas as pd
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster, dendrogram
from scipy.spatial.distance import squareform
import matplotlib.pyplot as plt

def cluster_similar_sequences(seq_strings, similarity_matrix, k_clusters=8):
    """Cluster sequences by similarity."""
    # Convert similarity to condensed distance matrix
    distance_matrix = 1 - similarity_matrix
    np.fill_diagonal(distance_matrix, 0)

    # Ensure symmetry and non-negative
    distance_matrix = (distance_matrix + distance_matrix.T) / 2
    distance_matrix = np.maximum(distance_matrix, 0)

    condensed_dist = squareform(distance_matrix)

    # Hierarchical clustering
    linkage_matrix = linkage(condensed_dist, method="ward")

    # Cut into clusters
    clusters = fcluster(linkage_matrix, k_clusters, criterion="maxclust")

    # Add to dataframe
    result = seq_strings.copy().reset_index(drop=True)
    result["cluster"] = clusters

    # Analyze clusters
    cluster_summary = result.groupby("cluster").agg({
        "sequence_id": "count",
        "n_actions": "mean",
        "ends_shot": "mean",
        "total_xg": "mean",
        "action_string": "first"
    }).rename(columns={
        "sequence_id": "count",
        "n_actions": "avg_actions",
        "ends_shot": "shot_rate",
        "total_xg": "avg_xg",
        "action_string": "example_sequence"
    }).reset_index().sort_values("shot_rate", ascending=False)

    # Find cluster archetypes
    archetypes = []
    for cl in range(1, k_clusters + 1):
        cluster_mask = clusters == cl
        cluster_indices = np.where(cluster_mask)[0]

        if len(cluster_indices) <= 1:
            archetype_idx = cluster_indices[0] if len(cluster_indices) > 0 else 0
            centrality = 1.0
        else:
            cluster_sims = similarity_matrix[np.ix_(cluster_indices, cluster_indices)]
            centrality_scores = cluster_sims.mean(axis=1)
            most_central = cluster_indices[np.argmax(centrality_scores)]
            archetype_idx = most_central
            centrality = centrality_scores.max()

        arch = result.iloc[archetype_idx].copy()
        arch["centrality"] = centrality
        archetypes.append(arch)

    archetypes_df = pd.DataFrame(archetypes)

    return result, cluster_summary, archetypes_df, linkage_matrix

# Cluster sequences
result, summary, archetypes, linkage_mat = cluster_similar_sequences(
    seq_strings.reset_index(drop=True), sim_matrix, k_clusters=6
)

print("Sequence Cluster Summary:")
print(summary)

print("\nCluster Archetypes:")
print(archetypes[["cluster", "action_string", "n_actions", "ends_shot", "centrality"]])
# R: Hierarchical Clustering of Sequences
library(tidyverse)
library(dendextend)

cluster_similar_sequences <- function(similarity_result, k_clusters = 8) {
    # Convert similarity to distance
    dist_matrix <- as.dist(1 - similarity_result$similarity)

    # Hierarchical clustering
    hc <- hclust(dist_matrix, method = "ward.D2")

    # Cut into clusters
    clusters <- cutree(hc, k = k_clusters)

    # Add clusters to sequences
    result <- similarity_result$sequences %>%
        mutate(cluster = factor(clusters))

    # Analyze clusters
    cluster_summary <- result %>%
        group_by(cluster) %>%
        summarize(
            count = n(),
            avg_actions = mean(n_actions),
            shot_rate = mean(ends_shot),
            avg_xg = mean(total_xg),
            example_sequence = first(action_string),
            .groups = "drop"
        ) %>%
        arrange(desc(shot_rate))

    # Find cluster archetype (most central sequence)
    archetypes <- map_df(1:k_clusters, function(cl) {
        cluster_indices <- which(clusters == cl)
        if (length(cluster_indices) <= 1) {
            return(result[cluster_indices[1], ])
        }

        cluster_sims <- similarity_result$similarity[cluster_indices, cluster_indices]
        centrality <- rowMeans(cluster_sims)
        most_central <- cluster_indices[which.max(centrality)]

        result[most_central, ] %>%
            mutate(centrality = max(centrality))
    })

    return(list(
        clustered = result,
        summary = cluster_summary,
        archetypes = archetypes,
        dendrogram = hc
    ))
}

# Cluster sequences
cluster_result <- cluster_similar_sequences(sim_result, k_clusters = 6)

cat("Sequence Cluster Summary:\n")
print(cluster_result$summary)

cat("\nCluster Archetypes:\n")
print(cluster_result$archetypes %>%
      select(cluster, action_string, n_actions, ends_shot, centrality))
Output
Sequence Cluster Summary:
   cluster  count  avg_actions  shot_rate  avg_xg           example_sequence
1        3     42         5.23      0.238   0.031              PD-PM-PA-PA-S
2        5     38         7.45      0.211   0.028        PM-PM-PM-PA-PA-PA-S
4        6     28         9.82      0.179   0.024  PD-PM-PM-PA-PA-PA-PA-PA-S
3        1     56         3.12      0.143   0.018                 PD-PM-PA-S
5        2     67         4.56      0.119   0.015              PM-PM-PA-PA-O
6        4     45         6.78      0.089   0.012        PD-PD-PM-PM-PA-PA-O

Cluster Archetypes:
   cluster           action_string  n_actions  ends_shot  centrality
0        1              PD-PM-PA-S          4       True       0.723
1        2           PM-PM-PA-PA-O          5      False       0.689
2        3           PD-PM-PA-PA-S          5       True       0.756
3        4     PD-PD-PM-PM-PA-PA-O          7      False       0.712
4        5     PM-PM-PM-PA-PA-PA-S          7       True       0.698
5        6  PD-PM-PM-PA-PA-PA-PA-S          8       True       0.687

Temporal Dynamics in Sequences

The speed and rhythm of sequences significantly impact their effectiveness. Fast attacks catch defenses out of shape, while patient buildup may create space through movement.

temporal_dynamics.py
# Python: Analyzing Temporal Dynamics
import pandas as pd
import numpy as np

def analyze_temporal_dynamics(events):
    """Analyze speed and rhythm of sequences."""
    events = events.sort_values(["sequence_id", "index"]).copy()
    events["time_seconds"] = events["minute"] * 60 + events["second"]

    def compute_temporal_features(group):
        times = group["time_seconds"].values
        gaps = np.diff(times)

        if len(gaps) == 0:
            # Single-event sequence: return the same keys as the main branch
            return pd.Series({
                "team": group["team"].iloc[0],
                "total_duration": 0,
                "n_events": len(group),
                "avg_gap": 0,
                "median_gap": 0,
                "gap_sd": 0,
                "max_gap": 0,
                "min_gap": 0,
                "first_half_avg_gap": 0,
                "second_half_avg_gap": 0,
                "ends_shot": (group["type"] == "Shot").any(),
                "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum()
            })

        half = len(gaps) // 2
        first_half_gaps = gaps[:max(half, 1)]
        second_half_gaps = gaps[half:] if half < len(gaps) else gaps

        return pd.Series({
            "team": group["team"].iloc[0],
            "total_duration": times[-1] - times[0],
            "n_events": len(group),
            "avg_gap": gaps.mean(),
            "median_gap": np.median(gaps),
            "gap_sd": gaps.std() if len(gaps) > 1 else 0,
            "max_gap": gaps.max(),
            "min_gap": gaps[gaps > 0].min() if (gaps > 0).any() else 0,
            "first_half_avg_gap": first_half_gaps.mean(),
            "second_half_avg_gap": second_half_gaps.mean(),
            "ends_shot": (group["type"] == "Shot").any(),
            "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum()
        })

    temporal = events.groupby("sequence_id").apply(compute_temporal_features).reset_index()

    # Tempo change
    temporal["tempo_change"] = temporal["second_half_avg_gap"] - temporal["first_half_avg_gap"]

    # Classify rhythm
    def classify_rhythm(sd):
        if sd < 1: return "Steady"
        if sd < 2: return "Variable"
        return "Erratic"

    temporal["rhythm"] = temporal["gap_sd"].apply(classify_rhythm)

    # Classify tempo
    def classify_tempo(avg):
        if avg < 2: return "Very Fast"
        if avg < 3: return "Fast"
        if avg < 5: return "Medium"
        if avg < 8: return "Slow"
        return "Very Slow"

    temporal["tempo"] = temporal["avg_gap"].apply(classify_tempo)

    return temporal

temporal = analyze_temporal_dynamics(events)

# Effectiveness by tempo and rhythm
effectiveness = temporal.groupby(["tempo", "rhythm"]).agg({
    "sequence_id": "count",
    "ends_shot": "mean",
    "total_xg": "mean"
}).rename(columns={"sequence_id": "count"}).reset_index()

effectiveness = effectiveness[effectiveness["count"] >= 10].sort_values("total_xg", ascending=False)
print("Effectiveness by Tempo and Rhythm:")
print(effectiveness)

# Tempo acceleration effect
def classify_tempo_pattern(change):
    if change < -1: return "Accelerating"
    if change > 1: return "Decelerating"
    return "Steady"

temporal["tempo_pattern"] = temporal["tempo_change"].apply(classify_tempo_pattern)

acceleration_effect = temporal.groupby("tempo_pattern").agg({
    "sequence_id": "count",
    "ends_shot": "mean",
    "total_xg": "mean"
}).rename(columns={"sequence_id": "count"}).reset_index()

print("\nEffect of Tempo Changes:")
print(acceleration_effect)
# R: Analyzing Temporal Dynamics
library(tidyverse)

analyze_temporal_dynamics <- function(events) {
    events <- events %>%
        arrange(sequence_id, index) %>%
        mutate(
            time_seconds = minute * 60 + second
        )

    # Calculate inter-event times within sequences
    temporal_analysis <- events %>%
        group_by(sequence_id, team) %>%
        mutate(
            prev_time = lag(time_seconds),
            time_gap = time_seconds - prev_time,
            event_order = row_number()
        ) %>%
        filter(!is.na(time_gap)) %>%  # note: single-event sequences are dropped here
        summarize(
            # Overall tempo
            total_duration = max(time_seconds) - min(time_seconds),
            n_events = n(),
            avg_gap = mean(time_gap, na.rm = TRUE),
            median_gap = median(time_gap, na.rm = TRUE),

            # Tempo variation
            gap_sd = sd(time_gap, na.rm = TRUE),
            max_gap = max(time_gap, na.rm = TRUE),
            min_gap = min(time_gap[time_gap > 0], na.rm = TRUE),

            # Phase analysis (tempo changes during sequence)
            first_half_avg_gap = mean(time_gap[event_order <= n()/2], na.rm = TRUE),
            second_half_avg_gap = mean(time_gap[event_order > n()/2], na.rm = TRUE),

            # Outcome
            ends_shot = any(type == "Shot"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),

            .groups = "drop"
        ) %>%
        mutate(
            # Tempo acceleration (negative = speeding up)
            tempo_change = second_half_avg_gap - first_half_avg_gap,

            # Classify rhythm
            rhythm = case_when(
                gap_sd < 1 ~ "Steady",
                gap_sd < 2 ~ "Variable",
                TRUE ~ "Erratic"
            ),

            # Classify tempo
            tempo = case_when(
                avg_gap < 2 ~ "Very Fast",
                avg_gap < 3 ~ "Fast",
                avg_gap < 5 ~ "Medium",
                avg_gap < 8 ~ "Slow",
                TRUE ~ "Very Slow"
            )
        )

    return(temporal_analysis)
}

temporal <- analyze_temporal_dynamics(events)

# Effectiveness by tempo and rhythm
tempo_effectiveness <- temporal %>%
    group_by(tempo, rhythm) %>%
    summarize(
        count = n(),
        shot_rate = mean(ends_shot),
        avg_xg = mean(total_xg),
        .groups = "drop"
    ) %>%
    filter(count >= 10)

cat("Effectiveness by Tempo and Rhythm:\n")
print(tempo_effectiveness %>% arrange(desc(avg_xg)))

# Does accelerating tempo improve outcomes?
acceleration_effect <- temporal %>%
    mutate(
        tempo_pattern = case_when(
            tempo_change < -1 ~ "Accelerating",
            tempo_change > 1 ~ "Decelerating",
            TRUE ~ "Steady"
        )
    ) %>%
    group_by(tempo_pattern) %>%
    summarize(
        count = n(),
        shot_rate = mean(ends_shot),
        avg_xg = mean(total_xg),
        .groups = "drop"
    )

cat("\nEffect of Tempo Changes:\n")
print(acceleration_effect)
Output
Effectiveness by Tempo and Rhythm:
      tempo    rhythm  count  ends_shot  total_xg
  Very Fast    Steady     28      0.286     0.041
  Very Fast  Variable     34      0.235     0.034
       Fast    Steady     45      0.244     0.032
       Fast  Variable     52      0.192     0.026
     Medium    Steady     67      0.179     0.024

Effect of Tempo Changes:
  tempo_pattern  count  ends_shot  total_xg
0   Accelerating     78      0.231     0.033
1   Decelerating     65      0.138     0.019
2        Steady     89      0.191     0.025
Key Insight

Sequences that accelerate over their course (the gaps between actions shrink in the second half of the sequence) tend to be more dangerous than those that slow down. This suggests teams benefit from raising the tempo as a move approaches the final third.

Case Study: Comparing Team Sequence Profiles

Let's apply all our sequence analysis techniques to compare the attacking styles of two teams with contrasting philosophies.

team_sequence_comparison.py
# Python: Comprehensive Team Sequence Profile
import pandas as pd
import numpy as np

def create_team_sequence_profile(events, team_name):
    """Create comprehensive sequence profile for a team."""
    # Detect sequences on the FULL event stream before filtering to one team:
    # after filtering, the team column never changes, so turnovers would be missed
    all_events = events.sort_values("index").copy()
    all_events["possession_change"] = (
        (all_events["team"] != all_events["team"].shift(1)) |
        all_events["type"].isin(["Half Start", "Starting XI", "Kick Off"])
    )
    all_events["sequence_id"] = all_events["possession_change"].cumsum()
    team_events = all_events[all_events["team"] == team_name].copy()

    # Calculate sequence metrics
    def compute_seq(group):
        times = group["minute"] * 60 + group["second"]
        locs_x = group["location_x"].dropna()
        locs_y = group["location_y"].dropna()

        return pd.Series({
            "duration": times.max() - times.min() if len(times) > 1 else 0,
            "n_passes": (group["type"] == "Pass").sum(),
            "n_events": len(group),
            "start_x": locs_x.iloc[0] if len(locs_x) > 0 else 60,
            "end_x": locs_x.iloc[-1] if len(locs_x) > 0 else 60,
            "max_x": locs_x.max() if len(locs_x) > 0 else 60,
            "width_used": (locs_y.max() - locs_y.min()) if len(locs_y) > 0 else 0,
            "net_progress": (locs_x.iloc[-1] - locs_x.iloc[0]) if len(locs_x) > 1 else 0,
            "ends_shot": (group["type"] == "Shot").any(),
            "total_xg": group.loc[group["type"] == "Shot", "shot_statsbomb_xg"].sum()
        })

    sequences = team_events.groupby("sequence_id").apply(compute_seq).reset_index()

    # Profile summary
    profile = {
        "team": team_name,
        "total_sequences": len(sequences),
        "avg_passes": sequences["n_passes"].mean(),
        "median_passes": sequences["n_passes"].median(),
        "pct_short": (sequences["n_passes"] <= 3).mean(),
        "pct_long": (sequences["n_passes"] >= 10).mean(),
        "avg_duration": sequences["duration"].mean(),
        "pct_fast": (sequences["duration"] < 10).mean(),
        "avg_progress": sequences["net_progress"].mean(),
        "avg_width": sequences["width_used"].mean(),
        "pct_reach_box": (sequences["max_x"] >= 102).mean(),
        "shot_rate": sequences["ends_shot"].mean(),
        "avg_xg_per_seq": sequences["total_xg"].mean(),
        "xg_per_shot": (sequences["total_xg"].sum() /
                       max(sequences["ends_shot"].sum(), 1))
    }

    return profile, sequences

# Compare two teams
profile1, seq1 = create_team_sequence_profile(events, "Barcelona")
profile2, seq2 = create_team_sequence_profile(events, "Real Madrid")

# Create comparison
metrics = ["total_sequences", "avg_passes", "pct_short", "pct_long",
           "avg_duration", "pct_fast", "avg_progress", "avg_width",
           "pct_reach_box", "shot_rate", "avg_xg_per_seq", "xg_per_shot"]

labels = ["Total Sequences", "Avg Passes/Seq", "% Short (≤3)",
          "% Long (≥10)", "Avg Duration (s)", "% Fast (<10s)",
          "Avg X Progress", "Avg Width Used", "% Reach Box",
          "Shot Rate", "xG per Sequence", "xG per Shot"]

comparison = pd.DataFrame({
    "Metric": labels,
    "Barcelona": [profile1[m] for m in metrics],
    "Real Madrid": [profile2[m] for m in metrics]
})

# Format values for display (cast to object so strings can replace floats)
for col in ["Barcelona", "Real Madrid"]:
    comparison[col] = comparison[col].astype(object)
    for i, m in enumerate(metrics):
        if "pct" in m or m == "shot_rate":
            comparison.loc[i, col] = f"{comparison.loc[i, col] * 100:.1f}%"
        elif m == "avg_xg_per_seq":
            comparison.loc[i, col] = f"{comparison.loc[i, col]:.4f}"
        elif m == "xg_per_shot":
            comparison.loc[i, col] = f"{comparison.loc[i, col]:.3f}"
        else:
            comparison.loc[i, col] = f"{comparison.loc[i, col]:.2f}"

print(comparison.to_string(index=False))
# R: Comprehensive Team Sequence Profile
library(tidyverse)

create_team_sequence_profile <- function(events, team_name) {
    # Detect sequences on the full event stream BEFORE filtering to one team:
    # after filtering, lag(team) never changes, so turnovers would be missed
    team_events <- events %>%
        arrange(index) %>%
        mutate(
            possession_change = team != lag(team, default = first(team)) |
                               type %in% c("Half Start", "Starting XI", "Kick Off"),
            sequence_id = cumsum(possession_change)
        ) %>%
        filter(team == team_name)

    # Calculate sequence metrics
    sequences <- team_events %>%
        group_by(sequence_id) %>%
        summarize(
            # Basic metrics
            duration = max(minute * 60 + second) - min(minute * 60 + second),
            n_passes = sum(type == "Pass"),
            n_events = n(),

            # Spatial
            start_x = first(location_x[!is.na(location_x)]),
            end_x = last(location_x[!is.na(location_x)]),
            max_x = max(location_x, na.rm = TRUE),
            width_used = max(location_y, na.rm = TRUE) - min(location_y, na.rm = TRUE),

            # Directness
            net_progress = end_x - start_x,

            # Outcome
            ends_shot = any(type == "Shot"),
            total_xg = sum(ifelse(type == "Shot", shot_xg, 0), na.rm = TRUE),

            .groups = "drop"
        )

    # Profile summary
    profile <- list(
        team = team_name,
        total_sequences = nrow(sequences),

        # Sequence length distribution
        avg_passes = mean(sequences$n_passes),
        median_passes = median(sequences$n_passes),
        pct_short = mean(sequences$n_passes <= 3),
        pct_long = mean(sequences$n_passes >= 10),

        # Tempo
        avg_duration = mean(sequences$duration),
        pct_fast = mean(sequences$duration < 10),

        # Spatial
        avg_progress = mean(sequences$net_progress, na.rm = TRUE),
        avg_width = mean(sequences$width_used, na.rm = TRUE),
        pct_reach_box = mean(sequences$max_x >= 102, na.rm = TRUE),

        # Efficiency
        shot_rate = mean(sequences$ends_shot),
        avg_xg_per_seq = mean(sequences$total_xg),
        xg_per_shot = sum(sequences$total_xg) / max(sum(sequences$ends_shot), 1)  # guard against zero shots
    )

    return(list(profile = profile, sequences = sequences))
}

# Compare two teams
team1 <- create_team_sequence_profile(events, "Barcelona")
team2 <- create_team_sequence_profile(events, "Real Madrid")

# Create comparison table
comparison <- tibble(
    Metric = c("Total Sequences", "Avg Passes/Seq", "% Short (≤3)",
               "% Long (≥10)", "Avg Duration (s)", "% Fast (<10s)",
               "Avg X Progress", "Avg Width Used", "% Reach Box",
               "Shot Rate", "xG per Sequence", "xG per Shot"),
    Barcelona = c(
        team1$profile$total_sequences,
        round(team1$profile$avg_passes, 2),
        round(team1$profile$pct_short * 100, 1),
        round(team1$profile$pct_long * 100, 1),
        round(team1$profile$avg_duration, 1),
        round(team1$profile$pct_fast * 100, 1),
        round(team1$profile$avg_progress, 1),
        round(team1$profile$avg_width, 1),
        round(team1$profile$pct_reach_box * 100, 1),
        round(team1$profile$shot_rate * 100, 1),
        round(team1$profile$avg_xg_per_seq, 4),
        round(team1$profile$xg_per_shot, 3)
    ),
    Real_Madrid = c(
        team2$profile$total_sequences,
        round(team2$profile$avg_passes, 2),
        round(team2$profile$pct_short * 100, 1),
        round(team2$profile$pct_long * 100, 1),
        round(team2$profile$avg_duration, 1),
        round(team2$profile$pct_fast * 100, 1),
        round(team2$profile$avg_progress, 1),
        round(team2$profile$avg_width, 1),
        round(team2$profile$pct_reach_box * 100, 1),
        round(team2$profile$shot_rate * 100, 1),
        round(team2$profile$avg_xg_per_seq, 4),
        round(team2$profile$xg_per_shot, 3)
    )
)

print(comparison)
Output
           Metric  Barcelona  Real Madrid
  Total Sequences        134          128
   Avg Passes/Seq       6.82         4.56
     % Short (≤3)      24.6%        38.3%
     % Long (≥10)      18.7%         8.6%
 Avg Duration (s)      18.45        12.34
    % Fast (<10s)      32.1%        48.4%
   Avg X Progress      22.34        28.67
   Avg Width Used      42.56        38.23
      % Reach Box      31.3%        27.3%
        Shot Rate      14.2%        12.5%
  xG per Sequence     0.0234       0.0198
      xG per Shot      0.165        0.158
Barcelona: Possession Style
  • Longer sequences (6.8 passes avg)
  • More patient (18.5s avg duration)
  • Greater width usage (42.6m)
  • Higher box entry rate (31.3%)
  • Better xG per sequence despite indirect play
Real Madrid: Direct Style
  • Shorter sequences (4.6 passes avg)
  • Faster attacks (12.3s avg duration)
  • Greater x-progress per sequence
  • More fast attacks (48.4%)
  • Fewer extended buildups (8.6%)

Practice Exercises

Task: Compare the attacking sequences of two teams with different styles (e.g., one possession-based, one counter-attacking).

Requirements:

  • Extract all attacking sequences for each team
  • Compare average sequence length, duration, and directness
  • Calculate shot rates and xG per sequence type
  • Visualize the differences in buildup patterns

Task: Build a model that predicts whether a sequence will result in a shot based on its characteristics.

Requirements:

  • Extract features: start zone, number of passes, directness, width usage
  • Train a logistic regression or random forest classifier
  • Evaluate with cross-validation
  • Identify most predictive features

Task: Calculate xGBuildup per 90 minutes for all midfielders in a league season.

Requirements:

  • Load a full season of event data
  • Calculate minutes played for each player
  • Calculate xGBuildup (xGChain excluding shots and assists)
  • Normalize to per-90 and rank midfielders

Task: Identify sequences that could have been counter-attacks but weren't executed.

Requirements:

  • Detect turnovers in opponent half with space to exploit
  • Classify whether the team chose to attack quickly or slow down
  • Compare outcomes of fast vs slow decisions
  • Visualize missed counter-attack opportunities on pitch

Task: Given a partial sequence (first few actions), recommend how to continue based on historical success.

Requirements:

  • Build a database of sequence prefixes and their completions
  • For each prefix, track shot rates and xG of different continuations
  • Given a new partial sequence, find similar prefixes and recommend high-value completions
  • Evaluate using held-out test data

Task: Extract and analyze sequences that originate from set pieces (corners, free kicks).

Requirements:

  • Identify set piece sequences by their starting event type
  • Compare effectiveness of different set piece types
  • Analyze second-ball sequences (what happens after initial clearance)
  • Build a visualization of set piece success rates by starting position

Chapter Summary

Key Takeaways
  • Sequences capture context: Individual events are more meaningful when viewed as part of connected possessions
  • Buildup patterns reveal tactics: Teams have distinct signatures in how they construct attacks
  • Markov chains enable valuation: Modeling possession flow allows calculating goal probabilities from any pitch position
  • Sequence mining finds patterns: Frequent pattern analysis discovers tactical signatures and successful combinations
  • xGChain/xGBuildup: These metrics attribute value to all players in a possession, not just finishers
  • Counter-attacks are high-value: Quick transitions after turnovers produce disproportionately high xG
  • Tempo matters: Accelerating sequences tend to be more dangerous than decelerating ones
  • Similarity analysis enables learning: Finding similar successful sequences helps inform tactical decisions
Key Sequence Metrics
  • Sequence Length - Number of events/passes in a possession (typical: 1-25+ passes)
  • Sequence Duration - Time from start to end of possession (typical: 2-60+ seconds)
  • Territory Gained - Net forward progress, end_x - start_x (typical: -40 to +80 meters)
  • Directness - Net progress / total x-distance traveled (typical: -1.0 to 1.0)
  • Width Used - Max y-position minus min y-position (typical: 0-68 meters)
  • xGChain - Total xG from sequences a player was involved in (typical: 0.5-3.0 per 90)
  • xGBuildup - xGChain excluding shots and assists (typical: 0.3-2.0 per 90)
  • PPDA - Passes allowed per defensive action (typical: 6-15; lower = more pressing)
Sequence Classification Framework
By Tempo
  • Lightning: 0-4 seconds
  • Fast: 4-10 seconds
  • Medium: 10-20 seconds
  • Patient: 20-40 seconds
  • Extended: 40+ seconds
By Length
  • Counter/Direct: 1-2 passes
  • Quick Attack: 3-5 passes
  • Build-up: 6-10 passes
  • Extended: 11+ passes
By Directness
  • Very Direct: >0.7
  • Direct: 0.4 to 0.7
  • Balanced: 0.1 to 0.4
  • Recycling: -0.1 to 0.1
  • Retreating: <-0.1
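
The three classification axes above translate directly into code. A minimal sketch using exactly the chapter's bands (each upper bound treated as exclusive):

```python
def classify_sequence(duration_s, n_passes, directness):
    """Label a sequence by the chapter's tempo/length/directness bands."""
    if duration_s < 4:
        tempo = "Lightning"
    elif duration_s < 10:
        tempo = "Fast"
    elif duration_s < 20:
        tempo = "Medium"
    elif duration_s < 40:
        tempo = "Patient"
    else:
        tempo = "Extended"

    if n_passes <= 2:
        length = "Counter/Direct"
    elif n_passes <= 5:
        length = "Quick Attack"
    elif n_passes <= 10:
        length = "Build-up"
    else:
        length = "Extended"

    if directness > 0.7:
        direction = "Very Direct"
    elif directness > 0.4:
        direction = "Direct"
    elif directness > 0.1:
        direction = "Balanced"
    elif directness >= -0.1:
        direction = "Recycling"
    else:
        direction = "Retreating"

    return tempo, length, direction
```

For example, a 3-second, 2-pass move with directness 0.8 classifies as ("Lightning", "Counter/Direct", "Very Direct").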
Key Libraries and Tools
R
  • tidyverse - Data manipulation and visualization
  • markovchain - Markov chain models
  • stringdist - Edit distance calculations
  • ggalluvial - Flow/Sankey diagrams
  • cluster - Clustering algorithms
  • dendextend - Hierarchical clustering
Python
  • pandas - Data manipulation
  • numpy - Numerical computing
  • scipy - Hierarchical clustering
  • scikit-learn - K-means, silhouette scoring
  • Levenshtein - Edit distance
  • matplotlib - Visualization
Common Pitfalls to Avoid
  • Ignoring possession context: Comparing raw pass counts without accounting for possession duration
  • Oversimplifying sequence types: Using only pass count when tempo and directness also matter
  • Not normalizing xGChain: Players who play more will naturally have higher totals
  • Treating all turnovers equally: Where and how possession is lost affects counter-attack danger
  • Ignoring score state: Teams play differently when ahead vs behind
  • Small sample issues: Rare sequence types may have unreliable statistics