Chapter 60

Capstone - Complete Analytics System

Learning Objectives
  • Understand the structure and schema of football event data
  • Parse and clean raw event data from different providers
  • Work with nested JSON structures and qualifiers
  • Create derived events and custom event types
  • Build custom metrics from raw event sequences
  • Handle event data quality issues and missing data
  • Optimize event data processing for large datasets
  • Compare event data across different providers

Understanding Event Data

Event data forms the backbone of modern football analytics. Every pass, shot, tackle, and dribble is recorded with precise coordinates and timestamps. Understanding how to work with raw event data unlocks the ability to create custom metrics and analyses that go beyond pre-packaged statistics.

What is Event Data?

Event data captures discrete on-ball actions during a match. Each event has a type (pass, shot, tackle), location (x,y coordinates), timestamp, player/team information, and qualifiers that provide additional context (pass height, shot body part, etc.).

Location Data

X,Y coordinates for start/end positions of each action

Temporal Data

Timestamps, match periods, and event sequences

Qualifiers

Additional context: body part, technique, outcome
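Putting these three components together, a single event is essentially a small nested record. A minimal sketch (field names follow StatsBomb's conventions; the specific values are invented):

```python
# A hypothetical pass event, showing how location, time, and
# qualifiers combine in one nested record (values are invented)
event = {
    "id": "uuid-001",
    "type": {"id": 30, "name": "Pass"},
    "period": 1,
    "timestamp": "00:00:12.345",
    "minute": 0,
    "second": 12,
    "player": {"id": 5503, "name": "Example Player"},
    "team": {"id": 217, "name": "Example FC"},
    "location": [60.0, 40.0],           # start of the action
    "pass": {                           # type-specific qualifiers
        "end_location": [75.0, 30.0],
        "height": {"name": "Ground Pass"},
        "body_part": {"name": "Right Foot"},
    },
}

# Qualifiers live under a key named after the event type
print(event["pass"]["height"]["name"])  # → Ground Pass
```

Note that the qualifiers are nested one level down under a key named after the event type; this is why flattening (covered later in this chapter) is usually the first processing step.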

event_structure.py
# Python: Understanding Event Data Structure
import pandas as pd
import json
from statsbombpy import sb

# Load StatsBomb event data
events = sb.events(match_id=3788741)

# Examine the structure
print(events.info())
print(events.head())

# Core fields in event data
core_fields = [
    "id",           # Unique event identifier
    "index",        # Sequential order in match
    "period",       # 1 = first half, 2 = second half
    "timestamp",    # Time in period (HH:MM:SS.mmm)
    "minute",       # Match minute
    "second",       # Second within minute
    "type",         # Event type (Pass, Shot, etc.)
    "player",       # Player performing action
    "team",         # Team in possession
    "location",     # [x, y] coordinates
    "duration"      # Event duration in seconds
]

# View available columns
print("Available columns:")
print(events.columns.tolist())

# Count events by type
event_counts = events["type"].value_counts()
print("\nEvent counts by type:")
print(event_counts)
# R: Understanding Event Data Structure
library(tidyverse)
library(jsonlite)

# Load StatsBomb event data
events <- fromJSON("statsbomb_events.json", flatten = TRUE)

# Examine the structure
str(events[1:3, ])

# Core fields in event data
core_fields <- c(
    "id",           # Unique event identifier
    "index",        # Sequential order in match
    "period",       # 1 = first half, 2 = second half
    "timestamp",    # Time in period (HH:MM:SS.mmm)
    "minute",       # Match minute
    "second",       # Second within minute
    "type.name",    # Event type (Pass, Shot, etc.)
    "player.name",  # Player performing action
    "team.name",    # Team in possession
    "location",     # [x, y] coordinates
    "duration"      # Event duration in seconds
)

# View sample events
events %>%
    select(any_of(core_fields)) %>%
    head(10) %>%
    print()

# Count events by type
event_counts <- events %>%
    count(type.name, sort = TRUE)

print(event_counts)
Output
Available columns:
['id', 'index', 'period', 'timestamp', 'minute', 'second',
'type', 'possession', 'possession_team', 'play_pattern',
'team', 'player', 'position', 'location', 'duration', ...]

Event counts by type:
Pass                847
Ball Receipt*       582
Carry               498
Pressure            189
Ball Recovery       87
Duel                76
Clearance           52
Shot                28
...
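The timestamp field listed above is a string, so it needs converting before any arithmetic. A small helper, assuming the HH:MM:SS.mmm format shown earlier:

```python
from datetime import datetime

def timestamp_to_seconds(ts: str) -> float:
    """Convert an 'HH:MM:SS.mmm' period timestamp to elapsed seconds."""
    t = datetime.strptime(ts, "%H:%M:%S.%f")
    return t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6

print(timestamp_to_seconds("00:12:34.567"))  # → 754.567
```

Remember that the timestamp resets at the start of each period, so combine it with the period field when ordering events across a whole match.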

Event Data Schema

Different data providers use different schemas. Understanding the structure helps you work across providers and build provider-agnostic pipelines.

Event Type | Key Qualifiers                                | Typical Fields
Pass       | Height, length, technique, body part, outcome | end_location, recipient, pass_type, cross, through_ball
Shot       | Body part, technique, first_time, outcome     | end_location, xG, freeze_frame, statsbomb_xg
Dribble    | Outcome (complete/incomplete), overrun        | end_location, nutmeg, no_touch
Tackle     | Outcome (won/lost)                            | counterpress
Pressure   | Duration, counterpress                        | duration
Carry      | Under pressure                                | end_location, duration
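One practical consequence of schema differences is the coordinate system: StatsBomb events use a 120x80 pitch, while Opta and Wyscout express positions as 0-100 percentages. A small rescaling sketch (the dimensions here are common conventions; verify against your provider's own specification):

```python
# Pitch dimensions by provider (a common assumption; check your
# provider's documentation before relying on these)
PITCH_DIMS = {
    "statsbomb": (120.0, 80.0),
    "wyscout": (100.0, 100.0),
    "opta": (100.0, 100.0),
}

def normalize_location(x, y, provider):
    """Rescale provider coordinates to a common 0-1 pitch."""
    max_x, max_y = PITCH_DIMS[provider]
    return x / max_x, y / max_y

print(normalize_location(60.0, 40.0, "statsbomb"))  # → (0.5, 0.5)
```

Normalizing coordinates early makes every downstream metric (distances, zones, box entries) provider-agnostic.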
event_qualifiers.py
# Python: Exploring Event Qualifiers
import pandas as pd

# Extract pass-specific fields
passes = events[events["type"] == "Pass"].copy()

# Common pass columns
pass_columns = [
    "id", "minute", "second", "player", "team",
    "location", "pass_end_location",
    "pass_length", "pass_angle", "pass_height",
    "pass_body_part", "pass_type",
    "pass_outcome", "pass_recipient",
    "pass_cross", "pass_through_ball", "pass_switch"
]

# Filter to available columns
available_pass_cols = [c for c in pass_columns if c in passes.columns]
pass_data = passes[available_pass_cols]

# View pass qualifiers summary
pass_summary = {
    "total_passes": len(passes),
    "crosses": passes["pass_cross"].sum() if "pass_cross" in passes.columns else 0,
    "through_balls": passes["pass_through_ball"].sum() if "pass_through_ball" in passes.columns else 0,
    "successful": passes["pass_outcome"].isna().sum(),
    "unsuccessful": passes["pass_outcome"].notna().sum()
}

print("Pass Summary:")
for k, v in pass_summary.items():
    print(f"  {k}: {v}")

# Extract shot-specific fields
shots = events[events["type"] == "Shot"].copy()

# Examine shot data
print("\nShot columns available:")
shot_cols = [c for c in shots.columns if "shot" in c.lower()]
print(shot_cols)

# View shot details
print("\nShot details:")
print(shots[["minute", "player", "shot_statsbomb_xg", "shot_outcome"]].head())
# R: Exploring Event Qualifiers
library(tidyverse)

# Extract pass-specific fields
passes <- events %>%
    filter(type.name == "Pass") %>%
    select(
        id, minute, second, player.name, team.name,
        location, pass.end_location,
        pass.length, pass.angle, pass.height.name,
        pass.body_part.name, pass.type.name,
        pass.outcome.name, pass.recipient.name,
        pass.cross, pass.through_ball, pass.switch
    )

# View pass qualifiers
pass_qualifiers <- passes %>%
    summarize(
        total_passes = n(),
        crosses = sum(pass.cross == TRUE, na.rm = TRUE),
        through_balls = sum(pass.through_ball == TRUE, na.rm = TRUE),
        switches = sum(pass.switch == TRUE, na.rm = TRUE),
        successful = sum(is.na(pass.outcome.name)),
        unsuccessful = sum(!is.na(pass.outcome.name))
    )

print(pass_qualifiers)

# Extract shot-specific fields
shots <- events %>%
    filter(type.name == "Shot") %>%
    select(
        id, minute, second, player.name, team.name,
        location, shot.end_location,
        shot.statsbomb_xg, shot.outcome.name,
        shot.body_part.name, shot.technique.name,
        shot.type.name, shot.first_time,
        shot.freeze_frame
    )

# Examine freeze frame data (player positions at time of shot)
if ("shot.freeze_frame" %in% names(shots)) {
    freeze_frame <- shots$shot.freeze_frame[[1]]
    print("Freeze frame structure:")
    print(str(freeze_frame))
}
Output
Pass Summary:
  total_passes: 847
  crosses: 23
  through_balls: 8
  successful: 712
  unsuccessful: 135

Shot columns available:
['shot_statsbomb_xg', 'shot_end_location', 'shot_outcome',
'shot_body_part', 'shot_technique', 'shot_type', 'shot_first_time',
'shot_freeze_frame', 'shot_key_pass_id']

Shot details:
   minute         player  shot_statsbomb_xg  shot_outcome
0      12   Lionel Messi              0.082         Saved
1      27    Luis Suárez              0.156          Goal
2      34   Lionel Messi              0.043         Off T

Parsing Raw Event Data

Raw event data often comes in nested JSON format. Learning to parse and flatten this data is essential for analysis.

parsing_events.py
# Python: Parsing Nested JSON Event Data
import json
import pandas as pd
from typing import Dict, Any, List

def flatten_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a single nested event into a flat dictionary."""
    # Base fields
    flat = {
        "id": event.get("id"),
        "index": event.get("index"),
        "period": event.get("period"),
        "minute": event.get("minute"),
        "second": event.get("second"),
        "type": event.get("type", {}).get("name"),
        "type_id": event.get("type", {}).get("id"),
        "player": event.get("player", {}).get("name"),
        "player_id": event.get("player", {}).get("id"),
        "team": event.get("team", {}).get("name"),
        "team_id": event.get("team", {}).get("id"),
    }

    # Location
    location = event.get("location")
    if location:
        flat["location_x"] = location[0]
        flat["location_y"] = location[1]

    # Pass-specific fields
    if "pass" in event:
        pass_data = event["pass"]
        flat["pass_length"] = pass_data.get("length")
        flat["pass_angle"] = pass_data.get("angle")
        end_loc = pass_data.get("end_location", [None, None])
        flat["pass_end_x"] = end_loc[0] if end_loc else None
        flat["pass_end_y"] = end_loc[1] if end_loc else None
        flat["pass_outcome"] = pass_data.get("outcome", {}).get("name", "Complete")
        flat["pass_recipient"] = pass_data.get("recipient", {}).get("name")

    # Shot-specific fields
    if "shot" in event:
        shot_data = event["shot"]
        flat["shot_xg"] = shot_data.get("statsbomb_xg")
        flat["shot_outcome"] = shot_data.get("outcome", {}).get("name")
        end_loc = shot_data.get("end_location", [None, None, None])
        flat["shot_end_x"] = end_loc[0] if end_loc else None
        flat["shot_end_y"] = end_loc[1] if end_loc else None

    return flat

def parse_events_file(filepath: str) -> pd.DataFrame:
    """Parse a JSON events file into a flat DataFrame."""
    with open(filepath, "r") as f:
        raw_events = json.load(f)

    flattened = [flatten_event(event) for event in raw_events]
    return pd.DataFrame(flattened)

# Parse events
events_flat = parse_events_file("match_events.json")

print(events_flat.head())
print(f"Parsed {len(events_flat)} events")
# R: Parsing Nested JSON Event Data
library(tidyverse)
library(jsonlite)

# Read raw JSON file
raw_json <- read_json("match_events.json", simplifyVector = FALSE)

# Function to flatten a single event
flatten_event <- function(event) {
    # Base fields
    base <- tibble(
        id = event$id %||% NA,
        index = event$index %||% NA,
        period = event$period %||% NA,
        minute = event$minute %||% NA,
        second = event$second %||% NA,
        type = event$type$name %||% NA,
        type_id = event$type$id %||% NA,
        player = event$player$name %||% NA,
        player_id = event$player$id %||% NA,
        team = event$team$name %||% NA,
        team_id = event$team$id %||% NA
    )

    # Location
    if (!is.null(event$location)) {
        base$location_x <- event$location[[1]]
        base$location_y <- event$location[[2]]
    }

    # Type-specific fields (`%||%` guards against missing keys, which
    # would otherwise silently drop the column from the one-row tibble)
    if (!is.null(event$pass)) {
        base$pass_length <- event$pass$length %||% NA
        base$pass_angle <- event$pass$angle %||% NA
        base$pass_end_x <- event$pass$end_location[[1]]
        base$pass_end_y <- event$pass$end_location[[2]]
        base$pass_outcome <- event$pass$outcome$name %||% "Complete"
        base$pass_recipient <- event$pass$recipient$name %||% NA
    }

    if (!is.null(event$shot)) {
        base$shot_xg <- event$shot$statsbomb_xg %||% NA
        base$shot_outcome <- event$shot$outcome$name %||% NA
        base$shot_end_x <- event$shot$end_location[[1]]
        base$shot_end_y <- event$shot$end_location[[2]]
    }

    return(base)
}

# Parse all events
events_flat <- map_dfr(raw_json, flatten_event)

# View result
print(head(events_flat))
print(sprintf("Parsed %d events", nrow(events_flat)))
Output
         id  index  period  minute  second          type  location_x  location_y
0  uuid-001      1       1       0       0          Pass        60.0        40.0
1  uuid-002      2       1       0       2  Ball Receipt        45.0        35.0
2  uuid-003      3       1       0       3         Carry        45.0        35.0
3  uuid-004      4       1       0       5          Pass        52.0        28.0
4  uuid-005      5       1       0       7  Ball Receipt        65.0        22.0

Parsed 1847 events
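Parsed data is worth sanity-checking before analysis; missing locations, out-of-range coordinates, and duplicate ids are common quality issues. A minimal validation sketch against the flattened columns produced above (the checks and thresholds are illustrative):

```python
import pandas as pd

def validate_events(df: pd.DataFrame) -> dict:
    """Run basic quality checks on a flattened event DataFrame."""
    return {
        "missing_location": int(df["location_x"].isna().sum()),
        "x_out_of_range": int(((df["location_x"] < 0) | (df["location_x"] > 120)).sum()),
        "duplicate_ids": int(df["id"].duplicated().sum()),
        "index_not_increasing": int((df["index"].diff() < 0).sum()),
    }

# Toy frame with one bad coordinate, one missing location, and a duplicate id
df = pd.DataFrame({
    "id": ["a", "b", "b"],
    "index": [1, 2, 3],
    "location_x": [60.0, 130.0, None],
})
print(validate_events(df))
```

Running a report like this per match makes it easy to quarantine files with systematic problems before they contaminate aggregated metrics.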

Handling Multiple Providers

provider_agnostic.py
# Python: Provider-Agnostic Event Parser
import pandas as pd
from abc import ABC, abstractmethod

class EventParser(ABC):
    """Abstract base class for event data parsers."""

    @abstractmethod
    def parse(self, data) -> pd.DataFrame:
        pass

    @abstractmethod
    def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

class StatsBombParser(EventParser):
    """Parser for StatsBomb event data."""

    def parse(self, data) -> pd.DataFrame:
        return pd.json_normalize(data)

    def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
        standardized = pd.DataFrame({
            "event_id": range(len(df)),
            "event_type": df["type.name"] if "type.name" in df.columns else df["type"],
            "player": df.get("player.name", df.get("player")),
            "team": df.get("team.name", df.get("team")),
            "minute": df["minute"],
            "second": df["second"],
            "location_x": df["location"].apply(lambda x: x[0] if isinstance(x, list) else None),
            "location_y": df["location"].apply(lambda x: x[1] if isinstance(x, list) else None),
        })
        return standardized

class WyscoutParser(EventParser):
    """Parser for Wyscout event data."""

    def parse(self, data) -> pd.DataFrame:
        return pd.json_normalize(data)

    def standardize(self, df: pd.DataFrame) -> pd.DataFrame:
        standardized = pd.DataFrame({
            "event_id": range(len(df)),
            "event_type": df["eventName"],
            "player": df["playerId"],
            "team": df["teamId"],
            "minute": (df["eventSec"] // 60).astype(int),
            "second": (df["eventSec"] % 60).astype(int),
            "location_x": df["positions"].apply(
                lambda x: x[0]["x"] if x else None
            ),
            "location_y": df["positions"].apply(
                lambda x: x[0]["y"] if x else None
            ),
        })
        return standardized

def get_parser(provider: str) -> EventParser:
    """Factory function to get appropriate parser."""
    parsers = {
        "statsbomb": StatsBombParser(),
        "wyscout": WyscoutParser(),
    }
    parser = parsers.get(provider.lower())
    if parser is None:
        raise ValueError(f"Unknown provider: {provider}")
    return parser

# Usage
parser = get_parser("statsbomb")
standardized = parser.standardize(events)
print(standardized.head())
# R: Provider-Agnostic Event Parser
library(tidyverse)

# Define provider schemas
provider_schemas <- list(
    statsbomb = list(
        event_type = "type.name",
        location_x = "location[[1]]",
        location_y = "location[[2]]",
        player = "player.name",
        team = "team.name",
        timestamp = "timestamp"
    ),
    opta = list(
        event_type = "type_id",
        location_x = "x",
        location_y = "y",
        player = "player_id",
        team = "team_id",
        timestamp = "time_stamp"
    ),
    wyscout = list(
        event_type = "eventName",
        location_x = "positions[[1]]$x",
        location_y = "positions[[1]]$y",
        player = "playerId",
        team = "teamId",
        timestamp = "eventSec"
    )
)

# Standardize events to common schema
standardize_events <- function(events, provider) {
    schema <- provider_schemas[[provider]]

    standardized <- events %>%
        transmute(
            event_id = row_number(),
            event_type = !!sym(schema$event_type),
            player = !!sym(schema$player),
            team = !!sym(schema$team),
            minute = minute,
            second = second
        )

    # Handle location separately (nested list columns can't be pulled via sym())
    if (provider == "statsbomb") {
        standardized <- standardized %>%
            mutate(
                location_x = map_dbl(events$location, ~ if (is.null(.x)) NA_real_ else .x[[1]]),
                location_y = map_dbl(events$location, ~ if (is.null(.x)) NA_real_ else .x[[2]])
            )
    }

    return(standardized)
}

# Example usage
standardized <- standardize_events(events, "statsbomb")
print(head(standardized))

Creating Derived Events

Raw event data can be enriched by creating derived events: new event types calculated from existing data that capture higher-level concepts.
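The heart of most derived events is a simple geometric test. Stripped down to a scalar function (assuming StatsBomb's 120x80 pitch with the goal centre at (120, 40)):

```python
import math

GOAL = (120.0, 40.0)  # goal centre on a 120x80 StatsBomb pitch

def is_progressive(start, end, threshold=10.0):
    """True if the action moves the ball `threshold` units closer to goal."""
    gained = math.dist(start, GOAL) - math.dist(end, GOAL)
    return gained > threshold

print(is_progressive((60, 40), (80, 40)))  # → True (20 units closer)
print(is_progressive((60, 40), (65, 40)))  # → False (only 5 units)
```

The vectorized implementations below apply exactly this distance-to-goal test across whole DataFrames; definitions of "progressive" vary between analysts, so treat the 10-unit threshold as one convention among several.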

derived_events.py
# Python: Creating Derived Events
import pandas as pd
import numpy as np

def create_progressive_passes(events: pd.DataFrame) -> pd.DataFrame:
    """Identify progressive passes (advance ball >10m toward goal)."""
    passes = events[events["type"] == "Pass"].copy()

    # Extract coordinates
    passes["start_x"] = passes["location"].apply(lambda x: x[0] if x else None)
    passes["start_y"] = passes["location"].apply(lambda x: x[1] if x else None)
    passes["end_x"] = passes["pass_end_location"].apply(
        lambda x: x[0] if isinstance(x, list) else None
    )
    passes["end_y"] = passes["pass_end_location"].apply(
        lambda x: x[1] if isinstance(x, list) else None
    )

    # Calculate distances to goal (goal at x=120)
    passes["start_dist_to_goal"] = np.sqrt(
        (120 - passes["start_x"])**2 + (40 - passes["start_y"])**2
    )
    passes["end_dist_to_goal"] = np.sqrt(
        (120 - passes["end_x"])**2 + (40 - passes["end_y"])**2
    )

    # Progressive if advances >10m toward goal
    passes["is_progressive"] = (
        passes["start_dist_to_goal"] - passes["end_dist_to_goal"]
    ) > 10

    # Into final third
    passes["into_final_third"] = (passes["start_x"] < 80) & (passes["end_x"] >= 80)

    # Into penalty area
    passes["into_box"] = (
        (passes["end_x"] >= 102) &
        (passes["end_y"] >= 18) &
        (passes["end_y"] <= 62)
    )

    return passes

def create_ball_progressions(events: pd.DataFrame) -> pd.DataFrame:
    """Create combined ball progression events (passes + carries)."""
    progressions = []

    # Progressive passes
    passes = events[events["type"] == "Pass"].copy()
    passes["start_x"] = passes["location"].apply(lambda x: x[0] if x else 0)
    passes["end_x"] = passes["pass_end_location"].apply(
        lambda x: x[0] if isinstance(x, list) else 0
    )
    passes["progress_dist"] = passes["end_x"] - passes["start_x"]
    prog_passes = passes[passes["progress_dist"] > 10].copy()
    prog_passes["progression_type"] = "pass"
    progressions.append(prog_passes)

    # Progressive carries
    if "Carry" in events["type"].values:
        carries = events[events["type"] == "Carry"].copy()
        carries["start_x"] = carries["location"].apply(lambda x: x[0] if x else 0)
        carries["end_x"] = carries["carry_end_location"].apply(
            lambda x: x[0] if isinstance(x, list) else 0
        )
        carries["progress_dist"] = carries["end_x"] - carries["start_x"]
        prog_carries = carries[carries["progress_dist"] > 10].copy()
        prog_carries["progression_type"] = "carry"
        progressions.append(prog_carries)

    return pd.concat(progressions).sort_values("index")

prog_passes = create_progressive_passes(events)
print(f"Progressive passes: {prog_passes['is_progressive'].sum()} "
      f"({prog_passes['is_progressive'].mean()*100:.1f}%)")
# R: Creating Derived Events
library(tidyverse)

# Create progressive passes (advance ball >10m toward goal)
create_progressive_passes <- function(events) {
    passes <- events %>%
        filter(type.name == "Pass") %>%
        mutate(
            # Extract coordinates
            start_x = map_dbl(location, ~ .x[[1]]),
            start_y = map_dbl(location, ~ .x[[2]]),
            end_x = map_dbl(pass.end_location, ~ .x[[1]]),
            end_y = map_dbl(pass.end_location, ~ .x[[2]]),

            # Calculate distances to goal (goal at x=120)
            start_dist_to_goal = sqrt((120 - start_x)^2 + (40 - start_y)^2),
            end_dist_to_goal = sqrt((120 - end_x)^2 + (40 - end_y)^2),

            # Progressive if advances >10m toward goal
            is_progressive = (start_dist_to_goal - end_dist_to_goal) > 10,

            # Into final third
            into_final_third = start_x < 80 & end_x >= 80,

            # Into penalty area
            into_box = end_x >= 102 & end_y >= 18 & end_y <= 62
        )

    return(passes)
}

# Create ball progression events (carries + progressive passes)
create_ball_progressions <- function(events) {
    # Progressive passes
    prog_passes <- events %>%
        filter(type.name == "Pass") %>%
        mutate(
            start_x = map_dbl(location, ~ .x[[1]]),
            end_x = map_dbl(pass.end_location, ~ .x[[1]]),
            progress_dist = end_x - start_x
        ) %>%
        filter(progress_dist > 10) %>%
        mutate(progression_type = "pass")

    # Progressive carries
    prog_carries <- events %>%
        filter(type.name == "Carry") %>%
        mutate(
            start_x = map_dbl(location, ~ .x[[1]]),
            end_x = map_dbl(carry.end_location, ~ .x[[1]]),
            progress_dist = end_x - start_x
        ) %>%
        filter(progress_dist > 10) %>%
        mutate(progression_type = "carry")

    # Combine
    progressions <- bind_rows(prog_passes, prog_carries) %>%
        arrange(index)

    return(progressions)
}

prog_passes <- create_progressive_passes(events)
cat(sprintf("Progressive passes: %d (%.1f%%)\n",
    sum(prog_passes$is_progressive),
    mean(prog_passes$is_progressive) * 100))
Output
Progressive passes: 142 (16.8%)
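Derived events roll up naturally into player-level counts. A quick sketch on a toy frame shaped like the output of create_ball_progressions (only the player and progression_type columns matter here):

```python
import pandas as pd

# Toy progressions frame (shaped like create_ball_progressions output)
progressions = pd.DataFrame({
    "player": ["A", "A", "B", "B", "B"],
    "progression_type": ["pass", "carry", "pass", "pass", "carry"],
})

# Count progressions per player, split by how the ball was moved
per_player = (
    progressions
    .groupby(["player", "progression_type"])
    .size()
    .unstack(fill_value=0)
)
print(per_player)
```

Splitting by progression_type distinguishes players who progress the ball by passing from those who carry it, which a single combined count would hide.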

Creating Possession Sequences

possession_sequences.py
# Python: Create Possession Sequences
import pandas as pd
import numpy as np

def create_possession_sequences(events: pd.DataFrame) -> pd.DataFrame:
    """Create possession sequence summaries."""
    events = events.sort_values("index").copy()

    # Identify possession changes
    events["team_change"] = events["team"] != events["team"].shift(1)
    events["new_possession"] = (
        events["team_change"] |
        events["type"].isin(["Starting XI", "Half Start"]) |
        events["team"].shift(1).isna()
    )
    events["possession_id"] = events["new_possession"].cumsum()

    # Helper to safely get location
    def safe_location_x(loc):
        if isinstance(loc, list) and len(loc) > 0:
            return loc[0]
        return np.nan

    events["location_x"] = events["location"].apply(safe_location_x)

    # Summarize each possession
    possession_summary = events.groupby(["possession_id", "team"]).agg(
        start_minute=("minute", "min"),
        start_second=("second", "min"),
        end_minute=("minute", "max"),
        end_second=("second", "max"),
        n_events=("index", "count"),
        n_passes=("type", lambda x: (x == "Pass").sum()),
        has_shot=("type", lambda x: (x == "Shot").any()),
        has_goal=("shot_outcome", lambda x: (x == "Goal").any()),
        start_x=("location_x", "first"),
        end_x=("location_x", "last"),
    ).reset_index()

    # Calculate duration
    possession_summary["duration_seconds"] = (
        (possession_summary["end_minute"] * 60 + possession_summary["end_second"]) -
        (possession_summary["start_minute"] * 60 + possession_summary["start_second"])
    )

    # Classify sequences
    possession_summary["territory_gained"] = (
        possession_summary["end_x"] - possession_summary["start_x"]
    )

    def classify_sequence(row):
        if row["has_goal"]:
            return "Goal"
        elif row["has_shot"]:
            return "Shot"
        elif row["end_x"] >= 102:
            return "Penalty Area Entry"
        elif row["end_x"] >= 80:
            return "Final Third Entry"
        else:
            return "No Threat"

    possession_summary["sequence_quality"] = possession_summary.apply(
        classify_sequence, axis=1
    )

    return possession_summary

possessions = create_possession_sequences(events)
print(possessions["sequence_quality"].value_counts())
# R: Create Possession Sequences
library(tidyverse)

create_possession_sequences <- function(events) {
    # Identify possession changes
    events <- events %>%
        arrange(index) %>%
        mutate(
            # New possession on team change or after certain events
            new_possession = team.name != lag(team.name) |
                             type.name %in% c("Starting XI", "Half Start") |
                             is.na(lag(team.name)),
            possession_id = cumsum(new_possession)
        )

    # Summarize each possession
    possession_summary <- events %>%
        group_by(possession_id, team.name) %>%
        summarize(
            start_minute = min(minute),
            start_second = min(second),
            end_minute = max(minute),
            end_second = max(second),
            duration_seconds = (end_minute * 60 + end_second) -
                              (start_minute * 60 + start_second),
            n_events = n(),
            n_passes = sum(type.name == "Pass"),
            n_successful_passes = sum(type.name == "Pass" & is.na(pass.outcome.name)),
            has_shot = any(type.name == "Shot"),
            has_goal = any(type.name == "Shot" & shot.outcome.name == "Goal", na.rm = TRUE),
            start_x = first(na.omit(map_dbl(location, ~ .x[[1]]))),
            end_x = last(na.omit(map_dbl(location, ~ .x[[1]]))),
            .groups = "drop"
        ) %>%
        mutate(
            territory_gained = end_x - start_x,
            is_attacking_sequence = end_x > 80,
            sequence_quality = case_when(
                has_goal ~ "Goal",
                has_shot ~ "Shot",
                end_x >= 102 ~ "Penalty Area Entry",
                end_x >= 80 ~ "Final Third Entry",
                TRUE ~ "No Threat"
            )
        )

    return(possession_summary)
}

possessions <- create_possession_sequences(events)
print(table(possessions$sequence_quality))
Output
No Threat            156
Final Third Entry     42
Shot                  28
Penalty Area Entry    18
Goal                   3
Name: sequence_quality, dtype: int64
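The sequence_quality labels feed directly into team-level summaries. A sketch with a toy possession table (the labels match those produced by classify_sequence above):

```python
import pandas as pd

# Toy possession summary (labels match the classifier above)
possessions = pd.DataFrame({
    "team": ["Home", "Home", "Home", "Away", "Away"],
    "sequence_quality": ["Shot", "No Threat", "Goal",
                         "No Threat", "Final Third Entry"],
})

THREAT = {"Shot", "Goal", "Penalty Area Entry"}

# Share of each team's possessions that ended in a threatening action
threat_rate = (
    possessions["sequence_quality"].isin(THREAT)
    .groupby(possessions["team"])
    .mean()
)
print(threat_rate)
```

A rate like this is more comparable across matches than raw counts, since it controls for how many possessions each team had.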

Building Custom Metrics

With access to raw event data, you can create custom metrics tailored to specific questions or requirements.

custom_metrics.py
# Python: Building Custom Metrics from Events
import pandas as pd
import numpy as np

def calculate_dangerous_passes_received(events: pd.DataFrame) -> pd.DataFrame:
    """Calculate dangerous passes received by player."""
    passes = events[
        (events["type"] == "Pass") &
        (events["pass_outcome"].isna())  # Successful passes
    ].copy()

    # Extract end locations
    passes["end_x"] = passes["pass_end_location"].apply(
        lambda x: x[0] if isinstance(x, list) else None
    )
    passes["end_y"] = passes["pass_end_location"].apply(
        lambda x: x[1] if isinstance(x, list) else None
    )

    # Dangerous zone: final third, central areas
    passes["is_dangerous"] = (
        (passes["end_x"] >= 80) &
        (passes["end_y"] >= 20) &
        (passes["end_y"] <= 60) &
        (passes["pass_recipient"].notna())
    )

    dangerous_received = passes[passes["is_dangerous"]].groupby(
        "pass_recipient"
    ).size().reset_index(name="dangerous_passes_received")

    return dangerous_received.sort_values(
        "dangerous_passes_received", ascending=False
    )

def calculate_pressure_regains(events: pd.DataFrame) -> pd.DataFrame:
    """Calculate pressure regain rate by player."""
    events = events.sort_values("index").copy()

    # Get pressures with next-event info (the shifted series aligns to the
    # pressure rows by index, giving each pressure the event that follows it)
    pressures = events[events["type"] == "Pressure"].copy()
    pressures["next_event"] = events["type"].shift(-1)
    pressures["next_team"] = events["team"].shift(-1)

    # Pressure success: same team regains ball
    pressures["pressure_success"] = (
        (pressures["next_team"] == pressures["team"]) &
        (pressures["next_event"].isin(["Ball Recovery", "Interception"]))
    )

    player_pressure_stats = pressures.groupby("player").agg(
        pressures=("index", "count"),
        pressure_regains=("pressure_success", "sum")
    ).reset_index()

    player_pressure_stats["regain_rate"] = (
        player_pressure_stats["pressure_regains"] /
        player_pressure_stats["pressures"]
    )

    return player_pressure_stats[
        player_pressure_stats["pressures"] >= 5
    ].sort_values("regain_rate", ascending=False)

def calculate_box_entries(events: pd.DataFrame) -> pd.DataFrame:
    """Calculate box entries by player (passes + carries)."""
    actions = events[events["type"].isin(["Pass", "Carry"])].copy()

    def get_end_location(row):
        if row["type"] == "Pass":
            loc = row.get("pass_end_location")
        else:
            loc = row.get("carry_end_location")
        return loc if isinstance(loc, list) else [None, None]

    actions["start_x"] = actions["location"].apply(
        lambda x: x[0] if isinstance(x, list) else None
    )
    actions["start_y"] = actions["location"].apply(
        lambda x: x[1] if isinstance(x, list) else None
    )
    actions["end_loc"] = actions.apply(get_end_location, axis=1)
    actions["end_x"] = actions["end_loc"].apply(lambda x: x[0])
    actions["end_y"] = actions["end_loc"].apply(lambda x: x[1])

    # Check if enters box
    actions["enters_box"] = (
        ((actions["start_x"] < 102) | (actions["start_y"] < 18) | (actions["start_y"] > 62)) &
        (actions["end_x"] >= 102) &
        (actions["end_y"] >= 18) &
        (actions["end_y"] <= 62)
    )

    box_entries = actions[actions["enters_box"]].groupby(
        ["player", "type"]
    ).size().unstack(fill_value=0).reset_index()

    if "Pass" not in box_entries.columns:
        box_entries["Pass"] = 0
    if "Carry" not in box_entries.columns:
        box_entries["Carry"] = 0

    box_entries["total_box_entries"] = box_entries["Pass"] + box_entries["Carry"]

    return box_entries.sort_values("total_box_entries", ascending=False)

# Calculate all metrics
dangerous_received = calculate_dangerous_passes_received(events)
pressure_regains = calculate_pressure_regains(events)
box_entries = calculate_box_entries(events)

print("Top players by dangerous passes received:")
print(dangerous_received.head())
# R: Building Custom Metrics from Events
library(tidyverse)

# Metric 1: Dangerous Passes Received
calculate_dangerous_passes_received <- function(events) {
    # Find passes into dangerous zones
    passes <- events %>%
        filter(type.name == "Pass", is.na(pass.outcome.name)) %>%
        mutate(
            end_x = map_dbl(pass.end_location, ~ .x[[1]]),
            end_y = map_dbl(pass.end_location, ~ .x[[2]]),
            # Dangerous zone: final third, central areas
            is_dangerous = end_x >= 80 &
                          end_y >= 20 & end_y <= 60 &
                          !is.na(pass.recipient.name)
        )

    dangerous_received <- passes %>%
        filter(is_dangerous) %>%
        count(player = pass.recipient.name, name = "dangerous_passes_received") %>%
        arrange(desc(dangerous_passes_received))

    return(dangerous_received)
}

# Metric 2: Pressure Regains
calculate_pressure_regains <- function(events) {
    events <- events %>% arrange(index)

    # Find pressures followed by ball recovery or turnover
    pressure_outcomes <- events %>%
        filter(type.name == "Pressure") %>%
        mutate(
            next_event = lead(type.name),
            next_team = lead(team.name),
            pressure_success = next_team == team.name &
                              next_event %in% c("Ball Recovery", "Interception")
        )

    player_pressure_stats <- pressure_outcomes %>%
        group_by(player.name) %>%
        summarize(
            pressures = n(),
            pressure_regains = sum(pressure_success, na.rm = TRUE),
            regain_rate = pressure_regains / pressures,
            .groups = "drop"
        ) %>%
        filter(pressures >= 5) %>%
        arrange(desc(regain_rate))

    return(player_pressure_stats)
}

# Metric 3: Box Entries Created
calculate_box_entries <- function(events) {
    # Passes or carries that enter the penalty area
    box_entries <- events %>%
        filter(type.name %in% c("Pass", "Carry")) %>%
        mutate(
            start_x = map_dbl(location, ~ if (is.null(.x)) NA_real_ else .x[[1]]),
            start_y = map_dbl(location, ~ if (is.null(.x)) NA_real_ else .x[[2]]),
            # case_when evaluates every right-hand side for all rows, so each
            # extractor must tolerate NULL locations from the other event type
            end_x = case_when(
                type.name == "Pass" ~ map_dbl(pass.end_location, ~ if (is.null(.x)) NA_real_ else .x[[1]]),
                type.name == "Carry" ~ map_dbl(carry.end_location, ~ if (is.null(.x)) NA_real_ else .x[[1]]),
                TRUE ~ NA_real_
            ),
            end_y = case_when(
                type.name == "Pass" ~ map_dbl(pass.end_location, ~ if (is.null(.x)) NA_real_ else .x[[2]]),
                type.name == "Carry" ~ map_dbl(carry.end_location, ~ if (is.null(.x)) NA_real_ else .x[[2]]),
                TRUE ~ NA_real_
            ),
            # Check if it enters the box
            enters_box = (start_x < 102 | start_y < 18 | start_y > 62) &
                        end_x >= 102 & end_y >= 18 & end_y <= 62
        ) %>%
        filter(enters_box)

    player_box_entries <- box_entries %>%
        count(player.name, type.name, name = "box_entries") %>%
        pivot_wider(names_from = type.name, values_from = box_entries, values_fill = 0) %>%
        mutate(total_box_entries = Pass + Carry) %>%
        arrange(desc(total_box_entries))

    return(player_box_entries)
}

# Calculate all metrics
dangerous_received <- calculate_dangerous_passes_received(events)
pressure_regains <- calculate_pressure_regains(events)
box_entries <- calculate_box_entries(events)

print("Top players by dangerous passes received:")
print(head(dangerous_received))
Output
Top players by dangerous passes received:
    pass_recipient  dangerous_passes_received
0     Lionel Messi                         12
1      Luis Suárez                          9
2        Neymar Jr                          8
3  Sergio Busquets                          6
4   Andrés Iniesta                          5
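One caveat before ranking players on counts like these: raw totals favor whoever played the most minutes. A common refinement is to scale each metric per 90 minutes; a minimal sketch, where the `minutes_played` values are invented purely for illustration:

```python
import pandas as pd

# Hypothetical raw counts and minutes played (illustrative values only)
dangerous_received = pd.DataFrame({
    "pass_recipient": ["Lionel Messi", "Luis Suárez", "Neymar Jr"],
    "dangerous_passes_received": [12, 9, 8],
})
minutes_played = {"Lionel Messi": 90, "Luis Suárez": 75, "Neymar Jr": 60}

# Scale counts to a per-90 rate so partial appearances compare fairly
dangerous_received["minutes"] = dangerous_received["pass_recipient"].map(minutes_played)
dangerous_received["per_90"] = (
    dangerous_received["dangerous_passes_received"] / dangerous_received["minutes"] * 90
)

print(dangerous_received[["pass_recipient", "per_90"]])
```

Messi's full match stays at 12, while Suárez's 9 in 75 minutes becomes 10.8 per 90.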

Handling Data Quality Issues

Event data is collected by human taggers and semi-automated systems, so quality varies by provider, competition, and even individual match. Detecting and handling these issues is essential before trusting any metric built on top of the data.

Common Data Quality Issues
  • Missing events: Off-ball actions often undercounted
  • Location imprecision: Coordinates may be estimates
  • Inconsistent tagging: Different taggers, different rules
  • Temporal gaps: Missing timestamps or out of sequence
  • Missing qualifiers: Body part, technique not always tagged
  • Cross-provider differences: Same event, different classification
Validation Strategies
  • Check event counts against expected ranges
  • Validate location coordinates within pitch bounds
  • Verify temporal sequence consistency
  • Cross-reference with official match statistics
  • Flag outliers for manual review
  • Build automated quality metrics
data_quality.py
# Python: Event Data Quality Checks
import pandas as pd
import numpy as np
from typing import Dict, Any

def validate_event_data(events: pd.DataFrame) -> Dict[str, Any]:
    """Comprehensive event data quality validation."""
    quality_report = {}

    # 1. Check for missing critical fields
    quality_report["missing_fields"] = {
        "missing_type": events["type"].isna().sum(),
        "missing_player": events["player"].isna().sum(),
        "missing_team": events["team"].isna().sum(),
        "missing_location": events["location"].apply(
            lambda x: x is None or (isinstance(x, list) and len(x) == 0)
        ).sum(),
        "missing_timestamp": events["timestamp"].isna().sum()
    }

    # 2. Validate location coordinates
    def extract_coords(loc):
        if isinstance(loc, list) and len(loc) >= 2:
            return loc[0], loc[1]
        return None, None

    events["loc_x"], events["loc_y"] = zip(*events["location"].apply(extract_coords))

    valid_locs = events.dropna(subset=["loc_x", "loc_y"])
    quality_report["location_issues"] = {
        "x_out_of_bounds": ((valid_locs["loc_x"] < 0) | (valid_locs["loc_x"] > 120)).sum(),
        "y_out_of_bounds": ((valid_locs["loc_y"] < 0) | (valid_locs["loc_y"] > 80)).sum(),
        "suspicious_origin": ((valid_locs["loc_x"] == 0) & (valid_locs["loc_y"] == 0)).sum()
    }

    # 3. Check event sequence consistency
    events_sorted = events.sort_values("index")
    events_sorted["time_seconds"] = events_sorted["minute"] * 60 + events_sorted["second"]
    events_sorted["time_diff"] = events_sorted["time_seconds"].diff()

    quality_report["sequence_issues"] = {
        "backwards_timestamps": (events_sorted["time_diff"] < -5).sum(),
        "duplicate_indices": len(events) - events["index"].nunique()
    }

    # 4. Event count validation
    event_counts = events["type"].value_counts()
    expected_mins = {"Pass": 400, "Shot": 10, "Foul Committed": 15}  # rough per-match floors (both teams)

    below_expected = []
    for event_type, min_count in expected_mins.items():
        actual = event_counts.get(event_type, 0)
        if actual < min_count:
            below_expected.append({
                "type": event_type,
                "actual": actual,
                "expected_min": min_count
            })

    quality_report["below_expected_counts"] = below_expected

    # 5. Generate overall quality score
    total_events = len(events)
    issues = (
        sum(quality_report["missing_fields"].values()) +
        sum(quality_report["location_issues"].values()) +
        quality_report["sequence_issues"]["backwards_timestamps"]
    )

    quality_report["quality_score"] = 1 - (issues / total_events)

    return quality_report

quality = validate_event_data(events)
print(f"Data Quality Score: {quality['quality_score']*100:.2f}%")
print("\nMissing fields:")
for field, count in quality["missing_fields"].items():
    print(f"  {field}: {count}")
# R: Event Data Quality Checks
library(tidyverse)

validate_event_data <- function(events) {
    quality_report <- list()

    # 1. Check for missing critical fields
    quality_report$missing_fields <- events %>%
        summarize(
            missing_type = sum(is.na(type.name)),
            missing_player = sum(is.na(player.name)),
            missing_team = sum(is.na(team.name)),
            missing_location = sum(map_lgl(location, is.null)),
            missing_timestamp = sum(is.na(timestamp))
        )

    # 2. Validate location coordinates
    events_with_loc <- events %>%
        filter(!map_lgl(location, is.null)) %>%
        mutate(
            x = map_dbl(location, ~ .x[[1]]),
            y = map_dbl(location, ~ .x[[2]])
        )

    quality_report$location_issues <- events_with_loc %>%
        summarize(
            x_out_of_bounds = sum(x < 0 | x > 120),
            y_out_of_bounds = sum(y < 0 | y > 80),
            suspicious_origin = sum(x == 0 & y == 0)
        )

    # 3. Check event sequence consistency
    events_ordered <- events %>% arrange(index)
    quality_report$sequence_issues <- events_ordered %>%
        mutate(
            time_seconds = minute * 60 + second,
            time_diff = time_seconds - lag(time_seconds),
            time_backwards = time_diff < -5  # Allow small corrections
        ) %>%
        summarize(
            backwards_timestamps = sum(time_backwards, na.rm = TRUE),
            duplicate_indices = n() - n_distinct(index)
        )

    # 4. Event count validation
    quality_report$event_counts <- events %>%
        count(type.name) %>%
        mutate(
            expected_min = case_when(
                type.name == "Pass" ~ 400,
                type.name == "Shot" ~ 10,
                type.name == "Foul Committed" ~ 15,
                TRUE ~ 0
            ),
            below_expected = n < expected_min
        )

    # 5. Generate overall quality score
    total_events <- nrow(events)
    issues <- sum(quality_report$missing_fields) +
              sum(quality_report$location_issues) +
              quality_report$sequence_issues$backwards_timestamps

    quality_report$quality_score <- max(0, 1 - issues / total_events)  # floor at 0

    return(quality_report)
}

quality <- validate_event_data(events)
cat(sprintf("Data Quality Score: %.2f%%\n", quality$quality_score * 100))
print(quality$missing_fields)
Output
Data Quality Score: 97.84%

Missing fields:
  missing_type: 0
  missing_player: 12
  missing_team: 0
  missing_location: 23
  missing_timestamp: 0
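Once a report like this flags problems, a safer follow-up than silently dropping rows is to quarantine them for manual review, so the data loss stays auditable. A small sketch using the same pitch bounds as the checks above (the toy frame is illustrative):

```python
import pandas as pd

# Toy event frame with extracted coordinates (illustrative values)
events = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "loc_x": [60.0, -3.0, 130.0, 45.0],
    "loc_y": [40.0, 20.0, 40.0, 85.0],
})

# Same pitch bounds as the validation checks: x in [0, 120], y in [0, 80]
in_bounds = events["loc_x"].between(0, 120) & events["loc_y"].between(0, 80)

clean = events[in_bounds].copy()        # safe to analyze
quarantine = events[~in_bounds].copy()  # keep for manual review

print(f"kept {len(clean)} events, quarantined {len(quarantine)}")
```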

Optimizing Event Data Processing

A single match generates a few thousand events, so multi-season analyses quickly run into millions of rows and processing efficiency becomes critical. The strategies below trade off memory, storage, and CPU in different ways.

optimization.py
# Python: Optimizing Event Data Processing
import pandas as pd
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import pyarrow.parquet as pq
import pyarrow as pa

# Strategy 1: Use vectorized operations
def process_events_vectorized(events: pd.DataFrame) -> pd.DataFrame:
    """Use vectorized operations instead of apply."""
    # Vectorized location extraction; keep the original index so concat aligns
    locations = pd.DataFrame(
        events["location"].tolist(),
        columns=["x", "y"],
        index=events.index,
    )
    events = pd.concat([events, locations], axis=1)

    # Vectorized calculations
    events["in_final_third"] = events["x"] > 80
    events["in_box"] = (events["x"] > 102) & (events["y"].between(18, 62))

    return events

# Strategy 2: Process files in chunks
def process_season_chunked(file_paths: list, chunk_size: int = 10):
    """Process large number of files in memory-efficient chunks."""
    results = []

    for i in range(0, len(file_paths), chunk_size):
        chunk_files = file_paths[i:i + chunk_size]

        chunk_data = pd.concat([
            pd.read_json(f) for f in chunk_files
        ])

        # Process chunk
        chunk_summary = summarize_events(chunk_data)
        results.append(chunk_summary)

        # Clear memory
        del chunk_data

    return pd.concat(results)

# Strategy 3: Use Parquet for efficient storage
def save_events_parquet(events: pd.DataFrame, filepath: str):
    """Save events to Parquet format for efficient storage."""
    events.to_parquet(filepath, compression="snappy")

def load_events_parquet(filepath: str, columns: list = None):
    """Load events from Parquet, optionally selecting columns."""
    return pd.read_parquet(filepath, columns=columns)

# Strategy 4: Parallel processing
def process_match(filepath: str) -> pd.DataFrame:
    """Process a single match file."""
    events = pd.read_json(filepath)
    return summarize_events(events)

def process_matches_parallel(file_paths: list, n_workers: int = 4):
    """Process multiple matches in parallel."""
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(process_match, file_paths))
    return pd.concat(results)

# Benchmark example
import time

# Measure processing time
start = time.time()
processed = process_events_vectorized(events)
elapsed = time.time() - start
print(f"Vectorized processing: {elapsed:.3f} seconds")
# R: Optimizing Event Data Processing
library(tidyverse)
library(data.table)
library(arrow)
library(jsonlite)  # read_json() is not attached by tidyverse

# Strategy 1: Use data.table for large datasets
process_events_fast <- function(events_dt) {
    # Convert to data.table
    events_dt <- as.data.table(events_dt)

    # Efficient aggregation with data.table
    player_stats <- events_dt[,
        .(
            passes = sum(type.name == "Pass"),
            shots = sum(type.name == "Shot"),
            tackles = sum(type.name == "Tackle")
        ),
        by = .(player.name, team.name)
    ]

    return(player_stats)
}

# Strategy 2: Process files in chunks
process_season_chunked <- function(file_paths, chunk_size = 10) {
    results <- list()

    for (i in seq(1, length(file_paths), chunk_size)) {
        chunk_files <- file_paths[i:min(i + chunk_size - 1, length(file_paths))]

        chunk_data <- map_dfr(chunk_files, ~ {
            events <- read_json(.x, simplifyVector = TRUE)
            # Process each match
            summarize_match(events)
        })

        results[[length(results) + 1]] <- chunk_data

        # Clear memory
        gc()
    }

    return(bind_rows(results))
}

# Strategy 3: Use Arrow/Parquet for storage
save_events_parquet <- function(events, filepath) {
    write_parquet(events, filepath)
}

load_events_parquet <- function(filepath) {
    # Read only needed columns
    read_parquet(filepath,
        col_select = c("id", "type.name", "player.name", "location", "minute")
    )
}

# Strategy 4: Parallel processing
library(furrr)
plan(multisession, workers = 4)

process_matches_parallel <- function(match_files) {
    future_map_dfr(match_files, function(file) {
        events <- read_json(file, simplifyVector = TRUE)
        summarize_match(events)
    }, .progress = TRUE)
}
Output
Vectorized processing: 0.042 seconds
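The claim behind Strategy 1 is easy to verify on synthetic data; a quick self-contained benchmark (absolute times will vary by machine):

```python
import time

import numpy as np
import pandas as pd

# Synthetic x-coordinates standing in for extracted event locations
rng = np.random.default_rng(42)
df = pd.DataFrame({"x": rng.uniform(0, 120, 200_000)})

# Row-wise apply: one Python-level function call per row
start = time.time()
slow = df["x"].apply(lambda x: x > 80)
t_apply = time.time() - start

# Vectorized comparison: a single operation over the whole column
start = time.time()
fast = df["x"] > 80
t_vec = time.time() - start

assert slow.equals(fast)  # identical results, very different cost
print(f"apply: {t_apply:.4f}s  vectorized: {t_vec:.4f}s")
```

On a column of this size the vectorized form is typically one to two orders of magnitude faster.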

Practice Exercises

Exercise 31.1: Shot Freeze Frame Analyzer

Task: Build a comprehensive shot analyzer that extracts shot data with freeze frame information to calculate advanced defensive pressure metrics.

Requirements:

  • Parse nested JSON to extract all shot events with freeze frames
  • Calculate distance to nearest defender and goalkeeper for each shot
  • Count defenders in the shooting lane (cone between ball and goal)
  • Create a pressure index combining all defensive factors
  • Compare actual xG vs predicted xG with pressure adjustment

freeze_frame_analysis.py
# Python: Shot Freeze Frame Analyzer
import pandas as pd
import numpy as np
from statsbombpy import sb
from typing import List, Dict, Optional
import math

def parse_freeze_frame(freeze_frame: List[Dict], shot_location: List[float]) -> Dict:
    """Parse freeze frame data to calculate defensive metrics."""
    if not freeze_frame or not shot_location:
        return {
            "n_defenders_in_cone": None,
            "dist_nearest_defender": None,
            "dist_goalkeeper": None,
            "defenders_in_box": None
        }

    shot_x, shot_y = shot_location[0], shot_location[1]
    goal_x, goal_y = 120, 40

    # Extract defender positions
    defenders = []
    for player in freeze_frame:
        if not player.get("teammate", True):  # opponents only; a missing flag is treated as teammate
            loc = player.get("location", [])
            if len(loc) >= 2:
                defenders.append({
                    "x": loc[0],
                    "y": loc[1],
                    "is_goalkeeper": player.get("position", {}).get("name") == "Goalkeeper"
                })

    if not defenders:
        return {
            "n_defenders_in_cone": 0,
            "dist_nearest_defender": None,
            "dist_goalkeeper": None,
            "defenders_in_box": 0
        }

    # Calculate distances
    for d in defenders:
        d["dist_from_shot"] = math.sqrt((d["x"] - shot_x)**2 + (d["y"] - shot_y)**2)

    # Find goalkeeper
    gk = [d for d in defenders if d.get("is_goalkeeper")]
    dist_gk = gk[0]["dist_from_shot"] if gk else None

    # Defenders in shooting cone (15 degree angle to goal)
    cone_angle = 15 * math.pi / 180
    shot_to_goal_angle = math.atan2(goal_y - shot_y, goal_x - shot_x)

    in_cone = 0
    for d in defenders:
        angle_to_defender = math.atan2(d["y"] - shot_y, d["x"] - shot_x)
        angle_diff = abs(angle_to_defender - shot_to_goal_angle)
        angle_diff = min(angle_diff, 2 * math.pi - angle_diff)  # handle wrap-around at ±π
        if angle_diff < cone_angle and d["x"] > shot_x:
            in_cone += 1

    # Defenders in penalty box
    in_box = sum(1 for d in defenders
                 if d["x"] >= 102 and 18 <= d["y"] <= 62)

    return {
        "n_defenders_in_cone": in_cone,
        "dist_nearest_defender": min(d["dist_from_shot"] for d in defenders),
        "dist_goalkeeper": dist_gk,
        "defenders_in_box": in_box
    }

def analyze_shots_with_pressure(match_ids: List[int]) -> pd.DataFrame:
    """Analyze all shots with defensive pressure metrics."""
    all_shots = []

    for match_id in match_ids:
        try:
            events = sb.events(match_id=match_id)
            shots = events[events["type"] == "Shot"].copy()

            for idx, shot in shots.iterrows():
                freeze_frame = shot.get("shot_freeze_frame")
                location = shot.get("location")

                metrics = parse_freeze_frame(freeze_frame, location)

                shot_data = {
                    "match_id": match_id,
                    "player": shot.get("player"),
                    "team": shot.get("team"),
                    "minute": shot.get("minute"),
                    "xg": shot.get("shot_statsbomb_xg"),
                    "outcome": shot.get("shot_outcome"),
                    "body_part": shot.get("shot_body_part"),
                    **metrics
                }
                all_shots.append(shot_data)
        except Exception as e:
            print(f"Error processing match {match_id}: {e}")
            continue

    df = pd.DataFrame(all_shots)

    # Calculate pressure index
    df["pressure_index"] = (
        0.3 * (df["n_defenders_in_cone"] / 3).clip(0, 1) +
        0.3 * (1 - df["dist_nearest_defender"] / 10).clip(0, 1) +
        0.2 * (df["defenders_in_box"] / 6).clip(0, 1) +
        0.2 * (1 - df["dist_goalkeeper"].fillna(15) / 15).clip(0, 1)
    )

    df["pressure_category"] = pd.cut(
        df["pressure_index"],
        bins=[0, 0.3, 0.6, 1.0],
        labels=["Low Pressure", "Medium Pressure", "High Pressure"]
    )

    return df

# Load sample data
competitions = sb.competitions()
matches = sb.matches(competition_id=11, season_id=90)
sample_matches = matches["match_id"].head(10).tolist()

# Analyze shots
shots_df = analyze_shots_with_pressure(sample_matches)

# Summarize by pressure category
pressure_summary = shots_df.groupby("pressure_category").agg({
    "xg": ["count", "mean"],
    "outcome": lambda x: (x == "Goal").sum()
}).reset_index()

pressure_summary.columns = ["pressure_category", "n_shots", "avg_xg", "goals"]
pressure_summary["conversion_rate"] = pressure_summary["goals"] / pressure_summary["n_shots"]

print("Shot Analysis by Defensive Pressure:")
print(pressure_summary)
# R: Shot Freeze Frame Analyzer
library(tidyverse)
library(StatsBombR)
library(jsonlite)

# Load StatsBomb data
Comp <- FreeCompetitions()
Matches <- FreeMatches(Comp)
events <- StatsBombFreeEvents(MatchesDF = Matches[1:10, ])

# Function to parse freeze frame data
parse_freeze_frame <- function(freeze_frame, shot_location) {
    if (is.null(freeze_frame) || length(freeze_frame) == 0) {
        return(list(
            n_defenders_in_cone = NA,
            dist_nearest_defender = NA,
            dist_goalkeeper = NA,
            defenders_in_box = NA
        ))
    }

    ff_df <- as.data.frame(freeze_frame)
    shot_x <- shot_location[1]
    shot_y <- shot_location[2]
    goal_x <- 120
    goal_y <- 40

    # Extract positions
    defenders <- ff_df %>%
        filter(teammate == FALSE) %>%
        mutate(
            x = map_dbl(location, ~.x[1]),
            y = map_dbl(location, ~.x[2])
        )

    if (nrow(defenders) == 0) {
        return(list(
            n_defenders_in_cone = 0,
            dist_nearest_defender = NA,
            dist_goalkeeper = NA,
            defenders_in_box = 0
        ))
    }

    # Distance to each defender from shot location
    defenders <- defenders %>%
        mutate(
            dist_from_shot = sqrt((x - shot_x)^2 + (y - shot_y)^2)
        )

    # Find goalkeeper (heuristic: the deepest opponent; the Python version uses
    # the tagged position name instead, which is more reliable when present)
    gk_row <- defenders %>% filter(x == max(x))
    dist_gk <- if(nrow(gk_row) > 0) gk_row$dist_from_shot[1] else NA

    # Defenders in shooting cone (within 15 degree angle to goal)
    cone_angle <- 15 * pi / 180
    shot_to_goal_angle <- atan2(goal_y - shot_y, goal_x - shot_x)

    defenders <- defenders %>%
        mutate(
            angle_to_defender = atan2(y - shot_y, x - shot_x),
            angle_diff = pmin(abs(angle_to_defender - shot_to_goal_angle),
                              2 * pi - abs(angle_to_defender - shot_to_goal_angle)),  # wrap at ±pi
            in_cone = angle_diff < cone_angle & x > shot_x
        )

    # Defenders in penalty box
    in_box <- sum(defenders$x >= 102 & defenders$y >= 18 & defenders$y <= 62)

    list(
        n_defenders_in_cone = sum(defenders$in_cone, na.rm = TRUE),
        dist_nearest_defender = min(defenders$dist_from_shot, na.rm = TRUE),
        dist_goalkeeper = dist_gk,
        defenders_in_box = in_box
    )
}

# Process all shots with freeze frames
shots_analyzed <- events %>%
    filter(type.name == "Shot") %>%
    mutate(
        shot_x = map_dbl(location, ~.x[1]),
        shot_y = map_dbl(location, ~.x[2]),
        freeze_analysis = map2(shot.freeze_frame, location, parse_freeze_frame)
    ) %>%
    unnest_wider(freeze_analysis)

# Calculate pressure index
shots_analyzed <- shots_analyzed %>%
    mutate(
        pressure_index = (
            0.3 * pmin(n_defenders_in_cone / 3, 1) +
            0.3 * pmax(0, 1 - dist_nearest_defender / 10) +
            0.2 * pmin(defenders_in_box / 6, 1) +
            0.2 * pmax(0, 1 - coalesce(dist_goalkeeper, 15) / 15)  # treat missing GK as 15m, as in Python
        ),
        pressure_category = case_when(
            pressure_index < 0.3 ~ "Low Pressure",
            pressure_index < 0.6 ~ "Medium Pressure",
            TRUE ~ "High Pressure"
        )
    )

# Analyze xG by pressure category
pressure_summary <- shots_analyzed %>%
    group_by(pressure_category) %>%
    summarize(
        n_shots = n(),
        avg_xg = mean(shot.statsbomb_xg, na.rm = TRUE),
        goals = sum(shot.outcome.name == "Goal", na.rm = TRUE),
        conversion_rate = goals / n_shots,
        avg_defenders_in_cone = mean(n_defenders_in_cone, na.rm = TRUE),
        .groups = "drop"
    )

print("Shot Analysis by Defensive Pressure:")
print(pressure_summary)
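Geometry code like `parse_freeze_frame` is easy to get subtly wrong, so it is worth spot-checking against hand-built freeze frames where the answer is known. A sketch of such a check, using a simplified standalone version of the cone logic from the solutions above:

```python
import math

def defenders_in_cone(defenders, shot_xy, goal_xy=(120, 40), half_angle_deg=15):
    """Count defenders inside the shooting cone (simplified version of the
    solution's logic; assumes StatsBomb's 120x80 coordinate space)."""
    sx, sy = shot_xy
    gx, gy = goal_xy
    cone = math.radians(half_angle_deg)
    shot_to_goal = math.atan2(gy - sy, gx - sx)
    count = 0
    for dx, dy in defenders:
        angle = math.atan2(dy - sy, dx - sx)
        if abs(angle - shot_to_goal) < cone and dx > sx:
            count += 1
    return count

# A defender directly on the ball-goal line should be counted ...
assert defenders_in_cone([(110, 40)], shot_xy=(100, 40)) == 1
# ... a defender behind the shooter should not be ...
assert defenders_in_cone([(90, 40)], shot_xy=(100, 40)) == 0
# ... nor a defender far off to the side
assert defenders_in_cone([(110, 70)], shot_xy=(100, 40)) == 0
print("cone checks passed")
```

Checks like these take minutes to write and catch sign or axis mix-ups before they contaminate a whole season of metrics.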
Exercise 31.2: Dangerous Turnover Tracker

Task: Build a comprehensive turnover analysis system that identifies and rates the danger of possession losses.

Requirements:

  • Identify all turnover events (failed passes, dispossessions, miscontrols)
  • Track subsequent opponent actions within 15 seconds
  • Calculate a danger score based on zone, speed of counter, and outcome
  • Create player-level turnover profiles with risk metrics

turnover_tracker.py
# Python: Dangerous Turnover Tracker
import pandas as pd
import numpy as np
from typing import Dict, List
from statsbombpy import sb

class TurnoverAnalyzer:
    """Analyze dangerous turnovers and their consequences."""

    TURNOVER_TYPES = ["Miscontrol", "Dispossessed", "Error"]
    AFTERMATH_WINDOW = 15  # seconds

    def __init__(self, events: pd.DataFrame):
        self.events = events.sort_values(["match_id", "index"]).copy()
        self.events["game_seconds"] = self.events["minute"] * 60 + self.events["second"]
        self._extract_locations()

    def _extract_locations(self):
        """Extract x, y coordinates from location field."""
        self.events["location_x"] = self.events["location"].apply(
            lambda x: x[0] if isinstance(x, list) and len(x) >= 2 else None
        )
        self.events["location_y"] = self.events["location"].apply(
            lambda x: x[1] if isinstance(x, list) and len(x) >= 2 else None
        )

    def identify_turnovers(self) -> pd.DataFrame:
        """Identify all turnover events."""
        turnovers = self.events[
            (self.events["type"].isin(self.TURNOVER_TYPES)) |
            ((self.events["type"] == "Pass") & (self.events["pass_outcome"].notna()))
        ].copy()

        # Classify turnover zone
        turnovers["turnover_zone"] = pd.cut(
            turnovers["location_x"],
            bins=[0, 40, 80, 120],
            labels=["Defensive Third", "Middle Third", "Attacking Third"]
        )

        turnovers["zone_danger"] = turnovers["location_x"].apply(
            lambda x: 3 if x < 40 else (2 if x < 80 else 1)
        )

        return turnovers

    def analyze_aftermath(self, turnover: pd.Series) -> Dict:
        """Analyze what happens after a turnover."""
        match_events = self.events[self.events["match_id"] == turnover["match_id"]]

        turnover_time = turnover["game_seconds"]
        turnover_team = turnover["team"]

        # Get opponent events in aftermath window
        aftermath = match_events[
            (match_events["game_seconds"] > turnover_time) &
            (match_events["game_seconds"] <= turnover_time + self.AFTERMATH_WINDOW) &
            (match_events["team"] != turnover_team)
        ]

        if len(aftermath) == 0:
            return {
                "shot_within_15s": False,
                "goal_within_15s": False,
                "xg_within_15s": 0.0,
                "territory_gained": 0.0,
                "counter_speed": 0.0
            }

        # Check for shots
        shots = aftermath[aftermath["type"] == "Shot"]

        # Calculate territory gained (fall back to midfield when a location is
        # missing; `or 60` would not work here because NaN is truthy)
        start_x = aftermath.iloc[0]["location_x"]
        end_x = aftermath.iloc[-1]["location_x"]
        start_x = 60 if pd.isna(start_x) else start_x
        end_x = 60 if pd.isna(end_x) else end_x
        territory_gained = max(0, end_x - start_x)

        # Counter speed
        time_elapsed = aftermath.iloc[-1]["game_seconds"] - turnover_time
        counter_speed = territory_gained / time_elapsed if time_elapsed > 0 else 0

        return {
            "shot_within_15s": len(shots) > 0,
            "goal_within_15s": (shots["shot_outcome"] == "Goal").any() if len(shots) > 0 else False,
            "xg_within_15s": shots["shot_statsbomb_xg"].sum() if len(shots) > 0 else 0.0,
            "territory_gained": territory_gained,
            "counter_speed": counter_speed
        }

    def analyze_all_turnovers(self) -> pd.DataFrame:
        """Analyze all turnovers with aftermath."""
        turnovers = self.identify_turnovers()

        aftermath_data = []
        for idx, turnover in turnovers.iterrows():
            aftermath = self.analyze_aftermath(turnover)
            aftermath_data.append(aftermath)

        aftermath_df = pd.DataFrame(aftermath_data)
        result = pd.concat([turnovers.reset_index(drop=True), aftermath_df], axis=1)

        # Calculate danger score
        result["danger_score"] = (
            result["zone_danger"] * 0.3 +
            result["shot_within_15s"].astype(int) * 2 +
            result["goal_within_15s"].astype(int) * 3 +
            result["xg_within_15s"] * 2 +
            np.minimum(result["territory_gained"] / 40, 1) * 1.5 +
            np.minimum(result["counter_speed"] / 10, 1) * 1
        )

        result["danger_level"] = pd.cut(
            result["danger_score"],
            bins=[-np.inf, 1, 2, 4, np.inf],
            labels=["Low", "Moderate", "Dangerous", "Critical"]
        )

        return result

    def create_player_profiles(self, turnover_analysis: pd.DataFrame) -> pd.DataFrame:
        """Create player-level turnover profiles."""
        profiles = turnover_analysis.groupby(["player", "team"]).agg({
            "danger_score": ["count", "mean"],
            "shot_within_15s": "sum",
            "goal_within_15s": "sum",
            "xg_within_15s": "sum",
            "turnover_zone": lambda x: (x == "Defensive Third").mean() * 100
        }).reset_index()

        profiles.columns = [
            "player", "team", "total_turnovers", "avg_danger_score",
            "shots_conceded", "goals_conceded", "xg_conceded", "pct_in_own_third"
        ]

        profiles["dangerous_turnovers"] = turnover_analysis.groupby("player").apply(
            lambda x: (x["danger_level"].isin(["Critical", "Dangerous"])).sum()
        ).values

        profiles["danger_rate"] = profiles["dangerous_turnovers"] / profiles["total_turnovers"] * 100

        return profiles.sort_values("avg_danger_score", ascending=False)

# Example usage
events = sb.events(match_id=3788741)
analyzer = TurnoverAnalyzer(events)

turnover_results = analyzer.analyze_all_turnovers()
player_profiles = analyzer.create_player_profiles(turnover_results)

print("Turnover Analysis Summary:")
print(turnover_results["danger_level"].value_counts())
print("\nTop 10 Players by Turnover Danger:")
print(player_profiles.head(10))
# R: Dangerous Turnover Tracker
library(tidyverse)

# Define turnover event types
turnover_types <- c(
    "Miscontrol",
    "Dispossessed",
    "Error",
    "Pass"  # with unsuccessful outcome
)

analyze_turnovers <- function(events) {
    # Sort events by time
    events <- events %>%
        arrange(match_id, index) %>%
        mutate(
            game_seconds = minute * 60 + second,
            location_x = map_dbl(location, ~ if(!is.null(.x)) .x[1] else NA),
            location_y = map_dbl(location, ~ if(!is.null(.x)) .x[2] else NA)
        )

    # Identify turnovers
    turnovers <- events %>%
        filter(
            type.name %in% c("Miscontrol", "Dispossessed", "Error") |
            (type.name == "Pass" & !is.na(pass.outcome.name))
        ) %>%
        mutate(
            turnover_zone = case_when(
                location_x < 40 ~ "Defensive Third",
                location_x < 80 ~ "Middle Third",
                TRUE ~ "Attacking Third"
            ),
            zone_danger = case_when(
                location_x < 40 ~ 3,  # Own third - most dangerous
                location_x < 80 ~ 2,
                TRUE ~ 1  # Opponent third - least dangerous
            )
        )

    # For each turnover, analyze subsequent events
    analyze_aftermath <- function(turnover_row, all_events) {
        match_events <- all_events %>%
            filter(match_id == turnover_row$match_id)

        turnover_time <- turnover_row$game_seconds
        turnover_team <- turnover_row$team.name

        # Get events in next 15 seconds by other team
        aftermath <- match_events %>%
            filter(
                game_seconds > turnover_time,
                game_seconds <= turnover_time + 15,
                team.name != turnover_team
            )

        if (nrow(aftermath) == 0) {
            return(tibble(
                shot_within_15s = FALSE,
                goal_within_15s = FALSE,
                xg_within_15s = 0,
                territory_gained = 0,
                counter_speed = NA
            ))
        }

        # Check for shots
        shots <- aftermath %>% filter(type.name == "Shot")

        # Calculate territory gained
        first_touch <- aftermath %>% slice(1)
        last_action <- aftermath %>% slice(n())

        start_x <- first_touch$location_x
        end_x <- last_action$location_x
        territory_gained <- if(!is.na(end_x) && !is.na(start_x)) end_x - start_x else 0

        # Counter speed (meters per second)
        time_elapsed <- last_action$game_seconds - turnover_time
        counter_speed <- if(time_elapsed > 0) territory_gained / time_elapsed else 0

        tibble(
            shot_within_15s = nrow(shots) > 0,
            goal_within_15s = any(shots$shot.outcome.name == "Goal", na.rm = TRUE),
            xg_within_15s = sum(shots$shot.statsbomb_xg, na.rm = TRUE),
            territory_gained = max(0, territory_gained),
            counter_speed = counter_speed
        )
    }

    # Apply aftermath analysis to each turnover
    turnover_analysis <- turnovers %>%
        mutate(
            aftermath = map(row_number(), ~ analyze_aftermath(turnovers[.x, ], events))
        ) %>%
        unnest(aftermath)

    # Calculate danger score
    turnover_analysis <- turnover_analysis %>%
        mutate(
            danger_score = (
                zone_danger * 0.3 +
                shot_within_15s * 2 +
                goal_within_15s * 3 +
                xg_within_15s * 2 +
                pmin(territory_gained / 40, 1) * 1.5 +
                pmin(counter_speed / 10, 1) * 1
            ),
            danger_level = case_when(
                danger_score >= 4 ~ "Critical",
                danger_score >= 2 ~ "Dangerous",
                danger_score >= 1 ~ "Moderate",
                TRUE ~ "Low"
            )
        )

    return(turnover_analysis)
}

# Player-level turnover profiles
create_player_turnover_profile <- function(turnover_analysis) {
    turnover_analysis %>%
        group_by(player.name, team.name) %>%
        summarize(
            total_turnovers = n(),
            dangerous_turnovers = sum(danger_level %in% c("Critical", "Dangerous")),
            shots_conceded = sum(shot_within_15s),
            goals_conceded = sum(goal_within_15s),
            xg_conceded = sum(xg_within_15s),
            avg_danger_score = mean(danger_score),
            pct_in_own_third = mean(turnover_zone == "Defensive Third") * 100,
            .groups = "drop"
        ) %>%
        mutate(
            danger_rate = dangerous_turnovers / total_turnovers * 100
        ) %>%
        arrange(desc(avg_danger_score))
}

# Run analysis
turnover_results <- analyze_turnovers(events)
player_profiles <- create_player_turnover_profile(turnover_results)

print("Top 10 Players by Turnover Danger:")
print(head(player_profiles, 10))
Exercise 31.3: Multi-Provider Event Data Normalizer

Task: Build a provider-agnostic event data processing system that normalizes data from multiple sources into a unified schema.

Requirements:

  • Define a common event schema supporting multiple providers
  • Implement parsers for StatsBomb and Wyscout formats
  • Handle coordinate system differences (normalize to 105x68m pitch)
  • Create data quality reports comparing provider coverage

multi_provider_normalizer.py
# Python: Multi-Provider Event Data Normalizer
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
import pandas as pd
import numpy as np

@dataclass
class CommonEventSchema:
    """Standard schema for normalized events."""
    REQUIRED_FIELDS = [
        "event_id", "match_id", "period", "minute", "second",
        "event_type", "player_id", "player_name", "team_id", "team_name",
        "location_x", "location_y"
    ]
    OPTIONAL_FIELDS = [
        "end_x", "end_y", "outcome", "body_part", "technique",
        "recipient_id", "recipient_name", "xg", "duration"
    ]
    PITCH_LENGTH = 105
    PITCH_WIDTH = 68

class EventParser(ABC):
    """Base class for event data parsers."""

    def __init__(self, provider: str, pitch_length: float, pitch_width: float):
        self.provider = provider
        self.pitch_length = pitch_length
        self.pitch_width = pitch_width

    @abstractmethod
    def parse(self, data: Any) -> pd.DataFrame:
        """Parse raw data into normalized format."""
        pass

    def normalize_x(self, x: float) -> float:
        """Normalize x coordinate to standard pitch."""
        return x * CommonEventSchema.PITCH_LENGTH / self.pitch_length

    def normalize_y(self, y: float) -> float:
        """Normalize y coordinate to standard pitch."""
        return y * CommonEventSchema.PITCH_WIDTH / self.pitch_width

    def validate(self, events: pd.DataFrame) -> Dict:
        """Validate parsed events against schema."""
        missing = [f for f in CommonEventSchema.REQUIRED_FIELDS
                   if f not in events.columns]

        # Only index fields that actually exist, so missing columns don't raise
        present = [f for f in CommonEventSchema.REQUIRED_FIELDS
                   if f in events.columns]
        complete_rows = events[present].dropna()

        return {
            "valid": len(missing) == 0,
            "missing_fields": missing,
            "n_events": len(events),
            "n_complete": len(complete_rows),
            "completeness_rate": len(complete_rows) / len(events) if len(events) > 0 else 0
        }

class StatsBombParser(EventParser):
    """Parser for StatsBomb event data."""

    def __init__(self):
        super().__init__("statsbomb", pitch_length=120, pitch_width=80)

    def parse(self, data: pd.DataFrame) -> pd.DataFrame:
        events = pd.DataFrame({
            "event_id": data["id"],
            "match_id": data.get("match_id"),
            "period": data["period"],
            "minute": data["minute"],
            "second": data["second"],
            "event_type": data["type"],
            "player_id": data.get("player_id"),
            "player_name": data.get("player"),
            "team_id": data.get("team_id"),
            "team_name": data.get("team"),
        })

        # Extract and normalize locations
        events["location_x"] = data["location"].apply(
            lambda x: self.normalize_x(x[0]) if isinstance(x, list) else None
        )
        events["location_y"] = data["location"].apply(
            lambda x: self.normalize_y(x[1]) if isinstance(x, list) else None
        )

        return events

class WyscoutParser(EventParser):
    """Parser for Wyscout event data."""

    EVENT_TYPE_MAP = {
        "Pass": "Pass",
        "Shot": "Shot",
        "Duel": "Duel",
        "Foul": "Foul",
        "Free kick": "Free Kick",
        "Offside": "Offside",
        "Others on the ball": "Other"
    }

    def __init__(self):
        super().__init__("wyscout", pitch_length=100, pitch_width=100)

    def parse(self, data: pd.DataFrame) -> pd.DataFrame:
        events = pd.DataFrame({
            "event_id": data["id"],
            "match_id": data["matchId"],
            # Wyscout labels periods "1H"/"2H"; map to schema integers
            "period": data["matchPeriod"].map({"1H": 1, "2H": 2}).fillna(data["matchPeriod"]),
            "minute": (data["eventSec"] // 60).astype(int),
            "second": (data["eventSec"] % 60).astype(int),
            "event_type": data["eventName"].map(self.EVENT_TYPE_MAP).fillna(data["eventName"]),
            "player_id": data["playerId"],
            "player_name": data.get("playerName"),
            "team_id": data["teamId"],
            "team_name": data.get("teamName"),
        })

        # Extract positions from nested structure
        def get_position(positions, idx, coord):
            """Safely read one coordinate from Wyscout's nested positions list."""
            try:
                return positions[idx].get(coord)
            except (IndexError, TypeError, AttributeError):
                return None

        # Compare against None explicitly so a legitimate 0 coordinate survives
        events["location_x"] = data["positions"].apply(
            lambda p: self.normalize_x(get_position(p, 0, "x"))
            if get_position(p, 0, "x") is not None else None
        )
        events["location_y"] = data["positions"].apply(
            lambda p: self.normalize_y(get_position(p, 0, "y"))
            if get_position(p, 0, "y") is not None else None
        )

        return events

class EventNormalizer:
    """Main class for normalizing events from multiple providers."""

    PARSERS = {
        "statsbomb": StatsBombParser,
        "wyscout": WyscoutParser
    }

    @classmethod
    def get_parser(cls, provider: str) -> EventParser:
        """Factory method to get appropriate parser."""
        if provider.lower() not in cls.PARSERS:
            raise ValueError(f"Unknown provider: {provider}")
        return cls.PARSERS[provider.lower()]()

    @classmethod
    def normalize(cls, data: Any, provider: str) -> pd.DataFrame:
        """Normalize data from specified provider."""
        parser = cls.get_parser(provider)
        normalized = parser.parse(data)
        validation = parser.validate(normalized)

        print(f"Normalized {validation['n_events']} events from {provider}")
        print(f"Completeness: {validation['completeness_rate']:.1%}")

        return normalized

    @staticmethod
    def compare_providers(datasets: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Compare coverage across providers."""
        comparisons = []

        for provider, events in datasets.items():
            comparisons.append({
                "provider": provider,
                "total_events": len(events),
                "unique_types": events["event_type"].nunique(),
                "with_location": events["location_x"].notna().sum(),
                "passes": (events["event_type"] == "Pass").sum(),
                "shots": (events["event_type"] == "Shot").sum(),
                "avg_per_match": len(events) / max(events["match_id"].nunique(), 1)
            })

        return pd.DataFrame(comparisons)

# Example usage
from statsbombpy import sb

# Load and normalize StatsBomb data
sb_events = sb.events(match_id=3788741)
normalizer = EventNormalizer()
normalized_sb = normalizer.normalize(sb_events, "statsbomb")

print("\nNormalized Schema Sample:")
print(normalized_sb.head())
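Without live access to a second provider, the coverage report can still be previewed. The sketch below rebuilds the same per-provider summary that compare_providers assembles, using two tiny synthetic frames whose values are made up purely for illustration:

```python
import pandas as pd

# Synthetic "normalized" event sets standing in for real provider output
sb = pd.DataFrame({
    "match_id": [1, 1, 1],
    "event_type": ["Pass", "Pass", "Shot"],
    "location_x": [10.0, 52.5, None],
})
ws = pd.DataFrame({
    "match_id": [1, 1],
    "event_type": ["Pass", "Shot"],
    "location_x": [12.0, 90.0],
})

def coverage_row(provider, events):
    # Mirrors the per-provider summary built inside compare_providers
    return {
        "provider": provider,
        "total_events": len(events),
        "unique_types": events["event_type"].nunique(),
        "with_location": int(events["location_x"].notna().sum()),
        "passes": int((events["event_type"] == "Pass").sum()),
        "shots": int((events["event_type"] == "Shot").sum()),
        "avg_per_match": len(events) / events["match_id"].nunique(),
    }

report = pd.DataFrame([coverage_row("statsbomb", sb), coverage_row("wyscout", ws)])
print(report)
```

One row per provider makes coverage gaps obvious at a glance, e.g. a provider that records fewer events with locations.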
multi_provider_normalizer.R
# R: Multi-Provider Event Data Normalizer
library(tidyverse)
library(jsonlite)
library(R6)

# Define common event schema
CommonEventSchema <- list(
    required_fields = c(
        "event_id", "match_id", "period", "minute", "second",
        "event_type", "player_id", "player_name", "team_id", "team_name",
        "location_x", "location_y"
    ),
    optional_fields = c(
        "end_x", "end_y", "outcome", "body_part", "technique",
        "recipient_id", "recipient_name", "xg", "duration"
    ),
    pitch_dims = list(length = 105, width = 68)
)

# Base parser class
EventParser <- R6Class("EventParser",
    public = list(
        provider = NULL,
        pitch_dims = NULL,

        initialize = function(provider) {
            self$provider <- provider
        },

        parse = function(data) {
            stop("Subclass must implement parse()")
        },

        normalize_coordinates = function(x, y, source_dims) {
            list(
                x = x * CommonEventSchema$pitch_dims$length / source_dims$length,
                y = y * CommonEventSchema$pitch_dims$width / source_dims$width
            )
        },

        validate = function(events) {
            missing_cols <- setdiff(
                CommonEventSchema$required_fields,
                names(events)
            )

            list(
                valid = length(missing_cols) == 0,
                missing = missing_cols,
                n_events = nrow(events),
                n_complete = sum(complete.cases(events[, CommonEventSchema$required_fields]))
            )
        }
    )
)

# StatsBomb parser
StatsBombParser <- R6Class("StatsBombParser",
    inherit = EventParser,
    public = list(
        initialize = function() {
            super$initialize("statsbomb")
            self$pitch_dims <- list(length = 120, width = 80)
        },

        parse = function(data) {
            events <- data %>%
                transmute(
                    event_id = id,
                    match_id = match_id,
                    period = period,
                    minute = minute,
                    second = second,
                    event_type = type.name,
                    player_id = player.id,
                    player_name = player.name,
                    team_id = team.id,
                    team_name = team.name,
                    raw_x = map_dbl(location, ~ if(!is.null(.x)) .x[1] else NA),
                    raw_y = map_dbl(location, ~ if(!is.null(.x)) .x[2] else NA)
                )

            # Normalize coordinates via the shared helper (120x80 -> 105x68)
            coords <- self$normalize_coordinates(events$raw_x, events$raw_y, self$pitch_dims)
            events <- events %>%
                mutate(location_x = coords$x, location_y = coords$y) %>%
                select(-raw_x, -raw_y)

            events
        }
    )
)

# Wyscout parser
WyscoutParser <- R6Class("WyscoutParser",
    inherit = EventParser,
    public = list(
        initialize = function() {
            super$initialize("wyscout")
            self$pitch_dims <- list(length = 100, width = 100)  # Percentage
        },

        parse = function(data) {
            events <- data %>%
                transmute(
                    event_id = id,
                    match_id = matchId,
                    # Wyscout labels periods "1H"/"2H"; map to schema integers
                    period = c("1H" = 1L, "2H" = 2L)[matchPeriod],
                    minute = floor(eventSec / 60),
                    second = eventSec %% 60,
                    event_type = self$map_event_type(eventName),
                    player_id = playerId,
                    player_name = playerName,
                    team_id = teamId,
                    team_name = teamName,
                    raw_x = map_dbl(positions, ~ if(length(.x) > 0) .x[[1]]$x else NA),
                    raw_y = map_dbl(positions, ~ if(length(.x) > 0) .x[[1]]$y else NA)
                )

            # Normalize coordinates via the shared helper (Wyscout uses 0-100 percentages)
            coords <- self$normalize_coordinates(events$raw_x, events$raw_y, self$pitch_dims)
            events <- events %>%
                mutate(location_x = coords$x, location_y = coords$y) %>%
                select(-raw_x, -raw_y)

            events
        },

        map_event_type = function(wyscout_type) {
            type_mapping <- c(
                "Pass" = "Pass",
                "Shot" = "Shot",
                "Duel" = "Duel",
                "Foul" = "Foul",
                "Free kick" = "Free Kick",
                "Offside" = "Offside",
                "Others on the ball" = "Other"
            )
            ifelse(wyscout_type %in% names(type_mapping),
                   type_mapping[wyscout_type],
                   wyscout_type)
        }
    )
)

# Factory function
create_parser <- function(provider) {
    switch(tolower(provider),
        "statsbomb" = StatsBombParser$new(),
        "wyscout" = WyscoutParser$new(),
        stop(paste("Unknown provider:", provider))
    )
}

# Data quality comparison
compare_provider_coverage <- function(sb_events, ws_events) {
    comparison <- tibble(
        metric = c(
            "Total Events",
            "Unique Event Types",
            "Events with Location",
            "Pass Events",
            "Shot Events",
            "Avg Events per Match"
        ),
        statsbomb = c(
            nrow(sb_events),
            n_distinct(sb_events$event_type),
            sum(!is.na(sb_events$location_x)),
            sum(sb_events$event_type == "Pass"),
            sum(sb_events$event_type == "Shot"),
            nrow(sb_events) / n_distinct(sb_events$match_id)
        ),
        wyscout = c(
            nrow(ws_events),
            n_distinct(ws_events$event_type),
            sum(!is.na(ws_events$location_x)),
            sum(ws_events$event_type == "Pass"),
            sum(ws_events$event_type == "Shot"),
            nrow(ws_events) / n_distinct(ws_events$match_id)
        )
    )

    comparison
}

# Example usage
sb_parser <- create_parser("statsbomb")
# sb_normalized <- sb_parser$parse(statsbomb_data)
# validation <- sb_parser$validate(sb_normalized)
print("Multi-provider normalizer ready")

Chapter Summary

Key Takeaways
  • Event data structure: Events have core fields (type, player, team, location, timestamp) plus type-specific qualifiers
  • Provider differences: Different providers use different schemas; build provider-agnostic parsers when possible
  • Derived events: Create higher-level events (progressive passes, possession sequences) from raw data
  • Custom metrics: Raw event data enables building metrics tailored to specific questions
  • Data quality: Always validate event data; human tagging introduces errors
  • Optimization: Use vectorized operations, Parquet storage, and parallel processing for large datasets
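The optimization takeaway can be made concrete: compute a derived metric with a single vectorized expression over whole columns rather than a per-row apply(). A minimal sketch with made-up coordinates on the normalized 105x68 pitch:

```python
import numpy as np
import pandas as pd

# Synthetic pass events on the normalized 105x68 pitch (values are illustrative)
passes = pd.DataFrame({
    "location_x": [10.0, 40.0, 60.0],
    "location_y": [34.0, 20.0, 50.0],
    "end_x": [30.0, 70.0, 95.0],
    "end_y": [34.0, 30.0, 40.0],
})

# Vectorized: one numpy expression over whole columns instead of row-wise apply()
passes["distance"] = np.hypot(
    passes["end_x"] - passes["location_x"],
    passes["end_y"] - passes["location_y"],
)

# For storage, Parquet preserves dtypes and loads far faster than CSV:
# passes.to_parquet("passes.parquet")  # requires pyarrow or fastparquet
print(passes["distance"].round(1).tolist())  # [20.0, 31.6, 36.4]
```

On a full season of events, the vectorized form is typically orders of magnitude faster than iterating rows.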
Event Data Processing Pipeline
  1. Load and parse raw JSON/XML data
  2. Validate data quality and flag issues
  3. Standardize to common schema
  4. Create derived events and sequences
  5. Calculate custom metrics
  6. Store in efficient format (Parquet) for future use
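The six steps above can be strung together end to end. The function below is a hypothetical skeleton, the payload and column names are illustrative rather than any provider's real schema:

```python
import json
import pandas as pd

# Toy payload standing in for a provider's raw JSON export
RAW_JSON = '[{"id": 1, "type": "Pass"}, {"id": 2, "type": null}]'

def run_pipeline(raw_json: str) -> pd.DataFrame:
    # 1. Load and parse raw JSON
    events = pd.json_normalize(json.loads(raw_json))
    # 2. Validate: flag rows missing an event type
    events["valid"] = events["type"].notna()
    # 3. Standardize to the common schema
    events = events.rename(columns={"type": "event_type"})
    # 4-5. Derived events and custom metrics would plug in here
    # 6. Store efficiently for reuse (requires pyarrow):
    # events.to_parquet("events.parquet")
    return events

result = run_pipeline(RAW_JSON)
print(result)
```

Keeping each step a separate, testable stage makes it easy to swap in a new provider parser or metric without touching the rest of the pipeline.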