odt_parser.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # type: ignore
  2. import xml.etree.ElementTree as ET
  3. import zipfile
  4. from typing import AsyncGenerator
  5. from core.base.parsers.base_parser import AsyncParser
  6. from core.base.providers import (
  7. CompletionProvider,
  8. DatabaseProvider,
  9. IngestionConfig,
  10. )
  class ODTParser(AsyncParser[str | bytes]):
      """Parser that extracts plain text from OpenDocument Text (ODT) files.

      An ODT file is a zip archive; its body lives in the ``content.xml``
      member. This parser reads that member and yields the text of every
      paragraph (``text:p``) and heading (``text:h``) element it contains.
      """

      def __init__(
          self,
          config: IngestionConfig,
          database_provider: DatabaseProvider,
          llm_provider: CompletionProvider,
      ):
          """Store the ingestion configuration and provider handles.

          Args:
              config: Ingestion configuration; stored on the instance.
              database_provider: Database provider; stored on the instance.
              llm_provider: Completion provider; stored on the instance.
          """
          self.database_provider = database_provider
          self.llm_provider = llm_provider
          self.config = config
          # The modules are kept as instance attributes and ``ingest`` goes
          # through them rather than through the module-level names.
          self.zipfile = zipfile
          self.ET = ET

      async def ingest(
          self, data: str | bytes, **kwargs
      ) -> AsyncGenerator[str, None]:
          """Yield the stripped text of each paragraph and heading.

          Elements are yielded in document order, so a heading is emitted
          immediately before the paragraphs that follow it. Elements whose
          text is empty or whitespace-only are skipped.

          Args:
              data: Raw bytes of the ODT file. ``str`` input is rejected.
              **kwargs: Accepted for interface compatibility; unused here.

          Yields:
              The whitespace-stripped text content of each non-empty
              ``text:p`` / ``text:h`` element.

          Raises:
              ValueError: If ``data`` is a ``str``, or if the archive cannot
                  be opened, ``content.xml`` is missing, or the XML cannot
                  be parsed (the original exception is chained as the cause).
          """
          if isinstance(data, str):
              raise ValueError("ODT data must be in bytes format.")

          from io import BytesIO

          file_obj = BytesIO(data)
          # Only the archive read and the XML parse are guarded, so that an
          # exception thrown into the generator by its consumer is not
          # mislabelled as an ODT processing error.
          try:
              with file_obj, self.zipfile.ZipFile(file_obj) as odt:
                  # ODT files are zip archives containing content.xml
                  content = odt.read("content.xml")
              # NOTE(review): ElementTree is not hardened against maliciously
              # constructed XML; confirm whether untrusted uploads reach here.
              root = self.ET.fromstring(content)
          except Exception as e:
              raise ValueError(f"Error processing ODT file: {str(e)}") from e

          # ODT XML namespace, in ElementTree's "{uri}local" tag form
          text_ns = "urn:oasis:names:tc:opendocument:xmlns:text:1.0"
          wanted_tags = {f"{{{text_ns}}}p", f"{{{text_ns}}}h"}

          # A single pre-order walk keeps paragraphs and headings interleaved
          # exactly as they appear in the document.
          for element in root.iter():
              if element.tag not in wanted_tags:
                  continue
              text = "".join(element.itertext()).strip()
              if text:
                  yield text