# ingest_info_to_mongo.py
  1. """Load JSON items from the local info/*.txt files and upsert into MongoDB."""
  2. from __future__ import annotations
  3. import json
  4. import os
  5. import re
  6. import sys
  7. from pathlib import Path
  8. from typing import Any, Dict, Iterable, List
  9. from pymongo import MongoClient, UpdateOne
  10. from pymongo.collection import Collection
  11. from pymongo.results import BulkWriteResult
  12. DEFAULT_INFO_DIR = Path(__file__).resolve().parent.parent / "info"
  13. MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
  14. MONGO_DB = os.getenv("MONGO_DB", "haier")
  15. MONGO_COLLECTION = os.getenv("MONGO_COLLECTION", "info_items")
  16. BATCH_SIZE = 500
  17. _TRAILING_COMMA_RE = re.compile(r",\s*([}\]])")
  18. def _strip_noise(text: str) -> str:
  19. """Remove BOM, null bytes, and whitespace noise."""
  20. return text.replace("\ufeff", "").replace("\x00", "").strip()
  21. def _remove_trailing_commas(text: str) -> str:
  22. """Best-effort removal of dangling commas before closing braces/brackets."""
  23. return _TRAILING_COMMA_RE.sub(r"\1", text)
  24. def _as_ndjson_array(text: str) -> str:
  25. """Wrap newline-delimited JSON objects into an array string if applicable."""
  26. lines = [line.strip() for line in text.splitlines() if line.strip()]
  27. if lines and all(line.startswith("{") and line.endswith("}") for line in lines):
  28. return "[" + ",".join(lines) + "]"
  29. return text
  30. def parse_json_text(raw: str) -> List[Dict[str, Any]]:
  31. """Parse possibly messy JSON text into a list of dictionaries."""
  32. cleaned = _strip_noise(raw)
  33. candidates = [cleaned]
  34. cleaned_trailing = _remove_trailing_commas(cleaned)
  35. if cleaned_trailing != cleaned:
  36. candidates.append(cleaned_trailing)
  37. ndjson_wrapped = _as_ndjson_array(cleaned)
  38. if ndjson_wrapped != cleaned:
  39. candidates.append(ndjson_wrapped)
  40. if cleaned and not cleaned.lstrip().startswith("["):
  41. candidates.append("[" + cleaned + "]")
  42. last_error: Exception | None = None
  43. for candidate in candidates:
  44. try:
  45. parsed = json.loads(candidate)
  46. except json.JSONDecodeError as exc: # pragma: no cover - defensive path
  47. last_error = exc
  48. continue
  49. if isinstance(parsed, dict):
  50. return [parsed]
  51. if isinstance(parsed, list):
  52. return [item for item in parsed if isinstance(item, dict)]
  53. last_error = ValueError("Parsed JSON is not a dict or list")
  54. raise ValueError(f"Unable to parse JSON content: {last_error}")
  55. def load_items(path: Path) -> List[Dict[str, Any]]:
  56. """Parse one info file into a list of dictionaries and tag the source file."""
  57. text = path.read_text(encoding="utf-8", errors="ignore")
  58. items = parse_json_text(text)
  59. for item in items:
  60. item.setdefault("_source_file", path.name)
  61. return items
  62. def iter_files(info_dir: Path) -> Iterable[Path]:
  63. """Yield info txt files in deterministic order for repeatable ingest."""
  64. yield from sorted(info_dir.glob("*.txt"))
  65. def bulk_upsert(
  66. collection: Collection, items: List[Dict[str, Any]]
  67. ) -> BulkWriteResult | None:
  68. """Upsert items by id into MongoDB using bulk_write for throughput."""
  69. operations: List[UpdateOne] = []
  70. for item in items:
  71. key = {"id": item.get("id")}
  72. operations.append(UpdateOne(key, {"$set": item}, upsert=True))
  73. if not operations:
  74. return None
  75. return collection.bulk_write(operations, ordered=False)
  76. def ingest(info_dir: Path) -> None:
  77. """Process all info files under info_dir and upsert their records."""
  78. client = MongoClient(MONGO_URI)
  79. collection = client[MONGO_DB][MONGO_COLLECTION]
  80. collection.create_index("id", unique=True)
  81. total_files = 0
  82. total_items = 0
  83. for file_path in iter_files(info_dir):
  84. total_files += 1
  85. items = load_items(file_path)
  86. total_items += len(items)
  87. for start in range(0, len(items), BATCH_SIZE):
  88. batch = items[start : start + BATCH_SIZE]
  89. bulk_upsert(collection, batch)
  90. print(f"{file_path.name}: {len(items)} items upserted")
  91. print(f"Completed. Files: {total_files}, items processed: {total_items}.")
  92. def main(argv: List[str]) -> int:
  93. """CLI entry point; accepts optional info directory argument."""
  94. info_dir = Path(os.getenv("INFO_DIR", DEFAULT_INFO_DIR))
  95. if len(argv) > 1:
  96. info_dir = Path(argv[1])
  97. if not info_dir.exists():
  98. print(f"Info directory does not exist: {info_dir}", file=sys.stderr)
  99. return 1
  100. ingest(info_dir)
  101. return 0
  102. if __name__ == "__main__":
  103. raise SystemExit(main(sys.argv))