  1. """Run selected analyzers over existing DB data or last run results.
  2. This is a scaffold to wire analyzers end-to-end once implementations are ready.
  3. """
from __future__ import annotations

import importlib
import os
import pathlib
import sys
from typing import Iterable, List

# Ensure 'src' is on sys.path when running from repo root or scripts dir
_SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
_ROOT = _SCRIPT_DIR.parent
_SRC = _ROOT / "src"
if _SRC.exists() and str(_SRC) not in sys.path:
    sys.path.insert(0, str(_SRC))


def _ensure_src_on_path() -> None:
    """Put repository src on sys.path if available (no-op if already present)."""
    script_dir = pathlib.Path(__file__).resolve().parent
    root = script_dir.parent
    src = root / "src"
    # Mirror the module-level guard so repeated calls do not duplicate entries.
    if src.exists() and str(src) not in sys.path:
        sys.path.insert(0, str(src))


def load_data(db, limit: int | None = None) -> List[dict]:
    """Load match documents from DB as analyzer inputs (scaffold)."""
    # NOTE: Adjust the projection to include whatever fields the analyzers need.
    docs = db.find("match", projection={"_id": 0}, limit=limit)
    # Materialize to a list so the annotated return type holds even when the
    # driver hands back a cursor.
    return list(docs) if docs is not None else []


def _safe_count(db, kind: str) -> int:
    """Best-effort count using find; avoids driver-specific count API."""
    try:
        docs = db.find(kind, projection={"_id": 1}, limit=None)
        return len(list(docs)) if docs is not None else 0
    except (RuntimeError, ValueError, TypeError):  # diagnostics only
        return 0
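

# Note: _safe_count materializes the full cursor to count client-side; this is
# O(n) per call, assumed acceptable for one-off diagnostics on modest
# collections.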


def main() -> None:
    """Build analyzers and run them on existing data (skeleton)."""
    # Ensure imports resolve regardless of cwd, then import dynamically
    _ensure_src_on_path()
    db_mod = importlib.import_module("databank.db")
    teams_mod = importlib.import_module("databank.analytics.teams")
    elo_mod = importlib.import_module("databank.analytics.elo")
    dc_mod = importlib.import_module("databank.analytics.dixon_coles")
    mc_mod = importlib.import_module("databank.analytics.markov_chain")
    h2h_mod = importlib.import_module("databank.analytics.h2h")
    calib_mod = importlib.import_module("databank.analytics.calibration")
    season_mc_mod = importlib.import_module("databank.analytics.monte_carlo")
    sos_mod = importlib.import_module("databank.analytics.sos")

    mongodb_cls = db_mod.MongoDB
    team_extractor_cls = teams_mod.TeamExtractorAnalyzer
    elo_analyzer_cls = elo_mod.EloAnalyzer
    dixon_coles_cls = dc_mod.DixonColesAnalyzer
    markov_chain_cls = mc_mod.MarkovChainAnalyzer
    h2h_cls = h2h_mod.H2HAnalyzer
    calibration_cls = calib_mod.CalibrationAnalyzer
    season_mc_cls = season_mc_mod.SeasonMonteCarloAnalyzer
    sos_cls = sos_mod.StrengthOfScheduleAnalyzer

    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
    name = os.getenv("DATABANK_DB_NAME", "databank")
    db = mongodb_cls(uri=uri, name=name)
    db.connect()

    data = load_data(db)
    print(f"Loaded matches: {len(data)}")

    analyzers: Iterable = [
        team_extractor_cls(),
        elo_analyzer_cls(),
        dixon_coles_cls(),
        markov_chain_cls(),
        h2h_cls(),
        calibration_cls(),
        sos_cls(),
        season_mc_cls(),
    ]
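    # Analyzers run strictly in list order; the ordering above (teams first,
    # season Monte Carlo last) is assumed intentional, in case later analyzers
    # read artifacts persisted by earlier ones.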

    # Prepare optional DC config from environment
    def _env_float(name: str, default: float) -> float:
        s = os.getenv(name)
        if s is None:
            return default
        try:
            return float(s)
        except ValueError:
            return default

    def _env_int(name: str, default: int) -> int:
        s = os.getenv(name)
        if s is None:
            return default
        try:
            return int(s)
        except ValueError:
            return default

    def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]:
        s = os.getenv(name)
        if not s:
            return default
        try:
            lo_str, hi_str = s.split(",", 1)
            return float(lo_str), float(hi_str)
        except (ValueError, TypeError):
            return default

    def _env_int_optional(name: str) -> int | None:
        s = os.getenv(name)
        if not s:
            return None
        try:
            return int(s)
        except ValueError:
            return None
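
    # Example overrides (illustrative values only):
    #   DATABANK_DC_HALFLIFE_DAYS=240
    #   DATABANK_DC_RHO_RANGE="-0.25,0.25"  # parsed as "lo,hi" by _env_rho_range
    #   DATABANK_DC_MAX_ITERS=30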
    dc_kwargs = {
        "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0),
        "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)),
        "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01),
        "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20),
        "tol": _env_float("DATABANK_DC_TOL", 1e-4),
    }
    # Optional history configuration for DC
    history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower()
    if history_mode in {"none", "predictions", "snapshots", "both"}:
        dc_kwargs["history"] = history_mode
        dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10)
        dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10)
        dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8)
    # Progress controls
    dc_kwargs["verbose"] = (
        os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in {"0", "false", "False"}
    )
    dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100)
    dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000)
    dc_kwargs["skip_if_exists"] = (
        os.getenv("DATABANK_DC_SKIP_IF_EXISTS", "0").strip() not in {"0", "false", "False"}
    )
    # Safety clamps/tuning knobs
    dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3)
    dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0)
    dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3)
    dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0)
    dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0)
    dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3)
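    # dc_kwargs is passed verbatim to DixonColesAnalyzer.compute in the loop
    # below (see the isinstance(analyzer, dixon_coles_cls) branch).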

    for analyzer in analyzers:
        print(f"Running analyzer: {analyzer.__class__.__name__}")
        try:
            analyzer.prepare(data)
            analyzer.validate(data)
            transformed = analyzer.transform(data)
            if isinstance(analyzer, dixon_coles_cls):
                # Pass DC-specific kwargs from environment
                result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
                print(" DC config:", dc_kwargs)
            elif isinstance(analyzer, calibration_cls):
                # Calibration env knobs
                cal_kwargs = {
                    "source_kind": os.getenv("DATABANK_CAL_SOURCE_KIND", "dc_predictions"),
                    "bins": _env_int("DATABANK_CAL_BINS", 10),
                    # Deliberately high minimum sample count per bin; default 50
                    "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50),
                    "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip()
                    not in {"0", "false", "False"},
                }
                # History snapshots for calibration (optional)
                cal_history = os.getenv("DATABANK_CAL_HISTORY", "snapshots").strip().lower()
                if cal_history in {"none", "snapshots", "both"}:
                    cal_kwargs["history"] = cal_history
                    cal_kwargs["snapshot_every"] = _env_int("DATABANK_CAL_SNAPSHOT_EVERY", 1000)
                    pdays = _env_int_optional("DATABANK_CAL_PERIOD_DAYS")
                    cal_kwargs["period_days"] = pdays if pdays is not None else 30
                    msnaps = _env_int_optional("DATABANK_CAL_MAX_SNAPSHOTS")
                    if msnaps is not None:
                        cal_kwargs["max_snapshots"] = msnaps
                since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS")
                until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS")
                if since_ts is not None:
                    cal_kwargs["since_ts"] = since_ts
                if until_ts is not None:
                    cal_kwargs["until_ts"] = until_ts
                season = os.getenv("DATABANK_CAL_SEASON")
                if season:
                    cal_kwargs["season_filter"] = season
                groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE")
                if groups_inc:
                    cal_kwargs["groups_include"] = groups_inc
                result = analyzer.compute(transformed, db=db, persist=True, **cal_kwargs)
                print(" CAL config:", cal_kwargs)
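                # Brier = mean squared error of the forecast probabilities,
                # LogLoss = mean negative log-likelihood, ECE = expected
                # calibration error (bin-weighted |confidence - accuracy|);
                # lower is better for all three.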
                # Console summary of calibration metrics per group
                if isinstance(result, dict) and isinstance(result.get("metrics"), dict):
                    metrics = result["metrics"]
                    print(" CAL summary (per group):")
                    for gk, m in metrics.items():
                        try:
                            b = float(m.get("brier", float("nan")))
                            ll = float(m.get("logloss", float("nan")))
                            ece = float(m.get("ece", float("nan")))
                            n = int(m.get("n", 0))
                            print(
                                f" - {gk}: n={n} | Brier={b:.4f}"
                                f" | LogLoss={ll:.4f} | ECE={ece:.4f}"
                            )
                        except (ValueError, TypeError, KeyError):
                            print(f" - {gk}: (metrics unavailable)")
                # Optional: print snapshot writes if present
                if isinstance(result, dict):
                    sm = result.get("snap_metrics_written")
                    sb = result.get("snap_bins_written")
                    if (isinstance(sm, int) and sm > 0) or (isinstance(sb, int) and sb > 0):
                        print(
                            f" CAL snapshots persisted: metrics={sm or 0}"
                            f" bins={sb or 0}"
                        )
            else:
                result = analyzer.compute(transformed, db=db, persist=True)
            analyzer.finalize(result)
            print(f" -> Done: {analyzer.__class__.__name__}")
            # Diagnostics: show where data is persisted for Elo
            if isinstance(analyzer, elo_analyzer_cls):
                ratings_cnt = _safe_count(db, "elo_ratings")
                history_cnt = _safe_count(db, "ratings_history")
                print(
                    " Elo persisted to collections:",
                    f"elo_ratings={ratings_cnt}",
                    f"ratings_history={history_cnt}",
                )
                try:
                    processed = result.get("processed") if isinstance(result, dict) else None
                    print(f" Elo processed matches: {processed}")
                except (RuntimeError, ValueError, TypeError):  # diagnostics only
                    pass
            # Diagnostics: DC persistence and counts
            if isinstance(analyzer, dixon_coles_cls):
                dc_cnt = _safe_count(db, "dc_params")
                print(" DC persisted to collection:", f"dc_params={dc_cnt}")
                if isinstance(result, dict):
                    mu = result.get("matches_used")
                    persisted = result.get("persisted")
                    print(f" DC matches_used: {mu}; persisted docs in this run: {persisted}")
                    gs = result.get("groups_skipped")
                    if isinstance(gs, int) and gs > 0:
                        print(f" DC groups skipped (skip_if_exists): {gs}")
                # Optional extra collections
                preds_cnt = _safe_count(db, "dc_predictions")
                snaps_cnt = _safe_count(db, "dc_params_history")
                if preds_cnt:
                    print(" DC predictions count:", preds_cnt)
                if snaps_cnt:
                    print(" DC params history count:", snaps_cnt)
                if isinstance(result, dict):
                    pw = result.get("predictions_written")
                    sw = result.get("snapshots_written")
                    if pw is not None or sw is not None:
                        print(f" DC written (streaming): predictions={pw}, snapshots={sw}")
        except NotImplementedError as exc:
            print(f" -> Skipped (not implemented): {exc}")
        except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
            print(f" -> Error: {type(exc).__name__}: {exc}")


if __name__ == "__main__":
    main()