# msg_parser.py (1.9 KB)

  1. # type: ignore
  2. import os
  3. import tempfile
  4. from typing import AsyncGenerator
  5. from msg_parser import MsOxMessage
  6. from core.base.parsers.base_parser import AsyncParser
  7. from core.base.providers import (
  8. CompletionProvider,
  9. DatabaseProvider,
  10. IngestionConfig,
  11. )
  12. class MSGParser(AsyncParser[str | bytes]):
  13. """Parser for MSG (Outlook Message) files using msg_parser."""
  14. def __init__(
  15. self,
  16. config: IngestionConfig,
  17. database_provider: DatabaseProvider,
  18. llm_provider: CompletionProvider,
  19. ):
  20. self.database_provider = database_provider
  21. self.llm_provider = llm_provider
  22. self.config = config
  23. async def ingest(
  24. self, data: str | bytes, **kwargs
  25. ) -> AsyncGenerator[str, None]:
  26. """Ingest MSG data and yield email content."""
  27. if isinstance(data, str):
  28. raise ValueError("MSG data must be in bytes format.")
  29. tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".msg")
  30. try:
  31. tmp_file.write(data)
  32. tmp_file.close()
  33. msg = MsOxMessage(tmp_file.name)
  34. metadata = []
  35. if msg.subject:
  36. metadata.append(f"Subject: {msg.subject}")
  37. if msg.sender:
  38. metadata.append(f"From: {msg.sender}")
  39. if msg.to:
  40. metadata.append(f"To: {', '.join(msg.to)}")
  41. if msg.sent_date:
  42. metadata.append(f"Date: {msg.sent_date}")
  43. if metadata:
  44. yield "\n".join(metadata)
  45. if msg.body:
  46. yield msg.body.strip()
  47. for attachment in msg.attachments:
  48. if attachment.Filename:
  49. yield f"\nAttachment: {attachment.Filename}"
  50. except Exception as e:
  51. raise ValueError(f"Error processing MSG file: {str(e)}") from e
  52. finally:
  53. os.remove(tmp_file.name)