|
@@ -26,37 +26,36 @@ class JSONParser(AsyncParser[str | bytes]):
|
|
|
self.config = config
|
|
self.config = config
|
|
|
|
|
|
|
|
async def ingest(
|
|
async def ingest(
|
|
|
- self, data: str | bytes, *args, **kwargs
|
|
|
|
|
- ) -> AsyncGenerator[str, None]:
|
|
|
|
|
- """Ingest JSON data and yield a formatted text representation.
|
|
|
|
|
-
|
|
|
|
|
- :param data: The JSON data to parse.
|
|
|
|
|
- :param kwargs: Additional keyword arguments.
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ self, data: str | bytes, *args, **kwargs
|
|
|
|
|
+ ) -> AsyncGenerator[str, None]:
|
|
|
if isinstance(data, bytes):
|
|
if isinstance(data, bytes):
|
|
|
data = data.decode("utf-8")
|
|
data = data.decode("utf-8")
|
|
|
|
|
|
|
|
|
|
+ # 处理空数据:直接 yield 空字符串,并返回
|
|
|
|
|
+ if isinstance(data, str) and not data.strip():
|
|
|
|
|
+ yield "" # 保证至少有一个输出,避免分块模式无输出
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
loop = asyncio.get_event_loop()
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
parsed_json = await loop.run_in_executor(None, json.loads, data)
|
|
parsed_json = await loop.run_in_executor(None, json.loads, data)
|
|
|
- formatted_text = await loop.run_in_executor(
|
|
|
|
|
- None, self._parse_json, parsed_json
|
|
|
|
|
- )
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
except json.JSONDecodeError as e:
|
|
|
raise R2RException(
|
|
raise R2RException(
|
|
|
- message=f"Failed to parse JSON data, likely due to invalid JSON: {str(e)}",
|
|
|
|
|
|
|
+ message=f"Failed to parse JSON data: {str(e)}",
|
|
|
status_code=400,
|
|
status_code=400,
|
|
|
) from e
|
|
) from e
|
|
|
|
|
|
|
|
|
|
+ formatted_text = await loop.run_in_executor(
|
|
|
|
|
+ None, self._parse_json, parsed_json
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
chunk_size = kwargs.get("chunk_size")
|
|
chunk_size = kwargs.get("chunk_size")
|
|
|
if chunk_size and isinstance(chunk_size, int):
|
|
if chunk_size and isinstance(chunk_size, int):
|
|
|
- # If chunk_size is provided and is an integer, yield the formatted text in chunks
|
|
|
|
|
for i in range(0, len(formatted_text), chunk_size):
|
|
for i in range(0, len(formatted_text), chunk_size):
|
|
|
yield formatted_text[i : i + chunk_size]
|
|
yield formatted_text[i : i + chunk_size]
|
|
|
await asyncio.sleep(0)
|
|
await asyncio.sleep(0)
|
|
|
else:
|
|
else:
|
|
|
- # If no valid chunk_size is provided, yield the entire formatted text
|
|
|
|
|
yield formatted_text
|
|
yield formatted_text
|
|
|
|
|
|
|
|
def _parse_json(self, data: dict) -> str:
|
|
def _parse_json(self, data: dict) -> str:
|