Pārlūkot izejas kodu

为H2H分析器添加时间戳、球队和得分提取功能;实现数据聚合和持久化逻辑,支持按赛季和联赛分组统计,并增强输出信息以提供详细的比赛摘要。

admin 1 mēnesi atpakaļ
vecāks
revīzija
75562d09bf
2 mainīti faili ar 278 papildinājumiem un 5 dzēšanām
  1. 55 3
      scripts/run_analyzers.py
  2. 223 2
      src/databank/analytics/h2h.py

+ 55 - 3
scripts/run_analyzers.py

@@ -8,7 +8,7 @@ from __future__ import annotations
 import os
 import sys
 import pathlib
-from typing import Iterable, List
+from typing import Iterable, List, Any
 import importlib
 
 # Ensure 'src' is on sys.path when running from repo root or scripts dir
@@ -77,16 +77,35 @@ def main() -> None:
     data = load_data(db)
     print(f"Loaded matches: {len(data) if data is not None else 0}")
 
-    analyzers: Iterable = [
+    # Control whether to include the Dixon–Coles analyzer (expensive) via env
+    enable_dc = os.getenv("DATABANK_ENABLE_DC", "0").strip() not in {
+        "0",
+        "false",
+        "False",
+    }
+
+    analyzers_list: List[Any] = [
         team_extractor_cls(),
         elo_analyzer_cls(),
-        dixon_coles_cls(),
+        # DC is optional; include only when explicitly enabled
+        # dixon_coles_cls(),
         markov_chain_cls(),
         h2h_cls(),
         calibration_cls(),
         sos_cls(),
         season_mc_cls(),
     ]
+    if enable_dc:
+        # Insert DC after Elo for deterministic ordering
+        analyzers_list.insert(2, dixon_coles_cls())
+        print("[RUN] Dixon–Coles enabled via DATABANK_ENABLE_DC", flush=True)
+    else:
+        print(
+            "[RUN] Dixon–Coles disabled by default (set DATABANK_ENABLE_DC=1 to enable)",
+            flush=True,
+        )
+
+    analyzers: Iterable = analyzers_list
 
     # Prepare optional DC config from environment
     def _env_float(name: str, default: float) -> float:
@@ -170,6 +189,39 @@ def main() -> None:
                 # Pass DC-specific kwargs from environment
                 result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
                 print("    DC config:", dc_kwargs)
+            elif isinstance(analyzer, h2h_cls):
+                # H2H env knobs
+                h2h_kwargs = {
+                    "group_by": os.getenv("DATABANK_H2H_GROUP_BY", "league"),
+                    "separate_home_away": os.getenv(
+                        "DATABANK_H2H_SEPARATE_HOME_AWAY", "1"
+                    ).strip()
+                    not in {"0", "false", "False"},
+                    "recent_window": _env_int("DATABANK_H2H_RECENT_WINDOW", 16),
+                }
+                result = analyzer.compute(
+                    transformed, db=db, persist=True, **h2h_kwargs
+                )
+                print("    H2H config:", h2h_kwargs)
+                # Diagnostics summary
+                try:
+                    pairs = (
+                        len(result.get("pairs", []))
+                        if isinstance(result, dict)
+                        else None
+                    )
+                    persisted = (
+                        result.get("persisted") if isinstance(result, dict) else None
+                    )
+                    total_docs = _safe_count(db, "h2h_summary")
+                    print(
+                        (
+                            f"    H2H pairs={pairs} | persisted={persisted}"
+                            f" | h2h_summary_total={total_docs}"
+                        )
+                    )
+                except (RuntimeError, ValueError, TypeError):
+                    pass
             elif isinstance(analyzer, calibration_cls):
                 # Calibration env knobs
                 cal_kwargs = {

+ 223 - 2
src/databank/analytics/h2h.py

@@ -7,7 +7,13 @@ Idea:
 
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Dict, Iterable, Optional, Tuple
+import re
+import time as _t
+import calendar as _cal
+
+from databank.core.models import Document
+from databank.db.base import BaseDB
 
 from .base import AnalyticsBase
 
@@ -25,4 +31,219 @@ class H2HAnalyzer(AnalyticsBase):
         Returns:
             A structure keyed by (team_a, team_b) with stats and optional rating.
         """
-        raise NotImplementedError("H2H analyzer not implemented yet")
+
+        persist: bool = bool(kwargs.get("persist", True))
+        db: Optional[BaseDB] = kwargs.get("db")
+        recent_window: int = int(kwargs.get("recent_window", 20))
+        separate_home_away: bool = bool(kwargs.get("separate_home_away", False))
+        group_by: str = str(kwargs.get("group_by", "league"))
+
+        def _get_ts(match: dict) -> int:
+            for k in ("timestamp", "ts", "kickoffTs", "timeTs"):
+                v = match.get(k)
+                if isinstance(v, (int, float)):
+                    return int(v)
+            date_s = (
+                match.get("matchDate") or match.get("date") or match.get("gameDate")
+            )
+            time_s = (
+                match.get("matchTime") or match.get("time") or match.get("gameTime")
+            )
+            if isinstance(date_s, str) and isinstance(time_s, str):
+                try:
+                    y, m, d = [int(x) for x in re.split(r"[-/]", date_s.strip())[:3]]
+                    hh, mm = [int(x) for x in time_s.strip().split(":")[:2]]
+                    try:
+                        struct = _t.struct_time((y, m, d, hh, mm, 0, 0, 0, 0))
+                        return int(_cal.timegm(struct))
+                    except (OverflowError, ValueError):
+                        return 0
+                except (ValueError, TypeError):
+                    return 0
+            return 0
+
+        def _get_team(match: dict, side: str) -> Optional[str]:
+            id_val = match.get(f"{side}TeamId") or match.get(f"{side}Id")
+            if side == "home":
+                id_val = id_val or match.get("hostTeamId") or match.get("hostId")
+            else:
+                id_val = (
+                    id_val
+                    or match.get("awayTeamId")
+                    or match.get("guestTeamId")
+                    or match.get("guestId")
+                )
+            return str(id_val) if id_val is not None else None
+
+        def _get_score(match: dict, side: str) -> Optional[int]:
+            keys = [
+                f"{side}Score",
+                f"{side}Goals",
+                f"{side}Goal",
+                f"{'hostScore' if side=='home' else 'guestScore'}",
+            ]
+            for k in keys:
+                if k in match and match[k] is not None:
+                    try:
+                        return int(match[k])
+                    except (ValueError, TypeError):
+                        continue
+            sc = match.get("score")
+            if isinstance(sc, list):
+                for s in reversed(sc):
+                    if not isinstance(s, str):
+                        continue
+                    s2 = s.strip()
+                    m = re.search(r"(\d+)\s*:\s*(\d+)", s2)
+                    if m:
+                        try:
+                            h = int(m.group(1))
+                            a = int(m.group(2))
+                            return h if side == "home" else a
+                        except (ValueError, TypeError):
+                            pass
+            return None
+
+        def _norm_season(s: Optional[str]) -> Optional[str]:
+            if not isinstance(s, str):
+                return s
+            s2 = s.strip()
+            # Try to extract canonical season like '2024-2025' or '2024'
+            m = re.search(r"(20\d{2})\s*[-/~––]\s*(20\d{2})", s2)
+            if m:
+                return f"{m.group(1)}-{m.group(2)}"
+            m2 = re.search(r"(19\d{2}|20\d{2})", s2)
+            if m2:
+                return m2.group(1)
+            return s2
+
+        def _context(doc: dict) -> Tuple[Optional[str], Optional[str]]:
+            payload = None
+            if isinstance(doc, dict):
+                data_dict = (
+                    doc.get("data") if isinstance(doc.get("data"), dict) else None
+                )
+                if data_dict:
+                    payload = data_dict.get("payload")
+                if payload is None and isinstance(doc.get("payload"), dict):
+                    payload = doc.get("payload")
+            if isinstance(payload, dict):
+                league_id = payload.get("leagueId")
+                season = payload.get("seasonName") or payload.get("season")
+                lid = str(league_id) if league_id is not None else None
+                s = _norm_season(str(season)) if season is not None else None
+                return (lid, s)
+            return (None, None)
+
+        def _group_key(doc: dict) -> str:
+            if group_by == "league_season":
+                lid, s = _context(doc)
+                return f"{lid or 'NA'}::{s or 'NA'}"
+            if group_by == "league":
+                lid, _s = _context(doc)
+                return f"{lid or 'NA'}"
+            return "global"
+
+        # Collect finished matches
+        items = list(data) if isinstance(data, Iterable) else []
+        rows: Dict[str, list[dict]] = {}
+        for d in items:
+            m = None
+            if isinstance(d, dict):
+                m = d.get("match") or d.get("data", {}).get("match")
+            if not isinstance(m, dict):
+                continue
+            hs = _get_score(m, "home")
+            as_ = _get_score(m, "away")
+            if hs is None or as_ is None:
+                continue
+            h = _get_team(m, "home")
+            a = _get_team(m, "away")
+            if not h or not a:
+                continue
+            gk = _group_key(d)
+            key = (
+                f"{gk}:{h}:{a}"
+                if separate_home_away
+                else f"{gk}:{':'.join(sorted([h,a]))}"
+            )
+            rows.setdefault(key, []).append(
+                {
+                    "group": gk,
+                    "home": h,
+                    "away": a,
+                    "hs": int(hs),
+                    "as": int(as_),
+                    "ts": _get_ts(m),
+                }
+            )
+
+        # Aggregate per key
+        summaries: Dict[str, dict] = {}
+        docs: list[Document] = []
+        for key, lst in rows.items():
+            lst_sorted = sorted(lst, key=lambda r: r["ts"])  # ascending
+            n = len(lst_sorted)
+            gf = sum(r["hs"] for r in lst_sorted)
+            ga = sum(r["as"] for r in lst_sorted)
+            # Totals are counted from each row's home side (hs vs as), so with a symmetric
+            # key they mix both orientations; team1/team2 only label the pair.
+            parts = key.split(":")
+            gk = parts[0]
+            if separate_home_away:
+                team1, team2 = parts[1], parts[2]
+            else:
+                # Recover team ids from first row
+                team1 = lst_sorted[0]["home"]
+                team2 = lst_sorted[0]["away"]
+
+            wins = sum(1 for r in lst_sorted if r["hs"] > r["as"])
+            draws = sum(1 for r in lst_sorted if r["hs"] == r["as"])
+            losses = n - wins - draws
+            recent = lst_sorted[-recent_window:] if recent_window > 0 else lst_sorted
+            rw = sum(1 for r in recent if r["hs"] > r["as"])
+            rd = sum(1 for r in recent if r["hs"] == r["as"])
+            rl = len(recent) - rw - rd
+            rgf = sum(r["hs"] for r in recent)
+            rga = sum(r["as"] for r in recent)
+
+            summary = {
+                "group": gk,
+                "team1": team1,
+                "team2": team2,
+                "matches": n,
+                "wins": wins,
+                "draws": draws,
+                "losses": losses,
+                "goals_for": gf,
+                "goals_against": ga,
+                "recent_window": int(recent_window),
+                "recent": {
+                    "matches": len(recent),
+                    "wins": rw,
+                    "draws": rd,
+                    "losses": rl,
+                    "goals_for": rgf,
+                    "goals_against": rga,
+                },
+                "separate_home_away": bool(separate_home_away),
+            }
+
+            summaries[key] = summary
+            if persist and db:
+                docs.append(
+                    Document(
+                        id=f"{key}:sum",
+                        kind="h2h_summary",
+                        data=summary,
+                    )
+                )
+
+        if persist and db and docs:
+            db.insert_many(docs)
+
+        return {
+            "pairs": list(summaries.keys()),
+            "summaries": summaries,
+            "persisted": len(docs) if (persist and db) else 0,
+        }