|
|
@@ -0,0 +1,211 @@
|
|
|
+"""Concrete MongoDB backend implementing `BaseDB` for local reads/writes.
|
|
|
+
|
|
|
+This implementation keeps configuration simple and relies on options provided
|
|
|
+via `BaseDB.configure(**options)` or constructor `**options`:
|
|
|
+
|
|
|
+Options:
|
|
|
+- uri: Full MongoDB connection URI (preferred). Default: mongodb://localhost:27017
|
|
|
+- name: Database name. Default: databank
|
|
|
+- host: Host (used only if `uri` is absent). Default: localhost
|
|
|
+- port: Port (used only if `uri` is absent). Default: 27017
|
|
|
+- username/password: Credentials to embed in URI if `uri` is absent.
|
|
|
+- indexes: Optional mapping specifying indexes to ensure on `ensure_indexes`.
|
|
|
+ Example:
|
|
|
+ {
|
|
|
+ "leagues": [
|
|
|
+ {"keys": [("league_id", 1)], "unique": True}
|
|
|
+ ],
|
|
|
+ "seasons": [
|
|
|
+ {"keys": [("season", 1)], "unique": True}
|
|
|
+ ]
|
|
|
+ }
|
|
|
+
|
|
|
+Note: This module requires `pymongo`. Install via:
|
|
|
+ python -m pip install "pymongo>=4.7"
|
|
|
+"""
|
|
|
+
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+from typing import Any, Iterable, Mapping, Optional
|
|
|
+
|
|
|
+from pymongo import MongoClient
|
|
|
+from pymongo.collection import Collection
|
|
|
+from pymongo.database import Database
|
|
|
+from pymongo.errors import PyMongoError
|
|
|
+
|
|
|
+from .base import BaseDB, ConnectError, InsertError
|
|
|
+from ..core.models import Document
|
|
|
+
|
|
|
+
|
|
|
class MongoDB(BaseDB):
    """MongoDB implementation of `BaseDB`.

    This class supports simple inserts (with optional upsert by `_id`) and
    convenience read helpers (`find`, `find_one`). It targets a local MongoDB
    by default but works with any URI.

    Configuration is read from `self.options` (see the module docstring for
    the recognised keys: `uri`, `name`, `host`, `port`, `username`,
    `password`, `indexes`).
    """

    def __init__(self, *, logger=None, **options: object) -> None:
        """Initialize with optional logger and options (see module docstring).

        No connection is opened here; call `connect()` before reading or
        writing.
        """
        super().__init__(logger=logger, **options)
        self._client: Optional[MongoClient] = None
        self._db: Optional[Database] = None

    # ---- BaseDB required methods ----
    def connect(self) -> None:
        """Establish a connection to MongoDB and select the database.

        The database name comes from `options["name"]` (default `databank`).
        The connection is validated with a `ping` command before the client
        and database handles are stored on the instance.

        Raises:
            ConnectError: If any PyMongo error occurs while connecting,
                pinging, or running the `on_connect` hook.
        """
        client: Optional[MongoClient] = None
        try:
            uri = self._build_uri()
            name = str(self.options.get("name") or "databank")
            client = MongoClient(uri)
            # Validate the connection before exposing the client to callers.
            client.admin.command("ping")
            self._client = client
            self._db = client[name]
            self.on_connect()
            if self.logger:
                # Never write credentials to the log: mask the password part.
                self.logger.debug(
                    "MongoDB connected: uri=%s db=%s",
                    self._redact_uri(uri),
                    name,
                )
        except PyMongoError as exc:  # pragma: no cover - environment dependent
            # Do not leave a half-initialised client (and its background
            # resources) behind when the connection attempt fails.
            self._client = None
            self._db = None
            if client is not None:
                client.close()
            raise ConnectError(f"Failed to connect to MongoDB: {exc}") from exc

    def ensure_indexes(self) -> None:
        """Create indexes defined in `options["indexes"]` if provided.

        The expected format is a mapping of collection name to a list of index
        specs. Each index spec is a dict with keys:
          - keys: list[tuple[str, int]] (e.g., [("league_id", ASCENDING)])
          - unique: bool (optional, defaults to False)
          - name: str (optional)

        This is a no-op when not connected or when `indexes` is not a mapping.
        Entries that are not mappings, or that lack `keys`, are skipped.
        """
        if self._db is None:
            return
        indexes = self.options.get("indexes")
        if not isinstance(indexes, Mapping):
            return
        for coll_name, specs in indexes.items():
            if not isinstance(specs, Iterable):
                continue
            coll = self._db[coll_name]
            for spec in specs:
                if not isinstance(spec, Mapping) or "keys" not in spec:
                    continue
                index_kwargs: dict[str, Any] = {
                    "unique": bool(spec.get("unique", False)),
                }
                # Only forward `name` when the spec supplies one, so the
                # driver generates its default index name otherwise.
                index_name = spec.get("name")
                if index_name is not None:
                    index_kwargs["name"] = index_name
                coll.create_index(list(spec["keys"]), **index_kwargs)

    def insert_many(self, docs: Iterable[Document]) -> int:
        """Insert or upsert documents into collections named by `doc.kind`.

        - If `doc.id` is truthy, performs a replace-one with upsert keyed by
          `_id`.
        - Otherwise, inserts the document (non-upsert) letting MongoDB assign
          `_id`.

        Each payload is a shallow copy of `doc.data`, with `created_at`
        filled from `doc.created_at` when the data does not already carry it.

        Returns:
            Count of inserted/updated documents. A replace that matched an
            existing document without modifying it is not counted.

        Raises:
            InsertError: If not connected, or on any underlying PyMongo
                failure.
        """
        if self._db is None:
            raise InsertError("Database not connected. Call connect() first.")
        count = 0
        try:
            for doc in docs:
                coll = self._collection_for(doc)
                payload: dict[str, Any] = dict(doc.data)
                payload.setdefault("created_at", doc.created_at)
                if doc.id:
                    payload["_id"] = doc.id
                    res = coll.replace_one({"_id": doc.id}, payload, upsert=True)
                    # Count an upsert-insert (nothing matched) or an actual
                    # modification as success; an identical replace is not.
                    if res.matched_count == 0 or res.modified_count == 1:
                        count += 1
                else:
                    coll.insert_one(payload)
                    count += 1
            # NOTE(review): the hook receives an empty list rather than the
            # processed documents; confirm against BaseDB.after_insert whether
            # that is intended.
            self.after_insert([], count)
            return count
        except PyMongoError as exc:  # pragma: no cover - environment dependent
            raise InsertError(f"Failed to insert documents: {exc}") from exc

    def close(self) -> None:
        """Close the MongoDB client and release resources.

        The client and database handles are always cleared and the
        `on_close` hook always runs, even if closing the client raises.
        """
        try:
            if self._client is not None:
                self._client.close()
                if self.logger:
                    self.logger.debug("MongoDB client closed")
        finally:
            self._client = None
            self._db = None
            self.on_close()

    # ---- Convenience read helpers (not part of BaseDB interface) ----
    def find(
        self,
        collection: str,
        query: Optional[Mapping[str, Any]] = None,
        projection: Optional[Mapping[str, int]] = None,
        limit: Optional[int] = None,
    ) -> list[dict[str, Any]]:
        """Find documents in a collection and return them as a list.

        Args:
            collection: Collection name to read from.
            query: MongoDB filter dict. `None` or empty matches everything.
            projection: Fields to include/exclude (MongoDB projection dict).
            limit: Optional max number of documents to return. `None` or `0`
                applies no limit.

        Returns:
            A list of raw MongoDB documents (dicts).

        Raises:
            ConnectError: If `connect()` has not been called.
        """
        coll = self._require_db()[collection]
        cursor = coll.find(filter=query or {}, projection=projection)
        if limit:
            cursor = cursor.limit(int(limit))
        return list(cursor)

    def find_one(
        self,
        collection: str,
        query: Optional[Mapping[str, Any]] = None,
        projection: Optional[Mapping[str, int]] = None,
    ) -> Optional[dict[str, Any]]:
        """Find a single document matching the query or return None.

        Raises:
            ConnectError: If `connect()` has not been called.
        """
        coll = self._require_db()[collection]
        return coll.find_one(filter=query or {}, projection=projection)

    # ---- Internals ----
    def _require_db(self) -> Database:
        """Return the selected database, ensuring `connect()` has been called.

        Raises:
            ConnectError: If no database handle is available.
        """
        if self._db is None:
            raise ConnectError("Database not connected. Call connect() first.")
        return self._db

    def _collection_for(self, doc: Document) -> Collection:
        """Resolve the collection for a document based on its `kind`.

        Falls back to the `documents` collection when `doc.kind` is falsy.
        """
        return self._require_db()[doc.kind or "documents"]

    def _build_uri(self) -> str:
        """Build a MongoDB URI from options if `uri` not explicitly provided.

        A non-empty string `uri` option is returned unchanged. Otherwise the
        URI is assembled from `host` (default `localhost`) and `port`
        (default 27017). Credentials are embedded only when both `username`
        and `password` are set.

        Raises:
            ValueError: If `port` is a string that is not a valid integer.
        """
        # Imported locally so this block does not depend on a file-level
        # import being present.
        from urllib.parse import quote_plus

        uri = self.options.get("uri")
        if isinstance(uri, str) and uri:
            return uri
        host = str(self.options.get("host") or "localhost")
        port_val = self.options.get("port")
        port = int(port_val) if isinstance(port_val, (int, str)) else 27017
        username = self.options.get("username")
        password = self.options.get("password")
        if username and password:
            # Reserved characters (e.g. "@", ":", "/", "%") must be
            # percent-escaped or the URI is invalid or mis-parsed.
            user = quote_plus(str(username))
            secret = quote_plus(str(password))
            return f"mongodb://{user}:{secret}@{host}:{port}"
        return f"mongodb://{host}:{port}"

    @staticmethod
    def _redact_uri(uri: str) -> str:
        """Return `uri` with any password replaced by `***` for safe logging.

        URIs without a `://` separator, without userinfo, or without a
        password component are returned unchanged.
        """
        scheme, sep, rest = uri.partition("://")
        if not sep:
            return uri
        authority, slash, tail = rest.partition("/")
        userinfo, at, hosts = authority.rpartition("@")
        if not at or ":" not in userinfo:
            return uri
        username = userinfo.partition(":")[0]
        return f"{scheme}://{username}:***@{hosts}{slash}{tail}"
|