
Refactor the spiders and task structure: add a MatchListTask model, update the related spiders to accept structured tasks, and simplify the data-processing flow; add a complete pipeline script that ties together spiders, database, and reporting.

admin 2 months ago
Parent
Commit
2694101a0e

+ 73 - 0
scripts/run_pipeline_once.py

@@ -0,0 +1,73 @@
+"""Run the full pipeline once: seed tokens -> run spider(s) -> persist -> report -> basic analytics.
+
+Usage (PowerShell):
+  # Ensure deps
+  # python -m pip install requests pymongo
+  # Configure DB if needed
+  # $env:DATABANK_DB_URI = "mongodb://localhost:27017"
+  # $env:DATABANK_DB_NAME = "databank"
+  python scripts/run_pipeline_once.py
+"""
+
+from __future__ import annotations
+
+import os
+from collections import Counter
+from datetime import UTC, datetime
+from typing import Dict, List
+
+from databank.db import MongoDB
+from databank.reporter.daily_file import DailyFileReporter
+from databank.scheduler.simple_runner import SimpleRunner
+from databank.spiders.base import BaseSpider, Task
+from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
+from databank.core.tasks import MatchListTask
+
+
+def pick_tokens(db: MongoDB, max_tokens: int = 3) -> list[MatchListTask]:
+    """Build MatchListTask list from DB seed data (league/season)."""
+    leagues = db.find("leagues", projection={"_id": 0}, limit=10)
+    seasons = db.find("seasons", projection={"_id": 0}, limit=10)
+    if not leagues:
+        raise RuntimeError("No leagues found. Seed leagues first.")
+    if not seasons:
+        raise RuntimeError("No seasons found. Seed seasons first.")
+    league = sorted(leagues, key=lambda x: x.get("league_id", 0))[0]
+    max_round = int(league.get("max_round", 1))
+    season_name = seasons[0]["season"]
+    rounds = list(range(1, max_round + 1))[:max_tokens]
+    return [
+        MatchListTask(
+            league_id=int(league["league_id"]), season=season_name, round_no=int(r)
+        )
+        for r in rounds
+    ]
+
+
+def main() -> None:
+    """Run one full pipeline pass using structured tasks and SimpleRunner."""
+    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
+    name = os.getenv("DATABANK_DB_NAME", "databank")
+
+    db = MongoDB(uri=uri, name=name)
+    db.connect()
+
+    reporter = DailyFileReporter(timezone="utc+8")
+    runner = SimpleRunner(db=db, reporters=[reporter])
+
+    spider = GetLeagueMatchListSpider()
+    tasks = pick_tokens(db)
+    spiders: Dict[BaseSpider, List[Task]] = {spider: tasks}
+    summary = runner.run(spiders)
+
+    # Basic analytics example: count docs per token for this run (from runner.last_docs)
+    per_token = Counter(d.data.get("token", "unknown") for d in runner.last_docs)
+    top = per_token.most_common(3)
+
+    print(f"Run finished at {datetime.now(UTC).isoformat()}Z")
+    print(f"Total persisted: {summary.total_docs}")
+    print("Top tokens (by docs):", top)
+
+
+if __name__ == "__main__":
+    main()

+ 18 - 11
scripts/test_get_league_match_list.py

@@ -19,10 +19,11 @@ from databank.db.base import InsertError
 from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
 from databank.reporter.daily_file import DailyFileReporter
 from databank.core.models import RunSummary
+from databank.core.tasks import MatchListTask
 
 
-def pick_tokens(max_tokens: int = 3) -> list[str]:
-    """Build up to ``max_tokens`` URL tokens from MongoDB collections."""
+def pick_tokens(max_tokens: int = 3) -> list[MatchListTask]:
+    """Build up to ``max_tokens`` structured tasks from MongoDB collections."""
     uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
     name = os.getenv("DATABANK_DB_NAME", "databank")
     db = MongoDB(uri=uri, name=name)
@@ -39,11 +40,17 @@ def pick_tokens(max_tokens: int = 3) -> list[str]:
         max_round = int(league.get("max_round", 1))
         season_name = seasons[0]["season"]
 
-        tokens: list[str] = []
+        tasks: list[MatchListTask] = []
         rounds = list(range(1, max_round + 1))[:max_tokens]
         for r in rounds:
-            tokens.append(f"{league['league_id']}|{season_name}|{r}")
-        return tokens[:max_tokens]
+            tasks.append(
+                MatchListTask(
+                    league_id=int(league["league_id"]),
+                    season=season_name,
+                    round_no=int(r),
+                )
+            )
+        return tasks[:max_tokens]
     finally:
         db.close()
 
@@ -56,7 +63,7 @@ def main() -> None:
     # Prepare summary for duration/error tracking
     summary = RunSummary()
     try:
-        urls = pick_tokens()
+        tasks = pick_tokens()
     except Exception as exc:  # pylint: disable=broad-except
         # Record error and finalize summary
         reporter.notify_error(spider.name, f"pick_tokens failed: {exc}")
@@ -66,7 +73,7 @@ def main() -> None:
         print("pick_tokens failed:", exc)
         return
 
-    reporter.notify_start(spider.name, urls)
+    reporter.notify_start(spider.name, [t.token() for t in tasks])
 
     # DB connection for persistence; success count is based on DB insert result
     uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
@@ -82,9 +89,9 @@ def main() -> None:
     total_time_s_accum = 0.0
 
     try:
-        for url in urls:
+        for task in tasks:
             t0 = perf_counter()
-            docs = spider.run([url])
+            docs = spider.run([task])
             dt = perf_counter() - t0
 
             total_time_s_accum += dt
@@ -119,14 +126,14 @@ def main() -> None:
         db.close()
 
     total_time_s = float(total_time_s_accum)
-    avg_time_s = (total_time_s / len(urls)) if urls else 0.0
+    avg_time_s = (total_time_s / len(tasks)) if tasks else 0.0
 
     # Final summary
     summary.total_docs = persisted_total
     summary.per_spider[spider.name] = persisted_total
     # Attach concise metrics to summary.errors for visibility in the log
     summary.errors.append(
-        f"metrics: attempted_urls={len(urls)} parsed_success={parsed_success_total} "
+        f"metrics: attempted_urls={len(tasks)} parsed_success={parsed_success_total} "
         f"persisted={persisted_total} error_docs={error_docs_total} "
         f"url_time_total_s={total_time_s:.3f} url_time_avg_s={avg_time_s:.3f}"
     )

+ 27 - 0
src/databank/core/tasks.py

@@ -0,0 +1,27 @@
+"""Structured task models used by runners to assemble spider inputs.
+
+This keeps spiders focused on fetching/parsing while runners build payloads.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class MatchListTask:
+    """Task for getLeagueMatchList spider.
+
+    Attributes:
+        league_id: League identifier.
+        season: Season string (e.g., "2016-2017").
+        round_no: Round number (1-based).
+    """
+
+    league_id: int
+    season: str
+    round_no: int
+
+    def token(self) -> str:
+        """Render legacy token string used for identity/logging."""
+        return f"{self.league_id}|{self.season}|{self.round_no}"

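For reference, a minimal sketch of how a runner builds these tasks in place of the old "league|season|round" strings; the league and season values below are placeholders, not data from the repository.

from databank.core.tasks import MatchListTask

# Placeholder values; real tasks are built from the leagues/seasons collections.
tasks = [MatchListTask(league_id=36, season="2016-2017", round_no=r) for r in range(1, 4)]
print(tasks[0].token())  # "36|2016-2017|1" -- the legacy token form kept for logging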
+ 1 - 1
src/databank/reporter/daily_file.py

@@ -46,7 +46,7 @@ except ImportError as _exc:  # direct-run friendliness
             "Run the demo script instead:\n"
             "  python c:\\Python\\databank\\scripts\\reporter_demo.py\n"
         )
-        raise SystemExit(0)
+        raise SystemExit(0) from _exc
     raise
 
 

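The added `from _exc` keeps the original ImportError attached as `__cause__`, so the friendly exit does not hide what actually failed. A generic illustration (not project code), assuming a missing optional dependency:

try:
    import not_a_real_module  # hypothetical missing dependency
except ImportError as exc:
    # Explicit chaining (PEP 3134): the ImportError is attached as __cause__,
    # keeping the root cause visible instead of only the implicit context.
    raise SystemExit("dependency missing") from exc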
+ 113 - 0
src/databank/scheduler/simple_runner.py

@@ -0,0 +1,113 @@
+"""A minimal concrete Runner that coordinates spiders, DB and reporters.
+
+This runner processes each spider's task list sequentially, persists non-error
+documents to the DB, and emits notifications to reporters. It aggregates a
+RunSummary and exposes `last_docs` for downstream analytics.
+"""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime
+from time import perf_counter
+from typing import Dict, Iterable, List
+
+from databank.core.models import Document, RunSummary
+from databank.db.base import BaseDB, InsertError
+from databank.reporter.base import BaseReporter
+from databank.spiders.base import BaseSpider, Task
+
+
+class SimpleRunner:
+    """A straightforward, synchronous runner.
+
+    Note: This class does not inherit RunnerBase to keep dependencies light
+    and allow extra conveniences (like `last_docs`). It follows the same
+    external contract conceptually: `run(spiders)->RunSummary`.
+    """
+
+    def __init__(self, db: BaseDB, reporters: Iterable[BaseReporter]) -> None:
+        self._db = db
+        self._reporters = list(reporters)
+        self.last_docs: list[Document] = []
+
+    def run(self, spiders: Dict[BaseSpider, List[Task]]) -> RunSummary:
+        """Run spiders over structured tasks and persist results.
+
+        Args:
+            spiders: Mapping from spider to its structured tasks.
+
+        Returns:
+            Aggregated run summary across spiders.
+        """
+        summary = RunSummary()
+        total_persisted = 0
+        total_errors: list[str] = []
+        total_urls = 0
+        total_time_s = 0.0
+
+        for spider, tasks in spiders.items():
+            # Notify start
+            # Render human-readable identifiers for tasks
+            rendered: list[str] = []
+            for tk in tasks:
+                if hasattr(tk, "token") and callable(getattr(tk, "token")):
+                    rendered.append(tk.token())  # type: ignore[union-attr]
+                else:
+                    rendered.append(str(tk))
+            for r in self._reporters:
+                r.notify_start(spider.name, rendered)
+
+            per_spider_persisted = 0
+            total_urls += len(tasks)
+
+            for task in tasks:
+                t0 = perf_counter()
+                docs = spider.run([task])
+                dt = perf_counter() - t0
+                total_time_s += dt
+
+                self.last_docs.extend(docs)
+                ok_docs = [d for d in docs if d.kind != "error"]
+                err_docs = [d for d in docs if d.kind == "error"]
+
+                if ok_docs:
+                    try:
+                        inserted = self._db.insert_many(ok_docs)
+                    except InsertError as exc:
+                        msg = f"insert_many failed: {exc}"
+                        for r in self._reporters:
+                            r.notify_error(spider.name, msg)
+                        total_errors.append(msg)
+                        inserted = 0
+                    for r in self._reporters:
+                        r.notify_success(spider.name, inserted)
+                    per_spider_persisted += inserted
+                    total_persisted += inserted
+
+                for ed in err_docs:
+                    reason = ed.data.get("reason")
+                    detail = ed.data.get("detail")
+                    msg = f"{reason}: {detail}" if detail else str(reason)
+                    for r in self._reporters:
+                        r.notify_error(spider.name, msg)
+                    total_errors.append(msg)
+
+            summary.per_spider[spider.name] = per_spider_persisted
+
+        # finalize summary
+        summary.total_docs = total_persisted
+        # include metrics as one line for the file reporter
+        avg_time_s = (total_time_s / total_urls) if total_urls else 0.0
+        total_errors.append(
+            (
+                f"metrics: attempted_urls={total_urls} "
+                f"persisted={total_persisted} "
+                f"url_time_total_s={total_time_s:.3f} url_time_avg_s={avg_time_s:.3f}"
+            )
+        )
+        summary.errors.extend(total_errors)
+        summary.finished_at = datetime.now(UTC)
+        for r in self._reporters:
+            r.notify_summary(summary)
+
+        return summary

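Because the runner keeps every document it produced on `last_docs`, quick post-run breakdowns need no extra DB query. A minimal sketch, assuming `runner` is a SimpleRunner whose `run()` has already completed:

from collections import Counter

# Count parsed kinds vs. error documents from the previous run.
by_kind = Counter(doc.kind for doc in runner.last_docs)
print(by_kind)  # e.g. Counter({'match': 120, 'error': 2}) -- counts are illustrative
error_reasons = [doc.data.get("reason") for doc in runner.last_docs if doc.kind == "error"]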
+ 42 - 42
src/databank/spiders/base.py

@@ -29,7 +29,7 @@ from typing import Any, Iterable, Mapping, Sequence, Optional
 from databank.core.models import Document
 
 # Type aliases for clarity
-URL = str
+Task = Any  # A structured task object (e.g., a dataclass) provided by the runner
 Payload = Mapping[str, Any]
 Documents = Sequence[Document]
 
@@ -40,11 +40,11 @@ class SpiderError(Exception):
 
 
 class BuildPayloadError(SpiderError):
-    """Raised when building the request payload fails for a URL."""
+    """Raised when building the request payload fails for a task."""
 
 
 class FetchError(SpiderError):
-    """Raised when fetching raw content for a URL fails."""
+    """Raised when fetching raw content for a task fails."""
 
 
 class ParseError(SpiderError):
@@ -85,40 +85,40 @@ class BaseSpider(ABC):
         self._metadata: dict[str, Any] = {}
 
     @abstractmethod
-    def build_payload(self, url: URL) -> Payload:
-        """Build request params/body or headers from a URL.
+    def build_payload(self, task: Task) -> Payload:
+        """Build request params/body or headers from a structured task.
 
         Args:
-            url: Target URL to be fetched.
+            task: A structured input (e.g., a dataclass) describing the job.
 
         Returns:
             A mapping of request parameters/headers/body to be used by ``fetch``.
 
         Raises:
-            BuildPayloadError: If payload construction fails for ``url``.
+            BuildPayloadError: If payload construction fails for ``task``.
         """
 
     @abstractmethod
-    def fetch(self, url: URL, payload: Payload) -> str:
-        """Fetch raw textual content for a URL using the given payload.
+    def fetch(self, task: Task, payload: Payload) -> str:
+        """Fetch raw textual content for a task using the given payload.
 
         Args:
-            url: Target URL to fetch.
+            task: Structured task to fetch.
             payload: Request parameters prepared by ``build_payload``.
 
         Returns:
             Raw textual content.
 
         Raises:
-            FetchError: If retrieval fails for ``url``.
+            FetchError: If retrieval fails for ``task``.
         """
 
     @abstractmethod
-    def parse(self, url: URL, content: str, payload: Payload) -> Documents:
+    def parse(self, task: Task, content: str, payload: Payload) -> Documents:
         """Parse raw content into a sequence of Documents.
 
         Args:
-            url: URL associated with the content.
+            task: Task associated with the content.
             content: Raw textual content fetched by ``fetch``.
             payload: The payload used to fetch, for context if needed.
 
@@ -126,29 +126,29 @@ class BaseSpider(ABC):
             A sequence of :class:`~databank.core.models.Document` instances.
 
         Raises:
-            ParseError: If parsing fails for ``url``.
+            ParseError: If parsing fails for ``task``.
         """
 
     # ---- Optional lifecycle hooks (no-op by default) ----
-    def on_run_start(self, urls: Iterable[URL]) -> None:  # pragma: no cover
+    def on_run_start(self, tasks: Iterable[Task]) -> None:  # pragma: no cover
         """Hook invoked once before processing a batch of URLs.
 
         Args:
-            urls: Collection of URLs to be processed in this run.
+            tasks: Collection of structured tasks to be processed in this run.
         """
 
     def on_run_end(
-        self, urls: Iterable[URL], results: Sequence[Document], error_count: int
+        self, tasks: Iterable[Task], results: Sequence[Document], error_count: int
     ) -> None:  # pragma: no cover
         """Hook invoked once after processing a batch of URLs.
 
         Args:
-            urls: The same collection passed to ``on_run_start``.
+            tasks: The same collection passed to ``on_run_start``.
             results: All successfully parsed documents.
             error_count: Number of URLs that raised errors.
         """
 
-    def should_fetch(self, _url: URL, _payload: Payload) -> bool:  # pragma: no cover
+    def should_fetch(self, _task: Task, _payload: Payload) -> bool:  # pragma: no cover
         """Return False to skip fetching this URL (e.g., dedup, robots, filters).
 
         Returns:
@@ -156,44 +156,44 @@ class BaseSpider(ABC):
         """
         return True
 
-    def before_fetch(self, url: URL, payload: Payload) -> None:  # pragma: no cover
+    def before_fetch(self, task: Task, payload: Payload) -> None:  # pragma: no cover
         """Hook invoked before fetch; override for logging/metrics/rate-limit.
 
         Args:
-            url: URL to fetch.
+            task: Task to fetch.
             payload: Request parameters prepared by ``build_payload``.
         """
 
     def after_fetch(
-        self, url: URL, payload: Payload, content: str
+        self, task: Task, payload: Payload, content: str
     ) -> None:  # pragma: no cover
         """Hook invoked after fetch; override for logging/metrics/tracing.
 
         Args:
-            url: URL fetched.
+            task: Task fetched.
             payload: Request parameters used to fetch.
             content: Raw textual content returned by ``fetch``.
         """
 
     def handle_error(
-        self, url: URL, payload: Payload, exc: Exception
+        self, task: Task, payload: Payload, exc: Exception
     ) -> None:  # pragma: no cover
         """Handle per-URL errors; default behavior re-raises the exception.
 
         Implementations may log, collect metrics, or convert exceptions.
 
         Args:
-            url: URL whose processing failed.
+            task: Task whose processing failed.
             payload: Payload built for this URL.
             exc: Original exception raised.
         """
         raise exc
 
-    def transform(self, _url: URL, docs: Documents) -> Documents:  # pragma: no cover
+    def transform(self, _task: Task, docs: Documents) -> Documents:  # pragma: no cover
         """Post-parse transformation/normalization stage; default is identity.
 
         Args:
-            _url: URL associated with ``docs``.
+            _task: Task associated with ``docs``.
             docs: Parsed documents.
 
         Returns:
@@ -201,11 +201,11 @@ class BaseSpider(ABC):
         """
         return docs
 
-    def run(self, urls: Iterable[URL]) -> list[Document]:
+    def run(self, tasks: Iterable[Task]) -> list[Document]:
         """Reference orchestration: build -> fetch -> parse.
 
         Steps:
-            1) ``build_payload`` per URL.
+            1) ``build_payload`` per task.
             2) ``should_fetch`` gate.
             3) ``before_fetch`` -> ``fetch`` -> ``after_fetch``.
             4) ``parse`` -> ``transform``.
@@ -213,31 +213,31 @@ class BaseSpider(ABC):
         Implementations may override for concurrency, caching, or tracing.
 
         Args:
-            urls: URLs to process.
+            tasks: Structured tasks to process.
 
         Returns:
             A list of parsed documents across all URLs.
         """
         results: list[Document] = []
-        urls_seq = tuple(urls)
-        self.on_run_start(urls_seq)
+        tasks_seq = tuple(tasks)
+        self.on_run_start(tasks_seq)
         error_count = 0
-        for url in urls_seq:
-            payload = self.build_payload(url)
-            if not self.should_fetch(url, payload):
+        for task in tasks_seq:
+            payload = self.build_payload(task)
+            if not self.should_fetch(task, payload):
                 continue
-            self.before_fetch(url, payload)
+            self.before_fetch(task, payload)
             try:
-                raw = self.fetch(url, payload)
-                self.after_fetch(url, payload, raw)
-                docs = self.parse(url, raw, payload)
-                docs = self.transform(url, docs)
+                raw = self.fetch(task, payload)
+                self.after_fetch(task, payload, raw)
+                docs = self.parse(task, raw, payload)
+                docs = self.transform(task, docs)
             except Exception as exc:  # pylint: disable=broad-except
-                self.handle_error(url, payload, exc)
+                self.handle_error(task, payload, exc)
                 error_count += 1
                 continue
             results.extend(docs)
-        self.on_run_end(urls_seq, results, error_count)
+        self.on_run_end(tasks_seq, results, error_count)
         return results
 
     # ---- Resource management ----

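To make the task-based contract concrete, here is a minimal, hypothetical subclass (EchoSpider is illustrative only, not part of the codebase); `run()` drives it through build_payload -> fetch -> parse exactly as documented above:

from databank.core.models import Document
from databank.spiders.base import BaseSpider, Task, Payload, Documents


class EchoSpider(BaseSpider):
    """Illustrative spider: no network I/O, just echoes the task back as a document."""

    def build_payload(self, task: Task) -> Payload:
        return {"token": str(task)}

    def fetch(self, task: Task, payload: Payload) -> str:
        return payload["token"]  # a real spider would perform the HTTP request here

    def parse(self, task: Task, content: str, payload: Payload) -> Documents:
        return [Document(id=None, kind="echo", data={"token": content})]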
+ 26 - 24
src/databank/spiders/get_league_match_list.py

@@ -36,7 +36,8 @@ try:  # Optional dependency; guide user to install if missing
 except ImportError:  # pragma: no cover
     requests = None  # type: ignore[assignment]
 
-from .base import BaseSpider, BuildPayloadError, URL, Payload, Documents
+from .base import BaseSpider, BuildPayloadError, Task, Payload, Documents
+from ..core.tasks import MatchListTask
 from ..core.models import Document
 
 
@@ -53,36 +54,29 @@ class GetLeagueMatchListSpider(BaseSpider):
 
     endpoint: str = "https://sport.ttyingqiu.com/sportdata/f?platform=web"
 
-    def build_payload(self, url: URL) -> Payload:
-        """Build JSON payload from a URL token `league|season|round`.
+    def build_payload(self, task: Task) -> Payload:
+        """Build JSON payload from a MatchListTask (structured input)."""
+        if not isinstance(task, MatchListTask):
+            raise BuildPayloadError("task must be MatchListTask")
 
-        The scheduler should ideally pass structured data; for testing we parse
-        a token string split by '|'.
-        """
-        try:
-            league_str, season, round_str = url.split("|")
-            league_id = int(league_str)
-            round_no = int(round_str)
-        except Exception as exc:  # pylint: disable=broad-except
-            raise BuildPayloadError(f"Invalid task token: {url}") from exc
-
-        payload = {
-            "leagueId": league_id,
+        payload: dict[str, Any] = {
+            "leagueId": int(task.league_id),
             "pageNo": 1,
             "pageSize": 100,
-            "round": round_no,
+            "round": int(task.round_no),
             "seasonFlag": 0,
-            "seasonName": season,
+            "seasonName": str(task.season),
             "apiName": self.name,
         }
+        payload["_task"] = task
         return payload
 
-    def before_fetch(self, url: URL, payload: Payload) -> None:  # pragma: no cover
+    def before_fetch(self, task: Task, payload: Payload) -> None:  # pragma: no cover
         """Sleep a random 1~2 seconds to respect rate limits."""
         delay = random.uniform(1.0, 2.0)
         time.sleep(delay)
 
-    def fetch(self, url: URL, payload: Payload) -> str:
+    def fetch(self, task: Task, payload: Payload) -> str:
         """POST to the endpoint and return raw text.
 
         On error, return a JSON-encoded error payload so `parse` can emit a
@@ -111,7 +105,7 @@ class GetLeagueMatchListSpider(BaseSpider):
                 }
             )
 
-    def parse(self, url: URL, content: str, payload: Payload) -> Documents:
+    def parse(self, task: Task, content: str, payload: Payload) -> Documents:
         """Parse JSON, filter matchList by groupName == '联赛', return Documents."""
         try:
             data = json.loads(content)
@@ -121,7 +115,11 @@ class GetLeagueMatchListSpider(BaseSpider):
                 Document(
                     id=None,
                     kind="error",
-                    data={"token": url, "reason": "invalid_json", "detail": str(exc)},
+                    data={
+                        "token": task.token() if hasattr(task, "token") else str(task),
+                        "reason": "invalid_json",
+                        "detail": str(exc),
+                    },
                 )
             ]
 
@@ -132,7 +130,7 @@ class GetLeagueMatchListSpider(BaseSpider):
                     id=None,
                     kind="error",
                     data={
-                        "token": url,
+                        "token": task.token() if hasattr(task, "token") else str(task),
                         "reason": str(data.get("error")),
                         "detail": str(data.get("detail")),
                         "payload": dict(payload),
@@ -165,7 +163,7 @@ class GetLeagueMatchListSpider(BaseSpider):
                     id=None,
                     kind="error",
                     data={
-                        "token": url,
+                        "token": task.token() if hasattr(task, "token") else str(task),
                         "reason": "no_match_list",
                         "payload": dict(payload),
                         "raw_keys": (
@@ -193,7 +191,11 @@ class GetLeagueMatchListSpider(BaseSpider):
                 Document(
                     id=doc_id,
                     kind="match",
-                    data={"token": url, "payload": dict(payload), "match": dict(item)},
+                    data={
+                        "token": task.token() if hasattr(task, "token") else str(task),
+                        "payload": dict(payload),
+                        "match": dict(item),
+                    },
                 )
             )
         return docs
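For orientation, the mapping that build_payload now returns for a single task looks roughly like this; the league/season values are placeholders, the _task entry is the structured task carried along for context, and the apiName value is an assumption since the spider's name attribute is not shown in this diff:

# Approximate payload for MatchListTask(league_id=36, season="2016-2017", round_no=1)
{
    "leagueId": 36,
    "pageNo": 1,
    "pageSize": 100,
    "round": 1,
    "seasonFlag": 0,
    "seasonName": "2016-2017",
    "apiName": "getLeagueMatchList",  # assumed value of self.name
    "_task": MatchListTask(league_id=36, season="2016-2017", round_no=1),
}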