Chapter 60

Capstone - Complete Analytics System


Building Analytics Pipelines

Moving from ad-hoc analysis to production-ready analytics requires robust data pipelines. This chapter covers the infrastructure needed to collect, process, store, and serve football analytics at scale - turning one-off scripts into reliable, automated systems.

Pipeline Architecture

A football analytics pipeline typically consists of several stages: data collection, transformation, storage, analysis, and serving. Each stage must be reliable, scalable, and maintainable.

Typical Pipeline Architecture
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Sources   │───▶│   Extract   │───▶│  Transform  │───▶│    Load     │───▶│    Serve    │
│             │    │             │    │             │    │             │    │             │
│ - APIs      │    │ - Scraping  │    │ - Clean     │    │ - Database  │    │ - Dashboard │
│ - Feeds     │    │ - API calls │    │ - Enrich    │    │ - Data Lake │    │ - API       │
│ - Files     │    │ - Webhooks  │    │ - Aggregate │    │ - Warehouse │    │ - Reports   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
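Before reaching for an orchestrator, the five stages can be sketched as plain composed functions. This is a toy sketch with inline data standing in for external sources and a dict standing in for a database:

```python
import pandas as pd

# Each stage is a pure function; the pipeline is their composition.
def extract() -> pd.DataFrame:
    # Stand-in for API calls, scraping, or file reads
    return pd.DataFrame({"player": ["A", "B", "A"],
                         "type": ["Pass", "Shot", "Pass"]})

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Clean / aggregate: events per player
    return df.groupby("player").size().reset_index(name="events")

def load(df: pd.DataFrame) -> dict:
    # Stand-in for a database write
    return dict(zip(df["player"], df["events"]))

store = load(transform(extract()))
```

Orchestrators like Prefect (below) add caching, retries, and scheduling on top of exactly this shape.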
            
pipeline_architecture
# Python: Pipeline architecture with Prefect
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

# Cache expensive operations
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def extract_matches(season: int) -> pd.DataFrame:
    """Extract match data from API."""
    from statsbombpy import sb

    # sb.competitions() lists the available competition/season ids
    matches = sb.matches(competition_id=11, season_id=season)
    return matches

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def extract_events(match_id: int) -> pd.DataFrame:
    """Extract event data for a match."""
    from statsbombpy import sb
    return sb.events(match_id=match_id)

@task
def transform_events(events: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform event data."""

    # Remove incomplete records
    events = events.dropna(subset=["player", "team"])

    # Standardize coordinates (location is an [x, y] list, NaN when missing)
    events["x"] = events["location"].apply(
        lambda loc: loc[0] if isinstance(loc, list) else None
    )
    events["y"] = events["location"].apply(
        lambda loc: loc[1] if isinstance(loc, list) else None
    )

    # Add derived features (guard against missing outcomes)
    events["is_successful"] = events["outcome"].apply(
        lambda o: o.get("name") == "Complete" if isinstance(o, dict) else None
    )

    return events

@task
def calculate_player_metrics(events: pd.DataFrame) -> pd.DataFrame:
    """Calculate player-level metrics."""

    player_stats = events.groupby(["player", "team"]).agg(
        passes=("type", lambda x: (x == "Pass").sum()),
        successful_passes=("is_successful", lambda x: x.sum()),
        shots=("type", lambda x: (x == "Shot").sum()),
        touches=("type", "count"),
        minutes_played=("minute", lambda x: x.max() - x.min())
    ).reset_index()

    player_stats["pass_completion"] = (
        player_stats["successful_passes"] / player_stats["passes"]
    )

    return player_stats

@task
def load_to_database(df: pd.DataFrame, table_name: str):
    """Load data to PostgreSQL database."""
    from sqlalchemy import create_engine

    engine = create_engine("postgresql://user:pass@localhost/football")
    df.to_sql(table_name, engine, if_exists="replace", index=False)

    return f"Loaded {len(df)} rows to {table_name}"

@flow(name="Football Analytics Pipeline")
def analytics_pipeline(season: int = 2023):
    """Main analytics pipeline flow."""

    # Extract
    matches = extract_matches(season)
    print(f"Extracted {len(matches)} matches")

    # Process each match
    all_events = []
    for match_id in matches["match_id"][:10]:  # Limit for demo
        events = extract_events(match_id)
        all_events.append(events)

    combined_events = pd.concat(all_events, ignore_index=True)

    # Transform
    clean_events = transform_events(combined_events)
    player_metrics = calculate_player_metrics(clean_events)

    # Load
    load_to_database(player_metrics, "player_metrics")

    return player_metrics

# Run pipeline
if __name__ == "__main__":
    result = analytics_pipeline(season=2023)
    print(f"Pipeline complete: {len(result)} player records")
# R: Pipeline architecture with targets
library(targets)
library(tarchetypes)

# Define pipeline in _targets.R
# tar_option_set(packages = c("tidyverse", "StatsBombR"))

# Pipeline targets
list(
  # Extract: Get raw data
  tar_target(
    raw_matches,
    get_matches_from_api(season = 2023)
  ),

  tar_target(
    raw_events,
    get_events_from_api(raw_matches$match_id),
    pattern = map(raw_matches)
  ),

  # Transform: Clean and enrich
  tar_target(
    clean_events,
    clean_event_data(raw_events)
  ),

  tar_target(
    player_stats,
    calculate_player_stats(clean_events)
  ),

  tar_target(
    team_stats,
    calculate_team_stats(clean_events)
  ),

  # Load: Save to database
  tar_target(
    db_upload,
    upload_to_database(player_stats, team_stats),
    cue = tar_cue(mode = "always")
  ),

  # Serve: Generate outputs
  tar_target(
    weekly_report,
    generate_report(player_stats, team_stats),
    format = "file"
  )
)

# Run pipeline
# tar_make()

# Visualize dependencies
# tar_visnetwork()

ETL Best Practices

Extract, Transform, Load (ETL) is the backbone of any analytics system. Proper ETL design ensures data quality, enables incremental updates, and handles failures gracefully.
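The incremental-update idea is simple enough to show with the standard library alone. This is a minimal sketch: the `rows` list and `extract_since` stand in for a paginated API, and the checkpoint file records the last id processed so a re-run pulls nothing:

```python
import json
import tempfile
from pathlib import Path

# Toy source: five rows with increasing ids (stands in for an API)
rows = [{"id": i} for i in range(1, 6)]

def extract_since(last_id: int):
    return [r for r in rows if r["id"] > last_id]

checkpoint = Path(tempfile.mkdtemp()) / "matches.json"

def run_once():
    # Read the checkpoint (0 on the first run), pull only newer rows,
    # then advance the checkpoint so a repeat run is a no-op
    last = json.loads(checkpoint.read_text())["last_id"] if checkpoint.exists() else 0
    new = extract_since(last)
    if new:
        checkpoint.write_text(json.dumps({"last_id": max(r["id"] for r in new)}))
    return new

first = run_once()   # pulls all five rows
second = run_once()  # idempotent: nothing new
```

The `FootballETL` class below applies the same checkpoint pattern, plus retries, validation, and upserts.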

etl_patterns
# Python: Robust ETL patterns
import pandas as pd
import logging
from datetime import datetime
from typing import Optional, Dict, List
import json
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FootballETL:
    """
    Robust ETL pipeline for football data.
    """

    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)

    def get_checkpoint(self, source_name: str) -> Optional[Dict]:
        """Load checkpoint for incremental extraction."""
        checkpoint_file = self.checkpoint_dir / f"{source_name}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file) as f:
                return json.load(f)
        return None

    def save_checkpoint(self, source_name: str, checkpoint_data: Dict):
        """Save checkpoint after successful extraction."""
        checkpoint_file = self.checkpoint_dir / f"{source_name}.json"

        checkpoint_data["saved_at"] = datetime.now().isoformat()
        with open(checkpoint_file, "w") as f:
            json.dump(checkpoint_data, f)

    def extract_with_retry(self, extract_fn, max_retries: int = 3,
                          **kwargs) -> pd.DataFrame:
        """Extract data with retry logic."""
        import time

        for attempt in range(max_retries):
            try:
                data = extract_fn(**kwargs)
                logger.info(f"Extraction successful: {len(data)} records")
                return data
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise

    def validate(self, df: pd.DataFrame, schema: Dict) -> Dict:
        """Validate data against schema."""
        results = {}

        # Check required columns
        results["required_columns"] = all(
            col in df.columns for col in schema.get("required", [])
        )

        # Check data types
        type_checks = []
        for col, expected_type in schema.get("types", {}).items():
            if col in df.columns:
                type_checks.append(df[col].dtype == expected_type)
        results["correct_types"] = all(type_checks)

        # Check value ranges
        for col, (min_val, max_val) in schema.get("ranges", {}).items():
            if col in df.columns:
                results[f"{col}_in_range"] = (
                    df[col].between(min_val, max_val).all()
                )

        # Check for nulls in non-nullable columns
        for col in schema.get("non_nullable", []):
            if col in df.columns:
                results[f"{col}_no_nulls"] = not df[col].isna().any()

        results["all_valid"] = all(results.values())
        return results

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply standard transformations."""

        # Make a copy
        df = df.copy()

        # Standardize column names
        df.columns = df.columns.str.lower().str.replace(" ", "_")

        # Handle missing values
        numeric_cols = df.select_dtypes(include=["number"]).columns
        df[numeric_cols] = df[numeric_cols].fillna(0)

        # Add metadata
        df["_processed_at"] = datetime.now()
        df["_source"] = "statsbomb"

        return df

    def load_incremental(self, df: pd.DataFrame, table_name: str,
                        key_columns: List[str], engine):
        """Load data with upsert logic (PostgreSQL INSERT ... ON CONFLICT)."""
        from sqlalchemy import MetaData, Table
        from sqlalchemy.dialects.postgresql import insert

        # Reflect the target table so the insert can be built against it
        table = Table(table_name, MetaData(), autoload_with=engine)

        # Convert to records
        records = df.to_dict(orient="records")

        stmt = insert(table).values(records)

        # On conflict on the key columns, update the remaining columns
        update_dict = {c: stmt.excluded[c] for c in df.columns
                       if c not in key_columns}

        upsert_stmt = stmt.on_conflict_do_update(
            index_elements=key_columns,
            set_=update_dict
        )

        with engine.begin() as conn:
            conn.execute(upsert_stmt)
        logger.info(f"Upserted {len(records)} records to {table_name}")

# Example usage
etl = FootballETL()

# Define schema for validation
events_schema = {
    "required": ["match_id", "type", "minute", "player"],
    "types": {"minute": "int64", "match_id": "int64"},
    "ranges": {"minute": (0, 150), "x": (0, 120), "y": (0, 80)},
    "non_nullable": ["match_id", "type"]
}

# Run validation
# validation_results = etl.validate(events_df, events_schema)
# print(f"Validation passed: {validation_results['all_valid']}")
# R: Robust ETL patterns
library(tidyverse)
library(DBI)
library(dbplyr)

# Idempotent extraction with checkpointing
extract_with_checkpoint <- function(source_fn, checkpoint_file) {
  # Check if we have a checkpoint
  if (file.exists(checkpoint_file)) {
    checkpoint <- readRDS(checkpoint_file)
    last_processed <- checkpoint$last_id
  } else {
    last_processed <- 0
  }

  # Extract only new data
  new_data <- source_fn(since_id = last_processed)

  if (nrow(new_data) > 0) {
    # Update checkpoint
    saveRDS(
      list(last_id = max(new_data$id), timestamp = Sys.time()),
      checkpoint_file
    )
  }

  new_data
}

# Data validation function
validate_events <- function(events) {
  validation_results <- list(
    has_required_cols = all(c("match_id", "type", "minute") %in% names(events)),
    no_null_ids = sum(is.na(events$match_id)) == 0,
    valid_minutes = all(events$minute >= 0 & events$minute <= 150, na.rm = TRUE),
    valid_coords = all(events$x >= 0 & events$x <= 120, na.rm = TRUE)
  )

  all_valid <- all(unlist(validation_results))

  if (!all_valid) {
    failed <- names(validation_results)[!unlist(validation_results)]
    warning(paste("Validation failed:", paste(failed, collapse = ", ")))
  }

  list(valid = all_valid, checks = validation_results)
}

# Incremental load with upsert
upsert_to_db <- function(con, df, table_name, key_cols) {
  # Create temp table
  temp_table <- paste0(table_name, "_temp")
  dbWriteTable(con, temp_table, df, temporary = TRUE)

  # Build upsert query
  key_match <- paste(
    paste0("t.", key_cols, " = s.", key_cols),
    collapse = " AND "
  )

  update_cols <- setdiff(names(df), key_cols)
  update_set <- paste(
    paste0(update_cols, " = s.", update_cols),
    collapse = ", "
  )

  sql <- glue::glue("
    MERGE INTO {table_name} t
    USING {temp_table} s
    ON {key_match}
    WHEN MATCHED THEN UPDATE SET {update_set}
    WHEN NOT MATCHED THEN INSERT VALUES (s.*)
  ")

  dbExecute(con, sql)
}

Data Warehouse Design

A well-designed data warehouse enables fast analytical queries. We use dimensional modeling with fact and dimension tables optimized for football analytics.

Football Analytics Star Schema
                    ┌──────────────┐
                    │ dim_player   │
                    │──────────────│
                    │ player_id    │◄─────┐
                    │ name         │      │
                    │ position     │      │
                    │ nationality  │      │
                    └──────────────┘      │
                                          │
┌──────────────┐    ┌──────────────┐      │    ┌──────────────┐
│ dim_team     │    │ fact_events  │      │    │ dim_match    │
│──────────────│    │──────────────│      │    │──────────────│
│ team_id      │◄───│ event_id     │──────┴───▶│ match_id     │
│ name         │    │ match_id     │           │ date         │
│ league       │    │ team_id      │───────────│ competition  │
│ country      │    │ player_id    │           │ venue        │
└──────────────┘    │ type         │           └──────────────┘
                    │ minute       │
                    │ x, y         │           ┌──────────────┐
                    │ xg           │           │ dim_date     │
                    │ outcome      │           │──────────────│
                    └──────────────┘──────────▶│ date_id      │
                                               │ season       │
                                               │ matchweek    │
                                               └──────────────┘
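Loading a star schema means replacing natural keys in the raw data (player names, team names) with the surrogate keys held in the dimension tables. A minimal pandas sketch of that lookup, with hypothetical toy data:

```python
import pandas as pd

# Dimension table: surrogate key -> natural key
dim_player = pd.DataFrame({"player_id": [1, 2],
                           "name": ["Player A", "Player B"]})

# Raw events carry the natural key (player name)
events = pd.DataFrame({"player": ["Player A", "Player B", "Player A"],
                       "type": ["Pass", "Shot", "Pass"]})

# Swap the natural key for the surrogate key before loading fact_events
fact = (events
        .merge(dim_player, left_on="player", right_on="name", how="left")
        .drop(columns=["player", "name"]))
```

In a real load you would also handle names that fail to match (NaN `player_id` after the left join), typically by inserting new dimension rows first.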
            
data_warehouse
# Python: Data warehouse with dbt-style transformations
import pandas as pd
from sqlalchemy import create_engine, text

class FootballWarehouse:
    """
    Data warehouse for football analytics.
    """

    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)

    def create_schema(self):
        """Create dimensional model schema."""

        ddl_statements = [
            """
            CREATE TABLE IF NOT EXISTS dim_player (
                player_id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                position VARCHAR(50),
                nationality VARCHAR(50),
                birth_date DATE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS dim_team (
                team_id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                short_name VARCHAR(20),
                league VARCHAR(50),
                country VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS dim_match (
                match_id SERIAL PRIMARY KEY,
                date DATE NOT NULL,
                season VARCHAR(10),
                matchweek INT,
                competition VARCHAR(100),
                venue VARCHAR(100),
                home_team_id INT REFERENCES dim_team(team_id),
                away_team_id INT REFERENCES dim_team(team_id),
                home_score INT,
                away_score INT
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS fact_events (
                event_id BIGSERIAL PRIMARY KEY,
                match_id INT REFERENCES dim_match(match_id),
                team_id INT REFERENCES dim_team(team_id),
                player_id INT REFERENCES dim_player(player_id),
                minute INT,
                second INT,
                type VARCHAR(50),
                x DECIMAL(5,2),
                y DECIMAL(5,2),
                end_x DECIMAL(5,2),
                end_y DECIMAL(5,2),
                xg DECIMAL(5,4),
                outcome VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
            # Indexes for common query patterns
            """
            CREATE INDEX IF NOT EXISTS idx_events_match
            ON fact_events(match_id)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_events_player
            ON fact_events(player_id)
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_events_type
            ON fact_events(type)
            """
        ]

        with self.engine.connect() as conn:
            for ddl in ddl_statements:
                conn.execute(text(ddl))
            conn.commit()

    def create_aggregation_tables(self):
        """Create pre-aggregated tables for fast queries."""

        # Player season stats
        player_stats_sql = """
        CREATE TABLE IF NOT EXISTS agg_player_season AS
        SELECT
            p.player_id,
            p.name,
            p.position,
            m.season,
            COUNT(DISTINCT m.match_id) as matches,
            SUM(CASE WHEN e.type = 'Shot' AND e.outcome = 'Goal'
                THEN 1 ELSE 0 END) as goals,
            SUM(CASE WHEN e.type = 'Shot' THEN e.xg ELSE 0 END) as xg,
            SUM(CASE WHEN e.type = 'Shot' THEN 1 ELSE 0 END) as shots,
            SUM(CASE WHEN e.type = 'Pass' THEN 1 ELSE 0 END) as passes,
            SUM(CASE WHEN e.type = 'Pass' AND e.outcome = 'Complete'
                THEN 1 ELSE 0 END) as successful_passes
        FROM fact_events e
        JOIN dim_player p ON e.player_id = p.player_id
        JOIN dim_match m ON e.match_id = m.match_id
        GROUP BY p.player_id, p.name, p.position, m.season
        """

        with self.engine.connect() as conn:
            conn.execute(text("DROP TABLE IF EXISTS agg_player_season"))
            conn.execute(text(player_stats_sql))
            conn.commit()

    def query_player_stats(self, season: str, min_matches: int = 10):
        """Query player statistics for a season."""

        sql = """
        SELECT
            name,
            position,
            matches,
            goals,
            xg,
            ROUND(goals::numeric / NULLIF(matches, 0), 2) as goals_per_match,
            ROUND(xg::numeric / NULLIF(matches, 0), 2) as xg_per_match,
            ROUND(successful_passes::numeric / NULLIF(passes, 0) * 100, 1) as pass_pct
        FROM agg_player_season
        WHERE season = :season AND matches >= :min_matches
        ORDER BY xg DESC
        LIMIT 20
        """

        return pd.read_sql(
            text(sql),
            self.engine,
            params={"season": season, "min_matches": min_matches}
        )

# Usage
warehouse = FootballWarehouse("postgresql://user:pass@localhost/football")
warehouse.create_schema()
warehouse.create_aggregation_tables()
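One caveat on rate stats: true per-90 figures divide by minutes played, which the event fact table above does not track directly; dividing by match counts gives per-match figures, which understate the rate for players who are often substituted. A minimal illustration with toy numbers:

```python
# A player with 12 goals across 27 appearances but only 2000 minutes
goals, minutes_played, matches = 12, 2000, 27

goals_per_90 = goals * 90 / minutes_played   # 0.54 - the true rate
goals_per_match = goals / matches            # ~0.44 - understates it
```

If per-90 numbers matter for your use case, add a minutes-played column (e.g. from lineup and substitution events) to the aggregation tables.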
# R: Data warehouse queries
library(DBI)
library(dbplyr)

# Connect to warehouse
con <- dbConnect(
  RPostgres::Postgres(),
  dbname = "football_warehouse",
  host = "localhost"
)

# Efficient analytical query using dimensional model
player_season_stats <- tbl(con, "fact_events") %>%
  inner_join(tbl(con, "dim_match"), by = "match_id") %>%
  inner_join(tbl(con, "dim_player"), by = "player_id") %>%
  filter(season == "2023-24") %>%
  group_by(player_id, name, position) %>%
  summarise(
    matches = n_distinct(match_id),
    goals = sum(if_else(type == "Shot" & outcome == "Goal", 1, 0)),
    xg = sum(xg, na.rm = TRUE),
    shots = sum(if_else(type == "Shot", 1, 0)),
    passes = sum(if_else(type == "Pass", 1, 0)),
    .groups = "drop"
  ) %>%
  mutate(
    goals_per_match = goals / matches,
    xg_per_match = xg / matches,
    shot_conversion = goals / shots
  ) %>%
  collect()

# Materialized views for common aggregations
dbExecute(con, "
  CREATE MATERIALIZED VIEW mv_team_form AS
  SELECT
    t.team_id,
    t.name,
    m.season,
    COUNT(*) as matches,
    SUM(CASE WHEN f.team_id = m.home_team_id AND m.home_score > m.away_score
             WHEN f.team_id = m.away_team_id AND m.away_score > m.home_score
             THEN 1 ELSE 0 END) as wins,
    SUM(f.xg) as total_xg
  FROM fact_events f
  JOIN dim_match m ON f.match_id = m.match_id
  JOIN dim_team t ON f.team_id = t.team_id
  GROUP BY t.team_id, t.name, m.season
")

Automation and Scheduling

Automated pipelines run on schedules to keep data fresh. We use scheduling tools to orchestrate regular data updates and report generation.
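The core of any freshness check is a single timestamp comparison, which can be sketched independently of the scheduler. A minimal version (fixed datetimes here for illustration; a real check would use the latest `created_at` from the warehouse and `datetime.now()`):

```python
from datetime import datetime, timedelta

def is_fresh(last_update: datetime, now: datetime,
             max_age_hours: int = 24) -> bool:
    """True when the data was refreshed within the allowed window."""
    return (now - last_update) < timedelta(hours=max_age_hours)

now = datetime(2024, 8, 10, 20, 0)
recent = is_fresh(datetime(2024, 8, 10, 5, 0), now)   # 15h old -> fresh
stale = is_fresh(datetime(2024, 8, 9, 19, 0), now)    # 25h old -> stale
```

The scheduled flows below wrap this comparison with database queries and alerting.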

automation
# Python: Automation with Prefect and scheduling
from prefect import flow, task
from prefect.deployments import Deployment  # Prefect 2.x; Prefect 3 uses flow.deploy()
from prefect.server.schemas.schedules import CronSchedule
from datetime import datetime, timedelta

@task
def send_notification(message: str, channel: str = "slack"):
    """Send notification on pipeline completion."""
    import requests

    if channel == "slack":
        webhook_url = "https://hooks.slack.com/services/..."
        requests.post(webhook_url, json={"text": message})

@task
def check_data_freshness(max_age_hours: int = 24) -> bool:
    """Verify data is up to date."""
    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql://user:pass@localhost/football")

    query = """
    SELECT MAX(created_at) as last_update
    FROM fact_events
    """

    with engine.connect() as conn:
        result = conn.execute(text(query)).fetchone()
        last_update = result[0]

    if last_update is None:
        return False

    age = datetime.now() - last_update
    return age < timedelta(hours=max_age_hours)

@flow(name="Daily Football Pipeline", log_prints=True)
def daily_pipeline():
    """Complete daily pipeline with monitoring."""

    start_time = datetime.now()

    try:
        # Run main ETL
        from main_pipeline import analytics_pipeline
        result = analytics_pipeline(season=2023)

        # Verify data freshness
        is_fresh = check_data_freshness(max_age_hours=24)

        if not is_fresh:
            raise ValueError("Data freshness check failed")

        # Generate reports
        from reports import generate_daily_report
        report_path = generate_daily_report()

        # Calculate duration
        duration = (datetime.now() - start_time).total_seconds()

        # Send success notification
        send_notification(
            f"Daily pipeline complete. "
            f"Processed {len(result)} records in {duration:.1f}s. "
            f"Report: {report_path}"
        )

        return {"status": "success", "records": len(result)}

    except Exception as e:
        # Send failure notification
        send_notification(
            f"Pipeline FAILED: {str(e)}",
            channel="slack"
        )
        raise

# Create deployment with schedule
def create_deployment():
    """Create scheduled deployment."""

    deployment = Deployment.build_from_flow(
        flow=daily_pipeline,
        name="daily-football-pipeline",
        schedule=CronSchedule(cron="0 6 * * *"),  # 6 AM daily
        work_queue_name="default",
        tags=["production", "daily"]
    )

    deployment.apply()

# Alternative: Using APScheduler for simpler cases
from apscheduler.schedulers.blocking import BlockingScheduler

def run_with_apscheduler():
    scheduler = BlockingScheduler()

    # Daily at 6 AM
    scheduler.add_job(
        daily_pipeline,
        "cron",
        hour=6,
        minute=0,
        id="daily_pipeline"
    )

    # Every 15 minutes for live data (live_data_update defined elsewhere)
    scheduler.add_job(
        live_data_update,
        "interval",
        minutes=15,
        id="live_update"
    )

    scheduler.start()
# R: Automation with cronR
library(cronR)

# Create scheduled R script
script_content <- '
library(tidyverse)

# Update daily stats
source("etl/extract_matches.R")
source("etl/transform_events.R")
source("etl/load_warehouse.R")

# Generate reports
rmarkdown::render("reports/daily_summary.Rmd",
                  output_file = paste0("reports/daily_", Sys.Date(), ".html"))

# Send notification
send_slack_message("Daily pipeline complete")
'

writeLines(script_content, "daily_pipeline.R")
# Schedule to run daily at 6 AM
cron_add(
  command = cron_rscript("daily_pipeline.R"),
  frequency = "daily",
  at = "06:00",
  id = "daily_football_pipeline"
)

# View scheduled jobs
cron_ls()

# For more complex scheduling, use targets + GitHub Actions
# .github/workflows/pipeline.yml

Model and Dashboard Deployment

Deploying models and dashboards makes analytics accessible to stakeholders. We cover containerization, API deployment, and dashboard hosting.

deployment
# Python: Deploy with FastAPI and Docker
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import joblib
import pandas as pd

app = FastAPI(title="Football Analytics API")

# Load pre-trained model
model = joblib.load("models/match_predictor.pkl")

class MatchPredictionRequest(BaseModel):
    home_team_id: int
    away_team_id: int
    home_form: float
    away_form: float
    home_xg_avg: float
    away_xg_avg: float

class MatchPredictionResponse(BaseModel):
    home_win_prob: float
    draw_prob: float
    away_win_prob: float
    predicted_outcome: str

@app.post("/predict/match", response_model=MatchPredictionResponse)
async def predict_match(request: MatchPredictionRequest):
    """Predict match outcome."""

    features = pd.DataFrame([{
        "home_form": request.home_form,
        "away_form": request.away_form,
        "home_xg_avg": request.home_xg_avg,
        "away_xg_avg": request.away_xg_avg
    }])

    probabilities = model.predict_proba(features)[0]

    outcomes = ["away_win", "draw", "home_win"]
    predicted = outcomes[probabilities.argmax()]

    return MatchPredictionResponse(
        home_win_prob=round(probabilities[2], 3),
        draw_prob=round(probabilities[1], 3),
        away_win_prob=round(probabilities[0], 3),
        predicted_outcome=predicted
    )

@app.get("/players/{player_id}/stats")
async def get_player_stats(player_id: int, season: Optional[str] = None):
    """Get player statistics."""

    from database import get_player_stats
    stats = get_player_stats(player_id, season)

    if stats is None:
        raise HTTPException(status_code=404, detail="Player not found")

    return stats

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# Dockerfile
dockerfile_content = """
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""

# docker-compose.yml
compose_content = """
version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/football
    depends_on:
      - db

  db:
    image: postgres:14
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=football

  dashboard:
    build: ./dashboard
    ports:
      - "8501:8501"
    depends_on:
      - api

volumes:
  postgres_data:
"""

# Deploy Streamlit dashboard (dashboard/app.py)
# get_teams(), team_stats and create_xg_chart() are helpers defined in the app
import streamlit as st

st.title("Football Analytics Dashboard")

# Sidebar
season = st.sidebar.selectbox("Season", ["2023-24", "2022-23"])
team = st.sidebar.selectbox("Team", get_teams())

# Main content
col1, col2, col3 = st.columns(3)

with col1:
    st.metric("Goals", team_stats["goals"], delta="+5 vs last season")

with col2:
    st.metric("xG", f"{team_stats['xg']:.1f}")

with col3:
    st.metric("Win Rate", f"{team_stats['win_rate']:.1%}")

st.plotly_chart(create_xg_chart(team_stats))
# R: Deploy Shiny dashboard
library(shiny)
library(shinydashboard)

# Dashboard definition
ui <- dashboardPage(
  dashboardHeader(title = "Football Analytics"),

  dashboardSidebar(
    sidebarMenu(
      menuItem("Team Overview", tabName = "team", icon = icon("users")),
      menuItem("Player Stats", tabName = "players", icon = icon("user")),
      menuItem("Match Analysis", tabName = "match", icon = icon("futbol"))
    ),
    selectInput("season", "Season", choices = c("2023-24", "2022-23"))
  ),

  dashboardBody(
    tabItems(
      tabItem(tabName = "team",
        fluidRow(
          valueBoxOutput("totalGoals"),
          valueBoxOutput("totalxG"),
          valueBoxOutput("winRate")
        ),
        fluidRow(
          box(plotOutput("xgTrend"), width = 8),
          box(tableOutput("topScorers"), width = 4)
        )
      )
    )
  )
)

server <- function(input, output, session) {
  # Reactive data
  team_data <- reactive({
    get_team_stats(input$season)
  })

  output$totalGoals <- renderValueBox({
    valueBox(sum(team_data()$goals), "Goals", icon = icon("futbol"))
  })

  output$xgTrend <- renderPlot({
    ggplot(team_data(), aes(x = matchweek, y = xg)) +
      geom_line() + geom_point() +
      labs(title = "xG Trend")
  })
}

# Deploy to shinyapps.io
# rsconnect::deployApp()

# Or deploy with Docker
# Dockerfile for Shiny app

Monitoring and Maintenance

Production pipelines need monitoring to catch issues early. We track data quality, pipeline health, and model performance.
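The null-rate check at the heart of the monitor below reduces to one pandas expression. A standalone sketch on a toy batch, using the same 1% threshold as the class that follows:

```python
import pandas as pd

# Toy daily batch with one missing match_id
events = pd.DataFrame({
    "match_id": [1, 1, None, 2],
    "type": ["Pass", "Shot", "Pass", "Pass"],
})

null_rate = events["match_id"].isna().mean()  # fraction of nulls
passed = null_rate <= 0.01                    # 1% tolerance
```

Running the same check in SQL (as `PipelineMonitor.check_null_rates` does) avoids pulling a day's events into memory.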

monitoring
# Python: Comprehensive monitoring
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DataQualityCheck:
    name: str
    passed: bool
    details: str
    timestamp: datetime = None

    def __post_init__(self):
        self.timestamp = datetime.now()

class PipelineMonitor:
    """Monitor pipeline health and data quality."""

    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
        self.alerts = []

    def check_data_freshness(self, table: str,
                            max_age_hours: int = 24) -> DataQualityCheck:
        """Check if data is up to date."""

        query = f"""
        SELECT MAX(created_at) as last_update
        FROM {table}
        """

        with self.engine.connect() as conn:
            result = conn.execute(text(query)).fetchone()

        if result[0] is None:
            return DataQualityCheck(
                name=f"{table}_freshness",
                passed=False,
                details="No data found"
            )

        age = datetime.now() - result[0]
        passed = age < timedelta(hours=max_age_hours)

        return DataQualityCheck(
            name=f"{table}_freshness",
            passed=passed,
            details=f"Last update: {result[0]}, Age: {age}"
        )

    def check_null_rates(self, table: str,
                        columns: List[str],
                        max_null_pct: float = 0.01) -> DataQualityCheck:
        """Check null rates in critical columns."""

        results = {}

        for col in columns:
            query = f"""
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END) as nulls
            FROM {table}
            WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
            """

            with self.engine.connect() as conn:
                result = conn.execute(text(query)).fetchone()

            null_rate = result[1] / max(result[0], 1)
            results[col] = null_rate

        passed = all(rate <= max_null_pct for rate in results.values())

        return DataQualityCheck(
            name=f"{table}_null_check",
            passed=passed,
            details=str(results)
        )

    def check_value_distributions(self, table: str,
                                  column: str,
                                  expected_values: List[str]) -> DataQualityCheck:
        """Check that values are within expected set."""

        query = f"""
        SELECT DISTINCT {column}
        FROM {table}
        WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
        """

        with self.engine.connect() as conn:
            result = conn.execute(text(query)).fetchall()

        actual_values = {row[0] for row in result}
        unexpected = actual_values - set(expected_values)

        passed = len(unexpected) == 0

        return DataQualityCheck(
            name=f"{table}_{column}_values",
            passed=passed,
            details=f"Unexpected values: {unexpected}" if unexpected else "All values valid"
        )

    def run_all_checks(self) -> Dict[str, DataQualityCheck]:
        """Run all monitoring checks."""

        checks = {}

        # Freshness checks
        checks["events_fresh"] = self.check_data_freshness("fact_events")
        checks["matches_fresh"] = self.check_data_freshness("dim_match")

        # Null checks
        checks["events_nulls"] = self.check_null_rates(
            "fact_events",
            ["match_id", "type", "minute"]
        )

        # Value distribution checks
        checks["event_types"] = self.check_value_distributions(
            "fact_events",
            "type",
            ["Pass", "Shot", "Dribble", "Carry", "Tackle", "Clearance"]
        )

        # Log results
        failed_checks = [k for k, v in checks.items() if not v.passed]

        if failed_checks:
            logger.warning(f"Failed checks: {failed_checks}")
            self.send_alert(f"Data quality issues: {failed_checks}")
        else:
            logger.info("All quality checks passed")

        return checks

    def send_alert(self, message: str):
        """Send alert via Slack/email."""
        import requests

        self.alerts.append({
            "message": message,
            "timestamp": datetime.now()
        })

        # Slack webhook (placeholder URL; load the real one from config)
        webhook_url = "https://hooks.slack.com/services/..."
        try:
            requests.post(webhook_url, json={"text": f":warning: {message}"}, timeout=5)
        except requests.RequestException as exc:
            logger.error(f"Alert delivery failed: {exc}")

    def log_pipeline_run(self, pipeline_name: str, status: str,
                        duration: float, records: int):
        """Log pipeline execution metrics."""

        log_entry = {
            "timestamp": datetime.now(),
            "pipeline": pipeline_name,
            "status": status,
            "duration_seconds": duration,
            "records_processed": records
        }

        df = pd.DataFrame([log_entry])

        # pass the engine so pandas manages the transaction itself
        df.to_sql("pipeline_logs", self.engine, if_exists="append", index=False)

        logger.info(f"Logged: {pipeline_name} - {status} - {records} records")

# Usage
monitor = PipelineMonitor("postgresql://user:pass@localhost/football")
results = monitor.run_all_checks()
# R: Pipeline monitoring
library(DBI)
library(tidyverse)

# Data quality monitoring
monitor_data_quality <- function(con) {
  checks <- list()

  # Check row counts
  checks$events_count <- dbGetQuery(con, "
    SELECT COUNT(*) as n FROM fact_events
    WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
  ")$n

  # Check for nulls in critical columns
  checks$null_check <- dbGetQuery(con, "
    SELECT
      SUM(CASE WHEN match_id IS NULL THEN 1 ELSE 0 END) as null_matches,
      SUM(CASE WHEN player_id IS NULL THEN 1 ELSE 0 END) as null_players
    FROM fact_events
    WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
  ")

  # Check value distributions
  checks$type_distribution <- dbGetQuery(con, "
    SELECT type, COUNT(*) as n
    FROM fact_events
    WHERE created_at > CURRENT_DATE - INTERVAL '1 day'
    GROUP BY type
  ")

  # Alert if issues
  if (checks$events_count < 1000) {
    send_alert("Low event count in last 24h")
  }

  checks
}

# Log pipeline metrics
log_pipeline_run <- function(con, pipeline_name, status, duration, records) {
  log_entry <- tibble(
    timestamp = Sys.time(),
    pipeline = pipeline_name,
    status = status,
    duration_sec = duration,
    records_processed = records
  )

  # Append to log file
  write_csv(log_entry, "logs/pipeline_runs.csv", append = TRUE)

  # Also write to database
  dbWriteTable(con, "pipeline_logs", log_entry, append = TRUE)
}
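The freshness rule at the core of these checks is just a timestamp comparison. A minimal, database-free sketch of the same logic (the function name `is_fresh` is invented for this example):

```python
from datetime import datetime, timedelta

def is_fresh(last_update, max_age_hours=24, now=None):
    """Data is fresh if the newest record is younger than the threshold."""
    if last_update is None:
        return False  # an empty table counts as stale
    now = now or datetime.now()
    return (now - last_update) < timedelta(hours=max_age_hours)

now = datetime(2024, 1, 2, 12, 0)
print(is_fresh(datetime(2024, 1, 2, 0, 0), now=now))    # 12 hours old
print(is_fresh(datetime(2023, 12, 29, 0, 0), now=now))  # 4.5 days old
```

Passing `now` explicitly keeps the rule deterministic, which is exactly what makes it easy to unit test.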

Real-Time Streaming Pipelines

Live match data requires streaming pipelines that process events as they occur. We use stream processing frameworks to handle real-time data with low latency.

streaming
# Python: Real-time streaming with Apache Kafka
from kafka import KafkaConsumer, KafkaProducer
from dataclasses import dataclass
from typing import Optional, Generator
import json
import asyncio
from datetime import datetime

@dataclass
class LiveEvent:
    match_id: int
    event_id: int
    timestamp: float
    type: str
    player_id: int
    team_id: int
    x: float
    y: float
    outcome: Optional[str] = None
    xg: Optional[float] = None

class LiveEventProcessor:
    """Process live match events in real-time."""

    def __init__(self, kafka_bootstrap: str = "localhost:9092"):
        self.consumer = KafkaConsumer(
            "live-events",
            bootstrap_servers=kafka_bootstrap,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            auto_offset_reset="latest"
        )

        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap,
            value_serializer=lambda m: json.dumps(m).encode("utf-8")
        )

        # In-memory state for running calculations
        self.match_state = {}

    def process_event(self, event_data: dict) -> LiveEvent:
        """Process a single event and enrich with calculated metrics."""

        event = LiveEvent(**event_data)

        # Calculate xG for shots
        if event.type == "Shot":
            event.xg = self._calculate_live_xg(event)

            # Update match state
            match_id = event.match_id
            if match_id not in self.match_state:
                self.match_state[match_id] = {
                    "home_xg": 0, "away_xg": 0,
                    "home_possession": 0, "away_possession": 0
                }

            # assumes team_id 1 is the home side; map via match metadata in production
            team_key = "home_xg" if event.team_id == 1 else "away_xg"
            self.match_state[match_id][team_key] += event.xg

        return event

    def _calculate_live_xg(self, event: LiveEvent) -> float:
        """Calculate xG for a shot in real-time."""
        from math import sqrt, atan2, pi

        goal_x, goal_y = 100, 50
        distance = sqrt((goal_x - event.x)**2 + (goal_y - event.y)**2)
        angle = abs(atan2(goal_y - event.y, goal_x - event.x))

        # Simple xG model (production would use trained model)
        base_xg = max(0, 0.5 - distance * 0.015)
        angle_adjustment = 1 - abs(angle - pi/2) / (pi/2) * 0.3

        return round(base_xg * angle_adjustment, 3)

    def stream_processed_events(self) -> Generator[LiveEvent, None, None]:
        """Stream processed events."""

        for message in self.consumer:
            event = self.process_event(message.value)

            # Publish enriched event
            self.producer.send("processed-events", {
                "event": event.__dict__,
                "match_state": self.match_state.get(event.match_id, {})
            })

            yield event

class WindowedAggregator:
    """Aggregate events over time windows."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self.windows = {}

    def add_event(self, event: LiveEvent):
        """Add event to current window."""
        window_id = int(event.timestamp // self.window_seconds)
        key = (event.match_id, window_id)

        if key not in self.windows:
            self.windows[key] = {
                "match_id": event.match_id,
                "window_start": window_id * self.window_seconds,
                "events": [],
                "stats": {"passes": 0, "shots": 0, "xg": 0}
            }

        self.windows[key]["events"].append(event)
        self.windows[key]["stats"]["passes"] += 1 if event.type == "Pass" else 0
        self.windows[key]["stats"]["shots"] += 1 if event.type == "Shot" else 0
        self.windows[key]["stats"]["xg"] += event.xg or 0

    def get_window_stats(self, match_id: int, window_id: int) -> dict:
        """Get aggregated stats for a window."""
        key = (match_id, window_id)
        return self.windows.get(key, {}).get("stats", {})

# WebSocket server for dashboard updates
import websockets

class LiveDashboardServer:
    """WebSocket server for live dashboard updates."""

    def __init__(self, processor: LiveEventProcessor):
        self.processor = processor
        self.clients = set()

    async def register(self, websocket):
        self.clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self.clients.discard(websocket)

    async def broadcast(self, message: dict):
        """Broadcast update to all connected clients."""
        if self.clients:
            await asyncio.gather(
                *[client.send(json.dumps(message)) for client in self.clients],
                return_exceptions=True
            )

    async def run_server(self, host: str = "localhost", port: int = 8765):
        """Start WebSocket server."""
        async with websockets.serve(self.register, host, port):
            # NOTE: the Kafka consumer iteration is blocking; in production,
            # run it in a thread or executor so it doesn't starve the event loop
            for event in self.processor.stream_processed_events():
                await self.broadcast({
                    "type": "event",
                    "data": event.__dict__
                })

print("Streaming pipeline components initialized")
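The windowing logic in `WindowedAggregator` is easy to verify in isolation. Below is a dependency-free sketch of the same bucketing idea, with no Kafka required (`aggregate_windows` is a name invented here, not part of the class above):

```python
from collections import defaultdict

def aggregate_windows(events, window_seconds=300):
    """Bucket events into fixed (tumbling) windows and tally per-window stats."""
    stats = defaultdict(lambda: {"passes": 0, "shots": 0, "xg": 0.0})
    for e in events:
        window_id = int(e["timestamp"] // window_seconds)  # same bucketing as WindowedAggregator
        if e["type"] == "Pass":
            stats[window_id]["passes"] += 1
        elif e["type"] == "Shot":
            stats[window_id]["shots"] += 1
            stats[window_id]["xg"] += e.get("xg", 0.0)
    return dict(stats)

events = [
    {"timestamp": 10, "type": "Pass"},
    {"timestamp": 250, "type": "Shot", "xg": 0.2},
    {"timestamp": 400, "type": "Pass"},
]
# window 0 covers 0-299s, window 1 covers 300-599s
print(aggregate_windows(events))
```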
# R: Real-time processing concepts
library(tidyverse)

# Simulated streaming with websockets
# In R, streaming is typically handled via:
# 1. httpuv/websocket for connections
# 2. Shiny reactive for dashboards
# 3. Redis pub/sub for message queues

# Stream processing function
process_live_event <- function(event) {
    # Parse incoming event
    parsed <- jsonlite::fromJSON(event)

    # Calculate real-time metrics
    if (parsed$type == "Shot") {
        xg <- calculate_xg(parsed$x, parsed$y, parsed$body_part)
        parsed$xg <- xg

        # Update running totals
        update_match_xg(parsed$match_id, parsed$team, xg)
    }

    # Broadcast to subscribers
    broadcast_update(parsed)

    parsed
}

# Windowed aggregation
window_aggregate <- function(events, window_size = 300) {
    # 5-minute tumbling windows; possession share is taken within each window
    events %>%
        mutate(window = floor(timestamp / window_size)) %>%
        group_by(window, team) %>%
        summarise(
            passes = sum(type == "Pass"),
            n_events = n(),
            .groups = "drop_last"
        ) %>%
        mutate(possession_pct = n_events / sum(n_events) * 100) %>%
        ungroup()
}
Output
Streaming pipeline components initialized

Version Control for Data and Models

Reproducibility requires versioning not just code, but also data and trained models. We use specialized tools to track changes across the analytics stack.

versioning
# Python: Data and model versioning with MLflow and a lightweight custom versioner
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
import pandas as pd
import hashlib
from pathlib import Path
import json
from datetime import datetime
from typing import Optional

class DataVersioner:
    """Version control for datasets."""

    def __init__(self, storage_path: str = "./data_versions"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
        self.metadata_file = self.storage_path / "metadata.json"

        if self.metadata_file.exists():
            with open(self.metadata_file) as f:
                self.metadata = json.load(f)
        else:
            self.metadata = {"datasets": {}}

    def _calculate_checksum(self, df: pd.DataFrame) -> str:
        """Calculate hash of dataframe."""
        return hashlib.md5(
            pd.util.hash_pandas_object(df).values.tobytes()
        ).hexdigest()

    def save_version(self, df: pd.DataFrame, name: str,
                    description: str = "", tags: dict = None):
        """Save a versioned copy of the dataset."""

        # Generate version ID
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        checksum = self._calculate_checksum(df)
        version_id = f"{timestamp}_{checksum[:8]}"

        # Create version directory
        version_dir = self.storage_path / name / version_id
        version_dir.mkdir(parents=True, exist_ok=True)

        # Save data
        df.to_parquet(version_dir / "data.parquet")

        # Save metadata
        version_meta = {
            "version_id": version_id,
            "timestamp": timestamp,
            "checksum": checksum,
            "rows": len(df),
            "columns": list(df.columns),
            "description": description,
            "tags": tags or {}
        }

        with open(version_dir / "meta.json", "w") as f:
            json.dump(version_meta, f, indent=2)

        # Update global metadata
        if name not in self.metadata["datasets"]:
            self.metadata["datasets"][name] = {"versions": []}

        self.metadata["datasets"][name]["versions"].insert(0, version_meta)
        self.metadata["datasets"][name]["latest"] = version_id

        self._save_metadata()

        print(f"Saved {name} version: {version_id}")
        return version_id

    def load_version(self, name: str,
                    version_id: Optional[str] = None) -> pd.DataFrame:
        """Load a specific version of the dataset."""

        if version_id is None:
            version_id = self.metadata["datasets"][name]["latest"]

        version_dir = self.storage_path / name / version_id
        return pd.read_parquet(version_dir / "data.parquet")

    def list_versions(self, name: str) -> list:
        """List all versions of a dataset."""
        return self.metadata["datasets"].get(name, {}).get("versions", [])

    def _save_metadata(self):
        with open(self.metadata_file, "w") as f:
            json.dump(self.metadata, f, indent=2)

class ModelVersioner:
    """Version control for ML models using MLflow."""

    def __init__(self, tracking_uri: str = "./mlruns",
                 experiment_name: str = "football_models"):
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def log_model(self, model, model_name: str, metrics: dict,
                  params: dict = None, tags: dict = None):
        """Log a model with metrics and parameters."""

        with mlflow.start_run():
            # Log parameters
            if params:
                mlflow.log_params(params)

            # Log metrics
            mlflow.log_metrics(metrics)

            # Log tags
            if tags:
                for key, value in tags.items():
                    mlflow.set_tag(key, value)

            # Log the model
            mlflow.sklearn.log_model(
                model, model_name,
                registered_model_name=model_name
            )

            run_id = mlflow.active_run().info.run_id
            print(f"Logged model {model_name}, run_id: {run_id}")

            return run_id

    def load_model(self, model_name: str,
                   version: Optional[str] = None):
        """Load a model by name and optional version."""

        if version is None:
            # Load latest version
            model_uri = f"models:/{model_name}/latest"
        else:
            model_uri = f"models:/{model_name}/{version}"

        return mlflow.sklearn.load_model(model_uri)

    def compare_models(self, model_name: str) -> pd.DataFrame:
        """Compare metrics across model versions."""

        # Get all runs for the model
        experiment = mlflow.get_experiment_by_name("football_models")
        runs = mlflow.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string=f"tags.mlflow.runName = '{model_name}'"
        )

        return runs[["run_id", "start_time", "metrics.auc", "metrics.accuracy"]]

    def promote_model(self, model_name: str, version: str,
                      stage: str = "Production"):
        """Promote a model version to a stage."""

        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage
        )
        print(f"Promoted {model_name} v{version} to {stage}")

# Example usage
data_versioner = DataVersioner("./data_versions")
model_versioner = ModelVersioner()

print("Versioning tools initialized")
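The checksum in `DataVersioner` guarantees that identical data gets an identical version suffix. The same content-addressing idea, sketched with only the standard library (`dataset_checksum` is a name invented for this example):

```python
import hashlib
import json

def dataset_checksum(records):
    """Order-sensitive content hash of a list of row dicts."""
    payload = json.dumps(records, sort_keys=True, default=str).encode("utf-8")
    return hashlib.md5(payload).hexdigest()

v1 = [{"player": "Saka", "goals": 3}, {"player": "Rice", "goals": 1}]
v2 = [{"player": "Saka", "goals": 3}, {"player": "Rice", "goals": 1}]
v3 = [{"player": "Saka", "goals": 4}, {"player": "Rice", "goals": 1}]

print(dataset_checksum(v1) == dataset_checksum(v2))  # identical content
print(dataset_checksum(v1) == dataset_checksum(v3))  # changed content
```

Because the hash depends only on content, re-saving unchanged data can be detected and skipped, which keeps the version store small.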
# R: Data versioning with pins
library(pins)
library(tidyverse)

# Create a pin board (can be local, S3, Azure, etc.)
board <- board_folder("data_versions")

# Or use remote storage
# board <- board_s3("my-bucket", prefix = "football-data/")

# Save versioned dataset
save_versioned_data <- function(board, data, name, metadata = list()) {
    # Add version metadata
    metadata$created_at <- Sys.time()
    metadata$rows <- nrow(data)
    metadata$columns <- ncol(data)
    metadata$checksum <- digest::digest(data)

    # Pin with version
    pin_write(
        board, data, name,
        metadata = metadata,
        versioned = TRUE
    )

    cat("Saved version:", pin_versions(board, name)$version[1], "\n")
}

# Load specific version
load_versioned_data <- function(board, name, version = NULL) {
    if (is.null(version)) {
        # Latest version
        pin_read(board, name)
    } else {
        # Specific version
        pin_read(board, name, version = version)
    }
}

# List all versions
list_versions <- function(board, name) {
    versions <- pin_versions(board, name)
    versions %>%
        mutate(
            age = difftime(Sys.time(), created, units = "days")
        )
}

# Example usage
# save_versioned_data(board, player_stats, "player_stats_2023",
#                     metadata = list(season = "2023-24", source = "statsbomb"))
# old_data <- load_versioned_data(board, "player_stats_2023", version = "20231015T102030Z")

# Model versioning with vetiver
library(vetiver)

# Version a model
version_model <- function(model, model_name, board) {
    v <- vetiver_model(model, model_name)

    # Add to board with version tracking
    vetiver_pin_write(board, v)

    cat("Model versioned:", model_name, "\n")
}

# Load specific model version
# v <- vetiver_pin_read(board, "xg_model", version = "20231001")
# predict(v, new_data)
Output
Versioning tools initialized

Testing Analytics Code

Reliable pipelines require comprehensive testing. We test data transformations, model predictions, and pipeline integrations.

testing
# Python: Testing with pytest
import pytest
import pandas as pd
import numpy as np
from unittest.mock import Mock, patch

# Test fixtures
@pytest.fixture
def sample_events():
    """Create sample event data for testing."""
    return pd.DataFrame({
        "match_id": [1, 1, 1, 2, 2],
        "player_id": [101, 101, 102, 101, 102],
        "type": ["Pass", "Shot", "Pass", "Shot", "Pass"],
        "x": [80, 95, 50, 92, 60],
        "y": [50, 50, 30, 55, 40],
        "minute": [10, 45, 20, 30, 60],
        "outcome": ["Complete", "Goal", "Complete", "Saved", "Complete"]
    })

@pytest.fixture
def sample_match():
    return {
        "match_id": 1,
        "home_team": "Team A",
        "away_team": "Team B",
        "date": "2023-10-15"
    }

# Unit tests
class TestXGCalculation:
    """Tests for xG calculation."""

    def test_xg_in_valid_range(self):
        from models.xg import calculate_xg

        xg = calculate_xg(x=95, y=50, body_part="Right Foot")
        assert 0 <= xg <= 1

    def test_penalty_xg(self):
        from models.xg import calculate_xg

        xg = calculate_xg(x=88, y=50, is_penalty=True)
        assert abs(xg - 0.76) < 0.02

    def test_long_range_low_xg(self):
        from models.xg import calculate_xg

        xg = calculate_xg(x=60, y=50, body_part="Right Foot")
        assert xg < 0.05

    def test_close_range_high_xg(self):
        from models.xg import calculate_xg

        xg = calculate_xg(x=98, y=50, body_part="Right Foot")
        assert xg > 0.3

class TestDataCleaning:
    """Tests for data cleaning functions."""

    def test_removes_null_ids(self, sample_events):
        from etl.transform import clean_events

        # Add null record
        df = pd.concat([
            sample_events,
            pd.DataFrame([{"match_id": None, "type": "Pass", "minute": 10}])
        ])

        cleaned = clean_events(df)

        assert cleaned["match_id"].notna().all()

    def test_removes_negative_minutes(self, sample_events):
        from etl.transform import clean_events

        # Add invalid minute
        df = pd.concat([
            sample_events,
            pd.DataFrame([{"match_id": 3, "type": "Pass", "minute": -5}])
        ])

        cleaned = clean_events(df)

        assert (cleaned["minute"] >= 0).all()

    def test_preserves_valid_records(self, sample_events):
        from etl.transform import clean_events

        cleaned = clean_events(sample_events)

        assert len(cleaned) == len(sample_events)

class TestAggregations:
    """Tests for aggregation functions."""

    def test_player_stats_accuracy(self, sample_events):
        from etl.aggregate import calculate_player_stats

        stats = calculate_player_stats(sample_events)

        player_101 = stats[stats["player_id"] == 101].iloc[0]
        assert player_101["passes"] == 1
        assert player_101["shots"] == 2

    def test_team_stats_xg(self, sample_events):
        from etl.aggregate import calculate_team_stats

        # Add xg column
        sample_events["xg"] = [0, 0.3, 0, 0.15, 0]

        stats = calculate_team_stats(sample_events)

        assert stats[stats["match_id"] == 1]["total_xg"].iloc[0] == 0.3

# Integration tests
class TestPipeline:
    """Integration tests for the full pipeline."""

    def test_pipeline_produces_valid_output(self, sample_events):
        from pipeline import run_pipeline

        result = run_pipeline(sample_events)

        assert "player_stats" in result
        assert "team_stats" in result
        assert len(result["player_stats"]) > 0

    @patch("etl.extract.fetch_from_api")
    def test_pipeline_handles_api_failure(self, mock_fetch):
        from pipeline import run_pipeline

        mock_fetch.side_effect = ConnectionError("API unavailable")

        with pytest.raises(ConnectionError):
            run_pipeline(from_api=True)

    def test_incremental_load(self, sample_events):
        from pipeline import load_incremental
        from unittest.mock import MagicMock

        mock_engine = MagicMock()

        # Should not raise
        load_incremental(sample_events, "events", ["match_id"], mock_engine)

        assert mock_engine.execute.called

# Performance tests
class TestPerformance:
    """Performance regression tests."""

    def test_large_dataset_processing_time(self):
        from etl.transform import clean_events
        import time

        # Generate large dataset
        large_df = pd.DataFrame({
            "match_id": range(100000),
            "type": ["Pass"] * 100000,
            "minute": np.random.randint(0, 90, 100000)
        })

        start = time.time()
        result = clean_events(large_df)
        duration = time.time() - start

        assert duration < 5  # Should complete in under 5 seconds

# Run with: pytest tests/ -v
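Many of the cleaning tests above reduce to a generic schema check. A small helper in that spirit (`validate_rows` is hypothetical, not part of the chapter's `etl` package):

```python
def validate_rows(rows, required=(), numeric=()):
    """Return a list of error strings; an empty list means the rows pass."""
    errors = []
    for i, row in enumerate(rows):
        for key in required:
            if row.get(key) is None:
                errors.append(f"row {i}: missing {key}")
        for key in numeric:
            value = row.get(key)
            if value is not None and not isinstance(value, (int, float)):
                errors.append(f"row {i}: {key} is not numeric")
    return errors

rows = [
    {"match_id": 1, "minute": 10},
    {"match_id": None, "minute": "45"},
]
print(validate_rows(rows, required=["match_id"], numeric=["minute"]))
```

Returning a list of errors rather than raising on the first one makes test failures far easier to diagnose.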
# R: Testing with testthat
library(testthat)
library(tidyverse)

# Unit tests for transformation functions
test_that("calculate_xg returns valid probabilities", {
    # Test basic shot
    xg <- calculate_xg(x = 95, y = 50, body_part = "Right Foot")
    expect_true(xg >= 0 && xg <= 1)

    # Test penalty
    xg_penalty <- calculate_xg(x = 88, y = 50, is_penalty = TRUE)
    expect_equal(xg_penalty, 0.76, tolerance = 0.01)

    # Test edge cases
    expect_equal(calculate_xg(x = 0, y = 50), 0)  # Own half
    expect_true(calculate_xg(x = 99, y = 50) > 0.5)  # Close range
})

test_that("clean_events removes invalid records", {
    # Create test data with some invalid records
    test_events <- tibble(
        match_id = c(1, 1, NA, 2),
        type = c("Pass", "Shot", "Pass", "Pass"),
        minute = c(10, 120, 30, -5)
    )

    cleaned <- clean_events(test_events)

    # Should remove NA match_id and negative minutes
    expect_equal(nrow(cleaned), 2)
    expect_true(all(!is.na(cleaned$match_id)))
    expect_true(all(cleaned$minute >= 0))
})

test_that("aggregate_player_stats calculates correctly", {
    test_events <- tibble(
        player_id = c(1, 1, 1, 2, 2),
        type = c("Pass", "Pass", "Shot", "Pass", "Shot"),
        xg = c(NA, NA, 0.3, NA, 0.1)
    )

    stats <- aggregate_player_stats(test_events)

    player1 <- stats[stats$player_id == 1, ]
    expect_equal(player1$passes, 2)
    expect_equal(player1$shots, 1)
    expect_equal(player1$total_xg, 0.3)
})

# Integration test
test_that("full ETL pipeline produces valid output", {
    # Use test fixture data
    source_data <- readRDS("tests/fixtures/sample_events.rds")

    # Run pipeline
    result <- run_etl_pipeline(source_data)

    # Validate output structure
    expect_true("player_stats" %in% names(result))
    expect_true("team_stats" %in% names(result))

    # Validate data quality
    expect_true(all(result$player_stats$passes >= 0))
    expect_true(all(result$team_stats$xg >= 0))
})

# Run tests
# test_dir("tests/")
Output
===== 15 passed in 0.45s =====

Cost Optimization

Cloud-based pipelines can become expensive. Optimize costs through efficient resource usage, caching, and tiered storage.

cost_optimization
# Python: Cost optimization for cloud pipelines
import pandas as pd
from functools import lru_cache
from datetime import datetime, timedelta
import boto3

class CostOptimizer:
    """Strategies for reducing pipeline costs."""

    def __init__(self):
        self.cache = {}

    # 1. Efficient Data Types
    @staticmethod
    def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Reduce memory usage through dtype optimization."""

        original_memory = df.memory_usage(deep=True).sum()

        for col in df.columns:
            col_type = df[col].dtype

            # Convert object to category if low cardinality
            if col_type == "object":
                unique_ratio = df[col].nunique() / len(df)
                if unique_ratio < 0.1:
                    df[col] = df[col].astype("category")

            # Downcast integers
            elif col_type == "int64":
                df[col] = pd.to_numeric(df[col], downcast="integer")

            # Downcast floats
            elif col_type == "float64":
                df[col] = pd.to_numeric(df[col], downcast="float")

        new_memory = df.memory_usage(deep=True).sum()
        reduction = (1 - new_memory / original_memory) * 100

        print(f"Memory reduced by {reduction:.1f}%")
        return df

    # 2. Partitioned Storage
    @staticmethod
    def write_partitioned(df: pd.DataFrame, path: str,
                         partition_cols: list):
        """Write data partitioned for efficient querying."""

        df.to_parquet(
            path,
            partition_cols=partition_cols,
            compression="snappy",
            engine="pyarrow"
        )

    @staticmethod
    def read_partitioned(path: str, filters: list = None) -> pd.DataFrame:
        """Read only required partitions."""

        # Only reads matching partitions
        return pd.read_parquet(path, filters=filters)

    # 3. Query Optimization
    @staticmethod
    def optimize_query(query: str) -> str:
        """Add an optimizer hint to a SQL query (hint syntax is database-specific)."""

        # Oracle-style parallel hint; it only takes effect when placed
        # immediately after the SELECT keyword
        return query.replace("SELECT", "SELECT /*+ PARALLEL(4) */", 1)

    # 4. Caching
    @lru_cache(maxsize=1000)
    def cached_xg_calculation(self, x: float, y: float,
                               body_part: str) -> float:
        """Cache expensive xG calculations."""
        return self._calculate_xg_impl(x, y, body_part)

    def _calculate_xg_impl(self, x, y, body_part):
        # Actual calculation
        from math import sqrt
        distance = sqrt((100 - x)**2 + (50 - y)**2)
        return max(0, 0.5 - distance * 0.015)

    # 5. Tiered Storage
    @staticmethod
    def manage_storage_tiers(bucket: str, prefix: str):
        """Move old data to cheaper storage tiers."""

        s3 = boto3.client("s3")
        cutoff_date = datetime.now() - timedelta(days=90)

        paginator = s3.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                if obj["LastModified"].replace(tzinfo=None) < cutoff_date:
                    # Move to Glacier
                    s3.copy_object(
                        Bucket=bucket,
                        CopySource=f"{bucket}/{obj['Key']}",
                        Key=obj["Key"],
                        StorageClass="GLACIER"
                    )
                    print(f"Moved {obj['Key']} to Glacier")

    # 6. Batch Processing
    @staticmethod
    def batch_api_calls(items: list, batch_size: int = 100):
        """Batch API calls to reduce costs."""
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            yield batch

    # 7. Resource Scaling
    @staticmethod
    def estimate_resources(data_size_gb: float) -> dict:
        """Estimate required resources based on data size."""

        return {
            "memory_gb": max(4, data_size_gb * 2),
            "cpu_cores": max(2, int(data_size_gb / 10)),
            "estimated_cost_per_hour": data_size_gb * 0.05,
            "recommended_instance": (
                "t3.medium" if data_size_gb < 10
                else "r5.large" if data_size_gb < 50
                else "r5.xlarge"
            )
        }

# Cost tracking
class CostTracker:
    """Track and report pipeline costs."""

    def __init__(self):
        self.costs = []

    def log_cost(self, operation: str, cost: float,
                 resource: str = "compute"):
        self.costs.append({
            "timestamp": datetime.now(),
            "operation": operation,
            "cost": cost,
            "resource": resource
        })

    def daily_summary(self) -> pd.DataFrame:
        df = pd.DataFrame(self.costs)
        return df.groupby(df["timestamp"].dt.date).agg({
            "cost": "sum",
            "operation": "count"
        }).rename(columns={"operation": "operations"})

    def alert_if_over_budget(self, daily_budget: float):
        today = datetime.now().date()
        today_costs = sum(
            c["cost"] for c in self.costs
            if c["timestamp"].date() == today
        )

        if today_costs > daily_budget:
            print(f"ALERT: Daily costs ${today_costs:.2f} exceed budget ${daily_budget:.2f}")

optimizer = CostOptimizer()
print("Cost optimization tools initialized")
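Of the strategies above, batching is the simplest to demonstrate end to end. A standalone version of the same generator pattern:

```python
def batched(items, batch_size=100):
    """Yield consecutive slices of at most batch_size items."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

match_ids = list(range(7))
print(list(batched(match_ids, batch_size=3)))
# three API calls instead of seven
```

With per-request pricing, collapsing seven single-ID calls into three batched calls is a direct cost reduction, and the generator keeps memory flat for very long ID lists.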
# R: Cost optimization strategies
library(tidyverse)

# 1. Efficient data storage
optimize_storage <- function(df) {
    # Convert strings to factors where appropriate
    df <- df %>%
        mutate(across(where(is.character), ~{
            if (n_distinct(.) / n() < 0.1) {
                as.factor(.)
            } else {
                .
            }
        }))

    # Downcast numerics
    df <- df %>%
        mutate(across(where(is.numeric), ~{
            if (all(. == floor(.), na.rm = TRUE)) as.integer(.) else .
        }))

    df
}

# 2. Query optimization
efficient_query <- function(con, match_ids) {
    # Bad: Pull all data then filter
    # all_data <- dbReadTable(con, "events")
    # filtered <- all_data %>% filter(match_id %in% match_ids)

    # Good: Filter in database
    tbl(con, "events") %>%
        filter(match_id %in% match_ids) %>%
        select(match_id, type, minute, xg) %>%  # Only needed columns
        collect()
}

# 3. Caching expensive computations
library(memoise)

calculate_xg_cached <- memoise(
    function(x, y, body_part) {
        # Expensive calculation
        calculate_xg_full(x, y, body_part)
    },
    cache = cache_filesystem("xg_cache/")
)

# 4. Incremental processing
process_incremental <- function(last_processed_id) {
    # Only process new data
    new_events <- get_events_since(last_processed_id)

    if (nrow(new_events) > 0) {
        processed <- transform_events(new_events)
        append_to_warehouse(processed)
    }

    max(new_events$id, last_processed_id)
}
Output
Cost optimization tools initialized

Practice Exercises

Exercise 40.1: Build an ETL Pipeline

Create an ETL pipeline that extracts StatsBomb data, transforms it into a dimensional model, and loads it into a PostgreSQL database. Include data validation and error handling.

Exercise 40.2: Schedule Automated Updates

Set up a scheduled job that runs your ETL pipeline daily. Include notifications for success/failure and a health check endpoint.
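One hedged starting point for the scheduling part (the paths, script name, and `ALERT_WEBHOOK` variable are placeholders you would set yourself):

```shell
# crontab entry: run the ETL daily at 05:00, append output to a log,
# and notify a webhook only if the run exits non-zero
0 5 * * * cd /opt/football-etl && python run_pipeline.py >> logs/etl.log 2>&1 || curl -s -X POST -d '{"text":"ETL failed"}' "$ALERT_WEBHOOK"
```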

Exercise 40.3: Deploy a Dashboard

Create a Shiny or Streamlit dashboard that displays player and team statistics. Containerize it with Docker and deploy to a cloud platform.

Exercise 40.4: Implement Monitoring

Add monitoring to your pipeline: track data freshness, null rates, and value distributions. Set up alerts for anomalies.

Summary

Building robust analytics pipelines transforms one-off analyses into sustainable, scalable systems. With the infrastructure in place, you can focus on generating insights rather than wrangling data.