"""Load JSON items from the local info/*.txt files and upsert into MongoDB."""

from __future__ import annotations

import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List

from pymongo import MongoClient, UpdateOne
from pymongo.collection import Collection
from pymongo.results import BulkWriteResult

# Connection settings and the default input directory; each can be overridden
# through the environment variable of the same name (INFO_DIR is read in main).
DEFAULT_INFO_DIR = Path(__file__).resolve().parent.parent / "info"
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
MONGO_DB = os.getenv("MONGO_DB", "haier")
MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "info_items")

# Maximum number of items sent to MongoDB in a single bulk_write call.
BATCH_SIZE = 500

# Matches a comma that is followed (after optional whitespace) by "}" or "]".
_TRAILING_COMMA_RE = re.compile(r",\s*([}\]])")


def _strip_noise(text: str) -> str:
    """Drop BOM characters and null bytes, then trim surrounding whitespace."""
    return text.replace("\ufeff", "").replace("\x00", "").strip()


def _remove_trailing_commas(text: str) -> str:
    """Best-effort removal of dangling commas before closing braces/brackets.

    The substitution is purely textual, so a matching sequence inside a JSON
    string value is rewritten as well; callers should only use the result as
    a fallback when the untouched text fails to parse.
    """
    return _TRAILING_COMMA_RE.sub(r"\1", text)


def _as_ndjson_array(text: str) -> str:
    """Wrap newline-delimited JSON objects into an array string if applicable.

    Returns ``text`` unchanged unless every non-blank line starts with "{"
    and ends with "}".
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if lines and all(line.startswith("{") and line.endswith("}") for line in lines):
        return "[" + ",".join(lines) + "]"
    return text


def parse_json_text(raw: str) -> List[Dict[str, Any]]:
    """Parse possibly messy JSON text into a list of dictionaries.

    Several repaired variants of the input are tried in order: the cleaned
    text as-is, the text with trailing commas removed, the text treated as
    newline-delimited objects, and the text wrapped in "[...]". The first
    variant that decodes to a dict or a list wins.

    Args:
        raw: File content to parse.

    Returns:
        A single-element list for a top-level object, or the dict elements of
        a top-level array (non-dict elements are dropped).

    Raises:
        ValueError: If no variant decodes to a dict or a list. The last
            underlying error is attached as ``__cause__``.
    """
    cleaned = _strip_noise(raw)

    candidates = [cleaned]
    without_commas = _remove_trailing_commas(cleaned)
    if without_commas != cleaned:
        candidates.append(without_commas)
    ndjson_wrapped = _as_ndjson_array(cleaned)
    if ndjson_wrapped != cleaned:
        candidates.append(ndjson_wrapped)
    # ``cleaned`` is already stripped, so a plain startswith check suffices.
    if cleaned and not cleaned.startswith("["):
        candidates.append("[" + cleaned + "]")

    last_error: Exception | None = None
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError as exc:  # pragma: no cover - defensive path
            last_error = exc
            continue
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return [item for item in parsed if isinstance(item, dict)]
        # Valid JSON, but a scalar: remember why and try the next variant.
        last_error = ValueError("Parsed JSON is not a dict or list")

    # ``candidates`` is never empty, so ``last_error`` is always set here.
    raise ValueError(f"Unable to parse JSON content: {last_error}") from last_error


def load_items(path: Path) -> List[Dict[str, Any]]:
    """Parse one info file into a list of dictionaries and tag the source file.

    The file is read as UTF-8 with undecodable bytes ignored. Each item gets
    a ``_source_file`` key holding the file name unless it already has one.

    Raises:
        ValueError: Propagated from ``parse_json_text`` when the content
            cannot be parsed.
    """
    text = path.read_text(encoding="utf-8", errors="ignore")
    items = parse_json_text(text)
    for item in items:
        item.setdefault("_source_file", path.name)
    return items


def iter_files(info_dir: Path) -> Iterable[Path]:
    """Yield info txt files in deterministic order for repeatable ingest."""
    yield from sorted(info_dir.glob("*.txt"))


def bulk_upsert(
    collection: Collection, items: List[Dict[str, Any]]
) -> BulkWriteResult | None:
    """Upsert items by id into MongoDB using bulk_write for throughput.

    Items whose ``id`` is missing or ``None`` are skipped: upserting them
    with the filter ``{"id": None}`` would funnel all of them into one
    document, each overwriting the fields of the previous one.

    Args:
        collection: Target collection.
        items: Records to upsert, matched on their ``id`` field.

    Returns:
        The bulk write result, or ``None`` when there was nothing to write.
    """
    operations: List[UpdateOne] = []
    for item in items:
        item_id = item.get("id")
        if item_id is None:
            continue
        operations.append(UpdateOne({"id": item_id}, {"$set": item}, upsert=True))
    if not operations:
        return None
    return collection.bulk_write(operations, ordered=False)


def ingest(info_dir: Path) -> None:
    """Process all info files under info_dir and upsert their records.

    Ensures a unique index on ``id``, then upserts each file's items in
    batches of ``BATCH_SIZE``. A per-file summary and a final total are
    printed to stdout; items skipped for lacking an ``id`` are reported on
    stderr.
    """
    total_files = 0
    total_items = 0
    # The context manager closes the client (and its connections) even when
    # a file fails to parse or a write raises.
    with MongoClient(MONGO_URI) as client:
        collection = client[MONGO_DB][MONGO_COLLECTION]
        collection.create_index("id", unique=True)
        for file_path in iter_files(info_dir):
            total_files += 1
            items = load_items(file_path)
            total_items += len(items)
            for start in range(0, len(items), BATCH_SIZE):
                bulk_upsert(collection, items[start : start + BATCH_SIZE])
            # Mirror bulk_upsert's skip rule so the report matches what was written.
            skipped = sum(1 for item in items if item.get("id") is None)
            if skipped:
                print(
                    f"{file_path.name}: skipped {skipped} items without an id",
                    file=sys.stderr,
                )
            print(f"{file_path.name}: {len(items) - skipped} items upserted")
    print(f"Completed. Files: {total_files}, items processed: {total_items}.")


def main(argv: List[str]) -> int:
    """CLI entry point; accepts optional info directory argument.

    The directory is taken from ``argv[1]`` when given, otherwise from the
    ``INFO_DIR`` environment variable, otherwise ``DEFAULT_INFO_DIR``.

    Returns:
        0 on success, 1 when the info directory does not exist.
    """
    info_dir = Path(os.getenv("INFO_DIR", str(DEFAULT_INFO_DIR)))
    if len(argv) > 1:
        info_dir = Path(argv[1])
    if not info_dir.exists():
        print(f"Info directory does not exist: {info_dir}", file=sys.stderr)
        return 1
    ingest(info_dir)
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv))