Chapter 60

Capstone - Complete Analytics System

Intermediate 30 min read 5 sections 10 code examples
Learning Objectives
  • Understand the requirements and challenges of real-time analytics
  • Design streaming data pipelines for live match data
  • Calculate rolling metrics and live xG during matches
  • Build real-time win probability models
  • Create live dashboards for in-game decision support
  • Implement momentum detection and game state analysis
  • Generate automated alerts and tactical triggers
  • Handle data latency and update strategies

The Challenge of Real-Time Analytics

Real-time analytics transforms how teams operate during matches. Instead of waiting for post-match analysis, coaches receive live insights that can influence in-game decisions such as substitutions, tactical adjustments, and set-piece strategies. This chapter covers the technical and analytical challenges of building real-time systems.

Real-Time Constraints

Real-time analytics operates under strict constraints: sub-second latency requirements, incomplete data, and the need for robust handling of missing or delayed events. Systems must be designed for reliability under pressure.

  • Low Latency: updates in under 1 second
  • Rolling Windows: trends over the last 5-10 minutes
  • Smart Alerts: trigger-based notifications
  • Reliability: robust error handling
realtime_framework.py
# Python: Real-Time Analytics Framework
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime

@dataclass
class LiveMatchState:
    """Manages live match state and real-time analytics."""
    match_id: str
    home_team: str
    away_team: str
    events: List[Dict] = field(default_factory=list)
    current_minute: int = 0
    score: Dict[str, int] = field(default_factory=lambda: {"home": 0, "away": 0})
    xg: Dict[str, float] = field(default_factory=lambda: {"home": 0.0, "away": 0.0})
    metrics: Dict = field(default_factory=dict)
    alerts: List[str] = field(default_factory=list)

    def add_event(self, event: Dict) -> List[str]:
        """Add new event and update all metrics."""
        self.events.append(event)
        self.current_minute = max(event.get("minute", 0), self.current_minute)

        # Update metrics
        self._update_metrics(event)

        # Check for alerts
        new_alerts = self._check_alerts(event)
        self.alerts.extend(new_alerts)

        return new_alerts

    def _update_metrics(self, event: Dict):
        """Update score and xG based on new event."""
        # Update score
        if event.get("type") == "Shot" and event.get("outcome") == "Goal":
            team_key = "home" if event["team"] == self.home_team else "away"
            self.score[team_key] += 1

        # Update xG
        if event.get("type") == "Shot":
            xg_value = event.get("xg", 0)
            team_key = "home" if event["team"] == self.home_team else "away"
            self.xg[team_key] += xg_value

        # Calculate rolling metrics
        self._calculate_rolling_metrics()

    def _calculate_rolling_metrics(self, window_minutes: int = 10):
        """Calculate rolling window metrics."""
        cutoff = self.current_minute - window_minutes
        recent = [e for e in self.events if e.get("minute", 0) >= cutoff]

        if not recent:
            return

        home_events = [e for e in recent if e.get("team") == self.home_team]
        away_events = [e for e in recent if e.get("team") == self.away_team]

        self.metrics["rolling"] = {
            "home_possession": len(home_events) / len(recent) * 100 if recent else 50,
            "home_shots_10min": sum(1 for e in home_events if e.get("type") == "Shot"),
            "away_shots_10min": sum(1 for e in away_events if e.get("type") == "Shot"),
            "home_pressure": sum(1 for e in home_events if e.get("type") == "Pressure"),
            "away_pressure": sum(1 for e in away_events if e.get("type") == "Pressure"),
        }

    def _check_alerts(self, event: Dict) -> List[str]:
        """Check for tactical alerts based on new event."""
        alerts = []

        # High xG shot alert
        if event.get("type") == "Shot" and event.get("xg", 0) > 0.3:
            alerts.append(f"HIGH xG CHANCE ({event['xg']:.2f}) - {event.get('player')}")

        # Momentum shift alert
        rolling = self.metrics.get("rolling", {})
        if rolling.get("away_shots_10min", 0) >= 4 and rolling.get("home_shots_10min", 0) <= 1:
            alerts.append("MOMENTUM SHIFT: Opponent dominating last 10 min")

        return alerts

    def get_state(self) -> Dict:
        """Get current match state summary."""
        return {
            "minute": self.current_minute,
            "score": self.score,
            "xg": self.xg,
            "metrics": self.metrics,
            "n_events": len(self.events),
            "recent_alerts": self.alerts[-5:] if self.alerts else []
        }

# Usage
match_state = LiveMatchState(
    match_id="match_001",
    home_team="Barcelona",
    away_team="Real Madrid"
)
print(match_state.get_state())
# R: Real-Time Analytics Framework
library(R6)
library(tidyverse)

# Define a Live Match State Manager
LiveMatchState <- R6Class("LiveMatchState",
    public = list(
        match_id = NULL,
        events = NULL,
        current_minute = 0,
        home_team = NULL,
        away_team = NULL,
        score = list(home = 0, away = 0),
        xg = list(home = 0, away = 0),
        metrics = list(),

        initialize = function(match_id, home_team, away_team) {
            self$match_id <- match_id
            self$home_team <- home_team
            self$away_team <- away_team
            self$events <- tibble()
            self$metrics <- list()
        },

        add_event = function(event) {
            # Validate and add event
            self$events <- bind_rows(self$events, event)
            self$current_minute <- max(event$minute, self$current_minute)

            # Update metrics
            self$update_metrics(event)

            # Check for alerts
            self$check_alerts(event)
        },

        update_metrics = function(event) {
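            # Update score (isTRUE guards against a missing outcome value)
            if (event$type == "Shot" && isTRUE(event$outcome == "Goal")) {
                if (event$team == self$home_team) {
                    self$score$home <- self$score$home + 1
                } else {
                    self$score$away <- self$score$away + 1
                }
            }

            # Update xG (a missing xG value counts as 0, matching the Python version)
            if (event$type == "Shot") {
                xg_value <- if (is.null(event$xg) || is.na(event$xg)) 0 else event$xg
                if (event$team == self$home_team) {
                    self$xg$home <- self$xg$home + xg_value
                } else {
                    self$xg$away <- self$xg$away + xg_value
                }
            }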

            # Calculate rolling metrics
            self$calculate_rolling_metrics()
        },

        calculate_rolling_metrics = function(window_minutes = 10) {
            recent <- self$events %>%
                filter(minute >= self$current_minute - window_minutes)

            self$metrics$rolling <- list(
                home_possession = mean(recent$team == self$home_team, na.rm = TRUE) * 100,
                home_shots_10min = sum(recent$type == "Shot" & recent$team == self$home_team),
                away_shots_10min = sum(recent$type == "Shot" & recent$team == self$away_team),
                home_pressure = sum(recent$type == "Pressure" & recent$team == self$home_team),
                away_pressure = sum(recent$type == "Pressure" & recent$team == self$away_team)
            )
        },

        check_alerts = function(event) {
            # Example alerts
            alerts <- list()

            # High xG shot alert
            if (event$type == "Shot" && isTRUE(event$xg > 0.3)) {
                alerts <- c(alerts, sprintf("HIGH xG CHANCE (%.2f) - %s",
                                           event$xg, event$player))
            }

            # Momentum shift alert
            if (!is.null(self$metrics$rolling)) {
                if (self$metrics$rolling$away_shots_10min >= 4 &&
                    self$metrics$rolling$home_shots_10min <= 1) {
                    alerts <- c(alerts, "MOMENTUM SHIFT: Opponent dominating last 10 min")
                }
            }

            if (length(alerts) > 0) {
                cat(paste(alerts, collapse = "\n"), "\n")
            }
        },

        get_state = function() {
            list(
                minute = self$current_minute,
                score = self$score,
                xg = self$xg,
                metrics = self$metrics,
                n_events = nrow(self$events)
            )
        }
    )
)

# Usage
match_state <- LiveMatchState$new("match_001", "Barcelona", "Real Madrid")
print(match_state$get_state())
Output
{'minute': 0, 'score': {'home': 0, 'away': 0},
'xg': {'home': 0.0, 'away': 0.0}, 'metrics': {},
'n_events': 0, 'recent_alerts': []}
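
The framework above rebuilds its rolling metrics by rescanning every stored event on each update, which gets slower as the match goes on. One possible alternative, sketched below, keeps only the shots that are still inside the window in a `collections.deque` and evicts expired ones as time advances. `RollingShotWindow` is a hypothetical helper, not part of the framework, and it assumes events arrive in non-decreasing minute order.

```python
from collections import deque
from typing import Deque, Dict


class RollingShotWindow:
    """Hypothetical helper: per-team shot counts over a rolling time window."""

    def __init__(self, window_minutes: int = 10):
        self.window_minutes = window_minutes
        self.current_minute = 0
        self._shots: Deque[Dict] = deque()   # shots inside the window, oldest first
        self.counts: Dict[str, int] = {}     # team name -> shots inside the window

    def add_event(self, event: Dict) -> Dict[str, int]:
        """Register an event, evict expired shots, and return current counts."""
        self.current_minute = max(self.current_minute, event.get("minute", 0))

        if event.get("type") == "Shot":
            self._shots.append(event)
            team = event.get("team")
            self.counts[team] = self.counts.get(team, 0) + 1

        # Same rule as the framework: keep events with minute >= cutoff
        cutoff = self.current_minute - self.window_minutes
        while self._shots and self._shots[0].get("minute", 0) < cutoff:
            expired = self._shots.popleft()
            self.counts[expired.get("team")] -= 1

        return dict(self.counts)


# Usage: the minute-3 shot drops out once the match reaches minute 14
window = RollingShotWindow(window_minutes=10)
window.add_event({"minute": 3, "type": "Shot", "team": "Barcelona"})
window.add_event({"minute": 12, "type": "Shot", "team": "Real Madrid"})
counts = window.add_event({"minute": 14, "type": "Pass", "team": "Barcelona"})
print(counts)
```

Each update then costs time proportional to the number of evicted shots rather than to the full event history. If events can arrive out of order, they would need to be reordered before reaching this window.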

Streaming Data Pipelines

Real-time analytics requires robust data pipelines that can handle continuous streams of events. We'll cover both polling and push-based approaches.
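
As a rough illustration of the difference between the two approaches, the sketch below contrasts a polling client, which asks for new events on a fixed interval, with a push-style consumer, which receives each event as soon as the producer emits it. `FakeFeed`, `fetch_since`, `poll_events`, and `push_events` are hypothetical names invented for this example. They stand in for a provider API or websocket and do not correspond to any real service.

```python
import asyncio
from typing import Dict, List


class FakeFeed:
    """Hypothetical stand-in for a data provider; not a real API."""

    def __init__(self, events: List[Dict]):
        self._events = events

    def fetch_since(self, cursor: int) -> List[Dict]:
        """Polling endpoint: return the events the client has not seen yet."""
        return self._events[cursor:]


async def poll_events(feed: FakeFeed, interval: float, max_polls: int) -> List[Dict]:
    """Polling: the client requests new events on a fixed interval."""
    received: List[Dict] = []
    cursor = 0
    for _ in range(max_polls):
        batch = feed.fetch_since(cursor)
        received.extend(batch)
        cursor += len(batch)
        await asyncio.sleep(interval)  # worst-case added latency is one interval
    return received


async def push_events(events: List[Dict]) -> List[Dict]:
    """Push: the producer delivers each event as soon as it exists."""
    queue: asyncio.Queue = asyncio.Queue()

    async def producer() -> None:
        for event in events:
            await queue.put(event)
        await queue.put(None)  # sentinel marking the end of the stream

    async def consumer() -> List[Dict]:
        received: List[Dict] = []
        while True:
            event = await queue.get()
            if event is None:
                break
            received.append(event)
        return received

    _, received = await asyncio.gather(producer(), consumer())
    return received


sample = [{"minute": 1, "type": "Pass"}, {"minute": 2, "type": "Shot"}]
polled = asyncio.run(poll_events(FakeFeed(sample), interval=0.01, max_polls=2))
pushed = asyncio.run(push_events(sample))
print(len(polled), len(pushed))
```

Polling is simpler to operate but adds up to one polling interval of latency. Push delivery avoids that delay at the cost of maintaining a persistent connection.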

streaming_pipeline.py
# Python: Simulated Streaming Data Handler
import asyncio
from typing import Callable, Optional, Generator
import pandas as pd

class EventStream:
    """Simulated event stream for live match data."""

    def __init__(self, events_df: pd.DataFrame, speed_factor: float = 1.0):
        self.events = events_df.sort_values(["minute", "second"]).reset_index(drop=True)
        self.events["game_time"] = self.events["minute"] * 60 + self.events["second"]
        self.current_index = 0
        self.speed_factor = speed_factor

    def get_next_event(self) -> Optional[dict]:
        """Get the next event in the stream."""
        if self.current_index >= len(self.events):
            return None

        event = self.events.iloc[self.current_index].to_dict()
        self.current_index += 1
        return event

    def has_more(self) -> bool:
        """Check if there are more events."""
        return self.current_index < len(self.events)

    def __iter__(self) -> Generator:
        """Iterate through events."""
        while self.has_more():
            yield self.get_next_event()

async def process_live_match(match_state: LiveMatchState,
                              event_stream: EventStream,
                              callback: Optional[Callable] = None):
    """Process live match events asynchronously."""
    for event in event_stream:
        if event:
            alerts = match_state.add_event(event)

            if callback:
                await callback(match_state.get_state(), alerts)

            # Simulate real-time delay
            await asyncio.sleep(0.1 / event_stream.speed_factor)

    return match_state

# Example callback
async def live_update_callback(state: dict, alerts: list):
    """Print live updates."""
    print(f"\rMinute {state['minute']} | "
          f"Score: {state['score']['home']}-{state['score']['away']} | "
          f"xG: {state['xg']['home']:.2f}-{state['xg']['away']:.2f}",
          end="")

    for alert in alerts:
        print(f"\n*** ALERT: {alert} ***")

# Run the live match processor
# asyncio.run(process_live_match(match_state, event_stream, live_update_callback))
# R: Simulated Streaming Data Handler
library(tidyverse)
library(later)

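# Simulated event stream (in production, this would be a websocket or API)
create_event_stream <- function(events_df, speed_factor = 1) {
    events_queue <- events_df %>%
        arrange(minute, second) %>%
        mutate(
            game_time = minute * 60 + second,
            event_index = row_number()
        )

    # Use an environment rather than a plain list: environments have
    # reference semantics, so the index update in get_next_event() persists
    # between calls. With a list, `self` would be a copy, the index would
    # never advance, and has_more() would never become FALSE.
    stream <- new.env()
    stream$events <- events_queue
    stream$current_index <- 1
    stream$speed_factor <- speed_factor

    stream$get_next_event <- function(self) {
        if (self$current_index > nrow(self$events)) {
            return(NULL)  # Match ended
        }

        event <- self$events[self$current_index, ]
        self$current_index <- self$current_index + 1
        return(event)
    }

    stream$has_more <- function(self) {
        self$current_index <= nrow(self$events)
    }

    stream
}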

# Process streaming events
process_live_match <- function(match_state, event_stream, callback = NULL) {
    while (event_stream$has_more(event_stream)) {
        event <- event_stream$get_next_event(event_stream)

        if (!is.null(event)) {
            match_state$add_event(event)

            # Call callback with updated state
            if (!is.null(callback)) {
                callback(match_state$get_state())
            }

            # Simulate real-time delay (for demo)
            Sys.sleep(0.1 / event_stream$speed_factor)
        }
    }

    return(match_state)
}

# Example callback for live updates
live_update_callback <- function(state) {
    cat(sprintf(
        "\rMinute %d | Score: %d-%d | xG: %.2f-%.2f",
        state$minute,
        state$score$home, state$score$away,
        state$xg$home, state$xg$away
    ))
}
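
Live feeds do not always deliver events in game-time order, and the stream handlers above assume they do. One way to tolerate short delays, sketched here under stated assumptions, is a small reorder buffer: events are held in a min-heap keyed by game time and released only once the newest game time seen is a few seconds ahead of them. `ReorderBuffer` and its `max_delay_seconds` parameter are hypothetical and would need tuning against the real feed's latency.

```python
import heapq
from typing import Dict, List, Tuple


class ReorderBuffer:
    """Hypothetical buffer that releases events in game-time order."""

    def __init__(self, max_delay_seconds: int = 5):
        self.max_delay_seconds = max_delay_seconds
        self._heap: List[Tuple[int, int, Dict]] = []
        self._counter = 0       # tie-breaker so event dicts are never compared
        self._latest_time = 0   # newest game time seen so far, in seconds

    def push(self, event: Dict) -> List[Dict]:
        """Add an event and return those that are now safe to process."""
        game_time = event["minute"] * 60 + event.get("second", 0)
        heapq.heappush(self._heap, (game_time, self._counter, event))
        self._counter += 1
        self._latest_time = max(self._latest_time, game_time)

        # Release everything at least max_delay_seconds older than the newest event
        watermark = self._latest_time - self.max_delay_seconds
        ready: List[Dict] = []
        while self._heap and self._heap[0][0] <= watermark:
            ready.append(heapq.heappop(self._heap)[2])
        return ready

    def flush(self) -> List[Dict]:
        """Release all remaining events in order (e.g. at full time)."""
        return [heapq.heappop(self._heap)[2] for _ in range(len(self._heap))]


# Usage: the event at second 2 arrives after the one at second 4
buffer = ReorderBuffer(max_delay_seconds=5)
released: List[Dict] = []
for ev in [
    {"minute": 10, "second": 0, "type": "Pass"},
    {"minute": 10, "second": 4, "type": "Shot"},
    {"minute": 10, "second": 2, "type": "Pressure"},  # late arrival
    {"minute": 10, "second": 12, "type": "Pass"},
]:
    released.extend(buffer.push(ev))
released.extend(buffer.flush())
order = [e["second"] for e in released]
print(order)
```

The trade-off is explicit: a larger `max_delay_seconds` tolerates later arrivals but delays every downstream metric by the same amount.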

Live Win Probability

Win probability models estimate the likelihood of each outcome based on current game state. These update continuously as the match progresses.

win_probability.py
# Python: Live Win Probability Model
import numpy as np
from typing import Dict

def calculate_win_probability(home_goals: int, away_goals: int,
                               home_xg: float, away_xg: float,
                               minute: int, n_sims: int = 10000) -> Dict[str, float]:
    """Calculate win probability using Poisson simulation."""
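    # Remaining time factor (clamped at 0 so that stoppage time beyond
    # minute 90 does not produce a negative Poisson rate)
    remaining_pct = max(90 - minute, 0) / 90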

    # Calculate xG rate and regress to mean
    safe_minute = max(minute, 1)
    xg_rate_home = home_xg / safe_minute * 90
    xg_rate_away = away_xg / safe_minute * 90

    # Regress to league average
    league_avg_xg = 1.3
    regression_factor = 0.3

    projected_home_xg = xg_rate_home * (1 - regression_factor) + league_avg_xg * regression_factor
    projected_away_xg = xg_rate_away * (1 - regression_factor) + league_avg_xg * regression_factor

    # Expected goals remaining
    remaining_home_xg = projected_home_xg * remaining_pct
    remaining_away_xg = projected_away_xg * remaining_pct

    # Simulate outcomes
    np.random.seed(42)  # For reproducibility in examples
    home_remaining = np.random.poisson(remaining_home_xg, n_sims)
    away_remaining = np.random.poisson(remaining_away_xg, n_sims)

    home_final = home_goals + home_remaining
    away_final = away_goals + away_remaining

    # Calculate probabilities
    home_win_prob = (home_final > away_final).mean() * 100
    draw_prob = (home_final == away_final).mean() * 100
    away_win_prob = (home_final < away_final).mean() * 100

    return {
        "home_win": round(home_win_prob, 1),
        "draw": round(draw_prob, 1),
        "away_win": round(away_win_prob, 1)
    }

def track_win_probability(match_state: "LiveMatchState") -> Dict[str, float]:
    """Get current win probability from match state."""
    state = match_state.get_state()

    return calculate_win_probability(
        home_goals=state["score"]["home"],
        away_goals=state["score"]["away"],
        home_xg=state["xg"]["home"],
        away_xg=state["xg"]["away"],
        minute=state["minute"]
    )

# Example
probs = calculate_win_probability(
    home_goals=1, away_goals=0,
    home_xg=1.2, away_xg=0.8,
    minute=60
)

print(f"Win Probability at minute 60 (1-0):")
print(f"Home Win: {probs['home_win']}%")
print(f"Draw: {probs['draw']}%")
print(f"Away Win: {probs['away_win']}%")
# R: Live Win Probability Model
library(tidyverse)

calculate_win_probability <- function(home_goals, away_goals, home_xg, away_xg,
                                       minute, is_home = TRUE) {
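    # Remaining time factor (clamped at 0 so that stoppage time beyond
    # minute 90 does not produce a negative Poisson rate)
    remaining_pct <- max(90 - minute, 0) / 90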

    # Expected goals remaining (based on current xG rate, regression to mean)
    xg_rate_home <- home_xg / max(minute, 1) * 90
    xg_rate_away <- away_xg / max(minute, 1) * 90

    # Regress to league average rates
    league_avg_xg <- 1.3  # Average xG per team per game
    regression_factor <- 0.3  # How much to regress

    projected_home_xg <- xg_rate_home * (1 - regression_factor) + league_avg_xg * regression_factor
    projected_away_xg <- xg_rate_away * (1 - regression_factor) + league_avg_xg * regression_factor

    # Expected goals remaining
    remaining_home_xg <- projected_home_xg * remaining_pct
    remaining_away_xg <- projected_away_xg * remaining_pct

    # Simulate outcomes using Poisson
    n_sims <- 10000
    home_final <- home_goals + rpois(n_sims, remaining_home_xg)
    away_final <- away_goals + rpois(n_sims, remaining_away_xg)

    # Calculate probabilities
    home_win_prob <- mean(home_final > away_final)
    draw_prob <- mean(home_final == away_final)
    away_win_prob <- mean(home_final < away_final)

    return(list(
        home_win = round(home_win_prob * 100, 1),
        draw = round(draw_prob * 100, 1),
        away_win = round(away_win_prob * 100, 1)
    ))
}

# Calculate live win probability throughout match
track_win_probability <- function(match_state) {
    state <- match_state$get_state()

    probs <- calculate_win_probability(
        home_goals = state$score$home,
        away_goals = state$score$away,
        home_xg = state$xg$home,
        away_xg = state$xg$away,
        minute = state$minute
    )

    return(probs)
}

# Example: Track probability over a match
probs <- calculate_win_probability(
    home_goals = 1, away_goals = 0,
    home_xg = 1.2, away_xg = 0.8,
    minute = 60
)

cat(sprintf("Win Probability at minute 60 (1-0):\n"))
cat(sprintf("Home Win: %.1f%%\n", probs$home_win))
cat(sprintf("Draw: %.1f%%\n", probs$draw))
cat(sprintf("Away Win: %.1f%%\n", probs$away_win))
Output (approximate; simulated values vary slightly between runs)
Win Probability at minute 60 (1-0):
Home Win: 78.5%
Draw: 17.5%
Away Win: 4.0%
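
Because the remaining goals are modelled as two independent Poisson variables, the same probabilities can also be computed exactly by summing joint probabilities over a grid of possible scorelines. The sketch below is one way to cross-check the simulation. `exact_outcome_probabilities` and the `max_goals` truncation are assumptions made for this example, and the remaining-xG inputs are derived with the same regression formula as the model above.

```python
import math
from typing import Dict


def poisson_pmf(k: int, lam: float) -> float:
    """Probability of exactly k events for a Poisson variable with rate lam."""
    return math.exp(-lam) * lam ** k / math.factorial(k)


def exact_outcome_probabilities(home_goals: int, away_goals: int,
                                remaining_home_xg: float,
                                remaining_away_xg: float,
                                max_goals: int = 15) -> Dict[str, float]:
    """Sum joint Poisson probabilities over all remaining-goal combinations.

    max_goals truncates the grid; the probability mass beyond 15 further
    goals per team is negligible for realistic football rates.
    """
    home_win = draw = away_win = 0.0
    for h in range(max_goals + 1):
        p_h = poisson_pmf(h, remaining_home_xg)
        for a in range(max_goals + 1):
            p = p_h * poisson_pmf(a, remaining_away_xg)
            home_final = home_goals + h
            away_final = away_goals + a
            if home_final > away_final:
                home_win += p
            elif home_final == away_final:
                draw += p
            else:
                away_win += p
    return {"home_win": home_win, "draw": draw, "away_win": away_win}


# Same scenario as the example above: 1-0 at minute 60, xG 1.2 vs 0.8
league_avg_xg, regression_factor = 1.3, 0.3
remaining_pct = (90 - 60) / 90
remaining_home_xg = (1.2 / 60 * 90 * (1 - regression_factor)
                     + league_avg_xg * regression_factor) * remaining_pct
remaining_away_xg = (0.8 / 60 * 90 * (1 - regression_factor)
                     + league_avg_xg * regression_factor) * remaining_pct

exact = exact_outcome_probabilities(1, 0, remaining_home_xg, remaining_away_xg)
print({k: round(v * 100, 1) for k, v in exact.items()})
```

With 10,000 simulations the Monte Carlo estimates should land within roughly half a percentage point of these exact values. The exact version is also deterministic, which can be convenient for a live display that would otherwise flicker between refreshes.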

Momentum Detection

Momentum captures which team is currently dominating. Detecting momentum shifts early allows proactive tactical adjustments.

momentum_detection.py
# Python: Momentum Detection System
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np

@dataclass
class MomentumTracker:
    """Tracks and analyzes match momentum."""
    events: List[Dict] = field(default_factory=list)
    momentum_history: List[Dict] = field(default_factory=list)
    current_momentum: float = 0.0  # -1 to 1 scale

    # Event weights for momentum calculation
    EVENT_WEIGHTS = {
        "Shot": 3.0,
        "Key Pass": 2.0,
        "Progressive Pass": 1.0,
        "Pressure": 0.5,
        "Ball Recovery": 1.0,
        "Dribble": 1.0,
    }

    def add_events(self, new_events: List[Dict], home_team: str) -> float:
        """Add events and recalculate momentum."""
        self.events.extend(new_events)
        return self.calculate_momentum(home_team)

    def calculate_momentum(self, home_team: str, window_minutes: int = 5) -> float:
        """Calculate current momentum based on recent events."""
        if not self.events:
            return 0.0

        current_minute = max(e.get("minute", 0) for e in self.events)
        cutoff = current_minute - window_minutes
        recent = [e for e in self.events if e.get("minute", 0) >= cutoff]

        if not recent:
            return 0.0

        home_score = 0.0
        away_score = 0.0

        for event in recent:
            weight = self.EVENT_WEIGHTS.get(event.get("type"), 0.5)

            # Bonus for final third actions
            if event.get("location_x", 0) > 80:
                weight *= 1.5

            if event.get("team") == home_team:
                home_score += weight
            else:
                away_score += weight

        # Normalize to -1 to 1
        total = home_score + away_score
        if total > 0:
            self.current_momentum = (home_score - away_score) / total
        else:
            self.current_momentum = 0.0

        # Track history
        self.momentum_history.append({
            "minute": current_minute,
            "momentum": self.current_momentum
        })

        return self.current_momentum

    def detect_momentum_shift(self, threshold: float = 0.3) -> Optional[str]:
        """Detect significant momentum shifts."""
        if len(self.momentum_history) < 2:
            return None

        recent = self.momentum_history[-2:]
        change = recent[1]["momentum"] - recent[0]["momentum"]

        if abs(change) > threshold:
            direction = "HOME" if change > 0 else "AWAY"
            return f"MOMENTUM SHIFT: {direction} gaining control ({change:+.2f})"

        return None

    def get_momentum_label(self) -> str:
        """Get human-readable momentum label."""
        m = self.current_momentum
        if m > 0.3:
            return "HOME DOMINANT"
        elif m < -0.3:
            return "AWAY DOMINANT"
        elif m > 0.1:
            return "HOME SLIGHT EDGE"
        elif m < -0.1:
            return "AWAY SLIGHT EDGE"
        else:
            return "BALANCED"

# Usage
momentum_tracker = MomentumTracker()
print("Momentum tracking initialized")
# R: Momentum Detection System
library(R6)
library(tidyverse)

MomentumTracker <- R6Class("MomentumTracker",
    public = list(
        events = NULL,
        momentum_history = NULL,
        current_momentum = 0,  # -1 to 1 scale

        initialize = function() {
            self$events <- tibble()
            self$momentum_history <- tibble()
        },

        add_events = function(new_events, home_team) {
            self$events <- bind_rows(self$events, new_events)
            self$calculate_momentum(home_team)
        },

        calculate_momentum = function(home_team, window = 5) {
            if (nrow(self$events) == 0) return(0)

            current_minute <- max(self$events$minute)
            recent <- self$events %>%
                filter(minute >= current_minute - window)

            if (nrow(recent) == 0) return(0)

            # Weight different event types
            event_weights <- list(
                Shot = 3,
                `Key Pass` = 2,
                `Progressive Pass` = 1,
                Pressure = 0.5,
                `Ball Recovery` = 1,
                Dribble = 1
            )

            # Calculate momentum score
            home_score <- 0
            away_score <- 0

            for (i in 1:nrow(recent)) {
                event <- recent[i, ]
                weight <- event_weights[[event$type]] %||% 0.5

                # Bonus for final third actions
                if (!is.na(event$location_x) && event$location_x > 80) {
                    weight <- weight * 1.5
                }

                if (event$team == home_team) {
                    home_score <- home_score + weight
                } else {
                    away_score <- away_score + weight
                }
            }

            # Normalize to -1 to 1 scale
            total <- home_score + away_score
            if (total > 0) {
                self$current_momentum <- (home_score - away_score) / total
            } else {
                self$current_momentum <- 0
            }

            # Track history
            self$momentum_history <- bind_rows(
                self$momentum_history,
                tibble(
                    minute = current_minute,
                    momentum = self$current_momentum
                )
            )

            return(self$current_momentum)
        },

        detect_momentum_shift = function(threshold = 0.3) {
            if (nrow(self$momentum_history) < 2) return(NULL)

            recent <- tail(self$momentum_history, 2)
            change <- recent$momentum[2] - recent$momentum[1]

            if (abs(change) > threshold) {
                direction <- ifelse(change > 0, "HOME", "AWAY")
                return(sprintf("MOMENTUM SHIFT: %s gaining control (%.2f change)",
                              direction, change))
            }

            return(NULL)
        },

        get_momentum_label = function() {
            m <- self$current_momentum
            if (m > 0.3) return("HOME DOMINANT")
            if (m < -0.3) return("AWAY DOMINANT")
            if (m > 0.1) return("HOME SLIGHT EDGE")
            if (m < -0.1) return("AWAY SLIGHT EDGE")
            return("BALANCED")
        }
    )
)

# Usage
momentum <- MomentumTracker$new()
cat("Momentum tracking initialized\n")
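
Momentum is recalculated on every update, so a shift detector like the one above can report the same kind of alert several times within a few minutes. One simple mitigation, sketched below, is a per-alert cooldown that suppresses repeats until a set number of match minutes has passed. `AlertThrottle`, its `filter` method, and the five-minute default are hypothetical choices for illustration only.

```python
from typing import Dict, Optional


class AlertThrottle:
    """Hypothetical helper: suppress repeats of an alert within a cooldown."""

    def __init__(self, cooldown_minutes: int = 5):
        self.cooldown_minutes = cooldown_minutes
        self._last_fired: Dict[str, int] = {}  # alert key -> minute last emitted

    def filter(self, key: str, message: str, minute: int) -> Optional[str]:
        """Return the message if it may be shown now, otherwise None."""
        last = self._last_fired.get(key)
        if last is not None and minute - last < self.cooldown_minutes:
            return None  # still cooling down, drop the repeat
        self._last_fired[key] = minute
        return message


# Usage: the same alert is raised at minutes 30, 31, 33 and 36
throttle = AlertThrottle(cooldown_minutes=5)
emitted = [
    throttle.filter("momentum_shift", "MOMENTUM SHIFT: AWAY gaining control", minute)
    for minute in (30, 31, 33, 36)
]
print([m is not None for m in emitted])
```

Keying the cooldown by alert type rather than by message text means a change in the reported magnitude does not bypass the throttle.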

Building Live Dashboards

Live dashboards synthesize real-time metrics into actionable displays for coaching staff and analysts.

live_dashboard.py
# Python: Live Match Dashboard with Streamlit
import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
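import time

# calculate_win_probability() comes from the win probability listing earlier
# in this chapter; define or import it in this file before running the dashboard.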

def create_live_dashboard():
    """Create a Streamlit live match dashboard."""
    st.set_page_config(page_title="Live Match Analytics", layout="wide")
    st.title("Live Match Analytics Dashboard")

    # Initialize session state
    if "match_state" not in st.session_state:
        st.session_state.match_state = None

    # Create placeholder for auto-refresh
    placeholder = st.empty()

    # Simulate live updates (in production, fetch from API)
    while True:
        with placeholder.container():
            state = st.session_state.match_state

            # Row 1: Key Metrics
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                if state:
                    st.metric(
                        "Score",
                        f"{state['score']['home']} - {state['score']['away']}",
                        f"Minute {state['minute']}"
                    )
                else:
                    st.metric("Score", "--", "Waiting...")

            with col2:
                if state:
                    st.metric(
                        "xG",
                        f"{state['xg']['home']:.2f} - {state['xg']['away']:.2f}"
                    )
                else:
                    st.metric("xG", "--")

            with col3:
                if state:
                    probs = calculate_win_probability(
                        state["score"]["home"], state["score"]["away"],
                        state["xg"]["home"], state["xg"]["away"],
                        state["minute"]
                    )
                    st.metric("Home Win Prob", f"{probs['home_win']}%")
                else:
                    st.metric("Home Win Prob", "--")

            with col4:
                momentum_label = state.get("momentum_label", "BALANCED") if state else "--"
                st.metric("Momentum", momentum_label)

            # Row 2: Charts
            col_left, col_right = st.columns(2)

            with col_left:
                st.subheader("xG Flow")
                if state and "xg_timeline" in state:
                    fig = create_xg_flow_chart(state["xg_timeline"])
                    st.plotly_chart(fig, use_container_width=True)
                else:
                    st.info("Waiting for match data...")

            with col_right:
                st.subheader("Win Probability")
                if state and "win_prob_history" in state:
                    fig = create_win_prob_chart(state["win_prob_history"])
                    st.plotly_chart(fig, use_container_width=True)
                else:
                    st.info("Waiting for match data...")

            # Row 3: Alerts and Stats
            col_a, col_b, col_c = st.columns(3)

            with col_a:
                st.subheader("Live Alerts")
                if state and state.get("recent_alerts"):
                    for alert in state["recent_alerts"]:
                        st.warning(alert)
                else:
                    st.info("No recent alerts")

            with col_b:
                st.subheader("Rolling Stats (10 min)")
                if state and state.get("metrics", {}).get("rolling"):
                    rolling = state["metrics"]["rolling"]
                    st.write(f"Home Possession: {rolling['home_possession']:.1f}%")
                    st.write(f"Home Shots: {rolling['home_shots_10min']}")
                    st.write(f"Away Shots: {rolling['away_shots_10min']}")

            with col_c:
                st.subheader("Recent Events")
                if state and "recent_events" in state:
                    for event in state["recent_events"][-5:]:
                        st.text(f"{event['minute']}' - {event['type']} ({event['team']})")

        # Refresh interval
        time.sleep(1)

def create_xg_flow_chart(timeline_data):
    """Create xG flow chart."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=timeline_data["minute"],
        y=timeline_data["home_xg"],
        name="Home xG",
        line=dict(color="#2E7D32", width=2)
    ))
    fig.add_trace(go.Scatter(
        x=timeline_data["minute"],
        y=timeline_data["away_xg"],
        name="Away xG",
        line=dict(color="#D32F2F", width=2)
    ))
    fig.update_layout(
        xaxis_title="Minute",
        yaxis_title="Cumulative xG",
        height=300
    )
    return fig

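def create_win_prob_chart(history):
    """Create win probability chart.

    Assumes `history` provides "minute", "home_win", "draw" and "away_win"
    series (percentages). This layout is an assumption; adapt it to however
    the win probability history is stored.
    """
    fig = go.Figure()
    for key, label, color in [
        ("home_win", "Home Win", "#2E7D32"),
        ("draw", "Draw", "#757575"),
        ("away_win", "Away Win", "#D32F2F"),
    ]:
        fig.add_trace(go.Scatter(
            x=history["minute"],
            y=history[key],
            name=label,
            line=dict(color=color, width=2)
        ))
    fig.update_layout(
        xaxis_title="Minute",
        yaxis_title="Probability (%)",
        height=300
    )
    return fig

if __name__ == "__main__":
    create_live_dashboard()

# Run: streamlit run live_dashboard.py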
# R: Live Match Dashboard with Shiny
library(shiny)
library(shinydashboard)
library(plotly)

# Dashboard UI
ui <- dashboardPage(
    dashboardHeader(title = "Live Match Analytics"),

    dashboardSidebar(disable = TRUE),

    dashboardBody(
        # Row 1: Key Metrics
        fluidRow(
            valueBoxOutput("score_box", width = 3),
            valueBoxOutput("xg_box", width = 3),
            valueBoxOutput("momentum_box", width = 3),
            valueBoxOutput("win_prob_box", width = 3)
        ),

        # Row 2: Charts
        fluidRow(
            box(title = "xG Flow", status = "primary", width = 6,
                plotlyOutput("xg_flow_chart")),
            box(title = "Win Probability", status = "success", width = 6,
                plotlyOutput("win_prob_chart"))
        ),

        # Row 3: Alerts and Details
        fluidRow(
            box(title = "Live Alerts", status = "warning", width = 4,
                uiOutput("alerts_list")),
            box(title = "Rolling Stats (10 min)", status = "info", width = 4,
                tableOutput("rolling_stats")),
            box(title = "Recent Events", status = "primary", width = 4,
                tableOutput("recent_events"))
        )
    )
)

# Dashboard Server
server <- function(input, output, session) {
    # Reactive values to store match state
    match_state <- reactiveVal(NULL)

    # Auto-refresh every second
    autoInvalidate <- reactiveTimer(1000)

    # Update match state
    observe({
        autoInvalidate()
        # In production: fetch latest state from API
        # state <- fetch_match_state(match_id)
        # match_state(state)
    })

    # Score box
    output$score_box <- renderValueBox({
        state <- match_state()
        if (is.null(state)) {
            valueBox("--", "Score", icon = icon("futbol"))
        } else {
            valueBox(
                sprintf("%d - %d", state$score$home, state$score$away),
                sprintf("Minute %d", state$minute),
                icon = icon("futbol"),
                color = "green"
            )
        }
    })

    # xG box
    output$xg_box <- renderValueBox({
        state <- match_state()
        if (is.null(state)) {
            valueBox("--", "xG", icon = icon("chart-line"))
        } else {
            valueBox(
                sprintf("%.2f - %.2f", state$xg$home, state$xg$away),
                "Expected Goals",
                icon = icon("chart-line"),
                color = "blue"
            )
        }
    })

    # Momentum box
    output$momentum_box <- renderValueBox({
        state <- match_state()
        if (is.null(state)) {
            valueBox("--", "Momentum")
        } else {
            momentum_label <- state$momentum_label %||% "BALANCED"
            color <- if (grepl("HOME", momentum_label)) "green" else
                     if (grepl("AWAY", momentum_label)) "red" else "yellow"
            valueBox(momentum_label, "Current Momentum", color = color)
        }
    })
}

Live xG Calculation

Calculating xG in real time requires model inference with minimal latency. Loading a pre-trained model once at startup and keeping feature extraction cheap are essential.

live_xg_calculator.py
# Python: Optimized Live xG Calculator
import numpy as np
import pickle
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class ShotEvent:
    """Represents a shot event for xG calculation."""
    location_x: float
    location_y: float
    body_part: str = "foot"
    play_pattern: str = "open play"
    under_pressure: bool = False
    first_touch: bool = False

class LiveXGCalculator:
    """Optimized xG calculator for live match use."""

    def __init__(self, model_path: Optional[str] = None):
        self.model = None
        self.feature_means = None
        self.feature_sds = None

        if model_path:
            self._load_model(model_path)

    def _load_model(self, model_path: str):
        """Load pre-trained model and normalization params."""
        import xgboost as xgb
        self.model = xgb.Booster()
        self.model.load_model(model_path)

        with open(f"{model_path}_params.pkl", "rb") as f:
            params = pickle.load(f)
            self.feature_means = params["means"]
            self.feature_sds = params["sds"]

    def extract_features(self, shot: ShotEvent, match_context: Dict) -> np.ndarray:
        """Extract features from shot event."""
        x, y = shot.location_x, shot.location_y

        # Distance and angle to goal
        goal_x, goal_y = 120, 40
        distance = np.sqrt((goal_x - x)**2 + (goal_y - y)**2)
        angle = np.degrees(np.arctan2(abs(y - goal_y), goal_x - x))

        features = np.array([
            distance,
            angle,
            float(shot.body_part == "foot"),
            float(shot.body_part == "head"),
            float(shot.play_pattern == "open play"),
            float(shot.play_pattern == "counter"),
            float(shot.play_pattern in ["corner", "free kick"]),
            float(shot.under_pressure),
            float(shot.first_touch),
            float(match_context.get("home_trailing", False)),
            match_context.get("minute", 45),
            match_context.get("minute", 45) ** 2
        ])

        return features

    def normalize_features(self, features: np.ndarray) -> np.ndarray:
        """Normalize features using stored parameters."""
        if self.feature_means is not None:
            return (features - self.feature_means) / self.feature_sds
        return features

    def predict_xg(self, shot: ShotEvent, match_context: Dict) -> float:
        """Predict xG for a single shot."""
        if self.model is None:
            return self._simple_xg(shot)

        features = self.extract_features(shot, match_context)
        features_norm = self.normalize_features(features)

        import xgboost as xgb
        dmatrix = xgb.DMatrix(features_norm.reshape(1, -1))
        xg = self.model.predict(dmatrix)[0]

        # Convert the NumPy scalar to a plain float to match the return annotation
        return float(np.clip(xg, 0.01, 0.99))

    def _simple_xg(self, shot: ShotEvent) -> float:
        """Simple fallback xG calculation."""
        x, y = shot.location_x, shot.location_y
        goal_x, goal_y = 120, 40

        distance = np.sqrt((goal_x - x)**2 + (goal_y - y)**2)
        base_xg = np.exp(-0.1 * distance)

        if shot.body_part == "head":
            base_xg *= 0.7

        return round(float(np.clip(base_xg, 0.01, 0.99)), 3)

    def batch_predict(self, shots: List[ShotEvent], match_context: Dict) -> List[float]:
        """Efficiently predict xG for multiple shots."""
        if self.model is None:
            return [self._simple_xg(s) for s in shots]

        features = np.array([
            self.extract_features(s, match_context) for s in shots
        ])
        # Broadcasting normalizes every row at once; no per-row Python call needed
        features_norm = self.normalize_features(features)

        import xgboost as xgb
        dmatrix = xgb.DMatrix(features_norm)
        xg_values = self.model.predict(dmatrix)

        return [float(np.clip(xg, 0.01, 0.99)) for xg in xg_values]

# Example usage
calculator = LiveXGCalculator()
shot = ShotEvent(location_x=105, location_y=42, body_part="foot")
xg = calculator._simple_xg(shot)
print(f"Simple xG for shot at (105, 42): {xg:.3f}")
# R: Optimized Live xG Calculator
library(R6)
library(xgboost)

LiveXGCalculator <- R6Class("LiveXGCalculator",
    public = list(
        model = NULL,
        feature_means = NULL,
        feature_sds = NULL,

        initialize = function(model_path) {
            # Load pre-trained model
            self$model <- xgb.load(model_path)

            # Load feature normalization parameters
            self$feature_means <- readRDS(paste0(model_path, "_means.rds"))
            self$feature_sds <- readRDS(paste0(model_path, "_sds.rds"))
        },

        extract_features = function(shot_event, match_context) {
            # Core shot features
            x <- shot_event$location_x
            y <- shot_event$location_y

            # Distance and angle to goal
            goal_x <- 120
            goal_y <- 40
            distance <- sqrt((goal_x - x)^2 + (goal_y - y)^2)
            angle <- atan2(abs(y - goal_y), goal_x - x) * 180 / pi

            # Contextual features
            features <- c(
                distance = distance,
                angle = angle,
                body_part_foot = as.numeric(shot_event$body_part == "foot"),
                body_part_head = as.numeric(shot_event$body_part == "head"),
                shot_type_open = as.numeric(shot_event$play_pattern == "open play"),
                shot_type_counter = as.numeric(shot_event$play_pattern == "counter"),
                shot_type_set_piece = as.numeric(shot_event$play_pattern %in%
                                                 c("corner", "free kick")),
                under_pressure = as.numeric(shot_event$under_pressure),
                first_touch = as.numeric(shot_event$first_touch),
                # Context from match state
                home_trailing = as.numeric(match_context$home_trailing),
                minute = match_context$minute,
                minute_squared = match_context$minute^2
            )

            return(features)
        },

        normalize_features = function(features) {
            (features - self$feature_means) / self$feature_sds
        },

        predict_xg = function(shot_event, match_context) {
            # Extract and normalize features
            features <- self$extract_features(shot_event, match_context)
            features_norm <- self$normalize_features(features)

            # Create xgb matrix
            dmatrix <- xgb.DMatrix(matrix(features_norm, nrow = 1))

            # Predict
            xg <- predict(self$model, dmatrix)

            # Clip to valid range
            xg <- max(0.01, min(0.99, xg))

            return(xg)
        },

        batch_predict = function(shots, match_context) {
            # Efficient batch prediction for multiple shots
            features_list <- lapply(shots, function(s) {
                self$extract_features(s, match_context)
            })

            features_matrix <- do.call(rbind, features_list)
            features_norm <- t(apply(features_matrix, 1, self$normalize_features))

            dmatrix <- xgb.DMatrix(features_norm)
            xg_values <- predict(self$model, dmatrix)

            return(pmax(0.01, pmin(0.99, xg_values)))
        }
    )
)

# Simple fallback for when model not available
calculate_simple_xg <- function(x, y, body_part = "foot") {
    # Distance-based approximation
    goal_x <- 120
    goal_center_y <- 40

    distance <- sqrt((goal_x - x)^2 + (goal_center_y - y)^2)

    # Base xG from distance
    base_xg <- exp(-0.1 * distance)

    # Adjust for body part
    if (body_part == "head") {
        base_xg <- base_xg * 0.7
    }

    return(round(base_xg, 3))
}

# Example usage
xg <- calculate_simple_xg(x = 105, y = 42, body_part = "foot")
cat(sprintf("Simple xG for shot at (105, 42): %.3f\n", xg))
Output
Simple xG for shot at (105, 42): 0.220
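
When no trained model is available and many shots need scoring at once, the distance-decay fallback can be evaluated for a whole batch in one pass. The sketch below is illustrative only: `simple_xg_batch` is a hypothetical helper, not part of the calculator above. It assumes the same pitch coordinates (goal centre at 120, 40) and the same 0.7 header adjustment.

```python
import numpy as np

GOAL_X, GOAL_Y = 120.0, 40.0  # goal centre, same pitch coordinates as above


def simple_xg_batch(xs, ys, is_header):
    """Vectorized form of the distance-decay fallback (hypothetical helper)."""
    xs = np.asarray(xs, dtype=float)
    ys = np.asarray(ys, dtype=float)
    is_header = np.asarray(is_header, dtype=bool)

    # Straight-line distance from each shot to the goal centre
    distance = np.hypot(GOAL_X - xs, GOAL_Y - ys)

    # Exponential decay with distance, then the header penalty
    xg = np.exp(-0.1 * distance)
    xg = np.where(is_header, xg * 0.7, xg)

    # Keep values inside the same range the calculator uses
    return np.clip(xg, 0.01, 0.99)


shots_xg = simple_xg_batch(
    xs=[108.0, 114.0, 60.0],
    ys=[40.0, 40.0, 40.0],
    is_header=[False, True, False],
)
for value in shots_xg:
    print(f"{value:.3f}")
```

The third shot, taken from the halfway line, falls below the lower bound and is clipped to 0.01, which shows why the clip is applied after the header adjustment rather than before it.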

Reliability and Error Handling

Real-time systems must handle errors gracefully. Missing data, network issues, and out-of-order events are common challenges.

Common Issues
  • Missing or delayed events
  • Events arriving out of order
  • Network disconnections
  • Invalid or corrupted data
  • Model inference failures
Mitigation Strategies
  • Event buffering and reordering
  • Fallback calculations
  • Heartbeat monitoring
  • Data validation pipelines
  • Graceful degradation
robust_processing.py
# Python: Robust Event Processing
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

@dataclass
class RobustEventProcessor:
    """Handles event validation, buffering, and error recovery."""
    buffer_window: float = 5.0  # seconds
    heartbeat_timeout: float = 30.0  # seconds
    event_buffer: List[Dict] = field(default_factory=list)
    last_heartbeat: datetime = field(default_factory=datetime.now)

    VALID_EVENT_TYPES = {
        "Shot", "Pass", "Pressure", "Foul", "Dribble",
        "Ball Recovery", "Clearance", "Goal Keeper"
    }

    REQUIRED_FIELDS = {"type", "minute", "team"}

    def process_event(self, event: Dict) -> List[Dict]:
        """Process incoming event with validation and buffering."""
        # Validate
        valid, reason = self.validate_event(event)
        if not valid:
            logger.warning(f"Invalid event: {reason}")
            return []

        # Buffer for reordering
        self.add_to_buffer(event)

        # Return ready events
        return self.get_ready_events()

    def validate_event(self, event: Dict) -> Tuple[bool, Optional[str]]:
        """Validate event structure and values."""
        # Required fields
        missing = self.REQUIRED_FIELDS - set(event.keys())
        if missing:
            return False, f"Missing fields: {missing}"

        # Type validation
        if event.get("type") not in self.VALID_EVENT_TYPES:
            return False, f"Invalid event type: {event.get('type')}"

        # Range validation
        minute = event.get("minute", -1)
        if minute < 0 or minute > 130:
            return False, f"Invalid minute: {minute}"

        return True, None

    def add_to_buffer(self, event: Dict):
        """Add event to buffer and sort by game time."""
        event["received_at"] = datetime.now()
        self.event_buffer.append(event)

        # Sort by game time
        self.event_buffer.sort(
            key=lambda e: e.get("minute", 0) * 60 + e.get("second", 0)
        )

    def get_ready_events(self) -> List[Dict]:
        """Return events that have waited long enough in buffer."""
        if not self.event_buffer:
            return []

        current_time = datetime.now()
        ready = []
        remaining = []

        for event in self.event_buffer:
            elapsed = (current_time - event["received_at"]).total_seconds()
            if elapsed >= self.buffer_window:
                ready.append(event)
            else:
                remaining.append(event)

        self.event_buffer = remaining
        return ready

    def check_heartbeat(self) -> bool:
        """Check if data feed is still active."""
        elapsed = (datetime.now() - self.last_heartbeat).total_seconds()

        if elapsed > self.heartbeat_timeout:
            logger.warning("Data feed heartbeat timeout - connection may be lost")
            return False

        return True

    def receive_heartbeat(self):
        """Update last heartbeat time."""
        self.last_heartbeat = datetime.now()

processor = RobustEventProcessor()
print("Robust event processor initialized")
# R: Robust Event Processing
library(R6)

RobustEventProcessor <- R6Class("RobustEventProcessor",
    public = list(
        event_buffer = NULL,
        buffer_window = 5,  # seconds
        last_heartbeat = NULL,
        heartbeat_timeout = 30,  # seconds

        initialize = function() {
            self$event_buffer <- list()
            self$last_heartbeat <- Sys.time()
        },

        process_event = function(event) {
            # Validate event
            validation <- self$validate_event(event)
            if (!validation$valid) {
                warning(sprintf("Invalid event: %s", validation$reason))
                return(NULL)
            }

            # Buffer for reordering
            self$add_to_buffer(event)

            # Process ready events
            ready_events <- self$get_ready_events()
            return(ready_events)
        },

        validate_event = function(event) {
            # Required fields
            required <- c("type", "minute", "team")
            missing <- setdiff(required, names(event))

            if (length(missing) > 0) {
                return(list(
                    valid = FALSE,
                    reason = paste("Missing fields:", paste(missing, collapse = ", "))
                ))
            }

            # Type validation
            valid_types <- c("Shot", "Pass", "Pressure", "Foul", "Dribble",
                           "Ball Recovery", "Clearance", "Goal Keeper")
            if (!event$type %in% valid_types) {
                return(list(
                    valid = FALSE,
                    reason = sprintf("Invalid event type: %s", event$type)
                ))
            }

            # Range validation
            if (event$minute < 0 || event$minute > 130) {
                return(list(
                    valid = FALSE,
                    reason = sprintf("Invalid minute: %s", event$minute)
                ))
            }

            return(list(valid = TRUE, reason = NULL))
        },

        add_to_buffer = function(event) {
            event$received_at <- as.numeric(Sys.time())
            self$event_buffer <- c(self$event_buffer, list(event))

            # Sort by game time
            game_times <- sapply(self$event_buffer, function(e) {
                e$minute * 60 + (e$second %||% 0)
            })
            self$event_buffer <- self$event_buffer[order(game_times)]
        },

        get_ready_events = function() {
            if (length(self$event_buffer) == 0) {
                return(list())
            }

            current_time <- as.numeric(Sys.time())
            ready <- list()
            remaining <- list()

            for (event in self$event_buffer) {
                # Event is ready if it's been in the buffer long enough
                if (current_time - event$received_at >= self$buffer_window) {
                    ready <- c(ready, list(event))
                } else {
                    remaining <- c(remaining, list(event))
                }
            }

            self$event_buffer <- remaining
            return(ready)
        },

        check_heartbeat = function() {
            # Force seconds: a bare difftime picks its own units (secs, mins, ...)
            elapsed <- as.numeric(difftime(Sys.time(), self$last_heartbeat, units = "secs"))

            if (elapsed > self$heartbeat_timeout) {
                warning("Data feed heartbeat timeout - connection may be lost")
                return(FALSE)
            }

            return(TRUE)
        },

        receive_heartbeat = function() {
            self$last_heartbeat <- Sys.time()
        }
    )
)

processor <- RobustEventProcessor$new()
cat("Robust event processor initialized\n")
Output
Robust event processor initialized
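
The processor above holds each event for a fixed wall-clock delay. An alternative way to do event buffering and reordering is to release events by match clock instead: an event is emitted once a later event has been seen that is at least a few game seconds ahead of it. The sketch below is a minimal illustration under that assumption; `GameClockReorderBuffer` and its `lag_seconds` parameter are hypothetical names, not part of the chapter's code.

```python
import heapq
from typing import Dict, List, Tuple


class GameClockReorderBuffer:
    """Releases events in game-time order once they trail the newest event by a lag.

    Hypothetical helper: 'lag_seconds' is measured on the match clock,
    not the wall clock, so it behaves the same live and in replay.
    """

    def __init__(self, lag_seconds: int = 5):
        self.lag_seconds = lag_seconds
        self._heap: List[Tuple[int, int, Dict]] = []
        self._latest_seen = 0  # highest game time observed so far
        self._counter = 0      # tie-breaker so dicts are never compared

    @staticmethod
    def game_time(event: Dict) -> int:
        return int(event["minute"]) * 60 + int(event.get("second", 0))

    def push(self, event: Dict) -> List[Dict]:
        """Add an event and return any events that are now safe to release."""
        t = self.game_time(event)
        heapq.heappush(self._heap, (t, self._counter, event))
        self._counter += 1
        self._latest_seen = max(self._latest_seen, t)
        return self._release(self._latest_seen - self.lag_seconds)

    def flush(self) -> List[Dict]:
        """Release everything, e.g. at half-time or full-time."""
        return self._release(float("inf"))

    def _release(self, watermark: float) -> List[Dict]:
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


buffer = GameClockReorderBuffer(lag_seconds=5)
arrivals = [
    {"type": "Pass", "minute": 10, "second": 4},
    {"type": "Shot", "minute": 10, "second": 1},   # arrives late
    {"type": "Pass", "minute": 10, "second": 12},
]
released = []
for ev in arrivals:
    released.extend(buffer.push(ev))
released.extend(buffer.flush())
print([(e["minute"], e["second"]) for e in released])
```

The late shot is emitted ahead of the pass that arrived before it. The trade-off is the usual one: a larger lag tolerates later arrivals but delays every downstream metric by the same amount.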

Live Substitution Analysis

Real-time analytics can inform substitution decisions by tracking player fatigue, performance degradation, and tactical needs.

substitution_analyzer.py
# Python: Substitution Recommendation System
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import pandas as pd

@dataclass
class SubstitutionAnalyzer:
    """Analyzes player fatigue and recommends substitutions."""
    fatigue_thresholds: Dict[str, float] = field(default_factory=lambda: {
        "high_intensity_runs": 0.7,
        "sprint_count": 0.6,
        "pass_completion_drop": 10
    })
    player_metrics: pd.DataFrame = field(default_factory=pd.DataFrame)

    def update_player_metrics(self, player_tracking: pd.DataFrame, minute: int) -> pd.DataFrame:
        """Update fatigue metrics for all players."""
        df = player_tracking.copy()
        df["current_minute"] = minute

        # Calculate current rates (per 90)
        df["hir_rate"] = df["high_intensity_runs"] / max(minute, 1) * 90
        df["sprint_rate"] = df["sprints"] / max(minute, 1) * 90
        df["pass_completion_current"] = (
            df["passes_completed"] / df["passes_attempted"].clip(lower=1) * 100
        )

        # For simplicity, use the squad-wide mean of each rate as the baseline
        # In production, track each player's own baseline from the first 30 minutes
        df["baseline_hir"] = df["hir_rate"].mean()
        df["baseline_sprints"] = df["sprint_rate"].mean()
        df["baseline_pass_pct"] = df["pass_completion_current"].mean()

        # Calculate retention
        df["hir_retention"] = df["hir_rate"] / df["baseline_hir"].clip(lower=0.1)
        df["sprint_retention"] = df["sprint_rate"] / df["baseline_sprints"].clip(lower=0.1)
        df["pass_pct_change"] = df["pass_completion_current"] - df["baseline_pass_pct"]

        self.player_metrics = df
        return df

    def get_fatigue_alerts(self) -> pd.DataFrame:
        """Identify players showing fatigue."""
        if self.player_metrics.empty:
            return pd.DataFrame()

        df = self.player_metrics.copy()

        df["hir_fatigued"] = df["hir_retention"] < self.fatigue_thresholds["high_intensity_runs"]
        df["sprint_fatigued"] = df["sprint_retention"] < self.fatigue_thresholds["sprint_count"]
        df["passing_degraded"] = df["pass_pct_change"] < -self.fatigue_thresholds["pass_completion_drop"]

        df["fatigue_score"] = (
            df["hir_fatigued"].astype(int) +
            df["sprint_fatigued"].astype(int) +
            df["passing_degraded"].astype(int)
        )

        fatigued = df[df["fatigue_score"] >= 2].sort_values(
            "fatigue_score", ascending=False
        )

        return fatigued[["player_id", "player_name", "position", "fatigue_score",
                        "hir_retention", "sprint_retention", "pass_pct_change"]]

    def recommend_substitution(self, game_state: Dict, bench_players: pd.DataFrame) -> Dict:
        """Generate substitution recommendation."""
        fatigued = self.get_fatigue_alerts()

        if fatigued.empty:
            return {
                "recommendation": "No urgent substitutions needed",
                "priority": "low"
            }

        # Get most fatigued player
        target = fatigued.iloc[0]

        # Find replacement
        suitable = bench_players[bench_players["position"] == target["position"]]
        if not suitable.empty:
            replacement = suitable.sort_values("minutes_rest", ascending=False).iloc[0]
            replacement_name = replacement["player_name"]
        else:
            replacement_name = "available substitute"

        # Determine urgency
        score_diff = game_state.get("score_diff", 0)
        minute = game_state.get("minute", 0)

        if score_diff < 0 and minute > 70:
            urgency = "high"
        elif target["fatigue_score"] >= 3:
            urgency = "high"
        elif minute > 75:
            urgency = "medium"
        else:
            urgency = "low"

        return {
            "recommendation": f"Replace {target['player_name']} with {replacement_name} (fatigue score: {target['fatigue_score']})",
            "target_player": target["player_name"],
            "replacement": replacement_name,
            "priority": urgency,
            "reasoning": {
                "hir_retention": target["hir_retention"],
                "sprint_retention": target["sprint_retention"],
                "pass_degradation": target["pass_pct_change"]
            }
        }

analyzer = SubstitutionAnalyzer()
print("Substitution analyzer initialized")
# R: Substitution Recommendation System
library(R6)
library(tidyverse)

SubstitutionAnalyzer <- R6Class("SubstitutionAnalyzer",
    public = list(
        player_metrics = NULL,
        fatigue_thresholds = list(
            high_intensity_runs = 0.7,  # % of average
            sprint_count = 0.6,
            pass_completion_drop = 10  # percentage points
        ),

        initialize = function() {
            self$player_metrics <- tibble()
        },

        update_player_metrics = function(player_tracking, minute) {
            # Calculate fatigue indicators for each player
            metrics <- player_tracking %>%
                mutate(
                    current_minute = minute,
                    # Current rates, scaled to per-90 values
                    hir_rate = high_intensity_runs / max(minute, 1) * 90,
                    sprint_rate = sprints / max(minute, 1) * 90,
                    pass_completion_current = passes_completed / pmax(passes_attempted, 1) * 100
                ) %>%
                group_by(player_id) %>%
                mutate(
                    # Baseline = each player's first row (assumes rows are ordered by time;
                    # in production, use the first 30 min)
                    baseline_hir = first(hir_rate),
                    baseline_sprints = first(sprint_rate),
                    baseline_pass_pct = first(pass_completion_current),

                    hir_retention = hir_rate / baseline_hir,
                    sprint_retention = sprint_rate / baseline_sprints,
                    pass_pct_change = pass_completion_current - baseline_pass_pct
                ) %>%
                ungroup()

            self$player_metrics <- metrics
            return(metrics)
        },

        get_fatigue_alerts = function() {
            if (nrow(self$player_metrics) == 0) return(tibble())

            fatigue_alerts <- self$player_metrics %>%
                mutate(
                    hir_fatigued = hir_retention < self$fatigue_thresholds$high_intensity_runs,
                    sprint_fatigued = sprint_retention < self$fatigue_thresholds$sprint_count,
                    passing_degraded = pass_pct_change < -self$fatigue_thresholds$pass_completion_drop,
                    fatigue_score = as.numeric(hir_fatigued) +
                                   as.numeric(sprint_fatigued) +
                                   as.numeric(passing_degraded)
                ) %>%
                filter(fatigue_score >= 2) %>%
                arrange(desc(fatigue_score)) %>%
                select(player_id, player_name, position, fatigue_score,
                       hir_retention, sprint_retention, pass_pct_change)

            return(fatigue_alerts)
        },

        recommend_substitution = function(game_state, bench_players) {
            fatigued <- self$get_fatigue_alerts()

            if (nrow(fatigued) == 0) {
                return(list(
                    recommendation = "No urgent substitutions needed",
                    priority = "low"
                ))
            }

            # Get most fatigued player
            target <- fatigued[1, ]

            # Find suitable replacement from bench
            replacement <- bench_players %>%
                filter(position == target$position) %>%
                arrange(desc(minutes_rest)) %>%
                slice(1)

            # Consider game state
            urgency <- case_when(
                game_state$score_diff < 0 && game_state$minute > 70 ~ "high",
                target$fatigue_score >= 3 ~ "high",
                game_state$minute > 75 ~ "medium",
                TRUE ~ "low"
            )

            return(list(
                recommendation = sprintf(
                    "Replace %s with %s (fatigue score: %d)",
                    target$player_name,
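                    # An empty bench filter yields NA (not NULL), so check the row count
                    if (nrow(replacement) > 0) replacement$player_name[1] else "available substitute",
                    target$fatigue_score
                ),
                target_player = target$player_name,
                replacement = if (nrow(replacement) > 0) replacement$player_name[1] else "available substitute",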
                priority = urgency,
                reasoning = list(
                    hir_retention = target$hir_retention,
                    sprint_retention = target$sprint_retention,
                    pass_degradation = target$pass_pct_change
                )
            ))
        }
    )
)

sub_analyzer <- SubstitutionAnalyzer$new()
cat("Substitution analyzer initialized\n")
Output
Substitution analyzer initialized
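
The Python version above uses a squad-wide mean as its baseline and notes that a production system would track each player's own first 30 minutes. A minimal sketch of that idea follows. It assumes a hypothetical input format (a per-player list with one count of high-intensity runs per minute played) and a 15-minute recent window; neither is prescribed by the chapter.

```python
from typing import Dict, List


def retention_vs_own_baseline(
    runs_per_minute: Dict[str, List[int]],
    minute: int,
    baseline_end: int = 30,
    window: int = 15,
) -> Dict[str, float]:
    """Recent per-minute rate divided by each player's own early-match rate.

    runs_per_minute[player][k] is the count during minute k + 1
    (hypothetical format, non-cumulative).
    """
    retention = {}
    for player_id, runs in runs_per_minute.items():
        # Rate over the first 'baseline_end' minutes
        baseline_rate = sum(runs[:baseline_end]) / baseline_end
        # Rate over the last 'window' minutes up to the current minute
        recent_rate = sum(runs[minute - window:minute]) / window
        # Floor the baseline to avoid dividing by zero for inactive players
        retention[player_id] = recent_rate / max(baseline_rate, 0.01)
    return retention


# Toy data: player A keeps a steady rate; player B halves it after minute 30
runs = {
    "A": [1] * 75,
    "B": [1 if (m <= 30 or m % 2 == 0) else 0 for m in range(1, 76)],
}
retention = retention_vs_own_baseline(runs, minute=75)
flagged = [p for p, r in retention.items() if r < 0.7]
print({p: round(r, 2) for p, r in retention.items()})
print(flagged)
```

Only player B falls below the 0.7 retention threshold used by `fatigue_thresholds`. Comparing each player with their own early output avoids flagging positions that naturally produce fewer high-intensity runs than the squad average.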

Case Study: Full Match Real-Time Analysis

Let's walk through a complete real-time analysis of a simulated match, demonstrating all the components working together.

complete_analysis.py
# Python: Complete Real-Time Match Analysis
import pandas as pd
import numpy as np
from typing import Dict, List

def run_complete_match_analysis(events_df: pd.DataFrame,
                                home_team: str,
                                away_team: str) -> Dict:
    """Run complete real-time analysis on a match."""

    # Initialize components
    match_state = LiveMatchState(
        match_id="match_001",
        home_team=home_team,
        away_team=away_team
    )
    momentum_tracker = MomentumTracker()
    alert_system = TacticalAlertSystem()
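    # Replay mode: events are already sorted, so skip the wall-clock buffer delay
    # (with the default 5-second window, no event would be released in this loop)
    event_processor = RobustEventProcessor(buffer_window=0.0)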

    # Results tracking
    timeline = []
    all_alerts = []

    # Process events chronologically
    events_sorted = events_df.sort_values(["minute", "second"])

    for _, event in events_sorted.iterrows():
        event_dict = event.to_dict()

        # Process through robust handler
        validated = event_processor.process_event(event_dict)

        if validated:
            for valid_event in validated:
                # Update match state
                match_state.add_event(valid_event)

                # Update momentum
                momentum_tracker.add_events([valid_event], home_team)

                # Get current state
                state = match_state.get_state()
                state["momentum_label"] = momentum_tracker.get_momentum_label()

                # Check for momentum shift
                shift = momentum_tracker.detect_momentum_shift()
                if shift:
                    state["momentum_change"] = 0.5

                # Check alerts
                alerts = alert_system.check_alerts(valid_event, state)
                all_alerts.extend(alerts)

                # Calculate win probability
                win_prob = calculate_win_probability(
                    state["score"]["home"], state["score"]["away"],
                    state["xg"]["home"], state["xg"]["away"],
                    state["minute"]
                )

                # Record timeline
                timeline.append({
                    "minute": state["minute"],
                    "home_score": state["score"]["home"],
                    "away_score": state["score"]["away"],
                    "home_xg": state["xg"]["home"],
                    "away_xg": state["xg"]["away"],
                    "momentum": momentum_tracker.current_momentum,
                    "home_win_prob": win_prob["home_win"],
                    "n_alerts": len(alerts)
                })

    # Summary
    final_state = match_state.get_state()

    print("\n=== MATCH SUMMARY ===")
    print(f"Final Score: {final_state['score']['home']} - {final_state['score']['away']}")
    print(f"Final xG: {final_state['xg']['home']:.2f} - {final_state['xg']['away']:.2f}")
    print(f"Total Alerts Generated: {len(all_alerts)}")

    if all_alerts:
        print("\nKey Alerts:")
        high_priority = [a for a in all_alerts if a["priority"] == "high"]
        for alert in high_priority[:5]:
            print(f"  [{alert['minute']}'] {alert['message']}")

    return {
        "timeline": pd.DataFrame(timeline),
        "alerts": all_alerts,
        "final_state": final_state
    }

print("Complete match analysis system ready")
# R: Complete Real-Time Match Analysis
library(tidyverse)

run_complete_match_analysis <- function(events_df, home_team, away_team) {
    # Initialize all components
    match_state <- LiveMatchState$new("match_001", home_team, away_team)
    momentum_tracker <- MomentumTracker$new()
    alert_system <- TacticalAlertSystem$new()
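    event_processor <- RobustEventProcessor$new()
    # Replay mode: events are already sorted, so skip the wall-clock buffer delay
    event_processor$buffer_window <- 0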

    # Results tracking
    results <- tibble()
    all_alerts <- list()

    # Process events chronologically
    events_sorted <- events_df %>%
        arrange(minute, second)

    for (i in seq_len(nrow(events_sorted))) {
        event <- events_sorted[i, ]

        # Process through robust handler
        validated <- event_processor$process_event(as.list(event))

        if (length(validated) > 0) {
            # Update match state
            match_state$add_event(event)

            # Update momentum
            momentum_tracker$add_events(event, home_team)

            # Get current state
            state <- match_state$get_state()
            state$momentum_label <- momentum_tracker$get_momentum_label()

            # Check for momentum shift
            shift <- momentum_tracker$detect_momentum_shift()
            if (!is.null(shift)) {
                state$momentum_change <- 0.5  # Simplified
            }

            # Check alerts
            alerts <- alert_system$check_alerts(event, state)
            if (length(alerts) > 0) {
                all_alerts <- c(all_alerts, alerts)
            }

            # Calculate win probability
            win_prob <- calculate_win_probability(
                state$score$home, state$score$away,
                state$xg$home, state$xg$away,
                state$minute
            )

            # Record state
            results <- bind_rows(results, tibble(
                minute = state$minute,
                home_score = state$score$home,
                away_score = state$score$away,
                home_xg = state$xg$home,
                away_xg = state$xg$away,
                momentum = momentum_tracker$current_momentum,
                home_win_prob = win_prob$home_win,
                n_alerts = length(alerts)
            ))
        }
    }

    # Summary
    final_state <- match_state$get_state()

    cat(sprintf("\n=== MATCH SUMMARY ===\n"))
    cat(sprintf("Final Score: %d - %d\n", final_state$score$home, final_state$score$away))
    cat(sprintf("Final xG: %.2f - %.2f\n", final_state$xg$home, final_state$xg$away))
    cat(sprintf("Total Alerts Generated: %d\n", length(all_alerts)))

    if (length(all_alerts) > 0) {
        cat("\nKey Alerts:\n")
        high_priority <- Filter(function(a) a$priority == "high", all_alerts)
        for (alert in head(high_priority, 5)) {
            cat(sprintf("  [%d'] %s\n", alert$minute, alert$message))
        }
    }

    return(list(
        timeline = results,
        alerts = all_alerts,
        final_state = final_state
    ))
}

# Example usage (would use real events_df)
cat("Complete match analysis system ready\n")
Output
Complete match analysis system ready

Automated Tactical Alerts

Smart alert systems filter the noise and highlight only the most important tactical insights during a match.

tactical_alerts.py
# Python: Tactical Alert System
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Any
import pandas as pd

@dataclass
class AlertRule:
    """Defines a tactical alert rule."""
    id: str
    condition: Callable[[Dict, Dict], bool]
    message: Callable[[Dict, Dict], str]
    priority: str = "medium"

class TacticalAlertSystem:
    """System for generating tactical alerts during live matches."""

    def __init__(self, cooldown_minutes: int = 5):
        self.alert_rules: Dict[str, AlertRule] = {}
        self.alert_history: List[Dict] = []
        self.cooldown_minutes = cooldown_minutes
        self._define_default_rules()

    def _define_default_rules(self):
        """Define default tactical alert rules."""

        # High xG chance
        self.add_rule(AlertRule(
            id="high_xg_chance",
            condition=lambda e, s: e.get("type") == "Shot" and e.get("xg", 0) > 0.25,
            message=lambda e, s: f"BIG CHANCE: {e.get('xg', 0)*100:.0f}% xG shot by {e.get('player')}",
            priority="high"
        ))

        # Momentum swing
        self.add_rule(AlertRule(
            id="momentum_swing",
            condition=lambda e, s: abs(s.get("momentum_change", 0)) > 0.4,
            message=lambda e, s: f"MOMENTUM SWING: {'HOME' if s.get('momentum_change', 0) > 0 else 'AWAY'} taking control",
            priority="high"
        ))

        # Pressing intensity drop
        self.add_rule(AlertRule(
            id="pressing_drop",
            condition=lambda e, s: (
                s.get("metrics", {}).get("rolling", {}).get("home_pressure", 10) < 5 and
                s.get("minute", 0) > 60
            ),
            message=lambda e, s: "WARNING: Pressing intensity dropped - consider energy management",
            priority="medium"
        ))

        # xG underperformance
        self.add_rule(AlertRule(
            id="xg_underperform",
            condition=lambda e, s: (
                s.get("minute", 0) >= 60 and
                (s.get("xg", {}).get("home", 0) - s.get("score", {}).get("home", 0)) > 1.0
            ),
            message=lambda e, s: f"xG GAP: Creating chances ({s.get('xg', {}).get('home', 0):.1f} xG) but only {s.get('score', {}).get('home', 0)} goals",
            priority="medium"
        ))

        # Dangerous free kick
        self.add_rule(AlertRule(
            id="set_piece_zone",
            condition=lambda e, s: (
                e.get("type") == "Foul" and
                e.get("location_x", 0) > 85 and
                20 < e.get("location_y", 0) < 60
            ),
            message=lambda e, s: "DANGEROUS FREE KICK: Good scoring position",
            priority="high"
        ))

    def add_rule(self, rule: AlertRule):
        """Add a new alert rule."""
        self.alert_rules[rule.id] = rule

    def check_alerts(self, event: Dict, state: Dict) -> List[Dict]:
        """Check all rules and return triggered alerts."""
        alerts = []
        current_minute = state.get("minute", 0)

        for rule_id, rule in self.alert_rules.items():
            # Check cooldown
            recent_same = [
                a for a in self.alert_history
                if a["rule_id"] == rule_id and
                a["minute"] >= current_minute - self.cooldown_minutes
            ]
            if recent_same:
                continue

            # Check condition
            try:
                if rule.condition(event, state):
                    alert = {
                        "rule_id": rule_id,
                        "message": rule.message(event, state),
                        "priority": rule.priority,
                        "minute": current_minute
                    }
                    alerts.append(alert)
                    self.alert_history.append({
                        "rule_id": rule_id,
                        "minute": current_minute
                    })
            except Exception:
                # A rule that raises (e.g., on a malformed event) must not block the others
                continue

        return alerts

alert_system = TacticalAlertSystem()
print(f"Tactical alert system initialized with {len(alert_system.alert_rules)} rules")
# R: Tactical Alert System
library(R6)
library(dplyr)  # provides %>%, filter(), bind_rows(), and tibble()

TacticalAlertSystem <- R6Class("TacticalAlertSystem",
    public = list(
        alert_rules = NULL,
        alert_history = NULL,
        cooldown_minutes = 5,  # Prevent duplicate alerts

        initialize = function() {
            self$alert_rules <- list()
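            # Typed empty columns so the cooldown filter works before any alert is logged
            self$alert_history <- tibble(rule_id = character(), minute = numeric())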
            self$define_default_rules()
        },

        define_default_rules = function() {
            # High xG chance
            self$add_rule(
                id = "high_xg_chance",
                condition = function(event, state) {
                    event$type == "Shot" && event$xg > 0.25
                },
                message = function(event, state) {
                    sprintf("BIG CHANCE: %.0f%% xG shot by %s", event$xg * 100, event$player)
                },
                priority = "high"
            )

            # Momentum swing
            self$add_rule(
                id = "momentum_swing",
                condition = function(event, state) {
                    !is.null(state$momentum_change) && abs(state$momentum_change) > 0.4
                },
                message = function(event, state) {
                    direction <- ifelse(state$momentum_change > 0, "HOME", "AWAY")
                    sprintf("MOMENTUM SWING: %s taking control", direction)
                },
                priority = "high"
            )

            # Pressing intensity drop
            self$add_rule(
                id = "pressing_drop",
                condition = function(event, state) {
                    !is.null(state$metrics$rolling) &&
                    state$metrics$rolling$home_pressure < 5 &&
                    state$minute > 60
                },
                message = function(event, state) {
                    "WARNING: Pressing intensity dropped - consider energy management"
                },
                priority = "medium"
            )

            # xG underperformance
            self$add_rule(
                id = "xg_underperform",
                condition = function(event, state) {
                    state$minute >= 60 &&
                    (state$xg$home - state$score$home) > 1.0
                },
                message = function(event, state) {
                    sprintf("xG GAP: Creating chances (%.1f xG) but only %d goals",
                           state$xg$home, state$score$home)
                },
                priority = "medium"
            )

            # Set piece opportunity
            self$add_rule(
                id = "set_piece_zone",
                condition = function(event, state) {
                    event$type == "Foul" &&
                    event$location_x > 85 &&
                    event$location_y > 20 && event$location_y < 60
                },
                message = function(event, state) {
                    "DANGEROUS FREE KICK: Good scoring position"
                },
                priority = "high"
            )
        },

        add_rule = function(id, condition, message, priority = "medium") {
            self$alert_rules[[id]] <- list(
                condition = condition,
                message = message,
                priority = priority
            )
        },

        check_alerts = function(event, state) {
            alerts <- list()

            for (rule_id in names(self$alert_rules)) {
                rule <- self$alert_rules[[rule_id]]

                # Check cooldown
                recent_same <- self$alert_history %>%
                    filter(rule_id == !!rule_id,
                           minute >= state$minute - self$cooldown_minutes)

                if (nrow(recent_same) > 0) next

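                # Check condition; errors and NA/empty results count as "not triggered"
                triggered <- tryCatch(
                    isTRUE(rule$condition(event, state)),
                    error = function(e) FALSE
                )
                if (triggered) {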
                    alert <- list(
                        rule_id = rule_id,
                        message = rule$message(event, state),
                        priority = rule$priority,
                        minute = state$minute
                    )

                    alerts <- c(alerts, list(alert))

                    # Log to history
                    self$alert_history <- bind_rows(
                        self$alert_history,
                        tibble(rule_id = rule_id, minute = state$minute)
                    )
                }
            }

            return(alerts)
        }
    )
)

alert_system <- TacticalAlertSystem$new()
cat("Tactical alert system initialized with", length(alert_system$alert_rules), "rules\n")
Output
Tactical alert system initialized with 5 rules
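
The cooldown comparison in check_alerts is inclusive, which is easy to misread. The sketch below isolates that single check in a hypothetical helper, `in_cooldown` (not part of the class above), assuming the same history format with `rule_id` and `minute` keys and the default 5-minute cooldown.

```python
from typing import Dict, List


def in_cooldown(history: List[Dict], rule_id: str, minute: int,
                cooldown_minutes: int = 5) -> bool:
    """Hypothetical helper mirroring the cooldown filter in check_alerts."""
    return any(
        a["rule_id"] == rule_id and a["minute"] >= minute - cooldown_minutes
        for a in history
    )


# One alert already fired at minute 10
history = [{"rule_id": "high_xg_chance", "minute": 10}]

for minute in (12, 15, 16):
    status = "suppressed" if in_cooldown(history, "high_xg_chance", minute) else "can fire"
    print(f"minute {minute}: {status}")
```

With an alert logged at minute 10, the same rule stays suppressed through minute 15 and can fire again from minute 16. Other rules are unaffected because the check is keyed on `rule_id`.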

Practice Exercises

Task: Create a custom alert rule that triggers when a team has 3+ shots without scoring while trailing.

Requirements:

  • Track shots for each team in a rolling window
  • Check if trailing and generating chances
  • Suggest tactical adjustment in the alert message
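
One possible starting point for this exercise is sketched below. It is not a reference solution: the class name `TrailingShotTracker`, the `team` and `is_goal` event fields, the 15-minute window, and the wording of the suggestion are all assumptions made for illustration.

```python
from collections import deque
from typing import Deque, Dict, Optional


class TrailingShotTracker:
    """Hypothetical helper: counts recent non-scoring shots per team."""

    def __init__(self, window_minutes: int = 15, min_shots: int = 3):
        self.window_minutes = window_minutes
        self.min_shots = min_shots
        self.shots: Dict[str, Deque[int]] = {"home": deque(), "away": deque()}

    def update(self, event: Dict, state: Dict) -> Optional[str]:
        """Record the event, then return an alert message or None."""
        minute = state.get("minute", 0)
        team = event.get("team")

        if event.get("type") == "Shot" and team in self.shots:
            if event.get("is_goal"):  # assumed field: scoring resets the count
                self.shots[team].clear()
            else:
                self.shots[team].append(minute)

        # Drop shots that have fallen out of the rolling window
        for recent in self.shots.values():
            while recent and recent[0] < minute - self.window_minutes:
                recent.popleft()

        score = state.get("score", {})
        for side, other in (("home", "away"), ("away", "home")):
            trailing = score.get(side, 0) < score.get(other, 0)
            if trailing and len(self.shots[side]) >= self.min_shots:
                return (
                    f"{side.upper()} trailing with {len(self.shots[side])} shots "
                    f"and no goal in the last {self.window_minutes} min: "
                    "consider an extra forward or earlier crosses"
                )
        return None


tracker = TrailingShotTracker()
state = {"minute": 0, "score": {"home": 0, "away": 1}}
message = None
for minute in (50, 55, 58):
    state["minute"] = minute
    message = tracker.update({"type": "Shot", "team": "home", "is_goal": False}, state)
print(message)
```

The tracker returns a message on every event while the condition holds, so in practice it would be wrapped in an alert rule and left to the alert system's cooldown to avoid repeats.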

Task: Create an animated xG timeline that updates in real-time during a match.

Requirements:

  • Show cumulative xG for both teams over time
  • Mark goal events with icons
  • Add shaded regions for momentum periods
  • Update smoothly as new events arrive
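
Before any plotting, the timeline needs a cumulative xG series per team and the points where goals occurred. The sketch below covers only that data preparation, assuming shot records with `minute`, `team`, `xg`, and `is_goal` keys (hypothetical field names); the function names are illustrative as well.

```python
from itertools import accumulate
from typing import Dict, List, Tuple


def cumulative_xg(shots: List[Dict], team: str) -> Tuple[List[int], List[float]]:
    """Return (minutes, running xG total) for one team, starting at (0, 0.0)."""
    team_shots = sorted(
        (s for s in shots if s["team"] == team), key=lambda s: s["minute"]
    )
    minutes = [0] + [s["minute"] for s in team_shots]
    totals = [0.0] + list(accumulate(s["xg"] for s in team_shots))
    return minutes, totals


def goal_markers(shots: List[Dict], team: str) -> List[Tuple[int, float]]:
    """Return (minute, cumulative xG) points where the team scored."""
    markers, total = [], 0.0
    for s in sorted(shots, key=lambda s: s["minute"]):
        if s["team"] != team:
            continue
        total += s["xg"]
        if s.get("is_goal"):
            markers.append((s["minute"], total))
    return markers


shots = [
    {"minute": 12, "team": "home", "xg": 0.25, "is_goal": False},
    {"minute": 30, "team": "away", "xg": 0.125, "is_goal": False},
    {"minute": 41, "team": "home", "xg": 0.5, "is_goal": True},
    {"minute": 77, "team": "home", "xg": 0.125, "is_goal": False},
]

minutes, totals = cumulative_xg(shots, "home")
print(minutes)                      # [0, 12, 41, 77]
print(totals)                       # [0.0, 0.25, 0.75, 0.875]
print(goal_markers(shots, "home"))  # [(41, 0.75)]
```

A step plot suits this data because cumulative xG only changes when a shot occurs. With matplotlib, `Axes.step(minutes, totals, where="post")` draws that shape, and redrawing on each new event (for example with `matplotlib.animation.FuncAnimation`) is one way to animate it.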

Task: Build a system that suggests optimal substitution timing and candidates.

Requirements:

  • Track player fatigue metrics (distance, sprints)
  • Monitor performance degradation over time
  • Consider game state (winning/losing/drawing)
  • Suggest substitution with reasoning
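
A rough sketch of how the pieces of this exercise might fit together follows. Everything in it is an assumption for illustration: the `PlayerLoad` fields, the use of sprint-rate drop-off as the fatigue signal, the 0.4 threshold, and the minute cut-offs for each game state. Real fatigue models would need tracking data and validation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PlayerLoad:
    """Hypothetical per-player physical summary."""
    name: str
    distance_km: float
    sprint_rate_baseline: float  # sprints per minute early in the match
    sprint_rate_recent: float    # sprints per minute in the most recent window


def fatigue_score(p: PlayerLoad) -> float:
    """Relative drop in sprint rate, clipped to [0, 1]; higher means more fatigued."""
    if p.sprint_rate_baseline <= 0:
        return 0.0
    drop = (p.sprint_rate_baseline - p.sprint_rate_recent) / p.sprint_rate_baseline
    return max(0.0, min(1.0, drop))


def suggest_substitution(players: List[PlayerLoad], minute: int, goal_diff: int,
                         threshold: float = 0.4) -> Optional[str]:
    """Return a suggestion with reasoning, or None if no change is indicated."""
    # Assumption: a trailing team is willing to change things earlier
    earliest_minute = 55 if goal_diff < 0 else 65
    if minute < earliest_minute or not players:
        return None

    candidate = max(players, key=fatigue_score)
    score = fatigue_score(candidate)
    if score < threshold:
        return None

    game_state = "trailing" if goal_diff < 0 else "level" if goal_diff == 0 else "leading"
    return (
        f"Consider replacing {candidate.name}: sprint rate down {score:.0%} "
        f"after {candidate.distance_km:.1f} km, team {game_state} at {minute}'"
    )


players = [
    PlayerLoad("Player A", 8.2, 0.8, 0.4),
    PlayerLoad("Player B", 7.5, 0.6, 0.55),
]
print(suggest_substitution(players, minute=60, goal_diff=-1))
print(suggest_substitution(players, minute=60, goal_diff=1))
```

The first call suggests Player A because the team is trailing and the sprint rate has halved. The second returns None because, under the assumed cut-offs, a leading team waits until minute 65.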

Chapter Summary

Key Takeaways
  • Real-time analytics requires a different architecture: Streaming pipelines, state management, and low-latency updates
  • Win probability: Combine current score, xG, and remaining time to estimate match outcomes
  • Momentum detection: Weight recent events to identify which team is dominating
  • Live dashboards: Synthesize metrics into actionable displays for coaching staff
  • Smart alerts: Filter noise and surface only the most important tactical insights
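
As a compact illustration of the win probability takeaway, the sketch below combines score, xG, and remaining time with independent Poisson goal counts. It is a simplified stand-in, not the model built earlier in the chapter: the function names, the assumed prior of about 1.35 goals per team per 90 minutes, the linear blending weight, and the fixed 90-minute match length are all assumptions.

```python
import math
from typing import Dict


def poisson_pmf(k: int, lam: float) -> float:
    """Probability of exactly k goals when lam goals are expected."""
    return math.exp(-lam) * lam ** k / math.factorial(k)


def win_probability(score_home: int, score_away: int,
                    xg_home: float, xg_away: float, minute: int,
                    prior_rate: float = 1.35 / 90,  # assumed goals per minute
                    max_goals: int = 10) -> Dict[str, float]:
    """Estimate outcome probabilities from score, xG so far, and time left."""
    played = min(max(minute, 0), 90)
    remaining = 90 - played
    weight = played / 90  # trust observed xG more as the match progresses

    def expected_goals_left(xg: float) -> float:
        observed = xg / played if played > 0 else prior_rate
        return (weight * observed + (1 - weight) * prior_rate) * remaining

    lam_home = expected_goals_left(xg_home)
    lam_away = expected_goals_left(xg_away)

    probs = {"home_win": 0.0, "draw": 0.0, "away_win": 0.0}
    for h in range(max_goals + 1):
        for a in range(max_goals + 1):
            p = poisson_pmf(h, lam_home) * poisson_pmf(a, lam_away)
            diff = (score_home + h) - (score_away + a)
            key = "home_win" if diff > 0 else "away_win" if diff < 0 else "draw"
            probs[key] += p

    # Renormalise to account for the truncated goal grid
    total = sum(probs.values())
    return {k: v / total for k, v in probs.items()}


even = win_probability(0, 0, 0.5, 0.5, minute=45)
lead = win_probability(1, 0, 0.5, 0.5, minute=45)
late = win_probability(2, 1, 1.8, 0.9, minute=90)
print(even)
print(lead)
print(late)  # {'home_win': 1.0, 'draw': 0.0, 'away_win': 0.0}
```

With identical inputs for both sides the home and away probabilities match, a one-goal lead shifts probability toward the leading team, and at minute 90 the current score decides the outcome.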
Real-Time System Components
  1. Data Ingestion: Receive and validate streaming events
  2. State Management: Maintain current match state
  3. Metric Calculation: Rolling windows and live aggregations
  4. Alert Engine: Rule-based trigger system with cooldowns
  5. Visualization: Real-time dashboards and charts
  6. Delivery: Push updates to coaching staff