json_parser.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # type: ignore
  2. import asyncio
  3. import json
  4. from typing import AsyncGenerator
  5. from core.base.parsers.base_parser import AsyncParser
  6. from core.base.providers import (
  7. CompletionProvider,
  8. DatabaseProvider,
  9. IngestionConfig,
  10. )
  11. class JSONParser(AsyncParser[str | bytes]):
  12. """A parser for JSON data."""
  13. def __init__(
  14. self,
  15. config: IngestionConfig,
  16. database_provider: DatabaseProvider,
  17. llm_provider: CompletionProvider,
  18. ):
  19. self.database_provider = database_provider
  20. self.llm_provider = llm_provider
  21. self.config = config
  22. async def ingest(
  23. self, data: str | bytes, *args, **kwargs
  24. ) -> AsyncGenerator[str, None]:
  25. """
  26. Ingest JSON data and yield a formatted text representation.
  27. :param data: The JSON data to parse.
  28. :param kwargs: Additional keyword arguments.
  29. """
  30. if isinstance(data, bytes):
  31. data = data.decode("utf-8")
  32. loop = asyncio.get_event_loop()
  33. parsed_json = await loop.run_in_executor(None, json.loads, data)
  34. parsed_json = json.loads(data)
  35. formatted_text = await loop.run_in_executor(
  36. None, self._parse_json, parsed_json
  37. )
  38. chunk_size = kwargs.get("chunk_size")
  39. if chunk_size and isinstance(chunk_size, int):
  40. # If chunk_size is provided and is an integer, yield the formatted text in chunks
  41. for i in range(0, len(formatted_text), chunk_size):
  42. yield formatted_text[i : i + chunk_size]
  43. await asyncio.sleep(0)
  44. else:
  45. # If no valid chunk_size is provided, yield the entire formatted text
  46. yield formatted_text
  47. def _parse_json(self, data: dict) -> str:
  48. def remove_objects_with_null(obj):
  49. if not isinstance(obj, dict):
  50. return obj
  51. result = obj.copy()
  52. for key, value in obj.items():
  53. if isinstance(value, dict):
  54. result[key] = remove_objects_with_null(value)
  55. elif value is None:
  56. del result[key]
  57. return result
  58. def format_json_as_text(obj, indent=0):
  59. lines = []
  60. indent_str = " " * indent
  61. if isinstance(obj, dict):
  62. for key, value in obj.items():
  63. if isinstance(value, (dict, list)):
  64. nested = format_json_as_text(value, indent + 2)
  65. lines.append(f"{indent_str}{key}:\n{nested}")
  66. else:
  67. lines.append(f"{indent_str}{key}: {value}")
  68. elif isinstance(obj, list):
  69. for item in obj:
  70. nested = format_json_as_text(item, indent + 2)
  71. lines.append(f"{nested}")
  72. else:
  73. return f"{indent_str}{obj}"
  74. return "\n".join(lines)
  75. return format_json_as_text(remove_objects_with_null(data))