json_parser.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. # type: ignore
  2. import asyncio
  3. import json
  4. from typing import AsyncGenerator
  5. from core.base import R2RException
  6. from core.base.parsers.base_parser import AsyncParser
  7. from core.base.providers import (
  8. CompletionProvider,
  9. DatabaseProvider,
  10. IngestionConfig,
  11. )
  12. class JSONParser(AsyncParser[str | bytes]):
  13. """A parser for JSON data."""
  14. def __init__(
  15. self,
  16. config: IngestionConfig,
  17. database_provider: DatabaseProvider,
  18. llm_provider: CompletionProvider,
  19. ):
  20. self.database_provider = database_provider
  21. self.llm_provider = llm_provider
  22. self.config = config
  23. async def ingest(
  24. self, data: str | bytes, *args, **kwargs
  25. ) -> AsyncGenerator[str, None]:
  26. if isinstance(data, bytes):
  27. data = data.decode("utf-8")
  28. # 处理空数据:直接 yield 空字符串,并返回
  29. if isinstance(data, str) and not data.strip():
  30. yield "" # 保证至少有一个输出,避免分块模式无输出
  31. return
  32. loop = asyncio.get_event_loop()
  33. try:
  34. parsed_json = await loop.run_in_executor(None, json.loads, data)
  35. except json.JSONDecodeError as e:
  36. raise R2RException(
  37. message=f"Failed to parse JSON data: {str(e)}",
  38. status_code=400,
  39. ) from e
  40. formatted_text = await loop.run_in_executor(
  41. None, self._parse_json, parsed_json
  42. )
  43. chunk_size = kwargs.get("chunk_size")
  44. if chunk_size and isinstance(chunk_size, int):
  45. for i in range(0, len(formatted_text), chunk_size):
  46. yield formatted_text[i : i + chunk_size]
  47. await asyncio.sleep(0)
  48. else:
  49. yield formatted_text
  50. def _parse_json(self, data: dict) -> str:
  51. def remove_objects_with_null(obj):
  52. if not isinstance(obj, dict):
  53. return obj
  54. result = obj.copy()
  55. for key, value in obj.items():
  56. if isinstance(value, dict):
  57. result[key] = remove_objects_with_null(value)
  58. elif value is None:
  59. del result[key]
  60. return result
  61. def format_json_as_text(obj, indent=0):
  62. lines = []
  63. indent_str = " " * indent
  64. if isinstance(obj, dict):
  65. for key, value in obj.items():
  66. if isinstance(value, (dict, list)):
  67. nested = format_json_as_text(value, indent + 2)
  68. lines.append(f"{indent_str}{key}:\n{nested}")
  69. else:
  70. lines.append(f"{indent_str}{key}: {value}")
  71. elif isinstance(obj, list):
  72. for item in obj:
  73. nested = format_json_as_text(item, indent + 2)
  74. lines.append(f"{nested}")
  75. else:
  76. return f"{indent_str}{obj}"
  77. return "\n".join(lines)
  78. return format_json_as_text(remove_objects_with_null(data))