Explorar el Código

重构分析器，动态导入模块，优化数据加载和处理逻辑；添加增量处理和快照种子功能，改进评分历史记录的存储结构，支持团队名称和排名信息。

admin hace 2 meses
padre
commit
ce9a7df88e
Se han modificado 2 ficheros con 268 adiciones y 36 borrados
  1. 79 21
      scripts/run_analyzers.py
  2. 189 15
      src/databank/analytics/elo.py

+ 79 - 21
scripts/run_analyzers.py

@@ -6,45 +6,86 @@ This is a scaffold to wire analyzers end-to-end once implementations are ready.
 from __future__ import annotations
 
 import os
+import sys
+import pathlib
 from typing import Iterable, List
+import importlib
 
-from databank.db import MongoDB
-from databank.analytics.base import AnalyticsBase
-from databank.analytics.teams import TeamExtractorAnalyzer
-from databank.analytics.elo import EloAnalyzer
-from databank.analytics.dixon_coles import DixonColesAnalyzer
-from databank.analytics.markov_chain import MarkovChainAnalyzer
-from databank.analytics.h2h import H2HAnalyzer
-from databank.analytics.calibration import CalibrationAnalyzer
-from databank.analytics.monte_carlo import SeasonMonteCarloAnalyzer
-from databank.analytics.sos import StrengthOfScheduleAnalyzer
+# Ensure 'src' is on sys.path when running from repo root or scripts dir
+_SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
+_ROOT = _SCRIPT_DIR.parent
+_SRC = _ROOT / "src"
+if _SRC.exists() and str(_SRC) not in sys.path:
+    sys.path.insert(0, str(_SRC))
 
 
-def load_data(db: MongoDB, limit: int | None = 2000) -> List[dict]:
+def _ensure_src_on_path() -> None:
+    """Put repository src on sys.path if available (no-op if already present)."""
+    script_dir = pathlib.Path(__file__).resolve().parent
+    root = script_dir.parent
+    src = root / "src"
+    if src.exists():
+        sys.path.insert(0, str(src))
+
+
+def load_data(db, limit: int | None = None) -> List[dict]:
     """Load match documents from DB as analyzer inputs (scaffold)."""
     # NOTE: Adjust projection to include what analyzers need.
     return db.find("match", projection={"_id": 0}, limit=limit)
 
 
+def _safe_count(db, kind: str) -> int:
+    """Best-effort count using find; avoids driver-specific count API."""
+    try:
+        docs = db.find(kind, projection={"_id": 1}, limit=None)
+        return len(list(docs)) if docs is not None else 0
+    except (RuntimeError, ValueError, TypeError):  # diagnostics only
+        return 0
+
+
 def main() -> None:
     """Build analyzers and run them on existing data (skeleton)."""
+    # Ensure imports resolve regardless of cwd, then import dynamically
+    _ensure_src_on_path()
+
+    db_mod = importlib.import_module("databank.db")
+    teams_mod = importlib.import_module("databank.analytics.teams")
+    elo_mod = importlib.import_module("databank.analytics.elo")
+    dc_mod = importlib.import_module("databank.analytics.dixon_coles")
+    mc_mod = importlib.import_module("databank.analytics.markov_chain")
+    h2h_mod = importlib.import_module("databank.analytics.h2h")
+    calib_mod = importlib.import_module("databank.analytics.calibration")
+    season_mc_mod = importlib.import_module("databank.analytics.monte_carlo")
+    sos_mod = importlib.import_module("databank.analytics.sos")
+
+    mongodb_cls = getattr(db_mod, "MongoDB")
+    team_extractor_cls = getattr(teams_mod, "TeamExtractorAnalyzer")
+    elo_analyzer_cls = getattr(elo_mod, "EloAnalyzer")
+    dixon_coles_cls = getattr(dc_mod, "DixonColesAnalyzer")
+    markov_chain_cls = getattr(mc_mod, "MarkovChainAnalyzer")
+    h2h_cls = getattr(h2h_mod, "H2HAnalyzer")
+    calibration_cls = getattr(calib_mod, "CalibrationAnalyzer")
+    season_mc_cls = getattr(season_mc_mod, "SeasonMonteCarloAnalyzer")
+    sos_cls = getattr(sos_mod, "StrengthOfScheduleAnalyzer")
+
     uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
     name = os.getenv("DATABANK_DB_NAME", "databank")
 
-    db = MongoDB(uri=uri, name=name)
+    db = mongodb_cls(uri=uri, name=name)
     db.connect()
 
     data = load_data(db)
+    print(f"Loaded matches: {len(data) if data is not None else 0}")
 
-    analyzers: Iterable[AnalyticsBase] = [
-        TeamExtractorAnalyzer(),
-        EloAnalyzer(),
-        DixonColesAnalyzer(),
-        MarkovChainAnalyzer(),
-        H2HAnalyzer(),
-        CalibrationAnalyzer(),
-        StrengthOfScheduleAnalyzer(),
-        SeasonMonteCarloAnalyzer(),
+    analyzers: Iterable = [
+        team_extractor_cls(),
+        elo_analyzer_cls(),
+        dixon_coles_cls(),
+        markov_chain_cls(),
+        h2h_cls(),
+        calibration_cls(),
+        sos_cls(),
+        season_mc_cls(),
     ]
 
     for analyzer in analyzers:
@@ -56,6 +97,23 @@ def main() -> None:
             result = analyzer.compute(transformed, db=db, persist=True)
             analyzer.finalize(result)
             print(f" -> Done: {analyzer.__class__.__name__}")
+
+            # Diagnostics: show where data is persisted for Elo
+            if isinstance(analyzer, elo_analyzer_cls):
+                ratings_cnt = _safe_count(db, "elo_ratings")
+                history_cnt = _safe_count(db, "ratings_history")
+                print(
+                    "    Elo persisted to collections:",
+                    f"elo_ratings={ratings_cnt}",
+                    f"ratings_history={history_cnt}",
+                )
+                try:
+                    processed = (
+                        result.get("processed") if isinstance(result, dict) else None
+                    )
+                    print(f"    Elo processed matches: {processed}")
+                except (RuntimeError, ValueError, TypeError):  # diagnostics only
+                    pass
         except NotImplementedError as exc:
             print(f" -> Skipped (not implemented): {exc}")
         except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only

+ 189 - 15
src/databank/analytics/elo.py

@@ -1,7 +1,8 @@
 """Elo rating analyzer skeleton for football.
 
 Features considered (to implement):
-- Base K, goal-difference scaling, home advantage offset, time decay, season reset.
+- Base K, goal-difference scaling, home advantage offset,
+  time decay, season reset.
 - Probability mapping and optional calibration as a downstream step.
 """
 
@@ -44,6 +45,8 @@ class EloAnalyzer(AnalyticsBase):
         home_adv: float = float(kwargs.get("home_adv", 60.0))
         persist: bool = bool(kwargs.get("persist", True))
         return_expectations: bool = bool(kwargs.get("return_expectations", False))
+        incremental: bool = bool(kwargs.get("incremental", True))
+        seed_from_snapshot: bool = bool(kwargs.get("seed_from_snapshot", True))
         db: Optional[BaseDB] = kwargs.get("db")
 
         # Helpers
@@ -55,12 +58,33 @@ class EloAnalyzer(AnalyticsBase):
 
         def _get_team(match: dict, side: str) -> Tuple[Optional[str], Optional[str]]:
             # returns (team_id, team_name)
+            # Try standard side-based keys
             id_val = match.get(f"{side}TeamId") or match.get(f"{side}Id")
-            name_val = (
-                match.get(f"{side}TeamName")
-                or match.get(f"{side}Name")
-                or (match.get("homeName") if side == "home" else match.get("awayName"))
-            )
+            name_val = match.get(f"{side}TeamName") or match.get(f"{side}Name")
+            # Try host/guest variants
+            if side == "home":
+                id_val = id_val or match.get("hostTeamId") or match.get("hostId")
+                name_val = (
+                    name_val or match.get("hostTeamName") or match.get("hostName")
+                )
+            else:
+                id_val = (
+                    id_val
+                    or match.get("awayTeamId")
+                    or match.get("guestTeamId")
+                    or match.get("guestId")
+                )
+                name_val = (
+                    name_val
+                    or match.get("awayTeamName")
+                    or match.get("guestTeamName")
+                    or match.get("guestName")
+                )
+            # Final fallback to generic homeName/awayName
+            if name_val is None:
+                name_val = (
+                    match.get("homeName") if side == "home" else match.get("awayName")
+                )
             team_id = str(id_val) if id_val is not None else None
             if team_id is None and isinstance(name_val, str):
                 team_id = _norm(name_val)
@@ -79,10 +103,37 @@ class EloAnalyzer(AnalyticsBase):
                         return int(match[k])
                     except (ValueError, TypeError):
                         continue
+            # Fallback: parse from score array or strings like "FT 2:1"
+            sc = match.get("score")
+            if isinstance(sc, list):
+                for s in reversed(sc):
+                    if not isinstance(s, str):
+                        continue
+                    s2 = s.strip()
+                    # Extract score pattern anywhere in the string
+                    m = re.search(r"(\d+)\s*:\s*(\d+)", s2)
+                    if m:
+                        try:
+                            h = int(m.group(1))
+                            a = int(m.group(2))
+                            return h if side == "home" else a
+                        except (ValueError, TypeError):
+                            continue
             return None
 
         def _get_ts(match: dict) -> float:
             # Try common timestamp fields (ms or s)
+            # 1) Combined date+time if available (e.g., matchDate + matchTime)
+            md = match.get("matchDate")
+            mt = match.get("matchTime")
+            if isinstance(md, str) and isinstance(mt, str) and md and mt:
+                combo = f"{md.strip()} {mt.strip()}"
+                for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
+                    try:
+                        dtc = datetime.strptime(combo, fmt).replace(tzinfo=timezone.utc)
+                        return dtc.timestamp()
+                    except ValueError:
+                        continue
             for k in ("matchTime", "startTime", "time", "date", "matchDate"):
                 if k in match and match[k] is not None:
                     v = match[k]
@@ -119,6 +170,7 @@ class EloAnalyzer(AnalyticsBase):
         # Normalize input
         items = list(data) if isinstance(data, Iterable) else []
         matches = []
+        team_names: Dict[str, str] = {}
         for d in items:
             m = None
             if isinstance(d, dict):
@@ -128,13 +180,38 @@ class EloAnalyzer(AnalyticsBase):
             if not _finished(m):
                 continue
             ts = _get_ts(m)
+            # Capture context from original document payload if available (nested or top-level)
+            payload_ctx = None
+            token = None
+            if isinstance(d, dict):
+                data_dict = d.get("data") if isinstance(d.get("data"), dict) else None
+                if data_dict:
+                    payload_ctx = data_dict.get("payload")
+                    token = data_dict.get("token")
+                # Top-level fallbacks
+                if payload_ctx is None and isinstance(d.get("payload"), dict):
+                    payload_ctx = d.get("payload")
+                if token is None and isinstance(d.get("token"), str):
+                    token = d.get("token")
             mid = m.get("matchId") or m.get("id")
-            h_id, _ = _get_team(m, "home")
-            a_id, _ = _get_team(m, "away")
+            h_id, h_name = _get_team(m, "home")
+            a_id, a_name = _get_team(m, "away")
             hs = _get_score(m, "home")
             as_ = _get_score(m, "away")
             if not h_id or not a_id or hs is None or as_ is None:
                 continue
+            if isinstance(h_name, str) and h_name:
+                team_names.setdefault(h_id, h_name)
+            if isinstance(a_name, str) and a_name:
+                team_names.setdefault(a_id, a_name)
+            # Normalize season/league/round context
+            league_id = None
+            season = None
+            round_no = None
+            if isinstance(payload_ctx, dict):
+                league_id = payload_ctx.get("leagueId")
+                season = payload_ctx.get("seasonName") or payload_ctx.get("season")
+                round_no = payload_ctx.get("round") or payload_ctx.get("round_no")
             matches.append(
                 {
                     "ts": ts,
@@ -143,12 +220,70 @@ class EloAnalyzer(AnalyticsBase):
                     "away": a_id,
                     "hs": hs,
                     "as": as_,
+                    "league_id": league_id,
+                    "season": season,
+                    "round_no": round_no,
+                    "token": token,
                 }
             )
 
         matches.sort(key=lambda x: x["ts"])
 
+        # Optional incremental skip:
+        # ignore matches already in ratings_history by match_id
+        if db and incremental:
+            mids = sorted(
+                {rec["match_id"] for rec in matches if rec.get("match_id") is not None}
+            )
+            if mids:
+                try:
+                    db_any: Any = db  # dynamic access to find
+                    existing = db_any.find(
+                        "ratings_history",
+                        {"match_id": {"$in": mids}},
+                        projection={"match_id": 1},
+                        limit=None,
+                    )
+                    processed_mids = {doc.get("match_id") for doc in (existing or [])}
+                    matches = [
+                        rec
+                        for rec in matches
+                        if rec.get("match_id") not in processed_mids
+                    ]
+                except (RuntimeError, ValueError, TypeError, AttributeError):
+                    # Best-effort: if backend doesn't support find, process all
+                    pass
+
         ratings: Dict[str, float] = {}
+        # Seed ratings from existing elo_ratings snapshot for true incremental behavior
+        if db and seed_from_snapshot:
+            try:
+                db_any: Any = db
+                snaps = db_any.find(
+                    "elo_ratings",
+                    projection={"_id": 1, "team_id": 1, "rating": 1},
+                    limit=None,
+                )
+                for s in snaps or []:
+                    # Support both flat and nested shapes
+                    tid = (
+                        s.get("_id")
+                        or s.get("team_id")
+                        or (s.get("data", {}) or {}).get("team_id")
+                    )
+                    r = (
+                        s.get("rating")
+                        if "rating" in s
+                        else (s.get("data", {}) or {}).get("rating")
+                    )
+                    if tid is not None and r is not None:
+                        try:
+                            ratings[str(tid)] = float(r)
+                        except (ValueError, TypeError):
+                            continue
+            except (RuntimeError, ValueError, TypeError, AttributeError):
+                # If snapshot read fails, continue with default ratings
+                pass
         expectations = []
 
         def _expected(r_home: float, r_away: float) -> float:
@@ -180,19 +315,43 @@ class EloAnalyzer(AnalyticsBase):
             ra2 = ra - change
             ratings[h] = rh2
             ratings[a] = ra2
-            # history for this match (no id to allow multiple entries)
+            # history for this match with deterministic ids for idempotency
+            league_id = rec.get("league_id")
+            season = rec.get("season")
+            round_no = rec.get("round_no")
+            token = rec.get("token")
+            common_ctx = {
+                "match_id": mid,
+                "ts": ts,
+                "league_id": league_id,
+                "season": season,
+                "round_no": round_no,
+                "token": token,
+            }
+            hid = f"{h}:{mid}" if mid is not None else None
+            aid = f"{a}:{mid}" if mid is not None else None
             history_docs.append(
                 Document(
-                    id=None,
+                    id=hid,
                     kind="ratings_history",
-                    data={"team_id": h, "rating": rh2, "ts": ts, "match_id": mid},
+                    data={
+                        "team_id": h,
+                        "team_name": team_names.get(h),
+                        "rating": rh2,
+                        **common_ctx,
+                    },
                 )
             )
             history_docs.append(
                 Document(
-                    id=None,
+                    id=aid,
                     kind="ratings_history",
-                    data={"team_id": a, "rating": ra2, "ts": ts, "match_id": mid},
+                    data={
+                        "team_id": a,
+                        "team_name": team_names.get(a),
+                        "rating": ra2,
+                        **common_ctx,
+                    },
                 )
             )
             if return_expectations:
@@ -206,7 +365,15 @@ class EloAnalyzer(AnalyticsBase):
                     }
                 )
 
-        result: Dict[str, Any] = {"ratings": ratings, "processed": len(matches)}
+        # Compute ranks across all teams after processing
+        ranked = sorted(ratings.items(), key=lambda x: x[1], reverse=True)
+        ranks: Dict[str, int] = {tid: idx + 1 for idx, (tid, _) in enumerate(ranked)}
+
+        result: Dict[str, Any] = {
+            "ratings": ratings,
+            "processed": len(matches),
+            "ranks": ranks,
+        }
         if return_expectations:
             result["expectations"] = expectations
 
@@ -215,7 +382,14 @@ class EloAnalyzer(AnalyticsBase):
             if ratings:
                 docs = [
                     Document(
-                        id=tid, kind="elo_ratings", data={"team_id": tid, "rating": r}
+                        id=tid,
+                        kind="elo_ratings",
+                        data={
+                            "team_id": tid,
+                            "team_name": team_names.get(tid),
+                            "rating": r,
+                            "rank": ranks.get(tid),
+                        },
                     )
                     for tid, r in ratings.items()
                 ]