- """Abstractions for documents and their extractions."""
- import json
- import logging
- from datetime import datetime
- from enum import Enum
- from typing import Any, Optional
- from uuid import UUID, uuid4
- from pydantic import Field
- from .base import R2RSerializable
- logger = logging.getLogger()
- class DocumentType(str, Enum):
- """Types of documents that can be stored."""
- # Audio
- MP3 = "mp3"
- # CSV
- CSV = "csv"
- # Email
- EML = "eml"
- MSG = "msg"
- P7S = "p7s"
- # EPUB
- EPUB = "epub"
- # Excel
- XLS = "xls"
- XLSX = "xlsx"
- # HTML
- HTML = "html"
- HTM = "htm"
- # Image
- BMP = "bmp"
- HEIC = "heic"
- JPEG = "jpeg"
- PNG = "png"
- TIFF = "tiff"
- JPG = "jpg"
- SVG = "svg"
- # Markdown
- MD = "md"
- # Org Mode
- ORG = "org"
- # Open Office
- ODT = "odt"
- # PDF
- PDF = "pdf"
- # Plain text
- TXT = "txt"
- JSON = "json"
- # PowerPoint
- PPT = "ppt"
- PPTX = "pptx"
- # reStructured Text
- RST = "rst"
- # Rich Text
- RTF = "rtf"
- # TSV
- TSV = "tsv"
- # Video/GIF
- GIF = "gif"
- # Word
- DOC = "doc"
- DOCX = "docx"
- # XML
- XML = "xml"
class Document(R2RSerializable):
    id: UUID = Field(default_factory=uuid4)
    collection_ids: list[UUID]
    owner_id: UUID
    document_type: DocumentType
    metadata: dict

    class Config:
        arbitrary_types_allowed = True
        ignore_extra = False
        json_encoders = {
            UUID: str,
        }
        populate_by_name = True


class IngestionStatus(str, Enum):
    """Status of document processing."""

    PENDING = "pending"
    PARSING = "parsing"
    EXTRACTING = "extracting"
    CHUNKING = "chunking"
    EMBEDDING = "embedding"
    AUGMENTING = "augmenting"
    STORING = "storing"
    ENRICHING = "enriching"
    ENRICHED = "enriched"
    FAILED = "failed"
    SUCCESS = "success"

    def __str__(self):
        return self.value

    @classmethod
    def table_name(cls) -> str:
        return "documents"

    @classmethod
    def id_column(cls) -> str:
        return "document_id"


class KGExtractionStatus(str, Enum):
    """Status of KG creation per document."""

    PENDING = "pending"
    PROCESSING = "processing"
    SUCCESS = "success"
    ENRICHED = "enriched"
    FAILED = "failed"

    def __str__(self):
        return self.value

    @classmethod
    def table_name(cls) -> str:
        return "documents"

    @classmethod
    def id_column(cls) -> str:
        return "id"


class KGEnrichmentStatus(str, Enum):
    """Status of KG enrichment per collection."""

    PENDING = "pending"
    PROCESSING = "processing"
    OUTDATED = "outdated"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self):
        return self.value

    @classmethod
    def table_name(cls) -> str:
        return "collections"

    @classmethod
    def id_column(cls) -> str:
        return "id"


class DocumentResponse(R2RSerializable):
    """Base class for document information handling."""

    id: UUID
    collection_ids: list[UUID]
    owner_id: UUID
    document_type: DocumentType
    metadata: dict
    title: Optional[str] = None
    version: str
    size_in_bytes: Optional[int]
    ingestion_status: IngestionStatus = IngestionStatus.PENDING
    extraction_status: KGExtractionStatus = KGExtractionStatus.PENDING
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    ingestion_attempt_number: Optional[int] = None
    summary: Optional[str] = None
    summary_embedding: Optional[list[float]] = None  # optional embedding of the summary

    def convert_to_db_entry(self):
        """Prepare the document info for a database entry."""
        now = datetime.now()

        # Format the embedding properly for the Postgres vector type.
        embedding = None
        if self.summary_embedding is not None:
            embedding = f"[{','.join(str(x) for x in self.summary_embedding)}]"

        return {
            "id": self.id,
            "collection_ids": self.collection_ids,
            "owner_id": self.owner_id,
            "document_type": self.document_type,
            "metadata": json.dumps(self.metadata),
            "title": self.title or "N/A",
            "version": self.version,
            "size_in_bytes": self.size_in_bytes,
            "ingestion_status": self.ingestion_status.value,
            "extraction_status": self.extraction_status.value,
            "created_at": self.created_at or now,
            "updated_at": self.updated_at or now,
            "ingestion_attempt_number": self.ingestion_attempt_number or 0,
            "summary": self.summary,
            "summary_embedding": embedding,
        }
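
# Illustrative sketch (values are hypothetical): convert_to_db_entry() serializes
# `metadata` to JSON and renders `summary_embedding` as a pgvector-style literal,
# so [0.1, 0.2, 0.3] becomes the string "[0.1,0.2,0.3]" in the returned dict.
#
#     doc = DocumentResponse(
#         id=uuid4(), collection_ids=[], owner_id=uuid4(),
#         document_type=DocumentType.PDF, metadata={"source": "upload"},
#         version="v1", size_in_bytes=1024, summary_embedding=[0.1, 0.2, 0.3],
#     )
#     row = doc.convert_to_db_entry()
#     # row["summary_embedding"] == "[0.1,0.2,0.3]"
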
class UnprocessedChunk(R2RSerializable):
    """A chunk submitted for ingestion that has not yet been processed."""

    id: Optional[UUID] = None
    document_id: Optional[UUID] = None
    collection_ids: list[UUID] = []
    metadata: dict = {}
    text: str


class UpdateChunk(R2RSerializable):
    """A payload for updating an existing chunk."""

    id: UUID
    metadata: Optional[dict] = None
    text: str


class DocumentChunk(R2RSerializable):
    """An extraction (chunk) from a document."""

    id: UUID
    document_id: UUID
    collection_ids: list[UUID]
    owner_id: UUID
    data: str | bytes
    metadata: dict


class RawChunk(R2RSerializable):
    text: str


class IngestionMode(str, Enum):
    hi_res = "hi-res"
    fast = "fast"
    custom = "custom"


class ChunkEnrichmentStrategy(str, Enum):
    SEMANTIC = "semantic"
    NEIGHBORHOOD = "neighborhood"

    def __str__(self) -> str:
        return self.value
# NOTE: imported mid-module (presumably to avoid a circular import at load time).
from .llm import GenerationConfig


class ChunkEnrichmentSettings(R2RSerializable):
    """Settings for chunk enrichment."""

    enable_chunk_enrichment: bool = Field(
        default=False,
        description="Whether to enable chunk enrichment or not",
    )
    strategies: list[ChunkEnrichmentStrategy] = Field(
        default=[],
        description="The strategies to use for chunk enrichment. The union of chunks obtained from each strategy is used as context.",
    )
    forward_chunks: int = Field(
        default=3,
        description="The number of chunks after the current chunk to include in the LLM context while enriching",
    )
    backward_chunks: int = Field(
        default=3,
        description="The number of chunks before the current chunk to include in the LLM context while enriching",
    )
    semantic_neighbors: int = Field(
        default=10, description="The number of semantic neighbors to include"
    )
    semantic_similarity_threshold: float = Field(
        default=0.7,
        description="The similarity threshold for semantic neighbors",
    )
    generation_config: GenerationConfig = Field(
        default=GenerationConfig(),
        description="The generation config to use for chunk enrichment",
    )
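
# Illustrative configuration (hypothetical values): enable enrichment with both
# strategies, pulling 2 chunks on either side of the current chunk plus 5
# semantic neighbors into the enrichment context.
#
#     settings = ChunkEnrichmentSettings(
#         enable_chunk_enrichment=True,
#         strategies=[
#             ChunkEnrichmentStrategy.SEMANTIC,
#             ChunkEnrichmentStrategy.NEIGHBORHOOD,
#         ],
#         forward_chunks=2,
#         backward_chunks=2,
#         semantic_neighbors=5,
#     )
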
## TODO - Move ingestion config
class IngestionConfig(R2RSerializable):
    provider: str = "r2r"
    excluded_parsers: list[str] = ["mp4"]
    chunking_strategy: str = "recursive"
    chunk_enrichment_settings: ChunkEnrichmentSettings = (
        ChunkEnrichmentSettings()
    )
    extra_parsers: dict[str, Any] = {}

    audio_transcription_model: str = "openai/whisper-1"

    vision_img_prompt_name: str = "vision_img"
    vision_img_model: str = "openai/gpt-4o"

    vision_pdf_prompt_name: str = "vision_pdf"
    vision_pdf_model: str = "openai/gpt-4o"

    skip_document_summary: bool = False
    document_summary_system_prompt: str = "default_system"
    document_summary_task_prompt: str = "default_summary"
    chunks_for_document_summary: int = 128
    document_summary_model: str = "openai/gpt-4o-mini"

    @property
    def supported_providers(self) -> list[str]:
        return ["r2r", "unstructured_local", "unstructured_api"]

    def validate_config(self) -> None:
        if self.provider not in self.supported_providers:
            raise ValueError(f"Provider {self.provider} is not supported.")

    @classmethod
    def get_default(cls, mode: str) -> "IngestionConfig":
        """Return the default ingestion configuration for a given mode."""
        if mode == "hi-res":
            # More thorough parsing: summaries enabled and a larger
            # `chunks_for_document_summary`.
            return cls(
                provider="r2r",
                excluded_parsers=["mp4"],
                chunk_enrichment_settings=ChunkEnrichmentSettings(),  # default
                extra_parsers={},
                audio_transcription_model="openai/whisper-1",
                vision_img_prompt_name="vision_img",
                vision_img_model="openai/gpt-4o",
                vision_pdf_prompt_name="vision_pdf",
                vision_pdf_model="openai/gpt-4o",
                skip_document_summary=False,
                document_summary_system_prompt="default_system",
                document_summary_task_prompt="default_summary",
                chunks_for_document_summary=256,  # larger for hi-res
                document_summary_model="openai/gpt-4o-mini",
            )
        elif mode == "fast":
            # Skip summaries and other enrichment steps for speed.
            return cls(
                provider="r2r",
                excluded_parsers=["mp4"],
                chunk_enrichment_settings=ChunkEnrichmentSettings(),  # default
                extra_parsers={},
                audio_transcription_model="openai/whisper-1",
                vision_img_prompt_name="vision_img",
                vision_img_model="openai/gpt-4o",
                vision_pdf_prompt_name="vision_pdf",
                vision_pdf_model="openai/gpt-4o",
                skip_document_summary=True,  # skip summaries
                document_summary_system_prompt="default_system",
                document_summary_task_prompt="default_summary",
                chunks_for_document_summary=64,
                document_summary_model="openai/gpt-4o-mini",
            )
        else:
            # For `custom` or any unrecognized mode, return the base config.
            return cls()
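
# A minimal usage sketch (illustrative, not part of the library's API surface).
# Because of the relative imports above, this only runs as part of the package,
# e.g. `python -m <package>.document` — the exact module path is an assumption.
if __name__ == "__main__":
    config = IngestionConfig.get_default("hi-res")
    config.validate_config()  # passes for "r2r"; raises ValueError for unsupported providers
    print(config.chunks_for_document_summary)  # 256 in hi-res mode
    print(IngestionConfig.get_default("fast").skip_document_summary)  # True: summaries skipped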