doc_parser.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. # type: ignore
  2. import re
  3. from io import BytesIO
  4. from typing import AsyncGenerator
  5. import olefile
  6. from core.base.parsers.base_parser import AsyncParser
  7. from core.base.providers import (
  8. CompletionProvider,
  9. DatabaseProvider,
  10. IngestionConfig,
  11. )
  class DOCParser(AsyncParser[str | bytes]):
      """A parser for DOC (legacy Microsoft Word) data.

      The input is opened as an OLE compound file, the raw bytes of its
      ``WordDocument`` stream (plus ``1Table`` or ``0Table`` when present)
      are decoded leniently, and the result is cleaned and split into
      paragraphs. The streams are decoded as-is rather than interpreted, so
      this is a best-effort extraction.
      """

      # Characters dropped from extracted text: control characters other
      # than tab, LF and CR, plus everything in the U+007F-U+00FF range.
      _NOISE_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]")
      # Paragraph boundaries: a blank line (LF or CRLF style) or a form feed.
      _PARAGRAPH_BREAK_RE = re.compile(r"\n\n|\r\n\r\n|\f")
      # Any run of whitespace; collapsed to one space inside a paragraph.
      _WHITESPACE_RE = re.compile(r"\s+")

      def __init__(
          self,
          config: IngestionConfig,
          database_provider: DatabaseProvider,
          llm_provider: CompletionProvider,
      ):
          """Store the configuration and providers on the instance.

          Args:
              config: Ingestion configuration, kept as ``self.config``.
              database_provider: Kept as ``self.database_provider``.
              llm_provider: Kept as ``self.llm_provider``.
          """
          self.database_provider = database_provider
          self.llm_provider = llm_provider
          self.config = config
          # The module is kept on the instance; ``ingest`` reaches
          # ``OleFileIO`` through this attribute.
          self.olefile = olefile

      async def ingest(
          self, data: str | bytes, **kwargs
      ) -> AsyncGenerator[str, None]:
          """Ingest DOC data and yield text from the document.

          Args:
              data: The raw bytes of a ``.doc`` file. ``str`` is rejected.
              **kwargs: Accepted for interface compatibility; unused here.

          Yields:
              Each non-empty paragraph of extracted text, stripped of
              surrounding whitespace.

          Raises:
              ValueError: If ``data`` is a ``str``, or if anything fails
                  while opening or reading the document (the original
                  exception is chained as the cause).
          """
          if isinstance(data, str):
              raise ValueError("DOC data must be in bytes format.")

          file_obj = BytesIO(data)
          # Bound before the ``try`` so the ``finally`` clause can tell
          # whether the OLE container was ever opened. Without this, a
          # failure inside ``OleFileIO`` would surface as an
          # ``UnboundLocalError`` instead of the intended ``ValueError``.
          ole = None
          try:
              ole = self.olefile.OleFileIO(file_obj)

              # A Word document must expose a "WordDocument" stream.
              if not ole.exists("WordDocument"):
                  raise ValueError("Not a valid Word document")

              word_stream = ole.openstream("WordDocument").read()

              # Prefer "1Table", fall back to "0Table", else use no table
              # data at all.
              if ole.exists("1Table"):
                  table_stream = ole.openstream("1Table").read()
              elif ole.exists("0Table"):
                  table_stream = ole.openstream("0Table").read()
              else:
                  table_stream = b""

              text = self._extract_text(word_stream, table_stream)

              for paragraph in self._clean_text(text):
                  stripped = paragraph.strip()
                  if stripped:
                      yield stripped
          except Exception as e:
              raise ValueError(f"Error processing DOC file: {str(e)}") from e
          finally:
              if ole is not None:
                  ole.close()
              file_obj.close()

      def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str:
          """Extract text from Word document streams.

          NUL bytes are removed from each stream and the remainder is decoded
          as UTF-8 with undecodable bytes ignored.

          Args:
              word_stream: Bytes of the ``WordDocument`` stream.
              table_stream: Bytes of the table stream; may be empty.

          Returns:
              The decoded ``word_stream`` text, followed by the decoded
              ``table_stream`` text when ``table_stream`` is non-empty.

          Raises:
              ValueError: If decoding fails unexpectedly.
          """
          try:
              text = word_stream.replace(b"\x00", b"").decode(
                  "utf-8", errors="ignore"
              )
              if table_stream:
                  text += table_stream.replace(b"\x00", b"").decode(
                      "utf-8", errors="ignore"
                  )
              return text
          except Exception as e:
              raise ValueError(f"Error extracting text: {str(e)}") from e

      def _clean_text(self, text: str) -> list[str]:
          """Clean and split the extracted text into paragraphs.

          The text is split on paragraph separators first (blank lines or
          form feeds) and each piece is cleaned afterwards. Cleaning before
          splitting would remove the form feeds and collapse the blank
          lines, leaving nothing to split on.

          Args:
              text: Raw extracted text.

          Returns:
              The non-empty paragraphs, each with noise characters removed,
              whitespace runs collapsed to single spaces, and surrounding
              whitespace stripped.
          """
          paragraphs = []
          for piece in self._PARAGRAPH_BREAK_RE.split(text):
              piece = self._NOISE_RE.sub("", piece)
              piece = self._WHITESPACE_RE.sub(" ", piece).strip()
              if piece:
                  paragraphs.append(piece)
          return paragraphs