Capstone - Complete Analytics System
- Understand the requirements and challenges of real-time analytics
- Design streaming data pipelines for live match data
- Calculate rolling metrics and live xG during matches
- Build real-time win probability models
- Create live dashboards for in-game decision support
- Implement momentum detection and game state analysis
- Generate automated alerts and tactical triggers
- Handle data latency and update strategies
The Challenge of Real-Time Analytics
Real-time analytics transforms how teams operate during matches. Instead of post-match analysis, coaches receive live insights that can influence in-game decisions—substitutions, tactical adjustments, and set piece strategies. This chapter covers the technical and analytical challenges of building real-time systems.
Real-Time Constraints
Real-time analytics operates under strict constraints: sub-second latency requirements, incomplete data, and the need for robust handling of missing or delayed events. Systems must be designed for reliability under pressure.
Low Latency
Updates in <1 secondRolling Windows
Last 5-10 minute trendsSmart Alerts
Trigger-based notificationsReliability
Robust error handling# Python: Real-Time Analytics Framework
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import pandas as pd
from datetime import datetime
@dataclass
class LiveMatchState:
"""Manages live match state and real-time analytics."""
match_id: str
home_team: str
away_team: str
events: List[Dict] = field(default_factory=list)
current_minute: int = 0
score: Dict[str, int] = field(default_factory=lambda: {"home": 0, "away": 0})
xg: Dict[str, float] = field(default_factory=lambda: {"home": 0.0, "away": 0.0})
metrics: Dict = field(default_factory=dict)
alerts: List[str] = field(default_factory=list)
def add_event(self, event: Dict) -> List[str]:
"""Add new event and update all metrics."""
self.events.append(event)
self.current_minute = max(event.get("minute", 0), self.current_minute)
# Update metrics
self._update_metrics(event)
# Check for alerts
new_alerts = self._check_alerts(event)
self.alerts.extend(new_alerts)
return new_alerts
def _update_metrics(self, event: Dict):
"""Update score and xG based on new event."""
# Update score
if event.get("type") == "Shot" and event.get("outcome") == "Goal":
team_key = "home" if event["team"] == self.home_team else "away"
self.score[team_key] += 1
# Update xG
if event.get("type") == "Shot":
xg_value = event.get("xg", 0)
team_key = "home" if event["team"] == self.home_team else "away"
self.xg[team_key] += xg_value
# Calculate rolling metrics
self._calculate_rolling_metrics()
def _calculate_rolling_metrics(self, window_minutes: int = 10):
"""Calculate rolling window metrics."""
cutoff = self.current_minute - window_minutes
recent = [e for e in self.events if e.get("minute", 0) >= cutoff]
if not recent:
return
home_events = [e for e in recent if e.get("team") == self.home_team]
away_events = [e for e in recent if e.get("team") == self.away_team]
self.metrics["rolling"] = {
"home_possession": len(home_events) / len(recent) * 100 if recent else 50,
"home_shots_10min": sum(1 for e in home_events if e.get("type") == "Shot"),
"away_shots_10min": sum(1 for e in away_events if e.get("type") == "Shot"),
"home_pressure": sum(1 for e in home_events if e.get("type") == "Pressure"),
"away_pressure": sum(1 for e in away_events if e.get("type") == "Pressure"),
}
def _check_alerts(self, event: Dict) -> List[str]:
"""Check for tactical alerts based on new event."""
alerts = []
# High xG shot alert
if event.get("type") == "Shot" and event.get("xg", 0) > 0.3:
alerts.append(f"HIGH xG CHANCE ({event['xg']:.2f}) - {event.get('player')}")
# Momentum shift alert
rolling = self.metrics.get("rolling", {})
if rolling.get("away_shots_10min", 0) >= 4 and rolling.get("home_shots_10min", 0) <= 1:
alerts.append("MOMENTUM SHIFT: Opponent dominating last 10 min")
return alerts
def get_state(self) -> Dict:
"""Get current match state summary."""
return {
"minute": self.current_minute,
"score": self.score,
"xg": self.xg,
"metrics": self.metrics,
"n_events": len(self.events),
"recent_alerts": self.alerts[-5:] if self.alerts else []
}
# Usage
match_state = LiveMatchState(
match_id="match_001",
home_team="Barcelona",
away_team="Real Madrid"
)
print(match_state.get_state())# R: Real-Time Analytics Framework
library(R6)
library(tidyverse)
# Define a Live Match State Manager
LiveMatchState <- R6Class("LiveMatchState",
public = list(
match_id = NULL,
events = NULL,
current_minute = 0,
home_team = NULL,
away_team = NULL,
score = list(home = 0, away = 0),
xg = list(home = 0, away = 0),
metrics = list(),
initialize = function(match_id, home_team, away_team) {
self$match_id <- match_id
self$home_team <- home_team
self$away_team <- away_team
self$events <- tibble()
self$metrics <- list()
},
add_event = function(event) {
# Validate and add event
self$events <- bind_rows(self$events, event)
self$current_minute <- max(event$minute, self$current_minute)
# Update metrics
self$update_metrics(event)
# Check for alerts
self$check_alerts(event)
},
update_metrics = function(event) {
# Update score
if (event$type == "Shot" && event$outcome == "Goal") {
if (event$team == self$home_team) {
self$score$home <- self$score$home + 1
} else {
self$score$away <- self$score$away + 1
}
}
# Update xG
if (event$type == "Shot") {
if (event$team == self$home_team) {
self$xg$home <- self$xg$home + event$xg
} else {
self$xg$away <- self$xg$away + event$xg
}
}
# Calculate rolling metrics
self$calculate_rolling_metrics()
},
calculate_rolling_metrics = function(window_minutes = 10) {
recent <- self$events %>%
filter(minute >= self$current_minute - window_minutes)
self$metrics$rolling <- list(
home_possession = mean(recent$team == self$home_team, na.rm = TRUE) * 100,
home_shots_10min = sum(recent$type == "Shot" & recent$team == self$home_team),
away_shots_10min = sum(recent$type == "Shot" & recent$team == self$away_team),
home_pressure = sum(recent$type == "Pressure" & recent$team == self$home_team),
away_pressure = sum(recent$type == "Pressure" & recent$team == self$away_team)
)
},
check_alerts = function(event) {
# Example alerts
alerts <- list()
# High xG shot alert
if (event$type == "Shot" && event$xg > 0.3) {
alerts <- c(alerts, sprintf("HIGH xG CHANCE (%.2f) - %s",
event$xg, event$player))
}
# Momentum shift alert
if (!is.null(self$metrics$rolling)) {
if (self$metrics$rolling$away_shots_10min >= 4 &&
self$metrics$rolling$home_shots_10min <= 1) {
alerts <- c(alerts, "MOMENTUM SHIFT: Opponent dominating last 10 min")
}
}
if (length(alerts) > 0) {
cat(paste(alerts, collapse = "\n"), "\n")
}
},
get_state = function() {
list(
minute = self$current_minute,
score = self$score,
xg = self$xg,
metrics = self$metrics,
n_events = nrow(self$events)
)
}
)
)
# Usage
match_state <- LiveMatchState$new("match_001", "Barcelona", "Real Madrid")
print(match_state$get_state()){'minute': 0, 'score': {'home': 0, 'away': 0},
'xg': {'home': 0.0, 'away': 0.0}, 'metrics': {},
'n_events': 0, 'recent_alerts': []}Streaming Data Pipelines
Real-time analytics requires robust data pipelines that can handle continuous streams of events. We'll cover both polling and push-based approaches.
# Python: Simulated Streaming Data Handler
import asyncio
from typing import Callable, Optional, Generator
import pandas as pd
class EventStream:
"""Simulated event stream for live match data."""
def __init__(self, events_df: pd.DataFrame, speed_factor: float = 1.0):
self.events = events_df.sort_values(["minute", "second"]).reset_index(drop=True)
self.events["game_time"] = self.events["minute"] * 60 + self.events["second"]
self.current_index = 0
self.speed_factor = speed_factor
def get_next_event(self) -> Optional[dict]:
"""Get the next event in the stream."""
if self.current_index >= len(self.events):
return None
event = self.events.iloc[self.current_index].to_dict()
self.current_index += 1
return event
def has_more(self) -> bool:
"""Check if there are more events."""
return self.current_index < len(self.events)
def __iter__(self) -> Generator:
"""Iterate through events."""
while self.has_more():
yield self.get_next_event()
async def process_live_match(match_state: LiveMatchState,
event_stream: EventStream,
callback: Optional[Callable] = None):
"""Process live match events asynchronously."""
for event in event_stream:
if event:
alerts = match_state.add_event(event)
if callback:
await callback(match_state.get_state(), alerts)
# Simulate real-time delay
await asyncio.sleep(0.1 / event_stream.speed_factor)
return match_state
# Example callback
async def live_update_callback(state: dict, alerts: list):
"""Print live updates."""
print(f"\rMinute {state['minute']} | "
f"Score: {state['score']['home']}-{state['score']['away']} | "
f"xG: {state['xg']['home']:.2f}-{state['xg']['away']:.2f}",
end="")
for alert in alerts:
print(f"\n*** ALERT: {alert} ***")
# Run the live match processor
# asyncio.run(process_live_match(match_state, event_stream, live_update_callback))# R: Simulated Streaming Data Handler
library(tidyverse)
library(later)
# Simulated event stream (in production, this would be a websocket or API)
create_event_stream <- function(events_df, speed_factor = 1) {
events_queue <- events_df %>%
arrange(minute, second) %>%
mutate(
game_time = minute * 60 + second,
event_index = row_number()
)
list(
events = events_queue,
current_index = 1,
speed_factor = speed_factor,
get_next_event = function(self) {
if (self$current_index > nrow(self$events)) {
return(NULL) # Match ended
}
event <- self$events[self$current_index, ]
self$current_index <- self$current_index + 1
return(event)
},
has_more = function(self) {
self$current_index <= nrow(self$events)
}
)
}
# Process streaming events
process_live_match <- function(match_state, event_stream, callback = NULL) {
while (event_stream$has_more(event_stream)) {
event <- event_stream$get_next_event(event_stream)
if (!is.null(event)) {
match_state$add_event(event)
# Call callback with updated state
if (!is.null(callback)) {
callback(match_state$get_state())
}
# Simulate real-time delay (for demo)
Sys.sleep(0.1 / event_stream$speed_factor)
}
}
return(match_state)
}
# Example callback for live updates
live_update_callback <- function(state) {
cat(sprintf(
"\rMinute %d | Score: %d-%d | xG: %.2f-%.2f",
state$minute,
state$score$home, state$score$away,
state$xg$home, state$xg$away
))
}Live Win Probability
Win probability models estimate the likelihood of each outcome based on current game state. These update continuously as the match progresses.
# Python: Live Win Probability Model
import numpy as np
from scipy import stats
from typing import Dict
def calculate_win_probability(home_goals: int, away_goals: int,
home_xg: float, away_xg: float,
minute: int, n_sims: int = 10000) -> Dict[str, float]:
"""Calculate win probability using Poisson simulation."""
# Remaining time factor
remaining_pct = (90 - minute) / 90
# Calculate xG rate and regress to mean
safe_minute = max(minute, 1)
xg_rate_home = home_xg / safe_minute * 90
xg_rate_away = away_xg / safe_minute * 90
# Regress to league average
league_avg_xg = 1.3
regression_factor = 0.3
projected_home_xg = xg_rate_home * (1 - regression_factor) + league_avg_xg * regression_factor
projected_away_xg = xg_rate_away * (1 - regression_factor) + league_avg_xg * regression_factor
# Expected goals remaining
remaining_home_xg = projected_home_xg * remaining_pct
remaining_away_xg = projected_away_xg * remaining_pct
# Simulate outcomes
np.random.seed(42) # For reproducibility in examples
home_remaining = np.random.poisson(remaining_home_xg, n_sims)
away_remaining = np.random.poisson(remaining_away_xg, n_sims)
home_final = home_goals + home_remaining
away_final = away_goals + away_remaining
# Calculate probabilities
home_win_prob = (home_final > away_final).mean() * 100
draw_prob = (home_final == away_final).mean() * 100
away_win_prob = (home_final < away_final).mean() * 100
return {
"home_win": round(home_win_prob, 1),
"draw": round(draw_prob, 1),
"away_win": round(away_win_prob, 1)
}
def track_win_probability(match_state: LiveMatchState) -> Dict[str, float]:
"""Get current win probability from match state."""
state = match_state.get_state()
return calculate_win_probability(
home_goals=state["score"]["home"],
away_goals=state["score"]["away"],
home_xg=state["xg"]["home"],
away_xg=state["xg"]["away"],
minute=state["minute"]
)
# Example
probs = calculate_win_probability(
home_goals=1, away_goals=0,
home_xg=1.2, away_xg=0.8,
minute=60
)
print(f"Win Probability at minute 60 (1-0):")
print(f"Home Win: {probs['home_win']}%")
print(f"Draw: {probs['draw']}%")
print(f"Away Win: {probs['away_win']}%")# R: Live Win Probability Model
library(tidyverse)
calculate_win_probability <- function(home_goals, away_goals, home_xg, away_xg,
minute, is_home = TRUE) {
# Remaining time factor
remaining_pct <- (90 - minute) / 90
# Expected goals remaining (based on current xG rate, regression to mean)
xg_rate_home <- home_xg / max(minute, 1) * 90
xg_rate_away <- away_xg / max(minute, 1) * 90
# Regress to league average rates
league_avg_xg <- 1.3 # Average xG per team per game
regression_factor <- 0.3 # How much to regress
projected_home_xg <- xg_rate_home * (1 - regression_factor) + league_avg_xg * regression_factor
projected_away_xg <- xg_rate_away * (1 - regression_factor) + league_avg_xg * regression_factor
# Expected goals remaining
remaining_home_xg <- projected_home_xg * remaining_pct
remaining_away_xg <- projected_away_xg * remaining_pct
# Simulate outcomes using Poisson
n_sims <- 10000
home_final <- home_goals + rpois(n_sims, remaining_home_xg)
away_final <- away_goals + rpois(n_sims, remaining_away_xg)
# Calculate probabilities
home_win_prob <- mean(home_final > away_final)
draw_prob <- mean(home_final == away_final)
away_win_prob <- mean(home_final < away_final)
return(list(
home_win = round(home_win_prob * 100, 1),
draw = round(draw_prob * 100, 1),
away_win = round(away_win_prob * 100, 1)
))
}
# Calculate live win probability throughout match
track_win_probability <- function(match_state) {
state <- match_state$get_state()
probs <- calculate_win_probability(
home_goals = state$score$home,
away_goals = state$score$away,
home_xg = state$xg$home,
away_xg = state$xg$away,
minute = state$minute
)
return(probs)
}
# Example: Track probability over a match
probs <- calculate_win_probability(
home_goals = 1, away_goals = 0,
home_xg = 1.2, away_xg = 0.8,
minute = 60
)
cat(sprintf("Win Probability at minute 60 (1-0):\n"))
cat(sprintf("Home Win: %.1f%%\n", probs$home_win))
cat(sprintf("Draw: %.1f%%\n", probs$draw))
cat(sprintf("Away Win: %.1f%%\n", probs$away_win))Win Probability at minute 60 (1-0):
Home Win: 71.2%
Draw: 19.8%
Away Win: 9.0%Momentum Detection
Momentum captures which team is currently dominating. Detecting momentum shifts early allows proactive tactical adjustments.
# Python: Momentum Detection System
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np
@dataclass
class MomentumTracker:
"""Tracks and analyzes match momentum."""
events: List[Dict] = field(default_factory=list)
momentum_history: List[Dict] = field(default_factory=list)
current_momentum: float = 0.0 # -1 to 1 scale
# Event weights for momentum calculation
EVENT_WEIGHTS = {
"Shot": 3.0,
"Key Pass": 2.0,
"Progressive Pass": 1.0,
"Pressure": 0.5,
"Ball Recovery": 1.0,
"Dribble": 1.0,
}
def add_events(self, new_events: List[Dict], home_team: str) -> float:
"""Add events and recalculate momentum."""
self.events.extend(new_events)
return self.calculate_momentum(home_team)
def calculate_momentum(self, home_team: str, window_minutes: int = 5) -> float:
"""Calculate current momentum based on recent events."""
if not self.events:
return 0.0
current_minute = max(e.get("minute", 0) for e in self.events)
cutoff = current_minute - window_minutes
recent = [e for e in self.events if e.get("minute", 0) >= cutoff]
if not recent:
return 0.0
home_score = 0.0
away_score = 0.0
for event in recent:
weight = self.EVENT_WEIGHTS.get(event.get("type"), 0.5)
# Bonus for final third actions
if event.get("location_x", 0) > 80:
weight *= 1.5
if event.get("team") == home_team:
home_score += weight
else:
away_score += weight
# Normalize to -1 to 1
total = home_score + away_score
if total > 0:
self.current_momentum = (home_score - away_score) / total
else:
self.current_momentum = 0.0
# Track history
self.momentum_history.append({
"minute": current_minute,
"momentum": self.current_momentum
})
return self.current_momentum
def detect_momentum_shift(self, threshold: float = 0.3) -> Optional[str]:
"""Detect significant momentum shifts."""
if len(self.momentum_history) < 2:
return None
recent = self.momentum_history[-2:]
change = recent[1]["momentum"] - recent[0]["momentum"]
if abs(change) > threshold:
direction = "HOME" if change > 0 else "AWAY"
return f"MOMENTUM SHIFT: {direction} gaining control ({change:+.2f})"
return None
def get_momentum_label(self) -> str:
"""Get human-readable momentum label."""
m = self.current_momentum
if m > 0.3:
return "HOME DOMINANT"
elif m < -0.3:
return "AWAY DOMINANT"
elif m > 0.1:
return "HOME SLIGHT EDGE"
elif m < -0.1:
return "AWAY SLIGHT EDGE"
else:
return "BALANCED"
# Usage
momentum_tracker = MomentumTracker()
print("Momentum tracking initialized")# R: Momentum Detection System
library(tidyverse)
MomentumTracker <- R6Class("MomentumTracker",
public = list(
events = NULL,
momentum_history = NULL,
current_momentum = 0, # -1 to 1 scale
initialize = function() {
self$events <- tibble()
self$momentum_history <- tibble()
},
add_events = function(new_events, home_team) {
self$events <- bind_rows(self$events, new_events)
self$calculate_momentum(home_team)
},
calculate_momentum = function(home_team, window = 5) {
if (nrow(self$events) == 0) return(0)
current_minute <- max(self$events$minute)
recent <- self$events %>%
filter(minute >= current_minute - window)
if (nrow(recent) == 0) return(0)
# Weight different event types
event_weights <- list(
Shot = 3,
`Key Pass` = 2,
`Progressive Pass` = 1,
Pressure = 0.5,
`Ball Recovery` = 1,
Dribble = 1
)
# Calculate momentum score
home_score <- 0
away_score <- 0
for (i in 1:nrow(recent)) {
event <- recent[i, ]
weight <- event_weights[[event$type]] %||% 0.5
# Bonus for final third actions
if (!is.na(event$location_x) && event$location_x > 80) {
weight <- weight * 1.5
}
if (event$team == home_team) {
home_score <- home_score + weight
} else {
away_score <- away_score + weight
}
}
# Normalize to -1 to 1 scale
total <- home_score + away_score
if (total > 0) {
self$current_momentum <- (home_score - away_score) / total
} else {
self$current_momentum <- 0
}
# Track history
self$momentum_history <- bind_rows(
self$momentum_history,
tibble(
minute = current_minute,
momentum = self$current_momentum
)
)
return(self$current_momentum)
},
detect_momentum_shift = function(threshold = 0.3) {
if (nrow(self$momentum_history) < 2) return(NULL)
recent <- tail(self$momentum_history, 2)
change <- recent$momentum[2] - recent$momentum[1]
if (abs(change) > threshold) {
direction <- ifelse(change > 0, "HOME", "AWAY")
return(sprintf("MOMENTUM SHIFT: %s gaining control (%.2f change)",
direction, change))
}
return(NULL)
},
get_momentum_label = function() {
m <- self$current_momentum
if (m > 0.3) return("HOME DOMINANT")
if (m < -0.3) return("AWAY DOMINANT")
if (m > 0.1) return("HOME SLIGHT EDGE")
if (m < -0.1) return("AWAY SLIGHT EDGE")
return("BALANCED")
}
)
)
# Usage
momentum <- MomentumTracker$new()
cat("Momentum tracking initialized\n")Building Live Dashboards
Live dashboards synthesize real-time metrics into actionable displays for coaching staff and analysts.
# Python: Live Match Dashboard with Streamlit
import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import time
def create_live_dashboard():
"""Create a Streamlit live match dashboard."""
st.set_page_config(page_title="Live Match Analytics", layout="wide")
st.title("Live Match Analytics Dashboard")
# Initialize session state
if "match_state" not in st.session_state:
st.session_state.match_state = None
# Create placeholder for auto-refresh
placeholder = st.empty()
# Simulate live updates (in production, fetch from API)
while True:
with placeholder.container():
state = st.session_state.match_state
# Row 1: Key Metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
if state:
st.metric(
"Score",
f"{state['score']['home']} - {state['score']['away']}",
f"Minute {state['minute']}"
)
else:
st.metric("Score", "--", "Waiting...")
with col2:
if state:
st.metric(
"xG",
f"{state['xg']['home']:.2f} - {state['xg']['away']:.2f}"
)
else:
st.metric("xG", "--")
with col3:
if state:
probs = calculate_win_probability(
state["score"]["home"], state["score"]["away"],
state["xg"]["home"], state["xg"]["away"],
state["minute"]
)
st.metric("Home Win Prob", f"{probs['home_win']}%")
else:
st.metric("Home Win Prob", "--")
with col4:
momentum_label = state.get("momentum_label", "BALANCED") if state else "--"
st.metric("Momentum", momentum_label)
# Row 2: Charts
col_left, col_right = st.columns(2)
with col_left:
st.subheader("xG Flow")
if state and "xg_timeline" in state:
fig = create_xg_flow_chart(state["xg_timeline"])
st.plotly_chart(fig, use_container_width=True)
else:
st.info("Waiting for match data...")
with col_right:
st.subheader("Win Probability")
if state and "win_prob_history" in state:
fig = create_win_prob_chart(state["win_prob_history"])
st.plotly_chart(fig, use_container_width=True)
else:
st.info("Waiting for match data...")
# Row 3: Alerts and Stats
col_a, col_b, col_c = st.columns(3)
with col_a:
st.subheader("Live Alerts")
if state and state.get("recent_alerts"):
for alert in state["recent_alerts"]:
st.warning(alert)
else:
st.info("No recent alerts")
with col_b:
st.subheader("Rolling Stats (10 min)")
if state and state.get("metrics", {}).get("rolling"):
rolling = state["metrics"]["rolling"]
st.write(f"Home Possession: {rolling['home_possession']:.1f}%")
st.write(f"Home Shots: {rolling['home_shots_10min']}")
st.write(f"Away Shots: {rolling['away_shots_10min']}")
with col_c:
st.subheader("Recent Events")
if state and "recent_events" in state:
for event in state["recent_events"][-5:]:
st.text(f"{event['minute']}' - {event['type']} ({event['team']})")
# Refresh interval
time.sleep(1)
def create_xg_flow_chart(timeline_data):
"""Create xG flow chart."""
fig = go.Figure()
fig.add_trace(go.Scatter(
x=timeline_data["minute"],
y=timeline_data["home_xg"],
name="Home xG",
line=dict(color="#2E7D32", width=2)
))
fig.add_trace(go.Scatter(
x=timeline_data["minute"],
y=timeline_data["away_xg"],
name="Away xG",
line=dict(color="#D32F2F", width=2)
))
fig.update_layout(
xaxis_title="Minute",
yaxis_title="Cumulative xG",
height=300
)
return fig
# Run: streamlit run live_dashboard.py# R: Live Match Dashboard with Shiny
library(shiny)
library(shinydashboard)
library(plotly)
# Dashboard UI
ui <- dashboardPage(
dashboardHeader(title = "Live Match Analytics"),
dashboardSidebar(disable = TRUE),
dashboardBody(
# Row 1: Key Metrics
fluidRow(
valueBoxOutput("score_box", width = 3),
valueBoxOutput("xg_box", width = 3),
valueBoxOutput("momentum_box", width = 3),
valueBoxOutput("win_prob_box", width = 3)
),
# Row 2: Charts
fluidRow(
box(title = "xG Flow", status = "primary", width = 6,
plotlyOutput("xg_flow_chart")),
box(title = "Win Probability", status = "success", width = 6,
plotlyOutput("win_prob_chart"))
),
# Row 3: Alerts and Details
fluidRow(
box(title = "Live Alerts", status = "warning", width = 4,
uiOutput("alerts_list")),
box(title = "Rolling Stats (10 min)", status = "info", width = 4,
tableOutput("rolling_stats")),
box(title = "Recent Events", status = "primary", width = 4,
tableOutput("recent_events"))
)
)
)
# Dashboard Server
server <- function(input, output, session) {
# Reactive values to store match state
match_state <- reactiveVal(NULL)
# Auto-refresh every second
autoInvalidate <- reactiveTimer(1000)
# Update match state
observe({
autoInvalidate()
# In production: fetch latest state from API
# state <- fetch_match_state(match_id)
# match_state(state)
})
# Score box
output$score_box <- renderValueBox({
state <- match_state()
if (is.null(state)) {
valueBox("--", "Score", icon = icon("futbol"))
} else {
valueBox(
sprintf("%d - %d", state$score$home, state$score$away),
sprintf("Minute %d", state$minute),
icon = icon("futbol"),
color = "green"
)
}
})
# xG box
output$xg_box <- renderValueBox({
state <- match_state()
if (is.null(state)) {
valueBox("--", "xG", icon = icon("chart-line"))
} else {
valueBox(
sprintf("%.2f - %.2f", state$xg$home, state$xg$away),
"Expected Goals",
icon = icon("chart-line"),
color = "blue"
)
}
})
# Momentum box
output$momentum_box <- renderValueBox({
state <- match_state()
if (is.null(state)) {
valueBox("--", "Momentum")
} else {
momentum_label <- state$momentum_label %||% "BALANCED"
color <- if (grepl("HOME", momentum_label)) "green" else
if (grepl("AWAY", momentum_label)) "red" else "yellow"
valueBox(momentum_label, "Current Momentum", color = color)
}
})
}Live xG Calculation
Calculating xG in real-time requires efficient model inference with minimal latency. Pre-computed models and optimized feature extraction are essential.
# Python: Optimized Live xG Calculator
import numpy as np
import pickle
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class ShotEvent:
"""Represents a shot event for xG calculation."""
location_x: float
location_y: float
body_part: str = "foot"
play_pattern: str = "open play"
under_pressure: bool = False
first_touch: bool = False
class LiveXGCalculator:
"""Optimized xG calculator for live match use."""
def __init__(self, model_path: Optional[str] = None):
self.model = None
self.feature_means = None
self.feature_sds = None
if model_path:
self._load_model(model_path)
def _load_model(self, model_path: str):
"""Load pre-trained model and normalization params."""
import xgboost as xgb
self.model = xgb.Booster()
self.model.load_model(model_path)
with open(f"{model_path}_params.pkl", "rb") as f:
params = pickle.load(f)
self.feature_means = params["means"]
self.feature_sds = params["sds"]
def extract_features(self, shot: ShotEvent, match_context: Dict) -> np.ndarray:
"""Extract features from shot event."""
x, y = shot.location_x, shot.location_y
# Distance and angle to goal
goal_x, goal_y = 120, 40
distance = np.sqrt((goal_x - x)**2 + (goal_y - y)**2)
angle = np.degrees(np.arctan2(abs(y - goal_y), goal_x - x))
features = np.array([
distance,
angle,
float(shot.body_part == "foot"),
float(shot.body_part == "head"),
float(shot.play_pattern == "open play"),
float(shot.play_pattern == "counter"),
float(shot.play_pattern in ["corner", "free kick"]),
float(shot.under_pressure),
float(shot.first_touch),
float(match_context.get("home_trailing", False)),
match_context.get("minute", 45),
match_context.get("minute", 45) ** 2
])
return features
def normalize_features(self, features: np.ndarray) -> np.ndarray:
"""Normalize features using stored parameters."""
if self.feature_means is not None:
return (features - self.feature_means) / self.feature_sds
return features
def predict_xg(self, shot: ShotEvent, match_context: Dict) -> float:
"""Predict xG for a single shot."""
if self.model is None:
return self._simple_xg(shot)
features = self.extract_features(shot, match_context)
features_norm = self.normalize_features(features)
import xgboost as xgb
dmatrix = xgb.DMatrix(features_norm.reshape(1, -1))
xg = self.model.predict(dmatrix)[0]
return np.clip(xg, 0.01, 0.99)
def _simple_xg(self, shot: ShotEvent) -> float:
"""Simple fallback xG calculation."""
x, y = shot.location_x, shot.location_y
goal_x, goal_y = 120, 40
distance = np.sqrt((goal_x - x)**2 + (goal_y - y)**2)
base_xg = np.exp(-0.1 * distance)
if shot.body_part == "head":
base_xg *= 0.7
return round(float(np.clip(base_xg, 0.01, 0.99)), 3)
def batch_predict(self, shots: List[ShotEvent], match_context: Dict) -> List[float]:
"""Efficiently predict xG for multiple shots."""
if self.model is None:
return [self._simple_xg(s) for s in shots]
features = np.array([
self.extract_features(s, match_context) for s in shots
])
features_norm = np.apply_along_axis(self.normalize_features, 1, features)
import xgboost as xgb
dmatrix = xgb.DMatrix(features_norm)
xg_values = self.model.predict(dmatrix)
return [float(np.clip(xg, 0.01, 0.99)) for xg in xg_values]
# Example usage
calculator = LiveXGCalculator()
shot = ShotEvent(location_x=105, location_y=42, body_part="foot")
xg = calculator._simple_xg(shot)
print(f"Simple xG for shot at (105, 42): {xg:.3f}")# R: Optimized Live xG Calculator
library(R6)
library(xgboost)
LiveXGCalculator <- R6Class("LiveXGCalculator",
public = list(
model = NULL,
feature_means = NULL,
feature_sds = NULL,
initialize = function(model_path) {
# Load pre-trained model
self$model <- xgb.load(model_path)
# Load feature normalization parameters
self$feature_means <- readRDS(paste0(model_path, "_means.rds"))
self$feature_sds <- readRDS(paste0(model_path, "_sds.rds"))
},
extract_features = function(shot_event, match_context) {
# Core shot features
x <- shot_event$location_x
y <- shot_event$location_y
# Distance and angle to goal
goal_x <- 120
goal_y <- 40
distance <- sqrt((goal_x - x)^2 + (goal_y - y)^2)
angle <- atan2(abs(y - goal_y), goal_x - x) * 180 / pi
# Contextual features
features <- c(
distance = distance,
angle = angle,
body_part_foot = as.numeric(shot_event$body_part == "foot"),
body_part_head = as.numeric(shot_event$body_part == "head"),
shot_type_open = as.numeric(shot_event$play_pattern == "open play"),
shot_type_counter = as.numeric(shot_event$play_pattern == "counter"),
shot_type_set_piece = as.numeric(shot_event$play_pattern %in%
c("corner", "free kick")),
under_pressure = as.numeric(shot_event$under_pressure),
first_touch = as.numeric(shot_event$first_touch),
# Context from match state
home_trailing = as.numeric(match_context$home_trailing),
minute = match_context$minute,
minute_squared = match_context$minute^2
)
return(features)
},
normalize_features = function(features) {
(features - self$feature_means) / self$feature_sds
},
predict_xg = function(shot_event, match_context) {
# Extract and normalize features
features <- self$extract_features(shot_event, match_context)
features_norm <- self$normalize_features(features)
# Create xgb matrix
dmatrix <- xgb.DMatrix(matrix(features_norm, nrow = 1))
# Predict
xg <- predict(self$model, dmatrix)
# Clip to valid range
xg <- max(0.01, min(0.99, xg))
return(xg)
},
batch_predict = function(shots, match_context) {
# Efficient batch prediction for multiple shots
features_list <- lapply(shots, function(s) {
self$extract_features(s, match_context)
})
features_matrix <- do.call(rbind, features_list)
features_norm <- t(apply(features_matrix, 1, self$normalize_features))
dmatrix <- xgb.DMatrix(features_norm)
xg_values <- predict(self$model, dmatrix)
return(pmax(0.01, pmin(0.99, xg_values)))
}
)
)
# Simple fallback for when model not available
calculate_simple_xg <- function(x, y, body_part = "foot") {
# Distance-based approximation
goal_x <- 120
goal_center_y <- 40
distance <- sqrt((goal_x - x)^2 + (goal_center_y - y)^2)
# Base xG from distance
base_xg <- exp(-0.1 * distance)
# Adjust for body part
if (body_part == "head") {
base_xg <- base_xg * 0.7
}
return(round(base_xg, 3))
}
# Example usage
xg <- calculate_simple_xg(x = 105, y = 42, body_part = "foot")
cat(sprintf("Simple xG for shot at (105, 42): %.3f\n", xg))Simple xG for shot at (105, 42): 0.223Reliability and Error Handling
Real-time systems must handle errors gracefully. Missing data, network issues, and out-of-order events are common challenges.
- Missing or delayed events
- Events arriving out of order
- Network disconnections
- Invalid or corrupted data
- Model inference failures
- Event buffering and reordering
- Fallback calculations
- Heartbeat monitoring
- Data validation pipelines
- Graceful degradation
# Python: Robust Event Processing
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
@dataclass
class RobustEventProcessor:
"""Handles event validation, buffering, and error recovery."""
buffer_window: float = 5.0 # seconds
heartbeat_timeout: float = 30.0 # seconds
event_buffer: List[Dict] = field(default_factory=list)
last_heartbeat: datetime = field(default_factory=datetime.now)
VALID_EVENT_TYPES = {
"Shot", "Pass", "Pressure", "Foul", "Dribble",
"Ball Recovery", "Clearance", "Goal Keeper"
}
REQUIRED_FIELDS = {"type", "minute", "team"}
def process_event(self, event: Dict) -> List[Dict]:
"""Process incoming event with validation and buffering."""
# Validate
valid, reason = self.validate_event(event)
if not valid:
logger.warning(f"Invalid event: {reason}")
return []
# Buffer for reordering
self.add_to_buffer(event)
# Return ready events
return self.get_ready_events()
def validate_event(self, event: Dict) -> Tuple[bool, Optional[str]]:
"""Validate event structure and values."""
# Required fields
missing = self.REQUIRED_FIELDS - set(event.keys())
if missing:
return False, f"Missing fields: {missing}"
# Type validation
if event.get("type") not in self.VALID_EVENT_TYPES:
return False, f"Invalid event type: {event.get('type')}"
# Range validation
minute = event.get("minute", -1)
if minute < 0 or minute > 130:
return False, f"Invalid minute: {minute}"
return True, None
def add_to_buffer(self, event: Dict):
"""Add event to buffer and sort by game time."""
event["received_at"] = datetime.now()
self.event_buffer.append(event)
# Sort by game time
self.event_buffer.sort(
key=lambda e: e.get("minute", 0) * 60 + e.get("second", 0)
)
def get_ready_events(self) -> List[Dict]:
"""Return events that have waited long enough in buffer."""
if not self.event_buffer:
return []
current_time = datetime.now()
ready = []
remaining = []
for event in self.event_buffer:
elapsed = (current_time - event["received_at"]).total_seconds()
if elapsed >= self.buffer_window:
ready.append(event)
else:
remaining.append(event)
self.event_buffer = remaining
return ready
def check_heartbeat(self) -> bool:
"""Check if data feed is still active."""
elapsed = (datetime.now() - self.last_heartbeat).total_seconds()
if elapsed > self.heartbeat_timeout:
logger.warning("Data feed heartbeat timeout - connection may be lost")
return False
return True
def receive_heartbeat(self):
"""Update last heartbeat time."""
self.last_heartbeat = datetime.now()
processor = RobustEventProcessor()
print("Robust event processor initialized")# R: Robust Event Processing
library(R6)
RobustEventProcessor <- R6Class("RobustEventProcessor",
public = list(
event_buffer = NULL,
buffer_window = 5, # seconds
last_heartbeat = NULL,
heartbeat_timeout = 30, # seconds
initialize = function() {
self$event_buffer <- list()
self$last_heartbeat <- Sys.time()
},
process_event = function(event) {
# Validate event
validation <- self$validate_event(event)
if (!validation$valid) {
warning(sprintf("Invalid event: %s", validation$reason))
return(NULL)
}
# Buffer for reordering
self$add_to_buffer(event)
# Process ready events
ready_events <- self$get_ready_events()
return(ready_events)
},
validate_event = function(event) {
# Required fields
required <- c("type", "minute", "team")
missing <- setdiff(required, names(event))
if (length(missing) > 0) {
return(list(
valid = FALSE,
reason = paste("Missing fields:", paste(missing, collapse = ", "))
))
}
# Type validation
valid_types <- c("Shot", "Pass", "Pressure", "Foul", "Dribble",
"Ball Recovery", "Clearance", "Goal Keeper")
if (!event$type %in% valid_types) {
return(list(
valid = FALSE,
reason = sprintf("Invalid event type: %s", event$type)
))
}
# Range validation
if (event$minute < 0 || event$minute > 130) {
return(list(
valid = FALSE,
reason = sprintf("Invalid minute: %d", event$minute)
))
}
return(list(valid = TRUE, reason = NULL))
},
add_to_buffer = function(event) {
event$received_at <- as.numeric(Sys.time())
self$event_buffer <- c(self$event_buffer, list(event))
# Sort by game time
game_times <- sapply(self$event_buffer, function(e) {
e$minute * 60 + (e$second %||% 0)
})
self$event_buffer <- self$event_buffer[order(game_times)]
},
get_ready_events = function() {
if (length(self$event_buffer) == 0) {
return(list())
}
current_time <- as.numeric(Sys.time())
ready <- list()
remaining <- list()
for (event in self$event_buffer) {
# Event is ready if its been in buffer long enough
if (current_time - event$received_at >= self$buffer_window) {
ready <- c(ready, list(event))
} else {
remaining <- c(remaining, list(event))
}
}
self$event_buffer <- remaining
return(ready)
},
check_heartbeat = function() {
elapsed <- as.numeric(Sys.time() - self$last_heartbeat)
if (elapsed > self$heartbeat_timeout) {
warning("Data feed heartbeat timeout - connection may be lost")
return(FALSE)
}
return(TRUE)
},
receive_heartbeat = function() {
self$last_heartbeat <- Sys.time()
}
)
)
processor <- RobustEventProcessor$new()
cat("Robust event processor initialized\n")Robust event processor initializedLive Substitution Analysis
Real-time analytics can inform substitution decisions by tracking player fatigue, performance degradation, and tactical needs.
# Python: Substitution Recommendation System
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import pandas as pd
@dataclass
class SubstitutionAnalyzer:
"""Analyzes player fatigue and recommends substitutions."""
fatigue_thresholds: Dict[str, float] = field(default_factory=lambda: {
"high_intensity_runs": 0.7,
"sprint_count": 0.6,
"pass_completion_drop": 10
})
player_metrics: pd.DataFrame = field(default_factory=pd.DataFrame)
def update_player_metrics(self, player_tracking: pd.DataFrame, minute: int) -> pd.DataFrame:
"""Update fatigue metrics for all players."""
df = player_tracking.copy()
df["current_minute"] = minute
# Calculate current rates (per 90)
df["hir_rate"] = df["high_intensity_runs"] / max(minute, 1) * 90
df["sprint_rate"] = df["sprints"] / max(minute, 1) * 90
df["pass_completion_current"] = (
df["passes_completed"] / df["passes_attempted"].clip(lower=1) * 100
)
# For simplicity, use first values as baseline
# In production, would track baseline from first 30 minutes
df["baseline_hir"] = df["hir_rate"].mean()
df["baseline_sprints"] = df["sprint_rate"].mean()
df["baseline_pass_pct"] = df["pass_completion_current"].mean()
# Calculate retention
df["hir_retention"] = df["hir_rate"] / df["baseline_hir"].clip(lower=0.1)
df["sprint_retention"] = df["sprint_rate"] / df["baseline_sprints"].clip(lower=0.1)
df["pass_pct_change"] = df["pass_completion_current"] - df["baseline_pass_pct"]
self.player_metrics = df
return df
def get_fatigue_alerts(self) -> pd.DataFrame:
"""Identify players showing fatigue."""
if self.player_metrics.empty:
return pd.DataFrame()
df = self.player_metrics.copy()
df["hir_fatigued"] = df["hir_retention"] < self.fatigue_thresholds["high_intensity_runs"]
df["sprint_fatigued"] = df["sprint_retention"] < self.fatigue_thresholds["sprint_count"]
df["passing_degraded"] = df["pass_pct_change"] < -self.fatigue_thresholds["pass_completion_drop"]
df["fatigue_score"] = (
df["hir_fatigued"].astype(int) +
df["sprint_fatigued"].astype(int) +
df["passing_degraded"].astype(int)
)
fatigued = df[df["fatigue_score"] >= 2].sort_values(
"fatigue_score", ascending=False
)
return fatigued[["player_id", "player_name", "position", "fatigue_score",
"hir_retention", "sprint_retention", "pass_pct_change"]]
def recommend_substitution(self, game_state: Dict, bench_players: pd.DataFrame) -> Dict:
"""Generate substitution recommendation."""
fatigued = self.get_fatigue_alerts()
if fatigued.empty:
return {
"recommendation": "No urgent substitutions needed",
"priority": "low"
}
# Get most fatigued player
target = fatigued.iloc[0]
# Find replacement
suitable = bench_players[bench_players["position"] == target["position"]]
if not suitable.empty:
replacement = suitable.sort_values("minutes_rest", ascending=False).iloc[0]
replacement_name = replacement["player_name"]
else:
replacement_name = "available substitute"
# Determine urgency
score_diff = game_state.get("score_diff", 0)
minute = game_state.get("minute", 0)
if score_diff < 0 and minute > 70:
urgency = "high"
elif target["fatigue_score"] >= 3:
urgency = "high"
elif minute > 75:
urgency = "medium"
else:
urgency = "low"
return {
"recommendation": f"Replace {target['player_name']} with {replacement_name} (fatigue score: {target['fatigue_score']})",
"target_player": target["player_name"],
"replacement": replacement_name,
"priority": urgency,
"reasoning": {
"hir_retention": target["hir_retention"],
"sprint_retention": target["sprint_retention"],
"pass_degradation": target["pass_pct_change"]
}
}
analyzer = SubstitutionAnalyzer()
print("Substitution analyzer initialized")# R: Substitution Recommendation System
library(R6)
library(tidyverse)
SubstitutionAnalyzer <- R6Class("SubstitutionAnalyzer",
public = list(
player_metrics = NULL,
fatigue_thresholds = list(
high_intensity_runs = 0.7, # % of average
sprint_count = 0.6,
pass_completion_drop = 10 # percentage points
),
initialize = function() {
self$player_metrics <- tibble()
},
update_player_metrics = function(player_tracking, minute) {
# Calculate fatigue indicators for each player
metrics <- player_tracking %>%
mutate(
current_minute = minute,
# Compare current rate to first 30 min baseline
hir_rate = high_intensity_runs / max(minute, 1) * 90,
sprint_rate = sprints / max(minute, 1) * 90,
pass_completion_current = passes_completed / pmax(passes_attempted, 1) * 100
) %>%
group_by(player_id) %>%
mutate(
# Compare to baseline (first 30 min)
baseline_hir = first(hir_rate),
baseline_sprints = first(sprint_rate),
baseline_pass_pct = first(pass_completion_current),
hir_retention = hir_rate / baseline_hir,
sprint_retention = sprint_rate / baseline_sprints,
pass_pct_change = pass_completion_current - baseline_pass_pct
) %>%
ungroup()
self$player_metrics <- metrics
return(metrics)
},
get_fatigue_alerts = function() {
if (nrow(self$player_metrics) == 0) return(tibble())
fatigue_alerts <- self$player_metrics %>%
mutate(
hir_fatigued = hir_retention < self$fatigue_thresholds$high_intensity_runs,
sprint_fatigued = sprint_retention < self$fatigue_thresholds$sprint_count,
passing_degraded = pass_pct_change < -self$fatigue_thresholds$pass_completion_drop,
fatigue_score = as.numeric(hir_fatigued) +
as.numeric(sprint_fatigued) +
as.numeric(passing_degraded)
) %>%
filter(fatigue_score >= 2) %>%
arrange(desc(fatigue_score)) %>%
select(player_id, player_name, position, fatigue_score,
hir_retention, sprint_retention, pass_pct_change)
return(fatigue_alerts)
},
recommend_substitution = function(game_state, bench_players) {
fatigued <- self$get_fatigue_alerts()
if (nrow(fatigued) == 0) {
return(list(
recommendation = "No urgent substitutions needed",
priority = "low"
))
}
# Get most fatigued player
target <- fatigued[1, ]
# Find suitable replacement from bench
replacement <- bench_players %>%
filter(position == target$position) %>%
arrange(desc(minutes_rest)) %>%
slice(1)
# Consider game state
urgency <- case_when(
game_state$score_diff < 0 && game_state$minute > 70 ~ "high",
target$fatigue_score >= 3 ~ "high",
game_state$minute > 75 ~ "medium",
TRUE ~ "low"
)
return(list(
recommendation = sprintf(
"Replace %s with %s (fatigue score: %d)",
target$player_name,
replacement$player_name[1] %||% "available substitute",
target$fatigue_score
),
target_player = target$player_name,
replacement = replacement$player_name[1],
priority = urgency,
reasoning = list(
hir_retention = target$hir_retention,
sprint_retention = target$sprint_retention,
pass_degradation = target$pass_pct_change
)
))
}
)
)
sub_analyzer <- SubstitutionAnalyzer$new()
cat("Substitution analyzer initialized\n")Substitution analyzer initializedCase Study: Full Match Real-Time Analysis
Let's walk through a complete real-time analysis of a simulated match, demonstrating all the components working together.
# Python: Complete Real-Time Match Analysis
import pandas as pd
import numpy as np
from typing import Dict, List
async def run_complete_match_analysis(events_df: pd.DataFrame,
home_team: str,
away_team: str) -> Dict:
"""Run complete real-time analysis on a match."""
# Initialize components
match_state = LiveMatchState(
match_id="match_001",
home_team=home_team,
away_team=away_team
)
momentum_tracker = MomentumTracker()
alert_system = TacticalAlertSystem()
event_processor = RobustEventProcessor()
# Results tracking
timeline = []
all_alerts = []
# Process events chronologically
events_sorted = events_df.sort_values(["minute", "second"])
for _, event in events_sorted.iterrows():
event_dict = event.to_dict()
# Process through robust handler
validated = event_processor.process_event(event_dict)
if validated:
for valid_event in validated:
# Update match state
match_state.add_event(valid_event)
# Update momentum
momentum_tracker.add_events([valid_event], home_team)
# Get current state
state = match_state.get_state()
state["momentum_label"] = momentum_tracker.get_momentum_label()
# Check for momentum shift
shift = momentum_tracker.detect_momentum_shift()
if shift:
state["momentum_change"] = 0.5
# Check alerts
alerts = alert_system.check_alerts(valid_event, state)
all_alerts.extend(alerts)
# Calculate win probability
win_prob = calculate_win_probability(
state["score"]["home"], state["score"]["away"],
state["xg"]["home"], state["xg"]["away"],
state["minute"]
)
# Record timeline
timeline.append({
"minute": state["minute"],
"home_score": state["score"]["home"],
"away_score": state["score"]["away"],
"home_xg": state["xg"]["home"],
"away_xg": state["xg"]["away"],
"momentum": momentum_tracker.current_momentum,
"home_win_prob": win_prob["home_win"],
"n_alerts": len(alerts)
})
# Summary
final_state = match_state.get_state()
print("\n=== MATCH SUMMARY ===")
print(f"Final Score: {final_state['score']['home']} - {final_state['score']['away']}")
print(f"Final xG: {final_state['xg']['home']:.2f} - {final_state['xg']['away']:.2f}")
print(f"Total Alerts Generated: {len(all_alerts)}")
if all_alerts:
print("\nKey Alerts:")
high_priority = [a for a in all_alerts if a["priority"] == "high"]
for alert in high_priority[:5]:
print(f" [{alert['minute']}'] {alert['message']}")
return {
"timeline": pd.DataFrame(timeline),
"alerts": all_alerts,
"final_state": final_state
}
print("Complete match analysis system ready")# R: Complete Real-Time Match Analysis
library(tidyverse)
run_complete_match_analysis <- function(events_df, home_team, away_team) {
# Initialize all components
match_state <- LiveMatchState$new("match_001", home_team, away_team)
momentum_tracker <- MomentumTracker$new()
alert_system <- TacticalAlertSystem$new()
event_processor <- RobustEventProcessor$new()
# Results tracking
results <- tibble()
all_alerts <- list()
# Process events chronologically
events_sorted <- events_df %>%
arrange(minute, second)
for (i in 1:nrow(events_sorted)) {
event <- events_sorted[i, ]
# Process through robust handler
validated <- event_processor$process_event(as.list(event))
if (length(validated) > 0) {
# Update match state
match_state$add_event(event)
# Update momentum
momentum_tracker$add_events(event, home_team)
# Get current state
state <- match_state$get_state()
state$momentum_label <- momentum_tracker$get_momentum_label()
# Check for momentum shift
shift <- momentum_tracker$detect_momentum_shift()
if (!is.null(shift)) {
state$momentum_change <- 0.5 # Simplified
}
# Check alerts
alerts <- alert_system$check_alerts(event, state)
if (length(alerts) > 0) {
all_alerts <- c(all_alerts, alerts)
}
# Calculate win probability
win_prob <- calculate_win_probability(
state$score$home, state$score$away,
state$xg$home, state$xg$away,
state$minute
)
# Record state
results <- bind_rows(results, tibble(
minute = state$minute,
home_score = state$score$home,
away_score = state$score$away,
home_xg = state$xg$home,
away_xg = state$xg$away,
momentum = momentum_tracker$current_momentum,
home_win_prob = win_prob$home_win,
n_alerts = length(alerts)
))
}
}
# Summary
final_state <- match_state$get_state()
cat(sprintf("\n=== MATCH SUMMARY ===\n"))
cat(sprintf("Final Score: %d - %d\n", final_state$score$home, final_state$score$away))
cat(sprintf("Final xG: %.2f - %.2f\n", final_state$xg$home, final_state$xg$away))
cat(sprintf("Total Alerts Generated: %d\n", length(all_alerts)))
if (length(all_alerts) > 0) {
cat("\nKey Alerts:\n")
high_priority <- Filter(function(a) a$priority == "high", all_alerts)
for (alert in head(high_priority, 5)) {
cat(sprintf(" [%d'] %s\n", alert$minute, alert$message))
}
}
return(list(
timeline = results,
alerts = all_alerts,
final_state = final_state
))
}
# Example usage (would use real events_df)
cat("Complete match analysis system ready\n")Complete match analysis system readyAutomated Tactical Alerts
Smart alert systems filter the noise and highlight only the most important tactical insights during a match.
# Python: Tactical Alert System
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Any
import pandas as pd
@dataclass
class AlertRule:
"""Defines a tactical alert rule."""
id: str
condition: Callable[[Dict, Dict], bool]
message: Callable[[Dict, Dict], str]
priority: str = "medium"
class TacticalAlertSystem:
"""System for generating tactical alerts during live matches."""
def __init__(self, cooldown_minutes: int = 5):
self.alert_rules: Dict[str, AlertRule] = {}
self.alert_history: List[Dict] = []
self.cooldown_minutes = cooldown_minutes
self._define_default_rules()
def _define_default_rules(self):
"""Define default tactical alert rules."""
# High xG chance
self.add_rule(AlertRule(
id="high_xg_chance",
condition=lambda e, s: e.get("type") == "Shot" and e.get("xg", 0) > 0.25,
message=lambda e, s: f"BIG CHANCE: {e.get('xg', 0)*100:.0f}% xG shot by {e.get('player')}",
priority="high"
))
# Momentum swing
self.add_rule(AlertRule(
id="momentum_swing",
condition=lambda e, s: abs(s.get("momentum_change", 0)) > 0.4,
message=lambda e, s: f"MOMENTUM SWING: {'HOME' if s.get('momentum_change', 0) > 0 else 'AWAY'} taking control",
priority="high"
))
# Pressing intensity drop
self.add_rule(AlertRule(
id="pressing_drop",
condition=lambda e, s: (
s.get("metrics", {}).get("rolling", {}).get("home_pressure", 10) < 5 and
s.get("minute", 0) > 60
),
message=lambda e, s: "WARNING: Pressing intensity dropped - consider energy management",
priority="medium"
))
# xG underperformance
self.add_rule(AlertRule(
id="xg_underperform",
condition=lambda e, s: (
s.get("minute", 0) >= 60 and
(s.get("xg", {}).get("home", 0) - s.get("score", {}).get("home", 0)) > 1.0
),
message=lambda e, s: f"xG GAP: Creating chances ({s.get('xg', {}).get('home', 0):.1f} xG) but only {s.get('score', {}).get('home', 0)} goals",
priority="medium"
))
# Dangerous free kick
self.add_rule(AlertRule(
id="set_piece_zone",
condition=lambda e, s: (
e.get("type") == "Foul" and
e.get("location_x", 0) > 85 and
20 < e.get("location_y", 0) < 60
),
message=lambda e, s: "DANGEROUS FREE KICK: Good scoring position",
priority="high"
))
def add_rule(self, rule: AlertRule):
"""Add a new alert rule."""
self.alert_rules[rule.id] = rule
def check_alerts(self, event: Dict, state: Dict) -> List[Dict]:
"""Check all rules and return triggered alerts."""
alerts = []
current_minute = state.get("minute", 0)
for rule_id, rule in self.alert_rules.items():
# Check cooldown
recent_same = [
a for a in self.alert_history
if a["rule_id"] == rule_id and
a["minute"] >= current_minute - self.cooldown_minutes
]
if recent_same:
continue
# Check condition
try:
if rule.condition(event, state):
alert = {
"rule_id": rule_id,
"message": rule.message(event, state),
"priority": rule.priority,
"minute": current_minute
}
alerts.append(alert)
self.alert_history.append({
"rule_id": rule_id,
"minute": current_minute
})
except Exception:
pass # Skip failed rules
return alerts
alert_system = TacticalAlertSystem()
print(f"Tactical alert system initialized with {len(alert_system.alert_rules)} rules")# R: Tactical Alert System
library(R6)
TacticalAlertSystem <- R6Class("TacticalAlertSystem",
public = list(
alert_rules = NULL,
alert_history = NULL,
cooldown_minutes = 5, # Prevent duplicate alerts
initialize = function() {
self$alert_rules <- list()
self$alert_history <- tibble()
self$define_default_rules()
},
define_default_rules = function() {
# High xG chance
self$add_rule(
id = "high_xg_chance",
condition = function(event, state) {
event$type == "Shot" && event$xg > 0.25
},
message = function(event, state) {
sprintf("BIG CHANCE: %.0f%% xG shot by %s", event$xg * 100, event$player)
},
priority = "high"
)
# Momentum swing
self$add_rule(
id = "momentum_swing",
condition = function(event, state) {
!is.null(state$momentum_change) && abs(state$momentum_change) > 0.4
},
message = function(event, state) {
direction <- ifelse(state$momentum_change > 0, "HOME", "AWAY")
sprintf("MOMENTUM SWING: %s taking control", direction)
},
priority = "high"
)
# Pressing intensity drop
self$add_rule(
id = "pressing_drop",
condition = function(event, state) {
!is.null(state$metrics$rolling) &&
state$metrics$rolling$home_pressure < 5 &&
state$minute > 60
},
message = function(event, state) {
"WARNING: Pressing intensity dropped - consider energy management"
},
priority = "medium"
)
# xG underperformance
self$add_rule(
id = "xg_underperform",
condition = function(event, state) {
state$minute >= 60 &&
(state$xg$home - state$score$home) > 1.0
},
message = function(event, state) {
sprintf("xG GAP: Creating chances (%.1f xG) but only %d goals",
state$xg$home, state$score$home)
},
priority = "medium"
)
# Set piece opportunity
self$add_rule(
id = "set_piece_zone",
condition = function(event, state) {
event$type == "Foul" &&
event$location_x > 85 &&
event$location_y > 20 && event$location_y < 60
},
message = function(event, state) {
"DANGEROUS FREE KICK: Good scoring position"
},
priority = "high"
)
},
add_rule = function(id, condition, message, priority = "medium") {
self$alert_rules[[id]] <- list(
condition = condition,
message = message,
priority = priority
)
},
check_alerts = function(event, state) {
alerts <- list()
for (rule_id in names(self$alert_rules)) {
rule <- self$alert_rules[[rule_id]]
# Check cooldown
recent_same <- self$alert_history %>%
filter(rule_id == !!rule_id,
minute >= state$minute - self$cooldown_minutes)
if (nrow(recent_same) > 0) next
# Check condition
if (rule$condition(event, state)) {
alert <- list(
rule_id = rule_id,
message = rule$message(event, state),
priority = rule$priority,
minute = state$minute
)
alerts <- c(alerts, list(alert))
# Log to history
self$alert_history <- bind_rows(
self$alert_history,
tibble(rule_id = rule_id, minute = state$minute)
)
}
}
return(alerts)
}
)
)
alert_system <- TacticalAlertSystem$new()
cat("Tactical alert system initialized with", length(alert_system$alert_rules), "rules\n")Tactical alert system initialized with 5 rulesPractice Exercises
Task: Create a custom alert rule that triggers when a team has 3+ shots without scoring while trailing.
Requirements:
- Track shots for each team in a rolling window
- Check if trailing and generating chances
- Suggest tactical adjustment in the alert message
Task: Create an animated xG timeline that updates in real-time during a match.
Requirements:
- Show cumulative xG for both teams over time
- Mark goal events with icons
- Add shaded regions for momentum periods
- Update smoothly as new events arrive
Task: Build a system that suggests optimal substitution timing and candidates.
Requirements:
- Track player fatigue metrics (distance, sprints)
- Monitor performance degradation over time
- Consider game state (winning/losing/drawing)
- Suggest substitution with reasoning
Chapter Summary
Key Takeaways
- Real-time requires different architecture: Streaming pipelines, state management, and low-latency updates
- Win probability: Combine current score, xG, and remaining time to estimate match outcomes
- Momentum detection: Weight recent events to identify which team is dominating
- Live dashboards: Synthesize metrics into actionable displays for coaching staff
- Smart alerts: Filter noise and surface only the most important tactical insights
Real-Time System Components
- Data Ingestion: Receive and validate streaming events
- State Management: Maintain current match state
- Metric Calculation: Rolling windows and live aggregations
- Alert Engine: Rule-based trigger system with cooldowns
- Visualization: Real-time dashboards and charts
- Delivery: Push updates to coaching staff