| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- """Seed seasons into a local MongoDB instance with unique ``season``.
- Rules:
- - initial_year starts from 2016.
- - Determine current_year by today's date:
- - If today > July 1 (strict), current_year = this_year + 1
- - Else, current_year = this_year
- - Generate one season per start year from initial_year up to (but excluding)
- current_year, as strings of the form "<start>-<start + 1>" (for example
- "2016-2017"). Generation stops once the start year reaches current_year.
- - Ensure unique index on ``season`` and upsert each document.
- Environment variables (with defaults):
- - DATABANK_DB_URI (default: mongodb://localhost:27017)
- - DATABANK_DB_NAME (default: databank)
- Usage (PowerShell):
- python -m pip install "pymongo>=4.7"
- $env:DATABANK_DB_URI = "mongodb://localhost:27017"
- $env:DATABANK_DB_NAME = "databank"
- python scripts/seed_seasons_mongo.py
- """
- from __future__ import annotations
- import os
- from datetime import date
- from typing import Any, Dict, Iterable, List, Tuple
- from pymongo import MongoClient
def get_connection() -> Tuple[MongoClient, str]:
    """Build a MongoDB client from environment settings.

    Reads ``DATABANK_DB_URI`` (default ``mongodb://localhost:27017``) and
    ``DATABANK_DB_NAME`` (default ``databank``).

    Returns:
        A ``(client, db_name)`` tuple. The client is returned open; this
        function does not close it.
    """
    db_name = os.getenv("DATABANK_DB_NAME", "databank")
    mongo_uri = os.getenv("DATABANK_DB_URI", "mongodb://localhost:27017")
    return MongoClient(mongo_uri), db_name
def calc_current_year(today: date | None = None) -> int:
    """Return the year used as the exclusive upper bound for seasons.

    Args:
        today: Reference date. Falls back to ``date.today()`` when not given.

    Returns:
        ``today.year + 1`` when ``today`` is strictly later than July 1 of
        its own year, otherwise ``today.year``.
    """
    reference = today if today else date.today()
    july_first = date(reference.year, 7, 1)
    # Strict comparison: July 1 itself still belongs to the current year.
    if reference > july_first:
        return reference.year + 1
    return reference.year
def generate_seasons(initial_year: int, current_year: int) -> List[Dict[str, Any]]:
    """Build season documents for each start year in a half-open range.

    Args:
        initial_year: First start year (inclusive).
        current_year: Upper bound (exclusive); no season starts in this year.

    Returns:
        A list of dicts with keys ``season`` (``"<start>-<start + 1>"``),
        ``start_year`` and ``end_year``, ordered by ascending start year.
        The list is empty when ``initial_year >= current_year``.
    """
    return [
        {
            "season": f"{start}-{start + 1}",
            "start_year": start,
            "end_year": start + 1,
        }
        for start in range(initial_year, current_year)
    ]
def ensure_unique_index_seasons(db) -> None:
    """Request a unique index on ``season`` in the ``seasons`` collection.

    Args:
        db: Database handle that supports ``db["seasons"]`` lookup and whose
            collection exposes ``create_index``.
    """
    db["seasons"].create_index("season", unique=True)
def upsert_seasons(db, docs: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Replace-or-insert each season document, keyed by its ``season`` value.

    Args:
        db: Database handle that supports ``db["seasons"]`` lookup.
        docs: Season documents; each must contain a ``"season"`` key.

    Returns:
        A dict with ``inserted`` (documents with no existing match) and
        ``updated`` (matched documents reported as modified). Matched but
        unmodified documents are counted in neither.
    """
    collection = db["seasons"]
    counts = {"inserted": 0, "updated": 0}
    for season_doc in docs:
        outcome = collection.replace_one(
            {"season": season_doc["season"]}, season_doc, upsert=True
        )
        if not outcome.matched_count:
            # Nothing matched the filter, so the upsert created the document.
            counts["inserted"] += 1
        elif outcome.modified_count == 1:
            counts["updated"] += 1
    return counts
def main() -> None:
    """Seed the ``seasons`` collection and print a summary line.

    Connects using ``get_connection()``, ensures the unique index, generates
    seasons starting from 2016 up to ``calc_current_year()`` (exclusive),
    upserts them, and always closes the client afterwards.
    """
    initial_year = 2016
    client, db_name = get_connection()
    try:
        database = client[db_name]
        ensure_unique_index_seasons(database)
        current = calc_current_year()
        stats = upsert_seasons(database, generate_seasons(initial_year, current))
        print(
            f"Seasons seed done: range=[{initial_year}, {current}) "
            f"inserted={stats['inserted']}, updated={stats['updated']}"
        )
    finally:
        # Close the client even if indexing or upserting raised.
        client.close()
# Run the seeding only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|