ソースを参照

添加分析器骨架,包括团队提取、Elo、Dixon-Coles、H2H、马尔可夫链、蒙特卡洛和强度分析器;实现基本结构和文档,支持未来的功能扩展和数据处理。

admin 2 ヶ月 前
コミット
103c20cc6b

+ 66 - 0
scripts/run_analyzers.py

@@ -0,0 +1,66 @@
+"""Run selected analyzers over existing DB data or last run results.
+
+This is a scaffold to wire analyzers end-to-end once implementations are ready.
+"""
+
+from __future__ import annotations
+
+import os
+from typing import Iterable, List
+
+from databank.db import MongoDB
+from databank.analytics.base import AnalyticsBase
+from databank.analytics.teams import TeamExtractorAnalyzer
+from databank.analytics.elo import EloAnalyzer
+from databank.analytics.dixon_coles import DixonColesAnalyzer
+from databank.analytics.markov_chain import MarkovChainAnalyzer
+from databank.analytics.h2h import H2HAnalyzer
+from databank.analytics.calibration import CalibrationAnalyzer
+from databank.analytics.monte_carlo import SeasonMonteCarloAnalyzer
+from databank.analytics.sos import StrengthOfScheduleAnalyzer
+
+
+def load_data(db: MongoDB, limit: int | None = 2000) -> List[dict]:
+    """Load match documents from DB as analyzer inputs (scaffold)."""
+    # NOTE: Adjust projection to include what analyzers need.
+    return db.find("match", projection={"_id": 0}, limit=limit)
+
+
+def main() -> None:
+    """Build analyzers and run them on existing data (skeleton)."""
+    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
+    name = os.getenv("DATABANK_DB_NAME", "databank")
+
+    db = MongoDB(uri=uri, name=name)
+    db.connect()
+
+    data = load_data(db)
+
+    analyzers: Iterable[AnalyticsBase] = [
+        TeamExtractorAnalyzer(),
+        EloAnalyzer(),
+        DixonColesAnalyzer(),
+        MarkovChainAnalyzer(),
+        H2HAnalyzer(),
+        CalibrationAnalyzer(),
+        StrengthOfScheduleAnalyzer(),
+        SeasonMonteCarloAnalyzer(),
+    ]
+
+    for analyzer in analyzers:
+        print(f"Running analyzer: {analyzer.__class__.__name__}")
+        try:
+            analyzer.prepare(data)
+            analyzer.validate(data)
+            transformed = analyzer.transform(data)
+            result = analyzer.compute(transformed, db=db, persist=True)
+            analyzer.finalize(result)
+            print(f" -> Done: {analyzer.__class__.__name__}")
+        except NotImplementedError as exc:
+            print(f" -> Skipped (not implemented): {exc}")
+        except (RuntimeError, ValueError) as exc:  # pragma: no cover - diagnostics only
+            print(f" -> Error: {type(exc).__name__}: {exc}")
+
+
+if __name__ == "__main__":
+    main()

+ 28 - 0
src/databank/analytics/calibration.py

@@ -0,0 +1,28 @@
+"""Probability calibration analyzer skeleton.
+
+Use cases:
+- Calibrate model probabilities (e.g., W/D/L) using Platt scaling or isotonic.
+- Evaluate calibration via reliability curves and ECE/MCE metrics.
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class CalibrationAnalyzer(AnalyticsBase):
+    """Calibrate predicted probabilities and evaluate calibration quality."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Fit calibration mapping and/or apply to incoming probabilities.
+
+        Args:
+            data: Iterable of predictions with observed outcomes.
+            **kwargs: method ("platt"|"isotonic"), per_league, db.
+
+        Returns:
+            Calibration model parameters and post-calibration metrics.
+        """
+        raise NotImplementedError("Calibration analyzer not implemented yet")

+ 35 - 0
src/databank/analytics/dixon_coles.py

@@ -0,0 +1,35 @@
+"""Dixon–Coles (DC) model analyzer skeleton for football scores.
+
+Purpose:
+- Estimate attacking/defending strengths and home advantage via Poisson/DC likelihood.
+- Produce scoreline distribution and W/D/L probabilities per match.
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class DixonColesAnalyzer(AnalyticsBase):
+    """Estimate Poisson/DC parameters and infer probabilities.
+
+    This is a scaffold. Implementation steps usually include:
+    - Build likelihood over historical matches with time decay.
+    - Optimize attack/defense params per team and a home advantage term.
+    - Apply DC correlation adjustment for low-score outcomes.
+    - Infer scoreline and aggregate to W/D/L probabilities.
+    """
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Fit/estimate DC parameters and produce probabilities.
+
+        Args:
+                data: Iterable of match-like docs.
+                **kwargs: decay, regularization, max_goals, db, preview_only.
+
+        Returns:
+                A dict with parameters and/or per-match probability outputs.
+        """
+        raise NotImplementedError("Dixon–Coles model not implemented yet")

+ 227 - 0
src/databank/analytics/elo.py

@@ -0,0 +1,227 @@
+"""Elo rating analyzer skeleton for football.
+
+Features considered (to implement):
+- Base K, goal-difference scaling, home advantage offset, time decay, season reset.
+- Probability mapping and optional calibration as a downstream step.
+"""
+
+from __future__ import annotations
+
+from typing import Any, Iterable, Dict, Tuple, Optional
+import math
+import re
+from datetime import datetime, timezone
+
+from databank.core.models import Document
+from databank.db.base import BaseDB
+
+from .base import AnalyticsBase
+
+
+class EloAnalyzer(AnalyticsBase):
+    """Compute/update team Elo ratings from match history.
+
+    Expected input data:
+    - Match documents in chronological order (or provide a sorter key via kwargs).
+
+    Output:
+    - Upsert ratings into `ratings_history` collection and/or return in-memory dict.
+    - Optionally return per-match expected probabilities.
+    """
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:  # noqa: D401
+        """Compute Elo ratings.
+
+        Args:
+            data: Iterable of match-like docs.
+            **kwargs: Optional config: base_k, home_adv, decay, season_reset, db.
+
+        Returns:
+            Ratings snapshot and/or per-match expectations.
+        """
+        # Config
+        base_k: float = float(kwargs.get("base_k", 20.0))
+        home_adv: float = float(kwargs.get("home_adv", 60.0))
+        persist: bool = bool(kwargs.get("persist", True))
+        return_expectations: bool = bool(kwargs.get("return_expectations", False))
+        db: Optional[BaseDB] = kwargs.get("db")
+
+        # Helpers
+        def _norm(name: str) -> str:
+            s = name.strip().lower()
+            s = re.sub(r"[\s\-_.]+", " ", s)
+            s = re.sub(r"[^0-9a-z\u4e00-\u9fff ]+", "", s)
+            return s.strip()
+
+        def _get_team(match: dict, side: str) -> Tuple[Optional[str], Optional[str]]:
+            # returns (team_id, team_name)
+            id_val = match.get(f"{side}TeamId") or match.get(f"{side}Id")
+            name_val = (
+                match.get(f"{side}TeamName")
+                or match.get(f"{side}Name")
+                or (match.get("homeName") if side == "home" else match.get("awayName"))
+            )
+            team_id = str(id_val) if id_val is not None else None
+            if team_id is None and isinstance(name_val, str):
+                team_id = _norm(name_val)
+            return team_id, name_val
+
+        def _get_score(match: dict, side: str) -> Optional[int]:
+            keys = [
+                f"{side}Score",
+                f"{side}Goals",
+                f"{side}Goal",
+                f"{ 'hostScore' if side=='home' else 'guestScore'}",
+            ]
+            for k in keys:
+                if k in match and match[k] is not None:
+                    try:
+                        return int(match[k])
+                    except (ValueError, TypeError):
+                        continue
+            return None
+
+        def _get_ts(match: dict) -> float:
+            # Try common timestamp fields (ms or s)
+            for k in ("matchTime", "startTime", "time", "date", "matchDate"):
+                if k in match and match[k] is not None:
+                    v = match[k]
+                    try:
+                        if isinstance(v, (int, float)):
+                            ts = float(v)
+                        elif isinstance(v, str) and v.isdigit():
+                            ts = float(v)
+                        else:
+                            # try parse ISO-like string
+                            dt = datetime.fromisoformat(str(v).replace("Z", "+00:00"))
+                            return dt.replace(
+                                tzinfo=dt.tzinfo or timezone.utc
+                            ).timestamp()
+                        # Heuristic: treat large as ms
+                        if ts > 10_000_000_000:  # > ~2001 in ms scale
+                            ts = ts / 1000.0
+                        return ts
+                    except (ValueError, TypeError):
+                        continue
+            return 0.0
+
+        def _finished(match: dict) -> bool:
+            status = match.get("elapsedTime") or match.get("status")
+            if isinstance(status, str) and (
+                "已完场" in status or status.lower() in {"ft", "finished", "end"}
+            ):
+                return True
+            # If scores exist, assume finished
+            hs = _get_score(match, "home")
+            as_ = _get_score(match, "away")
+            return hs is not None and as_ is not None
+
+        # Normalize input
+        items = list(data) if isinstance(data, Iterable) else []
+        matches = []
+        for d in items:
+            m = None
+            if isinstance(d, dict):
+                m = d.get("match") or d.get("data", {}).get("match")
+            if not isinstance(m, dict):
+                continue
+            if not _finished(m):
+                continue
+            ts = _get_ts(m)
+            mid = m.get("matchId") or m.get("id")
+            h_id, _ = _get_team(m, "home")
+            a_id, _ = _get_team(m, "away")
+            hs = _get_score(m, "home")
+            as_ = _get_score(m, "away")
+            if not h_id or not a_id or hs is None or as_ is None:
+                continue
+            matches.append(
+                {
+                    "ts": ts,
+                    "match_id": mid,
+                    "home": h_id,
+                    "away": a_id,
+                    "hs": hs,
+                    "as": as_,
+                }
+            )
+
+        matches.sort(key=lambda x: x["ts"])
+
+        ratings: Dict[str, float] = {}
+        expectations = []
+
+        def _expected(r_home: float, r_away: float) -> float:
+            delta = (r_home + home_adv) - r_away
+            return 1.0 / (1.0 + math.pow(10.0, -delta / 400.0))
+
+        history_docs: list[Document] = []
+        for rec in matches:
+            h = rec["home"]
+            a = rec["away"]
+            hs = rec["hs"]
+            as_ = rec["as"]
+            ts = rec["ts"]
+            mid = rec.get("match_id")
+            rh = ratings.get(h, 1500.0)
+            ra = ratings.get(a, 1500.0)
+            eh = _expected(rh, ra)
+            # actual score
+            if hs > as_:
+                sh = 1.0
+            elif hs == as_:
+                sh = 0.5
+            else:
+                sh = 0.0
+            gd = abs(hs - as_)
+            k = base_k * (1.0 + min(2.0, float(gd)) * 0.5)
+            change = k * (sh - eh)
+            rh2 = rh + change
+            ra2 = ra - change
+            ratings[h] = rh2
+            ratings[a] = ra2
+            # history for this match (no id to allow multiple entries)
+            history_docs.append(
+                Document(
+                    id=None,
+                    kind="ratings_history",
+                    data={"team_id": h, "rating": rh2, "ts": ts, "match_id": mid},
+                )
+            )
+            history_docs.append(
+                Document(
+                    id=None,
+                    kind="ratings_history",
+                    data={"team_id": a, "rating": ra2, "ts": ts, "match_id": mid},
+                )
+            )
+            if return_expectations:
+                expectations.append(
+                    {
+                        "home": h,
+                        "away": a,
+                        "p_home": eh,
+                        "result": sh,
+                        "gd": gd,
+                    }
+                )
+
+        result: Dict[str, Any] = {"ratings": ratings, "processed": len(matches)}
+        if return_expectations:
+            result["expectations"] = expectations
+
+        if persist and db:
+            # snapshot current ratings
+            if ratings:
+                docs = [
+                    Document(
+                        id=tid, kind="elo_ratings", data={"team_id": tid, "rating": r}
+                    )
+                    for tid, r in ratings.items()
+                ]
+                db.insert_many(docs)
+            # append history
+            if history_docs:
+                db.insert_many(history_docs)
+
+        return result

+ 28 - 0
src/databank/analytics/h2h.py

@@ -0,0 +1,28 @@
+"""Head-to-head (H2H) analyzer skeleton.
+
+Idea:
+- Treat each pair of teams as a mini-league; compute pairwise stats and optional
+  pairwise rating (e.g., BTL or Elo dedicated to the pair).
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class H2HAnalyzer(AnalyticsBase):
+    """Compute head-to-head summaries and pairwise ratings."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Compute H2H aggregates and, optionally, pairwise ratings.
+
+        Args:
+            data: Iterable of match-like docs.
+            **kwargs: recent_window, separate_home_away, db.
+
+        Returns:
+            A structure keyed by (team_a, team_b) with stats and optional rating.
+        """
+        raise NotImplementedError("H2H analyzer not implemented yet")

+ 28 - 0
src/databank/analytics/markov_chain.py

@@ -0,0 +1,28 @@
+"""Markov-chain analyzer skeleton for per-team W/D/L state transitions.
+
+Goal:
+- Estimate team-specific transition probabilities over time windows and compute
+  finite-step or stationary (long-run) distributions.
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class MarkovChainAnalyzer(AnalyticsBase):
+    """Compute team Markov transition matrices and steady-state distributions."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Estimate transition matrices and distributions.
+
+        Args:
+            data: Iterable of match-like docs, grouped by team and ordered by time.
+            **kwargs: window, decay, smoothing, db.
+
+        Returns:
+            Dict keyed by team_id with matrices and distributions.
+        """
+        raise NotImplementedError("Markov-chain analyzer not implemented yet")

+ 28 - 0
src/databank/analytics/monte_carlo.py

@@ -0,0 +1,28 @@
+"""Season Monte Carlo simulator skeleton.
+
+Goal:
+- Use per-match W/D/L probabilities to simulate remaining fixtures and estimate
+  ranking/points distributions (e.g., title/European/relegation probabilities).
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class SeasonMonteCarloAnalyzer(AnalyticsBase):
+    """Simulate remaining season outcomes with Monte Carlo runs."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Run Monte Carlo simulations using provided match probabilities.
+
+        Args:
+            data: Remaining fixtures and probability inputs.
+            **kwargs: n_sims, tie_breakers, db.
+
+        Returns:
+            Aggregated standings probabilities and diagnostics.
+        """
+        raise NotImplementedError("Monte Carlo analyzer not implemented yet")

+ 27 - 0
src/databank/analytics/sos.py

@@ -0,0 +1,27 @@
+"""Strength of schedule (SoS) analyzer skeleton.
+
+Compute schedule difficulty based on opponent ratings (Elo/TrueSkill/etc.).
+Provide adjusted metrics to neutralize schedule effects.
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class StrengthOfScheduleAnalyzer(AnalyticsBase):
+    """Compute schedule difficulty and adjusted team metrics."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Compute SoS using opponent ratings and return adjusted metrics.
+
+        Args:
+                data: Matches or team fixtures with ratings.
+                **kwargs: rating_source (elo/trueskill), window, db.
+
+        Returns:
+                A per-team SoS value and optionally adjusted metrics.
+        """
+        raise NotImplementedError("SoS analyzer not implemented yet")

+ 196 - 0
src/databank/analytics/teams.py

@@ -0,0 +1,196 @@
+"""Team extraction and normalization analyzer skeleton.
+
+Purpose:
+- Scan matches, extract home/away team names, normalize to canonical team records.
+- Maintain a teams collection (team_id, name_canonical, aliases, metadata).
+
+Notes:
+- This is a scaffold only; implement normalization and DB writes in compute().
+"""
+
+from __future__ import annotations
+
+from typing import Any, Iterable, Dict, Set
+import re
+
+from databank.core.models import Document
+from databank.db.base import BaseDB
+
+from .base import AnalyticsBase
+
+
+class TeamExtractorAnalyzer(AnalyticsBase):
+    """Extract and normalize teams from match documents.
+
+    Expected input data:
+    - An iterable of match-like documents (e.g., runner.last_docs or DB query results).
+      Each item is expected to have a dict-like attribute `.data` with a nested `match` dict.
+
+    Output:
+    - Should update a `teams` collection in DB (to be implemented by you).
+    - Return a summary dict: {"inserted": int, "updated": int, "seen": int}.
+    """
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:
+        """Compute team extraction and normalization.
+
+        Args:
+            data: Iterable of documents containing match payloads.
+            **kwargs: Optional parameters, e.g., db, dry_run, preview_limit.
+
+        Returns:
+            A summary dict of changes or a preview list in dry-run.
+        """
+        db: BaseDB | None = kwargs.get("db")  # optional DB for persistence
+        dry_run: bool = bool(kwargs.get("dry_run", False))
+
+        def _norm(name: str) -> str:
+            s = name.strip().lower()
+            s = re.sub(r"[\s\-_.]+", " ", s)
+            s = re.sub(r"[^0-9a-z\u4e00-\u9fff ]+", "", s)
+            return s.strip()
+
+        def _extract_team(
+            obj: dict, prefix_candidates: Iterable[str]
+        ) -> tuple[str | None, str | None]:
+            # Try to find team name and id with multiple possible keys
+            name_keys = [
+                "TeamName",
+                "teamName",
+                "name",
+                "team_name",
+                "homeTeamName",
+                "awayTeamName",
+                "homeName",
+                "awayName",
+            ]
+            id_keys = [
+                "TeamId",
+                "teamId",
+                "id",
+                "team_id",
+                "homeTeamId",
+                "awayTeamId",
+                "homeId",
+                "awayId",
+            ]
+            name_val = None
+            id_val = None
+            for pref in prefix_candidates:
+                for nk in name_keys:
+                    key = pref + nk if pref else nk
+                    if key in obj and isinstance(obj[key], str):
+                        name_val = obj[key]
+                        break
+                for ik in id_keys:
+                    key = pref + ik if pref else ik
+                    if key in obj and obj[key] is not None:
+                        id_val = str(obj[key])
+                        break
+            return name_val, id_val
+
+        # Collect canonical teams
+        teams: Dict[str, Dict[str, Any]] = {}
+        aliases_map: Dict[str, Set[str]] = {}
+
+        items: list[Any] = list(data) if isinstance(data, Iterable) else []
+        for d in items:
+            # DB record style: dict with 'match' field
+            match = None
+            if isinstance(d, dict):
+                match = d.get("match") or d.get("data", {}).get("match")
+            if not isinstance(match, dict):
+                continue
+
+            # Extract home/away teams
+            # Try direct fields first
+            home_name = (
+                match.get("homeTeamName")
+                or match.get("homeName")
+                or match.get("hostTeamName")
+            )
+            away_name = (
+                match.get("awayTeamName")
+                or match.get("awayName")
+                or match.get("guestTeamName")
+            )
+            home_id = match.get("homeTeamId") or match.get("homeId")
+            away_id = match.get("awayTeamId") or match.get("awayId")
+
+            # Fallback: generic extractor with prefixes
+            if not home_name or not home_id:
+                n, i = _extract_team(match, ("home", "host", ""))
+                home_name = home_name or n
+                home_id = home_id or i
+            if not away_name or not away_id:
+                n, i = _extract_team(match, ("away", "guest", ""))
+                away_name = away_name or n
+                away_id = away_id or i
+
+            # Build canonical IDs (fallback to normalized name)
+            if home_name:
+                h_norm = _norm(home_name)
+                h_id = str(home_id) if home_id else h_norm
+                rec = teams.setdefault(
+                    h_id, {"team_id": h_id, "name_canonical": h_norm, "aliases": set()}
+                )
+                rec["name_canonical"] = rec.get("name_canonical", h_norm)
+                aliases_map.setdefault(h_id, set()).add(home_name)
+            if away_name:
+                a_norm = _norm(away_name)
+                a_id = str(away_id) if away_id else a_norm
+                rec = teams.setdefault(
+                    a_id, {"team_id": a_id, "name_canonical": a_norm, "aliases": set()}
+                )
+                rec["name_canonical"] = rec.get("name_canonical", a_norm)
+                aliases_map.setdefault(a_id, set()).add(away_name)
+
+        # If DB provided, merge existing aliases for idempotent union updates
+        existing_aliases: Dict[str, Set[str]] = {}
+        if db and teams:
+            ids = list(teams.keys())
+            db_any: Any = db  # allow dynamic attribute access for find
+            try:
+                # Try fetch existing by _id in a single query
+                existing = db_any.find(
+                    "teams",
+                    {"_id": {"$in": ids}},
+                    projection=None,
+                    limit=None,
+                )  # type: ignore[arg-type]
+            except TypeError:
+                # Fallback signature without projection/limit
+                existing = db_any.find(
+                    "teams",
+                    {"_id": {"$in": ids}},
+                )  # type: ignore[assignment]
+            for doc in existing or []:
+                key = str(doc.get("_id") or doc.get("id") or doc.get("team_id") or "")
+                if not key:
+                    continue
+                aliases_list = doc.get("aliases")
+                if not isinstance(aliases_list, list):
+                    aliases_list = (doc.get("data", {}) or {}).get("aliases")
+                if not isinstance(aliases_list, list):
+                    aliases_list = []
+                existing_aliases[key] = set(map(str, aliases_list))
+
+        # Materialize aliases and prepare docs
+        docs: list[Document] = []
+        for tid, rec in teams.items():
+            merged: Set[str] = set(sorted(aliases_map.get(tid, set())))
+            if tid in existing_aliases:
+                merged |= existing_aliases[tid]
+            aliases = sorted(merged)
+            payload = {
+                "team_id": tid,
+                "name_canonical": rec.get("name_canonical"),
+                "aliases": aliases,
+            }
+            docs.append(Document(id=tid, kind="teams", data=payload))
+
+        if dry_run or not db or not docs:
+            return {"seen": len(items), "prepared": len(docs)}
+
+        inserted = db.insert_many(docs)
+        return {"seen": len(items), "prepared": len(docs), "upserted": inserted}

+ 149 - 0
tests/test_analytics_team_elo.py

@@ -0,0 +1,149 @@
+"""Basic unit tests for TeamExtractor and Elo analyzers.
+
+This module uses a simple in-memory FakeDB to validate:
+- Team aliases union behavior
+- Elo snapshot and history persistence contracts
+"""
+
+import unittest
+from typing import Any, Dict, List
+
+from databank.analytics.teams import TeamExtractorAnalyzer
+from databank.analytics.elo import EloAnalyzer
+from databank.core.models import Document
+
+
+class FakeDB:
+    """A minimal in-memory DB stub implementing insert_many/find for tests."""
+
+    def __init__(self) -> None:
+        self.storage: Dict[str, List[Dict[str, Any]]] = {}
+
+    def connect(self) -> None:  # for parity
+        """No-op connect to match real DB interface."""
+        return None
+
+    def insert_many(self, docs: List[Document]) -> int:
+        """Append documents by kind; return inserted count."""
+        count = 0
+        for d in docs:
+            kind = d.kind
+            rec = {"_id": d.id, "kind": kind, "data": d.data}
+            self.storage.setdefault(kind, []).append(rec)
+            count += 1
+        return count
+
+    def find(
+        self,
+        kind: str,
+        query: Dict[str, Any] | None = None,
+        _projection=None,
+        _limit=None,
+    ):  # noqa: A002 - filter naming for parity
+        """Retrieve documents by kind; supports _id $in filter."""
+        data = self.storage.get(kind, [])
+        if not query:
+            return list(data)
+        # Support _id $in
+        ids = None
+        if "_id" in query and isinstance(query["_id"], dict) and "$in" in query["_id"]:
+            ids = set(map(str, query["_id"]["$in"]))
+        if ids is not None:
+            return [d for d in data if str(d.get("_id")) in ids]
+        return list(data)
+
+
+class TestAnalyticsTeamElo(unittest.TestCase):
+    """Tests for TeamExtractorAnalyzer and EloAnalyzer integrations."""
+
+    def test_team_extractor_alias_union(self) -> None:
+        """Teams analyzer should union new aliases with existing ones."""
+        db = FakeDB()
+        # Existing team with alias
+        db.insert_many(
+            [
+                Document(
+                    id="100",
+                    kind="teams",
+                    data={
+                        "team_id": "100",
+                        "name_canonical": "manchester city",
+                        "aliases": ["曼城"],
+                    },
+                )
+            ]
+        )
+        # Two matches with different alias spellings
+        matches = [
+            {
+                "match": {
+                    "homeTeamId": 100,
+                    "homeTeamName": "Manchester City",
+                    "awayTeamId": 200,
+                    "awayTeamName": "Arsenal",
+                }
+            },
+            {
+                "match": {
+                    "homeTeamId": 300,
+                    "homeTeamName": "Chelsea",
+                    "awayTeamId": 100,
+                    "awayTeamName": "Man City",
+                }
+            },
+        ]
+        res = TeamExtractorAnalyzer().compute(matches, db=db)
+        self.assertIn("upserted", res)
+        # Check merged aliases contain existing + new
+        teams = db.find("teams")
+        # Find all records for _id 100 and pick the latest appended (upsert effect)
+        mlist = [x for x in teams if str(x.get("_id")) == "100"]
+        self.assertGreaterEqual(len(mlist), 1)
+        mrec = mlist[-1]
+        aliases = set(mrec["data"]["aliases"])  # type: ignore[index]
+        self.assertTrue({"曼城", "Manchester City", "Man City"}.issubset(aliases))
+
+    def test_elo_persist_snapshot_and_history(self) -> None:
+        """Elo analyzer should persist snapshot and two history entries per match."""
+        db = FakeDB()
+        # Two matches: A beats B 1-0, then draw 0-0
+        matches = [
+            {
+                "match": {
+                    "matchId": 1,
+                    "homeTeamId": "A",
+                    "awayTeamId": "B",
+                    "homeScore": 1,
+                    "awayScore": 0,
+                    "elapsedTime": "已完场",
+                    "matchTime": 1_700_000_000,
+                }
+            },
+            {
+                "match": {
+                    "matchId": 2,
+                    "homeTeamId": "A",
+                    "awayTeamId": "B",
+                    "homeScore": 0,
+                    "awayScore": 0,
+                    "elapsedTime": "已完场",
+                    "matchTime": 1_700_000_100,
+                }
+            },
+        ]
+        res = EloAnalyzer().compute(matches, db=db, persist=True)
+        self.assertEqual(res["processed"], 2)
+        # Snapshot should exist
+        elo = db.find("elo_ratings")
+        self.assertTrue(len(elo) >= 2)
+        # History should have 4 entries (2 matches x 2 teams)
+        hist = db.find("ratings_history")
+        self.assertEqual(len(hist), 4)
+        # Ensure history contains match_id and ts
+        for h in hist:
+            self.assertIn("match_id", h["data"])  # type: ignore[index]
+            self.assertIn("ts", h["data"])  # type: ignore[index]
+
+
+if __name__ == "__main__":
+    unittest.main()