odt_parser.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # type: ignore
  2. from typing import AsyncGenerator
  3. from core.base.parsers.base_parser import AsyncParser
  4. from core.base.providers import (
  5. CompletionProvider,
  6. DatabaseProvider,
  7. IngestionConfig,
  8. )
  9. class ODTParser(AsyncParser[str | bytes]):
  10. def __init__(
  11. self,
  12. config: IngestionConfig,
  13. database_provider: DatabaseProvider,
  14. llm_provider: CompletionProvider,
  15. ):
  16. self.database_provider = database_provider
  17. self.llm_provider = llm_provider
  18. self.config = config
  19. try:
  20. import xml.etree.ElementTree as ET
  21. import zipfile
  22. self.zipfile = zipfile
  23. self.ET = ET
  24. except ImportError:
  25. raise ImportError("XML parsing libraries not available")
  26. async def ingest(
  27. self, data: str | bytes, **kwargs
  28. ) -> AsyncGenerator[str, None]:
  29. if isinstance(data, str):
  30. raise ValueError("ODT data must be in bytes format.")
  31. from io import BytesIO
  32. file_obj = BytesIO(data)
  33. try:
  34. with self.zipfile.ZipFile(file_obj) as odt:
  35. # ODT files are zip archives containing content.xml
  36. content = odt.read("content.xml")
  37. root = self.ET.fromstring(content)
  38. # ODT XML namespace
  39. ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"}
  40. # Extract paragraphs and headers
  41. for p in root.findall(".//text:p", ns):
  42. text = "".join(p.itertext())
  43. if text.strip():
  44. yield text.strip()
  45. for h in root.findall(".//text:h", ns):
  46. text = "".join(h.itertext())
  47. if text.strip():
  48. yield text.strip()
  49. except Exception as e:
  50. raise ValueError(f"Error processing ODT file: {str(e)}")
  51. finally:
  52. file_obj.close()