# run_analyzers.py
  1. """Run selected analyzers over existing DB data or last run results.
  2. This is a scaffold to wire analyzers end-to-end once implementations are ready.
  3. """
  4. from __future__ import annotations
  5. import os
  6. import sys
  7. import pathlib
  8. from typing import Iterable, List
  9. import importlib
  10. # Ensure 'src' is on sys.path when running from repo root or scripts dir
  11. _SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
  12. _ROOT = _SCRIPT_DIR.parent
  13. _SRC = _ROOT / "src"
  14. if _SRC.exists() and str(_SRC) not in sys.path:
  15. sys.path.insert(0, str(_SRC))
  16. def _ensure_src_on_path() -> None:
  17. """Put repository src on sys.path if available (no-op if already present)."""
  18. script_dir = pathlib.Path(__file__).resolve().parent
  19. root = script_dir.parent
  20. src = root / "src"
  21. if src.exists():
  22. sys.path.insert(0, str(src))
  23. def load_data(db, limit: int | None = None) -> List[dict]:
  24. """Load match documents from DB as analyzer inputs (scaffold)."""
  25. # NOTE: Adjust projection to include what analyzers need.
  26. return db.find("match", projection={"_id": 0}, limit=limit)
  27. def _safe_count(db, kind: str) -> int:
  28. """Best-effort count using find; avoids driver-specific count API."""
  29. try:
  30. docs = db.find(kind, projection={"_id": 1}, limit=None)
  31. return len(list(docs)) if docs is not None else 0
  32. except (RuntimeError, ValueError, TypeError): # diagnostics only
  33. return 0
  34. def main() -> None:
  35. """Build analyzers and run them on existing data (skeleton)."""
  36. # Ensure imports resolve regardless of cwd, then import dynamically
  37. _ensure_src_on_path()
  38. db_mod = importlib.import_module("databank.db")
  39. teams_mod = importlib.import_module("databank.analytics.teams")
  40. elo_mod = importlib.import_module("databank.analytics.elo")
  41. dc_mod = importlib.import_module("databank.analytics.dixon_coles")
  42. mc_mod = importlib.import_module("databank.analytics.markov_chain")
  43. h2h_mod = importlib.import_module("databank.analytics.h2h")
  44. calib_mod = importlib.import_module("databank.analytics.calibration")
  45. season_mc_mod = importlib.import_module("databank.analytics.monte_carlo")
  46. sos_mod = importlib.import_module("databank.analytics.sos")
  47. mongodb_cls = getattr(db_mod, "MongoDB")
  48. team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer")
  49. elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer")
  50. dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer")
  51. markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer")
  52. h2h_cls = getattr(h2h_mod, "H2HAnalyzer")
  53. calibration_cls = getattr(calib_mod, "CalibrationAnalyzer")
  54. season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer")
  55. sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer")
  56. uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
  57. name = os.getenv("DATABANK_DB_NAME", "databank")
  58. db = mongodb_cls(uri=uri, name=name)
  59. db.connect()
  60. data = load_data(db)
  61. print(f"Loaded matches: {len(data) if data is not None else 0}")
  62. analyzers: Iterable = [
  63. team_extractor_cls(),
  64. elo_analyzer_cls(),
  65. dixon_coles_cls(),
  66. markov_chain_cls(),
  67. h2h_cls(),
  68. calibration_cls(),
  69. sos_cls(),
  70. season_mc_cls(),
  71. ]
  72. # Prepare optional DC config from environment
  73. def _env_float(name: str, default: float) -> float:
  74. try:
  75. return (
  76. float(os.getenv(name, "")) if os.getenv(name) is not None else default
  77. )
  78. except ValueError:
  79. return default
  80. def _env_int(name: str, default: int) -> int:
  81. try:
  82. return int(os.getenv(name, "")) if os.getenv(name) is not None else default
  83. except ValueError:
  84. return default
  85. def _env_rho_range(name: str, default: tuple[float, float]) -> tuple[float, float]:
  86. s = os.getenv(name)
  87. if not s:
  88. return default
  89. try:
  90. lo_str, hi_str = s.split(",", 1)
  91. return float(lo_str), float(hi_str)
  92. except (ValueError, TypeError):
  93. return default
  94. def _env_int_optional(name: str) -> int | None:
  95. s = os.getenv(name)
  96. if not s:
  97. return None
  98. try:
  99. return int(s)
  100. except ValueError:
  101. return None
  102. dc_kwargs = {
  103. "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0),
  104. "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)),
  105. "rho_step": _env_float("DATABANK_DC_RHO_STEP", 0.01),
  106. "max_iters": _env_int("DATABANK_DC_MAX_ITERS", 20),
  107. "tol": _env_float("DATABANK_DC_TOL", 1e-4),
  108. }
  109. # Optional history configuration for DC
  110. history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower()
  111. if history_mode in {"none", "predictions", "snapshots", "both"}:
  112. dc_kwargs["history"] = history_mode
  113. dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10)
  114. dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10)
  115. dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8)
  116. # Progress controls
  117. dc_kwargs["verbose"] = os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in {
  118. "0",
  119. "false",
  120. "False",
  121. }
  122. dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100)
  123. dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000)
  124. dc_kwargs["skip_if_exists"] = os.getenv(
  125. "DATABANK_DC_SKIP_IF_EXISTS", "0"
  126. ).strip() not in {
  127. "0",
  128. "false",
  129. "False",
  130. }
  131. # Safety clamps/tuning knobs
  132. dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3)
  133. dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0)
  134. dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3)
  135. dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0)
  136. dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0)
  137. dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3)
  138. for analyzer in analyzers:
  139. print(f"Running analyzer: {analyzer.__class__.__name__}")
  140. try:
  141. analyzer.prepare(data)
  142. analyzer.validate(data)
  143. transformed = analyzer.transform(data)
  144. if isinstance(analyzer, dixon_coles_cls):
  145. # Pass DC-specific kwargs from environment
  146. result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
  147. print(" DC config:", dc_kwargs)
  148. elif isinstance(analyzer, calibration_cls):
  149. # Calibration env knobs
  150. cal_kwargs = {
  151. "source_kind": os.getenv(
  152. "DATABANK_CAL_SOURCE_KIND", "dc_predictions"
  153. ),
  154. "bins": _env_int("DATABANK_CAL_BINS", 10),
  155. # User asked higher min sample threshold, default 50
  156. "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50),
  157. "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip()
  158. not in {"0", "false", "False"},
  159. }
  160. since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS")
  161. until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS")
  162. if since_ts is not None:
  163. cal_kwargs["since_ts"] = since_ts
  164. if until_ts is not None:
  165. cal_kwargs["until_ts"] = until_ts
  166. season = os.getenv("DATABANK_CAL_SEASON")
  167. if season:
  168. cal_kwargs["season_filter"] = season
  169. groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE")
  170. if groups_inc:
  171. cal_kwargs["groups_include"] = groups_inc
  172. result = analyzer.compute(
  173. transformed, db=db, persist=True, **cal_kwargs
  174. )
  175. print(" CAL config:", cal_kwargs)
  176. # Console summary of calibration metrics per group
  177. if isinstance(result, dict) and isinstance(result.get("metrics"), dict):
  178. metrics = result["metrics"]
  179. print(" CAL summary (per group):")
  180. for gk, m in metrics.items():
  181. try:
  182. b = float(m.get("brier", float("nan")))
  183. ll = float(m.get("logloss", float("nan")))
  184. ece = float(m.get("ece", float("nan")))
  185. n = int(m.get("n", 0))
  186. print(
  187. (
  188. f" - {gk}: n={n} | Brier={b:.4f}"
  189. f" | LogLoss={ll:.4f} | ECE={ece:.4f}"
  190. )
  191. )
  192. except (ValueError, TypeError, KeyError):
  193. print(f" - {gk}: (metrics unavailable)")
  194. else:
  195. result = analyzer.compute(transformed, db=db, persist=True)
  196. analyzer.finalize(result)
  197. print(f" -> Done: {analyzer.__class__.__name__}")
  198. # Diagnostics: show where data is persisted for Elo
  199. if isinstance(analyzer, elo_analyzer_cls):
  200. ratings_cnt = _safe_count(db, "elo_ratings")
  201. history_cnt = _safe_count(db, "ratings_history")
  202. print(
  203. " Elo persisted to collections:",
  204. f"elo_ratings={ratings_cnt}",
  205. f"ratings_history={history_cnt}",
  206. )
  207. try:
  208. processed = (
  209. result.get("processed") if isinstance(result, dict) else None
  210. )
  211. print(f" Elo processed matches: {processed}")
  212. except (RuntimeError, ValueError, TypeError): # diagnostics only
  213. pass
  214. # Diagnostics: DC persistence and counts
  215. if isinstance(analyzer, dixon_coles_cls):
  216. dc_cnt = _safe_count(db, "dc_params")
  217. print(" DC persisted to collection:", f"dc_params={dc_cnt}")
  218. if isinstance(result, dict):
  219. mu = result.get("matches_used")
  220. persisted = result.get("persisted")
  221. print(
  222. " DC matches_used: "
  223. f"{mu}; persisted docs in this run: {persisted}"
  224. )
  225. gs = result.get("groups_skipped")
  226. if isinstance(gs, int) and gs > 0:
  227. print(f" DC groups skipped (skip_if_exists): {gs}")
  228. # Optional extra collections
  229. preds_cnt = _safe_count(db, "dc_predictions")
  230. snaps_cnt = _safe_count(db, "dc_params_history")
  231. if preds_cnt:
  232. print(" DC predictions count:", preds_cnt)
  233. if snaps_cnt:
  234. print(" DC params history count:", snaps_cnt)
  235. if isinstance(result, dict):
  236. pw = result.get("predictions_written")
  237. sw = result.get("snapshots_written")
  238. if pw is not None or sw is not None:
  239. print(
  240. f" DC written (streaming): predictions={pw}, snapshots={sw}"
  241. )
  242. except NotImplementedError as exc:
  243. print(f" -> Skipped (not implemented): {exc}")
  244. except (RuntimeError, ValueError) as exc: # pragma: no cover - diagnostics only
  245. print(f" -> Error: {type(exc).__name__}: {exc}")
  246. if __name__ == "__main__":
  247. main()