json_parser.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # type: ignore
  2. import asyncio
  3. import json
  4. from typing import AsyncGenerator
  5. from core.base import R2RException
  6. from core.base.parsers.base_parser import AsyncParser
  7. from core.base.providers import (
  8. CompletionProvider,
  9. DatabaseProvider,
  10. IngestionConfig,
  11. )
  12. class JSONParser(AsyncParser[str | bytes]):
  13. """A parser for JSON data."""
  14. def __init__(
  15. self,
  16. config: IngestionConfig,
  17. database_provider: DatabaseProvider,
  18. llm_provider: CompletionProvider,
  19. ):
  20. self.database_provider = database_provider
  21. self.llm_provider = llm_provider
  22. self.config = config
  23. async def ingest(
  24. self, data: str | bytes, *args, **kwargs
  25. ) -> AsyncGenerator[str, None]:
  26. """Ingest JSON data and yield a formatted text representation.
  27. :param data: The JSON data to parse.
  28. :param kwargs: Additional keyword arguments.
  29. """
  30. if isinstance(data, bytes):
  31. data = data.decode("utf-8")
  32. loop = asyncio.get_event_loop()
  33. try:
  34. parsed_json = await loop.run_in_executor(None, json.loads, data)
  35. formatted_text = await loop.run_in_executor(
  36. None, self._parse_json, parsed_json
  37. )
  38. except json.JSONDecodeError as e:
  39. raise R2RException(
  40. message=f"Failed to parse JSON data, likely due to invalid JSON: {str(e)}",
  41. status_code=400,
  42. ) from e
  43. chunk_size = kwargs.get("chunk_size")
  44. if chunk_size and isinstance(chunk_size, int):
  45. # If chunk_size is provided and is an integer, yield the formatted text in chunks
  46. for i in range(0, len(formatted_text), chunk_size):
  47. yield formatted_text[i : i + chunk_size]
  48. await asyncio.sleep(0)
  49. else:
  50. # If no valid chunk_size is provided, yield the entire formatted text
  51. yield formatted_text
  52. def _parse_json(self, data: dict) -> str:
  53. def remove_objects_with_null(obj):
  54. if not isinstance(obj, dict):
  55. return obj
  56. result = obj.copy()
  57. for key, value in obj.items():
  58. if isinstance(value, dict):
  59. result[key] = remove_objects_with_null(value)
  60. elif value is None:
  61. del result[key]
  62. return result
  63. def format_json_as_text(obj, indent=0):
  64. lines = []
  65. indent_str = " " * indent
  66. if isinstance(obj, dict):
  67. for key, value in obj.items():
  68. if isinstance(value, (dict, list)):
  69. nested = format_json_as_text(value, indent + 2)
  70. lines.append(f"{indent_str}{key}:\n{nested}")
  71. else:
  72. lines.append(f"{indent_str}{key}: {value}")
  73. elif isinstance(obj, list):
  74. for item in obj:
  75. nested = format_json_as_text(item, indent + 2)
  76. lines.append(f"{nested}")
  77. else:
  78. return f"{indent_str}{obj}"
  79. return "\n".join(lines)
  80. return format_json_as_text(remove_objects_with_null(data))