Capstone - Complete Analytics System
Building Analytics Pipelines
Moving from ad-hoc analysis to production-ready analytics requires robust data pipelines. This chapter covers the infrastructure needed to collect, process, store, and serve football analytics at scale - turning one-off scripts into reliable, automated systems.
Learning Objectives
- Design ETL pipelines for football data
- Build data warehouses optimized for analytics queries
- Implement automated data collection and processing
- Deploy models and dashboards to production
- Monitor and maintain analytics systems
- Apply best practices for reproducible analytics
Pipeline Architecture
A football analytics pipeline typically consists of several stages: data collection, transformation, storage, analysis, and serving. Each stage must be reliable, scalable, and maintainable.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Sources │───▶│ Extract │───▶│ Transform │───▶│ Load │───▶│ Serve │
│ │ │ │ │ │ │ │ │ │
│ - APIs │ │ - Scraping │ │ - Clean │ │ - Database │ │ - Dashboard │
│ - Feeds │ │ - API calls │ │ - Enrich │ │ - Data Lake │ │ - API │
│ - Files │ │ - Webhooks │ │ - Aggregate │ │ - Warehouse │ │ - Reports │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
# Python: Pipeline architecture with Prefect
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
# Cache expensive operations
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def extract_matches(season: int) -> pd.DataFrame:
"""Extract match data from API."""
from statsbombpy import sb
competitions = sb.competitions()
matches = sb.matches(competition_id=11, season_id=season)
return matches
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def extract_events(match_id: int) -> pd.DataFrame:
"""Extract event data for a match."""
from statsbombpy import sb
return sb.events(match_id=match_id)
@task
def transform_events(events: pd.DataFrame) -> pd.DataFrame:
"""Clean and transform event data."""
# Remove incomplete records
events = events.dropna(subset=["player", "team"])
# Standardize coordinates
events["x"] = events["location"].apply(lambda x: x[0] if x else None)
events["y"] = events["location"].apply(lambda x: x[1] if x else None)
# Add derived features
events["is_successful"] = events["outcome"].apply(
lambda x: x.get("name") == "Complete" if x else None
)
return events
@task
def calculate_player_metrics(events: pd.DataFrame) -> pd.DataFrame:
"""Calculate player-level metrics."""
player_stats = events.groupby(["player", "team"]).agg(
passes=("type", lambda x: (x == "Pass").sum()),
successful_passes=("is_successful", lambda x: x.sum()),
shots=("type", lambda x: (x == "Shot").sum()),
touches=("type", "count"),
minutes_played=("minute", lambda x: x.max() - x.min())
).reset_index()
player_stats["pass_completion"] = (
player_stats["successful_passes"] / player_stats["passes"]
)
return player_stats
@task
def load_to_database(df: pd.DataFrame, table_name: str):
"""Load data to PostgreSQL database."""
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost/football")
df.to_sql(table_name, engine, if_exists="replace", index=False)
return f"Loaded {len(df)} rows to {table_name}"
@flow(name="Football Analytics Pipeline")
def analytics_pipeline(season: int = 2023):
"""Main analytics pipeline flow."""
# Extract
matches = extract_matches(season)
print(f"Extracted {len(matches)} matches")
# Process each match
all_events = []
for match_id in matches["match_id"][:10]: # Limit for demo
events = extract_events(match_id)
all_events.append(events)
combined_events = pd.concat(all_events, ignore_index=True)
# Transform
clean_events = transform_events(combined_events)
player_metrics = calculate_player_metrics(clean_events)
# Load
load_to_database(player_metrics, "player_metrics")
return player_metrics
# Run pipeline
if __name__ == "__main__":
result = analytics_pipeline(season=2023)
print(f"Pipeline complete: {len(result)} player records")# R: Pipeline architecture with targets
library(targets)
library(tarchetypes)
# Define pipeline in _targets.R
# tar_option_set(packages = c("tidyverse", "StatsBombR"))
# Pipeline targets
list(
# Extract: Get raw data
tar_target(
raw_matches,
get_matches_from_api(season = 2023)
),
tar_target(
raw_events,
get_events_from_api(raw_matches$match_id),
pattern = map(raw_matches)
),
# Transform: Clean and enrich
tar_target(
clean_events,
clean_event_data(raw_events)
),
tar_target(
player_stats,
calculate_player_stats(clean_events)
),
tar_target(
team_stats,
calculate_team_stats(clean_events)
),
# Load: Save to database
tar_target(
db_upload,
upload_to_database(player_stats, team_stats),
cue = tar_cue(mode = "always")
),
# Serve: Generate outputs
tar_target(
weekly_report,
generate_report(player_stats, team_stats),
format = "file"
)
)
# Run pipeline
# tar_make()
# Visualize dependencies
# tar_visnetwork()
ETL Best Practices
Extract, Transform, Load (ETL) is the backbone of any analytics system. Proper ETL design ensures data quality, enables incremental updates, and handles failures gracefully.
# Python: Robust ETL patterns
import pandas as pd
import logging
from datetime import datetime
from typing import Optional, Dict, List
import json
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FootballETL:
"""
Robust ETL pipeline for football data.
"""
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def get_checkpoint(self, source_name: str) -> Optional[Dict]:
"""Load checkpoint for incremental extraction."""
checkpoint_file = self.checkpoint_dir / f"{source_name}.json"
if checkpoint_file.exists():
with open(checkpoint_file) as f:
return json.load(f)
return None
def save_checkpoint(self, source_name: str, checkpoint_data: Dict):
"""Save checkpoint after successful extraction."""
checkpoint_file = self.checkpoint_dir / f"{source_name}.json"
checkpoint_data["saved_at"] = datetime.now().isoformat()
with open(checkpoint_file, "w") as f:
json.dump(checkpoint_data, f)
def extract_with_retry(self, extract_fn, max_retries: int = 3,
**kwargs) -> pd.DataFrame:
"""Extract data with retry logic."""
import time
for attempt in range(max_retries):
try:
data = extract_fn(**kwargs)
logger.info(f"Extraction successful: {len(data)} records")
return data
except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
else:
raise
def validate(self, df: pd.DataFrame, schema: Dict) -> Dict:
"""Validate data against schema."""
results = {}
# Check required columns
results["required_columns"] = all(
col in df.columns for col in schema.get("required", [])
)
# Check data types
type_checks = []
for col, expected_type in schema.get("types", {}).items():
if col in df.columns:
type_checks.append(df[col].dtype == expected_type)
results["correct_types"] = all(type_checks)
# Check value ranges
for col, (min_val, max_val) in schema.get("ranges", {}).items():
if col in df.columns:
results[f"{col}_in_range"] = (
df[col].between(min_val, max_val).all()
)
# Check for nulls in non-nullable columns
for col in schema.get("non_nullable", []):
if col in df.columns:
results[f"{col}_no_nulls"] = not df[col].isna().any()
results["all_valid"] = all(results.values())
return results
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply standard transformations."""
# Make a copy
df = df.copy()
# Standardize column names
df.columns = df.columns.str.lower().str.replace(" ", "_")
# Handle missing values
numeric_cols = df.select_dtypes(include=["number"]).columns
df[numeric_cols] = df[numeric_cols].fillna(0)
# Add metadata
df["_processed_at"] = datetime.now()
df["_source"] = "statsbomb"
return df
def load_incremental(self, df: pd.DataFrame, table_name: str,
key_columns: List[str], engine):
"""Load data with upsert logic."""
from sqlalchemy.dialects.postgresql import insert
# Convert to records
records = df.to_dict(orient="records")
# Build upsert statement
from sqlalchemy import MetaData, Table
table = Table(table_name, MetaData(), autoload_with=engine)
stmt = insert(table).values(records)
# On conflict, update
update_dict = {c: stmt.excluded[c] for c in df.columns
if c not in key_columns}
upsert_stmt = stmt.on_conflict_do_update(
index_elements=key_columns,
set_=update_dict
)
engine.execute(upsert_stmt)
logger.info(f"Upserted {len(records)} records to {table_name}")
# Example usage
etl = FootballETL()
# Define schema for validation
events_schema = {
"required": ["match_id", "type", "minute", "player"],
"types": {"minute": "int64", "match_id": "int64"},
"ranges": {"minute": (0, 150), "x": (0, 120), "y": (0, 80)},
"non_nullable": ["match_id", "type"]
}
# Run validation
# validation_results = etl.validate(events_df, events_schema)
# print(f"Validation passed: {validation_results['all_valid']}")# R: Robust ETL patterns
library(tidyverse)
library(DBI)
library(dbplyr)
# Idempotent extraction with checkpointing
extract_with_checkpoint <- function(source_fn, checkpoint_file) {
# Check if we have a checkpoint
if (file.exists(checkpoint_file)) {
checkpoint <- readRDS(checkpoint_file)
last_processed <- checkpoint$last_id
} else {
last_processed <- 0
}
# Extract only new data
new_data <- source_fn(since_id = last_processed)
if (nrow(new_data) > 0) {
# Update checkpoint
saveRDS(
list(last_id = max(new_data$id), timestamp = Sys.time()),
checkpoint_file
)
}
new_data
}
# Data validation function
validate_events <- function(events) {
validation_results <- list(
has_required_cols = all(c("match_id", "type", "minute") %in% names(events)),
no_null_ids = sum(is.na(events$match_id)) == 0,
valid_minutes = all(events$minute >= 0 & events$minute <= 150),
valid_coords = all(events$x >= 0 & events$x <= 120, na.rm = TRUE)
)
all_valid <- all(unlist(validation_results))
if (!all_valid) {
failed <- names(validation_results)[!unlist(validation_results)]
warning(paste("Validation failed:", paste(failed, collapse = ", ")))
}
list(valid = all_valid, checks = validation_results)
}
# Incremental load with upsert
upsert_to_db <- function(con, df, table_name, key_cols) {
# Create temp table
temp_table <- paste0(table_name, "_temp")
dbWriteTable(con, temp_table, df, temporary = TRUE)
# Build upsert query
key_match <- paste(
paste0("t.", key_cols, " = s.", key_cols),
collapse = " AND "
)
update_cols <- setdiff(names(df), key_cols)
update_set <- paste(
paste0(update_cols, " = s.", update_cols),
collapse = ", "
)
sql <- glue::glue("
MERGE INTO {table_name} t
USING {temp_table} s
ON {key_match}
WHEN MATCHED THEN UPDATE SET {update_set}
WHEN NOT MATCHED THEN INSERT VALUES (s.*)
")
dbExecute(con, sql)
}
Data Warehouse Design
A well-designed data warehouse enables fast analytical queries. We use dimensional modeling with fact and dimension tables optimized for football analytics.
┌──────────────┐
│ dim_player │
│──────────────│
│ player_id │◄─────┐
│ name │ │
│ position │ │
│ nationality │ │
└──────────────┘ │
│
┌──────────────┐ ┌──────────────┐ │ ┌──────────────┐
│ dim_team │ │ fact_events │ │ │ dim_match │
│──────────────│ │──────────────│ │ │──────────────│
│ team_id │◄───│ event_id │──────┴───▶│ match_id │
│ name │ │ match_id │ │ date │
│ league │ │ team_id │───────────│ competition │
│ country │ │ player_id │ │ venue │
└──────────────┘ │ type │ └──────────────┘
│ minute │
│ x, y │ ┌──────────────┐
│ xg │ │ dim_date │
│ outcome │ │──────────────│
└──────────────┘──────────▶│ date_id │
│ season │
│ matchweek │
└──────────────┘
# Python: Data warehouse with dbt-style transformations
import pandas as pd
from sqlalchemy import create_engine, text
class FootballWarehouse:
"""
Data warehouse for football analytics.
"""
def __init__(self, connection_string: str):
self.engine = create_engine(connection_string)
def create_schema(self):
"""Create dimensional model schema."""
ddl_statements = [
"""
CREATE TABLE IF NOT EXISTS dim_player (
player_id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
position VARCHAR(50),
nationality VARCHAR(50),
birth_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS dim_team (
team_id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
short_name VARCHAR(20),
league VARCHAR(50),
country VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS dim_match (
match_id SERIAL PRIMARY KEY,
date DATE NOT NULL,
season VARCHAR(10),
matchweek INT,
competition VARCHAR(100),
venue VARCHAR(100),
home_team_id INT REFERENCES dim_team(team_id),
away_team_id INT REFERENCES dim_team(team_id),
home_score INT,
away_score INT
)
""",
"""
CREATE TABLE IF NOT EXISTS fact_events (
event_id BIGSERIAL PRIMARY KEY,
match_id INT REFERENCES dim_match(match_id),
team_id INT REFERENCES dim_team(team_id),
player_id INT REFERENCES dim_player(player_id),
minute INT,
second INT,
type VARCHAR(50),
x DECIMAL(5,2),
y DECIMAL(5,2),
end_x DECIMAL(5,2),
end_y DECIMAL(5,2),
xg DECIMAL(5,4),
outcome VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
# Indexes for common query patterns
"""
CREATE INDEX IF NOT EXISTS idx_events_match
ON fact_events(match_id)
""",
"""
CREATE INDEX IF NOT EXISTS idx_events_player
ON fact_events(player_id)
""",
"""
CREATE INDEX IF NOT EXISTS idx_events_type
ON fact_events(type)
"""
]
with self.engine.connect() as conn:
for ddl in ddl_statements:
conn.execute(text(ddl))
conn.commit()
def create_aggregation_tables(self):
"""Create pre-aggregated tables for fast queries."""
# Player season stats
player_stats_sql = """
CREATE TABLE IF NOT EXISTS agg_player_season AS
SELECT
p.player_id,
p.name,
p.position,
m.season,
COUNT(DISTINCT m.match_id) as matches,
SUM(CASE WHEN e.type = 'Shot' AND e.outcome = 'Goal'
THEN 1 ELSE 0 END) as goals,
SUM(CASE WHEN e.type = 'Shot' THEN e.xg ELSE 0 END) as xg,
SUM(CASE WHEN e.type = 'Shot' THEN 1 ELSE 0 END) as shots,
SUM(CASE WHEN e.type = 'Pass' THEN 1 ELSE 0 END) as passes,
SUM(CASE WHEN e.type = 'Pass' AND e.outcome = 'Complete'
THEN 1 ELSE 0 END) as successful_passes
FROM fact_events e
JOIN dim_player p ON e.player_id = p.player_id
JOIN dim_match m ON e.match_id = m.match_id
GROUP BY p.player_id, p.name, p.position, m.season
"""
with self.engine.connect() as conn:
conn.execute(text("DROP TABLE IF EXISTS agg_player_season"))
conn.execute(text(player_stats_sql))
conn.commit()
def query_player_stats(self, season: str, min_matches: int = 10):
"""Query player statistics for a season."""
sql = """
SELECT
name,
position,
matches,
goals,
xg,
-- per-90 approximated as per-match (assumes ~90 minutes per appearance)
ROUND(goals::numeric / NULLIF(matches, 0), 2) as goals_per_90,
ROUND(xg::numeric / NULLIF(matches, 0), 2) as xg_per_90,
ROUND(successful_passes::numeric / NULLIF(passes, 0) * 100, 1) as pass_pct
FROM agg_player_season
WHERE season = :season AND matches >= :min_matches
ORDER BY xg DESC
LIMIT 20
"""
return pd.read_sql(
text(sql),
self.engine,
params={"season": season, "min_matches": min_matches}
)
# Usage
warehouse = FootballWarehouse("postgresql://user:pass@localhost/football")
warehouse.create_schema()
warehouse.create_aggregation_tables()
# R: Data warehouse queries
library(DBI)
library(dbplyr)
# Connect to warehouse
con <- dbConnect(
RPostgres::Postgres(),
dbname = "football_warehouse",
host = "localhost"
)
# Efficient analytical query using dimensional model
player_season_stats <- tbl(con, "fact_events") %>%
inner_join(tbl(con, "dim_match"), by = "match_id") %>%
inner_join(tbl(con, "dim_player"), by = "player_id") %>%
filter(season == "2023-24") %>%
group_by(player_id, name, position) %>%
summarise(
matches = n_distinct(match_id),
goals = sum(if_else(type == "Shot" & outcome == "Goal", 1, 0)),
xg = sum(xg, na.rm = TRUE),
shots = sum(if_else(type == "Shot", 1, 0)),
passes = sum(if_else(type == "Pass", 1, 0)),
.groups = "drop"
) %>%
mutate(
# per-90 approximated as per-match (assumes ~90 minutes per appearance)
goals_per_90 = goals / matches,
xg_per_90 = xg / matches,
shot_conversion = goals / shots
) %>%
collect()
# Materialized views for common aggregations
dbExecute(con, "
CREATE MATERIALIZED VIEW mv_team_form AS
SELECT
t.team_id,
t.name,
m.season,
COUNT(DISTINCT m.match_id) as matches,
COUNT(DISTINCT CASE WHEN (f.team_id = m.home_team_id AND m.home_score > m.away_score)
OR (f.team_id = m.away_team_id AND m.away_score > m.home_score)
THEN m.match_id END) as wins,
SUM(f.xg) as total_xg
FROM fact_events f
JOIN dim_match m ON f.match_id = m.match_id
JOIN dim_team t ON f.team_id = t.team_id
GROUP BY t.team_id, t.name, m.season
")Automation and Scheduling
Automated pipelines run on schedules to keep data fresh. We use scheduling tools to orchestrate regular data updates and report generation.
# Python: Automation with Prefect and scheduling
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
@task
def send_notification(message: str, channel: str = "slack"):
"""Send notification on pipeline completion."""
import requests
if channel == "slack":
webhook_url = "https://hooks.slack.com/services/..."
requests.post(webhook_url, json={"text": message})
@task
def check_data_freshness(max_age_hours: int = 24) -> bool:
"""Verify data is up to date."""
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:pass@localhost/football")
query = """
SELECT MAX(created_at) as last_update
FROM fact_events
"""
with engine.connect() as conn:
result = conn.execute(text(query)).fetchone()
last_update = result[0]
if last_update is None:
return False
age = datetime.now() - last_update
return age < timedelta(hours=max_age_hours)
@flow(name="Daily Football Pipeline", log_prints=True)
def daily_pipeline():
"""Complete daily pipeline with monitoring."""
start_time = datetime.now()
try:
# Run main ETL
from main_pipeline import analytics_pipeline
result = analytics_pipeline(season=2023)
# Verify data freshness
is_fresh = check_data_freshness(max_age_hours=24)
if not is_fresh:
raise ValueError("Data freshness check failed")
# Generate reports
from reports import generate_daily_report
report_path = generate_daily_report()
# Calculate duration
duration = (datetime.now() - start_time).total_seconds()
# Send success notification
send_notification(
f"Daily pipeline complete. "
f"Processed {len(result)} records in {duration:.1f}s. "
f"Report: {report_path}"
)
return {"status": "success", "records": len(result)}
except Exception as e:
# Send failure notification
send_notification(
f"Pipeline FAILED: {str(e)}",
channel="slack"
)
raise
# Create deployment with schedule
def create_deployment():
"""Create scheduled deployment."""
deployment = Deployment.build_from_flow(
flow=daily_pipeline,
name="daily-football-pipeline",
schedule=CronSchedule(cron="0 6 * * *"), # 6 AM daily
work_queue_name="default",
tags=["production", "daily"]
)
deployment.apply()
# Alternative: Using APScheduler for simpler cases
from apscheduler.schedulers.blocking import BlockingScheduler
def run_with_apscheduler():
scheduler = BlockingScheduler()
# Daily at 6 AM
scheduler.add_job(
daily_pipeline,
"cron",
hour=6,
minute=0,
id="daily_pipeline"
)
# Every 15 minutes for live data
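# (live_data_update is assumed to be a flow defined elsewhere, e.g. one that pulls live feeds)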
scheduler.add_job(
live_data_update,
"interval",
minutes=15,
id="live_update"
)
scheduler.start()
# R: Automation with cronR
library(cronR)
# Create scheduled R script
script_content <- '
library(tidyverse)
# Update daily stats
source("etl/extract_matches.R")
source("etl/transform_events.R")
source("etl/load_warehouse.R")
# Generate reports
rmarkdown::render("reports/daily_summary.Rmd",
output_file = paste0("reports/daily_", Sys.Date(), ".html"))
# Send notification
send_slack_message("Daily pipeline complete")
'
writeLines(script_content, "daily_pipeline.R")
# Schedule to run daily at 6 AM
cron_add(
command = cron_rscript("daily_pipeline.R"),
frequency = "daily",
at = "06:00",
id = "daily_football_pipeline"
)
# View scheduled jobs
cron_ls()
# For more complex scheduling, use targets + GitHub Actions
# .github/workflows/pipeline.yml
Model and Dashboard Deployment
Deploying models and dashboards makes analytics accessible to stakeholders. We cover containerization, API deployment, and dashboard hosting.
# Python: Deploy with FastAPI and Docker
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import joblib
import pandas as pd
app = FastAPI(title="Football Analytics API")
# Load pre-trained model
model = joblib.load("models/match_predictor.pkl")
class MatchPredictionRequest(BaseModel):
home_team_id: int
away_team_id: int
home_form: float
away_form: float
home_xg_avg: float
away_xg_avg: float
class MatchPredictionResponse(BaseModel):
home_win_prob: float
draw_prob: float
away_win_prob: float
predicted_outcome: str
@app.post("/predict/match", response_model=MatchPredictionResponse)
async def predict_match(request: MatchPredictionRequest):
"""Predict match outcome."""
features = pd.DataFrame([{
"home_form": request.home_form,
"away_form": request.away_form,
"home_xg_avg": request.home_xg_avg,
"away_xg_avg": request.away_xg_avg
}])
probabilities = model.predict_proba(features)[0]
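# NOTE (assumption): the index order below must match model.classes_; verify it is
# ["away_win", "draw", "home_win"] before relying on these indices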
outcomes = ["away_win", "draw", "home_win"]
predicted = outcomes[probabilities.argmax()]
return MatchPredictionResponse(
home_win_prob=round(probabilities[2], 3),
draw_prob=round(probabilities[1], 3),
away_win_prob=round(probabilities[0], 3),
predicted_outcome=predicted
)
@app.get("/players/{player_id}/stats")
async def get_player_stats(player_id: int, season: Optional[str] = None):
"""Get player statistics."""
from database import get_player_stats
stats = get_player_stats(player_id, season)
if stats is None:
raise HTTPException(status_code=404, detail="Player not found")
return stats
@app.get("/health")
async def health_check():
return {"status": "healthy"}
# Dockerfile
dockerfile_content = """
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""
# docker-compose.yml
compose_content = """
version: "3.8"
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/football
depends_on:
- db
db:
image: postgres:14
volumes:
- postgres_data:/var/lib/postgresql/data
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=football
dashboard:
build: ./dashboard
ports:
- "8501:8501"
depends_on:
- api
volumes:
postgres_data:
"""
# Deploy Streamlit dashboard
import streamlit as st
st.title("Football Analytics Dashboard")
# Sidebar
season = st.sidebar.selectbox("Season", ["2023-24", "2022-23"])
team = st.sidebar.selectbox("Team", get_teams())
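# get_teams() and team_stats are assumed helpers/objects backed by the analytics API or database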
# Main content
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Goals", team_stats["goals"], delta="+5 vs last season")
with col2:
st.metric("xG", f"{team_stats['xg']:.1f}")
with col3:
st.metric("Win Rate", f"{team_stats['win_rate']:.1%}")
st.plotly_chart(create_xg_chart(team_stats))
# R: Deploy Shiny dashboard
library(shiny)
library(shinydashboard)
# Dashboard definition
ui <- dashboardPage(
dashboardHeader(title = "Football Analytics"),
dashboardSidebar(
sidebarMenu(
menuItem("Team Overview", tabName = "team", icon = icon("users")),
menuItem("Player Stats", tabName = "players", icon = icon("user")),
menuItem("Match Analysis", tabName = "match", icon = icon("futbol"))
),
selectInput("season", "Season", choices = c("2023-24", "2022-23"))
),
dashboardBody(
tabItems(
tabItem(tabName = "team",
fluidRow(
valueBoxOutput("totalGoals"),
valueBoxOutput("totalxG"),
valueBoxOutput("winRate")
),
fluidRow(
box(plotOutput("xgTrend"), width = 8),
box(tableOutput("topScorers"), width = 4)
)
)
)
)
)
server <- function(input, output, session) {
# Reactive data
team_data <- reactive({
get_team_stats(input$season)
})
output$totalGoals <- renderValueBox({
valueBox(sum(team_data()$goals), "Goals", icon = icon("futbol"))
})
output$xgTrend <- renderPlot({
ggplot(team_data(), aes(x = matchweek, y = xg)) +
geom_line() + geom_point() +
labs(title = "xG Trend")
})
}
# Deploy to shinyapps.io
# rsconnect::deployApp()
# Or deploy with Docker
# Dockerfile for Shiny app
Monitoring and Maintenance
Production pipelines need monitoring to catch issues early. We track data quality, pipeline health, and model performance.
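The larger example below covers data quality and pipeline health; model performance can be tracked with the same machinery. A minimal sketch, assuming predictions and observed outcomes are logged to a hypothetical model_predictions table with predicted_xg, is_goal, and predicted_at columns:
# Python: Model performance check (sketch; table and column names are assumptions)
import pandas as pd
from sqlalchemy import create_engine

def check_xg_calibration(db_url: str, tolerance: float = 0.02) -> dict:
    """Compare mean predicted xG with the observed goal rate over the last 7 days."""
    engine = create_engine(db_url)
    preds = pd.read_sql(
        "SELECT predicted_xg, is_goal FROM model_predictions "
        "WHERE predicted_at > CURRENT_DATE - INTERVAL '7 days'",
        engine
    )
    if preds.empty:
        return {"passed": False, "details": "No recent predictions logged"}
    gap = abs(preds["predicted_xg"].mean() - preds["is_goal"].mean())
    return {
        "passed": gap <= tolerance,
        "details": f"mean xG {preds['predicted_xg'].mean():.3f} vs goal rate {preds['is_goal'].mean():.3f}"
    }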
# Python: Comprehensive monitoring
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd
from sqlalchemy import create_engine, text
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DataQualityCheck:
name: str
passed: bool
details: str
timestamp: datetime = None
def __post_init__(self):
self.timestamp = datetime.now()
class PipelineMonitor:
"""Monitor pipeline health and data quality."""
def __init__(self, db_url: str):
self.engine = create_engine(db_url)
self.alerts = []
def check_data_freshness(self, table: str,
max_age_hours: int = 24) -> DataQualityCheck:
"""Check if data is up to date."""
query = f"""
SELECT MAX(created_at) as last_update
FROM {table}
"""
with self.engine.connect() as conn:
result = conn.execute(text(query)).fetchone()
if result[0] is None:
return DataQualityCheck(
name=f"{table}_freshness",
passed=False,
details="No data found"
)
age = datetime.now() - result[0]
passed = age < timedelta(hours=max_age_hours)
return DataQualityCheck(
name=f"{table}_freshness",
passed=passed,
details=f"Last update: {result[0]}, Age: {age}"
)
def check_null_rates(self, table: str,
columns: List[str],
max_null_pct: float = 0.01) -> DataQualityCheck:
"""Check null rates in critical columns."""
results = {}
for col in columns:
query = f"""
SELECT
COUNT(*) as total,
SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END) as nulls
FROM {table}
WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
"""
with self.engine.connect() as conn:
result = conn.execute(text(query)).fetchone()
null_rate = (result[1] or 0) / max(result[0], 1)
results[col] = null_rate
passed = all(rate <= max_null_pct for rate in results.values())
return DataQualityCheck(
name=f"{table}_null_check",
passed=passed,
details=str(results)
)
def check_value_distributions(self, table: str,
column: str,
expected_values: List[str]) -> DataQualityCheck:
"""Check that values are within expected set."""
query = f"""
SELECT DISTINCT {column}
FROM {table}
WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
"""
with self.engine.connect() as conn:
result = conn.execute(text(query)).fetchall()
actual_values = {row[0] for row in result}
unexpected = actual_values - set(expected_values)
passed = len(unexpected) == 0
return DataQualityCheck(
name=f"{table}_{column}_values",
passed=passed,
details=f"Unexpected values: {unexpected}" if unexpected else "All values valid"
)
def run_all_checks(self) -> Dict[str, DataQualityCheck]:
"""Run all monitoring checks."""
checks = {}
# Freshness checks
checks["events_fresh"] = self.check_data_freshness("fact_events")
checks["matches_fresh"] = self.check_data_freshness("dim_match")
# Null checks
checks["events_nulls"] = self.check_null_rates(
"fact_events",
["match_id", "type", "minute"]
)
# Value distribution checks
checks["event_types"] = self.check_value_distributions(
"fact_events",
"type",
["Pass", "Shot", "Dribble", "Carry", "Tackle", "Clearance"]
)
# Log results
failed_checks = [k for k, v in checks.items() if not v.passed]
if failed_checks:
logger.warning(f"Failed checks: {failed_checks}")
self.send_alert(f"Data quality issues: {failed_checks}")
else:
logger.info("All quality checks passed")
return checks
def send_alert(self, message: str):
"""Send alert via Slack/email."""
import requests
self.alerts.append({
"message": message,
"timestamp": datetime.now()
})
# Slack webhook
webhook_url = "https://hooks.slack.com/services/..."
requests.post(webhook_url, json={"text": f":warning: {message}"})
def log_pipeline_run(self, pipeline_name: str, status: str,
duration: float, records: int):
"""Log pipeline execution metrics."""
log_entry = {
"timestamp": datetime.now(),
"pipeline": pipeline_name,
"status": status,
"duration_seconds": duration,
"records_processed": records
}
df = pd.DataFrame([log_entry])
with self.engine.connect() as conn:
df.to_sql("pipeline_logs", conn, if_exists="append", index=False)
logger.info(f"Logged: {pipeline_name} - {status} - {records} records")
# Usage
monitor = PipelineMonitor("postgresql://user:pass@localhost/football")
results = monitor.run_all_checks()
# R: Pipeline monitoring
library(tidyverse)
# Data quality monitoring
monitor_data_quality <- function(con) {
checks <- list()
# Check row counts
checks$events_count <- dbGetQuery(con, "
SELECT COUNT(*) as n FROM fact_events
WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
")$n
# Check for nulls in critical columns
checks$null_check <- dbGetQuery(con, "
SELECT
SUM(CASE WHEN match_id IS NULL THEN 1 ELSE 0 END) as null_matches,
SUM(CASE WHEN player_id IS NULL THEN 1 ELSE 0 END) as null_players
FROM fact_events
WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
")
# Check value distributions
checks$type_distribution <- dbGetQuery(con, "
SELECT type, COUNT(*) as n
FROM fact_events
WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
GROUP BY type
")
# Alert if issues
if (checks$events_count < 1000) {
send_alert("Low event count in last 24h")
}
checks
}
# Log pipeline metrics
log_pipeline_run <- function(pipeline_name, status, duration, records) {
log_entry <- tibble(
timestamp = Sys.time(),
pipeline = pipeline_name,
status = status,
duration_sec = duration,
records_processed = records
)
# Append to log file
write_csv(log_entry, "logs/pipeline_runs.csv", append = TRUE)
# Also write to database
dbWriteTable(con, "pipeline_logs", log_entry, append = TRUE)
}
Real-Time Streaming Pipelines
Live match data requires streaming pipelines that process events as they occur. We use stream processing frameworks to handle real-time data with low latency.
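The examples below sit on the consumer side and assume events already arrive on a live-events topic. A minimal producer sketch for the ingestion side, assuming a local Kafka broker and a hypothetical poll_feed() client that returns new event dicts:
# Python: Publishing live events to Kafka (sketch; poll_feed() is a hypothetical provider client)
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda m: json.dumps(m).encode("utf-8")
)

def publish_live_events(match_id: int, poll_interval: float = 1.0):
    """Poll the provider feed and forward each new event to the live-events topic."""
    while True:
        for event in poll_feed(match_id):  # hypothetical: returns a list of new event dicts
            producer.send("live-events", event)
        producer.flush()
        time.sleep(poll_interval)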
# Python: Real-time streaming with Apache Kafka
from kafka import KafkaConsumer, KafkaProducer
from dataclasses import dataclass
from typing import Optional, Generator
import json
import asyncio
from datetime import datetime
@dataclass
class LiveEvent:
match_id: int
event_id: int
timestamp: float
type: str
player_id: int
team_id: int
x: float
y: float
outcome: Optional[str] = None
xg: Optional[float] = None
class LiveEventProcessor:
"""Process live match events in real-time."""
def __init__(self, kafka_bootstrap: str = "localhost:9092"):
self.consumer = KafkaConsumer(
"live-events",
bootstrap_servers=kafka_bootstrap,
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="latest"
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap,
value_serializer=lambda m: json.dumps(m).encode("utf-8")
)
# In-memory state for running calculations
self.match_state = {}
def process_event(self, event_data: dict) -> LiveEvent:
"""Process a single event and enrich with calculated metrics."""
event = LiveEvent(**event_data)
# Calculate xG for shots
if event.type == "Shot":
event.xg = self._calculate_live_xg(event)
# Update match state
match_id = event.match_id
if match_id not in self.match_state:
self.match_state[match_id] = {
"home_xg": 0, "away_xg": 0,
"home_possession": 0, "away_possession": 0
}
team_key = "home_xg" if event.team_id == 1 else "away_xg"
self.match_state[match_id][team_key] += event.xg
return event
def _calculate_live_xg(self, event: LiveEvent) -> float:
"""Calculate xG for a shot in real-time."""
from math import sqrt, atan2, pi
goal_x, goal_y = 100, 50
distance = sqrt((goal_x - event.x)**2 + (goal_y - event.y)**2)
angle = abs(atan2(goal_y - event.y, goal_x - event.x))
# Simple xG model (production would use trained model)
base_xg = max(0, 0.5 - distance * 0.015)
angle_adjustment = 1 - abs(angle - pi/2) / (pi/2) * 0.3
return round(base_xg * angle_adjustment, 3)
def stream_processed_events(self) -> Generator[LiveEvent, None, None]:
"""Stream processed events."""
for message in self.consumer:
event = self.process_event(message.value)
# Publish enriched event
self.producer.send("processed-events", {
"event": event.__dict__,
"match_state": self.match_state.get(event.match_id, {})
})
yield event
class WindowedAggregator:
"""Aggregate events over time windows."""
def __init__(self, window_seconds: int = 300):
self.window_seconds = window_seconds
self.windows = {}
def add_event(self, event: LiveEvent):
"""Add event to current window."""
window_id = int(event.timestamp // self.window_seconds)
key = (event.match_id, window_id)
if key not in self.windows:
self.windows[key] = {
"match_id": event.match_id,
"window_start": window_id * self.window_seconds,
"events": [],
"stats": {"passes": 0, "shots": 0, "xg": 0}
}
self.windows[key]["events"].append(event)
self.windows[key]["stats"]["passes"] += 1 if event.type == "Pass" else 0
self.windows[key]["stats"]["shots"] += 1 if event.type == "Shot" else 0
self.windows[key]["stats"]["xg"] += event.xg or 0
def get_window_stats(self, match_id: int, window_id: int) -> dict:
"""Get aggregated stats for a window."""
key = (match_id, window_id)
return self.windows.get(key, {}).get("stats", {})
# WebSocket server for dashboard updates
import websockets
class LiveDashboardServer:
"""WebSocket server for live dashboard updates."""
def __init__(self, processor: LiveEventProcessor):
self.processor = processor
self.clients = set()
async def register(self, websocket):
self.clients.add(websocket)
try:
await websocket.wait_closed()
finally:
self.clients.discard(websocket)
async def broadcast(self, message: dict):
"""Broadcast update to all connected clients."""
if self.clients:
await asyncio.gather(
*[client.send(json.dumps(message)) for client in self.clients],
return_exceptions=True
)
async def run_server(self, host: str = "localhost", port: int = 8765):
"""Start WebSocket server."""
async with websockets.serve(self.register, host, port):
# Process events and broadcast
for event in self.processor.stream_processed_events():
await self.broadcast({
"type": "event",
"data": event.__dict__
})
print("Streaming pipeline components initialized")# R: Real-time processing concepts
library(tidyverse)
# Simulated streaming with websockets
# In R, streaming is typically handled via:
# 1. httpuv/websocket for connections
# 2. Shiny reactive for dashboards
# 3. Redis pub/sub for message queues
# Stream processing function
process_live_event <- function(event) {
# Parse incoming event
parsed <- jsonlite::fromJSON(event)
# Calculate real-time metrics
if (parsed$type == "Shot") {
xg <- calculate_xg(parsed$x, parsed$y, parsed$body_part)
parsed$xg <- xg
# Update running totals
update_match_xg(parsed$match_id, parsed$team, xg)
}
# Broadcast to subscribers
broadcast_update(parsed)
parsed
}
# Windowed aggregation
window_aggregate <- function(events, window_size = 300) {
# 5-minute rolling windows
events %>%
mutate(window = floor(timestamp / window_size)) %>%
group_by(window, team) %>%
summarise(
passes = sum(type == "Pass"),
possession_pct = n() / sum(n()) * 100,
.groups = "drop"
)
}Streaming pipeline components initializedVersion Control for Data and Models
Reproducibility requires versioning not just code, but also data and trained models. We use specialized tools to track changes across the analytics stack.
# Python: Data and model versioning (lightweight custom versioner plus MLflow; DVC is a common alternative)
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
import pandas as pd
import hashlib
from pathlib import Path
import json
from datetime import datetime
from typing import Optional
class DataVersioner:
"""Version control for datasets."""
def __init__(self, storage_path: str = "./data_versions"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(exist_ok=True)
self.metadata_file = self.storage_path / "metadata.json"
if self.metadata_file.exists():
with open(self.metadata_file) as f:
self.metadata = json.load(f)
else:
self.metadata = {"datasets": {}}
def _calculate_checksum(self, df: pd.DataFrame) -> str:
"""Calculate hash of dataframe."""
return hashlib.md5(
pd.util.hash_pandas_object(df).values.tobytes()
).hexdigest()
def save_version(self, df: pd.DataFrame, name: str,
description: str = "", tags: dict = None):
"""Save a versioned copy of the dataset."""
# Generate version ID
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
checksum = self._calculate_checksum(df)
version_id = f"{timestamp}_{checksum[:8]}"
# Create version directory
version_dir = self.storage_path / name / version_id
version_dir.mkdir(parents=True, exist_ok=True)
# Save data
df.to_parquet(version_dir / "data.parquet")
# Save metadata
version_meta = {
"version_id": version_id,
"timestamp": timestamp,
"checksum": checksum,
"rows": len(df),
"columns": list(df.columns),
"description": description,
"tags": tags or {}
}
with open(version_dir / "meta.json", "w") as f:
json.dump(version_meta, f, indent=2)
# Update global metadata
if name not in self.metadata["datasets"]:
self.metadata["datasets"][name] = {"versions": []}
self.metadata["datasets"][name]["versions"].insert(0, version_meta)
self.metadata["datasets"][name]["latest"] = version_id
self._save_metadata()
print(f"Saved {name} version: {version_id}")
return version_id
def load_version(self, name: str,
version_id: Optional[str] = None) -> pd.DataFrame:
"""Load a specific version of the dataset."""
if version_id is None:
version_id = self.metadata["datasets"][name]["latest"]
version_dir = self.storage_path / name / version_id
return pd.read_parquet(version_dir / "data.parquet")
def list_versions(self, name: str) -> list:
"""List all versions of a dataset."""
return self.metadata["datasets"].get(name, {}).get("versions", [])
def _save_metadata(self):
with open(self.metadata_file, "w") as f:
json.dump(self.metadata, f, indent=2)
class ModelVersioner:
"""Version control for ML models using MLflow."""
def __init__(self, tracking_uri: str = "./mlruns",
experiment_name: str = "football_models"):
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
self.client = MlflowClient()
def log_model(self, model, model_name: str, metrics: dict,
params: dict = None, tags: dict = None):
"""Log a model with metrics and parameters."""
with mlflow.start_run():
# Log parameters
if params:
mlflow.log_params(params)
# Log metrics
mlflow.log_metrics(metrics)
# Log tags
if tags:
for key, value in tags.items():
mlflow.set_tag(key, value)
# Log the model
mlflow.sklearn.log_model(
model, model_name,
registered_model_name=model_name
)
run_id = mlflow.active_run().info.run_id
print(f"Logged model {model_name}, run_id: {run_id}")
return run_id
def load_model(self, model_name: str,
version: Optional[str] = None) -> any:
"""Load a model by name and optional version."""
if version is None:
# Load latest version
model_uri = f"models:/{model_name}/latest"
else:
model_uri = f"models:/{model_name}/{version}"
return mlflow.sklearn.load_model(model_uri)
def compare_models(self, model_name: str) -> pd.DataFrame:
"""Compare metrics across model versions."""
# Get all runs for the model
experiment = mlflow.get_experiment_by_name("football_models")
runs = mlflow.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string=f"tags.mlflow.runName = '{model_name}'"
)
return runs[["run_id", "start_time", "metrics.auc", "metrics.accuracy"]]
def promote_model(self, model_name: str, version: str,
stage: str = "Production"):
"""Promote a model version to a stage."""
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage=stage
)
print(f"Promoted {model_name} v{version} to {stage}")
# Example usage
data_versioner = DataVersioner("./data_versions")
model_versioner = ModelVersioner()
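# A typical round-trip (sketch; events_df, xg_model and auc_score are placeholders):
# version_id = data_versioner.save_version(events_df, "events_2023",
#                                          description="Cleaned events", tags={"season": "2023-24"})
# events_df = data_versioner.load_version("events_2023", version_id)
# model_versioner.log_model(xg_model, "xg_model", metrics={"auc": auc_score},
#                           params={"model_type": "logistic_regression"})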
print("Versioning tools initialized")# R: Data versioning with pins
library(pins)
library(tidyverse)
# Create a pin board (can be local, S3, Azure, etc.)
board <- board_folder("data_versions")
# Or use remote storage
# board <- board_s3("my-bucket", prefix = "football-data/")
# Save versioned dataset
save_versioned_data <- function(board, data, name, metadata = list()) {
# Add version metadata
metadata$created_at <- Sys.time()
metadata$rows <- nrow(data)
metadata$columns <- ncol(data)
metadata$checksum <- digest::digest(data)
# Pin with version
pin_write(
board, data, name,
metadata = metadata,
versioned = TRUE
)
cat("Saved version:", pin_versions(board, name)$version[1], "\n")
}
# Load specific version
load_versioned_data <- function(board, name, version = NULL) {
if (is.null(version)) {
# Latest version
pin_read(board, name)
} else {
# Specific version
pin_read(board, name, version = version)
}
}
# List all versions
list_versions <- function(board, name) {
versions <- pin_versions(board, name)
versions %>%
mutate(
age = difftime(Sys.time(), created, units = "days")
)
}
# Example usage
# save_versioned_data(board, player_stats, "player_stats_2023",
# metadata = list(season = "2023-24", source = "statsbomb"))
# old_data <- load_versioned_data(board, "player_stats_2023", version = "20231015T102030Z")
# Model versioning with vetiver
library(vetiver)
# Version a model
version_model <- function(model, model_name, board) {
v <- vetiver_model(model, model_name)
# Add to board with version tracking
vetiver_pin_write(board, v)
cat("Model versioned:", model_name, "\n")
}
# Load specific model version
# v <- vetiver_pin_read(board, "xg_model", version = "20231001")
# predict(v, new_data)
Testing Analytics Code
Reliable pipelines require comprehensive testing. We test data transformations, model predictions, and pipeline integrations.
# Python: Testing with pytest
import pytest
import pandas as pd
import numpy as np
from unittest.mock import Mock, patch
# Test fixtures
@pytest.fixture
def sample_events():
"""Create sample event data for testing."""
return pd.DataFrame({
"match_id": [1, 1, 1, 2, 2],
"player_id": [101, 101, 102, 101, 102],
"type": ["Pass", "Shot", "Pass", "Shot", "Pass"],
"x": [80, 95, 50, 92, 60],
"y": [50, 50, 30, 55, 40],
"minute": [10, 45, 20, 30, 60],
"outcome": ["Complete", "Goal", "Complete", "Saved", "Complete"]
})
@pytest.fixture
def sample_match():
return {
"match_id": 1,
"home_team": "Team A",
"away_team": "Team B",
"date": "2023-10-15"
}
# Unit tests
class TestXGCalculation:
"""Tests for xG calculation."""
def test_xg_in_valid_range(self):
from models.xg import calculate_xg
xg = calculate_xg(x=95, y=50, body_part="Right Foot")
assert 0 <= xg <= 1
def test_penalty_xg(self):
from models.xg import calculate_xg
xg = calculate_xg(x=88, y=50, is_penalty=True)
assert abs(xg - 0.76) < 0.02
def test_long_range_low_xg(self):
from models.xg import calculate_xg
xg = calculate_xg(x=60, y=50, body_part="Right Foot")
assert xg < 0.05
def test_close_range_high_xg(self):
from models.xg import calculate_xg
xg = calculate_xg(x=98, y=50, body_part="Right Foot")
assert xg > 0.3
class TestDataCleaning:
"""Tests for data cleaning functions."""
def test_removes_null_ids(self, sample_events):
from etl.transform import clean_events
# Add null record
df = pd.concat([
sample_events,
pd.DataFrame([{"match_id": None, "type": "Pass", "minute": 10}])
])
cleaned = clean_events(df)
assert cleaned["match_id"].notna().all()
def test_removes_negative_minutes(self, sample_events):
from etl.transform import clean_events
# Add invalid minute
df = pd.concat([
sample_events,
pd.DataFrame([{"match_id": 3, "type": "Pass", "minute": -5}])
])
cleaned = clean_events(df)
assert (cleaned["minute"] >= 0).all()
def test_preserves_valid_records(self, sample_events):
from etl.transform import clean_events
cleaned = clean_events(sample_events)
assert len(cleaned) == len(sample_events)
class TestAggregations:
"""Tests for aggregation functions."""
def test_player_stats_accuracy(self, sample_events):
from etl.aggregate import calculate_player_stats
stats = calculate_player_stats(sample_events)
player_101 = stats[stats["player_id"] == 101].iloc[0]
assert player_101["passes"] == 1
assert player_101["shots"] == 2
def test_team_stats_xg(self, sample_events):
from etl.aggregate import calculate_team_stats
# Add xg column
sample_events["xg"] = [0, 0.3, 0, 0.15, 0]
stats = calculate_team_stats(sample_events)
assert stats[stats["match_id"] == 1]["total_xg"].iloc[0] == 0.3
# Integration tests
class TestPipeline:
"""Integration tests for the full pipeline."""
def test_pipeline_produces_valid_output(self, sample_events):
from pipeline import run_pipeline
result = run_pipeline(sample_events)
assert "player_stats" in result
assert "team_stats" in result
assert len(result["player_stats"]) > 0
@patch("etl.extract.fetch_from_api")
def test_pipeline_handles_api_failure(self, mock_fetch):
from pipeline import run_pipeline
mock_fetch.side_effect = ConnectionError("API unavailable")
with pytest.raises(ConnectionError):
run_pipeline(from_api=True)
def test_incremental_load(self, sample_events):
from pipeline import load_incremental
from unittest.mock import MagicMock
mock_engine = MagicMock()
# Should not raise
load_incremental(sample_events, "events", ["match_id"], mock_engine)
assert mock_engine.execute.called
# Performance tests
class TestPerformance:
"""Performance regression tests."""
def test_large_dataset_processing_time(self):
from etl.transform import clean_events
import time
# Generate large dataset
large_df = pd.DataFrame({
"match_id": range(100000),
"type": ["Pass"] * 100000,
"minute": np.random.randint(0, 90, 100000)
})
start = time.time()
result = clean_events(large_df)
duration = time.time() - start
assert duration < 5 # Should complete in under 5 seconds
# Run with: pytest tests/ -v
# R: Testing with testthat
library(testthat)
library(tidyverse)
# Unit tests for transformation functions
test_that("calculate_xg returns valid probabilities", {
# Test basic shot
xg <- calculate_xg(x = 95, y = 50, body_part = "Right Foot")
expect_true(xg >= 0 && xg <= 1)
# Test penalty
xg_penalty <- calculate_xg(x = 88, y = 50, is_penalty = TRUE)
expect_equal(xg_penalty, 0.76, tolerance = 0.01)
# Test edge cases
expect_equal(calculate_xg(x = 0, y = 50), 0) # Own half
expect_true(calculate_xg(x = 99, y = 50) > 0.5) # Close range
})
test_that("clean_events removes invalid records", {
# Create test data with some invalid records
test_events <- tibble(
match_id = c(1, 1, NA, 2),
type = c("Pass", "Shot", "Pass", "Pass"),
minute = c(10, 120, 30, -5)
)
cleaned <- clean_events(test_events)
# Should remove NA match_id and negative minutes
expect_equal(nrow(cleaned), 2)
expect_true(all(!is.na(cleaned$match_id)))
expect_true(all(cleaned$minute >= 0))
})
test_that("aggregate_player_stats calculates correctly", {
test_events <- tibble(
player_id = c(1, 1, 1, 2, 2),
type = c("Pass", "Pass", "Shot", "Pass", "Shot"),
xg = c(NA, NA, 0.3, NA, 0.1)
)
stats <- aggregate_player_stats(test_events)
player1 <- stats[stats$player_id == 1, ]
expect_equal(player1$passes, 2)
expect_equal(player1$shots, 1)
expect_equal(player1$total_xg, 0.3)
})
# Integration test
test_that("full ETL pipeline produces valid output", {
# Use test fixture data
source_data <- readRDS("tests/fixtures/sample_events.rds")
# Run pipeline
result <- run_etl_pipeline(source_data)
# Validate output structure
expect_true("player_stats" %in% names(result))
expect_true("team_stats" %in% names(result))
# Validate data quality
expect_true(all(result$player_stats$passes >= 0))
expect_true(all(result$team_stats$xg >= 0))
})
# Run tests
# test_dir("tests/")===== 15 passed in 0.45s =====Cost Optimization
Cloud-based pipelines can become expensive. Optimize costs through efficient resource usage, caching, and tiered storage.
# Python: Cost optimization for cloud pipelines
import pandas as pd
from functools import lru_cache
from datetime import datetime, timedelta
import pyarrow.parquet as pq
import boto3
class CostOptimizer:
"""Strategies for reducing pipeline costs."""
def __init__(self):
self.cache = {}
# 1. Efficient Data Types
@staticmethod
def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""Reduce memory usage through dtype optimization."""
original_memory = df.memory_usage(deep=True).sum()
for col in df.columns:
col_type = df[col].dtype
# Convert object to category if low cardinality
if col_type == "object":
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.1:
df[col] = df[col].astype("category")
# Downcast integers
elif col_type == "int64":
df[col] = pd.to_numeric(df[col], downcast="integer")
# Downcast floats
elif col_type == "float64":
df[col] = pd.to_numeric(df[col], downcast="float")
new_memory = df.memory_usage(deep=True).sum()
reduction = (1 - new_memory / original_memory) * 100
print(f"Memory reduced by {reduction:.1f}%")
return df
# 2. Partitioned Storage
@staticmethod
def write_partitioned(df: pd.DataFrame, path: str,
partition_cols: list):
"""Write data partitioned for efficient querying."""
df.to_parquet(
path,
partition_cols=partition_cols,
compression="snappy",
engine="pyarrow"
)
@staticmethod
def read_partitioned(path: str, filters: list = None) -> pd.DataFrame:
"""Read only required partitions."""
# Only reads matching partitions
return pd.read_parquet(path, filters=filters)
# 3. Query Optimization
@staticmethod
def optimize_query(conn, query: str) -> str:
"""Add optimizations to SQL query."""
# Add query hints
optimized = f"""
-- Parallel query hint
/*+ PARALLEL(4) */
{query}
"""
return optimized
# 4. Caching
@lru_cache(maxsize=1000)
def cached_xg_calculation(self, x: float, y: float,
body_part: str) -> float:
"""Cache expensive xG calculations."""
return self._calculate_xg_impl(x, y, body_part)
def _calculate_xg_impl(self, x, y, body_part):
# Actual calculation
from math import sqrt
distance = sqrt((100 - x)**2 + (50 - y)**2)
return max(0, 0.5 - distance * 0.015)
# 5. Tiered Storage
@staticmethod
def manage_storage_tiers(bucket: str, prefix: str):
"""Move old data to cheaper storage tiers."""
s3 = boto3.client("s3")
cutoff_date = datetime.now() - timedelta(days=90)
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
if obj["LastModified"].replace(tzinfo=None) < cutoff_date:
# Move to Glacier
s3.copy_object(
Bucket=bucket,
CopySource=f"{bucket}/{obj['Key']}",
Key=obj["Key"],
StorageClass="GLACIER"
)
print(f"Moved {obj['Key']} to Glacier")
# 6. Batch Processing
@staticmethod
def batch_api_calls(items: list, batch_size: int = 100):
"""Batch API calls to reduce costs."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
yield batch
# 7. Resource Scaling
@staticmethod
def estimate_resources(data_size_gb: float) -> dict:
"""Estimate required resources based on data size."""
return {
"memory_gb": max(4, data_size_gb * 2),
"cpu_cores": max(2, int(data_size_gb / 10)),
"estimated_cost_per_hour": data_size_gb * 0.05,
"recommended_instance": (
"t3.medium" if data_size_gb < 10
else "r5.large" if data_size_gb < 50
else "r5.xlarge"
)
}
# Cost tracking
class CostTracker:
"""Track and report pipeline costs."""
def __init__(self):
self.costs = []
def log_cost(self, operation: str, cost: float,
resource: str = "compute"):
self.costs.append({
"timestamp": datetime.now(),
"operation": operation,
"cost": cost,
"resource": resource
})
def daily_summary(self) -> pd.DataFrame:
df = pd.DataFrame(self.costs)
return df.groupby(df["timestamp"].dt.date).agg({
"cost": "sum",
"operation": "count"
}).rename(columns={"operation": "operations"})
def alert_if_over_budget(self, daily_budget: float):
today = datetime.now().date()
today_costs = sum(
c["cost"] for c in self.costs
if c["timestamp"].date() == today
)
if today_costs > daily_budget:
print(f"ALERT: Daily costs ${today_costs:.2f} exceed budget ${daily_budget:.2f}")
optimizer = CostOptimizer()
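# Example round-trip (sketch; the path and partition column are assumptions):
# events_df = optimizer.optimize_dtypes(events_df)
# CostOptimizer.write_partitioned(events_df, "data/events", partition_cols=["season"])
# season_df = CostOptimizer.read_partitioned("data/events",
#                                            filters=[("season", "=", "2023-24")])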
print("Cost optimization tools initialized")# R: Cost optimization strategies
library(tidyverse)
# 1. Efficient data storage
optimize_storage <- function(df) {
# Convert strings to factors where appropriate
df <- df %>%
mutate(across(where(is.character), ~{
if (n_distinct(.) / length(.) < 0.1) {
as.factor(.)
} else {
.
}
}))
# Downcast numerics
df <- df %>%
mutate(across(where(is.numeric), ~{
if (all(. == floor(.), na.rm = TRUE)) as.integer(.) else .
}))
df
}
# 2. Query optimization
efficient_query <- function(con, match_ids) {
# Bad: Pull all data then filter
# all_data <- dbReadTable(con, "events")
# filtered <- all_data %>% filter(match_id %in% match_ids)
# Good: Filter in database
tbl(con, "events") %>%
filter(match_id %in% match_ids) %>%
select(match_id, type, minute, xg) %>% # Only needed columns
collect()
}
# 3. Caching expensive computations
library(memoise)
calculate_xg_cached <- memoise(
function(x, y, body_part) {
# Expensive calculation
calculate_xg_full(x, y, body_part)
},
cache = cache_filesystem("xg_cache/")
)
# 4. Incremental processing
process_incremental <- function(last_processed_id) {
# Only process new data
new_events <- get_events_since(last_processed_id)
if (nrow(new_events) > 0) {
processed <- transform_events(new_events)
append_to_warehouse(processed)
}
max(new_events$id, last_processed_id)
}
Practice Exercises
Hands-On Practice
Complete these exercises to master analytics pipelines:
Create an ETL pipeline that extracts StatsBomb data, transforms it into a dimensional model, and loads it into a PostgreSQL database. Include data validation and error handling.
Set up a scheduled job that runs your ETL pipeline daily. Include notifications for success/failure and a health check endpoint.
Create a Shiny or Streamlit dashboard that displays player and team statistics. Containerize it with Docker and deploy to a cloud platform.
Add monitoring to your pipeline: track data freshness, null rates, and value distributions. Set up alerts for anomalies.
Summary
Key Takeaways
- Pipeline architecture separates concerns: extract, transform, load, serve
- ETL best practices include idempotency, validation, and incremental loading
- Dimensional modeling (star schema) optimizes analytical queries
- Automation with schedulers keeps data fresh without manual intervention
- Deployment via containers and APIs makes analytics accessible
- Monitoring catches issues early with data quality checks and alerts
Building robust analytics pipelines transforms one-off analyses into sustainable, scalable systems. With the infrastructure in place, you can focus on generating insights rather than wrangling data.
Congratulations!
You've completed Part 9: Data Science Applications. You now have the skills to build production-grade football analytics systems using modern data engineering practices.