Chapter 60

Capstone - Complete Analytics System

Intermediate 30 min read 5 sections 10 code examples
0 of 60 chapters completed (0%)

Introduction to Analytics Applications

Moving from scripts and notebooks to production applications is essential for sharing analytics insights with non-technical stakeholders. This chapter covers building interactive dashboards, web applications, and APIs that make football analytics accessible and actionable.

From Analysis to Application

While Jupyter notebooks and R scripts are excellent for exploration and analysis, production environments require robust applications that non-analysts can use. Modern frameworks make building these applications accessible to data scientists.

R Shiny

Interactive dashboards in R

Streamlit

Rapid Python web apps

Flask/FastAPI

REST APIs and backends

Plotly Dash

Interactive visualizations

Building R Shiny Applications

R Shiny allows you to build interactive web applications directly from R. It's particularly powerful for analytics dashboards where the underlying analysis is already in R.

Building interactive analytics dashboards with Shiny and Streamlit

# Streamlit equivalent structure
"""
Save as app.py and run with: streamlit run app.py
"""

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

# Sample data — five-player demo frame; the Shiny example below uses the
# same figures so both dashboards render identical numbers.
player_data = pd.DataFrame({
    "player": ["Haaland", "Salah", "Kane", "Saka", "Rashford"],
    "goals": [36, 19, 30, 14, 17],
    "xG": [28.5, 20.2, 25.8, 12.1, 15.8],
    "assists": [8, 12, 3, 11, 5],
    "xA": [5.2, 9.8, 4.5, 8.2, 6.1]
})

# Page config — set_page_config must be the first Streamlit call in the script
st.set_page_config(page_title="Player Analytics", layout="wide")
st.title("Premier League Player Analytics")

# Sidebar controls (Streamlit reruns the whole script on every widget change)
st.sidebar.header("Filters")
selected_player = st.sidebar.selectbox("Select Player", player_data["player"])
metric = st.sidebar.selectbox("Primary Metric", ["goals", "xG", "assists"])
min_goals = st.sidebar.slider("Minimum Goals", 0, 40, 10)

# Filter data by the goals floor; tabs 1 and 3 use the filtered frame,
# while the profile tab intentionally reads the full frame.
filtered_data = player_data[player_data["goals"] >= min_goals]

# Main content - tabs
tab1, tab2, tab3 = st.tabs(["Overview", "Player Profile", "Comparison"])

with tab1:
    col1, col2 = st.columns(2)

    with col1:
        # Scatter plot; the dashed y = x line marks players finishing
        # exactly in line with their xG.
        fig = px.scatter(filtered_data, x="xG", y="goals",
                        text="player", title="Goals vs Expected Goals")
        fig.add_shape(type="line", x0=0, x1=40, y0=0, y1=40,
                     line=dict(dash="dash", color="red"))
        st.plotly_chart(fig, use_container_width=True)

    with col2:
        # Summary table sorted by the sidebar-selected metric
        summary = filtered_data.copy()
        summary["overperformance"] = summary["goals"] - summary["xG"]
        summary["efficiency"] = (summary["goals"] / summary["xG"]).round(2)
        st.dataframe(summary.sort_values(metric, ascending=False))

with tab2:
    # Single-row lookup for the selected player (full, unfiltered frame)
    player = player_data[player_data["player"] == selected_player].iloc[0]

    st.subheader(f"Player Profile: {selected_player}")

    col1, col2, col3, col4 = st.columns(4)
    col1.metric("Goals", player["goals"], f"xG: {player['xG']}")
    col2.metric("Assists", player["assists"], f"xA: {player['xA']}")
    col3.metric("Overperformance", round(player["goals"] - player["xG"], 1))
    col4.metric("Efficiency", f"{(player['goals']/player['xG']):.2f}")

with tab3:
    # Grouped bar chart: goals and xG side by side per player
    fig = go.Figure(data=[
        go.Bar(name="Goals", x=filtered_data["player"], y=filtered_data["goals"],
               marker_color="#1B5E20"),
        go.Bar(name="xG", x=filtered_data["player"], y=filtered_data["xG"],
               marker_color="#FFD700")
    ])
    fig.update_layout(barmode="group", title="Goals vs xG Comparison")
    st.plotly_chart(fig, use_container_width=True)

# Basic Shiny App Structure
library(shiny)
library(tidyverse)
library(plotly)

# Sample data
# Five-player demo frame; the same figures are used by the Streamlit
# example above so both dashboards render identical numbers.
player_data <- data.frame(
  player = c("Haaland", "Salah", "Kane", "Saka", "Rashford"),
  goals = c(36, 19, 30, 14, 17),
  xG = c(28.5, 20.2, 25.8, 12.1, 15.8),
  assists = c(8, 12, 3, 11, 5),
  xA = c(5.2, 9.8, 4.5, 8.2, 6.1)
)

# UI Definition
# sidebarLayout: filter controls on the left, tabbed output panels on the right.
ui <- fluidPage(
  titlePanel("Premier League Player Analytics"),

  sidebarLayout(
    sidebarPanel(
      selectInput("player",
                  "Select Player:",
                  choices = player_data$player),

      # Named choices: the labels shown to the user map to column names
      selectInput("metric",
                  "Primary Metric:",
                  choices = c("Goals" = "goals",
                             "xG" = "xG",
                             "Assists" = "assists")),

      sliderInput("min_goals",
                  "Minimum Goals:",
                  min = 0, max = 40, value = 10),

      # NOTE(review): the server never reads input$update, so this button
      # currently has no effect — wire it into the reactives or remove it.
      actionButton("update", "Update Analysis")
    ),

    mainPanel(
      tabsetPanel(
        tabPanel("Overview",
                 plotlyOutput("scatter_plot"),
                 tableOutput("summary_table")),
        # NOTE(review): no renderPlot("radar_chart") exists in the server,
        # so the radar slot on this tab renders empty.
        tabPanel("Player Profile",
                 plotOutput("radar_chart"),
                 verbatimTextOutput("player_stats")),
        tabPanel("Comparison",
                 plotlyOutput("comparison_bar"))
      )
    )
  )
)

# Server Logic
server <- function(input, output, session) {

  # Reactive filtered data: re-evaluates whenever the goals slider moves.
  filtered_data <- reactive({
    player_data %>%
      filter(goals >= input$min_goals)
  })

  # Scatter plot of finishing vs expectation; the dashed y = x line marks
  # players scoring exactly in line with their xG.
  output$scatter_plot <- renderPlotly({
    p <- ggplot(filtered_data(), aes(x = xG, y = goals, text = player)) +
      geom_point(size = 4, color = "#1B5E20") +
      geom_abline(intercept = 0, slope = 1, linetype = "dashed", color = "red") +
      labs(title = "Goals vs Expected Goals",
           x = "Expected Goals (xG)",
           y = "Actual Goals") +
      theme_minimal()

    ggplotly(p, tooltip = "text")
  })

  # Summary table sorted by the user-selected metric.
  output$summary_table <- renderTable({
    filtered_data() %>%
      mutate(
        overperformance = goals - xG,
        # Round to 2 dp for display, matching the Streamlit version above.
        efficiency = round(goals / xG, 2)
      ) %>%
      # .data[[...]] is the tidy-eval idiom for a column name held in a
      # string; clearer than !!sym() and robust to ambient variables.
      arrange(desc(.data[[input$metric]]))
  })

  # Player profile text block for the currently selected player.
  output$player_stats <- renderPrint({
    # Inside filter(), the bare `player` is the data-frame column while
    # input$player is resolved from the enclosing environment.
    player <- player_data %>%
      filter(player == input$player)

    cat("Player:", input$player, "\n")
    cat("Goals:", player$goals, "(xG:", player$xG, ")\n")
    cat("Assists:", player$assists, "(xA:", player$xA, ")\n")
    cat("Goal Overperformance:", round(player$goals - player$xG, 1), "\n")
  })

  # Grouped bar chart comparing goals with xG per player.
  output$comparison_bar <- renderPlotly({
    plot_ly(filtered_data(), x = ~player, y = ~goals, type = "bar",
            name = "Goals", marker = list(color = "#1B5E20")) %>%
      add_trace(y = ~xG, name = "xG", marker = list(color = "#FFD700")) %>%
      layout(title = "Goals vs xG Comparison",
             yaxis = list(title = "Count"),
             barmode = "group")
  })
}

# Run the app
# shinyApp(ui = ui, server = server)

Building Analytics APIs

APIs allow your analytics to be consumed by other applications, websites, or mobile apps. FastAPI and Flask are popular Python frameworks for building robust, scalable APIs.

Building REST APIs for football analytics

# FastAPI Implementation
"""
Save as main.py and run with: uvicorn main:app --reload
"""

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import List, Optional
import pandas as pd

app = FastAPI(
    title="Football Analytics API",
    description="API for football player statistics",
    version="1.0.0"
)

# Sample data (in production, use a database)
# Same five-player fixture as the Streamlit/Shiny examples above, plus an
# integer primary key used by the /players/{player_id} routes.
player_db = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["Haaland", "Salah", "Kane", "Saka", "Rashford"],
    "team": ["Man City", "Liverpool", "Bayern", "Arsenal", "Man Utd"],
    "goals": [36, 19, 30, 14, 17],
    "xG": [28.5, 20.2, 25.8, 12.1, 15.8],
    "position": ["ST", "RW", "ST", "RW", "LW"]
})

# Pydantic models for request/response
class Player(BaseModel):
    # Mirrors one row of player_db; response_model validation coerces
    # numpy scalars produced by pandas into plain int/float/str.
    id: int
    name: str
    team: str
    goals: int
    xG: float
    position: str

class PlayerScore(BaseModel):
    player: str
    score: float
    weights_used: dict  # echoes the weights applied, e.g. {"goals": 1.0, "xG": 0.5}

class LeagueSummary(BaseModel):
    total_players: int
    avg_goals: float
    avg_xG: float
    top_scorer: str

# Endpoints
@app.get("/players", response_model=List[Player])
async def get_all_players(
    position: Optional[str] = Query(None, description="Filter by position"),
    min_goals: int = Query(0, description="Minimum goals")
):
    """Return every player, optionally narrowed by position and a goals floor."""
    # Work on a copy so the module-level frame is never mutated.
    filtered = player_db.copy()
    if position:
        filtered = filtered.loc[filtered["position"] == position]
    filtered = filtered.loc[filtered["goals"] >= min_goals]
    return filtered.to_dict("records")

@app.get("/players/{player_id}", response_model=Player)
async def get_player(player_id: int):
    """Look up a single player record by its numeric ID; 404 when absent."""
    match = player_db.loc[player_db["id"] == player_id]
    if match.empty:
        raise HTTPException(status_code=404, detail="Player not found")
    # One matching row -> plain dict for the response model.
    return match.iloc[0].to_dict()

@app.get("/teams/{team}/players", response_model=List[Player])
async def get_team_players(team: str):
    """Return every player on the roster of the named team; 404 when none."""
    roster = player_db.loc[player_db["team"] == team]
    if roster.empty:
        raise HTTPException(status_code=404, detail="No players found for team")
    return roster.to_dict("records")

@app.post("/players/{player_id}/score", response_model=PlayerScore)
async def calculate_player_score(
    player_id: int,
    goals_weight: float = 1.0,
    xG_weight: float = 0.5
):
    """Compute a weighted combination of goals and xG for one player."""
    row = player_db.loc[player_db["id"] == player_id]
    if row.empty:
        raise HTTPException(status_code=404, detail="Player not found")

    record = row.iloc[0]
    weighted = record["goals"] * goals_weight + record["xG"] * xG_weight

    # Echo the weights back so callers can verify what was applied.
    return PlayerScore(
        player=record["name"],
        score=weighted,
        weights_used={"goals": goals_weight, "xG": xG_weight}
    )

@app.get("/stats/summary", response_model=LeagueSummary)
async def get_league_summary():
    """League-wide aggregate statistics across every player on record."""
    goals = player_db["goals"]
    return LeagueSummary(
        total_players=len(player_db),
        avg_goals=goals.mean(),
        avg_xG=player_db["xG"].mean(),
        # idxmax gives the row label of the highest scorer
        top_scorer=player_db.loc[goals.idxmax(), "name"]
    )

# Run with: uvicorn main:app --reload
# Access docs at: http://localhost:8000/docs

# R Plumber API
library(plumber)
library(tidyverse)
library(jsonlite)

# Save as api.R and run with: plumber::plumb("api.R")$run(port=8000)

# Sample data (in production, this would be a database)
# Same five-player fixture as the FastAPI example so both APIs agree.
player_db <- data.frame(
  id = 1:5,
  name = c("Haaland", "Salah", "Kane", "Saka", "Rashford"),
  team = c("Man City", "Liverpool", "Bayern", "Arsenal", "Man Utd"),
  goals = c(36, 19, 30, 14, 17),
  xG = c(28.5, 20.2, 25.8, 12.1, 15.8),
  position = c("ST", "RW", "ST", "RW", "LW")
)

#* @apiTitle Football Analytics API
#* @apiDescription API for football player statistics

#* Get all players
#* @get /players
function() {
  # plumber serializes the data frame to a JSON array of row objects
  player_db
}

#* Get player by ID
#* @param id Player ID
#* @get /players/<id:int>
function(id, res) {
  # <id:int> makes plumber coerce the path segment, but as.integer() keeps
  # this safe if the route type annotation is ever dropped.
  result <- player_db %>% filter(id == !!as.integer(id))
  if (nrow(result) == 0) {
    # stop() would surface as HTTP 500; a missing resource should be 404.
    res$status <- 404
    return(list(error = "Player not found"))
  }
  result
}

#* Get players by team
#* @param team Team name
#* @get /teams/<team>/players
function(team, res) {
  # LHS `team` is the data-frame column; `!!team` forces evaluation of the
  # function argument, avoiding the column/argument name collision.
  players <- player_db %>% filter(team == !!team)
  if (nrow(players) == 0) {
    # stop() would surface as HTTP 500; a missing resource should be 404.
    res$status <- 404
    return(list(error = "No players found for team"))
  }
  players
}

#* Calculate player score
#* @param id Player ID
#* @param weights JSON weights object
#* @post /players/<id:int>/score
function(id, weights = list(goals = 1, xG = 0.5), res) {
  # NOTE(review): plumber's JSON body parser supplies `weights` as a named
  # list when the request body is {"weights": {"goals": ..., "xG": ...}} —
  # confirm clients send that shape, otherwise weights$goals is NULL.
  player <- player_db %>% filter(id == !!as.integer(id))

  if (nrow(player) == 0) {
    # 404 instead of the HTTP 500 that stop() would produce.
    res$status <- 404
    return(list(error = "Player not found"))
  }

  score <- player$goals * weights$goals + player$xG * weights$xG

  list(
    player = player$name,
    score = score,
    weights_used = weights
  )
}

#* Get league statistics
#* @get /stats/summary
function() {
  # Single-row aggregate over the whole table; no pipe needed for one verb.
  summarise(
    player_db,
    total_players = n(),
    avg_goals = mean(goals),
    avg_xG = mean(xG),
    top_scorer = name[which.max(goals)]
  )
}

Dashboard Design Patterns

Effective analytics dashboards follow design principles that maximize insight delivery while minimizing cognitive load. Here are key patterns for football analytics dashboards.

Dashboard Design Principles
  • Progressive Disclosure: Show summary first, details on demand
  • Context First: Always provide league/team averages for comparison
  • Actionable Insights: Highlight what needs attention
  • Consistent Visual Language: Use same colors/icons throughout
  • Mobile Responsive: Design for multiple screen sizes
Building professional analytics dashboards

# Streamlit Dashboard with Components
"""
Professional dashboard structure in Streamlit
"""
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime

# Page configuration
st.set_page_config(
    page_title="Football Analytics Hub",
    # Use the actual emoji: Streamlit renders page_icon literally, so the
    # HTML entity "&#9917;" would appear as raw text in the browser tab.
    page_icon="⚽",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for the KPI cards (re-injected on every script rerun)
st.markdown("""
<style>
    .metric-card {
        background-color: #1B5E20;
        color: white;
        padding: 20px;
        border-radius: 10px;
        text-align: center;
    }
    .metric-value {
        font-size: 2.5rem;
        font-weight: bold;
    }
    .metric-label {
        font-size: 0.9rem;
        opacity: 0.8;
    }
</style>
""", unsafe_allow_html=True)

# Sidebar: global filters plus page navigation
with st.sidebar:
    st.image("https://via.placeholder.com/150x50?text=Logo", width=150)
    st.title("Filters")

    season = st.selectbox("Season", ["2023-24", "2022-23", "2021-22"])
    league = st.selectbox("League", ["Premier League", "La Liga", "Bundesliga"])

    st.divider()
    page = st.radio("Navigation", ["Overview", "Players", "Teams", "Matches"])

# Sample data
@st.cache_data
def load_data():
    """Return cumulative goals/xG per match week (demo fixture, cached)."""
    weeks = list(range(1, 21))
    goals = [28, 55, 82, 110, 138, 165, 195, 222, 250, 280,
             308, 338, 365, 395, 422, 452, 480, 510, 538, 568]
    xg = [26, 52, 80, 105, 132, 158, 188, 215, 245, 272,
          300, 328, 358, 385, 415, 442, 472, 500, 530, 558]
    return pd.DataFrame({"match_week": weeks, "goals": goals, "xG": xg})

data = load_data()

# Main content based on page selection (the sidebar radio drives routing)
if page == "Overview":
    st.title("League Overview")

    # KPI Row (static demo values; production would derive these from data)
    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric(label="Total Goals", value="568", delta="+12 vs last season")
    with col2:
        st.metric(label="Avg xG/Match", value="2.68", delta="0.15")
    with col3:
        st.metric(label="Top Scorer", value="Haaland (28)")
    with col4:
        st.metric(label="Clean Sheets", value="45", delta="-3")

    st.divider()

    # Charts row (2:1 width split between timeline and quick stats)
    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Goals Timeline")
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=data["match_week"], y=data["goals"],
                                 name="Goals", line=dict(color="#1B5E20")))
        fig.add_trace(go.Scatter(x=data["match_week"], y=data["xG"],
                                 name="xG", line=dict(color="#FFD700", dash="dash")))
        fig.update_layout(height=400)
        st.plotly_chart(fig, use_container_width=True)

    with col2:
        st.subheader("Quick Stats")
        stats_df = pd.DataFrame({
            "Metric": ["Matches Played", "Home Wins", "Away Wins", "Draws"],
            "Value": [200, 85, 65, 50]
        })
        st.dataframe(stats_df, hide_index=True, use_container_width=True)

    # Bottom row
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("xG vs Actual Goals")
        scatter_data = pd.DataFrame({
            "team": ["Man City", "Arsenal", "Liverpool", "Chelsea", "Spurs"],
            "goals": [85, 78, 72, 58, 65],
            "xG": [78, 72, 70, 62, 60]
        })
        fig = px.scatter(scatter_data, x="xG", y="goals", text="team",
                        color_discrete_sequence=["#1B5E20"])
        # Dashed y = x reference line: points above it overperform their xG
        fig.add_shape(type="line", x0=50, x1=90, y0=50, y1=90,
                     line=dict(dash="dash", color="red"))
        st.plotly_chart(fig, use_container_width=True)

    with col2:
        st.subheader("Top Performers")
        performers = pd.DataFrame({
            "Player": ["Haaland", "Salah", "Kane", "Saka", "Palmer"],
            "Goals": [28, 19, 22, 14, 16],
            "xG": [22.5, 18.2, 20.1, 12.5, 14.8],
            "Overperf": [5.5, 0.8, 1.9, 1.5, 1.2]
        })
        st.dataframe(performers, hide_index=True, use_container_width=True)

elif page == "Players":
    st.title("Player Analysis")

    # Search and filters (widgets are wired but not yet applied to data)
    col1, col2, col3 = st.columns(3)
    with col1:
        search = st.text_input("Search Player", "")
    with col2:
        position = st.selectbox("Position", ["All", "GK", "DEF", "MID", "FWD"])
    with col3:
        min_minutes = st.slider("Minimum Minutes", 0, 3000, 500)

    # Placeholder: the remaining pages are left as an exercise
    st.info("Player search and radar charts would appear here")

# Dashboard Components in Shiny
library(shiny)
library(shinydashboard)
library(plotly)
library(DT)

# Professional Dashboard UI
# shinydashboard layout: header + sidebar menu + tabbed body.
ui <- dashboardPage(
  dashboardHeader(title = "Football Analytics Hub"),

  dashboardSidebar(
    sidebarMenu(
      menuItem("Overview", tabName = "overview", icon = icon("dashboard")),
      menuItem("Players", tabName = "players", icon = icon("users")),
      menuItem("Teams", tabName = "teams", icon = icon("futbol")),
      menuItem("Matches", tabName = "matches", icon = icon("calendar"))
    ),

    # Global filters
    selectInput("season", "Season",
                choices = c("2023-24", "2022-23", "2021-22")),
    selectInput("league", "League",
                choices = c("Premier League", "La Liga", "Bundesliga"))
  ),

  dashboardBody(
    # Custom CSS
    tags$head(
      tags$style(HTML("
        .content-wrapper { background-color: #f4f4f4; }
        .box-header { background-color: #1B5E20; color: white; }
        .small-box { border-radius: 8px; }
      "))
    ),

    # NOTE(review): the "teams" and "matches" menu items have no matching
    # tabItem below, so those pages render blank until implemented.
    tabItems(
      # Overview Tab
      tabItem(tabName = "overview",
        fluidRow(
          # KPI boxes
          valueBoxOutput("total_goals", width = 3),
          valueBoxOutput("avg_xG", width = 3),
          valueBoxOutput("top_scorer", width = 3),
          valueBoxOutput("clean_sheets", width = 3)
        ),

        fluidRow(
          box(title = "Goals Timeline", width = 8,
              plotlyOutput("goals_timeline")),
          box(title = "Quick Stats", width = 4,
              tableOutput("quick_stats"))
        ),

        fluidRow(
          box(title = "xG vs Actual Goals", width = 6,
              plotlyOutput("xg_scatter")),
          box(title = "Top Performers", width = 6,
              DTOutput("top_performers_table"))
        )
      ),

      # Players Tab
      tabItem(tabName = "players",
        fluidRow(
          box(title = "Player Search", width = 12,
              fluidRow(
                column(4, textInput("player_search", "Search", "")),
                column(4, selectInput("position_filter", "Position",
                                     choices = c("All", "GK", "DEF", "MID", "FWD"))),
                column(4, sliderInput("minutes_filter", "Min Minutes",
                                     min = 0, max = 3000, value = 500))
              )
          )
        ),
        fluidRow(
          box(title = "Player Radar", width = 6,
              plotlyOutput("player_radar")),
          box(title = "Player Stats", width = 6,
              DTOutput("player_stats_table"))
        )
      )
    )
  )
)

# Server with KPI calculations
# NOTE(review): only goals_timeline is rendered here; quick_stats,
# xg_scatter and top_performers_table declared in the UI have no render
# functions yet and will display empty boxes.
server <- function(input, output, session) {

  # Sample reactive data
  season_data <- reactive({
    # In production, fetch from database based on input$season
    # NOTE(review): sample()/runif() are unseeded, so the chart changes on
    # every invalidation — call set.seed() here if reproducible demos matter.
    data.frame(
      match_week = 1:20,
      goals = cumsum(sample(20:40, 20, replace = TRUE)),
      xG = cumsum(runif(20, 18, 38))
    )
  })

  # KPI Boxes (static demo values; production would derive from season_data)
  output$total_goals <- renderValueBox({
    valueBox(
      value = 512,
      subtitle = "Total Goals",
      icon = icon("futbol"),
      color = "green"
    )
  })

  output$avg_xG <- renderValueBox({
    valueBox(
      value = "2.68",
      subtitle = "Avg xG/Match",
      icon = icon("chart-line"),
      color = "blue"
    )
  })

  output$top_scorer <- renderValueBox({
    valueBox(
      value = "Haaland (28)",
      subtitle = "Top Scorer",
      icon = icon("trophy"),
      color = "yellow"
    )
  })

  output$clean_sheets <- renderValueBox({
    valueBox(
      value = 45,
      subtitle = "Clean Sheets",
      icon = icon("shield"),
      color = "purple"
    )
  })

  # Goals timeline: cumulative goals vs cumulative xG per match week
  output$goals_timeline <- renderPlotly({
    data <- season_data()
    plot_ly(data, x = ~match_week) %>%
      add_lines(y = ~goals, name = "Goals", line = list(color = "#1B5E20")) %>%
      add_lines(y = ~xG, name = "xG", line = list(color = "#FFD700", dash = "dash")) %>%
      layout(title = "", xaxis = list(title = "Match Week"),
             yaxis = list(title = "Cumulative Total"))
  })
}

Data Layer Architecture

Production applications need robust data layers that handle caching, database connections, and efficient data retrieval. Here's how to structure your data access.

Building robust data layers for analytics applications

# Python Data Layer with Caching
import io
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Dict, List, Optional

import pandas as pd
import redis
from sqlalchemy import create_engine, text

@dataclass
class CacheConfig:
    # Master switch: when False, every cache read misses and writes are skipped
    enabled: bool = True
    ttl_seconds: int = 3600  # 1 hour

class DataManager:
    """Data access layer with caching and connection pooling.

    Wraps a lazily-created SQLAlchemy engine (Postgres) and a lazily-created
    Redis client used as a distributed cache for query results, which are
    serialized to/from JSON. All cache operations are best-effort: a cache
    failure never breaks the request path.
    """

    def __init__(self, cache_config: Optional[CacheConfig] = None):
        self.cache_config = cache_config or CacheConfig()
        self._engine = None
        self._redis = None

    @property
    def engine(self):
        """Lazy database connection (created on first use, then reused)."""
        if self._engine is None:
            db_url = os.getenv("DATABASE_URL",
                              "postgresql://user:pass@localhost/football")
            self._engine = create_engine(db_url, pool_size=5, max_overflow=10)
        return self._engine

    @property
    def redis_client(self):
        """Lazy Redis connection for distributed caching."""
        if self._redis is None:
            redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
            self._redis = redis.from_url(redis_url)
        return self._redis

    def _cache_key(self, prefix: str, *args) -> str:
        """Generate a colon-delimited cache key, e.g. 'players:2023-24'."""
        return f"{prefix}:{':'.join(str(a) for a in args)}"

    def _get_cached(self, key: str) -> Optional[pd.DataFrame]:
        """Fetch a DataFrame from cache; None on miss, disabled cache, or error."""
        if not self.cache_config.enabled:
            return None
        try:
            data = self.redis_client.get(key)
            if data:
                # redis returns bytes; wrap in StringIO — passing a literal
                # JSON string/bytes to read_json is deprecated in pandas 2.x.
                # NOTE(review): a JSON round-trip may not preserve exact
                # dtypes (e.g. datetimes) — confirm for the cached tables.
                return pd.read_json(io.StringIO(data.decode("utf-8")))
        except Exception:
            # Cache is best-effort: fall through to the database on any
            # failure. A bare `except:` would also trap KeyboardInterrupt.
            pass
        return None

    def _set_cached(self, key: str, df: pd.DataFrame):
        """Store a DataFrame in cache with the configured TTL (best-effort)."""
        if not self.cache_config.enabled:
            return
        try:
            self.redis_client.setex(
                key,
                self.cache_config.ttl_seconds,
                df.to_json()
            )
        except Exception:
            # Never let a cache-write failure break the request path.
            pass

    def get_players(self, season: str, use_cache: bool = True) -> pd.DataFrame:
        """Get players for a season, highest scorers first."""
        cache_key = self._cache_key("players", season)

        if use_cache:
            cached = self._get_cached(cache_key)
            if cached is not None:
                return cached

        # Bound parameter — never interpolate user input into SQL text.
        query = text("""
            SELECT * FROM players
            WHERE season = :season
            ORDER BY goals DESC
        """)

        with self.engine.connect() as conn:
            result = pd.read_sql(query, conn, params={"season": season})

        self._set_cached(cache_key, result)
        return result

    def get_matches(self, team_id: int, season: str) -> pd.DataFrame:
        """Get all matches (home and away) for a team in a season."""
        query = text("""
            SELECT m.*, t1.name as home_team, t2.name as away_team
            FROM matches m
            JOIN teams t1 ON m.home_team_id = t1.id
            JOIN teams t2 ON m.away_team_id = t2.id
            WHERE (m.home_team_id = :team_id OR m.away_team_id = :team_id)
              AND m.season = :season
            ORDER BY m.match_date
        """)

        with self.engine.connect() as conn:
            return pd.read_sql(query, conn,
                             params={"team_id": team_id, "season": season})

    def get_team_stats(self, team_id: int, season: str) -> Dict:
        """Calculate win/goal aggregations for a team from its match list."""
        matches = self.get_matches(team_id, season)

        # A win is a home win when playing at home, or an away win when away.
        is_home = matches["home_team_id"] == team_id
        wins = ((is_home & (matches["home_goals"] > matches["away_goals"])) |
                (~is_home & (matches["away_goals"] > matches["home_goals"]))).sum()

        goals_for = matches.loc[is_home, "home_goals"].sum() + \
                    matches.loc[~is_home, "away_goals"].sum()
        goals_against = matches.loc[is_home, "away_goals"].sum() + \
                        matches.loc[~is_home, "home_goals"].sum()

        return {
            "total_matches": len(matches),
            "wins": int(wins),
            "goals_for": int(goals_for),
            "goals_against": int(goals_against)
        }

    def clear_cache(self, pattern: str = "*"):
        """Delete cached entries matching a Redis glob pattern (default: all)."""
        if self.cache_config.enabled:
            for key in self.redis_client.scan_iter(pattern):
                self.redis_client.delete(key)


# Usage
# dm = DataManager()
# players = dm.get_players("2023-24")
# stats = dm.get_team_stats(1, "2023-24")

# R Data Layer with Caching
library(R6)
library(DBI)
library(RPostgres)
library(memoise)

# Database connection manager
# R6 gives reference semantics: one shared connection and one shared cache
# environment per DataManager instance.
DataManager <- R6Class("DataManager",
  private = list(
    conn = NULL,
    cache = NULL,

    # Open (or reuse) a Postgres connection configured via environment vars.
    connect = function() {
      if (is.null(private$conn)) {
        private$conn <- dbConnect(
          Postgres(),
          dbname = Sys.getenv("DB_NAME"),
          host = Sys.getenv("DB_HOST"),
          port = Sys.getenv("DB_PORT"),
          user = Sys.getenv("DB_USER"),
          password = Sys.getenv("DB_PASSWORD")
        )
      }
      private$conn
    }
  ),

  public = list(
    initialize = function() {
      private$cache <- new.env(parent = emptyenv())
    },

    # Get players with in-memory caching (no TTL; call clear_cache() to refresh)
    get_players = function(season, use_cache = TRUE) {
      cache_key <- paste0("players_", season)

      if (use_cache && exists(cache_key, envir = private$cache)) {
        return(get(cache_key, envir = private$cache))
      }

      conn <- private$connect()
      # Parameterized query ($1 placeholder): safer than sprintf()-built SQL
      # and avoids manual quoting of user-supplied values.
      result <- dbGetQuery(conn, "
        SELECT * FROM players
        WHERE season = $1
        ORDER BY goals DESC
      ", params = list(season))

      assign(cache_key, result, envir = private$cache)
      result
    },

    # Get match data for a team (home or away) in a season
    get_matches = function(team_id, season) {
      conn <- private$connect()
      dbGetQuery(conn, "
        SELECT m.*, t1.name as home_team, t2.name as away_team
        FROM matches m
        JOIN teams t1 ON m.home_team_id = t1.id
        JOIN teams t2 ON m.away_team_id = t2.id
        WHERE (m.home_team_id = $1 OR m.away_team_id = $1)
          AND m.season = $2
        ORDER BY m.match_date
      ", params = list(team_id, season))
    },

    # Calculate aggregations
    get_team_stats = function(team_id, season) {
      matches <- self$get_matches(team_id, season)

      list(
        total_matches = nrow(matches),
        # winner_id is NA for draws, so na.rm/is.na split the three outcomes
        wins = sum(matches$winner_id == team_id, na.rm = TRUE),
        draws = sum(is.na(matches$winner_id)),
        losses = sum(matches$winner_id != team_id & !is.na(matches$winner_id)),
        goals_for = sum(ifelse(matches$home_team_id == team_id,
                               matches$home_goals, matches$away_goals)),
        goals_against = sum(ifelse(matches$home_team_id == team_id,
                                   matches$away_goals, matches$home_goals))
      )
    },

    # Drop every cached result so the next read hits the database
    clear_cache = function() {
      rm(list = ls(envir = private$cache), envir = private$cache)
    },

    # Release the database connection; safe to call repeatedly
    close = function() {
      if (!is.null(private$conn)) {
        dbDisconnect(private$conn)
        private$conn <- NULL
      }
    }
  )
)

# Usage
# dm <- DataManager$new()
# players <- dm$get_players("2023-24")
# team_stats <- dm$get_team_stats(1, "2023-24")

Deployment Options

Getting your analytics application into production requires choosing the right deployment strategy based on your audience, scale, and infrastructure.

| Platform | Best For | Pros | Cons |
| --- | --- | --- | --- |
| Streamlit Cloud | Quick demos, small teams | Free tier, GitHub integration | Limited resources, public apps |
| shinyapps.io | R Shiny apps | Easy deployment, managed | Costs scale with usage |
| Heroku | APIs, full apps | Simple, add-ons ecosystem | Can get expensive |
| AWS/GCP/Azure | Enterprise, custom needs | Full control, scalable | Complexity, expertise needed |
| Docker + VPS | Self-hosted, control | Cost-effective, flexible | Maintenance responsibility |
Docker deployment configurations for analytics apps

# Docker deployment for Python apps
# Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies.
# curl is required by the HEALTHCHECK below: python:*-slim images do not
# ship curl, so without it the healthcheck always fails and the container
# is reported unhealthy.
RUN apt-get update && apt-get install -y \
    gcc \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python packages first so this layer stays cached while app code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Expose port
EXPOSE 8501

# Healthcheck against Streamlit's built-in liveness endpoint
HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health

# Run Streamlit
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]

# ---
# requirements.txt
# Pinned versions keep image builds reproducible; bump deliberately and
# re-test, since minor releases of these libraries can change behavior.
streamlit==1.29.0
pandas==2.1.0
plotly==5.18.0
sqlalchemy==2.0.0
psycopg2-binary==2.9.9
redis==5.0.0

# ---
# docker-compose.yml
# NOTE(review): the top-level `version` key is ignored by Compose v2;
# harmless to keep for older tooling.
version: "3.8"

services:
  app:
    build: .
    ports:
      - "8501:8501"
    environment:
      # DB_PASSWORD is read from the shell or a .env file at compose time
      - DATABASE_URL=postgresql://app:${DB_PASSWORD}@db/football
      - REDIS_URL=redis://redis:6379
    depends_on:
      # depends_on orders startup only — it does not wait for Postgres to
      # accept connections; the app should retry its initial connection.
      - db
      - redis

  db:
    image: postgres:14-alpine
    environment:
      - POSTGRES_DB=football
      - POSTGRES_USER=app
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      # Named volume persists data across container recreation
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redisdata:/data

volumes:
  pgdata:
  redisdata:

# Docker deployment for R Shiny
# Dockerfile

# NOTE(review): :latest is not reproducible — pin a rocker/shiny tag for
# production builds.
FROM rocker/shiny:latest

# Install system dependencies and clear the apt cache in the same layer to
# keep the image small (the original left /var/lib/apt/lists in the layer).
RUN apt-get update && apt-get install -y \
    libcurl4-gnutls-dev \
    libssl-dev \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Install R packages
RUN R -e "install.packages(c('tidyverse', 'plotly', 'DBI', 'RPostgres', 'shinydashboard'))"

# Copy app
COPY ./app /srv/shiny-server/app

# Expose port
EXPOSE 3838

# Run
CMD ["/usr/bin/shiny-server"]

# ---
# docker-compose.yml

version: "3.8"
services:
  shiny:
    build: .
    ports:
      - "3838:3838"
    environment:
      # Credentials are read by the R DataManager via Sys.getenv()
      - DB_HOST=db
      - DB_NAME=football
      - DB_USER=app
      - DB_PASSWORD=${DB_PASSWORD}
    depends_on:
      # Startup ordering only — does not wait for Postgres readiness
      - db

  db:
    image: postgres:14
    environment:
      - POSTGRES_DB=football
      - POSTGRES_USER=app
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:

Real-Time Updates and WebSockets

Live match analytics require real-time data updates. WebSockets and Server-Sent Events enable pushing updates to connected clients without polling.

Implementing real-time match updates with WebSockets
# Python: FastAPI with WebSockets for Live Updates
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import json
from typing import List
from dataclasses import dataclass, asdict
import random
from datetime import datetime

app = FastAPI()

# Connection manager for WebSocket clients
# Connection manager for WebSocket clients
class ConnectionManager:
    """Tracks open WebSocket connections and per-match subscriptions."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.match_subscriptions: dict = {}  # match_id -> [connections]

    async def connect(self, websocket: WebSocket, match_id: int):
        """Accept a new client and subscribe it to updates for `match_id`."""
        await websocket.accept()
        self.active_connections.append(websocket)
        # setdefault replaces the explicit "key missing" branch
        self.match_subscriptions.setdefault(match_id, []).append(websocket)

    def disconnect(self, websocket: WebSocket, match_id: int):
        """Remove a client. Safe to call twice: the original raised
        ValueError from list.remove() when the socket was already gone."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        subscribers = self.match_subscriptions.get(match_id)
        if subscribers is not None:
            if websocket in subscribers:
                subscribers.remove(websocket)
            # Drop empty subscription lists so broadcasters skip dead matches
            if not subscribers:
                del self.match_subscriptions[match_id]

    async def broadcast_match_update(self, match_id: int, data: dict):
        """Send update to all clients watching this match.

        Clients whose send fails are unsubscribed instead of being retried
        forever — the original bare `except: pass` leaked dead connections.
        """
        dead = []
        for connection in self.match_subscriptions.get(match_id, []):
            try:
                await connection.send_json(data)
            except Exception:
                dead.append(connection)
        for connection in dead:
            self.disconnect(connection, match_id)

manager = ConnectionManager()  # module-level singleton shared by all endpoints

@dataclass
class MatchEvent:
    # A single in-match event (goal, shot, ...) with its xG contribution
    minute: int
    event_type: str
    team: str
    player: str
    xG: float = 0.0

@dataclass
class MatchState:
    # Snapshot of a live match, serialized with asdict() for WebSocket clients
    match_id: int
    minute: int
    home_xG: float
    away_xG: float
    home_goals: int
    away_goals: int
    events: List[dict]  # raw event dicts, not MatchEvent instances (see get_match_state)

# WebSocket endpoint for live match updates
@app.websocket("/ws/match/{match_id}")
async def websocket_match(websocket: WebSocket, match_id: int):
    """Stream live state for one match; answers "ping" and "refresh" commands."""
    await manager.connect(websocket, match_id)

    try:
        # Push the current snapshot as soon as the client subscribes.
        await websocket.send_json(asdict(get_match_state(match_id)))

        # Serve client commands until the socket closes.
        while True:
            command = await websocket.receive_text()
            if command == "ping":
                # Heartbeat reply keeps the connection alive through proxies
                await websocket.send_text("pong")
            elif command == "refresh":
                # Client-requested snapshot, outside the broadcast cycle
                await websocket.send_json(asdict(get_match_state(match_id)))

    except WebSocketDisconnect:
        manager.disconnect(websocket, match_id)

def get_match_state(match_id: int) -> MatchState:
    """Get current match state (in production, from database/API)"""
    # Fixed demo event feed; only the aggregate numbers are randomized.
    demo_events = [
        {"minute": 23, "type": "Goal", "team": "Home"},
        {"minute": 45, "type": "Shot", "team": "Away"},
    ]
    return MatchState(
        match_id=match_id,
        minute=random.randint(1, 90),
        home_xG=round(random.uniform(0.5, 2.5), 2),
        away_xG=round(random.uniform(0.3, 2.0), 2),
        home_goals=random.randint(0, 3),
        away_goals=random.randint(0, 2),
        events=demo_events,
    )

# Background task to push updates
async def match_update_broadcaster():
    """Simulate live match updates"""
    while True:
        await asyncio.sleep(5)  # Update every 5 seconds

        # Snapshot the keys first: subscriptions can change while we await.
        for match_id in list(manager.match_subscriptions.keys()):
            subscribers = manager.match_subscriptions[match_id]
            if not subscribers:
                continue
            snapshot = asdict(get_match_state(match_id))
            await manager.broadcast_match_update(match_id, snapshot)

# Start background task on startup
# NOTE(review): @app.on_event is deprecated in recent FastAPI versions in
# favour of lifespan handlers — still functional, but plan a migration.
@app.on_event("startup")
async def startup_event():
    # Fire-and-forget broadcaster; runs for the life of the process
    asyncio.create_task(match_update_broadcaster())

# Streamlit with auto-refresh
# (Illustrative snippet kept as a string so it does not execute here.
#  The while/time.sleep loop blocks the script permanently — the
#  streamlit-autorefresh component shown at the bottom is preferred.)
"""
# app_live.py - Streamlit with auto-refresh
import streamlit as st
import time

# Auto-refresh using st.empty and time.sleep
placeholder = st.empty()

while True:
    with placeholder.container():
        # Fetch latest data
        data = fetch_live_match_data()

        st.metric("Home xG", data["home_xG"])
        st.metric("Away xG", data["away_xG"])

        # Chart updates
        st.plotly_chart(create_xg_chart(data))

    time.sleep(5)  # Refresh every 5 seconds

# Or use streamlit-autorefresh component
# pip install streamlit-autorefresh
from streamlit_autorefresh import st_autorefresh

count = st_autorefresh(interval=5000, key="live_refresh")
"""
# R: Shiny with Real-time Updates
library(shiny)
library(shinydashboard)  # valueBoxOutput()/renderValueBox() used below
library(plotly)
library(jsonlite)

# Reactive polling for live data

# Sidebar: which match to follow and how often to poll it
live_sidebar <- sidebarPanel(
    selectInput(
        "match_id", "Select Match:",
        choices = c("Match 1" = 1, "Match 2" = 2)
    ),
    checkboxInput("auto_refresh", "Auto Refresh", TRUE),
    numericInput("refresh_rate", "Refresh (seconds):", 5, min = 1, max = 60)
)

# Main area: xG value boxes, cumulative timeline, shot map, latest event
live_main <- mainPanel(
    fluidRow(
        valueBoxOutput("home_xg", width = 6),
        valueBoxOutput("away_xg", width = 6)
    ),
    plotlyOutput("xg_timeline"),
    plotlyOutput("shot_map"),
    verbatimTextOutput("last_event")
)

ui <- fluidPage(
    titlePanel("Live Match Tracker"),
    sidebarLayout(live_sidebar, live_main)
)

server <- function(input, output, session) {

    # Simulated live match data. Re-executes on a timer while auto-refresh
    # is on. invalidateLater() honours the user-chosen refresh rate; the
    # original used a reactiveTimer hard-coded to 5000 ms, silently
    # ignoring input$refresh_rate exposed in the sidebar.
    match_data <- reactive({
        if (isTRUE(input$auto_refresh)) {
            invalidateLater(input$refresh_rate * 1000, session)
        }

        # In production: API call to live data source
        list(
            match_id = input$match_id,
            minute = sample(1:90, 1),
            home_xG = round(runif(1, 0.5, 2.5), 2),
            away_xG = round(runif(1, 0.3, 2.0), 2),
            home_shots = sample(5:15, 1),
            away_shots = sample(3:12, 1),
            events = data.frame(
                minute = sample(1:90, 10),
                type = sample(c("Shot", "Goal", "Foul", "Corner"), 10, replace = TRUE),
                team = sample(c("Home", "Away"), 10, replace = TRUE)
            )
        )
    })

    # xG boxes (valueBox/renderValueBox come from shinydashboard)
    output$home_xg <- renderValueBox({
        data <- match_data()
        valueBox(
            value = data$home_xG,
            subtitle = "Home xG",
            icon = icon("futbol"),
            color = if (data$home_xG > data$away_xG) "green" else "red"
        )
    })

    output$away_xg <- renderValueBox({
        data <- match_data()
        valueBox(
            value = data$away_xG,
            subtitle = "Away xG",
            icon = icon("futbol"),
            color = if (data$away_xG > data$home_xG) "green" else "red"
        )
    })

    # xG timeline
    output$xg_timeline <- renderPlotly({
        data <- match_data()

        # Simulated cumulative xG
        minutes <- 1:data$minute
        home_cumxG <- cumsum(runif(length(minutes), 0, 0.05))
        away_cumxG <- cumsum(runif(length(minutes), 0, 0.04))

        plot_ly() %>%
            add_lines(x = minutes, y = home_cumxG, name = "Home",
                     line = list(color = "#1B5E20")) %>%
            add_lines(x = minutes, y = away_cumxG, name = "Away",
                     line = list(color = "#C62828")) %>%
            layout(title = "Cumulative xG",
                   xaxis = list(title = "Minute"),
                   yaxis = list(title = "xG"))
    })

    # Shot map — the UI declares plotlyOutput("shot_map") but the original
    # server never rendered it, leaving a permanently blank panel.
    output$shot_map <- renderPlotly({
        data <- match_data()
        shots <- data$events[data$events$type %in% c("Shot", "Goal"), ]

        plot_ly(shots, x = ~minute, y = ~team, color = ~type,
                type = "scatter", mode = "markers") %>%
            layout(title = "Shot Events by Minute",
                   xaxis = list(title = "Minute", range = c(0, 95)),
                   yaxis = list(title = ""))
    })

    # Last event — pick the event with the highest minute; the original
    # showed row 1, which is an arbitrary event since minutes are unsorted.
    output$last_event <- renderText({
        data <- match_data()
        last <- data$events[which.max(data$events$minute), ]
        sprintf("Last Event: %s' - %s by %s team",
                last$minute, last$type, last$team)
    })
}

# With WebSocket support (using shinyjs)
# library(shinyjs)
# observeEvent(websocket$message, {
#     data <- fromJSON(websocket$message)
#     match_reactive(data)
# })
Real-Time Data Strategies
Polling
  • Simple to implement
  • Client-initiated requests
  • Higher latency (5-30s)
  • More server load
Server-Sent Events (SSE)
  • One-way (server to client)
  • Built-in reconnection
  • Good browser support
  • HTTP-based
WebSockets
  • Two-way communication
  • Lowest latency
  • Requires connection management
  • Best for interactive apps

Authentication and Authorization

Production analytics applications need secure access control. Here's how to implement authentication for different frameworks.

authentication_authorization
# Python: FastAPI with JWT Authentication
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
import os

# Configuration
# NOTE: the fallback SECRET_KEY is for local development only — in
# production SECRET_KEY must come from the environment, otherwise anyone
# who reads this source can forge tokens.
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"  # HMAC-SHA256 JWT signing
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # lifetime of tokens issued at /token

app = FastAPI()

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")  # clients POST credentials to /token

# Models
class Token(BaseModel):
    # Response body of POST /token
    access_token: str
    token_type: str

class TokenData(BaseModel):
    # Claims decoded from a JWT ("sub" and "role")
    username: Optional[str] = None
    role: Optional[str] = None

class User(BaseModel):
    # Public user record returned by the API
    username: str
    email: str
    role: str
    disabled: bool = False

class UserInDB(User):
    # Internal record — never returned to clients (carries the bcrypt hash)
    hashed_password: str

# Simulated user database (use real DB in production)
# NOTE: pwd_context.hash() runs a full bcrypt round per user at import
# time, which adds noticeable startup delay — acceptable for a demo only.
fake_users_db = {
    "analyst": {
        "username": "analyst",
        "email": "analyst@club.com",
        "role": "analyst",
        "hashed_password": pwd_context.hash("analyst123"),
        "disabled": False,
    },
    "manager": {
        "username": "manager",
        "email": "manager@club.com",
        "role": "manager",
        "hashed_password": pwd_context.hash("manager123"),
        "disabled": False,
    },
    "admin": {
        "username": "admin",
        "email": "admin@club.com",
        "role": "admin",
        "hashed_password": pwd_context.hash("admin123"),
        "disabled": False,
    }
}

# Helper functions
# Helper functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
    # bcrypt comparison delegated to passlib
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db: dict, username: str) -> Optional[UserInDB]:
    """Look up `username` in the user store; None when absent."""
    record = db.get(username)
    return UserInDB(**record) if record is not None else None

def authenticate_user(db: dict, username: str, password: str) -> Optional[UserInDB]:
    """Return the user when the username exists and the password matches."""
    user = get_user(db, username)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed JWT carrying `data` plus an "exp" claim.

    Args:
        data: Claims to embed, e.g. {"sub": username, "role": role}.
        expires_delta: Token lifetime; defaults to 15 minutes when omitted.

    Returns:
        The encoded JWT string.
    """
    from datetime import timezone  # local import: the file imports only datetime/timedelta

    to_encode = data.copy()
    # datetime.utcnow() is naive and deprecated since Python 3.12; use an
    # explicit timezone-aware UTC timestamp for the expiry claim.
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """FastAPI dependency: decode the bearer JWT and load its user.

    Raises HTTP 401 when the token is missing, invalid, or expired, or
    when the user named in it no longer exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # jose verifies the signature and the "exp" claim during decode
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        role: str = payload.get("role")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username, role=role)
    except JWTError:
        raise credentials_exception

    # A token may outlive its account — re-check the user store every call.
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# Role-based access decorator
def require_role(allowed_roles: list):
    async def role_checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user
    return role_checker

# Endpoints
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange username/password form fields for a bearer JWT."""
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        # Same message for unknown user and wrong password — avoids
        # leaking which usernames exist.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(
        data={"sub": user.username, "role": user.role},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    # Whoami endpoint; response_model=User strips hashed_password
    return current_user

# Protected endpoints with role requirements
@app.get("/admin/users")
async def admin_get_users(user: User = Depends(require_role(["admin"]))):
    """Admin only - get all users"""
    return {"users": list(fake_users_db.keys())}

@app.get("/manager/team/{team_id}")
async def manager_get_team(
    team_id: int,
    user: User = Depends(require_role(["admin", "manager"]))
):
    """Managers and admins - get team details"""
    return {"team_id": team_id, "accessed_by": user.username}

@app.get("/analytics/players")
async def get_players(user: User = Depends(get_current_user)):
    """All authenticated users - get player analytics"""
    return {"players": ["Player 1", "Player 2"], "user_role": user.role}
# R: Shiny Authentication with shinymanager
library(shiny)
library(shinymanager)

# Define credentials (in production, use database)
# Define credentials (in production, use database)
# NOTE(review): passwords are stored in plain text here for the demo —
# shinymanager supports hashed credentials via create_db()/SQLite.
credentials <- data.frame(
    user = c("analyst", "manager", "admin"),
    password = c("analyst123", "manager123", "admin123"),
    role = c("analyst", "manager", "admin"),
    stringsAsFactors = FALSE
)

# Main app UI
ui <- fluidPage(
    titlePanel("Secure Analytics Dashboard"),

    # Content varies by user role
    uiOutput("role_based_content")
)

# Wrap UI with authentication
# secure_app() puts the shinymanager login page in front of `ui`;
# enable_admin adds the built-in credential-management console.
ui <- secure_app(ui,
    enable_admin = TRUE,
    theme = shinythemes::shinytheme("flatly")  # requires shinythemes installed
)

server <- function(input, output, session) {

    # shinymanager: validate logins against `credentials` and expose the
    # authenticated user's info as reactive values. The original read
    # session$userData, which shinymanager does not populate —
    # secure_server() is the documented way to obtain the logged-in user.
    res_auth <- secure_server(
        check_credentials = check_credentials(credentials)
    )

    # Get user info after authentication (list with $user, $role, ...)
    user_info <- reactive({
        reactiveValuesToList(res_auth)
    })

    # Role-based content
    output$role_based_content <- renderUI({
        user <- user_info()

        if (is.null(user$user)) {
            return(NULL)
        }

        # Different content based on role
        role <- credentials$role[credentials$user == user$user]

        if (role == "admin") {
            tagList(
                h3("Admin Dashboard"),
                actionButton("manage_users", "Manage Users"),
                actionButton("view_logs", "View Logs"),
                hr(),
                # Full analytics content
                plotOutput("full_analysis")
            )
        } else if (role == "manager") {
            tagList(
                h3("Manager Dashboard"),
                # Team-specific analytics
                plotOutput("team_analysis"),
                tableOutput("player_table")
            )
        } else {
            tagList(
                h3("Analyst Dashboard"),
                # Read-only analytics
                plotOutput("basic_charts")
            )
        }
    })

    # Log user activity once the login succeeds
    observe({
        user <- user_info()
        if (!is.null(user$user)) {
            log_activity(user$user, "login", Sys.time())
        }
    })
}

# Run with authentication
# (ui is already wrapped by secure_app above, so the login page shows first)
shinyApp(ui, server)

# Alternative: Google OAuth with googleAuthR
# library(googleAuthR)
# options(googleAuthR.scopes.selected = "email")
# gar_auth_configure(key = "YOUR_CLIENT_ID")
Security Best Practices
  • Never store passwords in plain text - always use bcrypt or similar hashing
  • Use environment variables for secrets (SECRET_KEY, API keys)
  • Implement rate limiting to prevent brute force attacks
  • Use HTTPS in production for all authentication endpoints
  • Token expiration - use short-lived access tokens with refresh tokens
  • Audit logging - log all authentication events and sensitive actions

Performance Optimization

Analytics applications often deal with large datasets. Proper optimization ensures responsive user experiences even with complex calculations.

performance_optimization
# Python: Performance Optimization Techniques
import streamlit as st
import pandas as pd
import numpy as np
from functools import lru_cache
import asyncio
from concurrent.futures import ThreadPoolExecutor
import dask.dataframe as dd

# 1. Streamlit caching
@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_large_dataset(filepath: str) -> pd.DataFrame:
    """Load and cache large dataset"""
    # cache_data stores a copy keyed on filepath; re-runs skip the disk read
    return pd.read_parquet(filepath)

@st.cache_resource  # Cache resource objects (DB connections, models)
def get_database_connection():
    """Cache database connection"""
    # NOTE(review): create_engine is not imported anywhere in this file —
    # this needs `from sqlalchemy import create_engine` to run.
    return create_engine("postgresql://...")

# 2. Efficient data loading with chunking
def process_large_csv(filepath: str, chunk_size: int = 100000):
    """Aggregate a large CSV in chunks without loading it all at once.

    Computes, per team, the total of "goals" and the mean of "xG".

    The original summed per-chunk means of xG, which is NOT the global
    mean when chunk sizes differ per team; this version accumulates
    per-chunk sums and counts and divides at the end.

    Args:
        filepath: Path to a CSV with at least "team", "goals", "xG" columns.
        chunk_size: Rows per chunk read into memory.

    Returns:
        DataFrame indexed by team with columns "goals" (sum) and "xG" (mean).
    """
    partials = []

    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        partials.append(chunk.groupby("team").agg(
            goals=("goals", "sum"),
            xG_sum=("xG", "sum"),
            n=("xG", "count"),
        ))

    # Sum the partial aggregates across chunks, then finish the mean.
    combined = pd.concat(partials).groupby(level=0).sum()
    combined["xG"] = combined["xG_sum"] / combined["n"]
    return combined[["goals", "xG"]]

# 3. Dask for out-of-core computation
def analyze_with_dask(filepath: str):
    """Use Dask for larger-than-memory datasets"""
    # Dask reads data lazily
    ddf = dd.read_parquet(filepath)

    # Operations are lazy until .compute(), which materializes a pandas frame
    result = ddf.groupby("team").agg({
        "goals": "sum",
        "xG": "mean"
    }).compute()

    return result

# 4. Async operations for I/O bound tasks
# NOTE(review): aiohttp is used below but never imported in this file —
# these two helpers need `import aiohttp` (third-party) to run.
async def fetch_multiple_apis(endpoints: list) -> list:
    """Fetch from multiple APIs concurrently"""
    async with aiohttp.ClientSession() as session:
        # gather() issues all requests on the event loop at once
        tasks = [fetch_data(session, url) for url in endpoints]
        return await asyncio.gather(*tasks)

async def fetch_data(session, url):
    # Single GET; returns the parsed JSON body
    async with session.get(url) as response:
        return await response.json()

# 5. ThreadPoolExecutor for CPU-bound tasks
# NOTE(review): for pure-Python CPU-bound work the GIL prevents threads
# from running in parallel — ProcessPoolExecutor is usually the right pool.
# Threads do help when process_chunk spends its time in GIL-releasing C
# code (NumPy/pandas internals).
def parallel_processing(data_chunks: list):
    """Process data chunks in parallel"""
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_chunk, data_chunks))
    return pd.concat(results)

def process_chunk(chunk):
    # CPU-intensive processing
    # NOTE(review): complex_calculation is not defined in this snippet.
    return chunk.apply(complex_calculation, axis=1)

# 6. Query optimization with SQL pushdown
def optimized_query(engine, team_id: int, season: str):
    """Fetch one team's matches for a season, filtering in the database.

    Args:
        engine: Connection/engine object accepted by pd.read_sql.
        team_id: Team primary key to filter on.
        season: Season label to filter on.

    Returns:
        DataFrame of matching rows joined with player names.
    """
    # Bad: Load all data then filter
    # df = pd.read_sql("SELECT * FROM matches", engine)
    # df = df[(df.team_id == team_id) & (df.season == season)]

    # Good: Filter in SQL — parameterized (%(name)s), never string-formatted
    query = """
        SELECT m.*, p.name as player_name
        FROM matches m
        JOIN players p ON m.player_id = p.id
        WHERE m.team_id = %(team_id)s
          AND m.season = %(season)s
    """
    return pd.read_sql(query, engine, params={"team_id": team_id, "season": season})

# 7. Memory optimization
def optimize_memory(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce DataFrame memory usage by downcasting columns.

    - object columns with < 50% unique values become category
    - int64 columns are downcast to the smallest integer dtype
    - float64 columns are downcast to float32 where representable

    Note: columns are replaced on the SAME DataFrame object (in-place
    mutation) and the object is also returned for call-chaining.

    The original raised ZeroDivisionError on an empty DataFrame; the
    cardinality check is now guarded.
    """
    n_rows = len(df)

    for col in df.columns:
        col_type = df[col].dtype

        if col_type == "object":
            # Convert to category if low cardinality; skip on empty frames
            if n_rows > 0 and df[col].nunique() / n_rows < 0.5:
                df[col] = df[col].astype("category")

        elif col_type == "int64":
            # Downcast integers
            df[col] = pd.to_numeric(df[col], downcast="integer")

        elif col_type == "float64":
            # Downcast floats
            df[col] = pd.to_numeric(df[col], downcast="float")

    return df

# Usage in Streamlit
# Before: df = pd.read_csv("large_file.csv")
# After:
# df = load_large_dataset("data.parquet")
# df = optimize_memory(df)
# R: Performance Optimization Techniques
library(shiny)
library(memoise)
library(data.table)
library(profvis)

# 1. Memoization - Cache expensive calculations
# memoise() caches results keyed on the argument values, so repeated calls
# with the same (data, params) return instantly.
# NOTE(review): this snippet uses %>%/group_by/summarise but only shiny,
# memoise, data.table and profvis are attached above — dplyr must be loaded.
# NOTE(review): `params` is accepted but never used in the body.
expensive_calculation <- memoise(function(data, params) {
    # Simulate heavy computation
    Sys.sleep(2)
    data %>%
        group_by(team) %>%
        summarise(
            total_xG = sum(xG),
            avg_goals = mean(goals),
            .groups = "drop"
        )
})

# 2. Use data.table for large datasets
process_large_dataset <- function(file_path) {
    # fread() is much faster than read.csv()/tibble readers for big files
    matches <- fread(file_path)

    # Fast grouped aggregation using data.table's `[i, j, by]` form;
    # the result of the last expression is returned implicitly.
    matches[
        ,
        .(
            total_goals = sum(goals),
            avg_xG = mean(xG),
            matches = .N
        ),
        by = .(team, season)
    ]
}

# 3. Lazy loading with bindCache (Shiny 1.6+)
server <- function(input, output, session) {

    # Cache reactive results keyed on the filter inputs — revisiting the
    # same season/position combination skips the filtering work entirely.
    filtered_data <- reactive({
        player_data %>%
            filter(season == input$season) %>%
            filter(position %in% input$positions)
    }) %>%
        bindCache(input$season, input$positions)

    # Cache render outputs; the cache key must cover every input the plot uses
    output$main_plot <- renderPlot({
        create_complex_visualization(filtered_data())
    }) %>%
        bindCache(input$season, input$positions, input$chart_type)
}

# 4. Async operations with promises
library(promises)
library(future)
plan(multisession)  # run futures in separate background R sessions

server <- function(input, output, session) {

    # Non-blocking heavy computation: the future starts on button click and
    # the main R process stays responsive to other sessions while it runs.
    heavy_result <- eventReactive(input$run_analysis, {
        future({
            # This runs in background
            perform_heavy_analysis(input$data)
        })
    })

    output$result <- renderTable({
        # %...>% chains on the promise once the future resolves
        heavy_result() %...>%
            format_results()
    })
}

# 5. Pagination for large tables
library(DT)

# `server = TRUE` is an argument of renderDT(), not a DataTables option —
# inside `options` it was silently ignored, so the whole dataset was
# shipped to the browser. With server-side processing enabled here, only
# the rows for the visible page are sent per request.
output$large_table <- renderDT(
    {
        datatable(
            large_dataset,
            options = list(
                pageLength = 25,
                scrollX = TRUE,
                processing = TRUE   # show "processing" indicator during requests
            )
        )
    },
    server = TRUE
)

# 6. Profile your code
# profvis({
#     shinyApp(ui, server)
# })
Performance Checklist
Data Layer
  • Use Parquet instead of CSV
  • Index database tables properly
  • Implement query caching (Redis)
  • Paginate large result sets
  • Use connection pooling
Application Layer
  • Cache expensive computations
  • Use async for I/O operations
  • Lazy load components
  • Profile before optimizing
  • Set appropriate cache TTLs

Testing Analytics Applications

Testing analytics applications ensures reliability and catches regressions. Here's how to test different components of your analytics stack.

testing_analytics_applications
# Python: Testing Analytics Applications
import pytest
from fastapi.testclient import TestClient
from unittest.mock import Mock, patch
import pandas as pd
import numpy as np

# Import your app
from main import app
from data_layer import DataManager
from analytics import calculate_xG, PlayerAnalyzer

client = TestClient(app)

# 1. Unit tests for analytics functions
class TestXGCalculation:
    # Contract tests for calculate_xG: probability range, distance ordering,
    # and degenerate inputs.
    def test_basic_xg_calculation(self):
        # presumably normalized pitch coordinates (x toward goal) —
        # confirm against calculate_xG's documentation
        shots = pd.DataFrame({
            "x": [0.9, 0.85, 0.7],
            "y": [0.5, 0.3, 0.6],
            "shot_type": ["foot", "head", "foot"]
        })

        result = calculate_xG(shots)

        assert len(result) == 3
        assert all(0 <= xg <= 1 for xg in result)  # xG values are probabilities
        assert result[0] > result[2]  # Closer shot higher xG

    def test_edge_cases(self):
        # Empty dataframe must yield an empty result, not an error
        empty_shots = pd.DataFrame(columns=["x", "y", "shot_type"])
        result = calculate_xG(empty_shots)
        assert len(result) == 0

        # Single shot
        single = pd.DataFrame({"x": [0.88], "y": [0.5], "shot_type": ["foot"]})
        result = calculate_xG(single)
        assert len(result) == 1

class TestPlayerAnalyzer:
    # Behavior tests for the PlayerAnalyzer wrapper, using a shared fixture
    @pytest.fixture
    def sample_data(self):
        # Three players, one per broad position group
        return pd.DataFrame({
            "player": ["A", "B", "C"],
            "position": ["FW", "MF", "DF"],
            "goals": [10, 5, 2],
            "xG": [8.5, 6.0, 1.5]
        })

    def test_filter_by_position(self, sample_data):
        analyzer = PlayerAnalyzer(sample_data)
        forwards = analyzer.filter_by_position("FW")

        assert len(forwards) == 1
        assert forwards.iloc[0]["player"] == "A"

    def test_goal_overperformance(self, sample_data):
        # Player A: 10 goals vs 8.5 xG -> overperformance of +1.5
        analyzer = PlayerAnalyzer(sample_data)
        result = analyzer.calculate_overperformance()

        assert result.loc[result["player"] == "A", "overperformance"].values[0] == 1.5

# 2. API testing with FastAPI TestClient
class TestPlayersAPI:
    def test_get_all_players(self):
        response = client.get("/players")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    def test_get_player_by_id(self):
        response = client.get("/players/1")
        assert response.status_code == 200
        assert "name" in response.json()

    def test_player_not_found(self):
        response = client.get("/players/9999")
        assert response.status_code == 404

    def test_filter_players(self):
        response = client.get("/players?position=FW&min_goals=5")
        assert response.status_code == 200
        players = response.json()
        assert all(p["position"] == "FW" for p in players)
        assert all(p["goals"] >= 5 for p in players)

class TestAuthentication:
    def test_login_success(self):
        response = client.post("/token",
            data={"username": "analyst", "password": "analyst123"})
        assert response.status_code == 200
        assert "access_token" in response.json()

    def test_login_failure(self):
        response = client.post("/token",
            data={"username": "analyst", "password": "wrong"})
        assert response.status_code == 401

    def test_protected_endpoint_without_token(self):
        response = client.get("/admin/users")
        assert response.status_code == 401

    def test_protected_endpoint_with_token(self):
        # First login
        login = client.post("/token",
            data={"username": "admin", "password": "admin123"})
        token = login.json()["access_token"]

        # Then access protected endpoint
        response = client.get("/admin/users",
            headers={"Authorization": f"Bearer {token}"})
        assert response.status_code == 200

# 3. Integration tests with mocking
class TestDataIntegration:
    # Patch the data layer so the endpoint test needs no real database
    @patch("data_layer.DataManager.get_players")
    def test_player_endpoint_with_mock(self, mock_get_players):
        mock_get_players.return_value = pd.DataFrame({
            "id": [1, 2],
            "name": ["Player 1", "Player 2"],
            "goals": [10, 5]
        })

        response = client.get("/players")

        assert response.status_code == 200
        assert len(response.json()) == 2
        mock_get_players.assert_called_once()  # endpoint hit the data layer exactly once

# 4. Streamlit testing with pytest
def test_streamlit_components():
    """Test Streamlit helper functions"""
    # Import inside the test so collection succeeds even if app.py is absent
    from app import create_radar_chart, format_player_stats

    # Test chart creation
    player_data = {"shooting": 80, "passing": 75, "defending": 60}
    chart = create_radar_chart(player_data)
    assert chart is not None

    # Test formatting
    stats = format_player_stats({"goals": 10, "assists": 5})
    assert "Goals: 10" in stats

# Run with: pytest tests/ -v --cov=app
# R: Testing Shiny Applications
library(testthat)
library(shinytest2)

# 1. Unit tests for data functions
# (calculate_xG and filter_by_position are defined in the app under test)
test_that("xG calculation is correct", {
    shots <- data.frame(
        x = c(0.9, 0.85, 0.7),
        y = c(0.5, 0.3, 0.6),
        shot_type = c("foot", "head", "foot")
    )

    result <- calculate_xG(shots)

    expect_equal(length(result), 3)
    expect_true(all(result >= 0 & result <= 1))  # xG values are probabilities
    expect_true(result[1] > result[3])  # Closer shot higher xG
})

test_that("player filtering works correctly", {
    players <- data.frame(
        name = c("A", "B", "C"),
        position = c("FW", "MF", "DF"),
        goals = c(10, 5, 2)
    )

    forwards <- filter_by_position(players, "FW")

    expect_equal(nrow(forwards), 1)
    expect_equal(forwards$name, "A")
})

# 2. Shiny app testing with shinytest2
# AppDriver launches the app from app_dir in a headless browser; these
# tests therefore need the app directory and a chromium install available.
test_that("dashboard loads correctly", {
    app <- AppDriver$new(app_dir = "app/")

    # Wait for app to load
    app$wait_for_idle()

    # Check initial state
    expect_true(app$get_value(input = "season") == "2023-24")

    # Interact with app
    app$set_inputs(season = "2022-23")
    app$click("update_btn")

    # Check output updated
    output <- app$get_value(output = "summary_table")
    expect_true(nrow(output) > 0)

    # Always stop the driver to free the headless browser process
    app$stop()
})

test_that("filters update visualization", {
    app <- AppDriver$new(app_dir = "app/")

    # Change filter
    app$set_inputs(min_goals = 10)
    app$wait_for_idle()

    # Capture screenshot for visual regression testing
    app$expect_screenshot()

    app$stop()
})

# 3. Snapshot testing for outputs
# First run records the snapshot; later runs fail if the output drifts.
test_that("summary statistics are consistent", {
    data <- load_test_data()
    summary <- calculate_summary_stats(data)

    expect_snapshot(summary)
})

# 4. API testing (Plumber)
library(httr)

# NOTE(review): this test assumes a Plumber server is already running on
# localhost:8000 — it will fail if run without starting the API first.
test_that("player endpoint returns valid data", {
    response <- GET("http://localhost:8000/players")

    expect_equal(status_code(response), 200)

    data <- content(response, "parsed")
    expect_true(length(data) > 0)
    expect_true("name" %in% names(data[[1]]))
})

Practice Exercises

Exercise 48.1: Player Comparison App

Build a Streamlit or Shiny app that allows users to select two players and see a side-by-side comparison including radar charts, statistics tables, and performance trends.

Hints:
  • Use session state to maintain player selections
  • Normalize metrics for radar chart comparison
  • Add export functionality for reports
Exercise 48.2: REST API with Documentation

Create a FastAPI-based analytics API with endpoints for players, teams, and matches. Include automatic documentation, authentication, and rate limiting.

Hints:
  • Use Pydantic models for request/response validation
  • Implement API key authentication
  • Add caching with Redis
Exercise 48.3: Deployed Dashboard

Deploy your analytics dashboard to a cloud platform. Include CI/CD pipeline for automatic deployments when code changes.

Hints:
  • Use GitHub Actions for CI/CD
  • Set up environment variables for secrets
  • Configure health checks and monitoring
Exercise 48.4: Live Match Tracker

Build a real-time match tracking application using WebSockets. Display live xG accumulation, shot maps, and key event feeds that update automatically during a match.

Hints:
  • Use FastAPI WebSockets or Shiny reactive timers
  • Implement connection heartbeats for reliability
  • Add visual indicators when data updates
  • Handle disconnection and reconnection gracefully
Exercise 48.5: Multi-Tenant Analytics Platform

Create an analytics platform that supports multiple clubs/users with role-based access control. Each club should only see their own data, with admin users able to see aggregated cross-club analytics.

Hints:
  • Implement JWT tokens with club_id claims
  • Use row-level security in database queries
  • Create admin, manager, and analyst role levels
  • Log all data access for audit trails
Exercise 48.6: Mobile-Responsive Dashboard

Adapt an existing analytics dashboard to work well on mobile devices. Implement touch-friendly controls, responsive charts, and progressive loading for slower connections.

Hints:
  • Use CSS media queries and Bootstrap breakpoints
  • Replace hover interactions with tap/click
  • Implement lazy loading for heavy chart sections
  • Test on actual mobile devices, not just browser emulation
Exercise 48.7: Analytics API with GraphQL

Build a GraphQL API for football analytics that allows flexible querying of player, team, and match data. Implement proper pagination, filtering, and nested queries.

Hints:
  • Use Strawberry (Python) or gqlgen (Go) for GraphQL
  • Implement DataLoader to avoid N+1 queries
  • Add query complexity limits to prevent abuse
  • Generate TypeScript types from schema for frontend
Exercise 48.8: Automated Report Generator

Create a system that automatically generates PDF/HTML match reports after games. Include configurable templates, scheduled generation, and email delivery.

Hints:
  • Use Quarto or R Markdown for report templates
  • Implement a job queue (Celery, RQ) for background generation
  • Store generated reports in cloud storage (S3)
  • Add webhook notifications for report completion

Summary

Key Takeaways
  • Choose the Right Tool: Shiny for R users, Streamlit for rapid Python development, FastAPI for production APIs
  • Design for Users: Follow dashboard design principles to maximize insight delivery and minimize complexity
  • Build Robust Data Layers: Implement caching, connection pooling, and proper error handling
  • Plan for Deployment: Consider your audience, scale, and infrastructure needs when choosing deployment options
  • Iterate Based on Feedback: Production applications should evolve based on user needs and usage patterns
  • Security First: Authentication, authorization, and input validation are essential for any production application
  • Performance Matters: Cache aggressively, optimize queries, and use async operations for responsive user experiences
  • Test Everything: Unit tests, integration tests, and end-to-end tests prevent regressions and ensure reliability
Common Pitfalls
  • Premature Optimization: Building complex architectures before validating the product - start simple and scale as needed
  • No Caching: Recalculating expensive operations on every request tanks performance - implement caching from the start
  • Ignoring Mobile: Many stakeholders access dashboards on phones/tablets; design responsively from the beginning
  • Hardcoded Secrets: Database passwords, API keys in code lead to security breaches - always use environment variables
  • No Error Handling: Unhandled exceptions crash apps and confuse users; implement graceful error messages
  • Over-Engineering: Adding unnecessary features and abstractions before users need them wastes development time
  • Ignoring Browser Differences: Test on multiple browsers, not just Chrome; Safari and Firefox have different behaviors
  • No Monitoring: Without logging and metrics, debugging production issues becomes guesswork - implement observability early
Essential Tools for Building Analytics Apps
Category R Python
Dashboard Framework shiny, shinydashboard streamlit, dash
REST API plumber fastapi, flask
Database Access DBI, RPostgres sqlalchemy, asyncpg
Caching memoise, shiny::bindCache redis, st.cache_data
Authentication shinymanager, googleAuthR python-jose, passlib
Testing testthat, shinytest2 pytest, httpx
Deployment shinyapps.io, Shiny Server Streamlit Cloud, Docker, Heroku
Application Architecture by Scale
Small (1-10 users)
  • Streamlit/Shiny single file
  • SQLite or CSV files
  • Local or free-tier hosting
  • Basic password protection
Medium (10-100 users)
  • Modular app structure
  • PostgreSQL/MySQL database
  • Redis caching layer
  • JWT authentication
  • Docker deployment
Large (100+ users)
  • Microservices architecture
  • Load balancer + multiple instances
  • CDN for static assets
  • OAuth2/SSO integration
  • Kubernetes orchestration
Performance Benchmarks
Metric Target Acceptable Poor
Initial page load < 2s 2-5s > 5s
Chart render < 500ms 500ms-2s > 2s
Filter update < 300ms 300ms-1s > 1s
API response < 100ms 100-500ms > 500ms
Real-time update < 1s 1-5s > 5s
Pre-Deployment Checklist
Security
  • All secrets in environment variables
  • HTTPS configured
  • Authentication working
  • Rate limiting implemented
  • Input validation complete
Reliability
  • Error handling implemented
  • Health check endpoint added
  • Logging configured
  • Database backups scheduled
  • Monitoring/alerting set up
Complete Application Template
# Python: Complete FastAPI + Streamlit Template
"""
Project structure:
/app
  /api
    main.py       # FastAPI app
    auth.py       # Authentication
    models.py     # Pydantic models
  /dashboard
    app.py        # Streamlit app
  /data
    manager.py    # Data layer
  /tests
    test_api.py
  docker-compose.yml
  requirements.txt
"""

# api/main.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from .auth import get_current_user
from .models import Player, Team
from ..data.manager import DataManager

# FastAPI application object; interactive OpenAPI docs served at /docs.
app = FastAPI(
    title="Football Analytics API",
    version="1.0.0",
    docs_url="/docs"
)

# CORS is wide open for development convenience; restrict allow_origins
# to known frontends before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single shared data-access layer instance used by every route below.
data_manager = DataManager()


@app.get("/health")
async def health_check():
    """Unauthenticated liveness probe: reports status and API version."""
    return {"status": "healthy", "version": "1.0.0"}


@app.get("/players", response_model=list[Player])
async def get_players(
    season: str = "2023-24",
    user = Depends(get_current_user)
):
    """Return all player records for the given season (requires auth)."""
    return data_manager.get_players(season)


@app.get("/teams/{team_id}")
async def get_team(team_id: int, user = Depends(get_current_user)):
    """Look up one team by id; respond 404 when it does not exist."""
    team = data_manager.get_team(team_id)
    if team:
        return team
    raise HTTPException(404, "Team not found")

# dashboard/app.py
import streamlit as st
import requests
import plotly.express as px

st.set_page_config(page_title="Football Analytics", layout="wide")

# --- Authentication gate -----------------------------------------------
# Until a token is stored in session state, show a login form and halt
# the rest of the script (st.stop) so nothing is fetched unauthenticated.
if "token" not in st.session_state:
    with st.form("login"):
        username = st.text_input("Username")
        password = st.text_input("Password", type="password")
        if st.form_submit_button("Login"):
            response = requests.post(
                "http://api:8000/token",
                data={"username": username, "password": password}
            )
            if response.ok:
                st.session_state.token = response.json()["access_token"]
                st.rerun()
            else:
                st.error("Invalid credentials")
    st.stop()

# Bearer token header attached to every API request below.
headers = {"Authorization": f"Bearer {st.session_state.token}"}

# Sidebar controls
season = st.sidebar.selectbox("Season", ["2023-24", "2022-23"])

# Main content
st.title("Football Analytics Dashboard")

# Fetch player data for the chosen season from the API service.
players = requests.get(
    f"http://api:8000/players?season={season}",
    headers=headers
).json()

# KPIs — guard the average so an empty season doesn't divide by zero.
col1, col2, col3 = st.columns(3)
col1.metric("Players", len(players))
col2.metric("Total Goals", sum(p["goals"] for p in players))
avg_xg = sum(p["xG"] for p in players) / len(players) if players else 0.0
col3.metric("Avg xG", f"{avg_xg:.2f}")

# Expected vs actual goals scatter; hover reveals the player name.
# NOTE(review): assumes each player dict has "xG", "goals", "name" keys —
# confirm against the API's Player model.
fig = px.scatter(players, x="xG", y="goals", hover_name="name")
st.plotly_chart(fig, use_container_width=True)

# data/manager.py
from functools import lru_cache
import pandas as pd
from sqlalchemy import create_engine
import redis
import json
import os

class DataManager:
    """Data-access layer: PostgreSQL via SQLAlchemy with a Redis read-through cache.

    Connection targets come from the DATABASE_URL and REDIS_URL environment
    variables, so no credentials live in source code.
    """

    # Cached query results expire after one hour.
    CACHE_TTL_SECONDS = 3600

    def __init__(self):
        self.engine = create_engine(os.getenv("DATABASE_URL"))
        self.redis = redis.from_url(os.getenv("REDIS_URL"))

    def get_players(self, season: str) -> list:
        """Return all player rows for `season` as a list of dicts.

        Results are cached in Redis for CACHE_TTL_SECONDS; a cache hit
        skips the database entirely.
        """
        cache_key = f"players:{season}"

        # Check cache first (json.loads accepts the bytes Redis returns).
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: query with a bound parameter — never string-format
        # SQL, since `season` is external input.
        df = pd.read_sql(
            "SELECT * FROM players WHERE season = %s",
            self.engine,
            params=[season]
        )
        result = df.to_dict("records")

        # Serialize once with default=str so non-JSON-native column values
        # (dates, Decimals) don't raise TypeError, then return the decoded
        # payload so cache hits and misses yield identical structures.
        payload = json.dumps(result, default=str)
        self.redis.setex(cache_key, self.CACHE_TTL_SECONDS, payload)

        return json.loads(payload)
# R: Complete Shiny Application Template
library(shiny)
library(shinydashboard)
library(shinymanager)
library(plotly)      # plotlyOutput / renderPlotly / ggplotly
library(tidyverse)
library(DBI)
library(pool)

# Database connection pool ----
# One shared pool of PostgreSQL connections for the whole app; queries
# check a connection out and return it automatically. Credentials come
# from environment variables, never from source code.
pool <- dbPool(
    drv = RPostgres::Postgres(),
    dbname = Sys.getenv("DB_NAME"),
    host = Sys.getenv("DB_HOST"),
    user = Sys.getenv("DB_USER"),
    password = Sys.getenv("DB_PASSWORD")
)

# Close every pooled connection when the app stops so database sessions
# are not leaked across restarts.
onStop(function() {
    poolClose(pool)
})

# Data layer with in-memory caching: identical season lookups within one
# hour are served from the memoise cache instead of hitting the database.
get_players <- memoise::memoise(
    function(season) {
        # `params` must be passed by name: in dbGetQuery() it sits after
        # `...`, so a bare positional list(season) is silently swallowed
        # by `...` and the $1 placeholder is never bound.
        dbGetQuery(pool, "SELECT * FROM players WHERE season = $1",
                   params = list(season))
    },
    cache = cachem::cache_mem(max_age = 3600)
)

# UI ----
# shinydashboard layout: sidebar navigation plus a season selector that
# drives every output on the dashboard tab.
ui <- dashboardPage(
    dashboardHeader(title = "Football Analytics"),
    dashboardSidebar(
        sidebarMenu(
            menuItem("Dashboard", tabName = "dashboard", icon = icon("dashboard")),
            menuItem("Players", tabName = "players", icon = icon("users")),
            menuItem("Teams", tabName = "teams", icon = icon("futbol"))
        ),
        # Global filter: selected season feeds input$season in the server.
        selectInput("season", "Season", choices = c("2023-24", "2022-23"))
    ),
    dashboardBody(
        tabItems(
            # Only the dashboard tab is defined here; the players/teams
            # menu items have no matching tabItem yet.
            tabItem(tabName = "dashboard",
                fluidRow(
                    valueBoxOutput("total_goals"),
                    valueBoxOutput("avg_xG"),
                    valueBoxOutput("top_scorer")
                ),
                fluidRow(
                    box(plotlyOutput("xg_chart"), width = 8),
                    box(tableOutput("quick_stats"), width = 4)
                )
            )
        )
    )
)

# Wrap with authentication: shinymanager shows a login screen before the
# dashboard; enable_admin exposes its user-management console.
ui <- secure_app(ui, enable_admin = TRUE)

# Server ----
# One reactive data source plus a renderer for every output declared in
# the UI (avg_xG, top_scorer and quick_stats were previously declared in
# the UI but never rendered, leaving those boxes permanently empty).
server <- function(input, output, session) {

    # Season's player table, cached per season selection (on top of the
    # memoised database query) so re-selecting a season is instant.
    players <- reactive({
        get_players(input$season)
    }) %>% bindCache(input$season)

    output$total_goals <- renderValueBox({
        valueBox(sum(players()$goals), "Total Goals", icon = icon("futbol"))
    })

    output$avg_xG <- renderValueBox({
        valueBox(round(mean(players()$xG), 2), "Average xG", icon = icon("chart-line"))
    })

    output$top_scorer <- renderValueBox({
        # NOTE(review): assumes the players table has a `player` name
        # column alongside `goals` and `xG` — confirm against the schema.
        top <- players() %>% slice_max(goals, n = 1, with_ties = FALSE)
        valueBox(top$player, "Top Scorer", icon = icon("star"))
    })

    output$xg_chart <- renderPlotly({
        # Dashed identity line marks where goals == xG (finishing at par).
        p <- ggplot(players(), aes(x = xG, y = goals)) +
            geom_point() +
            geom_abline(slope = 1, linetype = "dashed") +
            theme_minimal()
        ggplotly(p)
    })

    output$quick_stats <- renderTable({
        players() %>%
            arrange(desc(goals)) %>%
            select(player, goals, xG) %>%
            head(5)
    })
}

# Run
shinyApp(ui, server)

Building analytics applications transforms your analyses from one-off scripts into tools that can be used daily by non-technical stakeholders. Master these frameworks to maximize the impact of your football analytics work. Start simple, iterate based on user feedback, and scale your architecture as your user base grows. The best analytics tool is one that people actually use, so prioritize usability and reliability over feature completeness.