Capstone - Complete Analytics System
- Understand the structure and schema of football event data
- Parse and clean raw event data from different providers
- Work with nested JSON structures and qualifiers
- Create derived events and custom event types
- Build custom metrics from raw event sequences
- Handle event data quality issues and missing data
- Optimize event data processing for large datasets
- Compare event data across different providers
Understanding Event Data
Event data forms the backbone of modern football analytics. Every pass, shot, tackle, and dribble is recorded with precise coordinates and timestamps. Understanding how to work with raw event data unlocks the ability to create custom metrics and analyses that go beyond pre-packaged statistics.
What is Event Data?
Event data captures discrete on-ball actions during a match. Each event has a type (pass, shot, tackle), location (x,y coordinates), timestamp, player/team information, and qualifiers that provide additional context (pass height, shot body part, etc.).
Location Data
X,Y coordinates for start/end positions of each action
Temporal Data
Timestamps, match periods, and event sequences
Qualifiers
Additional context: body part, technique, outcome
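The nesting behind these three ingredients is easiest to see in a toy event. A minimal sketch of a StatsBomb-style pass event (all values illustrative, not from a real match), showing how qualifiers sit inside nested objects:

```python
# A single StatsBomb-style event (values illustrative): type, player, and
# team are nested objects; pass qualifiers live under the "pass" key.
raw_event = {
    "id": "uuid-001",
    "period": 1,
    "timestamp": "00:00:23.456",
    "type": {"id": 30, "name": "Pass"},
    "player": {"id": 5503, "name": "Lionel Messi"},
    "team": {"id": 217, "name": "Barcelona"},
    "location": [60.2, 40.1],
    "pass": {
        "length": 18.5,
        "height": {"id": 1, "name": "Ground Pass"},
        "body_part": {"id": 40, "name": "Right Foot"},
        "end_location": [75.3, 32.8],
    },
}

# Chained .get() calls with dict defaults read qualifiers without KeyErrors
# when a level is absent (e.g. a shot event has no "pass" key)
height = raw_event.get("pass", {}).get("height", {}).get("name")
print(height)  # Ground Pass
```

This is the shape the flattening code later in the chapter has to unwrap: two or three dictionary levels per qualifier, with entire branches (like `"pass"`) present only for the matching event type.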
# Python: Understanding Event Data Structure
import pandas as pd
import json
from statsbombpy import sb
# Load StatsBomb event data
events = sb.events(match_id=3788741)
# Examine the structure
print(events.info())
print(events.head())
# Core fields in event data
core_fields = [
"id", # Unique event identifier
"index", # Sequential order in match
"period", # 1 = first half, 2 = second half
"timestamp", # Time in period (HH:MM:SS.mmm)
"minute", # Match minute
"second", # Second within minute
"type", # Event type (Pass, Shot, etc.)
"player", # Player performing action
"team", # Team in possession
"location", # [x, y] coordinates
"duration" # Event duration in seconds
]
# View available columns
print("Available columns:")
print(events.columns.tolist())
# Count events by type
event_counts = events["type"].value_counts()
print("\nEvent counts by type:")
print(event_counts)
# R: Understanding Event Data Structure
library(tidyverse)
library(jsonlite)
# Load StatsBomb event data
events <- fromJSON("statsbomb_events.json", flatten = TRUE)
# Examine the structure
str(events[1:3, ])
# Core fields in event data
core_fields <- c(
"id", # Unique event identifier
"index", # Sequential order in match
"period", # 1 = first half, 2 = second half
"timestamp", # Time in period (HH:MM:SS.mmm)
"minute", # Match minute
"second", # Second within minute
"type.name", # Event type (Pass, Shot, etc.)
"player.name", # Player performing action
"team.name", # Team in possession
"location", # [x, y] coordinates
"duration" # Event duration in seconds
)
# View sample events
events %>%
select(any_of(core_fields)) %>%
head(10) %>%
print()
# Count events by type
event_counts <- events %>%
count(type.name, sort = TRUE)
print(event_counts)
Available columns:
['id', 'index', 'period', 'timestamp', 'minute', 'second',
'type', 'possession', 'possession_team', 'play_pattern',
'team', 'player', 'position', 'location', 'duration', ...]
Event counts by type:
Pass 847
Ball Receipt* 582
Carry 498
Pressure 189
Ball Recovery 87
Duel 76
Clearance 52
Shot 28
...
Event Data Schema
Different data providers use different schemas. Understanding the structure helps you work across providers and build provider-agnostic pipelines.
| Event Type | Key Qualifiers | Typical Fields |
|---|---|---|
| Pass | Height, length, technique, body part, outcome | end_location, recipient, pass_type, cross, through_ball |
| Shot | Body part, technique, first_time, outcome | end_location, xG, freeze_frame, statsbomb_xg |
| Dribble | Outcome (complete/incomplete), overrun | end_location, nutmeg, no_touch |
| Tackle | Outcome (won/lost) | counterpress |
| Pressure | Duration, counterpress | duration |
| Carry | Under pressure | end_location, duration |
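Provider differences extend to the pitch coordinate system itself: StatsBomb records positions on a 120x80 grid, while Wyscout and Opta use 0-100 percentages of pitch length and width. A hedged sketch of rescaling any provider's coordinates onto a common 105x68 metre pitch (the target dimensions are an assumption; real pitches vary):

```python
def to_standard_pitch(x: float, y: float, provider: str) -> tuple:
    """Rescale provider-native coordinates onto a common 105 x 68 m pitch."""
    dims = {
        "statsbomb": (120.0, 80.0),  # StatsBomb grid units
        "wyscout": (100.0, 100.0),   # Wyscout percentages
        "opta": (100.0, 100.0),      # Opta percentages
    }
    if provider.lower() not in dims:
        raise ValueError(f"Unknown provider: {provider!r}")
    max_x, max_y = dims[provider.lower()]
    # Simple linear rescale; assumes all providers share the same origin
    # (attacking left to right, origin at the defensive corner)
    return x / max_x * 105.0, y / max_y * 68.0

# The centre spot in each provider's native units maps to the same point
print(to_standard_pitch(60, 40, "statsbomb"))  # (52.5, 34.0)
print(to_standard_pitch(50, 50, "wyscout"))    # (52.5, 34.0)
```

Normalizing coordinates first keeps downstream zone definitions (final third, penalty area) in one unit system instead of one per provider.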
# Python: Exploring Event Qualifiers
import pandas as pd
# Extract pass-specific fields
passes = events[events["type"] == "Pass"].copy()
# Common pass columns
pass_columns = [
"id", "minute", "second", "player", "team",
"location", "pass_end_location",
"pass_length", "pass_angle", "pass_height",
"pass_body_part", "pass_type",
"pass_outcome", "pass_recipient",
"pass_cross", "pass_through_ball", "pass_switch"
]
# Filter to available columns
available_pass_cols = [c for c in pass_columns if c in passes.columns]
pass_data = passes[available_pass_cols]
# View pass qualifiers summary
pass_summary = {
"total_passes": len(passes),
"crosses": passes["pass_cross"].sum() if "pass_cross" in passes.columns else 0,
"through_balls": passes["pass_through_ball"].sum() if "pass_through_ball" in passes.columns else 0,
"successful": passes["pass_outcome"].isna().sum(),
"unsuccessful": passes["pass_outcome"].notna().sum()
}
print("Pass Summary:")
for k, v in pass_summary.items():
print(f" {k}: {v}")
# Extract shot-specific fields
shots = events[events["type"] == "Shot"].copy()
# Examine shot data
print("\nShot columns available:")
shot_cols = [c for c in shots.columns if "shot" in c.lower()]
print(shot_cols)
# View shot details
print("\nShot details:")
print(shots[["minute", "player", "shot_statsbomb_xg", "shot_outcome"]].head())
# R: Exploring Event Qualifiers
library(tidyverse)
# Extract pass-specific fields
passes <- events %>%
filter(type.name == "Pass") %>%
select(
id, minute, second, player.name, team.name,
location, pass.end_location,
pass.length, pass.angle, pass.height.name,
pass.body_part.name, pass.type.name,
pass.outcome.name, pass.recipient.name,
pass.cross, pass.through_ball, pass.switch
)
# View pass qualifiers
pass_qualifiers <- passes %>%
summarize(
total_passes = n(),
crosses = sum(pass.cross == TRUE, na.rm = TRUE),
through_balls = sum(pass.through_ball == TRUE, na.rm = TRUE),
switches = sum(pass.switch == TRUE, na.rm = TRUE),
successful = sum(is.na(pass.outcome.name)),
unsuccessful = sum(!is.na(pass.outcome.name))
)
print(pass_qualifiers)
# Extract shot-specific fields
shots <- events %>%
filter(type.name == "Shot") %>%
select(
id, minute, second, player.name, team.name,
location, shot.end_location,
shot.statsbomb_xg, shot.outcome.name,
shot.body_part.name, shot.technique.name,
shot.type.name, shot.first_time,
shot.freeze_frame
)
# Examine freeze frame data (player positions at time of shot)
if ("shot.freeze_frame" %in% names(shots)) {
freeze_frame <- shots$shot.freeze_frame[[1]]
print("Freeze frame structure:")
print(str(freeze_frame))
}
Pass Summary:
total_passes: 847
crosses: 23
through_balls: 8
successful: 712
unsuccessful: 135
Shot columns available:
['shot_statsbomb_xg', 'shot_end_location', 'shot_outcome',
'shot_body_part', 'shot_technique', 'shot_type', 'shot_first_time',
'shot_freeze_frame', 'shot_key_pass_id']
Shot details:
minute player shot_statsbomb_xg shot_outcome
0 12 Lionel Messi 0.082 Saved
1 27 Luis Suárez 0.156 Goal
2 34 Lionel Messi 0.043 Off T
Parsing Raw Event Data
Raw event data often comes in nested JSON format. Learning to parse and flatten this data is essential for analysis.
# Python: Parsing Nested JSON Event Data
import json
import pandas as pd
from typing import Dict, Any, List
def flatten_event(event: Dict[str, Any]) -> Dict[str, Any]:
"""Flatten a single nested event into a flat dictionary."""
# Base fields
flat = {
"id": event.get("id"),
"index": event.get("index"),
"period": event.get("period"),
"minute": event.get("minute"),
"second": event.get("second"),
"type": event.get("type", {}).get("name"),
"type_id": event.get("type", {}).get("id"),
"player": event.get("player", {}).get("name"),
"player_id": event.get("player", {}).get("id"),
"team": event.get("team", {}).get("name"),
"team_id": event.get("team", {}).get("id"),
}
# Location
location = event.get("location")
if location:
flat["location_x"] = location[0]
flat["location_y"] = location[1]
# Pass-specific fields
if "pass" in event:
pass_data = event["pass"]
flat["pass_length"] = pass_data.get("length")
flat["pass_angle"] = pass_data.get("angle")
end_loc = pass_data.get("end_location", [None, None])
flat["pass_end_x"] = end_loc[0] if end_loc else None
flat["pass_end_y"] = end_loc[1] if end_loc else None
flat["pass_outcome"] = pass_data.get("outcome", {}).get("name", "Complete")
flat["pass_recipient"] = pass_data.get("recipient", {}).get("name")
# Shot-specific fields
if "shot" in event:
shot_data = event["shot"]
flat["shot_xg"] = shot_data.get("statsbomb_xg")
flat["shot_outcome"] = shot_data.get("outcome", {}).get("name")
end_loc = shot_data.get("end_location", [None, None, None])
flat["shot_end_x"] = end_loc[0] if end_loc else None
flat["shot_end_y"] = end_loc[1] if end_loc else None
return flat
def parse_events_file(filepath: str) -> pd.DataFrame:
"""Parse a JSON events file into a flat DataFrame."""
with open(filepath, "r") as f:
raw_events = json.load(f)
flattened = [flatten_event(event) for event in raw_events]
return pd.DataFrame(flattened)
# Parse events
events_flat = parse_events_file("match_events.json")
print(events_flat.head())
print(f"Parsed {len(events_flat)} events")
# R: Parsing Nested JSON Event Data
library(tidyverse)
library(jsonlite)
# Read raw JSON file
raw_json <- read_json("match_events.json", simplifyVector = FALSE)
# Function to flatten a single event
flatten_event <- function(event) {
# Base fields
base <- tibble(
id = event$id %||% NA,
index = event$index %||% NA,
period = event$period %||% NA,
minute = event$minute %||% NA,
second = event$second %||% NA,
type = event$type$name %||% NA,
type_id = event$type$id %||% NA,
player = event$player$name %||% NA,
player_id = event$player$id %||% NA,
team = event$team$name %||% NA,
team_id = event$team$id %||% NA
)
# Location
if (!is.null(event$location)) {
base$location_x <- event$location[[1]]
base$location_y <- event$location[[2]]
}
# Type-specific fields
if (!is.null(event$pass)) {
base$pass_length <- event$pass$length
base$pass_angle <- event$pass$angle
base$pass_end_x <- event$pass$end_location[[1]]
base$pass_end_y <- event$pass$end_location[[2]]
base$pass_outcome <- event$pass$outcome$name %||% "Complete"
base$pass_recipient <- event$pass$recipient$name
}
if (!is.null(event$shot)) {
base$shot_xg <- event$shot$statsbomb_xg
base$shot_outcome <- event$shot$outcome$name
base$shot_end_x <- event$shot$end_location[[1]]
base$shot_end_y <- event$shot$end_location[[2]]
}
return(base)
}
# Parse all events
events_flat <- map_dfr(raw_json, flatten_event)
# View result
print(head(events_flat))
print(sprintf("Parsed %d events", nrow(events_flat)))
id index period minute second type location_x location_y
0 uuid-001 1 1 0 0 Pass 60.0 40.0
1 uuid-002 2 1 0 2 Ball Receipt 45.0 35.0
2 uuid-003 3 1 0 3 Carry 45.0 35.0
3 uuid-004 4 1 0 5 Pass 52.0 28.0
4 uuid-005 5 1 0 7 Ball Receipt 65.0 22.0
Parsed 1847 events
Handling Multiple Providers
# Python: Provider-Agnostic Event Parser
import pandas as pd
from abc import ABC, abstractmethod
class EventParser(ABC):
"""Abstract base class for event data parsers."""
@abstractmethod
def parse(self, data) -> pd.DataFrame:
pass
@abstractmethod
def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
pass
class StatsBombParser(EventParser):
"""Parser for StatsBomb event data."""
def parse(self, data) -> pd.DataFrame:
return pd.json_normalize(data)
def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
standardized = pd.DataFrame({
"event_id": range(len(df)),
"event_type": df["type.name"] if "type.name" in df.columns else df["type"],
"player": df.get("player.name", df.get("player")),
"team": df.get("team.name", df.get("team")),
"minute": df["minute"],
"second": df["second"],
"location_x": df["location"].apply(lambda x: x[0] if isinstance(x, list) else None),
"location_y": df["location"].apply(lambda x: x[1] if isinstance(x, list) else None),
})
return standardized
class WyscoutParser(EventParser):
"""Parser for Wyscout event data."""
def parse(self, data) -> pd.DataFrame:
return pd.json_normalize(data)
def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
standardized = pd.DataFrame({
"event_id": range(len(df)),
"event_type": df["eventName"],
"player": df["playerId"],
"team": df["teamId"],
"minute": (df["eventSec"] // 60).astype(int),
"second": (df["eventSec"] % 60).astype(int),
"location_x": df["positions"].apply(
lambda x: x[0]["x"] if x else None
),
"location_y": df["positions"].apply(
lambda x: x[0]["y"] if x else None
),
})
return standardized
def get_parser(provider: str) -> EventParser:
"""Factory function to get the appropriate parser."""
parsers = {
"statsbomb": StatsBombParser(),
"wyscout": WyscoutParser(),
}
if provider.lower() not in parsers:
raise ValueError(f"Unknown provider: {provider!r}")
return parsers[provider.lower()]
# Usage
parser = get_parser("statsbomb")
standardized = parser.standardize(events)
print(standardized.head())
# R: Provider-Agnostic Event Parser
library(tidyverse)
# Define provider schemas
provider_schemas <- list(
statsbomb = list(
event_type = "type.name",
location_x = "location[[1]]",
location_y = "location[[2]]",
player = "player.name",
team = "team.name",
timestamp = "timestamp"
),
opta = list(
event_type = "type_id",
location_x = "x",
location_y = "y",
player = "player_id",
team = "team_id",
timestamp = "time_stamp"
),
wyscout = list(
event_type = "eventName",
location_x = "positions[[1]]$x",
location_y = "positions[[1]]$y",
player = "playerId",
team = "teamId",
timestamp = "eventSec"
)
)
# Standardize events to common schema
standardize_events <- function(events, provider) {
schema <- provider_schemas[[provider]]
standardized <- events %>%
transmute(
event_id = row_number(),
event_type = !!sym(schema$event_type),
player = !!sym(schema$player),
team = !!sym(schema$team),
minute = minute,
second = second
)
# Handle location based on provider
if (provider == "statsbomb") {
standardized <- standardized %>%
mutate(
location_x = map_dbl(events$location, ~ if (is.null(.x)) NA_real_ else .x[[1]]),
location_y = map_dbl(events$location, ~ if (is.null(.x)) NA_real_ else .x[[2]])
)
}
return(standardized)
}
# Example usage
standardized <- standardize_events(events, "statsbomb")
print(head(standardized))
Creating Derived Events
Raw event data can be enriched by creating derived events—new event types calculated from existing data that capture higher-level concepts.
# Python: Creating Derived Events
import pandas as pd
import numpy as np
def create_progressive_passes(events: pd.DataFrame) -> pd.DataFrame:
"""Identify progressive passes (advance ball >10m toward goal)."""
passes = events[events["type"] == "Pass"].copy()
# Extract coordinates
passes["start_x"] = passes["location"].apply(lambda x: x[0] if x else None)
passes["start_y"] = passes["location"].apply(lambda x: x[1] if x else None)
passes["end_x"] = passes["pass_end_location"].apply(
lambda x: x[0] if isinstance(x, list) else None
)
passes["end_y"] = passes["pass_end_location"].apply(
lambda x: x[1] if isinstance(x, list) else None
)
# Calculate distances to goal (goal at x=120)
passes["start_dist_to_goal"] = np.sqrt(
(120 - passes["start_x"])**2 + (40 - passes["start_y"])**2
)
passes["end_dist_to_goal"] = np.sqrt(
(120 - passes["end_x"])**2 + (40 - passes["end_y"])**2
)
# Progressive if advances >10m toward goal
passes["is_progressive"] = (
passes["start_dist_to_goal"] - passes["end_dist_to_goal"]
) > 10
# Into final third
passes["into_final_third"] = (passes["start_x"] < 80) & (passes["end_x"] >= 80)
# Into penalty area
passes["into_box"] = (
(passes["end_x"] >= 102) &
(passes["end_y"] >= 18) &
(passes["end_y"] <= 62)
)
return passes
def create_ball_progressions(events: pd.DataFrame) -> pd.DataFrame:
"""Create combined ball progression events (passes + carries)."""
progressions = []
# Progressive passes
passes = events[events["type"] == "Pass"].copy()
passes["start_x"] = passes["location"].apply(lambda x: x[0] if x else 0)
passes["end_x"] = passes["pass_end_location"].apply(
lambda x: x[0] if isinstance(x, list) else 0
)
passes["progress_dist"] = passes["end_x"] - passes["start_x"]
prog_passes = passes[passes["progress_dist"] > 10].copy()
prog_passes["progression_type"] = "pass"
progressions.append(prog_passes)
# Progressive carries
if "Carry" in events["type"].values:
carries = events[events["type"] == "Carry"].copy()
carries["start_x"] = carries["location"].apply(lambda x: x[0] if x else 0)
carries["end_x"] = carries["carry_end_location"].apply(
lambda x: x[0] if isinstance(x, list) else 0
)
carries["progress_dist"] = carries["end_x"] - carries["start_x"]
prog_carries = carries[carries["progress_dist"] > 10].copy()
prog_carries["progression_type"] = "carry"
progressions.append(prog_carries)
return pd.concat(progressions).sort_values("index")
prog_passes = create_progressive_passes(events)
print(f"Progressive passes: {prog_passes['is_progressive'].sum()} "
f"({prog_passes['is_progressive'].mean()*100:.1f}%)")
# R: Creating Derived Events
library(tidyverse)
# Create progressive passes (advance ball >10m toward goal)
create_progressive_passes <- function(events) {
passes <- events %>%
filter(type.name == "Pass") %>%
mutate(
# Extract coordinates
start_x = map_dbl(location, ~ .x[[1]]),
start_y = map_dbl(location, ~ .x[[2]]),
end_x = map_dbl(pass.end_location, ~ .x[[1]]),
end_y = map_dbl(pass.end_location, ~ .x[[2]]),
# Calculate distances to goal (goal at x=120)
start_dist_to_goal = sqrt((120 - start_x)^2 + (40 - start_y)^2),
end_dist_to_goal = sqrt((120 - end_x)^2 + (40 - end_y)^2),
# Progressive if advances >10m toward goal
is_progressive = (start_dist_to_goal - end_dist_to_goal) > 10,
# Into final third
into_final_third = start_x < 80 & end_x >= 80,
# Into penalty area
into_box = end_x >= 102 & end_y >= 18 & end_y <= 62
)
return(passes)
}
# Create ball progression events (carries + progressive passes)
create_ball_progressions <- function(events) {
# Progressive passes
prog_passes <- events %>%
filter(type.name == "Pass") %>%
mutate(
start_x = map_dbl(location, ~ .x[[1]]),
end_x = map_dbl(pass.end_location, ~ .x[[1]]),
progress_dist = end_x - start_x
) %>%
filter(progress_dist > 10) %>%
mutate(progression_type = "pass")
# Progressive carries
prog_carries <- events %>%
filter(type.name == "Carry") %>%
mutate(
start_x = map_dbl(location, ~ .x[[1]]),
end_x = map_dbl(carry.end_location, ~ .x[[1]]),
progress_dist = end_x - start_x
) %>%
filter(progress_dist > 10) %>%
mutate(progression_type = "carry")
# Combine
progressions <- bind_rows(prog_passes, prog_carries) %>%
arrange(index)
return(progressions)
}
prog_passes <- create_progressive_passes(events)
cat(sprintf("Progressive passes: %d (%.1f%%)\n",
sum(prog_passes$is_progressive),
mean(prog_passes$is_progressive) * 100))
Progressive passes: 142 (16.8%)
Creating Possession Sequences
# Python: Create Possession Sequences
import pandas as pd
import numpy as np
def create_possession_sequences(events: pd.DataFrame) -> pd.DataFrame:
"""Create possession sequence summaries."""
events = events.sort_values("index").copy()
# Identify possession changes
events["team_change"] = events["team"] != events["team"].shift(1)
events["new_possession"] = (
events["team_change"] |
events["type"].isin(["Starting XI", "Half Start"]) |
events["team"].shift(1).isna()
)
events["possession_id"] = events["new_possession"].cumsum()
# Helper to safely get location
def safe_location_x(loc):
if isinstance(loc, list) and len(loc) > 0:
return loc[0]
return np.nan
events["location_x"] = events["location"].apply(safe_location_x)
# Summarize each possession
possession_summary = events.groupby(["possession_id", "team"]).agg(
start_minute=("minute", "first"),
start_second=("second", "first"),
end_minute=("minute", "last"),
end_second=("second", "last"),
n_events=("index", "count"),
n_passes=("type", lambda x: (x == "Pass").sum()),
has_shot=("type", lambda x: (x == "Shot").any()),
has_goal=("shot_outcome", lambda x: (x == "Goal").any()),
start_x=("location_x", "first"),
end_x=("location_x", "last"),
).reset_index()
# Calculate duration
possession_summary["duration_seconds"] = (
(possession_summary["end_minute"] * 60 + possession_summary["end_second"]) -
(possession_summary["start_minute"] * 60 + possession_summary["start_second"])
)
# Classify sequences
possession_summary["territory_gained"] = (
possession_summary["end_x"] - possession_summary["start_x"]
)
def classify_sequence(row):
if row["has_goal"]:
return "Goal"
elif row["has_shot"]:
return "Shot"
elif row["end_x"] >= 102:
return "Penalty Area Entry"
elif row["end_x"] >= 80:
return "Final Third Entry"
else:
return "No Threat"
possession_summary["sequence_quality"] = possession_summary.apply(
classify_sequence, axis=1
)
return possession_summary
possessions = create_possession_sequences(events)
print(possessions["sequence_quality"].value_counts())
# R: Create Possession Sequences
library(tidyverse)
create_possession_sequences <- function(events) {
# Identify possession changes
events <- events %>%
arrange(index) %>%
mutate(
# New possession on team change or after certain events
new_possession = team.name != lag(team.name) |
type.name %in% c("Starting XI", "Half Start") |
is.na(lag(team.name)),
possession_id = cumsum(new_possession)
)
# Summarize each possession
possession_summary <- events %>%
group_by(possession_id, team.name) %>%
summarize(
start_minute = first(minute),
start_second = first(second),
end_minute = last(minute),
end_second = last(second),
duration_seconds = (end_minute * 60 + end_second) -
(start_minute * 60 + start_second),
n_events = n(),
n_passes = sum(type.name == "Pass"),
n_successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
has_shot = any(type.name == "Shot"),
has_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
start_x = first(na.omit(map_dbl(location, ~ if (is.null(.x)) NA_real_ else .x[[1]]))),
end_x = last(na.omit(map_dbl(location, ~ if (is.null(.x)) NA_real_ else .x[[1]]))),
.groups = "drop"
) %>%
mutate(
territory_gained = end_x - start_x,
is_attacking_sequence = end_x > 80,
sequence_quality = case_when(
has_goal ~ "Goal",
has_shot ~ "Shot",
end_x >= 102 ~ "Penalty Area Entry",
end_x >= 80 ~ "Final Third Entry",
TRUE ~ "No Threat"
)
)
return(possession_summary)
}
possessions <- create_possession_sequences(events)
print(table(possessions$sequence_quality))
No Threat 156
Final Third Entry 42
Shot 28
Penalty Area Entry 18
Goal 3
Name: sequence_quality, dtype: int64
Building Custom Metrics
With access to raw event data, you can create custom metrics tailored to specific questions or requirements.
# Python: Building Custom Metrics from Events
import pandas as pd
import numpy as np
def calculate_dangerous_passes_received(events: pd.DataFrame) -> pd.DataFrame:
"""Calculate dangerous passes received by player."""
passes = events[
(events["type"] == "Pass") &
(events["pass_outcome"].isna()) # Successful passes
].copy()
# Extract end locations
passes["end_x"] = passes["pass_end_location"].apply(
lambda x: x[0] if isinstance(x, list) else None
)
passes["end_y"] = passes["pass_end_location"].apply(
lambda x: x[1] if isinstance(x, list) else None
)
# Dangerous zone: final third, central areas
passes["is_dangerous"] = (
(passes["end_x"] >= 80) &
(passes["end_y"] >= 20) &
(passes["end_y"] <= 60) &
(passes["pass_recipient"].notna())
)
dangerous_received = passes[passes["is_dangerous"]].groupby(
"pass_recipient"
).size().reset_index(name="dangerous_passes_received")
return dangerous_received.sort_values(
"dangerous_passes_received", ascending=False
)
def calculate_pressure_regains(events: pd.DataFrame) -> pd.DataFrame:
"""Calculate pressure regain rate by player."""
events = events.sort_values("index").copy()
# Get pressures with info on the event that immediately follows each one;
# assigning the shifted full-match series aligns on the original row index
pressures = events[events["type"] == "Pressure"].copy()
pressures["next_event"] = events["type"].shift(-1)
pressures["next_team"] = events["team"].shift(-1)
# Pressure success: same team regains ball
pressures["pressure_success"] = (
(pressures["next_team"] == pressures["team"]) &
(pressures["next_event"].isin(["Ball Recovery", "Interception"]))
)
player_pressure_stats = pressures.groupby("player").agg(
pressures=("index", "count"),
pressure_regains=("pressure_success", "sum")
).reset_index()
player_pressure_stats["regain_rate"] = (
player_pressure_stats["pressure_regains"] /
player_pressure_stats["pressures"]
)
return player_pressure_stats[
player_pressure_stats["pressures"] >= 5
].sort_values("regain_rate", ascending=False)
def calculate_box_entries(events: pd.DataFrame) -> pd.DataFrame:
"""Calculate box entries by player (passes + carries)."""
actions = events[events["type"].isin(["Pass", "Carry"])].copy()
def get_end_location(row):
if row["type"] == "Pass":
loc = row.get("pass_end_location")
else:
loc = row.get("carry_end_location")
return loc if isinstance(loc, list) else [None, None]
actions["start_x"] = actions["location"].apply(
lambda x: x[0] if isinstance(x, list) else None
)
actions["start_y"] = actions["location"].apply(
lambda x: x[1] if isinstance(x, list) else None
)
actions["end_loc"] = actions.apply(get_end_location, axis=1)
actions["end_x"] = actions["end_loc"].apply(lambda x: x[0])
actions["end_y"] = actions["end_loc"].apply(lambda x: x[1])
# Check if enters box
actions["enters_box"] = (
((actions["start_x"] < 102) | (actions["start_y"] < 18) | (actions["start_y"] > 62)) &
(actions["end_x"] >= 102) &
(actions["end_y"] >= 18) &
(actions["end_y"] <= 62)
)
box_entries = actions[actions["enters_box"]].groupby(
["player", "type"]
).size().unstack(fill_value=0).reset_index()
if "Pass" not in box_entries.columns:
box_entries["Pass"] = 0
if "Carry" not in box_entries.columns:
box_entries["Carry"] = 0
box_entries["total_box_entries"] = box_entries["Pass"] + box_entries["Carry"]
return box_entries.sort_values("total_box_entries", ascending=False)
# Calculate all metrics
dangerous_received = calculate_dangerous_passes_received(events)
pressure_regains = calculate_pressure_regains(events)
box_entries = calculate_box_entries(events)
print("Top players by dangerous passes received:")
print(dangerous_received.head())
# R: Building Custom Metrics from Events
library(tidyverse)
# Metric 1: Dangerous Passes Received
calculate_dangerous_passes_received <- function(events) {
# Find passes into dangerous zones
passes <- events %>%
filter(type.name == "Pass", is.na(pass.outcome.name)) %>%
mutate(
end_x = map_dbl(pass.end_location, ~ .x[[1]]),
end_y = map_dbl(pass.end_location, ~ .x[[2]]),
# Dangerous zone: final third, central areas
is_dangerous = end_x >= 80 &
end_y >= 20 & end_y <= 60 &
!is.na(pass.recipient.name)
)
dangerous_received <- passes %>%
filter(is_dangerous) %>%
count(player = pass.recipient.name, name = "dangerous_passes_received") %>%
arrange(desc(dangerous_passes_received))
return(dangerous_received)
}
# Metric 2: Pressure Regains
calculate_pressure_regains <- function(events) {
events <- events %>% arrange(index)
# Find pressures followed by ball recovery or turnover
pressure_outcomes <- events %>%
filter(type.name == "Pressure") %>%
mutate(
next_event = lead(type.name),
next_team = lead(team.name),
pressure_success = next_team == team.name &
next_event %in% c("Ball Recovery", "Interception")
)
player_pressure_stats <- pressure_outcomes %>%
group_by(player.name) %>%
summarize(
pressures = n(),
pressure_regains = sum(pressure_success, na.rm = TRUE),
regain_rate = pressure_regains / pressures,
.groups = "drop"
) %>%
filter(pressures >= 5) %>%
arrange(desc(regain_rate))
return(player_pressure_stats)
}
# Metric 3: Box Entries Created
calculate_box_entries <- function(events) {
# Passes or carries that enter the penalty area
box_entries <- events %>%
filter(type.name %in% c("Pass", "Carry")) %>%
mutate(
start_x = map_dbl(location, ~ .x[[1]]),
start_y = map_dbl(location, ~ .x[[2]]),
end_x = case_when(
type.name == "Pass" ~ map_dbl(pass.end_location, ~ .x[[1]]),
type.name == "Carry" ~ map_dbl(carry.end_location, ~ .x[[1]]),
TRUE ~ NA_real_
),
end_y = case_when(
type.name == "Pass" ~ map_dbl(pass.end_location, ~ .x[[2]]),
type.name == "Carry" ~ map_dbl(carry.end_location, ~ .x[[2]]),
TRUE ~ NA_real_
),
# Check if it enters the box
enters_box = (start_x < 102 | start_y < 18 | start_y > 62) &
end_x >= 102 & end_y >= 18 & end_y <= 62
) %>%
filter(enters_box)
player_box_entries <- box_entries %>%
count(player.name, type.name, name = "box_entries") %>%
pivot_wider(names_from = type.name, values_from = box_entries, values_fill = 0) %>%
mutate(total_box_entries = Pass + Carry) %>%
arrange(desc(total_box_entries))
return(player_box_entries)
}
# Calculate all metrics
dangerous_received <- calculate_dangerous_passes_received(events)
pressure_regains <- calculate_pressure_regains(events)
box_entries <- calculate_box_entries(events)
print("Top players by dangerous passes received:")
print(head(dangerous_received))
Top players by dangerous passes received:
pass_recipient dangerous_passes_received
0 Lionel Messi 12
1 Luis Suárez 9
2 Neymar Jr 8
3 Sergio Busquets 6
4 Andrés Iniesta 5
Handling Data Quality Issues
Event data is collected by a mix of human taggers and automated systems, so quality varies between matches and providers. Common issues include:
- Missing events: Off-ball actions are often undercounted
- Location imprecision: Coordinates may be estimates
- Inconsistent tagging: Different taggers, different rules
- Temporal gaps: Missing timestamps or out of sequence
- Missing qualifiers: Body part, technique not always tagged
- Cross-provider differences: Same event, different classification
To guard against these issues, validate incoming data:
- Check event counts against expected ranges
- Validate location coordinates within pitch bounds
- Verify temporal sequence consistency
- Cross-reference with official match statistics
- Flag outliers for manual review
- Build automated quality metrics
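The cross-referencing step above can be sketched as a comparison of totals aggregated from the events against an official box score. All numbers below are hypothetical:

```python
# Hypothetical official box score (e.g. from a league site)
official = {"passes": 847, "shots": 28, "goals": 3}

# Hypothetical totals aggregated from the parsed event data
derived = {"passes": 799, "shots": 28, "goals": 3}

# Flag stats whose relative deviation exceeds a small tolerance
tolerance = 0.02  # allow 2% disagreement before flagging for review
flags = {
    stat: {"official": official[stat], "derived": derived[stat]}
    for stat in official
    if official[stat] > 0
    and abs(derived[stat] - official[stat]) / official[stat] > tolerance
}
print(flags)  # only "passes" deviates by more than 2%
```

Small disagreements are normal (providers define "pass" slightly differently), so the tolerance should be tuned per stat rather than set to zero.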
# Python: Event Data Quality Checks
import pandas as pd
import numpy as np
from typing import Dict, Any
def validate_event_data(events: pd.DataFrame) -> Dict[str, Any]:
"""Comprehensive event data quality validation."""
quality_report = {}
# 1. Check for missing critical fields
quality_report["missing_fields"] = {
"missing_type": events["type"].isna().sum(),
"missing_player": events["player"].isna().sum(),
"missing_team": events["team"].isna().sum(),
"missing_location": events["location"].apply(
lambda x: not isinstance(x, list) or len(x) == 0 # catches None and NaN too
).sum(),
"missing_timestamp": events["timestamp"].isna().sum()
}
# 2. Validate location coordinates
def extract_coords(loc):
if isinstance(loc, list) and len(loc) >= 2:
return loc[0], loc[1]
return None, None
events["loc_x"], events["loc_y"] = zip(*events["location"].apply(extract_coords))
valid_locs = events.dropna(subset=["loc_x", "loc_y"])
quality_report["location_issues"] = {
"x_out_of_bounds": ((valid_locs["loc_x"] < 0) | (valid_locs["loc_x"] > 120)).sum(),
"y_out_of_bounds": ((valid_locs["loc_y"] < 0) | (valid_locs["loc_y"] > 80)).sum(),
"suspicious_origin": ((valid_locs["loc_x"] == 0) & (valid_locs["loc_y"] == 0)).sum()
}
# 3. Check event sequence consistency
events_sorted = events.sort_values("index")
events_sorted["time_seconds"] = events_sorted["minute"] * 60 + events_sorted["second"]
events_sorted["time_diff"] = events_sorted["time_seconds"].diff()
quality_report["sequence_issues"] = {
"backwards_timestamps": (events_sorted["time_diff"] < -5).sum(),
"duplicate_indices": len(events) - events["index"].nunique()
}
# 4. Event count validation
event_counts = events["type"].value_counts()
expected_mins = {"Pass": 400, "Shot": 10, "Foul Committed": 15}
below_expected = []
for event_type, min_count in expected_mins.items():
actual = event_counts.get(event_type, 0)
if actual < min_count:
below_expected.append({
"type": event_type,
"actual": actual,
"expected_min": min_count
})
quality_report["below_expected_counts"] = below_expected
# 5. Generate overall quality score
total_events = len(events)
issues = (
sum(quality_report["missing_fields"].values()) +
sum(quality_report["location_issues"].values()) +
quality_report["sequence_issues"]["backwards_timestamps"]
)
quality_report["quality_score"] = 1 - (issues / total_events)
return quality_report
quality = validate_event_data(events)
print(f"Data Quality Score: {quality['quality_score']*100:.2f}%")
print("\nMissing fields:")
for field, count in quality["missing_fields"].items():
    print(f" {field}: {count}")

# R: Event Data Quality Checks
library(tidyverse)
validate_event_data <- function(events) {
quality_report <- list()
# 1. Check for missing critical fields
quality_report$missing_fields <- events %>%
summarize(
missing_type = sum(is.na(type.name)),
missing_player = sum(is.na(player.name)),
missing_team = sum(is.na(team.name)),
missing_location = sum(map_lgl(location, is.null)),
missing_timestamp = sum(is.na(timestamp))
)
# 2. Validate location coordinates
events_with_loc <- events %>%
filter(!map_lgl(location, is.null)) %>%
mutate(
x = map_dbl(location, ~ .x[[1]]),
y = map_dbl(location, ~ .x[[2]])
)
quality_report$location_issues <- events_with_loc %>%
summarize(
x_out_of_bounds = sum(x < 0 | x > 120),
y_out_of_bounds = sum(y < 0 | y > 80),
suspicious_origin = sum(x == 0 & y == 0)
)
# 3. Check event sequence consistency
events_ordered <- events %>% arrange(index)
quality_report$sequence_issues <- events_ordered %>%
mutate(
time_seconds = minute * 60 + second,
time_diff = time_seconds - lag(time_seconds),
time_backwards = time_diff < -5 # Allow small corrections
) %>%
summarize(
backwards_timestamps = sum(time_backwards, na.rm = TRUE),
duplicate_indices = n() - n_distinct(index)
)
# 4. Event count validation
quality_report$event_counts <- events %>%
count(type.name) %>%
mutate(
expected_min = case_when(
type.name == "Pass" ~ 400,
type.name == "Shot" ~ 10,
type.name == "Foul Committed" ~ 15,
TRUE ~ 0
),
below_expected = n < expected_min
)
# 5. Generate overall quality score
total_events <- nrow(events)
issues <- sum(quality_report$missing_fields) +
sum(quality_report$location_issues) +
quality_report$sequence_issues$backwards_timestamps
  quality_report$quality_score <- max(0, 1 - issues / total_events)
return(quality_report)
}
quality <- validate_event_data(events)
cat(sprintf("Data Quality Score: %.2f%%\n", quality$quality_score * 100))
print(quality$missing_fields)

Data Quality Score: 97.84%
Missing fields:
missing_type: 0
missing_player: 12
missing_team: 0
missing_location: 23
missing_timestamp: 0

Optimizing Event Data Processing
When working with multiple seasons of event data, processing efficiency becomes critical. Here are strategies for handling large datasets.
# Python: Optimizing Event Data Processing
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import pyarrow.parquet as pq
import pyarrow as pa
# Strategy 1: Use vectorized operations
def process_events_vectorized(events: pd.DataFrame) -> pd.DataFrame:
"""Use vectorized operations instead of apply."""
# Vectorized location extraction
locations = pd.DataFrame(
events["location"].tolist(),
columns=["x", "y"]
)
events = pd.concat([events, locations], axis=1)
# Vectorized calculations
events["in_final_third"] = events["x"] > 80
events["in_box"] = (events["x"] > 102) & (events["y"].between(18, 62))
return events
# Strategy 2: Process files in chunks
def process_season_chunked(file_paths: list, chunk_size: int = 10):
"""Process large number of files in memory-efficient chunks."""
results = []
for i in range(0, len(file_paths), chunk_size):
chunk_files = file_paths[i:i + chunk_size]
chunk_data = pd.concat([
pd.read_json(f) for f in chunk_files
])
# Process chunk
chunk_summary = summarize_events(chunk_data)
results.append(chunk_summary)
# Clear memory
del chunk_data
return pd.concat(results)
# Strategy 3: Use Parquet for efficient storage
def save_events_parquet(events: pd.DataFrame, filepath: str):
"""Save events to Parquet format for efficient storage."""
events.to_parquet(filepath, compression="snappy")
def load_events_parquet(filepath: str, columns: list = None):
"""Load events from Parquet, optionally selecting columns."""
return pd.read_parquet(filepath, columns=columns)
# Strategy 4: Parallel processing
def process_match(filepath: str) -> pd.DataFrame:
"""Process a single match file."""
events = pd.read_json(filepath)
return summarize_events(events)
def process_matches_parallel(file_paths: list, n_workers: int = 4):
"""Process multiple matches in parallel."""
with ProcessPoolExecutor(max_workers=n_workers) as executor:
results = list(executor.map(process_match, file_paths))
return pd.concat(results)
# Benchmark example
import time
# Measure processing time
start = time.time()
processed = process_events_vectorized(events)
elapsed = time.time() - start
print(f"Vectorized processing: {elapsed:.3f} seconds")

# R: Optimizing Event Data Processing
library(tidyverse)
library(data.table)
library(arrow)
# Strategy 1: Use data.table for large datasets
process_events_fast <- function(events_dt) {
# Convert to data.table
events_dt <- as.data.table(events_dt)
# Efficient aggregation with data.table
player_stats <- events_dt[,
.(
passes = sum(type.name == "Pass"),
shots = sum(type.name == "Shot"),
tackles = sum(type.name == "Tackle")
),
by = .(player.name, team.name)
]
return(player_stats)
}
# Strategy 2: Process files in chunks
process_season_chunked <- function(file_paths, chunk_size = 10) {
results <- list()
for (i in seq(1, length(file_paths), chunk_size)) {
chunk_files <- file_paths[i:min(i + chunk_size - 1, length(file_paths))]
chunk_data <- map_dfr(chunk_files, ~ {
events <- read_json(.x, simplifyVector = TRUE)
# Process each match
summarize_match(events)
})
results[[length(results) + 1]] <- chunk_data
# Clear memory
gc()
}
return(bind_rows(results))
}
# Strategy 3: Use Arrow/Parquet for storage
save_events_parquet <- function(events, filepath) {
write_parquet(events, filepath)
}
load_events_parquet <- function(filepath) {
# Read only needed columns
read_parquet(filepath,
col_select = c("id", "type.name", "player.name", "location", "minute")
)
}
# Strategy 4: Parallel processing
library(furrr)
plan(multisession, workers = 4)
process_matches_parallel <- function(match_files) {
future_map_dfr(match_files, function(file) {
events <- read_json(file, simplifyVector = TRUE)
summarize_match(events)
}, .progress = TRUE)
}

Vectorized processing: 0.042 seconds

Practice Exercises
Exercise 31.1: Shot Freeze Frame Analyzer
Task: Build a comprehensive shot analyzer that extracts shot data with freeze frame information to calculate advanced defensive pressure metrics.
Requirements:
- Parse nested JSON to extract all shot events with freeze frames
- Calculate distance to nearest defender and goalkeeper for each shot
- Count defenders in the shooting lane (cone between ball and goal)
- Create a pressure index combining all defensive factors
- Compare actual xG vs predicted xG with pressure adjustment
# Python: Shot Freeze Frame Analyzer
import pandas as pd
import numpy as np
from statsbombpy import sb
from typing import List, Dict, Optional
import math
def parse_freeze_frame(freeze_frame: List[Dict], shot_location: List[float]) -> Dict:
"""Parse freeze frame data to calculate defensive metrics."""
if not freeze_frame or not shot_location:
return {
"n_defenders_in_cone": None,
"dist_nearest_defender": None,
"dist_goalkeeper": None,
"defenders_in_box": None
}
shot_x, shot_y = shot_location[0], shot_location[1]
goal_x, goal_y = 120, 40
# Extract defender positions
defenders = []
    for player in freeze_frame:
        # Players missing the "teammate" flag are skipped (treated as teammates)
        if not player.get("teammate", True):
loc = player.get("location", [])
if len(loc) >= 2:
defenders.append({
"x": loc[0],
"y": loc[1],
"is_goalkeeper": player.get("position", {}).get("name") == "Goalkeeper"
})
if not defenders:
return {
"n_defenders_in_cone": 0,
"dist_nearest_defender": None,
"dist_goalkeeper": None,
"defenders_in_box": 0
}
# Calculate distances
for d in defenders:
d["dist_from_shot"] = math.sqrt((d["x"] - shot_x)**2 + (d["y"] - shot_y)**2)
# Find goalkeeper
gk = [d for d in defenders if d.get("is_goalkeeper")]
dist_gk = gk[0]["dist_from_shot"] if gk else None
# Defenders in shooting cone (15 degree angle to goal)
cone_angle = 15 * math.pi / 180
shot_to_goal_angle = math.atan2(goal_y - shot_y, goal_x - shot_x)
in_cone = 0
for d in defenders:
        angle_to_defender = math.atan2(d["y"] - shot_y, d["x"] - shot_x)
        # Wrap the difference into [-pi, pi] so angles near ±pi compare correctly
        angle_diff = abs((angle_to_defender - shot_to_goal_angle + math.pi) % (2 * math.pi) - math.pi)
        if angle_diff < cone_angle and d["x"] > shot_x:
in_cone += 1
# Defenders in penalty box
in_box = sum(1 for d in defenders
if d["x"] >= 102 and 18 <= d["y"] <= 62)
return {
"n_defenders_in_cone": in_cone,
"dist_nearest_defender": min(d["dist_from_shot"] for d in defenders),
"dist_goalkeeper": dist_gk,
"defenders_in_box": in_box
}
def analyze_shots_with_pressure(match_ids: List[int]) -> pd.DataFrame:
"""Analyze all shots with defensive pressure metrics."""
all_shots = []
for match_id in match_ids:
try:
events = sb.events(match_id=match_id)
shots = events[events["type"] == "Shot"].copy()
for idx, shot in shots.iterrows():
freeze_frame = shot.get("shot_freeze_frame")
location = shot.get("location")
metrics = parse_freeze_frame(freeze_frame, location)
shot_data = {
"match_id": match_id,
"player": shot.get("player"),
"team": shot.get("team"),
"minute": shot.get("minute"),
"xg": shot.get("shot_statsbomb_xg"),
"outcome": shot.get("shot_outcome"),
"body_part": shot.get("shot_body_part"),
**metrics
}
all_shots.append(shot_data)
except Exception as e:
print(f"Error processing match {match_id}: {e}")
continue
df = pd.DataFrame(all_shots)
    # Calculate pressure index: weighted blend of defensive factors
    # (missing values are treated as low pressure so the index stays defined)
    df["pressure_index"] = (
        0.3 * (df["n_defenders_in_cone"].fillna(0) / 3).clip(0, 1) +
        0.3 * (1 - df["dist_nearest_defender"].fillna(10) / 10).clip(0, 1) +
        0.2 * (df["defenders_in_box"].fillna(0) / 6).clip(0, 1) +
        0.2 * (1 - df["dist_goalkeeper"].fillna(15) / 15).clip(0, 1)
    )
df["pressure_category"] = pd.cut(
df["pressure_index"],
bins=[0, 0.3, 0.6, 1.0],
labels=["Low Pressure", "Medium Pressure", "High Pressure"]
)
return df
# Load sample data
competitions = sb.competitions()
matches = sb.matches(competition_id=11, season_id=90)
sample_matches = matches["match_id"].head(10).tolist()
# Analyze shots
shots_df = analyze_shots_with_pressure(sample_matches)
# Summarize by pressure category
pressure_summary = shots_df.groupby("pressure_category").agg({
"xg": ["count", "mean"],
"outcome": lambda x: (x == "Goal").sum()
}).reset_index()
pressure_summary.columns = ["pressure_category", "n_shots", "avg_xg", "goals"]
pressure_summary["conversion_rate"] = pressure_summary["goals"] / pressure_summary["n_shots"]
print("Shot Analysis by Defensive Pressure:")
print(pressure_summary)

# R: Shot Freeze Frame Analyzer
library(tidyverse)
library(StatsBombR)
library(jsonlite)
# Load StatsBomb data
Comp <- FreeCompetitions()
Matches <- FreeMatches(Comp)
events <- StatsBombFreeEvents(MatchesDF = Matches[1:10, ])
# Function to parse freeze frame data
parse_freeze_frame <- function(freeze_frame, shot_location) {
if (is.null(freeze_frame) || length(freeze_frame) == 0) {
return(list(
n_defenders_in_cone = NA,
dist_nearest_defender = NA,
dist_goalkeeper = NA,
defenders_in_box = NA
))
}
ff_df <- as.data.frame(freeze_frame)
shot_x <- shot_location[1]
shot_y <- shot_location[2]
goal_x <- 120
goal_y <- 40
# Extract positions
defenders <- ff_df %>%
filter(teammate == FALSE) %>%
mutate(
x = map_dbl(location, ~.x[1]),
y = map_dbl(location, ~.x[2])
)
if (nrow(defenders) == 0) {
return(list(
n_defenders_in_cone = 0,
dist_nearest_defender = NA,
dist_goalkeeper = NA,
defenders_in_box = 0
))
}
# Distance to each defender from shot location
defenders <- defenders %>%
mutate(
dist_from_shot = sqrt((x - shot_x)^2 + (y - shot_y)^2)
)
  # Goalkeeper heuristic: treat the deepest opponent (max x) as the keeper
  gk_row <- defenders %>% filter(x == max(x))
dist_gk <- if(nrow(gk_row) > 0) gk_row$dist_from_shot[1] else NA
# Defenders in shooting cone (within 15 degree angle to goal)
cone_angle <- 15 * pi / 180
shot_to_goal_angle <- atan2(goal_y - shot_y, goal_x - shot_x)
defenders <- defenders %>%
mutate(
angle_to_defender = atan2(y - shot_y, x - shot_x),
angle_diff = abs(angle_to_defender - shot_to_goal_angle),
in_cone = angle_diff < cone_angle & x > shot_x
)
# Defenders in penalty box
in_box <- sum(defenders$x >= 102 & defenders$y >= 18 & defenders$y <= 62)
list(
n_defenders_in_cone = sum(defenders$in_cone, na.rm = TRUE),
dist_nearest_defender = min(defenders$dist_from_shot, na.rm = TRUE),
dist_goalkeeper = dist_gk,
defenders_in_box = in_box
)
}
# Process all shots with freeze frames
shots_analyzed <- events %>%
filter(type.name == "Shot") %>%
mutate(
shot_x = map_dbl(location, ~.x[1]),
shot_y = map_dbl(location, ~.x[2]),
freeze_analysis = map2(shot.freeze_frame, location, parse_freeze_frame)
) %>%
unnest_wider(freeze_analysis)
# Calculate pressure index
shots_analyzed <- shots_analyzed %>%
mutate(
    pressure_index = (
      0.3 * pmin(coalesce(n_defenders_in_cone, 0) / 3, 1) +
      0.3 * pmax(0, 1 - coalesce(dist_nearest_defender, 10) / 10) +
      0.2 * pmin(coalesce(defenders_in_box, 0) / 6, 1) +
      0.2 * pmax(0, 1 - coalesce(dist_goalkeeper, 15) / 15)
    ),
pressure_category = case_when(
pressure_index < 0.3 ~ "Low Pressure",
pressure_index < 0.6 ~ "Medium Pressure",
TRUE ~ "High Pressure"
)
)
# Analyze xG by pressure category
pressure_summary <- shots_analyzed %>%
group_by(pressure_category) %>%
summarize(
n_shots = n(),
avg_xg = mean(shot.statsbomb_xg, na.rm = TRUE),
goals = sum(shot.outcome.name == "Goal", na.rm = TRUE),
conversion_rate = goals / n_shots,
avg_defenders_in_cone = mean(n_defenders_in_cone, na.rm = TRUE),
.groups = "drop"
)
print("Shot Analysis by Defensive Pressure:")
print(pressure_summary)

Exercise 31.2: Dangerous Turnover Tracker
Task: Build a comprehensive turnover analysis system that identifies and rates the danger of possession losses.
Requirements:
- Identify all turnover events (failed passes, dispossessions, miscontrols)
- Track subsequent opponent actions within 15 seconds
- Calculate a danger score based on zone, speed of counter, and outcome
- Create player-level turnover profiles with risk metrics
# Python: Dangerous Turnover Tracker
import pandas as pd
import numpy as np
from typing import Dict, List
from statsbombpy import sb
class TurnoverAnalyzer:
"""Analyze dangerous turnovers and their consequences."""
TURNOVER_TYPES = ["Miscontrol", "Dispossessed", "Error"]
AFTERMATH_WINDOW = 15 # seconds
def __init__(self, events: pd.DataFrame):
self.events = events.sort_values(["match_id", "index"]).copy()
self.events["game_seconds"] = self.events["minute"] * 60 + self.events["second"]
self._extract_locations()
def _extract_locations(self):
"""Extract x, y coordinates from location field."""
self.events["location_x"] = self.events["location"].apply(
lambda x: x[0] if isinstance(x, list) and len(x) >= 2 else None
)
self.events["location_y"] = self.events["location"].apply(
lambda x: x[1] if isinstance(x, list) and len(x) >= 2 else None
)
def identify_turnovers(self) -> pd.DataFrame:
"""Identify all turnover events."""
        turnovers = self.events[
            (self.events["type"].isin(self.TURNOVER_TYPES)) |
            # StatsBomb convention: pass_outcome is populated only when the pass failed
            ((self.events["type"] == "Pass") & (self.events["pass_outcome"].notna()))
        ].copy()
# Classify turnover zone
turnovers["turnover_zone"] = pd.cut(
turnovers["location_x"],
bins=[0, 40, 80, 120],
labels=["Defensive Third", "Middle Third", "Attacking Third"]
)
        # Map zone labels to danger weights (NaN-safe for events with no location)
        turnovers["zone_danger"] = turnovers["turnover_zone"].map({
            "Defensive Third": 3, "Middle Third": 2, "Attacking Third": 1
        })
return turnovers
def analyze_aftermath(self, turnover: pd.Series) -> Dict:
"""Analyze what happens after a turnover."""
match_events = self.events[self.events["match_id"] == turnover["match_id"]]
turnover_time = turnover["game_seconds"]
turnover_team = turnover["team"]
# Get opponent events in aftermath window
aftermath = match_events[
(match_events["game_seconds"] > turnover_time) &
(match_events["game_seconds"] <= turnover_time + self.AFTERMATH_WINDOW) &
(match_events["team"] != turnover_team)
]
if len(aftermath) == 0:
return {
"shot_within_15s": False,
"goal_within_15s": False,
"xg_within_15s": 0.0,
"territory_gained": 0.0,
"counter_speed": 0.0
}
# Check for shots
shots = aftermath[aftermath["type"] == "Shot"]
        # Calculate territory gained ("x or 60" would miss NaN, which is truthy)
        start_x = aftermath.iloc[0]["location_x"]
        end_x = aftermath.iloc[-1]["location_x"]
        if pd.isna(start_x) or pd.isna(end_x):
            start_x, end_x = 60.0, 60.0
        territory_gained = max(0, end_x - start_x)
# Counter speed
time_elapsed = aftermath.iloc[-1]["game_seconds"] - turnover_time
counter_speed = territory_gained / time_elapsed if time_elapsed > 0 else 0
return {
"shot_within_15s": len(shots) > 0,
"goal_within_15s": (shots["shot_outcome"] == "Goal").any() if len(shots) > 0 else False,
"xg_within_15s": shots["shot_statsbomb_xg"].sum() if len(shots) > 0 else 0.0,
"territory_gained": territory_gained,
"counter_speed": counter_speed
}
def analyze_all_turnovers(self) -> pd.DataFrame:
"""Analyze all turnovers with aftermath."""
turnovers = self.identify_turnovers()
aftermath_data = []
for idx, turnover in turnovers.iterrows():
aftermath = self.analyze_aftermath(turnover)
aftermath_data.append(aftermath)
aftermath_df = pd.DataFrame(aftermath_data)
result = pd.concat([turnovers.reset_index(drop=True), aftermath_df], axis=1)
# Calculate danger score
result["danger_score"] = (
result["zone_danger"] * 0.3 +
result["shot_within_15s"].astype(int) * 2 +
result["goal_within_15s"].astype(int) * 3 +
result["xg_within_15s"] * 2 +
np.minimum(result["territory_gained"] / 40, 1) * 1.5 +
np.minimum(result["counter_speed"] / 10, 1) * 1
)
result["danger_level"] = pd.cut(
result["danger_score"],
bins=[-np.inf, 1, 2, 4, np.inf],
labels=["Low", "Moderate", "Dangerous", "Critical"]
)
return result
def create_player_profiles(self, turnover_analysis: pd.DataFrame) -> pd.DataFrame:
"""Create player-level turnover profiles."""
profiles = turnover_analysis.groupby(["player", "team"]).agg({
"danger_score": ["count", "mean"],
"shot_within_15s": "sum",
"goal_within_15s": "sum",
"xg_within_15s": "sum",
"turnover_zone": lambda x: (x == "Defensive Third").mean() * 100
}).reset_index()
profiles.columns = [
"player", "team", "total_turnovers", "avg_danger_score",
"shots_conceded", "goals_conceded", "xg_conceded", "pct_in_own_third"
]
        # Align on both player and team; assigning .values from a player-only
        # groupby can silently misorder rows
        dangerous = (
            turnover_analysis["danger_level"].isin(["Critical", "Dangerous"])
            .groupby([turnover_analysis["player"], turnover_analysis["team"]])
            .sum()
            .rename("dangerous_turnovers")
            .reset_index()
        )
        profiles = profiles.merge(dangerous, on=["player", "team"], how="left")
profiles["danger_rate"] = profiles["dangerous_turnovers"] / profiles["total_turnovers"] * 100
return profiles.sort_values("avg_danger_score", ascending=False)
# Example usage
events = sb.events(match_id=3788741)
analyzer = TurnoverAnalyzer(events)
turnover_results = analyzer.analyze_all_turnovers()
player_profiles = analyzer.create_player_profiles(turnover_results)
print("Turnover Analysis Summary:")
print(turnover_results["danger_level"].value_counts())
print("\nTop 10 Players by Turnover Danger:")
print(player_profiles.head(10))

# R: Dangerous Turnover Tracker
library(tidyverse)
# Define turnover event types
turnover_types <- c(
"Miscontrol",
"Dispossessed",
"Error",
"Pass" # with unsuccessful outcome
)
analyze_turnovers <- function(events) {
# Sort events by time
events <- events %>%
arrange(match_id, index) %>%
mutate(
game_seconds = minute * 60 + second,
location_x = map_dbl(location, ~ if(!is.null(.x)) .x[1] else NA),
location_y = map_dbl(location, ~ if(!is.null(.x)) .x[2] else NA)
)
# Identify turnovers
turnovers <- events %>%
filter(
type.name %in% c("Miscontrol", "Dispossessed", "Error") |
(type.name == "Pass" & !is.na(pass.outcome.name))
) %>%
mutate(
turnover_zone = case_when(
location_x < 40 ~ "Defensive Third",
location_x < 80 ~ "Middle Third",
TRUE ~ "Attacking Third"
),
zone_danger = case_when(
location_x < 40 ~ 3, # Own third - most dangerous
location_x < 80 ~ 2,
TRUE ~ 1 # Opponent third - least dangerous
)
)
# For each turnover, analyze subsequent events
analyze_aftermath <- function(turnover_row, all_events) {
match_events <- all_events %>%
filter(match_id == turnover_row$match_id)
turnover_time <- turnover_row$game_seconds
turnover_team <- turnover_row$team.name
# Get events in next 15 seconds by other team
aftermath <- match_events %>%
filter(
game_seconds > turnover_time,
game_seconds <= turnover_time + 15,
team.name != turnover_team
)
if (nrow(aftermath) == 0) {
return(tibble(
shot_within_15s = FALSE,
goal_within_15s = FALSE,
xg_within_15s = 0,
territory_gained = 0,
counter_speed = NA
))
}
# Check for shots
shots <- aftermath %>% filter(type.name == "Shot")
# Calculate territory gained
first_touch <- aftermath %>% slice(1)
last_action <- aftermath %>% slice(n())
start_x <- first_touch$location_x
end_x <- last_action$location_x
territory_gained <- if(!is.na(end_x) && !is.na(start_x)) end_x - start_x else 0
# Counter speed (meters per second)
time_elapsed <- last_action$game_seconds - turnover_time
counter_speed <- if(time_elapsed > 0) territory_gained / time_elapsed else 0
tibble(
shot_within_15s = nrow(shots) > 0,
goal_within_15s = any(shots$shot.outcome.name == "Goal", na.rm = TRUE),
xg_within_15s = sum(shots$shot.statsbomb_xg, na.rm = TRUE),
territory_gained = max(0, territory_gained),
counter_speed = counter_speed
)
}
# Apply aftermath analysis to each turnover
turnover_analysis <- turnovers %>%
mutate(
aftermath = map(row_number(), ~ analyze_aftermath(turnovers[.x, ], events))
) %>%
unnest(aftermath)
# Calculate danger score
turnover_analysis <- turnover_analysis %>%
mutate(
danger_score = (
zone_danger * 0.3 +
shot_within_15s * 2 +
goal_within_15s * 3 +
xg_within_15s * 2 +
pmin(territory_gained / 40, 1) * 1.5 +
pmin(counter_speed / 10, 1) * 1
),
danger_level = case_when(
danger_score >= 4 ~ "Critical",
danger_score >= 2 ~ "Dangerous",
danger_score >= 1 ~ "Moderate",
TRUE ~ "Low"
)
)
return(turnover_analysis)
}
# Player-level turnover profiles
create_player_turnover_profile <- function(turnover_analysis) {
turnover_analysis %>%
group_by(player.name, team.name) %>%
summarize(
total_turnovers = n(),
dangerous_turnovers = sum(danger_level %in% c("Critical", "Dangerous")),
shots_conceded = sum(shot_within_15s),
goals_conceded = sum(goal_within_15s),
xg_conceded = sum(xg_within_15s),
avg_danger_score = mean(danger_score),
pct_in_own_third = mean(turnover_zone == "Defensive Third") * 100,
.groups = "drop"
) %>%
mutate(
danger_rate = dangerous_turnovers / total_turnovers * 100
) %>%
arrange(desc(avg_danger_score))
}
# Run analysis
turnover_results <- analyze_turnovers(events)
player_profiles <- create_player_turnover_profile(turnover_results)
print("Top 10 Players by Turnover Danger:")
print(head(player_profiles, 10))

Exercise 31.3: Multi-Provider Event Data Normalizer
Task: Build a provider-agnostic event data processing system that normalizes data from multiple sources into a unified schema.
Requirements:
- Define a common event schema supporting multiple providers
- Implement parsers for StatsBomb and Wyscout formats
- Handle coordinate system differences (normalize to 105x68m pitch)
- Create data quality reports comparing provider coverage
# Python: Multi-Provider Event Data Normalizer
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
import pandas as pd
import numpy as np
@dataclass
class CommonEventSchema:
"""Standard schema for normalized events."""
REQUIRED_FIELDS = [
"event_id", "match_id", "period", "minute", "second",
"event_type", "player_id", "player_name", "team_id", "team_name",
"location_x", "location_y"
]
OPTIONAL_FIELDS = [
"end_x", "end_y", "outcome", "body_part", "technique",
"recipient_id", "recipient_name", "xg", "duration"
]
PITCH_LENGTH = 105
PITCH_WIDTH = 68
class EventParser(ABC):
"""Base class for event data parsers."""
def __init__(self, provider: str, pitch_length: float, pitch_width: float):
self.provider = provider
self.pitch_length = pitch_length
self.pitch_width = pitch_width
@abstractmethod
def parse(self, data: Any) -> pd.DataFrame:
"""Parse raw data into normalized format."""
pass
def normalize_x(self, x: float) -> float:
"""Normalize x coordinate to standard pitch."""
return x * CommonEventSchema.PITCH_LENGTH / self.pitch_length
def normalize_y(self, y: float) -> float:
"""Normalize y coordinate to standard pitch."""
return y * CommonEventSchema.PITCH_WIDTH / self.pitch_width
def validate(self, events: pd.DataFrame) -> Dict:
"""Validate parsed events against schema."""
missing = [f for f in CommonEventSchema.REQUIRED_FIELDS
if f not in events.columns]
complete_rows = events[CommonEventSchema.REQUIRED_FIELDS].dropna()
return {
"valid": len(missing) == 0,
"missing_fields": missing,
"n_events": len(events),
"n_complete": len(complete_rows),
"completeness_rate": len(complete_rows) / len(events) if len(events) > 0 else 0
}
class StatsBombParser(EventParser):
"""Parser for StatsBomb event data."""
def __init__(self):
super().__init__("statsbomb", pitch_length=120, pitch_width=80)
def parse(self, data: pd.DataFrame) -> pd.DataFrame:
events = pd.DataFrame({
"event_id": data["id"],
"match_id": data.get("match_id"),
"period": data["period"],
"minute": data["minute"],
"second": data["second"],
"event_type": data["type"],
"player_id": data.get("player_id"),
"player_name": data.get("player"),
"team_id": data.get("team_id"),
"team_name": data.get("team"),
})
# Extract and normalize locations
events["location_x"] = data["location"].apply(
lambda x: self.normalize_x(x[0]) if isinstance(x, list) else None
)
events["location_y"] = data["location"].apply(
lambda x: self.normalize_y(x[1]) if isinstance(x, list) else None
)
return events
class WyscoutParser(EventParser):
"""Parser for Wyscout event data."""
EVENT_TYPE_MAP = {
"Pass": "Pass",
"Shot": "Shot",
"Duel": "Duel",
"Foul": "Foul",
"Free kick": "Free Kick",
"Offside": "Offside",
"Others on the ball": "Other"
}
def __init__(self):
super().__init__("wyscout", pitch_length=100, pitch_width=100)
def parse(self, data: pd.DataFrame) -> pd.DataFrame:
events = pd.DataFrame({
"event_id": data["id"],
"match_id": data["matchId"],
"period": data["matchPeriod"],
            "minute": (data["eventSec"] // 60).astype(int),
            "second": (data["eventSec"] % 60).astype(int),
"event_type": data["eventName"].map(self.EVENT_TYPE_MAP).fillna(data["eventName"]),
"player_id": data["playerId"],
"player_name": data.get("playerName"),
"team_id": data["teamId"],
"team_name": data.get("teamName"),
})
        # Extract positions from the nested structure
        def get_position(positions, idx, coord):
            try:
                return positions[idx].get(coord) if positions else None
            except (IndexError, AttributeError, TypeError):
                return None
        # Compare to None explicitly so a legitimate 0 coordinate is not dropped
        events["location_x"] = data["positions"].apply(
            lambda p: self.normalize_x(get_position(p, 0, "x")) if get_position(p, 0, "x") is not None else None
        )
        events["location_y"] = data["positions"].apply(
            lambda p: self.normalize_y(get_position(p, 0, "y")) if get_position(p, 0, "y") is not None else None
        )
return events
class EventNormalizer:
"""Main class for normalizing events from multiple providers."""
PARSERS = {
"statsbomb": StatsBombParser,
"wyscout": WyscoutParser
}
@classmethod
def get_parser(cls, provider: str) -> EventParser:
"""Factory method to get appropriate parser."""
if provider.lower() not in cls.PARSERS:
raise ValueError(f"Unknown provider: {provider}")
return cls.PARSERS[provider.lower()]()
@classmethod
def normalize(cls, data: Any, provider: str) -> pd.DataFrame:
"""Normalize data from specified provider."""
parser = cls.get_parser(provider)
normalized = parser.parse(data)
validation = parser.validate(normalized)
print(f"Normalized {validation['n_events']} events from {provider}")
print(f"Completeness: {validation['completeness_rate']:.1%}")
return normalized
@staticmethod
def compare_providers(datasets: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Compare coverage across providers."""
comparisons = []
for provider, events in datasets.items():
comparisons.append({
"provider": provider,
"total_events": len(events),
"unique_types": events["event_type"].nunique(),
"with_location": events["location_x"].notna().sum(),
"passes": (events["event_type"] == "Pass").sum(),
"shots": (events["event_type"] == "Shot").sum(),
"avg_per_match": len(events) / events["match_id"].nunique()
})
return pd.DataFrame(comparisons)
# Example usage
from statsbombpy import sb
# Load and normalize StatsBomb data
sb_events = sb.events(match_id=3788741)
normalizer = EventNormalizer()
normalized_sb = normalizer.normalize(sb_events, "statsbomb")
print("\nNormalized Schema Sample:")
print(normalized_sb.head())

# R: Multi-Provider Event Data Normalizer
library(tidyverse)
library(jsonlite)
library(R6)
# Define common event schema
CommonEventSchema <- list(
required_fields = c(
"event_id", "match_id", "period", "minute", "second",
"event_type", "player_id", "player_name", "team_id", "team_name",
"location_x", "location_y"
),
optional_fields = c(
"end_x", "end_y", "outcome", "body_part", "technique",
"recipient_id", "recipient_name", "xg", "duration"
),
pitch_dims = list(length = 105, width = 68)
)
# Base parser class
EventParser <- R6Class("EventParser",
public = list(
provider = NULL,
pitch_dims = NULL,
initialize = function(provider) {
self$provider <- provider
},
parse = function(data) {
stop("Subclass must implement parse()")
},
normalize_coordinates = function(x, y, source_dims) {
list(
x = x * CommonEventSchema$pitch_dims$length / source_dims$length,
y = y * CommonEventSchema$pitch_dims$width / source_dims$width
)
},
validate = function(events) {
missing_cols <- setdiff(
CommonEventSchema$required_fields,
names(events)
)
list(
valid = length(missing_cols) == 0,
missing = missing_cols,
n_events = nrow(events),
n_complete = sum(complete.cases(events[, CommonEventSchema$required_fields]))
)
}
)
)
# StatsBomb parser
StatsBombParser <- R6Class("StatsBombParser",
inherit = EventParser,
public = list(
initialize = function() {
super$initialize("statsbomb")
self$pitch_dims <- list(length = 120, width = 80)
},
parse = function(data) {
events <- data %>%
transmute(
event_id = id,
match_id = match_id,
period = period,
minute = minute,
second = second,
event_type = type.name,
player_id = player.id,
player_name = player.name,
team_id = team.id,
team_name = team.name,
raw_x = map_dbl(location, ~ if(!is.null(.x)) .x[1] else NA),
raw_y = map_dbl(location, ~ if(!is.null(.x)) .x[2] else NA)
)
# Normalize coordinates
events <- events %>%
mutate(
location_x = raw_x * 105 / 120,
location_y = raw_y * 68 / 80
) %>%
select(-raw_x, -raw_y)
events
}
)
)
# Wyscout parser
WyscoutParser <- R6Class("WyscoutParser",
inherit = EventParser,
public = list(
initialize = function() {
super$initialize("wyscout")
self$pitch_dims <- list(length = 100, width = 100) # Percentage
},
parse = function(data) {
events <- data %>%
transmute(
event_id = id,
match_id = matchId,
period = matchPeriod,
minute = floor(eventSec / 60),
second = eventSec %% 60,
event_type = self$map_event_type(eventName),
player_id = playerId,
player_name = playerName,
team_id = teamId,
team_name = teamName,
raw_x = map_dbl(positions, ~ if(length(.x) > 0) .x[[1]]$x else NA),
raw_y = map_dbl(positions, ~ if(length(.x) > 0) .x[[1]]$y else NA)
)
# Normalize coordinates (Wyscout uses 0-100 percentage)
events <- events %>%
mutate(
location_x = raw_x * 105 / 100,
location_y = raw_y * 68 / 100
) %>%
select(-raw_x, -raw_y)
events
},
map_event_type = function(wyscout_type) {
type_mapping <- c(
"Pass" = "Pass",
"Shot" = "Shot",
"Duel" = "Duel",
"Foul" = "Foul",
"Free kick" = "Free Kick",
"Offside" = "Offside",
"Others on the ball" = "Other"
)
ifelse(wyscout_type %in% names(type_mapping),
type_mapping[wyscout_type],
wyscout_type)
}
)
)
# Factory function
create_parser <- function(provider) {
switch(tolower(provider),
"statsbomb" = StatsBombParser$new(),
"wyscout" = WyscoutParser$new(),
stop(paste("Unknown provider:", provider))
)
}
# Data quality comparison
compare_provider_coverage <- function(sb_events, ws_events) {
comparison <- tibble(
metric = c(
"Total Events",
"Unique Event Types",
"Events with Location",
"Pass Events",
"Shot Events",
"Avg Events per Match"
),
statsbomb = c(
nrow(sb_events),
n_distinct(sb_events$event_type),
sum(!is.na(sb_events$location_x)),
sum(sb_events$event_type == "Pass"),
sum(sb_events$event_type == "Shot"),
nrow(sb_events) / n_distinct(sb_events$match_id)
),
wyscout = c(
nrow(ws_events),
n_distinct(ws_events$event_type),
sum(!is.na(ws_events$location_x)),
sum(ws_events$event_type == "Pass"),
sum(ws_events$event_type == "Shot"),
nrow(ws_events) / n_distinct(ws_events$match_id)
)
)
comparison
}
# Example usage
sb_parser <- create_parser("statsbomb")
# sb_normalized <- sb_parser$parse(statsbomb_data)
# validation <- sb_parser$validate(sb_normalized)
print("Multi-provider normalizer ready")

Chapter Summary
Key Takeaways
- Event data structure: Events have core fields (type, player, team, location, timestamp) plus type-specific qualifiers
- Provider differences: Different providers use different schemas; build provider-agnostic parsers when possible
- Derived events: Create higher-level events (progressive passes, possession sequences) from raw data
- Custom metrics: Raw event data enables building metrics tailored to specific questions
- Data quality: Always validate event data; human tagging introduces errors
- Optimization: Use vectorized operations, Parquet storage, and parallel processing for large datasets
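As a minimal sketch of the derived-events idea in the takeaways above: one common progressive-pass definition flags passes that end at least 25% closer to goal than they started. The toy coordinates and the 25% threshold here are illustrative assumptions, not a provider standard.

```python
import pandas as pd

# Toy passes in StatsBomb-style coordinates (120 x 80 pitch, goal at x = 120)
passes = pd.DataFrame({
    "x":     [30.0, 60.0, 95.0],
    "y":     [40.0, 20.0, 40.0],
    "end_x": [70.0, 62.0, 110.0],
    "end_y": [40.0, 25.0, 38.0],
})
goal_x, goal_y = 120, 40

# Derived event: "progressive" if the pass ends >= 25% closer to goal
dist_start = ((goal_x - passes["x"]) ** 2 + (goal_y - passes["y"]) ** 2) ** 0.5
dist_end = ((goal_x - passes["end_x"]) ** 2 + (goal_y - passes["end_y"]) ** 2) ** 0.5
passes["progressive"] = dist_end <= 0.75 * dist_start

print(passes["progressive"].tolist())  # one flag per pass
```

The same pattern (vectorized distances plus a boolean flag) extends to other derived events such as passes into the box or switches of play.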
Event Data Processing Pipeline
- Load and parse raw JSON/XML data
- Validate data quality and flag issues
- Standardize to common schema
- Create derived events and sequences
- Calculate custom metrics
- Store in efficient format (Parquet) for future use
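The six steps above can be sketched end to end on toy data; the column names mirror this chapter's examples, and the Parquet step is left commented out so the sketch stays dependency-free.

```python
import pandas as pd

# 1. Load/parse raw records (here: an inline toy sample)
raw = [
    {"id": "a1", "type": "Pass", "minute": 3, "second": 12, "location": [30, 40]},
    {"id": "a2", "type": "Shot", "minute": 3, "second": 15, "location": None},
]
events = pd.DataFrame(raw)

# 2. Validate: flag rows with unusable locations instead of silently dropping them
events["valid_location"] = events["location"].apply(
    lambda loc: isinstance(loc, list) and len(loc) == 2
)

# 3. Standardize: pull coordinates into flat numeric columns
events["location_x"] = pd.to_numeric(
    events["location"].apply(lambda loc: loc[0] if isinstance(loc, list) else None)
)

# 4. Derive: higher-level flags from the raw fields
events["in_final_third"] = events["location_x"] > 80

# 5. Custom metric: per-type location completeness
summary = events.groupby("type")["valid_location"].mean()

# 6. Store efficiently for reuse (commented out to avoid a pyarrow dependency)
# events.to_parquet("events.parquet", compression="snappy")
print(summary.to_dict())
```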