pdf_parser.py 11 KB


  1. # type: ignore
  2. import asyncio
  3. import base64
  4. import logging
  5. import os
  6. import string
  7. import tempfile
  8. import unicodedata
  9. import uuid
  10. from io import BytesIO
  11. from typing import AsyncGenerator
  12. import aiofiles
  13. from pdf2image import convert_from_path
  14. from core.base.abstractions import GenerationConfig
  15. from core.base.parsers.base_parser import AsyncParser
  16. from core.base.providers import (
  17. CompletionProvider,
  18. DatabaseProvider,
  19. IngestionConfig,
  20. )
  21. logger = logging.getLogger()
  22. class VLMPDFParser(AsyncParser[str | bytes]):
  23. """A parser for PDF documents using vision models for page processing."""
  24. def __init__(
  25. self,
  26. config: IngestionConfig,
  27. database_provider: DatabaseProvider,
  28. llm_provider: CompletionProvider,
  29. ):
  30. self.database_provider = database_provider
  31. self.llm_provider = llm_provider
  32. self.config = config
  33. self.vision_prompt_text = None
  34. try:
  35. from litellm import supports_vision
  36. self.supports_vision = supports_vision
  37. except ImportError:
  38. logger.error("Failed to import LiteLLM vision support")
  39. raise ImportError(
  40. "Please install the `litellm` package to use the VLMPDFParser."
  41. )
  42. def _create_temp_dir(self) -> str:
  43. """Create a unique temporary directory for PDF processing."""
  44. # Create a unique directory name using UUID
  45. unique_id = str(uuid.uuid4())
  46. temp_base = tempfile.gettempdir()
  47. temp_dir = os.path.join(temp_base, f"pdf_images_{unique_id}")
  48. os.makedirs(temp_dir, exist_ok=True)
  49. return temp_dir
  50. async def convert_pdf_to_images(
  51. self, pdf_path: str, temp_dir: str
  52. ) -> list[str]:
  53. """Convert PDF pages to images asynchronously."""
  54. options = {
  55. "pdf_path": pdf_path,
  56. "output_folder": temp_dir,
  57. "dpi": 300, # Configurable via config if needed
  58. "fmt": "jpeg",
  59. "thread_count": 4,
  60. "paths_only": True,
  61. }
  62. try:
  63. image_paths = await asyncio.to_thread(convert_from_path, **options)
  64. return image_paths
  65. except Exception as err:
  66. logger.error(f"Error converting PDF to images: {err}")
  67. raise
  68. async def process_page(
  69. self, image_path: str, page_num: int
  70. ) -> dict[str, str]:
  71. """Process a single PDF page using the vision model."""
  72. try:
  73. # Read and encode image
  74. async with aiofiles.open(image_path, "rb") as image_file:
  75. image_data = await image_file.read()
  76. image_base64 = base64.b64encode(image_data).decode("utf-8")
  77. # Verify model supports vision
  78. if not self.supports_vision(model=self.config.vision_pdf_model):
  79. raise ValueError(
  80. f"Model {self.config.vision_pdf_model} does not support vision"
  81. )
  82. # Configure generation parameters
  83. generation_config = GenerationConfig(
  84. model=self.config.vision_pdf_model,
  85. stream=False,
  86. )
  87. # Prepare message with image
  88. messages = [
  89. {
  90. "role": "user",
  91. "content": [
  92. {"type": "text", "text": self.vision_prompt_text},
  93. {
  94. "type": "image_url",
  95. "image_url": {
  96. "url": f"data:image/jpeg;base64,{image_base64}"
  97. },
  98. },
  99. ],
  100. }
  101. ]
  102. # Get completion from LiteLLM provider
  103. response = await self.llm_provider.aget_completion(
  104. messages=messages, generation_config=generation_config
  105. )
  106. if response.choices and response.choices[0].message:
  107. content = response.choices[0].message.content
  108. if not content:
  109. raise ValueError("No content in response")
  110. return {"page": str(page_num), "content": content}
  111. else:
  112. raise ValueError("No response content")
  113. except Exception as e:
  114. logger.error(
  115. f"Error processing page {page_num} with vision model: {str(e)}"
  116. )
  117. raise
  118. async def ingest(
  119. self, data: str | bytes, maintain_order: bool = False, **kwargs
  120. ) -> AsyncGenerator[dict[str, str], None]:
  121. """
  122. Ingest PDF data and yield descriptions for each page using vision model.
  123. Args:
  124. data: PDF file path or bytes
  125. maintain_order: If True, yields results in page order. If False, yields as completed.
  126. **kwargs: Additional arguments passed to the completion call
  127. Yields:
  128. Dict containing page number and content for each processed page
  129. """
  130. if not self.vision_prompt_text:
  131. self.vision_prompt_text = await self.database_provider.prompts_handler.get_cached_prompt( # type: ignore
  132. prompt_name=self.config.vision_pdf_prompt_name
  133. )
  134. temp_dir = None
  135. try:
  136. # Create temporary directory for image processing
  137. # temp_dir = os.path.join(os.getcwd(), "temp_pdf_images")
  138. # os.makedirs(temp_dir, exist_ok=True)
  139. temp_dir = self._create_temp_dir()
  140. # Handle both file path and bytes input
  141. if isinstance(data, bytes):
  142. pdf_path = os.path.join(temp_dir, "temp.pdf")
  143. async with aiofiles.open(pdf_path, "wb") as f:
  144. await f.write(data)
  145. else:
  146. pdf_path = data
  147. # Convert PDF to images
  148. image_paths = await self.convert_pdf_to_images(pdf_path, temp_dir)
  149. # Create tasks for all pages
  150. tasks = {
  151. asyncio.create_task(
  152. self.process_page(image_path, page_num)
  153. ): page_num
  154. for page_num, image_path in enumerate(image_paths, 1)
  155. }
  156. if maintain_order:
  157. # Store results in order
  158. pending = set(tasks.keys())
  159. results = {}
  160. next_page = 1
  161. while pending:
  162. # Get next completed task
  163. done, pending = await asyncio.wait(
  164. pending, return_when=asyncio.FIRST_COMPLETED
  165. )
  166. # Process completed tasks
  167. for task in done:
  168. result = await task
  169. page_num = int(result["page"])
  170. results[page_num] = result
  171. # Yield results in order
  172. while next_page in results:
  173. yield results.pop(next_page)["content"]
  174. next_page += 1
  175. else:
  176. # Yield results as they complete
  177. for coro in asyncio.as_completed(tasks.keys()):
  178. result = await coro
  179. yield result["content"]
  180. except Exception as e:
  181. logger.error(f"Error processing PDF: {str(e)}")
  182. raise
  183. finally:
  184. # Cleanup temporary files
  185. if temp_dir and os.path.exists(temp_dir):
  186. for file in os.listdir(temp_dir):
  187. os.remove(os.path.join(temp_dir, file))
  188. os.rmdir(temp_dir)
  189. class BasicPDFParser(AsyncParser[str | bytes]):
  190. """A parser for PDF data."""
  191. def __init__(
  192. self,
  193. config: IngestionConfig,
  194. database_provider: DatabaseProvider,
  195. llm_provider: CompletionProvider,
  196. ):
  197. self.database_provider = database_provider
  198. self.llm_provider = llm_provider
  199. self.config = config
  200. try:
  201. from pypdf import PdfReader
  202. self.PdfReader = PdfReader
  203. except ImportError:
  204. raise ValueError(
  205. "Error, `pypdf` is required to run `PyPDFParser`. Please install it using `pip install pypdf`."
  206. )
  207. async def ingest(
  208. self, data: str | bytes, **kwargs
  209. ) -> AsyncGenerator[str, None]:
  210. """Ingest PDF data and yield text from each page."""
  211. if isinstance(data, str):
  212. raise ValueError("PDF data must be in bytes format.")
  213. pdf = self.PdfReader(BytesIO(data))
  214. for page in pdf.pages:
  215. page_text = page.extract_text()
  216. if page_text is not None:
  217. page_text = "".join(
  218. filter(
  219. lambda x: (
  220. unicodedata.category(x)
  221. in [
  222. "Ll",
  223. "Lu",
  224. "Lt",
  225. "Lm",
  226. "Lo",
  227. "Nl",
  228. "No",
  229. ] # Keep letters and numbers
  230. or "\u4E00" <= x <= "\u9FFF" # Chinese characters
  231. or "\u0600" <= x <= "\u06FF" # Arabic characters
  232. or "\u0400" <= x <= "\u04FF" # Cyrillic letters
  233. or "\u0370" <= x <= "\u03FF" # Greek letters
  234. or "\u0E00" <= x <= "\u0E7F" # Thai
  235. or "\u3040" <= x <= "\u309F" # Japanese Hiragana
  236. or "\u30A0" <= x <= "\u30FF" # Katakana
  237. or x in string.printable
  238. ),
  239. page_text,
  240. )
  241. ) # Keep characters in common languages ; # Filter out non-printable characters
  242. yield page_text
  243. class PDFParserUnstructured(AsyncParser[str | bytes]):
  244. def __init__(
  245. self,
  246. config: IngestionConfig,
  247. database_provider: DatabaseProvider,
  248. llm_provider: CompletionProvider,
  249. ):
  250. self.database_provider = database_provider
  251. self.llm_provider = llm_provider
  252. self.config = config
  253. try:
  254. from unstructured.partition.pdf import partition_pdf
  255. self.partition_pdf = partition_pdf
  256. except ImportError as e:
  257. logger.error("PDFParserUnstructured ImportError : ", e)
  258. logger.error(
  259. """Please install missing modules using :
  260. pip install unstructured unstructured_pytesseract unstructured_inference
  261. pip install pdfplumber matplotlib pillow_heif toml
  262. """
  263. )
  264. async def ingest(
  265. self,
  266. data: str | bytes,
  267. partition_strategy: str = "hi_res",
  268. chunking_strategy="by_title",
  269. ) -> AsyncGenerator[str, None]:
  270. # partition the pdf
  271. elements = self.partition_pdf(
  272. file=BytesIO(data),
  273. partition_strategy=partition_strategy,
  274. chunking_strategy=chunking_strategy,
  275. )
  276. for element in elements:
  277. yield element.text