瀏覽代碼

优化Dixon-Coles分析器,添加时间衰减权重和参数持久化功能;增强诊断输出以跟踪处理的比赛和持久化文档数量。

admin 1 月之前
父節點
當前提交
3e25b9c6f8
共有 2 個文件被更改,包括 298 次插入和 83 次删除
  1. +10 −0
      scripts/run_analyzers.py
  2. +288 −83
      src/databank/analytics/dixon_coles.py

+ 10 - 0
scripts/run_analyzers.py

@@ -114,6 +114,16 @@ def main() -> None:
                     print(f"    Elo processed matches: {processed}")
                     print(f"    Elo processed matches: {processed}")
                 except (RuntimeError, ValueError, TypeError):  # diagnostics only
                 except (RuntimeError, ValueError, TypeError):  # diagnostics only
                     pass
                     pass
+            # Diagnostics: DC persistence and counts
+            if isinstance(analyzer, dixon_coles_cls):
+                dc_cnt = _safe_count(db, "dc_params")
+                print("    DC persisted to collection:", f"dc_params={dc_cnt}")
+                if isinstance(result, dict):
+                    mu = result.get("matches_used")
+                    persisted = result.get("persisted")
+                    print(
+                        f"    DC matches_used: {mu}; persisted docs in this run: {persisted}"
+                    )
         except NotImplementedError as exc:
         except NotImplementedError as exc:
             print(f" -> Skipped (not implemented): {exc}")
             print(f" -> Skipped (not implemented): {exc}")
         except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
         except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only

+ 288 - 83
src/databank/analytics/dixon_coles.py

@@ -1,15 +1,18 @@
-"""Dixon–Coles (DC) model analyzer for football scores (minimal V1).
+"""Dixon–Coles (DC) model analyzer with likelihood, rho, and time decay.
 
 
 Purpose:
 Purpose:
-- Estimate attacking/defending strengths and home advantage via Poisson-style rates.
-- Produce parameters per group (league-season or global) and persist them.
+- Fit per-team attacking/defending strengths (home/away) with home advantage.
+- Optimize Dixon–Coles correlation term (rho) via 1D grid search.
+- Apply exponential time decay weighting (half-life in days).
+- Persist parameters per group (league-season or global).
 """
 """
 
 
 from __future__ import annotations
 from __future__ import annotations
 
 
-from typing import Any, Dict, Iterable, Optional
+from typing import Any, Dict, Iterable, Optional, Tuple
 from collections import defaultdict
 from collections import defaultdict
 import re
 import re
+import math
 
 
 from databank.core.models import Document
 from databank.core.models import Document
 from databank.db.base import BaseDB
 from databank.db.base import BaseDB
@@ -18,28 +21,64 @@ from .base import AnalyticsBase
 
 
 
 
 class DixonColesAnalyzer(AnalyticsBase):
 class DixonColesAnalyzer(AnalyticsBase):
-    """Estimate Poisson-style parameters and infer probabilities (baseline)."""
+    """Estimate DC parameters with time-decayed likelihood and rho correlation."""
 
 
     def compute(self, data: Any, **kwargs: Any) -> Any:  # noqa: D401
     def compute(self, data: Any, **kwargs: Any) -> Any:  # noqa: D401
-        """Fit/estimate Poisson-style parameters and basic probabilities.
-
-        Minimal V1 without numerical optimization: rate-based factors.
+        """Fit/estimate Poisson-style parameters with DC correlation and decay.
 
 
         Args:
         Args:
             data: Iterable of match-like docs (same shape as Elo input).
             data: Iterable of match-like docs (same shape as Elo input).
-            **kwargs: group_by ("league_season"|"global"), max_goals (int),
-                persist (bool), db (BaseDB).
+            **kwargs: group_by ("league_season"|"global"), persist (bool), db (BaseDB),
+                halflife_days (float, default 180), rho_range (tuple[float,float], default (-0.3,0.3)),
+                rho_step (float, default 0.01), max_iters (int, default 20), tol (float, default 1e-4).
 
 
         Returns:
         Returns:
             dict with per-group parameters summary. If persist=True, writes
             dict with per-group parameters summary. If persist=True, writes
             documents to 'dc_params'.
             documents to 'dc_params'.
         """
         """
         group_by = str(kwargs.get("group_by", "league_season"))
         group_by = str(kwargs.get("group_by", "league_season"))
-        max_goals = int(kwargs.get("max_goals", 8))
         persist = bool(kwargs.get("persist", True))
         persist = bool(kwargs.get("persist", True))
         db: Optional[BaseDB] = kwargs.get("db")
         db: Optional[BaseDB] = kwargs.get("db")
+        halflife_days: float = float(kwargs.get("halflife_days", 180.0))
+        rho_range: Tuple[float, float] = tuple(kwargs.get("rho_range", (-0.3, 0.3)))  # type: ignore[assignment]
+        rho_step: float = float(kwargs.get("rho_step", 0.01))
+        max_iters: int = int(kwargs.get("max_iters", 20))
+        tol: float = float(kwargs.get("tol", 1e-4))
 
 
         # Helpers
         # Helpers
+        def _get_ts(match: dict) -> int:
+            # Try common timestamp-like fields; fallback to 0 (unknown)
+            for k in ("timestamp", "ts", "kickoffTs", "timeTs"):
+                v = match.get(k)
+                if isinstance(v, (int, float)):
+                    return int(v)
+            # date+time strings like "2024-08-12" + "15:30"
+            date_s = (
+                match.get("matchDate") or match.get("date") or match.get("gameDate")
+            )
+            time_s = (
+                match.get("matchTime") or match.get("time") or match.get("gameTime")
+            )
+            if isinstance(date_s, str) and isinstance(time_s, str):
+                # Best-effort parse: YYYY-MM-DD and HH:MM
+                try:
+                    y, m, d = [int(x) for x in re.split(r"[-/]", date_s.strip())[:3]]
+                    hh, mm = [int(x) for x in time_s.strip().split(":")[:2]]
+                    # naive epoch assume UTC; avoid importing datetime to keep deps minimal
+                    # Use a rough conversion: days since epoch * 86400 + seconds
+                    # Here we fallback to 0 if parsing fails in any step.
+                    import time as _t
+                    import calendar as _cal
+
+                    try:
+                        struct = _t.struct_time((y, m, d, hh, mm, 0, 0, 0, 0))
+                        return int(_cal.timegm(struct))
+                    except (OverflowError, ValueError):
+                        return 0
+                except (ValueError, TypeError):
+                    return 0
+            return 0
+
         def _get_team(match: dict, side: str) -> Optional[str]:
         def _get_team(match: dict, side: str) -> Optional[str]:
             id_val = match.get(f"{side}TeamId") or match.get(f"{side}Id")
             id_val = match.get(f"{side}TeamId") or match.get(f"{side}Id")
             if side == "home":
             if side == "home":
@@ -106,31 +145,14 @@ class DixonColesAnalyzer(AnalyticsBase):
                 return f"{lid or 'NA'}::{s or 'NA'}"
                 return f"{lid or 'NA'}::{s or 'NA'}"
             return "global"
             return "global"
 
 
-        # Accumulate statistics
-        totals = defaultdict(
-            lambda: {
-                "matches": 0,
-                "goals_home": 0,
-                "goals_away": 0,
-            }
-        )
-        per_team = defaultdict(
-            lambda: {
-                "home_played": 0,
-                "home_for": 0,
-                "home_against": 0,
-                "away_played": 0,
-                "away_for": 0,
-                "away_against": 0,
-            }
-        )
-
+        # Collect finished matches into groups
         def _finished(match: dict) -> bool:
         def _finished(match: dict) -> bool:
             hs = _get_score(match, "home")
             hs = _get_score(match, "home")
             as_ = _get_score(match, "away")
             as_ = _get_score(match, "away")
             return hs is not None and as_ is not None
             return hs is not None and as_ is not None
 
 
         items = list(data) if isinstance(data, Iterable) else []
         items = list(data) if isinstance(data, Iterable) else []
+        groups: Dict[str, list[dict]] = defaultdict(list)
         for d in items:
         for d in items:
             m = None
             m = None
             if isinstance(d, dict):
             if isinstance(d, dict):
@@ -139,81 +161,264 @@ class DixonColesAnalyzer(AnalyticsBase):
                 continue
                 continue
             if not _finished(m):
             if not _finished(m):
                 continue
                 continue
+            gk = _group_key(d)
             h = _get_team(m, "home")
             h = _get_team(m, "home")
             a = _get_team(m, "away")
             a = _get_team(m, "away")
             hs = _get_score(m, "home")
             hs = _get_score(m, "home")
             as_ = _get_score(m, "away")
             as_ = _get_score(m, "away")
+            ts = _get_ts(m)
             if not h or not a or hs is None or as_ is None:
             if not h or not a or hs is None or as_ is None:
                 continue
                 continue
-            g = _group_key(d)
-            totals[g]["matches"] += 1
-            totals[g]["goals_home"] += hs
-            totals[g]["goals_away"] += as_
-            per_team[(g, h)]["home_played"] += 1
-            per_team[(g, h)]["home_for"] += hs
-            per_team[(g, h)]["home_against"] += as_
-            per_team[(g, a)]["away_played"] += 1
-            per_team[(g, a)]["away_for"] += as_
-            per_team[(g, a)]["away_against"] += hs
-
-        # Compute parameters per group
-        params: Dict[str, Dict[str, Dict[str, float]]] = {}
-        docs: list[Document] = []
-        eps = 1e-9
-        for g, t in totals.items():
-            n = max(1, t["matches"])
-            league_home_avg = t["goals_home"] / n
-            league_away_avg = t["goals_away"] / n
-            params[g] = {}
-            for (gg, team_id), st in per_team.items():
-                if gg != g:
-                    continue
-                hp = st["home_played"]
-                ap = st["away_played"]
-                hf = st["home_for"]
-                ha = st["home_against"]
-                af = st["away_for"]
-                aa = st["away_against"]
-                # Factors around 1.0 (simple shrinkage via eps)
-                att_home = (
-                    (hf / max(1, hp)) / (league_home_avg + eps) if hp > 0 else 1.0
+            groups[gk].append(
+                {
+                    "home": h,
+                    "away": a,
+                    "hs": int(hs),
+                    "as": int(as_),
+                    "ts": int(ts),
+                }
+            )
+
+        def _fit_group(rows: list[dict]) -> tuple[dict, dict]:
+            if not rows:
+                return {}, {"matches": 0}
+
+            # Time decay weights
+            max_ts = max(r["ts"] for r in rows)
+            lam = math.log(2.0) / max(1.0, halflife_days)
+            for r in rows:
+                age_days = max(0.0, (max_ts - r["ts"]) / 86400.0)
+                r["w"] = math.exp(-lam * age_days)
+
+            teams = sorted({r["home"] for r in rows} | {r["away"] for r in rows})
+            nteams = len(teams)
+            if nteams == 0:
+                return {}, {"matches": 0}
+
+            # Initialize parameters
+            att_h = {t: 1.0 for t in teams}
+            att_a = {t: 1.0 for t in teams}
+            def_h = {t: 1.0 for t in teams}
+            def_a = {t: 1.0 for t in teams}
+
+            # League averages
+            sum_w = sum(r["w"] for r in rows)
+            sum_hg = sum(r["w"] * r["hs"] for r in rows)
+            sum_ag = sum(r["w"] * r["as"] for r in rows)
+            base_h = (sum_hg / max(1e-9, sum_w)) if sum_w > 0 else 1.3
+            base_a = (sum_ag / max(1e-9, sum_w)) if sum_w > 0 else 1.1
+
+            def _normalize() -> None:
+                # Keep identifiability by normalizing geometric means to 1
+                eps = 1e-12
+                gm = math.exp(
+                    sum(math.log(max(eps, v)) for v in att_h.values()) / nteams
                 )
                 )
-                att_away = (
-                    (af / max(1, ap)) / (league_away_avg + eps) if ap > 0 else 1.0
+                for k in att_h:
+                    att_h[k] /= gm
+                gm = math.exp(
+                    sum(math.log(max(eps, v)) for v in att_a.values()) / nteams
                 )
                 )
-                def_home = (
-                    (ha / max(1, hp)) / (league_away_avg + eps) if hp > 0 else 1.0
+                for k in att_a:
+                    att_a[k] /= gm
+                gm = math.exp(
+                    sum(math.log(max(eps, v)) for v in def_h.values()) / nteams
                 )
                 )
-                def_away = (
-                    (aa / max(1, ap)) / (league_home_avg + eps) if ap > 0 else 1.0
+                for k in def_h:
+                    def_h[k] /= gm
+                gm = math.exp(
+                    sum(math.log(max(eps, v)) for v in def_a.values()) / nteams
                 )
                 )
-                params[g][team_id] = {
-                    "attack_home": float(att_home),
-                    "attack_away": float(att_away),
-                    "defense_home": float(def_home),
-                    "defense_away": float(def_away),
-                    "league_home_avg": float(league_home_avg),
-                    "league_away_avg": float(league_away_avg),
+                for k in def_a:
+                    def_a[k] /= gm
+
+            def _expected(r: dict) -> tuple[float, float]:
+                mu = base_h * att_h[r["home"]] * def_a[r["away"]]
+                nu = base_a * att_a[r["away"]] * def_h[r["home"]]
+                return (max(1e-9, mu), max(1e-9, nu))
+
+            # IPF-like alternating updates
+            for _ in range(max_iters):
+                delta = 0.0
+                # Update attack_home
+                for t in teams:
+                    num = 0.0
+                    den = 0.0
+                    for r in rows:
+                        if r["home"] != t:
+                            continue
+                        mu, _ = _expected(r)
+                        num += r["w"] * r["hs"]
+                        den += r["w"] * mu
+                    if den > 0:
+                        factor = num / den
+                        delta = max(delta, abs(1 - factor))
+                        att_h[t] *= factor
+                _normalize()
+
+                # Update attack_away
+                for t in teams:
+                    num = 0.0
+                    den = 0.0
+                    for r in rows:
+                        if r["away"] != t:
+                            continue
+                        _, nu = _expected(r)
+                        num += r["w"] * r["as"]
+                        den += r["w"] * nu
+                    if den > 0:
+                        factor = num / den
+                        delta = max(delta, abs(1 - factor))
+                        att_a[t] *= factor
+                _normalize()
+
+                # Update defense_away (affects mu)
+                for t in teams:
+                    num = 0.0
+                    den = 0.0
+                    for r in rows:
+                        if r["away"] != t:
+                            continue
+                        mu, _ = _expected(r)
+                        num += r["w"] * r["hs"]
+                        den += r["w"] * mu
+                    if den > 0:
+                        factor = num / den
+                        delta = max(delta, abs(1 - factor))
+                        def_a[t] *= factor
+                _normalize()
+
+                # Update defense_home (affects nu)
+                for t in teams:
+                    num = 0.0
+                    den = 0.0
+                    for r in rows:
+                        if r["home"] != t:
+                            continue
+                        _, nu = _expected(r)
+                        num += r["w"] * r["as"]
+                        den += r["w"] * nu
+                    if den > 0:
+                        factor = num / den
+                        delta = max(delta, abs(1 - factor))
+                        def_h[t] *= factor
+                _normalize()
+
+                if delta < tol:
+                    break
+
+            # Given parameters, grid-search rho for DC correlation
+            def _dc_phi(hg: int, ag: int, mu: float, nu: float, rho: float) -> float:
+                # Dixon–Coles small-score adjustment
+                if hg == 0 and ag == 0:
+                    return max(1e-9, 1.0 - mu * nu * rho)
+                if hg == 0 and ag == 1:
+                    return max(1e-9, 1.0 + mu * rho)
+                if hg == 1 and ag == 0:
+                    return max(1e-9, 1.0 + nu * rho)
+                if hg == 1 and ag == 1:
+                    return max(1e-9, 1.0 - rho)
+                return 1.0
+
+            def _ll_for_rho(rho: float) -> float:
+                s = 0.0
+                for r in rows:
+                    mu, nu = _expected(r)
+                    # Poisson log pmf (ignoring constant factorial by Stirling or exact; include exact via math.lgamma)
+                    x = r["hs"]
+                    y = r["as"]
+                    log_px = x * math.log(mu) - mu - math.lgamma(x + 1)
+                    log_py = y * math.log(nu) - nu - math.lgamma(y + 1)
+                    phi = _dc_phi(x, y, mu, nu, rho)
+                    s += r["w"] * (log_px + log_py + math.log(phi))
+                return s
+
+            rlo, rhi = float(rho_range[0]), float(rho_range[1])
+            step = max(1e-4, rho_step)
+            best_rho = 0.0
+            best_ll = _ll_for_rho(best_rho)
+            rho = rlo
+            while rho <= rhi + 1e-12:
+                ll = _ll_for_rho(rho)
+                if ll > best_ll:
+                    best_ll, best_rho = ll, rho
+                rho += step
+            # local refine around best
+            refine = max(5, int(math.ceil(0.02 / step)))
+            fine_step = step / 10.0
+            rho = max(rlo, best_rho - refine * fine_step)
+            end = min(rhi, best_rho + refine * fine_step)
+            while rho <= end + 1e-12:
+                ll = _ll_for_rho(rho)
+                if ll > best_ll:
+                    best_ll, best_rho = ll, rho
+                rho += fine_step
+
+            # Assemble params for this group
+            out_params: Dict[str, Dict[str, float]] = {}
+            for t in teams:
+                out_params[t] = {
+                    "attack_home": float(att_h[t]),
+                    "attack_away": float(att_a[t]),
+                    "defense_home": float(def_h[t]),
+                    "defense_away": float(def_a[t]),
+                    "league_home_avg": float(base_h),
+                    "league_away_avg": float(base_a),
+                    "rho": float(best_rho),
+                    "halflife_days": float(halflife_days),
                 }
                 }
-                if persist and db:
+
+            return out_params, {
+                "matches": len(rows),
+                "rho": float(best_rho),
+                "base_home": float(base_h),
+                "base_away": float(base_a),
+            }
+
+        # Fit all groups
+        all_params: Dict[str, Dict[str, Dict[str, float]]] = {}
+        docs: list[Document] = []
+        total_matches = 0
+        for gk, rows in groups.items():
+            p, stats = _fit_group(rows)
+            all_params[gk] = p
+            total_matches += int(stats.get("matches", 0))
+            if persist and db and p:
+                for team_id, vals in p.items():
                     docs.append(
                     docs.append(
                         Document(
                         Document(
-                            id=f"{g}:{team_id}",
+                            id=f"{gk}:{team_id}",
                             kind="dc_params",
                             kind="dc_params",
                             data={
                             data={
-                                "group": g,
+                                "group": gk,
                                 "team_id": team_id,
                                 "team_id": team_id,
-                                **params[g][team_id],
+                                **vals,
                             },
                             },
                         )
                         )
                     )
                     )
+                # Optionally also persist a group-level summary doc
+                docs.append(
+                    Document(
+                        id=f"{gk}:__summary__",
+                        kind="dc_params",
+                        data={
+                            "group": gk,
+                            "summary": True,
+                            "matches": int(stats.get("matches", 0)),
+                            "rho": float(stats.get("rho", 0.0)),
+                            "league_home_avg": float(stats.get("base_home", 0.0)),
+                            "league_away_avg": float(stats.get("base_away", 0.0)),
+                            "halflife_days": float(halflife_days),
+                        },
+                    )
+                )
 
 
         if persist and db and docs:
         if persist and db and docs:
             db.insert_many(docs)
             db.insert_many(docs)
 
 
         return {
         return {
-            "groups": list(params.keys()),
-            "params": params,
-            "matches_used": sum(totals[g]["matches"] for g in totals),
-            "max_goals": max_goals,
+            "groups": list(all_params.keys()),
+            "params": all_params,
+            "matches_used": total_matches,
+            "persisted": len(docs) if docs else 0,
         }
         }