"""Run selected analyzers over existing DB data or last run results. This is a scaffold to wire analyzers end-to-end once implementations are ready. """ from __future__ import annotations import os import sys import pathlib from typing import Iterable, List import importlib # Ensure 'src' is on sys.path when running from repo root or scripts dir _SCRIPT_DIR = pathlib.Path(__file__).resolve().parent _ROOT = _SCRIPT_DIR.parent _SRC = _ROOT / "src" if _SRC.exists() and str(_SRC) not in sys.path: sys.path.insert(0, str(_SRC)) def _ensure_src_on_path() -> None: """Put repository src on sys.path if available (no-op if already present).""" script_dir = pathlib.Path(__file__).resolve().parent root = script_dir.parent src = root / "src" if src.exists(): sys.path.insert(0, str(src)) def load_data(db, limit: int | None = None) -> List[dict]: """Load match documents from DB as analyzer inputs (scaffold).""" # NOTE: Adjust projection to include what analyzers need. return db.find("match", projection={"_id": 0}, limit=limit) def _safe_count(db, kind: str) -> int: """Best-effort count using find; avoids driver-specific count API.""" try: docs = db.find(kind, projection={"_id": 1}, limit=None) return len(list(docs)) if docs is not None else 0 except (RuntimeError, ValueError, TypeError): # diagnostics only return 0 def main() -> None: """Build analyzers and run them on existing data (skeleton).""" # Ensure imports resolve regardless of cwd, then import dynamically _ensure_src_on_path() db_mod = importlib.import_module("databank.db") teams_mod = importlib.import_module("databank.analytics.teams") elo_mod = importlib.import_module("databank.analytics.elo") dc_mod = importlib.import_module("databank.analytics.dixon_coles") mc_mod = importlib.import_module("databank.analytics.markov_chain") h2h_mod = importlib.import_module("databank.analytics.h2h") calib_mod = importlib.import_module("databank.analytics.calibration") season_mc_mod = importlib.import_module("databank.analytics.monte_carlo") sos_mod = importlib.import_module("databank.analytics.sos") mongodb_cls = getattr(db_mod, "MongoDB") team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer") elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer") dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer") markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer") h2h_cls = getattr(h2h_mod, "H2HAnalyzer") calibration_cls = getattr(calib_mod, "CalibrationAnalyzer") season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer") sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer") uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017") name = os.getenv("DATABANK_DB_NAME", "databank") db = mongodb_cls(uri=uri, name=name) db.connect() data = load_data(db) print(f"Loaded matches: {len(data) if data is not None else 0}") analyzers: Iterable = [ team_extractor_cls(), elo_analyzer_cls(), dixon_coles_cls(), markov_chain_cls(), h2h_cls(), calibration_cls(), sos_cls(), season_mc_cls(), ] # Prepare optional DC config from environment def _env_float(name: str, default: float) -> float: try: return ( float(os.getenv(name, "")) if os.getenv(name) is not None else default ) except ValueError: return default def _env_int(name: str, default: int) -> int: try: return int(os.getenv(name, "")) if os.getenv(name) is not None else default except ValueError: return default def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]: s = os.getenv(name) if not s: return default try: lo_str, hi_str = s.split(",", 1) return float(lo_str), float(hi_str) except (ValueError, TypeError): return default dc_kwargs = { "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0), "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)), "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01), "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20), "tol": _env_float("DATABANK_DC_TOL", 1e-4), } # Optional history configuration for DC history_mode = os.getenv("DATABANK_DC_HISTORY", "none").strip().lower() if history_mode in {"none", "predictions", "snapshots"}: dc_kwargs["history"] = history_mode dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10) dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10) dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8) for analyzer in analyzers: print(f"Running analyzer: {analyzer.__class__.__name__}") try: analyzer.prepare(data) analyzer.validate(data) transformed = analyzer.transform(data) if isinstance(analyzer, dixon_coles_cls): # Pass DC-specific kwargs from environment result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs) print(" DC config:", dc_kwargs) else: result = analyzer.compute(transformed, db=db, persist=True) analyzer.finalize(result) print(f" -> Done: {analyzer.__class__.__name__}") # Diagnostics: show where data is persisted for Elo if isinstance(analyzer, elo_analyzer_cls): ratings_cnt = _safe_count(db, "elo_ratings") history_cnt = _safe_count(db, "ratings_history") print( " Elo persisted to collections:", f"elo_ratings={ratings_cnt}", f"ratings_history={history_cnt}", ) try: processed = ( result.get("processed") if isinstance(result, dict) else None ) print(f" Elo processed matches: {processed}") except (RuntimeError, ValueError, TypeError): # diagnostics only pass # Diagnostics: DC persistence and counts if isinstance(analyzer, dixon_coles_cls): dc_cnt = _safe_count(db, "dc_params") print(" DC persisted to collection:", f"dc_params={dc_cnt}") if isinstance(result, dict): mu = result.get("matches_used") persisted = result.get("persisted") print( f" DC matches_used: {mu}; persisted docs in this run: {persisted}" ) # Optional extra collections preds_cnt = _safe_count(db, "dc_predictions") snaps_cnt = _safe_count(db, "dc_params_history") if preds_cnt: print(" DC predictions count:", preds_cnt) if snaps_cnt: print(" DC params history count:", snaps_cnt) except NotImplementedError as exc: print(f" -> Skipped (not implemented): {exc}") except (RuntimeError, ValueError) as exc: # pragma: no cover - diagnostics only print(f" -> Error: {type(exc).__name__}: {exc}") if __name__ == "__main__": main()