
Add a scheduler and analyzers; refactor task providers to support incremental and full modes; implement control-document handling and a progress display; improve data parsing and task-generation logic.

admin committed 2 months ago
Commit fbf6c21bfa

+ 64 - 0
scripts/run_scheduler.py

@@ -0,0 +1,64 @@
+"""Run the formal Databank scheduler orchestrating spiders, reporters, and analyzers.
+
+Usage (PowerShell):
+  # Ensure deps
+  # python -m pip install requests pymongo
+  # Configure DB if needed
+  # $env:DATABANK_DB_URI = "mongodb://localhost:27017"
+  # $env:DATABANK_DB_NAME = "databank"
+  python scripts/run_scheduler.py
+"""
+
+from __future__ import annotations
+
+import os
+from typing import Dict, List
+
+from databank.db import MongoDB
+from databank.reporter.daily_file import DailyFileReporter
+from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
+from databank.spiders.base import BaseSpider
+from databank.scheduler.orchestrator import DatabankScheduler, TaskProvider
+from databank.tasks.providers import league_matchlist_from_mongo
+from databank.analytics.simple_counts import PerTokenCounter
+
+
+def main() -> None:
+    """Entry point that builds and runs the Databank scheduler once."""
+    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
+    name = os.getenv("DATABANK_DB_NAME", "databank")
+
+    db = MongoDB(uri=uri, name=name)
+    db.connect()
+
+    # Spiders
+    get_match = GetLeagueMatchListSpider()
+    spiders: List[BaseSpider] = [get_match]
+
+    # Reporters
+    reporters = [DailyFileReporter(timezone="utc+8")]
+
+    # Task providers wiring (no caps in production)
+    tasks_provider: Dict[BaseSpider, TaskProvider] = {
+        get_match: league_matchlist_from_mongo(),
+    }
+
+    # Analyzers
+    analyzers = [PerTokenCounter()]
+
+    # Orchestrator
+    scheduler = DatabankScheduler(
+        db=db,
+        spiders=spiders,
+        reporters=reporters,
+        task_providers=tasks_provider,
+        analyzers=analyzers,
+        interval_s=None,  # set to seconds to loop
+    )
+
+    summary = scheduler.run_once()
+    print("Scheduler finished. Total persisted:", summary.total_docs)
+
+
+if __name__ == "__main__":
+    main()

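Note: the provider contract used above is just `Callable[[BaseSpider, BaseDB], List[Task]]` (see `TaskProvider` in orchestrator.py below), so the Mongo-backed provider can be swapped for a static one when smoke-testing the wiring. A minimal sketch, assuming `MatchListTask` accepts `league_id`, `season`, and `round_no` as in providers.py further down; the id and season values are illustrative:

```python
# Minimal sketch of a static TaskProvider for smoke tests.
# league_id/season/round_no values are illustrative, not real seeds.
from typing import List

from databank.core.tasks import MatchListTask
from databank.db.base import BaseDB
from databank.spiders.base import BaseSpider, Task


def static_tasks(spider: BaseSpider, db: BaseDB) -> List[Task]:
    """Return a fixed task list regardless of DB contents."""
    return [MatchListTask(league_id=36, season="2024-2025", round_no=1)]


# Swap it in for the Mongo-backed provider:
# tasks_provider = {get_match: static_tasks}
```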
+ 59 - 0
scripts/run_scheduler_full.py

@@ -0,0 +1,59 @@
+"""Run scheduler in full mode: all seasons and rounds for available leagues."""
+
+from __future__ import annotations
+
+import os
+from typing import Dict, List
+
+from databank.db import MongoDB
+from databank.reporter.daily_file import DailyFileReporter
+from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
+from databank.spiders.base import BaseSpider
+from databank.scheduler.orchestrator import DatabankScheduler, TaskProvider
+from databank.tasks.providers import league_matchlist_from_mongo
+from databank.analytics.simple_counts import PerTokenCounter
+
+
+essential_env = {
+    "DATABANK_DB_URI": "mongodb://localhost:27017",
+    "DATABANK_DB_NAME": "databank",
+}
+
+
+def main() -> None:
+    """Entry point: run the scheduler in full mode (all seasons and rounds)."""
+    uri = os.getenv("DATABANK_DB_URI", essential_env["DATABANK_DB_URI"])
+    name = os.getenv("DATABANK_DB_NAME", essential_env["DATABANK_DB_NAME"])
+
+    db = MongoDB(uri=uri, name=name)
+    db.connect()
+
+    # Spiders
+    get_match = GetLeagueMatchListSpider()
+    spiders: List[BaseSpider] = [get_match]
+
+    # Reporters
+    reporters = [DailyFileReporter(timezone="utc+8")]
+
+    # Task providers wiring (full, no caps)
+    tasks_provider: Dict[BaseSpider, TaskProvider] = {
+        get_match: league_matchlist_from_mongo(mode="full"),
+    }
+
+    # Analyzers
+    analyzers = [PerTokenCounter()]
+
+    scheduler = DatabankScheduler(
+        db=db,
+        spiders=spiders,
+        reporters=reporters,
+        task_providers=tasks_provider,
+        analyzers=analyzers,
+    )
+
+    summary = scheduler.run_once()
+    print("Scheduler(full) finished. Total persisted:", summary.total_docs)
+
+
+if __name__ == "__main__":
+    main()

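For a bounded trial of full mode, the provider factory already exposes `max_leagues` and `max_rounds` caps (see providers.py below); a short sketch:

```python
# Sketch: cap full mode for a quick trial instead of crawling everything.
from databank.tasks.providers import league_matchlist_from_mongo

provider = league_matchlist_from_mongo(mode="full", max_leagues=2, max_rounds=3)
# tasks_provider = {get_match: provider}
# Yields at most: 2 leagues x (all seasons) x 3 rounds per season.
```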
+ 105 - 0
scripts/run_scheduler_incremental.py

@@ -0,0 +1,105 @@
+"""Run scheduler in incremental mode: only current/latest season per league."""
+
+from __future__ import annotations
+
+import os
+from typing import Dict, List
+
+from databank.db import MongoDB
+from databank.reporter.daily_file import DailyFileReporter
+from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
+from databank.spiders.base import BaseSpider
+from databank.scheduler.orchestrator import DatabankScheduler, TaskProvider
+from databank.tasks.providers import league_matchlist_from_mongo
+from databank.analytics.simple_counts import PerTokenCounter
+
+
+essential_env = {
+    "DATABANK_DB_URI": "mongodb://localhost:27017",
+    "DATABANK_DB_NAME": "databank",
+}
+
+
+def main() -> None:
+    """Entry point: run the scheduler in incremental mode (latest season only)."""
+    uri = os.getenv("DATABANK_DB_URI", essential_env["DATABANK_DB_URI"])
+    name = os.getenv("DATABANK_DB_NAME", essential_env["DATABANK_DB_NAME"])
+
+    db = MongoDB(uri=uri, name=name)
+    db.connect()
+
+    # Spiders
+    get_match = GetLeagueMatchListSpider()
+    spiders: List[BaseSpider] = [get_match]
+
+    # Reporters
+    reporters = [DailyFileReporter(timezone="utc+8")]
+
+    # Task providers wiring (incremental)
+    provider = league_matchlist_from_mongo(mode="incremental")
+    tasks_provider: Dict[BaseSpider, TaskProvider] = {get_match: provider}
+
+    # Preflight: generate tasks once to validate seeds/config
+    preview_tasks = provider(get_match, db)
+    if not preview_tasks:
+        print(
+            "No tasks generated (incremental mode). "
+            "Please ensure MongoDB has seeds in 'leagues' and 'seasons'."
+        )
+        print(
+            "Try seeding first: python scripts/seed_leagues_mongo.py "
+            "and python scripts/seed_seasons_mongo.py"
+        )
+        return
+    else:
+        print(
+            f"Prepared {len(preview_tasks)} task(s) for incremental run "
+            f"(showing up to 3):"
+        )
+        for t in preview_tasks[:3]:
+            token = t.token() if hasattr(t, "token") else str(t)
+            print(" -", token)
+
+    # Analyzers
+    analyzers = [PerTokenCounter()]
+
+    scheduler = DatabankScheduler(
+        db=db,
+        spiders=spiders,
+        reporters=reporters,
+        task_providers=tasks_provider,
+        analyzers=analyzers,
+    )
+
+    summary = scheduler.run_once()
+    print("Scheduler(incremental) finished. Total persisted:", summary.total_docs)
+    if summary.total_docs == 0:
+        # Diagnostics: inspect returned docs to explain why nothing persisted
+        docs = scheduler.get_last_docs() if hasattr(scheduler, "get_last_docs") else []
+        kinds = {}
+        for d in docs:
+            kinds[d.kind] = kinds.get(d.kind, 0) + 1
+        if kinds:
+            print("Returned document kinds:", kinds)
+            # Show first few error reasons if present
+            errs = [d for d in docs if d.kind == "error"]
+            if errs:
+                preview = errs[:3]
+                print("Sample errors (up to 3):")
+                for e in preview:
+                    reason = e.data.get("reason") if isinstance(e.data, dict) else None
+                    detail = e.data.get("detail") if isinstance(e.data, dict) else None
+                    print(" -", reason, ":", detail)
+        else:
+            print(
+                "Runner produced no documents. "
+                "Check network/API accessibility and spider filters."
+            )
+        print(
+            "Note: Spider filters keep only groupName='联赛' and "
+            "elapsedTime='已完场', and skip future-dated matches."
+        )
+
+
+if __name__ == "__main__":
+    main()

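The incremental script bails out when the 'leagues' and 'seasons' collections are empty. A hypothetical seeding sketch with pymongo, using only the fields providers.py reads (`league_id`, `max_round`, `season`); the real seed_leagues_mongo.py / seed_seasons_mongo.py scripts may populate more:

```python
# Hypothetical seeding sketch; field names mirror what providers.py reads.
# The actual seed scripts referenced above may differ.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
db = client["databank"]

# One league with 38 rounds (illustrative values).
db["leagues"].update_one(
    {"league_id": 36},
    {"$set": {"league_id": 36, "max_round": 38}},
    upsert=True,
)
# One season; the provider sorts season strings descending to pick the latest.
db["seasons"].update_one(
    {"season": "2024-2025"},
    {"$set": {"season": "2024-2025"}},
    upsert=True,
)
```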
+ 23 - 0
src/databank/analytics/simple_counts.py

@@ -0,0 +1,23 @@
+"""A tiny analyzer that counts documents per spider token.
+
+This is an example AnalyticsBase implementation to demonstrate analyzer wiring.
+"""
+
+from __future__ import annotations
+
+from collections import Counter
+from typing import Any
+
+from .base import AnalyticsBase
+
+
+class PerTokenCounter(AnalyticsBase):
+    """Count documents per token field."""
+
+    def compute(self, data: Any, **kwargs: Any) -> Any:  # noqa: D401 - short doc ok
+        docs = list(data or [])
+        counts = Counter(
+            d.data.get("token", "unknown") for d in docs if getattr(d, "data", None)
+        )
+        # You could emit logs or write to another sink here.
+        return counts

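A quick usage sketch for PerTokenCounter; the token strings are illustrative stand-ins for whatever `task.token()` returns, and any object exposing a truthy `.data` mapping is counted:

```python
# Sketch: PerTokenCounter over documents shaped like the spider's output.
from types import SimpleNamespace

from databank.analytics.simple_counts import PerTokenCounter

docs = [
    SimpleNamespace(data={"token": "league:36|2024-2025|r1"}),  # illustrative token
    SimpleNamespace(data={"token": "league:36|2024-2025|r1"}),
    SimpleNamespace(data={"reason": "no_match_list"}),  # no token -> "unknown"
]
print(PerTokenCounter().compute(docs))
# Counter({'league:36|2024-2025|r1': 2, 'unknown': 1})
```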
+ 121 - 0
src/databank/scheduler/orchestrator.py

@@ -0,0 +1,121 @@
+"""Orchestrator Scheduler that wires DB, Spiders, Reporters and Analyzers.
+
+This scheduler is designed to be future-proof for multiple spiders/reporters/analyzers.
+It uses the existing SimpleRunner to execute one run over tasks provided per spider,
+then invokes analyzers on the produced documents and summary.
+"""
+
+from __future__ import annotations
+
+import time
+from typing import Callable, Dict, Iterable, List, Mapping, Optional
+import warnings
+
+from databank.analytics.base import AnalyticsBase
+from databank.core.models import RunSummary
+from databank.db.base import BaseDB
+from databank.reporter.base import BaseReporter
+from databank.spiders.base import BaseSpider, Task
+from .simple_runner import SimpleRunner
+from .base import SchedulerBase
+
+
+TaskProvider = Callable[[BaseSpider, BaseDB], List[Task]]
+
+
+class DatabankScheduler(SchedulerBase):
+    """A formal scheduler/orchestrator for the Databank pipeline.
+
+    Responsibilities:
+    - Build tasks per spider using injected task providers.
+    - Coordinate a run via SimpleRunner and reporters.
+    - Invoke analyzers over run results.
+    - Optionally loop with a fixed interval to create a simple schedule.
+    """
+
+    def __init__(
+        self,
+        *,
+        db: BaseDB,
+        spiders: Iterable[BaseSpider],
+        reporters: Iterable[BaseReporter],
+        task_providers: Mapping[BaseSpider, TaskProvider],
+        analyzers: Optional[Iterable[AnalyticsBase]] = None,
+        interval_s: Optional[float] = None,
+    ) -> None:
+        super().__init__()
+        self._db = db
+        self._spiders = list(spiders)
+        self._reporters = list(reporters)
+        self._task_providers = dict(task_providers)
+        self._analyzers = list(analyzers or [])
+        self._interval_s = float(interval_s) if interval_s else None
+        self._runner = SimpleRunner(db=db, reporters=reporters)
+
+    def run_once(self) -> RunSummary:
+        """Execute a single end-to-end run across all spiders."""
+        # Build tasks per spider
+        spiders_to_tasks: Dict[BaseSpider, List[Task]] = {}
+        for sp in self._spiders:
+            provider = self._task_providers.get(sp)
+            if provider is None:
+                # Skip spiders without task providers
+                continue
+            tasks = provider(sp, self._db)
+            spiders_to_tasks[sp] = tasks
+
+        summary = self._runner.run(spiders_to_tasks)
+
+        # If no tasks were generated at all, surface a warning for easier troubleshooting
+        total_tasks = sum(len(v) for v in spiders_to_tasks.values())
+        if total_tasks == 0:
+            warnings.warn(
+                "DatabankScheduler.run_once: no tasks generated for any spider."
+            )
+            for r in self._reporters:
+                try:
+                    r.notify_error("scheduler", "no_tasks_generated")
+                except (RuntimeError, ValueError):  # pragma: no cover - best effort
+                    pass
+
+        # Invoke analyzers with runner output
+        docs = list(self._runner.last_docs)
+        for az in self._analyzers:
+            try:
+                az.prepare(docs)
+                az.validate(docs)
+                processed = az.transform(docs)
+                result = az.compute(processed, summary=summary)
+                az.finalize(result)
+            except (RuntimeError, ValueError) as exc:
+                # Do not fail the whole run due to analytics issues
+                msg = f"analyzer_error: {az.__class__.__name__}: {exc}"
+                warnings.warn(msg)
+                for r in self._reporters:
+                    try:
+                        r.notify_error("analyzer", msg)
+                    except (RuntimeError, ValueError):  # pragma: no cover - best effort
+                        warnings.warn(
+                            f"reporter_notify_error_failed: {type(r).__name__}"
+                        )
+
+        return summary
+
+    # Diagnostics helper
+    def get_last_docs(self):  # pragma: no cover - simple passthrough
+        """Return documents produced by the last runner execution."""
+        return list(self._runner.last_docs)
+
+    def schedule(self) -> None:  # pragma: no cover - may run long
+        """Run once or loop with a fixed interval until interrupted."""
+        if not self._interval_s or self._interval_s <= 0:
+            self.run_once()
+            return
+
+        try:
+            while True:
+                self.run_once()
+                time.sleep(self._interval_s)
+        except KeyboardInterrupt:
+            # Graceful shutdown
+            return

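`run_once` drives each analyzer through a five-step lifecycle: prepare, validate, transform, compute (with a `summary` keyword), finalize. A sketch of a custom analyzer shaped to those call sites, assuming AnalyticsBase lets subclasses override these hooks with the signatures shown and needs no constructor arguments (as `PerTokenCounter()` in the scripts suggests):

```python
# Sketch: an analyzer matching the lifecycle run_once invokes.
from typing import Any, List

from databank.analytics.base import AnalyticsBase


class ErrorRateAnalyzer(AnalyticsBase):
    """Report the share of error documents in the last run."""

    def prepare(self, data: Any) -> None:
        self._total = 0

    def validate(self, data: Any) -> None:
        if data is None:
            raise ValueError("no documents to analyze")

    def transform(self, data: Any) -> List[Any]:
        docs = list(data)
        self._total = len(docs)
        return docs

    def compute(self, data: Any, **kwargs: Any) -> float:
        errors = sum(1 for d in data if getattr(d, "kind", None) == "error")
        return errors / self._total if self._total else 0.0

    def finalize(self, result: Any) -> None:
        print(f"error rate: {result:.1%}")
```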
+ 54 - 1
src/databank/scheduler/simple_runner.py

@@ -60,14 +60,42 @@ class SimpleRunner:
             per_spider_persisted = 0
             total_urls += len(tasks)
 
+            # Progress metrics for terminal display
+            total_tasks = len(tasks)
+            processed_tasks = 0
+            t0_spider = perf_counter()
+
+            # Track seasons that requested early stop (keyed by "league_id|season")
+            stop_seasons: set[str] = set()
+
             for task in tasks:
                 t0 = perf_counter()
+                # Skip tasks if their season has been signaled to stop
+                league_id = getattr(task, "league_id", None)
+                season = getattr(task, "season", None)
+                task_stop_key = f"{league_id}|{season}"
+                if task_stop_key in stop_seasons:
+                    processed_tasks += 1
+                    # update progress display even when skipping
+                    elapsed = perf_counter() - t0_spider
+                    avg = (elapsed / processed_tasks) if processed_tasks else 0.0
+                    remain = max(total_tasks - processed_tasks, 0)
+                    eta = remain * avg
+                    print(
+                        f"\r[{spider.name}] {processed_tasks}/{total_tasks} "
+                        f"elapsed={elapsed:.1f}s eta={eta:.1f}s",
+                        end="",
+                        flush=True,
+                    )
+                    continue
+
                 docs = spider.run([task])
                 dt = perf_counter() - t0
                 total_time_s += dt
 
                 self.last_docs.extend(docs)
-                ok_docs = [d for d in docs if d.kind != "error"]
+                control_docs = [d for d in docs if d.kind == "control"]
+                ok_docs = [d for d in docs if d.kind not in ("error", "control")]
                 err_docs = [d for d in docs if d.kind == "error"]
 
                 if ok_docs:
@@ -92,7 +120,32 @@ class SimpleRunner:
                         r.notify_error(spider.name, msg)
                     total_errors.append(msg)
 
+                # Handle control documents (do not persist). If action == stop_season,
+                # mark this league+season to skip remaining tasks in this batch.
+                if control_docs:
+                    for cd in control_docs:
+                        data = cd.data or {}
+                        action = data.get("action")
+                        if action == "stop_season":
+                            key = f"{data.get('league_id', league_id)}|{data.get('season', season)}"
+                            stop_seasons.add(key)
+
+                # update progress metrics after handling the task
+                processed_tasks += 1
+                elapsed = perf_counter() - t0_spider
+                avg = (elapsed / processed_tasks) if processed_tasks else 0.0
+                remain = max(total_tasks - processed_tasks, 0)
+                eta = remain * avg
+                print(
+                    f"\r[{spider.name}] {processed_tasks}/{total_tasks} "
+                    f"elapsed={elapsed:.1f}s eta={eta:.1f}s",
+                    end="",
+                    flush=True,
+                )
+
             summary.per_spider[spider.name] = per_spider_persisted
+            # finalize line for this spider's progress
+            print()
 
         # finalize summary
         summary.total_docs = total_persisted

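The early-stop handshake lives entirely in control documents: the spider emits one, and the runner derives a "league_id|season" skip key from it. A condensed sketch of both sides; Document's import path is an assumption here, since the spider's imports are not part of this diff:

```python
# Sketch: the stop_season handshake between spider and runner.
from databank.core.models import Document  # assumed module for Document

# Spider side: signal that a season has run out of finished matches.
control = Document(
    id=None,
    kind="control",
    data={"action": "stop_season", "league_id": 36, "season": "2024-2025"},
)

# Runner side (mirrors the hunk above): build the skip key.
stop_seasons: set = set()
data = control.data or {}
if data.get("action") == "stop_season":
    stop_seasons.add(f"{data.get('league_id')}|{data.get('season')}")

assert "36|2024-2025" in stop_seasons  # later tasks for this key are skipped
```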
+ 115 - 8
src/databank/spiders/get_league_match_list.py

@@ -30,6 +30,7 @@ import json
 import random
 import time
 from typing import Any, Mapping
+from datetime import datetime, timedelta, timezone
 
 try:  # Optional dependency; guide user to install if missing
     import requests
@@ -54,6 +55,11 @@ class GetLeagueMatchListSpider(BaseSpider):
 
     endpoint: str = "https://sport.ttyingqiu.com/sportdata/f?platform=web"
 
+    # UTC+8 timezone for date comparisons
+    tz = timezone(timedelta(hours=8))
+    # When future-dated matches exceed this number in a round, suggest stopping the season
+    future_exceed_limit: int = 5
+
     def build_payload(self, task: Task) -> Payload:
         """Build JSON payload from a MatchListTask (structured input)."""
         if not isinstance(task, MatchListTask):
@@ -92,8 +98,12 @@ class GetLeagueMatchListSpider(BaseSpider):
         try:
             timeout = float(self.request_timeout_s or 15.0)
             headers: Mapping[str, str] = dict(self.default_headers or {})
+            # Remove internal fields (e.g., `_task`) to avoid JSON serialization issues
+            safe_payload = {
+                k: v for k, v in dict(payload).items() if not str(k).startswith("_")
+            }
             resp = requests.post(
-                self.endpoint, headers=headers, json=dict(payload), timeout=timeout
+                self.endpoint, headers=headers, json=safe_payload, timeout=timeout
             )
             resp.raise_for_status()
             return resp.text
@@ -106,7 +116,14 @@ class GetLeagueMatchListSpider(BaseSpider):
             )
 
     def parse(self, task: Task, content: str, payload: Payload) -> Documents:
-        """Parse JSON, filter matchList by groupName == '联赛', return Documents."""
+        """Parse JSON, keep only finished league matches and handle future dates.
+
+        Rules:
+        - Keep items where groupName == '联赛' (league) AND
+          elapsedTime == '已完场' (finished).
+        - Skip matches whose date is after today (UTC+8); count them.
+        - If future_count >= future_exceed_limit, emit a control Document
+          advising to stop the season.
+        """
         try:
             data = json.loads(content)
         except json.JSONDecodeError as exc:
@@ -125,6 +142,9 @@ class GetLeagueMatchListSpider(BaseSpider):
 
         # If fetch reported an error, convert to error document directly
         if isinstance(data, Mapping) and "error" in data:
+            safe_payload = {
+                k: v for k, v in dict(payload).items() if not str(k).startswith("_")
+            }
             return [
                 Document(
                     id=None,
@@ -133,7 +153,7 @@ class GetLeagueMatchListSpider(BaseSpider):
                         "token": task.token() if hasattr(task, "token") else str(task),
                         "reason": str(data.get("error")),
                         "detail": str(data.get("detail")),
-                        "payload": dict(payload),
+                        "payload": safe_payload,
                     },
                 )
             ]
@@ -158,6 +178,9 @@ class GetLeagueMatchListSpider(BaseSpider):
 
         if not match_list:
             # Return error document if API failed or schema unexpected
+            safe_payload = {
+                k: v for k, v in dict(payload).items() if not str(k).startswith("_")
+            }
             return [
                 Document(
                     id=None,
@@ -165,7 +188,7 @@ class GetLeagueMatchListSpider(BaseSpider):
                     data={
                         "token": task.token() if hasattr(task, "token") else str(task),
                         "reason": "no_match_list",
-                        "payload": dict(payload),
+                        "payload": safe_payload,
                         "raw_keys": (
                             list(data.keys())
                             if isinstance(data, dict)
@@ -175,29 +198,113 @@ class GetLeagueMatchListSpider(BaseSpider):
                 )
             ]
 
-        # Filter by groupName == "联赛"
-        filtered = [
+        # First stage: league-only and finished-only (elapsedTime == '已完场')
+        stage1 = [
             item
             for item in match_list
-            if isinstance(item, Mapping) and item.get("groupName") == "联赛"
+            if isinstance(item, Mapping)
+            and item.get("groupName") == "联赛"
+            and item.get("elapsedTime") == "已完场"
         ]
 
+        # Second stage: drop future-dated matches and count them
+        today = datetime.now(self.tz).date()
+
+        def _extract_dt(it: Mapping[str, Any]) -> datetime | None:
+            for key in (
+                "matchTime",
+                "matchDate",
+                "startTime",
+                "gameTime",
+                "beginTime",
+                "match_time",
+                "start_time",
+            ):
+                if key not in it:
+                    continue
+                val = it.get(key)
+                # Timestamp numbers (sec or ms)
+                if isinstance(val, (int, float)):
+                    ts = float(val)
+                    if ts > 1e12:  # likely ms
+                        ts /= 1000.0
+                    try:
+                        return datetime.fromtimestamp(ts, tz=self.tz)
+                    except (ValueError, OSError):
+                        continue
+                # String formats
+                if isinstance(val, str):
+                    s = val.strip()
+                    for fmt in (
+                        "%Y-%m-%d %H:%M:%S",
+                        "%Y-%m-%d %H:%M",
+                        "%Y/%m/%d %H:%M:%S",
+                        "%Y/%m/%d %H:%M",
+                        "%Y-%m-%d",
+                        "%Y/%m/%d",
+                    ):
+                        try:
+                            return datetime.strptime(s, fmt).replace(tzinfo=self.tz)
+                        except ValueError:
+                            pass
+                    try:
+                        dt_iso = datetime.fromisoformat(s)
+                        if dt_iso.tzinfo is None:
+                            dt_iso = dt_iso.replace(tzinfo=self.tz)
+                        else:
+                            dt_iso = dt_iso.astimezone(self.tz)
+                        return dt_iso
+                    except ValueError:
+                        continue
+            return None
+
+        future_count = 0
+        filtered: list[Mapping[str, Any]] = []
+        for it in stage1:
+            dt = _extract_dt(it)
+            if dt is not None and dt.date() > today:
+                future_count += 1
+                continue
+            filtered.append(it)
+
         docs: list[Document] = []
         for item in filtered:
             doc_id = (
                 str(item.get("matchId")) if item.get("matchId") is not None else None
             )
+            # Strip internal fields from the payload before storing
+            safe_payload = {
+                k: v for k, v in dict(payload).items() if not str(k).startswith("_")
+            }
             docs.append(
                 Document(
                     id=doc_id,
                     kind="match",
                     data={
                         "token": task.token() if hasattr(task, "token") else str(task),
-                        "payload": dict(payload),
+                        "payload": safe_payload,
                         "match": dict(item),
                     },
                 )
             )
+        # Append a control document if threshold reached
+        if future_count >= self.future_exceed_limit:
+            docs.append(
+                Document(
+                    id=None,
+                    kind="control",
+                    data={
+                        "action": "stop_season",
+                        "reason": "too_many_future_matches",
+                        "future_count": future_count,
+                        "future_exceed_limit": self.future_exceed_limit,
+                        "season": getattr(task, "season", None),
+                        "league_id": getattr(task, "league_id", None),
+                        "round_no": getattr(task, "round_no", None),
+                    },
+                )
+            )
+
         return docs
 
 

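`_extract_dt` normalizes several timestamp shapes to UTC+8: epoch numbers (seconds or milliseconds, split heuristically at 1e12), a list of strptime formats, and ISO-8601 as a fallback. A standalone illustration of the numeric branch:

```python
# Sketch: the epoch-normalization heuristic used by _extract_dt, standalone.
from datetime import datetime, timedelta, timezone

TZ = timezone(timedelta(hours=8))  # UTC+8, as in the spider


def from_epoch(val: float) -> datetime:
    ts = float(val)
    if ts > 1e12:  # values this large are treated as milliseconds
        ts /= 1000.0
    return datetime.fromtimestamp(ts, tz=TZ)


assert from_epoch(1714569600) == from_epoch(1714569600000)  # same instant
print(from_epoch(1714569600).isoformat())
```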
+ 106 - 0
src/databank/tasks/providers.py

@@ -0,0 +1,106 @@
+"""Task providers: functions that derive Task lists for spiders.
+
+These providers encapsulate how tasks are sourced (from DB/config/etc.).
+They can be injected into the scheduler to generate per-spider tasks.
+"""
+
+from __future__ import annotations
+
+from typing import List
+from collections.abc import Callable
+
+from databank.db.base import BaseDB
+from databank.db.mongo import MongoDB
+from databank.spiders.base import BaseSpider, Task
+from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
+from databank.core.tasks import MatchListTask
+
+
+def league_matchlist_from_mongo(
+    *,
+    mode: str = "incremental",
+    max_leagues: int | None = None,
+    max_rounds: int | None = None,
+) -> Callable[[BaseSpider, BaseDB], List[Task]]:
+    """Factory returning a provider that builds MatchListTasks from MongoDB.
+
+    Modes:
+    - incremental (default): only the globally latest season (from 'seasons'),
+      applied to every league.
+    - full: all seasons and all rounds for available leagues.
+
+    Args:
+        mode: "incremental" or "full".
+        max_leagues: Optional cap of leagues to include (safety for large datasets).
+        max_rounds: Optional cap of rounds per season (safety for large datasets).
+    """
+
+    def _provider(spider: BaseSpider, db: BaseDB) -> List[Task]:
+        """Produce MatchListTask items for GetLeagueMatchListSpider from MongoDB.
+
+        Behavior depends on factory "mode":
+        - incremental: for each league, only generate tasks for the latest season.
+        - full: for each league, for every season, generate all (or capped) rounds.
+        """
+        # Only supports MongoDB currently
+        if not isinstance(db, MongoDB):
+            return []
+        if not isinstance(spider, GetLeagueMatchListSpider):
+            return []
+
+        # Fetch metadata
+        leagues = db.find("leagues", projection={"_id": 0})
+        seasons = db.find("seasons", projection={"_id": 0})
+        if not leagues:
+            return []
+        if not seasons:
+            return []
+
+        # Sort leagues and seasons deterministically
+        leagues_sorted = sorted(leagues, key=lambda x: x.get("league_id", 0))
+        if max_leagues:
+            leagues_sorted = leagues_sorted[: int(max_leagues)]
+        # season might be a string like "2024-2025"; sort desc to get latest
+        seasons_sorted = sorted(
+            seasons, key=lambda x: str(x.get("season", "")), reverse=True
+        )
+
+        tasks: List[Task] = []
+
+        if mode == "incremental":
+            # Note: uses the globally latest season for every league
+            latest_season = seasons_sorted[0]["season"]
+            for lg in leagues_sorted:
+                league_id = int(lg.get("league_id", 0))
+                max_round = int(lg.get("max_round", 1))
+                rounds = list(range(1, max_round + 1))
+                if max_rounds:
+                    rounds = rounds[: int(max_rounds)]
+                for r in rounds:
+                    tasks.append(
+                        MatchListTask(
+                            league_id=league_id, season=latest_season, round_no=int(r)
+                        )
+                    )
+            return tasks
+
+        # full mode: all seasons for each league
+        for lg in leagues_sorted:
+            league_id = int(lg.get("league_id", 0))
+            max_round = int(lg.get("max_round", 1))
+            rounds = list(range(1, max_round + 1))
+            if max_rounds:
+                rounds = rounds[: int(max_rounds)]
+            for s in seasons_sorted:
+                season_name = s.get("season")
+                if not season_name:
+                    continue
+                for r in rounds:
+                    tasks.append(
+                        MatchListTask(
+                            league_id=league_id, season=season_name, round_no=int(r)
+                        )
+                    )
+
+        return tasks
+
+    return _provider
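Task volume differs sharply between the modes: incremental yields roughly leagues × rounds tasks (one season), while full yields leagues × seasons × rounds. A usage sketch comparing them against a local database:

```python
# Sketch: comparing task volume across provider modes.
# With e.g. 5 leagues, 10 seasons, max_round=38:
#   incremental: 5 * 38      = 190 tasks
#   full:        5 * 10 * 38 = 1900 tasks
from databank.db import MongoDB
from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
from databank.tasks.providers import league_matchlist_from_mongo

db = MongoDB(uri="mongodb://localhost:27017", name="databank")
db.connect()
spider = GetLeagueMatchListSpider()

for mode in ("incremental", "full"):
    tasks = league_matchlist_from_mongo(mode=mode)(spider, db)
    print(f"{mode}: {len(tasks)} task(s)")
```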