123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- import logging
- from typing import AsyncGenerator, Optional
- from uuid import UUID
- from core.base import AsyncState, DatabaseProvider, Document, DocumentChunk
- from core.base.abstractions import R2RDocumentProcessingError
- from core.base.pipes.base_pipe import AsyncPipe
- from core.base.providers.ingestion import IngestionProvider
- from core.utils import generate_extraction_id
- from shared.abstractions import PDFParsingError, PopperNotFoundError
- logger = logging.getLogger()
- class ParsingPipe(AsyncPipe):
- class Input(AsyncPipe.Input):
- message: Document
- def __init__(
- self,
- database_provider: DatabaseProvider,
- ingestion_provider: IngestionProvider,
- config: AsyncPipe.PipeConfig,
- *args,
- **kwargs,
- ):
- super().__init__(
- config,
- *args,
- **kwargs,
- )
- self.database_provider = database_provider
- self.ingestion_provider = ingestion_provider
- async def _parse(
- self,
- document: Document,
- run_id: UUID,
- version: str,
- ingestion_config_override: Optional[dict],
- ) -> AsyncGenerator[DocumentChunk, None]:
- try:
- ingestion_config_override = ingestion_config_override or {}
- override_provider = ingestion_config_override.pop("provider", None)
- if (
- override_provider
- and override_provider
- != self.ingestion_provider.config.provider
- ):
- raise ValueError(
- f"Provider '{override_provider}' does not match ingestion provider '{self.ingestion_provider.config.provider}'."
- )
- if result := await self.database_provider.files_handler.retrieve_file(
- document.id
- ):
- file_name, file_wrapper, file_size = result
- with file_wrapper as file_content_stream:
- file_content = file_content_stream.read()
- async for extraction in self.ingestion_provider.parse( # type: ignore
- file_content, document, ingestion_config_override
- ):
- id = generate_extraction_id(extraction.id, version=version)
- extraction.id = id
- extraction.metadata["version"] = version
- yield extraction
- except (PopperNotFoundError, PDFParsingError) as e:
- raise R2RDocumentProcessingError(
- error_message=e.message,
- document_id=document.id,
- status_code=e.status_code,
- )
- except Exception as e:
- raise R2RDocumentProcessingError(
- document_id=document.id,
- error_message=f"Error parsing document: {str(e)}",
- )
- async def _run_logic( # type: ignore
- self,
- input: AsyncPipe.Input,
- state: AsyncState,
- run_id: UUID,
- *args,
- **kwargs,
- ) -> AsyncGenerator[DocumentChunk, None]:
- ingestion_config = kwargs.get("ingestion_config")
- async for result in self._parse(
- input.message,
- run_id,
- input.message.metadata.get("version", "v0"),
- ingestion_config_override=ingestion_config,
- ):
- yield result
|