| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- """Run the full pipeline once: seed tokens -> run spider(s) -> persist -> report -> basic analytics.
- Usage (PowerShell):
- # Ensure deps
- # python -m pip install requests pymongo
- # Configure DB if needed
- # $env:DATABANK_DB_URI = "mongodb://localhost:27017"
- # $env:DATABANK_DB_NAME = "databank"
- python scripts/run_pipeline_once.py
- """
- from __future__ import annotations
- import os
- from collections import Counter
- from datetime import UTC, datetime
- from typing import Dict, List
- from databank.db import MongoDB
- from databank.reporter.daily_file import DailyFileReporter
- from databank.scheduler.simple_runner import SimpleRunner
- from databank.spiders.base import BaseSpider, Task
- from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
- from databank.core.tasks import MatchListTask
def pick_tokens(db: MongoDB, max_tokens: int = 3) -> list[MatchListTask]:
    """Derive match-list tasks from the league and season seed collections.

    Up to 10 documents are read from each of the ``leagues`` and ``seasons``
    collections. The league with the smallest ``league_id`` (a missing id is
    ordered as 0) and the first returned season are combined into one task
    per round, counting up from round 1 to the league's ``max_round``
    (1 when the field is absent).

    Args:
        db: Database handle whose ``find`` method is queried for seed data.
        max_tokens: Slice bound applied to the round sequence, capping how
            many tasks are produced.

    Returns:
        One ``MatchListTask`` per selected round, in ascending round order.

    Raises:
        RuntimeError: If either seed query comes back empty.
    """
    league_docs = db.find("leagues", projection={"_id": 0}, limit=10)
    season_docs = db.find("seasons", projection={"_id": 0}, limit=10)

    if not league_docs:
        raise RuntimeError("No leagues found. Seed leagues first.")
    if not season_docs:
        raise RuntimeError("No seasons found. Seed seasons first.")

    # Stable sort: among equal ids the first document returned by the DB wins.
    ordered = list(league_docs)
    ordered.sort(key=lambda doc: doc.get("league_id", 0))
    chosen = ordered[0]

    round_limit = int(chosen.get("max_round", 1))
    season_name = season_docs[0]["season"]

    tasks: list[MatchListTask] = []
    # Slicing the range applies the same bound as slicing a materialised list.
    for round_no in range(1, round_limit + 1)[:max_tokens]:
        tasks.append(
            MatchListTask(
                league_id=int(chosen["league_id"]),
                season=season_name,
                round_no=round_no,
            )
        )
    return tasks
def main() -> None:
    """Run one full pipeline pass using structured tasks and SimpleRunner.

    Steps:
        1. Connect to MongoDB using ``DATABANK_DB_URI`` / ``DATABANK_DB_NAME``
           (falling back to ``mongodb://localhost:27017`` / ``databank``).
        2. Build the task list from seed data via ``pick_tokens``.
        3. Run ``GetLeagueMatchListSpider`` over those tasks through a
           ``SimpleRunner`` wired to a ``DailyFileReporter``.
        4. Print a short summary: finish time, total persisted documents and
           the three most common ``token`` values among the run's documents.

    Raises:
        RuntimeError: Propagated from ``pick_tokens`` when seed data is missing.
    """
    uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
    name = os.getenv("DATABANK_DB_NAME", "databank")
    db = MongoDB(uri=uri, name=name)
    db.connect()
    # NOTE(review): the connection is never explicitly released here; confirm
    # whether MongoDB exposes a close/disconnect method that should be called.

    reporter = DailyFileReporter(timezone="utc+8")
    runner = SimpleRunner(db=db, reporters=[reporter])
    spider = GetLeagueMatchListSpider()
    tasks = pick_tokens(db)
    spiders: Dict[BaseSpider, List[Task]] = {spider: tasks}
    summary = runner.run(spiders)

    # Basic analytics example: count docs per token for this run
    # (from runner.last_docs); documents without a token fall under "unknown".
    per_token = Counter(d.data.get("token", "unknown") for d in runner.last_docs)
    top = per_token.most_common(3)

    # isoformat() on an aware UTC datetime already ends in "+00:00"; appending
    # a literal "Z" on top of that produced a malformed "...+00:00Z" stamp.
    # Swap the numeric offset for the single "Z" designator instead.
    finished_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
    print(f"Run finished at {finished_at}")
    print(f"Total persisted: {summary.total_docs}")
    print("Top tokens (by docs):", top)
# Script entry point: run the pipeline only when executed directly, so that
# importing this module has no side effects.
if __name__ == "__main__":
    main()
|