"""Run selected analyzers over existing DB data or last run results. This is a scaffold to wire analyzers end-to-end once implementations are ready. """ from __future__ import annotations import os import sys import pathlib from typing import Iterable, List, Any import importlib # Ensure 'src' is on sys.path when running from repo root or scripts dir _SCRIPT_DIR = pathlib.Path(__file__).resolve().parent _ROOT = _SCRIPT_DIR.parent _SRC = _ROOT / "src" if _SRC.exists() and str(_SRC) not in sys.path: sys.path.insert(0, str(_SRC)) def _ensure_src_on_path() -> None: """Put repository src on sys.path if available (no-op if already present).""" script_dir = pathlib.Path(__file__).resolve().parent root = script_dir.parent src = root / "src" if src.exists(): sys.path.insert(0, str(src)) def load_data(db, limit: int | None = None) -> List[dict]: """Load match documents from DB as analyzer inputs (scaffold).""" # NOTE: Adjust projection to include what analyzers need. return db.find("match", projection={"_id": 0}, limit=limit) def _safe_count(db, kind: str) -> int: """Best-effort count using find; avoids driver-specific count API.""" try: docs = db.find(kind, projection={"_id": 1}, limit=None) return len(list(docs)) if docs is not None else 0 except (RuntimeError, ValueError, TypeError): # diagnostics only return 0 def main() -> None: """Build analyzers and run them on existing data (skeleton).""" # Ensure imports resolve regardless of cwd, then import dynamically _ensure_src_on_path() db_mod = importlib.import_module("databank.db") teams_mod = importlib.import_module("databank.analytics.teams") elo_mod = importlib.import_module("databank.analytics.elo") dc_mod = importlib.import_module("databank.analytics.dixon_coles") mc_mod = importlib.import_module("databank.analytics.markov_chain") h2h_mod = importlib.import_module("databank.analytics.h2h") calib_mod = importlib.import_module("databank.analytics.calibration") season_mc_mod = importlib.import_module("databank.analytics.monte_carlo") sos_mod = importlib.import_module("databank.analytics.sos") mongodb_cls = getattr(db_mod, "MongoDB") team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer") elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer") dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer") markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer") h2h_cls = getattr(h2h_mod, "H2HAnalyzer") calibration_cls = getattr(calib_mod, "CalibrationAnalyzer") season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer") sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer") uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017") name = os.getenv("DATABANK_DB_NAME", "databank") db = mongodb_cls(uri=uri, name=name) db.connect() data = load_data(db) print(f"Loaded matches: {len(data) if data is not None else 0}") # Control whether to include the Dixon–Coles analyzer (expensive) via env enable_dc = os.getenv("DATABANK_ENABLE_DC", "0").strip() not in { "0", "false", "False", } analyzers_list: List[Any] = [ team_extractor_cls(), elo_analyzer_cls(), # DC is optional; include only when explicitly enabled # dixon_coles_cls(), markov_chain_cls(), h2h_cls(), calibration_cls(), sos_cls(), season_mc_cls(), ] if enable_dc: # Insert DC after Elo for deterministic ordering analyzers_list.insert(2, dixon_coles_cls()) print("[RUN] Dixon–Coles enabled via DATABANK_ENABLE_DC", flush=True) else: print( "[RUN] Dixon–Coles disabled by default (set DATABANK_ENABLE_DC=1 to enable)", flush=True, ) analyzers: Iterable = analyzers_list # Prepare optional DC config from environment def _env_float(name: str, default: float) -> float: try: return ( float(os.getenv(name, "")) if os.getenv(name) is not None else default ) except ValueError: return default def _env_int(name: str, default: int) -> int: try: return int(os.getenv(name, "")) if os.getenv(name) is not None else default except ValueError: return default def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]: s = os.getenv(name) if not s: return default try: lo_str, hi_str = s.split(",", 1) return float(lo_str), float(hi_str) except (ValueError, TypeError): return default def _env_int_optional(name: str) -> int | None: s = os.getenv(name) if not s: return None try: return int(s) except ValueError: return None dc_kwargs = { "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0), "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)), "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01), "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20), "tol": _env_float("DATABANK_DC_TOL", 1e-4), } # Optional history configuration for DC history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower() if history_mode in {"none", "predictions", "snapshots", "both"}: dc_kwargs["history"] = history_mode dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10) dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10) dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8) # Progress controls dc_kwargs["verbose"] = os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in { "0", "false", "False", } dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100) dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000) dc_kwargs["skip_if_exists"] = os.getenv( "DATABANK_DC_SKIP_IF_EXISTS", "0" ).strip() not in { "0", "false", "False", } # Safety clamps/tuning knobs dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3) dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0) dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3) dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0) dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0) dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3) for analyzer in analyzers: print(f"Running analyzer: {analyzer.__class__.__name__}") try: analyzer.prepare(data) analyzer.validate(data) transformed = analyzer.transform(data) if isinstance(analyzer, dixon_coles_cls): # Pass DC-specific kwargs from environment result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs) print(" DC config:", dc_kwargs) elif isinstance(analyzer, h2h_cls): # H2H env knobs h2h_kwargs = { "group_by": os.getenv("DATABANK_H2H_GROUP_BY", "league"), "separate_home_away": os.getenv( "DATABANK_H2H_SEPARATE_HOME_AWAY", "1" ).strip() not in {"0", "false", "False"}, "recent_window": _env_int("DATABANK_H2H_RECENT_WINDOW", 16), } result = analyzer.compute( transformed, db=db, persist=True, **h2h_kwargs ) print(" H2H config:", h2h_kwargs) # Diagnostics summary try: pairs = ( len(result.get("pairs", [])) if isinstance(result, dict) else None ) persisted = ( result.get("persisted") if isinstance(result, dict) else None ) total_docs = _safe_count(db, "h2h_summary") print( ( f" H2H pairs={pairs} | persisted={persisted}" f" | h2h_summary_total={total_docs}" ) ) except (RuntimeError, ValueError, TypeError): pass elif isinstance(analyzer, calibration_cls): # Calibration env knobs cal_kwargs = { "source_kind": os.getenv( "DATABANK_CAL_SOURCE_KIND", "dc_predictions" ), "bins": _env_int("DATABANK_CAL_BINS", 10), # User asked higher min sample threshold, default 50 "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50), "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip() not in {"0", "false", "False"}, } # History snapshots for calibration (optional) cal_history = ( os.getenv("DATABANK_CAL_HISTORY", "snapshots").strip().lower() ) if cal_history in {"none", "snapshots", "both"}: cal_kwargs["history"] = cal_history cal_kwargs["snapshot_every"] = _env_int( "DATABANK_CAL_SNAPSHOT_EVERY", 1000 ) pdays = _env_int_optional("DATABANK_CAL_PERIOD_DAYS") if pdays is not None: cal_kwargs["period_days"] = pdays else: cal_kwargs["period_days"] = 30 msnaps = _env_int_optional("DATABANK_CAL_MAX_SNAPSHOTS") if msnaps is not None: cal_kwargs["max_snapshots"] = msnaps since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS") until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS") if since_ts is not None: cal_kwargs["since_ts"] = since_ts if until_ts is not None: cal_kwargs["until_ts"] = until_ts season = os.getenv("DATABANK_CAL_SEASON") if season: cal_kwargs["season_filter"] = season groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE") if groups_inc: cal_kwargs["groups_include"] = groups_inc result = analyzer.compute( transformed, db=db, persist=True, **cal_kwargs ) print(" CAL config:", cal_kwargs) # Console summary of calibration metrics per group if isinstance(result, dict) and isinstance(result.get("metrics"), dict): metrics = result["metrics"] print(" CAL summary (per group):") for gk, m in metrics.items(): try: b = float(m.get("brier", float("nan"))) ll = float(m.get("logloss", float("nan"))) ece = float(m.get("ece", float("nan"))) n = int(m.get("n", 0)) print( ( f" - {gk}: n={n} | Brier={b:.4f}" f" | LogLoss={ll:.4f} | ECE={ece:.4f}" ) ) except (ValueError, TypeError, KeyError): print(f" - {gk}: (metrics unavailable)") # Optional: print snapshot writes if present if isinstance(result, dict): sm = result.get("snap_metrics_written") sb = result.get("snap_bins_written") if (isinstance(sm, int) and sm > 0) or ( isinstance(sb, int) and sb > 0 ): print( ( f" CAL snapshots persisted: metrics={sm or 0}" f" bins={sb or 0}" ) ) else: result = analyzer.compute(transformed, db=db, persist=True) analyzer.finalize(result) print(f" -> Done: {analyzer.__class__.__name__}") # Diagnostics: show where data is persisted for Elo if isinstance(analyzer, elo_analyzer_cls): ratings_cnt = _safe_count(db, "elo_ratings") history_cnt = _safe_count(db, "ratings_history") print( " Elo persisted to collections:", f"elo_ratings={ratings_cnt}", f"ratings_history={history_cnt}", ) try: processed = ( result.get("processed") if isinstance(result, dict) else None ) print(f" Elo processed matches: {processed}") except (RuntimeError, ValueError, TypeError): # diagnostics only pass # Diagnostics: DC persistence and counts if isinstance(analyzer, dixon_coles_cls): dc_cnt = _safe_count(db, "dc_params") print(" DC persisted to collection:", f"dc_params={dc_cnt}") if isinstance(result, dict): mu = result.get("matches_used") persisted = result.get("persisted") print( " DC matches_used: " f"{mu}; persisted docs in this run: {persisted}" ) gs = result.get("groups_skipped") if isinstance(gs, int) and gs > 0: print(f" DC groups skipped (skip_if_exists): {gs}") # Optional extra collections preds_cnt = _safe_count(db, "dc_predictions") snaps_cnt = _safe_count(db, "dc_params_history") if preds_cnt: print(" DC predictions count:", preds_cnt) if snaps_cnt: print(" DC params history count:", snaps_cnt) if isinstance(result, dict): pw = result.get("predictions_written") sw = result.get("snapshots_written") if pw is not None or sw is not None: print( f" DC written (streaming): predictions={pw}, snapshots={sw}" ) except NotImplementedError as exc: print(f" -> Skipped (not implemented): {exc}") except (RuntimeError, ValueError) as exc: # pragma: no cover - diagnostics only print(f" -> Error: {type(exc).__name__}: {exc}") if __name__ == "__main__": main()