1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- # type: ignore
- import os
- import tempfile
- from typing import AsyncGenerator
- from msg_parser import MsOxMessage
- from core.base.parsers.base_parser import AsyncParser
- from core.base.providers import (
- CompletionProvider,
- DatabaseProvider,
- IngestionConfig,
- )
- class MSGParser(AsyncParser[str | bytes]):
- """Parser for MSG (Outlook Message) files using msg_parser."""
- def __init__(
- self,
- config: IngestionConfig,
- database_provider: DatabaseProvider,
- llm_provider: CompletionProvider,
- ):
- self.database_provider = database_provider
- self.llm_provider = llm_provider
- self.config = config
- async def ingest(
- self, data: str | bytes, **kwargs
- ) -> AsyncGenerator[str, None]:
- """Ingest MSG data and yield email content."""
- if isinstance(data, str):
- raise ValueError("MSG data must be in bytes format.")
- tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".msg")
- try:
- tmp_file.write(data)
- tmp_file.close()
- msg = MsOxMessage(tmp_file.name)
- metadata = []
- if msg.subject:
- metadata.append(f"Subject: {msg.subject}")
- if msg.sender:
- metadata.append(f"From: {msg.sender}")
- if msg.to:
- metadata.append(f"To: {', '.join(msg.to)}")
- if msg.sent_date:
- metadata.append(f"Date: {msg.sent_date}")
- if metadata:
- yield "\n".join(metadata)
- if msg.body:
- yield msg.body.strip()
- for attachment in msg.attachments:
- if attachment.Filename:
- yield f"\nAttachment: {attachment.Filename}"
- except Exception as e:
- raise ValueError(f"Error processing MSG file: {str(e)}") from e
- finally:
- os.remove(tmp_file.name)
|