ppt_parser.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # type: ignore
  2. import struct
  3. from io import BytesIO
  4. from typing import AsyncGenerator
  5. from core.base.parsers.base_parser import AsyncParser
  6. from core.base.providers import (
  7. CompletionProvider,
  8. DatabaseProvider,
  9. IngestionConfig,
  10. )
  11. class PPTParser(AsyncParser[str | bytes]):
  12. """A parser for legacy PPT (PowerPoint 97-2003) files."""
  13. def __init__(
  14. self,
  15. config: IngestionConfig,
  16. database_provider: DatabaseProvider,
  17. llm_provider: CompletionProvider,
  18. ):
  19. self.database_provider = database_provider
  20. self.llm_provider = llm_provider
  21. self.config = config
  22. try:
  23. import olefile
  24. self.olefile = olefile
  25. except ImportError:
  26. raise ImportError(
  27. "Error: 'olefile' is required to run PPTParser. "
  28. "Please install it using pip: pip install olefile"
  29. )
  30. def _extract_text_from_record(self, data: bytes) -> str:
  31. """Extract text from a PPT text record."""
  32. try:
  33. # Skip record header
  34. text_data = data[8:]
  35. # Convert from UTF-16-LE
  36. return text_data.decode("utf-16-le", errors="ignore").strip()
  37. except Exception:
  38. return ""
  39. async def ingest(
  40. self, data: str | bytes, **kwargs
  41. ) -> AsyncGenerator[str, None]:
  42. """Ingest PPT data and yield text from each slide."""
  43. if isinstance(data, str):
  44. raise ValueError("PPT data must be in bytes format.")
  45. try:
  46. ole = self.olefile.OleFileIO(BytesIO(data))
  47. # PPT stores text in PowerPoint Document stream
  48. if not ole.exists("PowerPoint Document"):
  49. raise ValueError("Not a valid PowerPoint file")
  50. # Read PowerPoint Document stream
  51. ppt_stream = ole.openstream("PowerPoint Document")
  52. content = ppt_stream.read()
  53. # Text records start with 0x0FA0 or 0x0FD0
  54. text_markers = [b"\xA0\x0F", b"\xD0\x0F"]
  55. current_position = 0
  56. while current_position < len(content):
  57. # Look for text markers
  58. for marker in text_markers:
  59. marker_pos = content.find(marker, current_position)
  60. if marker_pos != -1:
  61. # Get record size from header (4 bytes after marker)
  62. size_bytes = content[marker_pos + 2 : marker_pos + 6]
  63. record_size = struct.unpack("<I", size_bytes)[0]
  64. # Extract record data
  65. record_data = content[
  66. marker_pos : marker_pos + record_size + 8
  67. ]
  68. text = self._extract_text_from_record(record_data)
  69. if text.strip():
  70. yield text.strip()
  71. current_position = marker_pos + record_size + 8
  72. break
  73. else:
  74. current_position += 1
  75. except Exception as e:
  76. raise ValueError(f"Error processing PPT file: {str(e)}")
  77. finally:
  78. ole.close()