run_analyzers.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. """Run selected analyzers over existing DB data or last run results.
  2. This is a scaffold to wire analyzers end-to-end once implementations are ready.
  3. """
  4. from __future__ import annotations
  5. import os
  6. import sys
  7. import pathlib
  8. from typing import Iterable, List, Any
  9. import importlib
  10. # Ensure 'src' is on sys.path when running from repo root or scripts dir
  11. _SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
  12. _ROOT = _SCRIPT_DIR.parent
  13. _SRC = _ROOT / "src"
  14. if _SRC.exists() and str(_SRC) not in sys.path:
  15. sys.path.insert(0, str(_SRC))
  16. def _ensure_src_on_path() -> None:
  17. """Put repository src on sys.path if available (no-op if already present)."""
  18. script_dir = pathlib.Path(__file__).resolve().parent
  19. root = script_dir.parent
  20. src = root / "src"
  21. if src.exists():
  22. sys.path.insert(0, str(src))
  23. def load_data(db, limit: int | None = None) -> List[dict]:
  24. """Load match documents from DB as analyzer inputs (scaffold)."""
  25. # NOTE: Adjust projection to include what analyzers need.
  26. return db.find("match", projection={"_id": 0}, limit=limit)
  27. def _safe_count(db, kind: str) -> int:
  28. """Best-effort count using find; avoids driver-specific count API."""
  29. try:
  30. docs = db.find(kind, projection={"_id": 1}, limit=None)
  31. return len(list(docs)) if docs is not None else 0
  32. except (RuntimeError, ValueError, TypeError): # diagnostics only
  33. return 0
def main() -> None:
    """Build analyzers and run them on existing data (skeleton).

    Each analyzer goes through the same lifecycle:
    ``prepare`` -> ``validate`` -> ``transform`` -> ``compute`` -> ``finalize``.
    All tuning comes from ``DATABANK_*`` environment variables; the expensive
    Dixon-Coles (DC) analyzer is opt-in via ``DATABANK_ENABLE_DC``.  Results
    are persisted through the DB handle (``persist=True``) and summarized on
    stdout.  Per-analyzer failures are printed and do not abort the run.
    """
    # Ensure imports resolve regardless of cwd, then import dynamically.
    _ensure_src_on_path()
    db_mod = importlib.import_module("databank.db")
    teams_mod = importlib.import_module("databank.analytics.teams")
    elo_mod = importlib.import_module("databank.analytics.elo")
    dc_mod = importlib.import_module("databank.analytics.dixon_coles")
    mc_mod = importlib.import_module("databank.analytics.markov_chain")
    h2h_mod = importlib.import_module("databank.analytics.h2h")
    calib_mod = importlib.import_module("databank.analytics.calibration")
    season_mc_mod = importlib.import_module("databank.analytics.monte_carlo")
    sos_mod = importlib.import_module("databank.analytics.sos")
    # Resolve analyzer classes by name (classes live in the project's src tree).
    mongodb_cls = getattr(db_mod, "MongoDB")
    team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer")
    elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer")
    dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer")
    markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer")
    h2h_cls = getattr(h2h_mod, "H2HAnalyzer")
    calibration_cls = getattr(calib_mod, "CalibrationAnalyzer")
    season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer")
    sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer")
    # DB connection details come from the environment, with local defaults.
    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
    name = os.getenv("DATABANK_DB_NAME", "databank")
    db = mongodb_cls(uri=uri, name=name)
    db.connect()
    data = load_data(db)
    print(f"Loaded matches: {len(data) if data is not None else 0}")
    # Control whether to include the Dixon–Coles analyzer (expensive) via env.
    # Any value other than "0"/"false"/"False" (e.g. "1") enables it.
    enable_dc = os.getenv("DATABANK_ENABLE_DC", "0").strip() not in {
        "0",
        "false",
        "False",
    }
    analyzers_list: List[Any] = [
        team_extractor_cls(),
        elo_analyzer_cls(),
        # DC is optional; include only when explicitly enabled
        # dixon_coles_cls(),
        markov_chain_cls(),
        h2h_cls(),
        calibration_cls(),
        sos_cls(),
        season_mc_cls(),
    ]
    if enable_dc:
        # Insert DC after Elo for deterministic ordering
        analyzers_list.insert(2, dixon_coles_cls())
        print("[RUN] Dixon–Coles enabled via DATABANK_ENABLE_DC", flush=True)
    else:
        print(
            "[RUN] Dixon–Coles disabled by default (set DATABANK_ENABLE_DC=1 to enable)",
            flush=True,
        )
    analyzers: Iterable = analyzers_list

    # --- Environment-parsing helpers (local: only used by this runner) ---
    def _env_float(name: str, default: float) -> float:
        """Read *name* from the environment as a float; *default* on unset/bad."""
        try:
            return (
                float(os.getenv(name, "")) if os.getenv(name) is not None else default
            )
        except ValueError:
            return default

    def _env_int(name: str, default: int) -> int:
        """Read *name* from the environment as an int; *default* on unset/bad."""
        try:
            return int(os.getenv(name, "")) if os.getenv(name) is not None else default
        except ValueError:
            return default

    def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]:
        """Parse a comma-separated "lo,hi" float pair; *default* on unset/bad."""
        s = os.getenv(name)
        if not s:
            return default
        try:
            lo_str, hi_str = s.split(",", 1)
            return float(lo_str), float(hi_str)
        except (ValueError, TypeError):
            return default

    def _env_int_optional(name: str) -> int | None:
        """Read an optional int from the environment; ``None`` on unset/bad."""
        s = os.getenv(name)
        if not s:
            return None
        try:
            return int(s)
        except ValueError:
            return None

    # Prepare optional DC config from environment (DATABANK_DC_* knobs).
    dc_kwargs = {
        "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0),
        "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)),
        "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01),
        "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20),
        "tol": _env_float("DATABANK_DC_TOL", 1e-4),
    }
    # Optional history configuration for DC; unrecognized modes are ignored.
    history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower()
    if history_mode in {"none", "predictions", "snapshots", "both"}:
        dc_kwargs["history"] = history_mode
    dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10)
    dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10)
    dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8)
    # Progress controls
    dc_kwargs["verbose"] = os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in {
        "0",
        "false",
        "False",
    }
    dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100)
    dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000)
    dc_kwargs["skip_if_exists"] = os.getenv(
        "DATABANK_DC_SKIP_IF_EXISTS", "0"
    ).strip() not in {
        "0",
        "false",
        "False",
    }
    # Safety clamps/tuning knobs (presumably bounds for the DC fit — semantics
    # live in DixonColesAnalyzer; confirm there before changing defaults).
    dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3)
    dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0)
    dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3)
    dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0)
    dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0)
    dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3)
    # Run every analyzer; each failure is caught and reported individually.
    for analyzer in analyzers:
        print(f"Running analyzer: {analyzer.__class__.__name__}")
        try:
            analyzer.prepare(data)
            analyzer.validate(data)
            transformed = analyzer.transform(data)
            if isinstance(analyzer, dixon_coles_cls):
                # Pass DC-specific kwargs from environment
                result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
                print(" DC config:", dc_kwargs)
            elif isinstance(analyzer, h2h_cls):
                # H2H env knobs
                h2h_kwargs = {
                    "group_by": os.getenv("DATABANK_H2H_GROUP_BY", "league"),
                    "separate_home_away": os.getenv(
                        "DATABANK_H2H_SEPARATE_HOME_AWAY", "1"
                    ).strip()
                    not in {"0", "false", "False"},
                    "recent_window": _env_int("DATABANK_H2H_RECENT_WINDOW", 16),
                }
                result = analyzer.compute(
                    transformed, db=db, persist=True, **h2h_kwargs
                )
                print(" H2H config:", h2h_kwargs)
                # Diagnostics summary (best-effort; must never fail the run)
                try:
                    pairs = (
                        len(result.get("pairs", []))
                        if isinstance(result, dict)
                        else None
                    )
                    persisted = (
                        result.get("persisted") if isinstance(result, dict) else None
                    )
                    total_docs = _safe_count(db, "h2h_summary")
                    print(
                        (
                            f" H2H pairs={pairs} | persisted={persisted}"
                            f" | h2h_summary_total={total_docs}"
                        )
                    )
                except (RuntimeError, ValueError, TypeError):
                    pass
            elif isinstance(analyzer, calibration_cls):
                # Calibration env knobs
                cal_kwargs = {
                    "source_kind": os.getenv(
                        "DATABANK_CAL_SOURCE_KIND", "dc_predictions"
                    ),
                    "bins": _env_int("DATABANK_CAL_BINS", 10),
                    # User asked higher min sample threshold, default 50
                    "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50),
                    "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip()
                    not in {"0", "false", "False"},
                }
                # History snapshots for calibration (optional)
                cal_history = (
                    os.getenv("DATABANK_CAL_HISTORY", "snapshots").strip().lower()
                )
                if cal_history in {"none", "snapshots", "both"}:
                    cal_kwargs["history"] = cal_history
                cal_kwargs["snapshot_every"] = _env_int(
                    "DATABANK_CAL_SNAPSHOT_EVERY", 1000
                )
                pdays = _env_int_optional("DATABANK_CAL_PERIOD_DAYS")
                if pdays is not None:
                    cal_kwargs["period_days"] = pdays
                else:
                    cal_kwargs["period_days"] = 30
                msnaps = _env_int_optional("DATABANK_CAL_MAX_SNAPSHOTS")
                if msnaps is not None:
                    cal_kwargs["max_snapshots"] = msnaps
                # Optional time-window / season / group filters
                since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS")
                until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS")
                if since_ts is not None:
                    cal_kwargs["since_ts"] = since_ts
                if until_ts is not None:
                    cal_kwargs["until_ts"] = until_ts
                season = os.getenv("DATABANK_CAL_SEASON")
                if season:
                    cal_kwargs["season_filter"] = season
                groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE")
                if groups_inc:
                    cal_kwargs["groups_include"] = groups_inc
                result = analyzer.compute(
                    transformed, db=db, persist=True, **cal_kwargs
                )
                print(" CAL config:", cal_kwargs)
                # Console summary of calibration metrics per group
                if isinstance(result, dict) and isinstance(result.get("metrics"), dict):
                    metrics = result["metrics"]
                    print(" CAL summary (per group):")
                    for gk, m in metrics.items():
                        try:
                            b = float(m.get("brier", float("nan")))
                            ll = float(m.get("logloss", float("nan")))
                            ece = float(m.get("ece", float("nan")))
                            n = int(m.get("n", 0))
                            print(
                                (
                                    f" - {gk}: n={n} | Brier={b:.4f}"
                                    f" | LogLoss={ll:.4f} | ECE={ece:.4f}"
                                )
                            )
                        except (ValueError, TypeError, KeyError):
                            print(f" - {gk}: (metrics unavailable)")
                # Optional: print snapshot writes if present
                if isinstance(result, dict):
                    sm = result.get("snap_metrics_written")
                    sb = result.get("snap_bins_written")
                    if (isinstance(sm, int) and sm > 0) or (
                        isinstance(sb, int) and sb > 0
                    ):
                        print(
                            (
                                f" CAL snapshots persisted: metrics={sm or 0}"
                                f" bins={sb or 0}"
                            )
                        )
            else:
                # Default path: analyzers with no special env configuration.
                result = analyzer.compute(transformed, db=db, persist=True)
            analyzer.finalize(result)
            print(f" -> Done: {analyzer.__class__.__name__}")
            # Diagnostics: show where data is persisted for Elo
            if isinstance(analyzer, elo_analyzer_cls):
                ratings_cnt = _safe_count(db, "elo_ratings")
                history_cnt = _safe_count(db, "ratings_history")
                print(
                    " Elo persisted to collections:",
                    f"elo_ratings={ratings_cnt}",
                    f"ratings_history={history_cnt}",
                )
                try:
                    processed = (
                        result.get("processed") if isinstance(result, dict) else None
                    )
                    print(f" Elo processed matches: {processed}")
                except (RuntimeError, ValueError, TypeError):  # diagnostics only
                    pass
            # Diagnostics: DC persistence and counts
            if isinstance(analyzer, dixon_coles_cls):
                dc_cnt = _safe_count(db, "dc_params")
                print(" DC persisted to collection:", f"dc_params={dc_cnt}")
                if isinstance(result, dict):
                    mu = result.get("matches_used")
                    persisted = result.get("persisted")
                    print(
                        " DC matches_used: "
                        f"{mu}; persisted docs in this run: {persisted}"
                    )
                    gs = result.get("groups_skipped")
                    if isinstance(gs, int) and gs > 0:
                        print(f" DC groups skipped (skip_if_exists): {gs}")
                # Optional extra collections
                preds_cnt = _safe_count(db, "dc_predictions")
                snaps_cnt = _safe_count(db, "dc_params_history")
                if preds_cnt:
                    print(" DC predictions count:", preds_cnt)
                if snaps_cnt:
                    print(" DC params history count:", snaps_cnt)
                if isinstance(result, dict):
                    pw = result.get("predictions_written")
                    sw = result.get("snapshots_written")
                    if pw is not None or sw is not None:
                        print(
                            f" DC written (streaming): predictions={pw}, snapshots={sw}"
                        )
        except NotImplementedError as exc:
            # Scaffold analyzers may raise this until implemented; keep going.
            print(f" -> Skipped (not implemented): {exc}")
        except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
            print(f" -> Error: {type(exc).__name__}: {exc}")
# Script entry point: run the full analyzer pipeline when executed directly.
if __name__ == "__main__":
    main()