pdf_parser.py 11 KB


  1. # type: ignore
  2. import asyncio
  3. import base64
  4. import logging
  5. import os
  6. import string
  7. import tempfile
  8. import unicodedata
  9. import uuid
  10. from io import BytesIO
  11. from typing import AsyncGenerator
  12. import aiofiles
  13. from pdf2image import convert_from_path
  14. from pdf2image.exceptions import PDFInfoNotInstalledError
  15. from core.base.abstractions import GenerationConfig
  16. from core.base.parsers.base_parser import AsyncParser
  17. from core.base.providers import (
  18. CompletionProvider,
  19. DatabaseProvider,
  20. IngestionConfig,
  21. )
  22. from shared.abstractions import PDFParsingError, PopperNotFoundError
  23. logger = logging.getLogger()
  24. class VLMPDFParser(AsyncParser[str | bytes]):
  25. """A parser for PDF documents using vision models for page processing."""
  26. def __init__(
  27. self,
  28. config: IngestionConfig,
  29. database_provider: DatabaseProvider,
  30. llm_provider: CompletionProvider,
  31. ):
  32. self.database_provider = database_provider
  33. self.llm_provider = llm_provider
  34. self.config = config
  35. self.vision_prompt_text = None
  36. try:
  37. from litellm import supports_vision
  38. self.supports_vision = supports_vision
  39. except ImportError:
  40. logger.error("Failed to import LiteLLM vision support")
  41. raise ImportError(
  42. "Please install the `litellm` package to use the VLMPDFParser."
  43. )
  44. def _create_temp_dir(self) -> str:
  45. """Create a unique temporary directory for PDF processing."""
  46. # Create a unique directory name using UUID
  47. unique_id = str(uuid.uuid4())
  48. temp_base = tempfile.gettempdir()
  49. temp_dir = os.path.join(temp_base, f"pdf_images_{unique_id}")
  50. os.makedirs(temp_dir, exist_ok=True)
  51. return temp_dir
  52. async def convert_pdf_to_images(
  53. self, pdf_path: str, temp_dir: str
  54. ) -> list[str]:
  55. """Convert PDF pages to images asynchronously."""
  56. options = {
  57. "pdf_path": pdf_path,
  58. "output_folder": temp_dir,
  59. "dpi": 300, # Configurable via config if needed
  60. "fmt": "jpeg",
  61. "thread_count": 4,
  62. "paths_only": True,
  63. }
  64. try:
  65. return await asyncio.to_thread(convert_from_path, **options)
  66. except PDFInfoNotInstalledError:
  67. raise PopperNotFoundError()
  68. except Exception as err:
  69. logger.error(
  70. f"Error converting PDF to images: {err} type: {type(err)}"
  71. )
  72. raise PDFParsingError(f"Failed to process PDF: {str(err)}", err)
  73. async def process_page(
  74. self, image_path: str, page_num: int
  75. ) -> dict[str, str]:
  76. """Process a single PDF page using the vision model."""
  77. try:
  78. # Read and encode image
  79. async with aiofiles.open(image_path, "rb") as image_file:
  80. image_data = await image_file.read()
  81. image_base64 = base64.b64encode(image_data).decode("utf-8")
  82. # Verify model supports vision
  83. if not self.supports_vision(model=self.config.vision_pdf_model):
  84. raise ValueError(
  85. f"Model {self.config.vision_pdf_model} does not support vision"
  86. )
  87. # Configure generation parameters
  88. generation_config = GenerationConfig(
  89. model=self.config.vision_pdf_model,
  90. stream=False,
  91. )
  92. # Prepare message with image
  93. messages = [
  94. {
  95. "role": "user",
  96. "content": [
  97. {"type": "text", "text": self.vision_prompt_text},
  98. {
  99. "type": "image_url",
  100. "image_url": {
  101. "url": f"data:image/jpeg;base64,{image_base64}"
  102. },
  103. },
  104. ],
  105. }
  106. ]
  107. # Get completion from LiteLLM provider
  108. response = await self.llm_provider.aget_completion(
  109. messages=messages, generation_config=generation_config
  110. )
  111. if response.choices and response.choices[0].message:
  112. content = response.choices[0].message.content
  113. if not content:
  114. raise ValueError("No content in response")
  115. return {"page": str(page_num), "content": content}
  116. else:
  117. raise ValueError("No response content")
  118. except Exception as e:
  119. logger.error(
  120. f"Error processing page {page_num} with vision model: {str(e)}"
  121. )
  122. raise
  123. async def ingest(
  124. self, data: str | bytes, maintain_order: bool = False, **kwargs
  125. ) -> AsyncGenerator[dict[str, str], None]:
  126. """
  127. Ingest PDF data and yield descriptions for each page using vision model.
  128. Args:
  129. data: PDF file path or bytes
  130. maintain_order: If True, yields results in page order. If False, yields as completed.
  131. **kwargs: Additional arguments passed to the completion call
  132. Yields:
  133. Dict containing page number and content for each processed page
  134. """
  135. if not self.vision_prompt_text:
  136. self.vision_prompt_text = await self.database_provider.prompts_handler.get_cached_prompt( # type: ignore
  137. prompt_name=self.config.vision_pdf_prompt_name
  138. )
  139. temp_dir = None
  140. try:
  141. # Create temporary directory for image processing
  142. # temp_dir = os.path.join(os.getcwd(), "temp_pdf_images")
  143. # os.makedirs(temp_dir, exist_ok=True)
  144. temp_dir = self._create_temp_dir()
  145. # Handle both file path and bytes input
  146. if isinstance(data, bytes):
  147. pdf_path = os.path.join(temp_dir, "temp.pdf")
  148. async with aiofiles.open(pdf_path, "wb") as f:
  149. await f.write(data)
  150. else:
  151. pdf_path = data
  152. # Convert PDF to images
  153. image_paths = await self.convert_pdf_to_images(pdf_path, temp_dir)
  154. # Create tasks for all pages
  155. tasks = {
  156. asyncio.create_task(
  157. self.process_page(image_path, page_num)
  158. ): page_num
  159. for page_num, image_path in enumerate(image_paths, 1)
  160. }
  161. if maintain_order:
  162. # Store results in order
  163. pending = set(tasks.keys())
  164. results = {}
  165. next_page = 1
  166. while pending:
  167. # Get next completed task
  168. done, pending = await asyncio.wait(
  169. pending, return_when=asyncio.FIRST_COMPLETED
  170. )
  171. # Process completed tasks
  172. for task in done:
  173. result = await task
  174. page_num = int(result["page"])
  175. results[page_num] = result
  176. # Yield results in order
  177. while next_page in results:
  178. yield results.pop(next_page)["content"]
  179. next_page += 1
  180. else:
  181. # Yield results as they complete
  182. for coro in asyncio.as_completed(tasks.keys()):
  183. result = await coro
  184. yield result["content"]
  185. except Exception as e:
  186. logger.error(f"Error processing PDF: {str(e)}")
  187. raise
  188. finally:
  189. # Cleanup temporary files
  190. if temp_dir and os.path.exists(temp_dir):
  191. for file in os.listdir(temp_dir):
  192. os.remove(os.path.join(temp_dir, file))
  193. os.rmdir(temp_dir)
  194. class BasicPDFParser(AsyncParser[str | bytes]):
  195. """A parser for PDF data."""
  196. def __init__(
  197. self,
  198. config: IngestionConfig,
  199. database_provider: DatabaseProvider,
  200. llm_provider: CompletionProvider,
  201. ):
  202. self.database_provider = database_provider
  203. self.llm_provider = llm_provider
  204. self.config = config
  205. try:
  206. from pypdf import PdfReader
  207. self.PdfReader = PdfReader
  208. except ImportError:
  209. raise ValueError(
  210. "Error, `pypdf` is required to run `PyPDFParser`. Please install it using `pip install pypdf`."
  211. )
  212. async def ingest(
  213. self, data: str | bytes, **kwargs
  214. ) -> AsyncGenerator[str, None]:
  215. """Ingest PDF data and yield text from each page."""
  216. if isinstance(data, str):
  217. raise ValueError("PDF data must be in bytes format.")
  218. pdf = self.PdfReader(BytesIO(data))
  219. for page in pdf.pages:
  220. page_text = page.extract_text()
  221. if page_text is not None:
  222. page_text = "".join(
  223. filter(
  224. lambda x: (
  225. unicodedata.category(x)
  226. in [
  227. "Ll",
  228. "Lu",
  229. "Lt",
  230. "Lm",
  231. "Lo",
  232. "Nl",
  233. "No",
  234. ] # Keep letters and numbers
  235. or "\u4E00" <= x <= "\u9FFF" # Chinese characters
  236. or "\u0600" <= x <= "\u06FF" # Arabic characters
  237. or "\u0400" <= x <= "\u04FF" # Cyrillic letters
  238. or "\u0370" <= x <= "\u03FF" # Greek letters
  239. or "\u0E00" <= x <= "\u0E7F" # Thai
  240. or "\u3040" <= x <= "\u309F" # Japanese Hiragana
  241. or "\u30A0" <= x <= "\u30FF" # Katakana
  242. or x in string.printable
  243. ),
  244. page_text,
  245. )
  246. ) # Keep characters in common languages ; # Filter out non-printable characters
  247. yield page_text
  248. class PDFParserUnstructured(AsyncParser[str | bytes]):
  249. def __init__(
  250. self,
  251. config: IngestionConfig,
  252. database_provider: DatabaseProvider,
  253. llm_provider: CompletionProvider,
  254. ):
  255. self.database_provider = database_provider
  256. self.llm_provider = llm_provider
  257. self.config = config
  258. try:
  259. from unstructured.partition.pdf import partition_pdf
  260. self.partition_pdf = partition_pdf
  261. except ImportError as e:
  262. logger.error("PDFParserUnstructured ImportError : ", e)
  263. logger.error(
  264. """Please install missing modules using :
  265. pip install unstructured unstructured_pytesseract unstructured_inference
  266. pip install pdfplumber matplotlib pillow_heif toml
  267. """
  268. )
  269. async def ingest(
  270. self,
  271. data: str | bytes,
  272. partition_strategy: str = "hi_res",
  273. chunking_strategy="by_title",
  274. ) -> AsyncGenerator[str, None]:
  275. # partition the pdf
  276. elements = self.partition_pdf(
  277. file=BytesIO(data),
  278. partition_strategy=partition_strategy,
  279. chunking_strategy=chunking_strategy,
  280. )
  281. for element in elements:
  282. yield element.text