
Add environment variable configuration for the Dixon-Coles analyzer and the calibration analyzer, supporting history recording, progress control, and parameter persistence; refine the computation logic, enrich the output, and support skipping already-existing groups and safety parameter clamps.

admin, 1 month ago
parent commit 75d2ba0ee8
3 changed files with 558 additions and 22 deletions
  1. scripts/run_analyzers.py (+92 −3)
  2. src/databank/analytics/calibration.py (+313 −10)
  3. src/databank/analytics/dixon_coles.py (+153 −9)

+ 92 - 3
scripts/run_analyzers.py

@@ -113,6 +113,15 @@ def main() -> None:
         except (ValueError, TypeError):
             return default
 
+    def _env_int_optional(name: str) -> int | None:
+        s = os.getenv(name)
+        if not s:
+            return None
+        try:
+            return int(s)
+        except ValueError:
+            return None
+
     dc_kwargs = {
         "halflife_days": _env_float("DATABANK_DC_HALFLIFE_DAYS", 180.0),
         "rho_range": _env_rho_range("DATABANK_DC_RHO_RANGE", (-0.3, 0.3)),
@@ -122,12 +131,34 @@ def main() -> None:
     }
 
     # Optional history configuration for DC
-    history_mode = os.getenv("DATABANK_DC_HISTORY", "none").strip().lower()
-    if history_mode in {"none", "predictions", "snapshots"}:
+    history_mode = os.getenv("DATABANK_DC_HISTORY", "both").strip().lower()
+    if history_mode in {"none", "predictions", "snapshots", "both"}:
         dc_kwargs["history"] = history_mode
     dc_kwargs["snapshot_every"] = _env_int("DATABANK_DC_SNAPSHOT_EVERY", 10)
     dc_kwargs["max_iters_history"] = _env_int("DATABANK_DC_MAX_ITERS_HISTORY", 10)
     dc_kwargs["max_goals"] = _env_int("DATABANK_DC_MAX_GOALS", 8)
+    # Progress controls
+    dc_kwargs["verbose"] = os.getenv("DATABANK_DC_VERBOSE", "1").strip() not in {
+        "0",
+        "false",
+        "False",
+    }
+    dc_kwargs["progress_every"] = _env_int("DATABANK_DC_PROGRESS_EVERY", 100)
+    dc_kwargs["flush_every"] = _env_int("DATABANK_DC_FLUSH_EVERY", 1000)
+    dc_kwargs["skip_if_exists"] = os.getenv(
+        "DATABANK_DC_SKIP_IF_EXISTS", "0"
+    ).strip() not in {
+        "0",
+        "false",
+        "False",
+    }
+    # Safety clamps/tuning knobs
+    dc_kwargs["param_min"] = _env_float("DATABANK_DC_PARAM_MIN", 0.3)
+    dc_kwargs["param_max"] = _env_float("DATABANK_DC_PARAM_MAX", 3.0)
+    dc_kwargs["base_min"] = _env_float("DATABANK_DC_BASE_MIN", 0.3)
+    dc_kwargs["base_max"] = _env_float("DATABANK_DC_BASE_MAX", 3.0)
+    dc_kwargs["mu_max"] = _env_float("DATABANK_DC_MU_MAX", 6.0)
+    dc_kwargs["min_history_matches"] = _env_int("DATABANK_DC_MIN_HISTORY_MATCHES", 3)
 
     for analyzer in analyzers:
         print(f"Running analyzer: {analyzer.__class__.__name__}")
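
The truthy-string check repeated here for DATABANK_DC_VERBOSE and DATABANK_DC_SKIP_IF_EXISTS (and below for DATABANK_CAL_VERBOSE) could live in one helper. A minimal sketch with the same semantics; the _env_bool name is hypothetical and not part of this commit:

import os

def _env_bool(name: str, default: bool = True) -> bool:
    # Mirrors the inline checks in this diff: any value other than
    # "0"/"false"/"False" (after stripping whitespace) counts as True.
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip() not in {"0", "false", "False"}

# e.g. dc_kwargs["verbose"] = _env_bool("DATABANK_DC_VERBOSE", True)
# e.g. dc_kwargs["skip_if_exists"] = _env_bool("DATABANK_DC_SKIP_IF_EXISTS", False)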
@@ -139,6 +170,53 @@ def main() -> None:
                 # Pass DC-specific kwargs from environment
                 result = analyzer.compute(transformed, db=db, persist=True, **dc_kwargs)
                 print("    DC config:", dc_kwargs)
+            elif isinstance(analyzer, calibration_cls):
+                # Calibration env knobs
+                cal_kwargs = {
+                    "source_kind": os.getenv(
+                        "DATABANK_CAL_SOURCE_KIND", "dc_predictions"
+                    ),
+                    "bins": _env_int("DATABANK_CAL_BINS", 10),
+                    # Higher minimum per-bin sample threshold (default 50)
+                    "min_per_bin": _env_int("DATABANK_CAL_MIN_PER_BIN", 50),
+                    "verbose": os.getenv("DATABANK_CAL_VERBOSE", "1").strip()
+                    not in {"0", "false", "False"},
+                }
+                since_ts = _env_int_optional("DATABANK_CAL_SINCE_TS")
+                until_ts = _env_int_optional("DATABANK_CAL_UNTIL_TS")
+                if since_ts is not None:
+                    cal_kwargs["since_ts"] = since_ts
+                if until_ts is not None:
+                    cal_kwargs["until_ts"] = until_ts
+                season = os.getenv("DATABANK_CAL_SEASON")
+                if season:
+                    cal_kwargs["season_filter"] = season
+                groups_inc = os.getenv("DATABANK_CAL_GROUPS_INCLUDE")
+                if groups_inc:
+                    cal_kwargs["groups_include"] = groups_inc
+
+                result = analyzer.compute(
+                    transformed, db=db, persist=True, **cal_kwargs
+                )
+                print("    CAL config:", cal_kwargs)
+                # Console summary of calibration metrics per group
+                if isinstance(result, dict) and isinstance(result.get("metrics"), dict):
+                    metrics = result["metrics"]
+                    print("    CAL summary (per group):")
+                    for gk, m in metrics.items():
+                        try:
+                            b = float(m.get("brier", float("nan")))
+                            ll = float(m.get("logloss", float("nan")))
+                            ece = float(m.get("ece", float("nan")))
+                            n = int(m.get("n", 0))
+                            print(
+                                (
+                                    f"      - {gk}: n={n} | Brier={b:.4f}"
+                                    f" | LogLoss={ll:.4f} | ECE={ece:.4f}"
+                                )
+                            )
+                        except (ValueError, TypeError, KeyError):
+                            print(f"      - {gk}: (metrics unavailable)")
             else:
                 result = analyzer.compute(transformed, db=db, persist=True)
             analyzer.finalize(result)
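
For reference, the console summary above assumes each per-group entry carries the keys composed in CalibrationAnalyzer.compute() further down. An illustrative shape with made-up values and a hypothetical group key:

result = {
    "metrics": {
        "epl::2024-2025": {  # hypothetical "lid::season" group key
            "n": 380,
            "brier": 0.5810,
            "logloss": 0.9870,
            "ece": 0.0312,
        },
    },
}
# printed as: "      - epl::2024-2025: n=380 | Brier=0.5810 | LogLoss=0.9870 | ECE=0.0312"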
@@ -168,8 +246,12 @@ def main() -> None:
                     mu = result.get("matches_used")
                     persisted = result.get("persisted")
                     print(
-                        f"    DC matches_used: {mu}; persisted docs in this run: {persisted}"
+                        "    DC matches_used: "
+                        f"{mu}; persisted docs in this run: {persisted}"
                     )
+                    gs = result.get("groups_skipped")
+                    if isinstance(gs, int) and gs > 0:
+                        print(f"    DC groups skipped (skip_if_exists): {gs}")
                 # Optional extra collections
                 preds_cnt = _safe_count(db, "dc_predictions")
                 snaps_cnt = _safe_count(db, "dc_params_history")
@@ -177,6 +259,13 @@ def main() -> None:
                     print("    DC predictions count:", preds_cnt)
                 if snaps_cnt:
                     print("    DC params history count:", snaps_cnt)
+                if isinstance(result, dict):
+                    pw = result.get("predictions_written")
+                    sw = result.get("snapshots_written")
+                    if pw is not None or sw is not None:
+                        print(
+                            f"    DC written (streaming): predictions={pw}, snapshots={sw}"
+                        )
         except NotImplementedError as exc:
             print(f" -> Skipped (not implemented): {exc}")
         except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
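
To exercise the new knobs end to end, one option is to set the environment before launching the script. The variable names come from this diff; the invocation path and values are assumptions:

import os
import subprocess

env = os.environ.copy()
env.update({
    "DATABANK_DC_HISTORY": "both",        # predictions + snapshots in one pass
    "DATABANK_DC_SKIP_IF_EXISTS": "1",    # skip groups already fitted
    "DATABANK_DC_PROGRESS_EVERY": "250",  # progress line every 250 matches
    "DATABANK_DC_FLUSH_EVERY": "2000",    # flush buffered docs every 2000
    "DATABANK_CAL_MIN_PER_BIN": "50",     # drop sparse reliability bins
})
subprocess.run(["python", "scripts/run_analyzers.py"], env=env, check=True)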

+ 313 - 10
src/databank/analytics/calibration.py

@@ -1,14 +1,20 @@
-"""Probability calibration analyzer skeleton.
+"""Probability calibration analyzer.
 
-Use cases:
-- Calibrate model probabilities (e.g., W/D/L) using Platt scaling or isotonic.
-- Evaluate calibration via reliability curves and ECE/MCE metrics.
+Scope in this version:
+- Evaluate calibration quality for multiclass W/D/L probabilities.
+- Compute metrics: Brier score, LogLoss, ECE (expected calibration error).
+- Produce reliability curves (per-class) with configurable bins.
+- Consume predictions from DB (default: dc_predictions) and persist results.
 """
 
 from __future__ import annotations
 
-from typing import Any
+from typing import Any, Dict, Iterable, Optional, Tuple, List, cast
+from collections import defaultdict
+import math
 
+from databank.core.models import Document
+from databank.db.base import BaseDB
 from .base import AnalyticsBase
 
 
@@ -16,13 +22,310 @@ class CalibrationAnalyzer(AnalyticsBase):
     """Calibrate predicted probabilities and evaluate calibration quality."""
 
     def compute(self, data: Any, **kwargs: Any) -> Any:
-        """Fit calibration mapping and/or apply to incoming probabilities.
+        """Compute calibration metrics and reliability curves.
 
         Args:
-            data: Iterable of predictions with observed outcomes.
-            **kwargs: method ("platt"|"isotonic"), per_league, db.
+            data: Ignored by default if source_kind uses DB collections.
+            source_kind: Predictions collection kind (default "dc_predictions").
+            group_by: "league_season" (default) or "global" (uses stored group).
+            bins: Number of bins for reliability curves and ECE (default 10).
+            min_per_bin: Minimum samples to include a bin in curves (default 20).
+            persist: Whether to persist results (default True).
+            db: Database handle when persist=True.
+            verbose: Print progress (default True).
 
         Returns:
-            Calibration model parameters and post-calibration metrics.
+            Dict with groups, metrics, and persisted counts.
         """
-        raise NotImplementedError("Calibration analyzer not implemented yet")
+
+        source_kind: str = str(kwargs.get("source_kind", "dc_predictions"))
+        # group_by reserved for future use; currently uses stored group in docs
+        # group_by: str = str(kwargs.get("group_by", "league_season"))
+        bins: int = int(kwargs.get("bins", 10))
+        min_per_bin: int = int(kwargs.get("min_per_bin", 20))
+        persist: bool = bool(kwargs.get("persist", True))
+        verbose: bool = bool(kwargs.get("verbose", True))
+        db: Optional[BaseDB] = kwargs.get("db")
+        # Optional filters
+        since_ts_raw = kwargs.get("since_ts")
+        until_ts_raw = kwargs.get("until_ts")
+        season_filter: Optional[str] = kwargs.get("season_filter")
+        groups_include_raw = kwargs.get("groups_include")
+        groups_include: Optional[List[str]] = None
+        if isinstance(groups_include_raw, str):
+            groups_include = [
+                s.strip() for s in groups_include_raw.split(",") if s.strip()
+            ]
+        elif isinstance(groups_include_raw, list):
+            groups_include = [str(x) for x in groups_include_raw]
+        try:
+            since_ts = int(since_ts_raw) if since_ts_raw is not None else None
+        except (ValueError, TypeError):
+            since_ts = None
+        try:
+            until_ts = int(until_ts_raw) if until_ts_raw is not None else None
+        except (ValueError, TypeError):
+            until_ts = None
+
+        eps = 1e-12
+
+        def _one_hot(outcome: str) -> Tuple[int, int, int]:
+            if outcome == "H":
+                return 1, 0, 0
+            if outcome == "D":
+                return 0, 1, 0
+            if outcome == "A":
+                return 0, 0, 1
+            return 0, 0, 0
+
+        def _get_outcome(h: int, a: int) -> str:
+            if h > a:
+                return "H"
+            if h == a:
+                return "D"
+            return "A"
+
+        # Load predictions from DB
+        preds: Iterable[dict]
+        if source_kind == "dc_predictions":
+            if not (persist and db):
+                # Without a DB handle, fall back to data if it already holds prediction docs
+                preds = data if isinstance(data, Iterable) else []
+            else:
+                # Access DB collection for predictions
+                db_any = cast(Any, db)
+                preds = db_any.find(
+                    "dc_predictions",
+                    projection={"_id": 0},
+                    limit=None,
+                )
+        else:
+            preds = data if isinstance(data, Iterable) else []
+
+        # Group predictions by stored group key (e.g., "lid::season").
+        groups: Dict[str, List[dict]] = defaultdict(list)
+        count_in = 0
+        for p in preds or []:
+            if not isinstance(p, dict):
+                continue
+            gk = p.get("group") or "global"
+            # Required fields: p_home/p_draw/p_away and observed scores
+            try:
+                ph = float(p.get("p_home", 0.0))
+                pd = float(p.get("p_draw", 0.0))
+                pa = float(p.get("p_away", 0.0))
+            except (ValueError, TypeError):
+                continue
+            oh_raw = p.get("observed_h")
+            oa_raw = p.get("observed_a")
+            if oh_raw is None or oa_raw is None:
+                continue
+            try:
+                oh = int(oh_raw)
+                oa = int(oa_raw)
+            except (ValueError, TypeError):
+                continue
+            # Optional filters: time window and season/group
+            cut_ts = p.get("cutoff_ts")
+            try:
+                cut_ts = int(cut_ts) if cut_ts is not None else None
+            except (ValueError, TypeError):
+                cut_ts = None
+            if since_ts is not None and (cut_ts is None or cut_ts < since_ts):
+                continue
+            if until_ts is not None and (cut_ts is None or cut_ts >= until_ts):
+                continue
+            if season_filter and isinstance(gk, str):
+                # Allow a simple substring match on the season in the group key, e.g. 'lid::2024-2025'
+                if season_filter not in gk:
+                    continue
+            if groups_include is not None and str(gk) not in groups_include:
+                continue
+            # Normalize probabilities to sum=1
+            s = ph + pd + pa
+            if s <= 0:
+                continue
+            ph, pd, pa = ph / s, pd / s, pa / s
+            outcome = _get_outcome(oh, oa)
+            groups[str(gk)].append(
+                {
+                    "p": (ph, pd, pa),
+                    "outcome": outcome,
+                }
+            )
+            count_in += 1
+
+        if verbose:
+            print(
+                f"[CAL] Loaded predictions: {count_in}; groups: {len(groups)}",
+                flush=True,
+            )
+
+        # Metrics per group
+        docs: List[Document] = []
+        results: Dict[str, dict] = {}
+        num_bins = max(2, int(bins))
+
+        def _bin_index(p: float) -> int:
+            p = min(1.0 - 1e-12, max(0.0, float(p)))
+            return min(num_bins - 1, int(p * num_bins))
+
+        for gk, rows in groups.items():
+            if not rows:
+                continue
+
+            n_samples = float(len(rows))
+            # Overall ECE via confidence bins of max probability
+            conf_sum = [0.0 for _ in range(num_bins)]
+            acc_sum = [0.0 for _ in range(num_bins)]
+            cnt = [0 for _ in range(num_bins)]
+
+            # Brier and LogLoss
+            brier_sum = 0.0
+            logloss_sum = 0.0
+
+            # Per-class reliability bins
+            cls_bins = {
+                "H": {
+                    "p_sum": [0.0] * num_bins,
+                    "y_sum": [0.0] * num_bins,
+                    "cnt": [0] * num_bins,
+                },
+                "D": {
+                    "p_sum": [0.0] * num_bins,
+                    "y_sum": [0.0] * num_bins,
+                    "cnt": [0] * num_bins,
+                },
+                "A": {
+                    "p_sum": [0.0] * num_bins,
+                    "y_sum": [0.0] * num_bins,
+                    "cnt": [0] * num_bins,
+                },
+            }
+
+            for r in rows:
+                ph, pd, pa = r["p"]
+                oh, od, oa = _one_hot(r["outcome"])
+                # overall
+                conf = max(ph, pd, pa)
+                pred_label = (
+                    "H" if ph >= pd and ph >= pa else ("D" if pd >= pa else "A")
+                )
+                correct = (
+                    1.0
+                    if (
+                        (pred_label == "H" and oh == 1)
+                        or (pred_label == "D" and od == 1)
+                        or (pred_label == "A" and oa == 1)
+                    )
+                    else 0.0
+                )
+                bi = _bin_index(conf)
+                conf_sum[bi] += conf
+                acc_sum[bi] += correct
+                cnt[bi] += 1
+
+                # brier (multiclass)
+                brier_sum += (ph - oh) ** 2 + (pd - od) ** 2 + (pa - oa) ** 2
+
+                # logloss
+                p_true = max(eps, ph if oh else (pd if od else pa))
+                logloss_sum += -math.log(p_true)
+
+                # per-class bins
+                for lab, p_val, y_val in (
+                    ("H", ph, oh),
+                    ("D", pd, od),
+                    ("A", pa, oa),
+                ):
+                    bi2 = _bin_index(p_val)
+                    cls_bins[lab]["p_sum"][bi2] += p_val
+                    cls_bins[lab]["y_sum"][bi2] += y_val
+                    cls_bins[lab]["cnt"][bi2] += 1
+
+            # Compute ECE
+            ece = 0.0
+            bins_out: List[dict] = []
+            for i in range(num_bins):
+                if cnt[i] == 0:
+                    continue
+                conf_avg = conf_sum[i] / cnt[i]
+                acc_avg = acc_sum[i] / cnt[i]
+                w = cnt[i] / n_samples
+                ece += w * abs(acc_avg - conf_avg)
+
+            # Build per-class reliability curves
+            for lab in ("H", "D", "A"):
+                p_sum = cls_bins[lab]["p_sum"]
+                y_sum = cls_bins[lab]["y_sum"]
+                c_arr = cls_bins[lab]["cnt"]
+                for i in range(num_bins):
+                    c = c_arr[i]
+                    if c < min_per_bin:
+                        continue
+                    p_avg = p_sum[i] / c
+                    y_avg = y_sum[i] / c
+                    bins_out.append(
+                        {
+                            "group": gk,
+                            "class": lab,
+                            "bin": i,
+                            "bins": num_bins,
+                            "p_low": i / num_bins,
+                            "p_high": (i + 1) / num_bins,
+                            "avg_p": p_avg,
+                            "emp_rate": y_avg,
+                            "count": c,
+                            "source": source_kind,
+                        }
+                    )
+
+            # Compose metrics
+            metrics = {
+                "group": gk,
+                "n": int(n_samples),
+                "brier": brier_sum / n_samples,
+                "logloss": logloss_sum / n_samples,
+                "ece": ece,
+                "bins": num_bins,
+                "min_per_bin": int(min_per_bin),
+                "source": source_kind,
+            }
+            results[gk] = metrics
+
+            if persist and db:
+                docs.append(
+                    Document(
+                        id=f"{gk}:{source_kind}:metrics:{num_bins}",
+                        kind="calibration_metrics",
+                        data=metrics,
+                    )
+                )
+                for b in bins_out:
+                    cls_label = b["class"]
+                    bin_idx = b["bin"]
+                    doc_id = f"{gk}:{source_kind}:bins:{cls_label}:{bin_idx}:{num_bins}"
+                    docs.append(
+                        Document(
+                            id=doc_id,
+                            kind="calibration_bins",
+                            data=b,
+                        )
+                    )
+
+        if persist and db and docs:
+            db.insert_many(docs)
+            if verbose:
+                metrics_cnt = sum(1 for d in docs if d.kind == "calibration_metrics")
+                bins_cnt = sum(1 for d in docs if d.kind == "calibration_bins")
+                print(
+                    (f"[CAL] Persisted metrics={metrics_cnt} " f"bins={bins_cnt}"),
+                    flush=True,
+                )
+
+        return {
+            "groups": list(results.keys()),
+            "metrics": results,
+            "persisted": len(docs) if (persist and db) else 0,
+            "source": source_kind,
+        }
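
A standalone sanity check of the three metrics on toy data, mirroring the Brier/LogLoss/ECE formulas in compute() above; a sketch for verification, not the analyzer itself:

import math

rows = [  # (p_home, p_draw, p_away, observed outcome)
    (0.50, 0.30, 0.20, "H"),
    (0.25, 0.40, 0.35, "D"),
    (0.10, 0.25, 0.65, "H"),  # confident away pick, but home won
]
num_bins, eps = 10, 1e-12
brier = logloss = 0.0
conf_sum = [0.0] * num_bins
acc_sum = [0.0] * num_bins
cnt = [0] * num_bins

for ph, pd, pa, outcome in rows:
    oh, od, oa = (outcome == "H"), (outcome == "D"), (outcome == "A")
    brier += (ph - oh) ** 2 + (pd - od) ** 2 + (pa - oa) ** 2
    logloss += -math.log(max(eps, ph if oh else (pd if od else pa)))
    conf = max(ph, pd, pa)
    pred = "H" if ph >= pd and ph >= pa else ("D" if pd >= pa else "A")
    b = min(num_bins - 1, int(min(1.0 - 1e-12, conf) * num_bins))
    conf_sum[b] += conf
    acc_sum[b] += pred == outcome
    cnt[b] += 1

n = len(rows)
ece = sum(
    (cnt[i] / n) * abs(acc_sum[i] / cnt[i] - conf_sum[i] / cnt[i])
    for i in range(num_bins)
    if cnt[i]
)
print(f"Brier={brier / n:.4f} LogLoss={logloss / n:.4f} ECE={ece:.4f}")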

+ 153 - 9
src/databank/analytics/dixon_coles.py

@@ -9,7 +9,7 @@ Purpose:
 
 from __future__ import annotations
 
-from typing import Any, Dict, Iterable, Optional, Tuple
+from typing import Any, Dict, Iterable, Optional, Tuple, cast
 from collections import defaultdict
 import re
 import math
@@ -43,9 +43,10 @@ class DixonColesAnalyzer(AnalyticsBase):
             l2_base: L2 regularization on log base means (default 0.0).
             step_size: Learning rate for "coord" optimizer (default 0.1).
             outer_iters: Max outer iterations for "coord" (default 10).
-            history: "none" (default), "predictions" (per-match), or "snapshots".
+            history: "both" (default), "predictions" (per-match), "snapshots", or "none".
                 - predictions: persist pre-match H/D/A probs and mu/nu for each match.
                 - snapshots: persist team parameter snapshots at cutoffs.
+                - both: do both predictions and snapshots in a single pass.
             snapshot_every: For snapshots, persist every Nth match (default 10).
             max_iters_history: Iterations to use for history fits (default max_iters//2).
             max_goals: Max goals cap for probability table (default 8).
@@ -67,7 +68,7 @@ class DixonColesAnalyzer(AnalyticsBase):
         l2_base: float = float(kwargs.get("l2_base", 0.0))
         step_size: float = float(kwargs.get("step_size", 0.1))
         outer_iters: int = int(kwargs.get("outer_iters", 10))
-        history: str = str(kwargs.get("history", "none"))
+        history: str = str(kwargs.get("history", "both")).lower()
         snapshot_every: int = int(kwargs.get("snapshot_every", 10))
         max_iters_history: Optional[int] = kwargs.get("max_iters_history")
         if max_iters_history is not None:
@@ -76,6 +77,19 @@ class DixonColesAnalyzer(AnalyticsBase):
             except (ValueError, TypeError):
                 max_iters_history = None
         max_goals: int = int(kwargs.get("max_goals", 8))
+        # Progress / flushing controls
+        verbose: bool = bool(kwargs.get("verbose", True))
+        progress_every: int = int(kwargs.get("progress_every", 100))
+        flush_every: int = int(kwargs.get("flush_every", 1000))
+        # Skip switch: avoid recomputing groups already up-to-date
+        skip_if_exists: bool = bool(kwargs.get("skip_if_exists", False))
+        # Safety clamps to avoid extreme values in history
+        param_min: float = float(kwargs.get("param_min", 0.3))
+        param_max: float = float(kwargs.get("param_max", 3.0))
+        base_min: float = float(kwargs.get("base_min", 0.3))
+        base_max: float = float(kwargs.get("base_max", 3.0))
+        mu_max_cap: float = float(kwargs.get("mu_max", 6.0))
+        min_history_matches: int = int(kwargs.get("min_history_matches", 3))
 
         # Helpers
         def _get_ts(match: dict) -> int:
@@ -266,6 +280,20 @@ class DixonColesAnalyzer(AnalyticsBase):
                 for k in def_a:
                     def_a[k] /= gm
 
+            def _clamp_params() -> None:
+                # Clamp parameters to reasonable ranges
+                nonlocal base_h, base_a
+                base_h = max(base_min, min(base_max, float(base_h)))
+                base_a = max(base_min, min(base_max, float(base_a)))
+                for k in att_h:
+                    att_h[k] = max(param_min, min(param_max, float(att_h[k])))
+                for k in att_a:
+                    att_a[k] = max(param_min, min(param_max, float(att_a[k])))
+                for k in def_h:
+                    def_h[k] = max(param_min, min(param_max, float(def_h[k])))
+                for k in def_a:
+                    def_a[k] = max(param_min, min(param_max, float(def_a[k])))
+
             def _expected(r: dict) -> tuple[float, float]:
                 mu = base_h * att_h[r["home"]] * def_a[r["away"]]
                 nu = base_a * att_a[r["away"]] * def_h[r["home"]]
@@ -297,9 +325,15 @@ class DixonColesAnalyzer(AnalyticsBase):
                             den += r["w"] * mu
                         if den > 0:
                             factor = num / den
+                            # Dampen extreme single-step updates
+                            if factor < 0.5:
+                                factor = 0.5
+                            elif factor > 2.0:
+                                factor = 2.0
                             delta = max(delta, abs(1 - factor))
                             att_h[t] *= factor
                     _normalize()
+                    _clamp_params()
 
                     # Update attack_away
                     for t in teams:
@@ -313,9 +347,14 @@ class DixonColesAnalyzer(AnalyticsBase):
                             den += r["w"] * nu
                         if den > 0:
                             factor = num / den
+                            if factor < 0.5:
+                                factor = 0.5
+                            elif factor > 2.0:
+                                factor = 2.0
                             delta = max(delta, abs(1 - factor))
                             att_a[t] *= factor
                     _normalize()
+                    _clamp_params()
 
                     # Update defense_away (affects mu)
                     for t in teams:
@@ -329,9 +368,14 @@ class DixonColesAnalyzer(AnalyticsBase):
                             den += r["w"] * mu
                         if den > 0:
                             factor = num / den
+                            if factor < 0.5:
+                                factor = 0.5
+                            elif factor > 2.0:
+                                factor = 2.0
                             delta = max(delta, abs(1 - factor))
                             def_a[t] *= factor
                     _normalize()
+                    _clamp_params()
 
                     # Update defense_home (affects nu)
                     for t in teams:
@@ -345,9 +389,14 @@ class DixonColesAnalyzer(AnalyticsBase):
                             den += r["w"] * nu
                         if den > 0:
                             factor = num / den
+                            if factor < 0.5:
+                                factor = 0.5
+                            elif factor > 2.0:
+                                factor = 2.0
                             delta = max(delta, abs(1 - factor))
                             def_h[t] *= factor
                     _normalize()
+                    _clamp_params()
 
                     if delta < tol:
                         break
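
Each coordinate sweep above multiplies a parameter by the ratio num/den, now clipped to [0.5, 2.0] before it is applied. A minimal illustration of the damping:

def damped_factor(num: float, den: float) -> float:
    factor = num / den  # raw multiplicative update from the sweep
    return min(2.0, max(0.5, factor))  # same clip as in the diff

assert damped_factor(9.0, 1.0) == 2.0   # extreme upward step capped
assert damped_factor(0.1, 1.0) == 0.5   # extreme downward step capped
assert damped_factor(1.3, 1.0) == 1.3   # moderate steps pass through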
@@ -362,7 +411,7 @@ class DixonColesAnalyzer(AnalyticsBase):
                 log_base_a = math.log(max(1e-9, base_a))
 
                 def _sync_from_logs() -> None:
-                    nonlocal base_h, base_a
+                    nonlocal base_h, base_a, log_base_h, log_base_a
                     for t in teams:
                         att_h[t] = math.exp(log_att_h[t])
                         att_a[t] = math.exp(log_att_a[t])
@@ -370,6 +419,15 @@ class DixonColesAnalyzer(AnalyticsBase):
                         def_a[t] = math.exp(log_def_a[t])
                     base_h = math.exp(log_base_h)
                     base_a = math.exp(log_base_a)
+                    _clamp_params()
+                    # Reflect clamps back to logs
+                    for t in teams:
+                        log_att_h[t] = math.log(att_h[t])
+                        log_att_a[t] = math.log(att_a[t])
+                        log_def_h[t] = math.log(def_h[t])
+                        log_def_a[t] = math.log(def_a[t])
+                    log_base_h = math.log(base_h)
+                    log_base_a = math.log(base_a)
 
                 def _center_logs() -> None:
                     # Enforce identifiability: mean of logs = 0 per block
@@ -512,7 +570,67 @@ class DixonColesAnalyzer(AnalyticsBase):
         all_params: Dict[str, Dict[str, Dict[str, float]]] = {}
         docs: list[Document] = []
         total_matches = 0
+        preds_written = 0
+        snaps_written = 0
+        skipped_groups = 0
+
+        def _maybe_flush(reason: str = "") -> None:
+            nonlocal docs
+            if persist and db and docs and len(docs) >= max(1, flush_every):
+                try:
+                    db.insert_many(docs)
+                    if verbose:
+                        print(
+                            f"[DC] Flushed {len(docs)} docs to DB"
+                            + (f" ({reason})" if reason else ""),
+                            flush=True,
+                        )
+                finally:
+                    docs = []
+
+        if verbose:
+            print(f"[DC] Groups to fit: {len(groups)}", flush=True)
+
         for gk, rows in groups.items():
+            # Optional: skip group if summary doc exists with same match count
+            if skip_if_exists and persist and db:
+                try:
+                    db_any = cast(Any, db)
+                    existing = None
+                    docs_iter = db_any.find(
+                        "dc_params", projection={"_id": 0}, limit=None
+                    )
+                    if docs_iter is not None:
+                        for d in docs_iter:
+                            if not isinstance(d, dict):
+                                continue
+                            if d.get("group") == gk and d.get("summary") is True:
+                                existing = d
+                                break
+                    if isinstance(existing, dict):
+                        try:
+                            prev_matches = int(existing.get("matches", -1))
+                        except (ValueError, TypeError):
+                            prev_matches = -1
+                        if prev_matches == len(rows):
+                            if verbose:
+                                print(
+                                    (
+                                        f"[DC] Skip group '{gk}' "
+                                        f"(skip_if_exists; matches={prev_matches})"
+                                    ),
+                                    flush=True,
+                                )
+                            skipped_groups += 1
+                            continue
+                except (RuntimeError, ValueError, TypeError):
+                    # Diagnostics only: if existence check fails, proceed normally
+                    pass
+            if verbose:
+                print(
+                    f"[DC] Processing group '{gk}' with {len(rows)} matches",
+                    flush=True,
+                )
             p, stats = _fit_group(rows)
             all_params[gk] = p
             total_matches += int(stats.get("matches", 0))
@@ -529,6 +647,7 @@ class DixonColesAnalyzer(AnalyticsBase):
                             },
                         )
                     )
+                    _maybe_flush("dc_params")
                 # Optionally also persist a group-level summary doc
                 docs.append(
                     Document(
@@ -545,14 +664,18 @@ class DixonColesAnalyzer(AnalyticsBase):
                         },
                     )
                 )
+                _maybe_flush("dc_params_summary")
 
-            # Optional history: pre-match predictions or parameter snapshots
-            if history in ("predictions", "snapshots") and persist and db:
+            # Optional history: pre-match predictions and/or parameter snapshots
+            do_preds = history in ("predictions", "both")
+            do_snaps = history in ("snapshots", "both")
+            if (do_preds or do_snaps) and persist and db:
                 # Helper for DC-phi adjusted joint probabilities
                 def _probs_hda(
                     mu: float, nu: float, rho: float
                 ) -> tuple[float, float, float]:
                     def _pois(k: int, lam: float) -> float:
+                        lam = max(1e-9, float(lam))
                         return math.exp(k * math.log(lam) - lam - math.lgamma(k + 1))
 
                     # Use same phi as in likelihood
@@ -606,7 +729,7 @@ class DixonColesAnalyzer(AnalyticsBase):
 
                 for i, r in enumerate(rows_sorted):
                     prior = rows_sorted[:i]
-                    if not prior:
+                    if not prior or len(prior) < max(0, int(min_history_matches)):
                         continue
                     p_h, st_h = _fit_group(
                         prior,
@@ -642,8 +765,10 @@ class DixonColesAnalyzer(AnalyticsBase):
                         * att_a_map.get(r["away"], 1.0)
                         * def_h_map.get(r["home"], 1.0)
                     )
+                    mu = max(1e-9, min(mu_max_cap, float(mu)))
+                    nu = max(1e-9, min(mu_max_cap, float(nu)))
 
-                    if history == "predictions":
+                    if do_preds:
                         ph, pd, pa = _probs_hda(mu, nu, rho_h)
                         docs.append(
                             Document(
@@ -668,8 +793,10 @@ class DixonColesAnalyzer(AnalyticsBase):
                                 },
                             )
                         )
+                        preds_written += 1
+                        _maybe_flush("dc_predictions")
 
-                    if history == "snapshots" and (i % max(1, snapshot_every) == 0):
+                    if do_snaps and (i % max(1, snapshot_every) == 0):
                         cut_ts = int(r["ts"])
                         for team_id, vals in p_h.items():
                             docs.append(
@@ -700,13 +827,30 @@ class DixonColesAnalyzer(AnalyticsBase):
                                     },
                                 )
                             )
+                            snaps_written += 1
+                            _maybe_flush("dc_params_history")
+
+                    if verbose and (i + 1) % max(1, progress_every) == 0:
+                        print(
+                            (
+                                f"[DC] Group '{gk}' history progress: "
+                                f"{i + 1}/{len(rows_sorted)} "
+                                f"(preds={preds_written}, snaps={snaps_written})"
+                            ),
+                            flush=True,
+                        )
 
         if persist and db and docs:
             db.insert_many(docs)
+            if verbose:
+                print(f"[DC] Final flush {len(docs)} docs", flush=True)
 
         return {
             "groups": list(all_params.keys()),
             "params": all_params,
             "matches_used": total_matches,
             "persisted": len(docs) if docs else 0,
+            "predictions_written": preds_written,
+            "snapshots_written": snaps_written,
+            "groups_skipped": skipped_groups,
         }
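
The capped (mu, nu) means feed a truncated score grid to produce H/D/A probabilities. A minimal sketch assuming independent Poisson margins; the analyzer's own _probs_hda additionally applies the Dixon-Coles low-score adjustment (rho) before normalizing:

import math

def pois(k: int, lam: float) -> float:
    lam = max(1e-9, float(lam))  # same guard as the patched _pois
    return math.exp(k * math.log(lam) - lam - math.lgamma(k + 1))

def probs_hda(mu: float, nu: float, max_goals: int = 8):
    ph = pd = pa = 0.0
    for h in range(max_goals + 1):
        for a in range(max_goals + 1):
            p = pois(h, mu) * pois(a, nu)
            if h > a:
                ph += p
            elif h == a:
                pd += p
            else:
                pa += p
    s = ph + pd + pa  # renormalize mass lost to truncation
    return ph / s, pd / s, pa / s

print(probs_hda(1.5, 1.1))  # roughly (0.46, 0.26, 0.28): home-favoured split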