Skip to content

Serving

Application Entry-point

prometheus_metrics()

Prometheus scrape endpoint.

Uses MultiProcessCollector when PROMETHEUS_MULTIPROC_DIR is set (required for Gunicorn multi-worker deployments).

Source code in src/app/main.py
@app.get("/metrics", include_in_schema=False, tags=["ops"])
def prometheus_metrics() -> Response:
    """Serve the Prometheus exposition payload.

    When ``PROMETHEUS_MULTIPROC_DIR`` is present in the environment, the
    samples are aggregated through a fresh registry fed by
    ``MultiProcessCollector`` (needed for Gunicorn multi-worker deployments);
    otherwise the default registry is rendered.
    """
    # Single-process mode: render the default registry directly.
    if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
        return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

    # Multi-process mode: the collector registers itself on the new registry.
    aggregate = CollectorRegistry()
    multiprocess.MultiProcessCollector(aggregate)
    return Response(
        content=generate_latest(aggregate), media_type=CONTENT_TYPE_LATEST
    )

Database

Dependencies

get_token_header(x_api_key=None) async

Validate the X-API-Key request header.

Uses hmac.compare_digest for constant-time comparison to prevent timing-based key enumeration attacks. Returns 401 when the header is absent or does not match the configured secret (FASTAPI_HEADER_TOKEN).

Source code in src/app/dependencies.py
async def get_token_header(
    x_api_key: Annotated[str | None, Header(alias="x-api-key")] = None,
) -> None:
    """Validate the ``X-API-Key`` request header.

    Uses ``hmac.compare_digest`` for constant-time comparison to prevent
    timing-based key enumeration attacks.  Both operands are compared as
    UTF-8 bytes: ``compare_digest`` raises ``TypeError`` for ``str`` operands
    containing non-ASCII characters, which would turn a malformed header
    into an unhandled error instead of a clean 401.

    Raises:
        HTTPException: 401 when the header is absent or does not match the
            configured secret (``FASTAPI_HEADER_TOKEN``).
    """
    expected = get_security_settings().token_header
    if x_api_key is None or not hmac.compare_digest(
        x_api_key.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid X-API-Key header",
            headers={"WWW-Authenticate": "ApiKey"},
        )

get_query_token(token=None) async

Validate the token query parameter.

Source code in src/app/dependencies.py
async def get_query_token(token: str | None = None) -> None:
    """Validate the ``token`` query parameter.

    The supplied token and the configured secret are compared as UTF-8 bytes
    with ``hmac.compare_digest``.  Comparing bytes keeps the check
    constant-time and avoids the ``TypeError`` that ``compare_digest`` raises
    for ``str`` operands containing non-ASCII characters (a query string can
    carry arbitrary Unicode).

    Raises:
        HTTPException: 401 when the parameter is absent or does not match
            ``get_security_settings().token_query``.
    """
    expected = get_security_settings().token_query
    if token is None or not hmac.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid token query parameter",
        )

Routers

list_upcoming_matches(lookup) async

Return upcoming matches available for prediction (from batch inference output).

Each item includes match_id, teams, and date. Use these IDs with GET /predict/{match_id}.

Source code in src/app/routers/predict.py
@router.get(
    "/matches/",
    status_code=status.HTTP_200_OK,
    summary="List upcoming matches available for prediction",
)
async def list_upcoming_matches(lookup: _LookupDep) -> list[dict]:
    """List the matches that can currently be predicted.

    The entries come straight from the injected lookup service's
    ``list_matches()``.  Each ``match_id`` can be passed to
    ``GET /predict/{match_id}``.
    """
    upcoming = lookup.list_matches()
    return upcoming

predict(request, stage) async

Synchronous 1×2 outcome prediction from provided features.

Submits the task to the ml Celery worker and blocks until the result is ready (up to _SYNC_TIMEOUT seconds).

Use ?stage=challenger to use the challenger model (requires MLFLOW_MODEL_STAGES=champion,challenger on the worker).

Source code in src/app/routers/predict.py
@router.post(
    "/",
    response_model=PredictResponse,
    status_code=status.HTTP_200_OK,
    summary="Predict match outcome (inline features)",
)
async def predict(
    request: PredictRequest,
    stage: StageDep,
) -> PredictResponse:
    """Run a blocking 1×2 outcome prediction on caller-supplied features.

    The work is dispatched to the ``ml`` Celery queue and awaited through
    ``_poll_task_result``.

    Pass ``?stage=challenger`` to target the challenger model (requires
    MLFLOW_MODEL_STAGES=champion,challenger on the worker).

    Raises:
        HTTPException: 504 when the worker times out, 500 on any other
            failure while dispatching, waiting, or building the response.
    """
    _validate_stage(stage)
    try:
        # Third positional argument (features timestamp) is not known for
        # inline features, hence ``None``.
        task_args = [request.match_id, request.features, None, stage]
        pending = predict_match_task.apply_async(args=task_args, queue="ml")
        outcome = await _poll_task_result(pending)
        PREDICTION_COUNT.labels(source="sync").inc()
        response = PredictResponse(**outcome)
        return response
    except CeleryTimeoutError as exc:
        PREDICTION_TIMEOUTS.inc()
        timeout_detail = (
            "ML worker did not respond in time. Try /predict/async/ instead."
        )
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=timeout_detail,
        ) from exc
    except Exception as exc:
        # NOTE(review): the exception text is echoed to the client.
        logger.exception("Prediction failed for match_id=%s", request.match_id)
        failure_detail = f"Prediction failed: {exc}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        ) from exc

predict_precomputed(match_id, pred_lookup) async

Return the precomputed prediction for a match from predictions.parquet.

Predictions are produced by the batch_inference DVC stage which runs model.predict() over all matches and saves the result to MinIO. This endpoint reads directly from the in-memory cache — no Celery task, no MLflow model call at request time.

Returns 404 if the match is not found in the latest batch output.

Source code in src/app/routers/predict.py
@router.get(
    "/precomputed/{match_id}",
    response_model=PrecomputedPredictResponse,
    status_code=status.HTTP_200_OK,
    summary="Return precomputed prediction for a match (no Celery, no model call)",
)
async def predict_precomputed(
    match_id: int,
    pred_lookup: _PredLookupDep,
) -> PrecomputedPredictResponse:
    """Serve the batch-computed prediction for *match_id*.

    The row is fetched from the injected prediction lookup service; no Celery
    task is submitted and no model is invoked at request time.

    Raises:
        HTTPException: 404 when the lookup has no row for *match_id*.
    """
    row = pred_lookup.get_prediction(match_id)
    if row is None:
        not_found = (
            f"match_id={match_id} not found in precomputed predictions. "
            "Ensure the batch_inference DVC stage has been run and "
            "MINIO_BUCKET_PREDICTIONS is configured."
        )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=not_found,
        )

    # Required columns are indexed directly; optional ones fall back to
    # ``None`` (or an empty label) when absent from the row.
    fields = {"match_id": match_id}
    fields["proba_home"] = float(row["proba_home"])
    fields["proba_draw"] = float(row["proba_draw"])
    fields["proba_away"] = float(row["proba_away"])
    fields["predicted_class"] = int(row["predicted_class"])
    fields["predicted_label"] = str(row.get("predicted_label", ""))
    if "is_future" in row:
        fields["is_future"] = bool(row["is_future"])
    else:
        fields["is_future"] = None
    fields["start_time_utc"] = row.get("startTimeUtc")
    fields["home_team_name"] = row.get("homeTeamName")
    fields["away_team_name"] = row.get("awayTeamName")
    fields["model_run_id"] = row.get("model_run_id")
    fields["model_stage"] = row.get("model_stage")
    fields["predictions_computed_at"] = pred_lookup.predictions_computed_at
    return PrecomputedPredictResponse(**fields)

predict_by_match_id(match_id, lookup, stage) async

Predict outcome for a match using precomputed features.

Features are read from data/predictions/match_features.parquet produced by the batch_inference DVC stage.

Submits the task to the ml Celery worker and blocks until the result is ready. Returns 404 if match_id is not in the current batch output.

Use ?stage=challenger to use the challenger model (requires MLFLOW_MODEL_STAGES=champion,challenger on the worker).

Source code in src/app/routers/predict.py
@router.get(
    "/{match_id}",
    response_model=PredictResponse,
    status_code=status.HTTP_200_OK,
    summary="Predict upcoming match by ID (look-up)",
)
async def predict_by_match_id(
    match_id: int,
    lookup: _LookupDep,
    stage: StageDep,
) -> PredictResponse:
    """Predict a match outcome from features held by the lookup service.

    The features for *match_id* are fetched from ``lookup``, sent to the
    ``ml`` Celery queue together with the feature timestamp, and the result
    is awaited through ``_poll_task_result``.

    Pass ``?stage=challenger`` to target the challenger model (requires
    MLFLOW_MODEL_STAGES=champion,challenger on the worker).

    Raises:
        HTTPException: 404 when no features exist for *match_id*, 504 when
            the worker times out, 500 on any other failure.
    """
    _validate_stage(stage)

    match_features = lookup.get_features(match_id)
    if match_features is None:
        missing = (
            f"match_id={match_id} not found in precomputed features. "
            "Ensure the batch_inference DVC stage has been run."
        )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=missing,
        )

    computed_at = lookup.features_computed_at
    try:
        # The timestamp travels as an ISO-8601 string, or ``None`` if unknown.
        if computed_at:
            stamp = computed_at.isoformat()
        else:
            stamp = None
        pending = predict_match_task.apply_async(
            args=[match_id, match_features, stamp, stage],
            queue="ml",
        )
        outcome = await _poll_task_result(pending)
        PREDICTION_COUNT.labels(source="sync").inc()
        return PredictResponse(**outcome)
    except CeleryTimeoutError as exc:
        PREDICTION_TIMEOUTS.inc()
        timeout_detail = (
            "ML worker did not respond in time. Try /predict/async/ instead."
        )
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail=timeout_detail,
        ) from exc
    except Exception as exc:
        # NOTE(review): the exception text is echoed to the client.
        logger.exception("Prediction failed for match_id=%s", match_id)
        failure_detail = f"Prediction failed: {exc}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        ) from exc

model_info(stage) async

Return the current model's metadata from the MLflow Model Registry.

Delegates to the ml Celery worker where MLflow is available. Includes version, stage, run metrics, and the input feature schema.

Source code in src/app/routers/predict.py
@router.get(
    "/model/info",
    response_model=ModelInfoResponse,
    status_code=status.HTTP_200_OK,
    summary="MLflow model metadata (version, stage, metrics, features)",
)
async def model_info(
    stage: StageDep,
) -> ModelInfoResponse:
    """Fetch metadata for the model serving the requested stage.

    The request is handed to ``get_model_info_task`` on the ``ml`` Celery
    queue and the task result is unpacked into ``ModelInfoResponse``.

    Raises:
        HTTPException: 504 when the worker times out, 500 on any other
            failure.
    """
    _validate_stage(stage)
    try:
        pending = get_model_info_task.apply_async(args=[stage], queue="ml")
        metadata = await _poll_task_result(pending)
        response = ModelInfoResponse(**metadata)
        return response
    except CeleryTimeoutError as exc:
        raise HTTPException(
            status_code=status.HTTP_504_GATEWAY_TIMEOUT,
            detail="ML worker did not respond in time.",
        ) from exc
    except Exception as exc:
        logger.exception("Failed to retrieve model info")
        failure_detail = f"Could not retrieve model info: {exc}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        ) from exc

predict_async(request, lookup) async

Submit a 1×2 prediction to the Celery queue.

Returns a task_id immediately. Poll GET /monitoring/task_status/{task_id} until status == 'SUCCESS' to retrieve the result.

Returns 404 if the match has no precomputed features.

Source code in src/app/routers/predict.py
@router.post(
    "/async/",
    response_model=AsyncPredictResponse,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Submit async prediction job (Celery)",
)
async def predict_async(
    request: AsyncPredictRequest, lookup: _LookupDep
) -> AsyncPredictResponse:
    """Queue a 1×2 prediction and return immediately.

    The response carries the Celery ``task_id`` and the URL to poll
    (``GET /monitoring/task_status/{task_id}``) until
    ``status == 'SUCCESS'``.

    Raises:
        HTTPException: 404 when the lookup has no features for the match.
    """
    match_features = lookup.get_features(request.match_id)
    if match_features is None:
        missing = (
            f"match_id={request.match_id} not found in precomputed features. "
            "Ensure the batch_inference DVC stage has been run."
        )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=missing,
        )

    computed_at = lookup.features_computed_at
    # The timestamp travels as an ISO-8601 string, or ``None`` if unknown.
    if computed_at:
        stamp = computed_at.isoformat()
    else:
        stamp = None
    submitted = predict_match_task.apply_async(
        args=[request.match_id, match_features, stamp],
        queue="ml",
    )
    PREDICTION_COUNT.labels(source="async").inc()
    return AsyncPredictResponse(
        task_id=submitted.id,
        status="submitted",
        status_url=f"/monitoring/task_status/{submitted.id}",
    )

get_drift_status()

Return the latest feature drift summary and refresh the Prometheus gauge.

The report is written by the monitor_drift DVC stage. If the report does not yet exist, returns drift_score=null.

Source code in src/app/routers/monitoring.py
@router.get("/drift")
def get_drift_status() -> dict:
    """Return the latest feature drift summary and refresh the Prometheus gauge.

    The report is written by the ``monitor_drift`` DVC stage.
    If the report does not yet exist, returns drift_score=null.

    Returns:
        The parsed report, or a placeholder with ``drift_score`` set to
        ``None`` when no report file is present.

    Raises:
        HTTPException: 500 when the report exists but cannot be read,
            decoded, or is not a JSON object.
    """
    if not _DRIFT_REPORT_PATH.exists():
        return {"drift_score": None, "message": "Drift report not yet available."}

    try:
        # JSON is read as UTF-8 regardless of the process locale.
        with open(_DRIFT_REPORT_PATH, encoding="utf-8") as f:
            payload: dict = json.load(f)
    except FileNotFoundError:
        # The file vanished between the existence check and the open
        # (e.g. the DVC stage is rewriting it): treat as "not yet available".
        return {"drift_score": None, "message": "Drift report not yet available."}
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
        logger.exception("Failed to read drift report: %s", exc)
        raise HTTPException(
            status_code=500, detail="Failed to read drift report."
        ) from exc

    if not isinstance(payload, dict):
        # A valid JSON document that is not an object cannot carry a score.
        logger.error(
            "Drift report has unexpected top-level type: %s", type(payload).__name__
        )
        raise HTTPException(status_code=500, detail="Failed to read drift report.")

    score = payload.get("drift_score")
    if score is not None:
        DRIFT_SCORE.set(float(score))

    return payload

get_queue_stats()

Return active/scheduled/reserved task counts and worker stats.

Source code in src/app/routers/monitoring.py
@router.get("/celery/queues")
def get_queue_stats() -> dict:
    """Report active, scheduled and reserved tasks together with worker stats.

    Each value is whatever the corresponding Celery inspect call returns.
    """
    inspector = celery_app.control.inspect()
    # Queries are issued one after another, in the order of the response keys.
    active_tasks = inspector.active()
    scheduled_tasks = inspector.scheduled()
    reserved_tasks = inspector.reserved()
    worker_stats = inspector.stats()
    return {
        "active": active_tasks,
        "scheduled": scheduled_tasks,
        "reserved": reserved_tasks,
        "stats": worker_stats,
    }

get_workers()

Return active queues and ping status for all connected workers.

Source code in src/app/routers/monitoring.py
@router.get("/celery/workers")
def get_workers() -> dict:
    """Report the active queues and ping replies of the connected workers.

    Each value is whatever the corresponding Celery inspect call returns.
    """
    inspector = celery_app.control.inspect()
    queues_by_worker = inspector.active_queues()
    ping_replies = inspector.ping()
    return {"active_workers": queues_by_worker, "ping": ping_replies}

Services

FeatureLookupService

Loads precomputed features for all matches from the batch inference output.

The parquet file is produced by the batch_inference DVC stage and has the match id as its index. It contains both upcoming matches and finished matches (with outcome_1x2, homeScore, awayScore). The service is loaded lazily on the first call and cached in-process.

Parameters:

Name Type Description Default
features_path Path | None

Absolute path to match_features.parquet.

None
Source code in src/app/services/predict.py
class FeatureLookupService:
    """Loads precomputed features for all matches from the batch inference output.

    The parquet file is produced by the ``batch_inference`` DVC stage and has
    the match ``id`` as its index.  It contains both upcoming matches and
    finished matches (with ``outcome_1x2``, ``homeScore``, ``awayScore``).
    The service is loaded lazily on the first call and cached in-process.

    Args:
        features_path: Absolute path to ``match_features.parquet``.
    """

    def __init__(self, features_path: Path | None = None) -> None:
        self._path = features_path or (
            DATA_PREDICTIONS_PATH / _FUTURE_FEATURES_FILENAME
        )
        self._df: pd.DataFrame | None = None
        # UTC timestamp of the loaded data: local st_mtime or MinIO LastModified.
        self._mtime: float | None = None
        # monotonic time of the last MinIO head_object check (avoids per-request calls).
        self._last_minio_check: float = 0.0

    # ------------------------------------------------------------------
    # MinIO helpers
    # ------------------------------------------------------------------

    def _minio_last_modified(self) -> float | None:
        """Return the MinIO LastModified timestamp for the features object, or None."""
        bucket = get_minio_settings().bucket_predictions
        if not bucket:
            return None
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(Bucket=bucket, Key=_MINIO_PREDICTIONS_KEY)
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        """Read future_features.parquet directly from MinIO into memory."""
        bucket = get_minio_settings().bucket_predictions
        s3_url = f"s3://{bucket}/{_MINIO_PREDICTIONS_KEY}"
        try:
            df = pd.read_parquet(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info(
                "Loaded future features from MinIO (%s): %d matches", s3_url, len(df)
            )
            return df
        except Exception:  # noqa: BLE001
            logger.exception(
                "Failed to load future_features.parquet from MinIO (%s)", s3_url
            )
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Core load logic
    # ------------------------------------------------------------------

    def _load(self) -> pd.DataFrame:
        # ── Local file (dev / Airflow server) ────────────────────────────
        if self._path.exists():
            current_mtime = self._path.stat().st_mtime
            if self._df is not None and current_mtime == self._mtime:
                return self._df
            self._df = pd.read_parquet(self._path)
            self._mtime = current_mtime
            logger.info(
                "Loaded future features from local: %d matches (mtime=%s)",
                len(self._df),
                datetime.fromtimestamp(current_mtime, tz=timezone.utc).isoformat(),
            )
            return self._df

        # ── MinIO (production K8s — no local data volume) ─────────────────
        now = time.monotonic()
        should_check = (now - self._last_minio_check) >= _FEATURE_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df  # return cached; not time to re-check yet

        self._last_minio_check = now
        minio_mtime = self._minio_last_modified()

        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale feature cache")
                return self._df
            logger.warning(
                "future_features.parquet not found locally or in MinIO — "
                "run the batch_inference DVC stage and ensure "
                "MINIO_BUCKET_PREDICTIONS is configured."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df

        if self._df is not None and minio_mtime == self._mtime:
            return self._df  # MinIO file unchanged

        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def get_features(self, match_id: int) -> dict | None:
        """Return the feature dict for *match_id*, or ``None`` if not found."""
        df = self._load()
        if df.empty or match_id not in df.index:
            return None
        row = df.loc[match_id]
        # Convert to plain Python dict; drop NaN values to avoid serialisation issues
        return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

    def list_matches(self) -> list[dict]:
        """Return a lightweight list of upcoming matches for UI display."""
        df = self._load()
        if df.empty:
            return []
        # Keep only matches that have not been played yet (no recorded outcome).
        if "outcome_1x2" in df.columns:
            df = df[df["outcome_1x2"].isna()]
        display_cols = [
            c
            for c in (
                "startTimeUtc",
                "homeTeamName",
                "awayTeamName",
                "homeTeamId",
                "awayTeamId",
            )
            if c in df.columns
        ]
        df_display = df[display_cols].copy()
        df_display.index.name = "match_id"
        return df_display.reset_index().to_dict(orient="records")

    @property
    def features_computed_at(self) -> datetime | None:
        """UTC datetime when the feature file was last written (= last batch_inference run)."""
        self._load()
        if self._mtime is None:
            return None
        return datetime.fromtimestamp(self._mtime, tz=timezone.utc)

features_computed_at property

UTC datetime when the feature file was last written (= last batch_inference run).

get_features(match_id)

Return the feature dict for match_id, or None if not found.

Source code in src/app/services/predict.py
def get_features(self, match_id: int) -> dict | None:
    """Return the feature dict for *match_id*, or ``None`` if not found."""
    df = self._load()
    if df.empty or match_id not in df.index:
        return None
    row = df.loc[match_id]
    # Convert to plain Python dict; drop NaN values to avoid serialisation issues
    return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

list_matches()

Return a lightweight list of upcoming matches for UI display.

Source code in src/app/services/predict.py
def list_matches(self) -> list[dict]:
    """List not-yet-played matches as lightweight records for UI display.

    Each record holds ``match_id`` (the frame index) plus whichever of the
    display columns exist in the frame.
    """
    frame = self._load()
    if frame.empty:
        return []

    # A match with a recorded outcome has already been played: drop it.
    if "outcome_1x2" in frame.columns:
        unplayed = frame["outcome_1x2"].isna()
        frame = frame[unplayed]

    wanted = (
        "startTimeUtc",
        "homeTeamName",
        "awayTeamName",
        "homeTeamId",
        "awayTeamId",
    )
    present = []
    for column in wanted:
        if column in frame.columns:
            present.append(column)

    slim = frame[present].copy()
    slim.index.name = "match_id"
    records = slim.reset_index().to_dict(orient="records")
    return records

PredictionLookupService

Serves precomputed batch predictions from the batch_inference DVC stage output.

Mirrors the FeatureLookupService caching pattern: it checks the local file first (dev / CI), then falls back to MinIO with a configurable re-check interval.

The parquet file is indexed by match id and must contain columns: proba_home, proba_draw, proba_away, predicted_class, predicted_label, optionally is_future, startTimeUtc, homeTeamName, awayTeamName, model_run_id, model_stage.

Source code in src/app/services/predict.py
class PredictionLookupService:
    """Serves precomputed batch predictions from the batch_inference DVC stage output.

    Mirrors the ``FeatureLookupService`` caching pattern:
    - Checks local file first (dev / CI).
    - Falls back to MinIO with a configurable re-check interval.

    The parquet file is indexed by match ``id`` and must contain columns:
    ``proba_home``, ``proba_draw``, ``proba_away``, ``predicted_class``,
    ``predicted_label``, optionally ``is_future``, ``startTimeUtc``,
    ``homeTeamName``, ``awayTeamName``, ``model_run_id``, ``model_stage``.
    """

    def __init__(self, predictions_path: Path | None = None) -> None:
        self._path = predictions_path or (DATA_PREDICTIONS_PATH / _PREDICTIONS_FILENAME)
        self._df: pd.DataFrame | None = None
        self._mtime: float | None = None
        self._last_minio_check: float = 0.0

    # ------------------------------------------------------------------
    # MinIO helpers
    # ------------------------------------------------------------------

    def _minio_last_modified(self) -> float | None:
        """Return the MinIO LastModified for the predictions object, or None."""
        bucket = get_minio_settings().bucket_predictions
        if not bucket:
            return None
        try:
            client = boto3.client(
                "s3",
                endpoint_url=get_minio_settings().endpoint_url,
                aws_access_key_id=get_minio_settings().access_key,
                aws_secret_access_key=get_minio_settings().secret_key,
            )
            resp = client.head_object(Bucket=bucket, Key=_MINIO_BATCH_PREDICTIONS_KEY)
            return resp["LastModified"].timestamp()
        except Exception as exc:  # noqa: BLE001
            logger.debug("MinIO head_object (predictions) failed: %s", exc)
            return None

    def _load_from_minio(self) -> pd.DataFrame:
        """Read predictions.parquet directly from MinIO into memory."""
        bucket = get_minio_settings().bucket_predictions
        s3_url = f"s3://{bucket}/{_MINIO_BATCH_PREDICTIONS_KEY}"
        try:
            df = pd.read_parquet(
                s3_url, storage_options=get_minio_settings().storage_options
            )
            logger.info(
                "Loaded batch predictions from MinIO (%s): %d rows", s3_url, len(df)
            )
            return df
        except Exception:  # noqa: BLE001
            logger.exception(
                "Failed to load predictions.parquet from MinIO (%s)", s3_url
            )
            return pd.DataFrame()

    # ------------------------------------------------------------------
    # Core load logic
    # ------------------------------------------------------------------

    def _load(self) -> pd.DataFrame:
        # ── Local file (dev / CI) ─────────────────────────────────────────
        if self._path.exists():
            current_mtime = self._path.stat().st_mtime
            if self._df is not None and current_mtime == self._mtime:
                return self._df
            self._df = pd.read_parquet(self._path)
            self._mtime = current_mtime
            logger.info(
                "Loaded batch predictions from local: %d rows (mtime=%s)",
                len(self._df),
                datetime.fromtimestamp(current_mtime, tz=timezone.utc).isoformat(),
            )
            return self._df

        # ── MinIO (production K8s) ────────────────────────────────────────
        now = time.monotonic()
        should_check = (
            now - self._last_minio_check
        ) >= _PREDICTIONS_CACHE_CHECK_INTERVAL
        if self._df is not None and not should_check:
            return self._df

        self._last_minio_check = now
        minio_mtime = self._minio_last_modified()

        if minio_mtime is None:
            if self._df is not None:
                logger.warning("MinIO unreachable — serving stale predictions cache")
                return self._df
            logger.warning(
                "predictions.parquet not found locally or in MinIO — "
                "run the batch_inference DVC stage and ensure "
                "MINIO_BUCKET_PREDICTIONS is configured."
            )
            self._df = pd.DataFrame()
            self._mtime = None
            return self._df

        if self._df is not None and minio_mtime == self._mtime:
            return self._df

        self._df = self._load_from_minio()
        self._mtime = minio_mtime
        return self._df

    def get_prediction(self, match_id: int) -> dict | None:
        """Return the prediction dict for *match_id*, or ``None`` if not found."""
        df = self._load()
        if df.empty or match_id not in df.index:
            return None
        row = df.loc[match_id]
        return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

    def list_matches(self) -> list[dict]:
        """Return all rows as a list of dicts (for diagnostics/admin endpoints)."""
        df = self._load()
        if df.empty:
            return []
        return df.reset_index().to_dict(orient="records")

    @property
    def predictions_computed_at(self) -> datetime | None:
        """UTC datetime when predictions.parquet was last written."""
        self._load()
        if self._mtime is None:
            return None
        return datetime.fromtimestamp(self._mtime, tz=timezone.utc)

predictions_computed_at property

UTC datetime when predictions.parquet was last written.

get_prediction(match_id)

Return the prediction dict for match_id, or None if not found.

Source code in src/app/services/predict.py
def get_prediction(self, match_id: int) -> dict | None:
    """Return the prediction dict for *match_id*, or ``None`` if not found."""
    df = self._load()
    if df.empty or match_id not in df.index:
        return None
    row = df.loc[match_id]
    return {k: v for k, v in row.to_dict().items() if pd.notna(v)}

list_matches()

Return all rows as a list of dicts (for diagnostics/admin endpoints).

Source code in src/app/services/predict.py
def list_matches(self) -> list[dict]:
    """Dump every stored row as a dict (for diagnostics/admin endpoints).

    The frame index becomes a regular key in each record; an empty frame
    yields an empty list.
    """
    frame = self._load()
    return [] if frame.empty else frame.reset_index().to_dict(orient="records")

PredictionService

Loads and serves a model from the MLflow Model Registry.

The model is loaded lazily on the first call to predict and then cached in-process for the lifetime of the worker.

Source code in src/app/services/predict.py
class PredictionService:
    """Loads and serves a model from the MLflow Model Registry.

    The model is loaded lazily on the first call to ``predict`` and then
    cached in-process for the lifetime of the worker.
    """

    def __init__(
        self, tracking_uri: str, model_name: str, model_stage: str = "Staging"
    ) -> None:
        self._tracking_uri = tracking_uri
        self._model_name = model_name
        self._model_stage = model_stage
        self._model: Any = None  # mlflow.pyfunc.PyFuncModel, imported lazily
        # Protects the lazy-load critical section from concurrent reads
        # (e.g. gunicorn workers sharing the same process via threads).
        self._lock: threading.Lock = threading.Lock()
        # Redis client initialised lazily; None means "not yet checked".
        self._redis: Any = None
        self._redis_checked: bool = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_model(self) -> Any:
        # Fast path: model already loaded (no lock needed after init).
        if self._model is not None:
            return self._model
        # Slow path: double-checked locking prevents duplicate load
        # when multiple threads/workers call predict() simultaneously on startup.
        with self._lock:
            if self._model is None:
                # Deferred import: mlflow is only installed in the ml environment.
                # Must never execute at module-load time (api env has no mlflow).
                import mlflow
                import mlflow.pyfunc

                # MLflow's boto3 client reads these from os.environ at download time.
                # pydantic-settings populates Python objects but does NOT write back
                # to os.environ, so we must do it explicitly here.
                os.environ.setdefault(
                    "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
                )
                os.environ.setdefault(
                    "AWS_ACCESS_KEY_ID", get_minio_settings().access_key
                )
                os.environ.setdefault(
                    "AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key
                )
                mlflow.set_tracking_uri(self._tracking_uri)
                # MLflow 3.x: use alias URI (models:/name@alias)
                # Falls back gracefully to stage URI for older MLflow servers
                model_uri = f"models:/{self._model_name}@{self._model_stage}"
                logger.info("Loading model from MLflow: %s", model_uri)
                self._model = mlflow.pyfunc.load_model(model_uri)
                logger.info(
                    "Model loaded. run_id=%s",
                    getattr(self._model.metadata, "run_id", "unknown"),
                )
        return self._model

    def load(self) -> Any:
        """Eagerly load the model. Safe to call multiple times (idempotent).

        Call this during application startup (e.g. FastAPI lifespan or
        Celery ``worker_process_init``) to avoid paying the cold-start
        penalty on the first user request.
        """
        return self._load_model()

    # ------------------------------------------------------------------
    # Redis prediction cache
    # ------------------------------------------------------------------

    def _get_redis(self) -> Any:
        """Return a connected Redis client or *None* if unavailable.

        Tried once per process; subsequent calls return the cached result
        without retrying so a missing Redis never blocks inference.
        """
        if self._redis_checked:
            return self._redis
        self._redis_checked = True
        try:
            import redis as redis_lib

            url = os.getenv(
                "REDIS_CACHE_URL",
                os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
            )
            client = redis_lib.from_url(
                url, decode_responses=True, socket_connect_timeout=2
            )
            client.ping()
            self._redis = client
            logger.info("Prediction cache: connected to Redis at %s", url)
        except Exception as exc:
            logger.warning("Redis unavailable; prediction caching disabled: %s", exc)
        return self._redis

    def _cache_get(self, key: str) -> dict | None:
        r = self._get_redis()
        if r is None:
            return None
        try:
            value = r.get(key)
            return json.loads(value) if value else None
        except Exception as exc:
            logger.debug("Cache get failed for key=%s: %s", key, exc)
            return None

    def _cache_set(self, key: str, value: dict, ttl: int) -> None:
        r = self._get_redis()
        if r is None:
            return
        try:
            r.setex(key, ttl, json.dumps(value))
        except Exception as exc:
            logger.debug("Cache set failed for key=%s: %s", key, exc)

    def _predict_proba(self, df: pd.DataFrame) -> "Any":  # np.ndarray at runtime
        """Return (N, 3) probability array.

        Handles two common MLflow pyfunc flavours:
        - sklearn Pipeline logged with pyfunc_predict_fn='predict_proba'  → model.predict() returns (N, 3)
        - sklearn Pipeline logged with default pyfunc                     → model.predict() returns (N,) labels;
          in that case we fall back to the underlying python_model.
        """
        import numpy as np

        model = self._load_model()
        raw = model.predict(df)

        # Already a 2-D probability array
        if hasattr(raw, "ndim") and raw.ndim == 2:
            return raw

        # 1-D label output → try to access the underlying sklearn estimator
        logger.warning(
            "pyfunc.predict() returned 1-D output; attempting predict_proba fallback."
        )
        try:
            sklearn_model = model._model_impl.python_model
            proba = sklearn_model.predict_proba(df)
            return np.asarray(proba)
        except AttributeError:
            # Last resort: one-hot encode the hard labels
            labels = raw.astype(int)
            proba = np.zeros((len(labels), len(_LABEL_ORDER)), dtype=float)
            for i, lbl in enumerate(labels):
                if lbl in _LABEL_ORDER:
                    proba[i, _LABEL_ORDER.index(lbl)] = 1.0
            return proba

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def get_model_info(self) -> dict:
        """Return model metadata from the MLflow Model Registry.

        Queries the registered model for the configured stage, then fetches
        run metrics and params.  Does NOT require the model to be loaded.
        """
        import mlflow
        from mlflow.tracking import MlflowClient

        os.environ.setdefault(
            "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
        )
        os.environ.setdefault("AWS_ACCESS_KEY_ID", get_minio_settings().access_key)
        os.environ.setdefault("AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key)
        mlflow.set_tracking_uri(self._tracking_uri)

        client = MlflowClient()
        try:
            # MLflow 3.x: look up by alias (e.g. "Staging")
            mv = client.get_model_version_by_alias(self._model_name, self._model_stage)
        except Exception:
            mv = None
        if mv is None:
            return {
                "model_name": self._model_name,
                "stage": self._model_stage,
                "version": "unknown",
                "run_id": "unknown",
                "metrics": {},
                "params": {},
                "feature_names": [],
                "created_at": None,
            }

        run_id = mv.run_id
        if run_id is None:
            return {
                "params": {},
                "feature_names": [],
                "created_at": None,
            }
        run = client.get_run(run_id)
        metrics = dict(run.data.metrics)
        params = dict(run.data.params)

        # Try to retrieve feature names from the input schema of the loaded model
        feature_names: list[str] = []
        try:
            model = self._load_model()
            schema = model.metadata.get_input_schema()
            if schema is not None:
                feature_names = [c.name for c in schema.inputs]
        except Exception:
            logger.debug("Could not retrieve feature names from model schema.")

        return {
            "model_name": self._model_name,
            "stage": self._model_stage,
            "version": mv.version,
            "run_id": run_id,
            "metrics": metrics,
            "params": params,
            "feature_names": feature_names,
            "created_at": str(mv.creation_timestamp) if mv.creation_timestamp else None,
        }

    def predict(
        self,
        features: dict,
        match_id: int | None = None,
        features_computed_at: datetime | None = None,
    ) -> dict:
        """Run inference for a single match.

        Args:
            features: Feature dict matching model input schema.
            match_id: Optional identifier for downstream tracing.
            features_computed_at: UTC timestamp when features were produced
                (batch_inference stage).  Stored in the response for traceability.

        Returns:
            Dict compatible with ``PredictResponse`` schema.
        """
        model = self._load_model()
        run_id = getattr(model.metadata, "run_id", "unknown")
        cache_ttl = int(os.getenv("PREDICTION_CACHE_TTL", "3600"))

        # Cache key is scoped to (match_id, run_id) so it auto-invalidates
        # when the model is updated in the registry.
        cache_key: str | None = (
            f"predict:{match_id}:{run_id}" if match_id is not None else None
        )
        if cache_key:
            cached = self._cache_get(cache_key)
            if cached is not None:
                cached["cached"] = True
                return cached

        df = pd.DataFrame([features])

        input_schema = model.metadata.get_input_schema()

        # Restore columns that were dropped during JSON serialisation (NaN is not
        # JSON-safe so get_features() omits them).  The model's sklearn pipeline
        # handles NaN via its own imputer/transformer; we just need the column to
        # exist so ColumnTransformer does not raise "columns are missing".
        if input_schema is not None:
            for col_spec in input_schema.inputs:
                if col_spec.name not in df.columns:
                    df[col_spec.name] = float("nan")

        # Align column dtypes to the model's MLflow input schema.
        # Parquet round-trips can promote int → float (e.g. when NaNs are present);
        # MLflow's schema enforcement refuses unsafe float→int conversions.
        _MLFLOW_TO_NUMPY: dict[str, str] = {
            "integer": "int32",
            "long": "int64",
            "float": "float32",
            "double": "float64",
        }
        if input_schema is not None:
            for col_spec in input_schema.inputs:
                if col_spec.name in df.columns:
                    target = _MLFLOW_TO_NUMPY.get(col_spec.type.name)
                    if target is not None and str(df[col_spec.name].dtype) != target:
                        df[col_spec.name] = df[col_spec.name].astype(target)
            # Drop columns not in the model signature to avoid MLflow warnings
            # about extra inputs (metadata cols like homeTeamName, startTimeUtc…).
            schema_cols = {c.name for c in input_schema.inputs}
            extra_cols = [c for c in df.columns if c not in schema_cols]
            if extra_cols:
                df = df.drop(columns=extra_cols)

        proba = self._predict_proba(df)

        predicted_idx = int(proba.argmax(axis=1)[0])
        predicted_class = _LABEL_ORDER[predicted_idx]

        result: dict = {
            "match_id": match_id,
            "cached": False,
            "features_computed_at": (
                features_computed_at.isoformat() if features_computed_at else None
            ),
            "prediction": {
                "predicted_class": predicted_class,
                "probabilities": {
                    str(lbl): float(proba[0, i]) for i, lbl in enumerate(_LABEL_ORDER)
                },
                "model_version": self._model_stage,
                "model_run_id": run_id,
            },
        }
        if cache_key:
            self._cache_set(cache_key, result, ttl=cache_ttl)
        return result

load()

Eagerly load the model. Safe to call multiple times (idempotent).

Call this during application startup (e.g. FastAPI lifespan or Celery worker_process_init) to avoid paying the cold-start penalty on the first user request.

Source code in src/app/services/predict.py
def load(self) -> Any:
    """Load the model ahead of time and return it.

    Delegates to ``_load_model()``, so repeated calls are safe (idempotent).
    Intended for application startup (e.g. FastAPI lifespan or Celery
    ``worker_process_init``) so the first user request does not pay the
    cold-start penalty.
    """
    model = self._load_model()
    return model

get_model_info()

Return model metadata from the MLflow Model Registry.

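Queries the registered model for the configured alias, then fetches run metrics and params. Feature names are read from the loaded model's input schema, so this call may trigger the lazy model load; if that fails, the feature list is returned empty.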

Source code in src/app/services/predict.py
def get_model_info(self) -> dict:
    """Return model metadata from the MLflow Model Registry.

    Resolves the model version by the configured alias, then fetches the
    metrics and params of its run.  Feature names are read from the loaded
    model's input schema, so this call may trigger the lazy model load;
    a failure there is logged and leaves ``feature_names`` empty.

    Returns:
        Dict with the keys ``model_name``, ``stage``, ``version``,
        ``run_id``, ``metrics``, ``params``, ``feature_names`` and
        ``created_at``.  All keys are present on every path; unknown values
        are reported as ``"unknown"``, empty containers or ``None``.
    """
    import mlflow
    from mlflow.tracking import MlflowClient

    # MLflow's boto3 client reads these from os.environ; setdefault keeps any
    # value that is already exported.
    os.environ.setdefault(
        "MLFLOW_S3_ENDPOINT_URL", get_minio_settings().endpoint_url
    )
    os.environ.setdefault("AWS_ACCESS_KEY_ID", get_minio_settings().access_key)
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", get_minio_settings().secret_key)
    mlflow.set_tracking_uri(self._tracking_uri)

    client = MlflowClient()
    try:
        # MLflow 3.x: look up by alias (e.g. "Staging")
        mv = client.get_model_version_by_alias(self._model_name, self._model_stage)
    except Exception:
        mv = None

    # Placeholder payload used whenever the registry cannot supply details.
    info: dict = {
        "model_name": self._model_name,
        "stage": self._model_stage,
        "version": "unknown",
        "run_id": "unknown",
        "metrics": {},
        "params": {},
        "feature_names": [],
        "created_at": None,
    }
    if mv is None:
        return info

    run_id = mv.run_id
    if run_id is None:
        # The version exists but has no linked run: still return the full key
        # set (previously only three keys were returned here).
        info["version"] = mv.version
        return info
    run = client.get_run(run_id)
    metrics = dict(run.data.metrics)
    params = dict(run.data.params)

    # Try to retrieve feature names from the input schema of the loaded model
    feature_names: list[str] = []
    try:
        model = self._load_model()
        schema = model.metadata.get_input_schema()
        if schema is not None:
            feature_names = [c.name for c in schema.inputs]
    except Exception:
        logger.debug("Could not retrieve feature names from model schema.")

    return {
        "model_name": self._model_name,
        "stage": self._model_stage,
        "version": mv.version,
        "run_id": run_id,
        "metrics": metrics,
        "params": params,
        "feature_names": feature_names,
        "created_at": str(mv.creation_timestamp) if mv.creation_timestamp else None,
    }

predict(features, match_id=None, features_computed_at=None)

Run inference for a single match.

Parameters:

Name Type Description Default
features dict

Feature dict matching model input schema.

required
match_id int | None

Optional identifier for downstream tracing.

None
features_computed_at datetime | None

UTC timestamp when features were produced (batch_inference stage). Stored in the response for traceability.

None

Returns:

Type Description
dict

Dict compatible with PredictResponse schema.

Source code in src/app/services/predict.py
def predict(
    self,
    features: dict,
    match_id: int | None = None,
    features_computed_at: datetime | None = None,
) -> dict:
    """Run inference for a single match.

    Args:
        features: Feature dict matching model input schema.
        match_id: Optional identifier for downstream tracing.  When given,
            the result is looked up in / written to the prediction cache.
        features_computed_at: UTC timestamp when features were produced
            (batch_inference stage).  Stored in the response for traceability.

    Returns:
        Dict compatible with ``PredictResponse`` schema.
    """
    model = self._load_model()
    run_id = getattr(model.metadata, "run_id", "unknown")
    # TTL in seconds, read from the environment on every call.
    cache_ttl = int(os.getenv("PREDICTION_CACHE_TTL", "3600"))

    # Cache key is scoped to (match_id, run_id) so it auto-invalidates
    # when the model is updated in the registry.
    cache_key: str | None = (
        f"predict:{match_id}:{run_id}" if match_id is not None else None
    )
    if cache_key:
        cached = self._cache_get(cache_key)
        if cached is not None:
            # Entries are stored with cached=False; flip the flag on a hit.
            cached["cached"] = True
            return cached

    # One-row frame: every key of *features* becomes a column.
    df = pd.DataFrame([features])

    input_schema = model.metadata.get_input_schema()

    # Restore columns that were dropped during JSON serialisation (NaN is not
    # JSON-safe so get_features() omits them).  The model's sklearn pipeline
    # handles NaN via its own imputer/transformer; we just need the column to
    # exist so ColumnTransformer does not raise "columns are missing".
    if input_schema is not None:
        for col_spec in input_schema.inputs:
            if col_spec.name not in df.columns:
                df[col_spec.name] = float("nan")

    # Align column dtypes to the model's MLflow input schema.
    # Parquet round-trips can promote int → float (e.g. when NaNs are present);
    # MLflow's schema enforcement refuses unsafe float→int conversions.
    # NOTE(review): casting a column that contains NaN to an integer dtype
    # raises in pandas — confirm integer schema columns are never missing.
    _MLFLOW_TO_NUMPY: dict[str, str] = {
        "integer": "int32",
        "long": "int64",
        "float": "float32",
        "double": "float64",
    }
    if input_schema is not None:
        for col_spec in input_schema.inputs:
            if col_spec.name in df.columns:
                # Schema types without a mapping entry are left untouched.
                target = _MLFLOW_TO_NUMPY.get(col_spec.type.name)
                if target is not None and str(df[col_spec.name].dtype) != target:
                    df[col_spec.name] = df[col_spec.name].astype(target)
        # Drop columns not in the model signature to avoid MLflow warnings
        # about extra inputs (metadata cols like homeTeamName, startTimeUtc…).
        schema_cols = {c.name for c in input_schema.inputs}
        extra_cols = [c for c in df.columns if c not in schema_cols]
        if extra_cols:
            df = df.drop(columns=extra_cols)

    proba = self._predict_proba(df)

    # Single-row input: take the argmax of the first (only) row.
    predicted_idx = int(proba.argmax(axis=1)[0])
    predicted_class = _LABEL_ORDER[predicted_idx]

    result: dict = {
        "match_id": match_id,
        "cached": False,
        "features_computed_at": (
            features_computed_at.isoformat() if features_computed_at else None
        ),
        "prediction": {
            "predicted_class": predicted_class,
            "probabilities": {
                str(lbl): float(proba[0, i]) for i, lbl in enumerate(_LABEL_ORDER)
            },
            # The configured alias is reported here, not a numeric version.
            "model_version": self._model_stage,
            "model_run_id": run_id,
        },
    }
    # Only requests that carry a match_id are cached.
    if cache_key:
        self._cache_set(cache_key, result, ttl=cache_ttl)
    return result

Schemas

PredictRequest

Bases: BaseModel

Input features for a single match prediction.

Features must match the model's training schema exactly. The model expects rolling-window difference features (side='diff', window=5) plus a categorical 'sex' column (0=men, 1=women).

Source code in src/app/schemas/predict.py
class PredictRequest(BaseModel):
    """Input features for a single match prediction.

    Features must match the model's training schema exactly.
    The model expects rolling-window difference features (side='diff', window=5)
    plus a categorical 'sex' column (0=men, 1=women).
    """

    # Optional; defaults to None when the caller does not supply it.
    match_id: int | None = Field(
        None, description="Optional match identifier for tracing"
    )
    # Required.  Values may be float, int or null.
    features: dict[str, float | int | None] = Field(
        ...,
        description=(
            "Feature dict matching the model's input schema. "
            "Keys are feature names (e.g. diff_win_5_mean), values are numeric."
        ),
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "example": {
                "match_id": 12345,
                "features": {
                    "diff_win_5_mean": 0.2,
                    "diff_goals_for_5_mean": 0.4,
                    "diff_goals_against_5_mean": -0.1,
                    "sex": 0,
                },
            }
        }
    }

AsyncPredictRequest

Bases: BaseModel

Request body for POST /predict/async/.

Source code in src/app/schemas/predict.py
class AsyncPredictRequest(BaseModel):
    """Request body for POST /predict/async/."""

    # Required; unlike PredictRequest, the body carries no feature values.
    match_id: int = Field(..., description="Match ID with precomputed features")

AsyncPredictResponse

Bases: BaseModel

Returned immediately after submitting an async prediction task.

Source code in src/app/schemas/predict.py
class AsyncPredictResponse(BaseModel):
    """Returned immediately after submitting an async prediction task."""

    # Identifier of the submitted task (required).
    task_id: str
    # Defaults to "submitted" when not set explicitly.
    status: str = "submitted"
    # Required; where the client polls for the task result.
    status_url: str = Field(..., description="Poll this URL to get the result")

ModelInfoResponse

Bases: BaseModel

MLflow model metadata returned by GET /predict/model/info.

Source code in src/app/schemas/predict.py
class ModelInfoResponse(BaseModel):
    """MLflow model metadata returned by GET /predict/model/info."""

    # The four identity fields below have no default, so a payload that omits
    # any of them fails validation.
    model_name: str
    stage: str
    version: str
    run_id: str
    # Detail fields fall back to empty containers / None when absent.
    metrics: dict[str, float] = Field(default_factory=dict)
    params: dict[str, str] = Field(default_factory=dict)
    feature_names: list[str] = Field(default_factory=list)
    created_at: str | None = None

PrecomputedPredictResponse

Bases: BaseModel

Response for GET /predict/precomputed/{match_id}.

Served directly from predictions.parquet produced by the batch_inference DVC stage — no Celery task, no MLflow model call at request time.

Source code in src/app/schemas/predict.py
class PrecomputedPredictResponse(BaseModel):
    """Response for GET /predict/precomputed/{match_id}.

    Served directly from predictions.parquet produced by the batch_inference
    DVC stage — no Celery task, no MLflow model call at request time.
    """

    # Required prediction payload.
    match_id: int
    proba_home: float = Field(..., description="P(home win)")
    proba_draw: float = Field(..., description="P(draw)")
    proba_away: float = Field(..., description="P(away win)")
    predicted_class: int = Field(
        ..., description="Argmax class: 0=home win, 1=draw, 2=away win"
    )
    predicted_label: str = Field(
        ..., description="Human-readable label: home_win | draw | away_win"
    )
    # Optional match/model metadata; each defaults to None when absent.
    is_future: bool | None = None
    start_time_utc: datetime | None = None
    home_team_name: str | None = None
    away_team_name: str | None = None
    model_run_id: str | None = None
    model_stage: str | None = None
    predictions_computed_at: datetime | None = Field(
        None,
        description="UTC timestamp when predictions.parquet was last refreshed from MinIO.",
    )
    # NOTE(review): the three fields below mirror the tail of ModelInfoResponse
    # and are not mentioned in this model's docstring — confirm they are
    # intentional here and not a copy-paste leftover.
    params: dict[str, str] = Field(default_factory=dict)
    feature_names: list[str] = Field(default_factory=list)
    created_at: str | None = None

MatchRawLive

Bases: BaseModel

Projected subset of MatchRaw for live-scores display.

Source code in src/app/schemas/models.py
class MatchRawLive(BaseModel):
    """Projected subset of MatchRaw for live-scores display."""

    # NOTE(review): every field except ``id`` is ``Optional[...]`` with no
    # explicit default.  Under pydantic v2 that means "required but nullable";
    # under v1 it implies a default of None — confirm which version applies.
    id: int
    status: Optional[int]
    startTimeUtc: Optional[datetime]
    # Home team
    homeTeamName: Optional[str]
    homeScore: Optional[int]
    # Away team
    awayScore: Optional[int]
    awayTeamName: Optional[str]
    # Tournament
    tournamentName: Optional[str]
    stageName: Optional[str]
    regionName: Optional[str]
    sex: Optional[int]

Celery Tasks

Celery task: asynchronous match outcome prediction.

Submitted by POST /predict/async/ and executed by the ml Celery worker. The result is stored in the Celery result backend and can be retrieved via GET /monitoring/task_status/{task_id}.

The task result has the same shape as PredictResponse so the Streamlit polling page can display it directly.

Architecture note

The PredictionService is initialised once per worker process via the worker_process_init Celery signal. This avoids loading the MLflow model (potentially hundreds of MB from MinIO) on every task invocation.

predict_match(self, match_id, features, features_computed_at=None, model_stage=None)

Run 1×2 inference for match_id using pre-computed features.

Parameters:

Name Type Description Default
match_id int

Identifier used for logging and response tracing.

required
features dict

Feature dict matching the model's input schema, produced by the batch_inference DVC stage.

required
features_computed_at str | None

ISO-8601 UTC string of when the features were computed (batch_inference mtime). Stored in the response for end-to-end traceability.

None
model_stage str | None

MLflow alias/stage to use (e.g. "champion", "challenger"). Defaults to settings.mlflow.model_stage when None.

None

Returns:

Type Description
dict

Dict compatible with PredictResponse.

Source code in src/app/tasks/predict.py
@celery_app.task(
    name="predict_match",
    bind=True,
    max_retries=2,
    default_retry_delay=10,
    queue="ml",
)
def predict_match(
    self,
    match_id: int,
    features: dict,
    features_computed_at: str | None = None,
    model_stage: str | None = None,
) -> dict:
    """Run 1×2 inference for *match_id* using pre-computed *features*.

    Args:
        match_id: Identifier used for logging and response tracing.
        features: Feature dict matching the model's input schema, produced by
            the ``batch_inference`` DVC stage.
        features_computed_at: ISO-8601 string of when the features were
            computed (batch_inference mtime).  A naive value is interpreted
            as UTC; a value carrying an offset is converted to UTC.  Stored in
            the response for end-to-end traceability.
        model_stage: MLflow alias/stage to use (e.g. "champion", "challenger").
            Defaults to ``settings.mlflow.model_stage`` when None.

    Returns:
        Dict compatible with ``PredictResponse``.

    Raises:
        ValueError: Re-raised without retry (e.g. unknown stage, or an
            unparsable *features_computed_at*).
        Exception: Any other failure triggers a Celery retry.
    """
    self.update_state(
        state="PROGRESS",
        meta={
            "status": f"Running inference for match_id={match_id} stage={model_stage}…"
        },
    )

    try:
        fca: datetime | None = None
        if features_computed_at:
            parsed = datetime.fromisoformat(features_computed_at)
            # replace() only relabels the timezone, so it is correct for naive
            # input alone; an aware value with another offset must be
            # converted or the instant would silently shift.
            fca = (
                parsed.replace(tzinfo=timezone.utc)
                if parsed.utcoffset() is None
                else parsed.astimezone(timezone.utc)
            )
        _t0 = time.perf_counter()
        result = _get_service(model_stage).predict(
            features=features, match_id=match_id, features_computed_at=fca
        )
        # Latency covers service lookup plus predict(), including cache hits.
        INFERENCE_LATENCY.observe(time.perf_counter() - _t0)

        # Track predicted probability per outcome class.
        for label_key, prob in (
            result.get("prediction", {}).get("probabilities", {}).items()
        ):
            PREDICTION_CONFIDENCE.labels(
                outcome=_LABEL_NAMES.get(str(label_key), str(label_key))
            ).observe(float(prob))

        logger.info(
            "Async prediction complete: match_id=%s stage=%s run_id=%s cached=%s",
            match_id,
            model_stage,
            result.get("prediction", {}).get("model_run_id", "?"),
            result.get("cached", False),
        )
        return result
    except ValueError as exc:
        # Unknown stage — retrying won't help, fail immediately.
        # NOTE(review): any ValueError raised inside the try block (timestamp
        # parsing, dtype casts in predict) is also logged with this message.
        logger.error("Unknown model stage: %s", exc)
        raise
    except Exception as exc:
        logger.exception("Async prediction failed: match_id=%s", match_id)
        raise self.retry(exc=exc)

get_model_info(self, model_stage=None)

Retrieve MLflow model metadata from the registry.

Parameters:

Name Type Description Default
model_stage str | None

Stage/alias to query. Defaults to settings.mlflow.model_stage.

None

Returns:

Type Description
dict

Dict compatible with ModelInfoResponse.

Source code in src/app/tasks/predict.py
@celery_app.task(
    name="get_model_info",
    bind=True,
    max_retries=1,
    default_retry_delay=5,
    queue="ml",
)
def get_model_info(self, model_stage: str | None = None) -> dict:
    """Fetch MLflow registry metadata for a model stage/alias.

    Args:
        model_stage: Stage/alias to query. Defaults to ``settings.mlflow.model_stage``.

    Returns:
        Dict compatible with ``ModelInfoResponse``.

    Raises:
        ValueError: Logged and re-raised without retry.
        Exception: Any other failure is logged and triggers a Celery retry.
    """
    try:
        service = _get_service(model_stage)
        info = service.get_model_info()
    except ValueError as err:
        logger.error("Unknown model stage: %s", err)
        raise
    except Exception as err:
        logger.exception("get_model_info task failed")
        raise self.retry(exc=err)
    return info

export_data_raw(self, name_table)

Export data from a database table to a Parquet file in MinIO.

Source code in src/app/tasks/export.py
@celery_app.task(
    name="export_data_raw",
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    queue="api",
)
def export_data_raw(
    self,
    name_table: str,
):
    """Dump one allow-listed database table to a Parquet object in MinIO.

    Args:
        name_table: Table to export; must be a member of ``_ALLOWED_TABLES``.

    Returns:
        A dict with a human-readable ``status`` message on success.

    Raises:
        ValueError: If *name_table* is not in the export allowlist.
        Exception: Any failure during load or upload is recorded on the task
            state and re-raised.
    """
    allowed = name_table in _ALLOWED_TABLES
    if not allowed:
        # The table name is interpolated into SQL below, so the allowlist is
        # what keeps this safe.
        raise ValueError(
            f"Table '{name_table}' is not in the export allowlist: {sorted(_ALLOWED_TABLES)}"
        )

    try:
        self.update_state(
            state="PROGRESS",
            meta={"status": f"Loading data for database table {name_table}..."},
        )

        with engine.connect() as connection:
            statement = text(f"SELECT * FROM {name_table}")
            frame = pd.read_sql_query(statement, connection)

        destination = (
            f"s3://{get_settings().minio.bucket_data_raw}/{name_table}.parquet"
        )
        frame.to_parquet(
            destination,
            storage_options=get_settings().minio.storage_options,
        )

        self.update_state(
            state="PROGRESS", meta={"status": "Data exported to storage..."}
        )

        return {"status": f"Data from table {name_table} exported successfully"}
    except Exception as exc:
        self.update_state(state="FAILURE", meta={"error": str(exc)})
        raise

Prometheus Metrics

Prometheus metrics registry for the SoccerPredictAI service.

All metric objects are defined here as module-level singletons so they are shared across the FastAPI app and Celery worker within the same process.

Gunicorn multiprocess note

When running under Gunicorn with multiple workers, prometheus-client requires PROMETHEUS_MULTIPROC_DIR to be set to a writable directory. The /metrics endpoint in main.py uses MultiProcessCollector automatically when that variable is present.