parsing_pipe.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import logging
  2. from typing import AsyncGenerator, Optional
  3. from uuid import UUID
  4. from core.base import AsyncState, DatabaseProvider, Document, DocumentChunk
  5. from core.base.abstractions import R2RDocumentProcessingError
  6. from core.base.pipes.base_pipe import AsyncPipe
  7. from core.base.providers.ingestion import IngestionProvider
  8. from core.utils import generate_extraction_id
  9. from shared.abstractions import PDFParsingError, PopperNotFoundError
  10. logger = logging.getLogger()
  11. class ParsingPipe(AsyncPipe):
  12. class Input(AsyncPipe.Input):
  13. message: Document
  14. def __init__(
  15. self,
  16. database_provider: DatabaseProvider,
  17. ingestion_provider: IngestionProvider,
  18. config: AsyncPipe.PipeConfig,
  19. *args,
  20. **kwargs,
  21. ):
  22. super().__init__(
  23. config,
  24. *args,
  25. **kwargs,
  26. )
  27. self.database_provider = database_provider
  28. self.ingestion_provider = ingestion_provider
  29. async def _parse(
  30. self,
  31. document: Document,
  32. run_id: UUID,
  33. version: str,
  34. ingestion_config_override: Optional[dict],
  35. ) -> AsyncGenerator[DocumentChunk, None]:
  36. try:
  37. ingestion_config_override = ingestion_config_override or {}
  38. override_provider = ingestion_config_override.pop("provider", None)
  39. if (
  40. override_provider
  41. and override_provider
  42. != self.ingestion_provider.config.provider
  43. ):
  44. raise ValueError(
  45. f"Provider '{override_provider}' does not match ingestion provider '{self.ingestion_provider.config.provider}'."
  46. )
  47. if result := await self.database_provider.files_handler.retrieve_file(
  48. document.id
  49. ):
  50. file_name, file_wrapper, file_size = result
  51. with file_wrapper as file_content_stream:
  52. file_content = file_content_stream.read()
  53. async for extraction in self.ingestion_provider.parse( # type: ignore
  54. file_content, document, ingestion_config_override
  55. ):
  56. id = generate_extraction_id(extraction.id, version=version)
  57. extraction.id = id
  58. extraction.metadata["version"] = version
  59. yield extraction
  60. except (PopperNotFoundError, PDFParsingError) as e:
  61. raise R2RDocumentProcessingError(
  62. error_message=e.message,
  63. document_id=document.id,
  64. status_code=e.status_code,
  65. )
  66. except Exception as e:
  67. raise R2RDocumentProcessingError(
  68. document_id=document.id,
  69. error_message=f"Error parsing document: {str(e)}",
  70. )
  71. async def _run_logic( # type: ignore
  72. self,
  73. input: AsyncPipe.Input,
  74. state: AsyncState,
  75. run_id: UUID,
  76. *args,
  77. **kwargs,
  78. ) -> AsyncGenerator[DocumentChunk, None]:
  79. ingestion_config = kwargs.get("ingestion_config")
  80. async for result in self._parse(
  81. input.message,
  82. run_id,
  83. input.message.metadata.get("version", "v0"),
  84. ingestion_config_override=ingestion_config,
  85. ):
  86. yield result