# doc_parser.py
  1. # type: ignore
  2. import re
  3. from io import BytesIO
  4. from typing import AsyncGenerator
  5. from core.base.parsers.base_parser import AsyncParser
  6. from core.base.providers import (
  7. CompletionProvider,
  8. DatabaseProvider,
  9. IngestionConfig,
  10. )
  11. class DOCParser(AsyncParser[str | bytes]):
  12. """A parser for DOC (legacy Microsoft Word) data."""
  13. def __init__(
  14. self,
  15. config: IngestionConfig,
  16. database_provider: DatabaseProvider,
  17. llm_provider: CompletionProvider,
  18. ):
  19. self.database_provider = database_provider
  20. self.llm_provider = llm_provider
  21. self.config = config
  22. try:
  23. import olefile
  24. self.olefile = olefile
  25. except ImportError:
  26. raise ImportError(
  27. "Error: 'olefile' is required to run DOCParser. "
  28. "Please install it using pip: pip install olefile"
  29. )
  30. async def ingest(
  31. self, data: str | bytes, **kwargs
  32. ) -> AsyncGenerator[str, None]:
  33. """Ingest DOC data and yield text from the document."""
  34. if isinstance(data, str):
  35. raise ValueError("DOC data must be in bytes format.")
  36. # Create BytesIO object from the data
  37. file_obj = BytesIO(data)
  38. try:
  39. # Open the DOC file using olefile
  40. ole = self.olefile.OleFileIO(file_obj)
  41. # Check if it's a Word document
  42. if not ole.exists("WordDocument"):
  43. raise ValueError("Not a valid Word document")
  44. # Read the WordDocument stream
  45. word_stream = ole.openstream("WordDocument").read()
  46. # Read the text from the 0Table or 1Table stream (contains the text)
  47. if ole.exists("1Table"):
  48. table_stream = ole.openstream("1Table").read()
  49. elif ole.exists("0Table"):
  50. table_stream = ole.openstream("0Table").read()
  51. else:
  52. table_stream = b""
  53. # Extract text content
  54. text = self._extract_text(word_stream, table_stream)
  55. # Clean and split the text
  56. paragraphs = self._clean_text(text)
  57. # Yield non-empty paragraphs
  58. for paragraph in paragraphs:
  59. if paragraph.strip():
  60. yield paragraph.strip()
  61. except Exception as e:
  62. raise ValueError(f"Error processing DOC file: {str(e)}")
  63. finally:
  64. ole.close()
  65. file_obj.close()
  66. def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str:
  67. """Extract text from Word document streams."""
  68. try:
  69. text = word_stream.replace(b"\x00", b"").decode(
  70. "utf-8", errors="ignore"
  71. )
  72. # If table_stream exists, try to extract additional text
  73. if table_stream:
  74. table_text = table_stream.replace(b"\x00", b"").decode(
  75. "utf-8", errors="ignore"
  76. )
  77. text += table_text
  78. return text
  79. except Exception as e:
  80. raise ValueError(f"Error extracting text: {str(e)}")
  81. def _clean_text(self, text: str) -> list[str]:
  82. """Clean and split the extracted text into paragraphs."""
  83. # Remove binary artifacts and control characters
  84. text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]", "", text)
  85. # Remove multiple spaces and newlines
  86. text = re.sub(r"\s+", " ", text)
  87. # Split into paragraphs on double newlines or other common separators
  88. paragraphs = re.split(r"\n\n|\r\n\r\n|\f", text)
  89. # Remove empty or whitespace-only paragraphs
  90. paragraphs = [p.strip() for p in paragraphs if p.strip()]
  91. return paragraphs