- """Run selected analyzers over existing DB data or last run results.
- This is a scaffold to wire analyzers end-to-end once implementations are ready.
- """
- from __future__ import annotations
- import os
- import sys
- import pathlib
- from typing import Iterable, List
- import importlib
- # Ensure 'src' is on sys.path when running from repo root or scripts dir
- _SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
- _ROOT = _SCRIPT_DIR.parent
- _SRC = _ROOT / "src"
- if _SRC.exists() and str(_SRC) not in sys.path:
- sys.path.insert(0, str(_SRC))
- def _ensure_src_on_path() -> None:
- """Put repository src on sys.path if available (no-op if already present)."""
- script_dir = pathlib.Path(__file__).resolve().parent
- root = script_dir.parent
- src = root / "src"
- if src.exists():
- sys.path.insert(0, str(src))
- def load_data(db, limit: int | None = None) -> List[dict]:
- """Load match documents from DB as analyzer inputs (scaffold)."""
- # NOTE: Adjust projection to include what analyzers need.
- return db.find("match", projection={"_id": 0}, limit=limit)
- def _safe_count(db, kind: str) -> int:
- """Best-effort count using find; avoids driver-specific count API."""
- try:
- docs = db.find(kind, projection={"_id": 1}, limit=None)
- return len(list(docs)) if docs is not None else 0
- except (RuntimeError, ValueError, TypeError): # diagnostics only
- return 0
def main() -> None:
    """Build analyzers and run them on existing data (skeleton).

    Reads connection settings and analyzer tuning knobs from ``DATABANK_*``
    environment variables, loads match documents from the database, and runs
    each analyzer through its prepare/validate/transform/compute/finalize
    pipeline, printing diagnostics along the way.
    """
    # Ensure imports resolve regardless of cwd, then import dynamically
    _ensure_src_on_path()
    # Dynamic imports so this script works without an installed 'databank'
    # package, relying on the sys.path bootstrap above.
    db_mod = importlib.import_module("databank.db")
    teams_mod = importlib.import_module("databank.analytics.teams")
    elo_mod = importlib.import_module("databank.analytics.elo")
    dc_mod = importlib.import_module("databank.analytics.dixon_coles")
    mc_mod = importlib.import_module("databank.analytics.markov_chain")
    h2h_mod = importlib.import_module("databank.analytics.h2h")
    calib_mod = importlib.import_module("databank.analytics.calibration")
    season_mc_mod = importlib.import_module("databank.analytics.monte_carlo")
    sos_mod = importlib.import_module("databank.analytics.sos")
    mongodb_cls = getattr(db_mod, "MongoDB")
    team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer")
    elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer")
    dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer")
    markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer")
    h2h_cls = getattr(h2h_mod, "H2HAnalyzer")
    calibration_cls = getattr(calib_mod, "CalibrationAnalyzer")
    season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer")
    sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer")
    # Connection settings from the environment, with local-dev defaults.
    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
    name = os.getenv("DATABANK_DB_NAME", "databank")
    db = mongodb_cls(uri=uri, name=name)
    db.connect()
    data = load_data(db)
    print(f"Loaded matches: {len(data) if data is not None else 0}")
    # Analyzers run in list order; all are instantiated with defaults here.
    analyzers: Iterable = [
        team_extractor_cls(),
        elo_analyzer_cls(),
        dixon_coles_cls(),
        markov_chain_cls(),
        h2h_cls(),
        calibration_cls(),
        sos_cls(),
        season_mc_cls(),
    ]

    # Prepare optional DC config from environment
    def _env_float(name: str, default: float) -> float:
        # Fall back to the default on unset or unparsable values.
        try:
            return (
                float(os.getenv(name, "")) if os.getenv(name) is not None else default
            )
        except ValueError:
            return default

    def _env_int(name: str, default: int) -> int:
        # Same contract as _env_float, for integer knobs.
        try:
            return int(os.getenv(name, "")) if os.getenv(name) is not None else default
        except ValueError:
            return default

    def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]:
        # Expected format: "lo,hi" (e.g. "-0.3,0.3"); default on any parse error.
        s = os.getenv(name)
        if not s:
            return default
        try:
            lo_str, hi_str = s.split(",", 1)
            return float(lo_str), float(hi_str)
        except (ValueError, TypeError):
            return default

    def _env_int_optional(name: str) -> int | None:
        # None means "knob not set" (callers omit the kwarg entirely).
        s = os.getenv(name)
        if not s:
            return None
        try:
            return int(s)
        except ValueError:
            return None

    # Dixon-Coles tuning knobs; semantics are defined by the DC analyzer.
    dc_kwargs = {
        "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0),
        "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)),
        "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01),
        "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20),
        "tol": _env_float("DATABANK_DC_TOL", 1e-4),
    }
    # Optional history configuration for DC
    history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower()
    # Unknown modes are ignored (the 'history' kwarg is simply not passed).
    if history_mode in {"none", "predictions", "snapshots", "both"}:
        dc_kwargs["history"] = history_mode
    dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10)
    dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10)
    dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8)
    # Progress controls
    dc_kwargs["verbose"] = os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in {
        "0",
        "false",
        "False",
    }
    dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100)
    dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000)
    dc_kwargs["skip_if_exists"] = os.getenv(
        "DATABANK_DC_SKIP_IF_EXISTS", "0"
    ).strip() not in {
        "0",
        "false",
        "False",
    }
    # Safety clamps/tuning knobs
    dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3)
    dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0)
    dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3)
    dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0)
    dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0)
    dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3)
    for analyzer in analyzers:
        print(f"Running analyzer: {analyzer.__class__.__name__}")
        # A failure in one analyzer must not abort the rest of the run.
        try:
            analyzer.prepare(data)
            analyzer.validate(data)
            transformed = analyzer.transform(data)
            if isinstance(analyzer, dixon_coles_cls):
                # Pass DC-specific kwargs from environment
                result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
                print(" DC config:", dc_kwargs)
            elif isinstance(analyzer, calibration_cls):
                # Calibration env knobs
                cal_kwargs = {
                    "source_kind": os.getenv(
                        "DATABANK_CAL_SOURCE_KIND", "dc_predictions"
                    ),
                    "bins": _env_int("DATABANK_CAL_BINS", 10),
                    # User asked higher min sample threshold, default 50
                    "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50),
                    "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip()
                    not in {"0", "false", "False"},
                }
                # Optional time-window / season / group filters are only
                # passed when the corresponding env var is set.
                since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS")
                until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS")
                if since_ts is not None:
                    cal_kwargs["since_ts"] = since_ts
                if until_ts is not None:
                    cal_kwargs["until_ts"] = until_ts
                season = os.getenv("DATABANK_CAL_SEASON")
                if season:
                    cal_kwargs["season_filter"] = season
                groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE")
                if groups_inc:
                    cal_kwargs["groups_include"] = groups_inc
                result = analyzer.compute(
                    transformed, db=db, persist=True, **cal_kwargs
                )
                print(" CAL config:", cal_kwargs)
                # Console summary of calibration metrics per group
                if isinstance(result, dict) and isinstance(result.get("metrics"), dict):
                    metrics = result["metrics"]
                    print(" CAL summary (per group):")
                    for gk, m in metrics.items():
                        try:
                            b = float(m.get("brier", float("nan")))
                            ll = float(m.get("logloss", float("nan")))
                            ece = float(m.get("ece", float("nan")))
                            n = int(m.get("n", 0))
                            print(
                                (
                                    f" - {gk}: n={n} | Brier={b:.4f}"
                                    f" | LogLoss={ll:.4f} | ECE={ece:.4f}"
                                )
                            )
                        except (ValueError, TypeError, KeyError):
                            print(f" - {gk}: (metrics unavailable)")
            else:
                result = analyzer.compute(transformed, db=db, persist=True)
            analyzer.finalize(result)
            print(f" -> Done: {analyzer.__class__.__name__}")
            # Diagnostics: show where data is persisted for Elo
            if isinstance(analyzer, elo_analyzer_cls):
                ratings_cnt = _safe_count(db, "elo_ratings")
                history_cnt = _safe_count(db, "ratings_history")
                print(
                    " Elo persisted to collections:",
                    f"elo_ratings={ratings_cnt}",
                    f"ratings_history={history_cnt}",
                )
                try:
                    processed = (
                        result.get("processed") if isinstance(result, dict) else None
                    )
                    print(f" Elo processed matches: {processed}")
                except (RuntimeError, ValueError, TypeError):  # diagnostics only
                    pass
            # Diagnostics: DC persistence and counts
            if isinstance(analyzer, dixon_coles_cls):
                dc_cnt = _safe_count(db, "dc_params")
                print(" DC persisted to collection:", f"dc_params={dc_cnt}")
                if isinstance(result, dict):
                    mu = result.get("matches_used")
                    persisted = result.get("persisted")
                    print(
                        " DC matches_used: "
                        f"{mu}; persisted docs in this run: {persisted}"
                    )
                    gs = result.get("groups_skipped")
                    if isinstance(gs, int) and gs > 0:
                        print(f" DC groups skipped (skip_if_exists): {gs}")
                # Optional extra collections
                preds_cnt = _safe_count(db, "dc_predictions")
                snaps_cnt = _safe_count(db, "dc_params_history")
                if preds_cnt:
                    print(" DC predictions count:", preds_cnt)
                if snaps_cnt:
                    print(" DC params history count:", snaps_cnt)
                if isinstance(result, dict):
                    pw = result.get("predictions_written")
                    sw = result.get("snapshots_written")
                    if pw is not None or sw is not None:
                        print(
                            f" DC written (streaming): predictions={pw}, snapshots={sw}"
                        )
        except NotImplementedError as exc:
            # Scaffold stage: unimplemented analyzers are skipped, not fatal.
            print(f" -> Skipped (not implemented): {exc}")
        except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
            print(f" -> Error: {type(exc).__name__}: {exc}")
# Allow direct execution as a script (e.g. `python run_analyzers.py`).
if __name__ == "__main__":
    main()
|