123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- # type: ignore
- from typing import AsyncGenerator
- from core.base.parsers.base_parser import AsyncParser
- from core.base.providers import (
- CompletionProvider,
- DatabaseProvider,
- IngestionConfig,
- )
- class MSGParser(AsyncParser[str | bytes]):
- """Parser for MSG (Outlook Message) files."""
- def __init__(
- self,
- config: IngestionConfig,
- database_provider: DatabaseProvider,
- llm_provider: CompletionProvider,
- ):
- self.database_provider = database_provider
- self.llm_provider = llm_provider
- self.config = config
- try:
- import extract_msg
- self.extract_msg = extract_msg
- except ImportError:
- raise ImportError(
- "Error: 'extract-msg' is required to run MSGParser. "
- "Please install it using pip: pip install extract-msg"
- )
- async def ingest(
- self, data: str | bytes, **kwargs
- ) -> AsyncGenerator[str, None]:
- """Ingest MSG data and yield email content."""
- if isinstance(data, str):
- raise ValueError("MSG data must be in bytes format.")
- from io import BytesIO
- file_obj = BytesIO(data)
- try:
- msg = self.extract_msg.Message(file_obj)
- # Extract metadata
- metadata = []
- if msg.subject:
- metadata.append(f"Subject: {msg.subject}")
- if msg.sender:
- metadata.append(f"From: {msg.sender}")
- if msg.to:
- metadata.append(f"To: {msg.to}")
- if msg.date:
- metadata.append(f"Date: {msg.date}")
- if metadata:
- yield "\n".join(metadata)
- # Extract body
- if msg.body:
- yield msg.body.strip()
- # Extract attachments (optional)
- for attachment in msg.attachments:
- if hasattr(attachment, "name"):
- yield f"\nAttachment: {attachment.name}"
- except Exception as e:
- raise ValueError(f"Error processing MSG file: {str(e)}")
- finally:
- file_obj.close()
|