run_pipeline_once.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """Run the full pipeline once: seed tokens -> run spider(s) -> persist -> report -> basic analytics.
  2. Usage (PowerShell):
  3. # Ensure deps
  4. # python -m pip install requests pymongo
  5. # Configure DB if needed
  6. # $env:DATABANK_DB_URI = "mongodb://localhost:27017"
  7. # $env:DATABANK_DB_NAME = "databank"
  8. python scripts/run_pipeline_once.py
  9. """
  10. from __future__ import annotations
  11. import os
  12. from collections import Counter
  13. from datetime import UTC, datetime
  14. from typing import Dict, List
  15. from databank.db import MongoDB
  16. from databank.reporter.daily_file import DailyFileReporter
  17. from databank.scheduler.simple_runner import SimpleRunner
  18. from databank.spiders.base import BaseSpider, Task
  19. from databank.spiders.get_league_match_list import GetLeagueMatchListSpider
  20. from databank.core.tasks import MatchListTask
  21. def pick_tokens(db: MongoDB, max_tokens: int = 3) -> list[MatchListTask]:
  22. """Build MatchListTask list from DB seed data (league/season)."""
  23. leagues = db.find("leagues", projection={"_id": 0}, limit=10)
  24. seasons = db.find("seasons", projection={"_id": 0}, limit=10)
  25. if not leagues:
  26. raise RuntimeError("No leagues found. Seed leagues first.")
  27. if not seasons:
  28. raise RuntimeError("No seasons found. Seed seasons first.")
  29. league = sorted(leagues, key=lambda x: x.get("league_id", 0))[0]
  30. max_round = int(league.get("max_round", 1))
  31. season_name = seasons[0]["season"]
  32. rounds = list(range(1, max_round + 1))[:max_tokens]
  33. return [
  34. MatchListTask(
  35. league_id=int(league["league_id"]), season=season_name, round_no=int(r)
  36. )
  37. for r in rounds
  38. ]
  39. def main() -> None:
  40. """Run one full pipeline pass using structured tasks and SimpleRunner."""
  41. uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
  42. name = os.getenv("DATABANK_DB_NAME", "databank")
  43. db = MongoDB(uri=uri, name=name)
  44. db.connect()
  45. reporter = DailyFileReporter(timezone="utc+8")
  46. runner = SimpleRunner(db=db, reporters=[reporter])
  47. spider = GetLeagueMatchListSpider()
  48. tasks = pick_tokens(db)
  49. spiders: Dict[BaseSpider, List[Task]] = {spider: tasks}
  50. summary = runner.run(spiders)
  51. # Basic analytics example: count docs per token for this run (from runner.last_docs)
  52. per_token = Counter(d.data.get("token", "unknown") for d in runner.last_docs)
  53. top = per_token.most_common(3)
  54. print(f"Run finished at {datetime.now(UTC).isoformat()}Z")
  55. print(f"Total persisted: {summary.total_docs}")
  56. print("Top tokens (by docs):", top)
  57. if __name__ == "__main__":
  58. main()