"""Run scheduler in incremental mode: only current/latest season per league.""" from __future__ import annotations import os from typing import Dict, List from databank.db import MongoDB from databank.reporter.daily_file import DailyFileReporter from databank.spiders.get_league_match_list import GetLeagueMatchListSpider from databank.spiders.base import BaseSpider from databank.scheduler.orchestrator import DatabankScheduler, TaskProvider from databank.tasks.providers import league_matchlist_from_mongo from databank.analytics.simple_counts import PerTokenCounter essential_env = { "DATABANK_DB_URI": "mongodb://localhost:27017", "DATABANK_DB_NAME": "databank", } def main() -> None: """Entry point: run the scheduler in incremental mode (latest season only).""" uri = os.getenv("DATABANK_DB_URI", essential_env["DATABANK_DB_URI"]) name = os.getenv("DATABANK_DB_NAME", essential_env["DATABANK_DB_NAME"]) db = MongoDB(uri=uri, name=name) db.connect() # Spiders get_match = GetLeagueMatchListSpider() spiders: List[BaseSpider] = [get_match] # Reporters reporters = [DailyFileReporter(timezone="utc+8")] # Task providers wiring (incremental) provider = league_matchlist_from_mongo(mode="incremental") tasks_provider: Dict[BaseSpider, TaskProvider] = {get_match: provider} # Preflight: generate tasks once to validate seeds/config preview_tasks = provider(get_match, db) if not preview_tasks: print( "No tasks generated (incremental mode). " "Please ensure MongoDB has seeds in 'leagues' and 'seasons'." ) print( "Try seeding first: python scripts/seed_leagues_mongo.py " "and python scripts/seed_seasons_mongo.py" ) return else: print( f"Prepared {len(preview_tasks)} task(s) for incremental run " f"(showing up to 3):" ) for t in preview_tasks[:3]: token = t.token() if hasattr(t, "token") else str(t) print(" -", token) # Analyzers analyzers = [PerTokenCounter()] scheduler = DatabankScheduler( db=db, spiders=spiders, reporters=reporters, task_providers=tasks_provider, analyzers=analyzers, ) summary = scheduler.run_once() print("Scheduler(incremental) finished. Total persisted:", summary.total_docs) if summary.total_docs == 0: # Diagnostics: inspect returned docs to explain why nothing persisted docs = scheduler.get_last_docs() if hasattr(scheduler, "get_last_docs") else [] kinds = {} for d in docs: kinds[d.kind] = kinds.get(d.kind, 0) + 1 if kinds: print("Returned document kinds:", kinds) # Show first few error reasons if present errs = [d for d in docs if d.kind == "error"] if errs: preview = errs[:3] print("Sample errors (up to 3):") for e in preview: reason = e.data.get("reason") if isinstance(e.data, dict) else None detail = e.data.get("detail") if isinstance(e.data, dict) else None print(" -", reason, ":", detail) else: print( "Runner produced no documents. " "Check network/API accessibility and spider filters." ) print( "Note: Spider filters keep only groupName='联赛' and " "elapsedTime='已完场', and skip future-dated matches." ) if __name__ == "__main__": main()