1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # type: ignore
- import asyncio
- import json
- from typing import AsyncGenerator
- from core.base.parsers.base_parser import AsyncParser
- from core.base.providers import (
- CompletionProvider,
- DatabaseProvider,
- IngestionConfig,
- )
class JSONParser(AsyncParser[str | bytes]):
    """A parser that renders JSON data as an indented plain-text outline."""

    def __init__(
        self,
        config: IngestionConfig,
        database_provider: DatabaseProvider,
        llm_provider: CompletionProvider,
    ):
        """
        Store the configuration and provider handles on the instance.

        :param config: Ingestion configuration, kept as ``self.config``.
        :param database_provider: Kept as ``self.database_provider``.
        :param llm_provider: Kept as ``self.llm_provider``.
        """
        self.database_provider = database_provider
        self.llm_provider = llm_provider
        self.config = config

    async def ingest(
        self, data: str | bytes, *args, **kwargs
    ) -> AsyncGenerator[str, None]:
        """
        Ingest JSON data and yield a formatted text representation.

        :param data: The JSON data to parse; bytes are decoded as UTF-8.
        :param kwargs: Additional keyword arguments. If ``chunk_size`` is a
            positive integer, the formatted text is yielded in slices of at
            most that many characters; otherwise it is yielded in one piece.
        :raises UnicodeDecodeError: If ``data`` is bytes that are not UTF-8.
        :raises json.JSONDecodeError: If ``data`` is not valid JSON.
        """
        if isinstance(data, bytes):
            data = data.decode("utf-8")

        # Parse and format off the event loop thread. The JSON is decoded
        # exactly once; a second synchronous json.loads here would block the
        # loop and discard the executor's result.
        loop = asyncio.get_running_loop()
        parsed_json = await loop.run_in_executor(None, json.loads, data)
        formatted_text = await loop.run_in_executor(
            None, self._parse_json, parsed_json
        )

        chunk_size = kwargs.get("chunk_size")
        # Require a strictly positive size: a negative step would make the
        # range below empty and silently drop all output.
        if isinstance(chunk_size, int) and chunk_size > 0:
            for start in range(0, len(formatted_text), chunk_size):
                yield formatted_text[start : start + chunk_size]
                # Give other tasks a chance to run between chunks.
                await asyncio.sleep(0)
        else:
            # No valid chunk_size: yield the entire formatted text at once.
            yield formatted_text

    def _parse_json(self, data: dict | list) -> str:
        """
        Convert decoded JSON into an indented ``key: value`` text outline.

        Keys whose value is ``None`` are dropped from every dict, including
        dicts nested inside lists. ``None`` items that sit directly in a
        list are kept.

        :param data: The decoded JSON value (a scalar is also accepted and
            is rendered via ``str`` formatting).
        :return: The formatted text, with lines joined by newlines.
        """

        def remove_objects_with_null(obj):
            # Rebuild containers rather than mutating the caller's data.
            if isinstance(obj, dict):
                return {
                    key: remove_objects_with_null(value)
                    for key, value in obj.items()
                    if value is not None
                }
            if isinstance(obj, list):
                return [remove_objects_with_null(item) for item in obj]
            return obj

        def format_json_as_text(obj, indent=0):
            lines = []
            indent_str = " " * indent

            if isinstance(obj, dict):
                for key, value in obj.items():
                    if isinstance(value, (dict, list)):
                        # Containers go on the following lines, indented
                        # two levels deeper than their key.
                        nested = format_json_as_text(value, indent + 2)
                        lines.append(f"{indent_str}{key}:\n{nested}")
                    else:
                        lines.append(f"{indent_str}{key}: {value}")
            elif isinstance(obj, list):
                for item in obj:
                    nested = format_json_as_text(item, indent + 2)
                    lines.append(f"{nested}")
            else:
                # Scalar leaf (reached for list items or a scalar document).
                return f"{indent_str}{obj}"

            return "\n".join(lines)

        return format_json_as_text(remove_objects_with_null(data))
|