| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- """Load JSON items from the local info/*.txt files and upsert into MongoDB."""
- from __future__ import annotations
- import json
- import os
- import re
- import sys
- from pathlib import Path
- from typing import Any, Dict, Iterable, List
- from pymongo import MongoClient, UpdateOne
- from pymongo.collection import Collection
- from pymongo.results import BulkWriteResult
- DEFAULT_INFO_DIR = Path(__file__).resolve().parent.parent / "info"
- MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
- MONGO_DB = os.getenv("MONGO_DB", "haier")
- MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "info_items")
- BATCH_SIZE = 500
- _TRAILING_COMMA_RE = re.compile(r",\s*([}\]])")
- def _strip_noise(text: str) -> str:
- """Remove BOM, null bytes, and whitespace noise."""
- return text.replace("\ufeff", "").replace("\x00", "").strip()
- def _remove_trailing_commas(text: str) -> str:
- """Best-effort removal of dangling commas before closing braces/brackets."""
- return _TRAILING_COMMA_RE.sub(r"\1", text)
- def _as_ndjson_array(text: str) -> str:
- """Wrap newline-delimited JSON objects into an array string if applicable."""
- lines = [line.strip() for line in text.splitlines() if line.strip()]
- if lines and all(line.startswith("{") and line.endswith("}") for line in lines):
- return "[" + ",".join(lines) + "]"
- return text
def parse_json_text(raw: str) -> List[Dict[str, Any]]:
    """Parse possibly messy JSON text into a list of dictionaries.

    ``raw`` is first cleaned with ``_strip_noise``. Candidate strings are then
    tried with ``json.loads`` in this order, skipping exact duplicates:

    1. the cleaned text itself;
    2. the cleaned text with dangling commas removed;
    3. newline-delimited objects joined into an array (``_as_ndjson_array``);
    4. the cleaned text wrapped in ``[...]`` (only when it does not already
       start with ``[``), which accepts comma-separated top-level objects;
    5. candidates 3 and 4 with dangling commas removed.

    The first candidate that decodes to a dict or a list wins: a dict is
    returned as a one-element list, and from a list only the dict elements
    are kept.

    Args:
        raw: File contents to parse.

    Returns:
        The parsed records; an empty list when ``raw`` is empty after cleaning.

    Raises:
        ValueError: if no candidate decodes to a dict or list. The last
            underlying error is attached as ``__cause__``.
    """
    cleaned = _strip_noise(raw)
    if not cleaned:
        # Empty / whitespace-only / BOM-only input holds no records; returning
        # [] avoids aborting a whole ingest on a JSONDecodeError.
        return []

    candidates: List[str] = []

    def add_candidate(text: str) -> None:
        # Retrying an identical string could only repeat the same outcome.
        if text not in candidates:
            candidates.append(text)

    ndjson_wrapped = _as_ndjson_array(cleaned)
    array_wrapped = None if cleaned.startswith("[") else "[" + cleaned + "]"

    add_candidate(cleaned)
    add_candidate(_remove_trailing_commas(cleaned))
    add_candidate(ndjson_wrapped)
    if array_wrapped is not None:
        add_candidate(array_wrapped)
    # Last resort: wrapped forms that also carry dangling commas, e.g.
    # '{...},\n{...},' needs both the brackets and the comma fix.
    add_candidate(_remove_trailing_commas(ndjson_wrapped))
    if array_wrapped is not None:
        add_candidate(_remove_trailing_commas(array_wrapped))

    last_error: Exception | None = None
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError as exc:
            # Expected for lenient inputs: move on to the next candidate.
            last_error = exc
            continue
        if isinstance(parsed, dict):
            return [parsed]
        if isinstance(parsed, list):
            return [item for item in parsed if isinstance(item, dict)]
        last_error = ValueError("Parsed JSON is not a dict or list")
    raise ValueError(f"Unable to parse JSON content: {last_error}") from last_error
def load_items(path: Path) -> List[Dict[str, Any]]:
    """Read one info file and return its JSON records, each tagged with its origin.

    The file is decoded as UTF-8 with undecodable bytes silently dropped
    (``errors="ignore"``) and parsed by ``parse_json_text``, whose
    ``ValueError`` propagates on unparseable content. Every record without a
    ``_source_file`` key gets one holding the file's name; existing values
    are kept.
    """
    content = path.read_text(encoding="utf-8", errors="ignore")
    records = parse_json_text(content)
    origin = path.name
    for record in records:
        if "_source_file" not in record:
            record["_source_file"] = origin
    return records
def iter_files(info_dir: Path) -> Iterable[Path]:
    """Lazily yield the ``*.txt`` entries directly inside *info_dir*, sorted by path.

    Sorting keeps the ingest order identical across runs. As a generator,
    nothing is listed until it is first advanced.
    """
    matches = sorted(info_dir.glob("*.txt"))
    for txt_path in matches:
        yield txt_path
def bulk_upsert(
    collection: Collection, items: List[Dict[str, Any]]
) -> BulkWriteResult | None:
    """Upsert items into ``collection`` keyed on their ``id`` field.

    Each item becomes ``UpdateOne({"id": item["id"]}, {"$set": item},
    upsert=True)``, and all operations go out in one unordered ``bulk_write``.

    Items whose ``id`` is missing or ``None`` are skipped, with a count
    reported on stderr. A ``{"id": None}`` filter matches every document
    whose ``id`` is null or absent, so such items would otherwise all
    overwrite one shared document.

    Args:
        collection: Target MongoDB collection.
        items: Records to upsert.

    Returns:
        The ``BulkWriteResult``, or ``None`` when there was nothing to write.
    """
    operations: List[UpdateOne] = []
    skipped = 0
    for item in items:
        item_id = item.get("id")
        if item_id is None:
            skipped += 1
            continue
        operations.append(UpdateOne({"id": item_id}, {"$set": item}, upsert=True))
    if skipped:
        print(
            f"bulk_upsert: skipping {skipped} item(s) with a missing or null 'id'",
            file=sys.stderr,
        )
    if not operations:
        return None
    return collection.bulk_write(operations, ordered=False)
def ingest(info_dir: Path) -> None:
    """Process all info files under ``info_dir`` and upsert their records.

    Connects to ``MONGO_URI`` and ensures a unique index on ``id`` in
    ``MONGO_DB``/``MONGO_COLLECTION``. For each file from ``iter_files``, it
    parses records with ``load_items`` and upserts them in slices of
    ``BATCH_SIZE`` via ``bulk_upsert``. It prints one line per file and a
    final summary.

    The client is used as a context manager, so its connections are closed
    even when parsing or a database call raises.

    Raises:
        ValueError: propagated from ``load_items`` when a file cannot be
            parsed; earlier files have already been written at that point.
        Errors raised by the MongoDB driver (index creation, bulk writes)
            also propagate unchanged.
    """
    with MongoClient(MONGO_URI) as client:
        collection = client[MONGO_DB][MONGO_COLLECTION]
        # Upserts filter on "id", so keep that field indexed and unique.
        collection.create_index("id", unique=True)
        total_files = 0
        total_items = 0
        for file_path in iter_files(info_dir):
            total_files += 1
            items = load_items(file_path)
            total_items += len(items)
            for start in range(0, len(items), BATCH_SIZE):
                bulk_upsert(collection, items[start : start + BATCH_SIZE])
            print(f"{file_path.name}: {len(items)} items upserted")
    print(f"Completed. Files: {total_files}, items processed: {total_items}.")
def main(argv: List[str]) -> int:
    """CLI entry point: ``<script> [INFO_DIR]``.

    The info directory comes from ``argv[1]`` when given, otherwise from the
    ``INFO_DIR`` environment variable, otherwise ``DEFAULT_INFO_DIR``.

    Args:
        argv: Full argument vector, including the program name at index 0.

    Returns:
        0 after ``ingest`` completes; 1 (with a message on stderr) when the
        chosen path is not an existing directory.
    """
    if len(argv) > 1:
        info_dir = Path(argv[1])
    else:
        info_dir = Path(os.getenv("INFO_DIR", DEFAULT_INFO_DIR))
    # is_dir() rather than exists(): a regular file would otherwise pass the
    # check and lead to a run that finds no *.txt files yet reports success.
    if not info_dir.is_dir():
        print(
            f"Info directory does not exist or is not a directory: {info_dir}",
            file=sys.stderr,
        )
        return 1
    ingest(info_dir)
    return 0
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the OS as the exit status.
    sys.exit(main(sys.argv))
|