123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- # type: ignore
- from email import message_from_bytes, policy
- from typing import AsyncGenerator
- from core.base.parsers.base_parser import AsyncParser
- from core.base.providers import (
- CompletionProvider,
- DatabaseProvider,
- IngestionConfig,
- )
- class EMLParser(AsyncParser[str | bytes]):
- """Parser for EML (email) files."""
- def __init__(
- self,
- config: IngestionConfig,
- database_provider: DatabaseProvider,
- llm_provider: CompletionProvider,
- ):
- self.database_provider = database_provider
- self.llm_provider = llm_provider
- self.config = config
- async def ingest(
- self, data: str | bytes, **kwargs
- ) -> AsyncGenerator[str, None]:
- """Ingest EML data and yield email content."""
- if isinstance(data, str):
- raise ValueError("EML data must be in bytes format.")
- # Parse email with policy for modern email handling
- email_message = message_from_bytes(data, policy=policy.default)
- # Extract and yield email metadata
- metadata = []
- if email_message["Subject"]:
- metadata.append(f"Subject: {email_message['Subject']}")
- if email_message["From"]:
- metadata.append(f"From: {email_message['From']}")
- if email_message["To"]:
- metadata.append(f"To: {email_message['To']}")
- if email_message["Date"]:
- metadata.append(f"Date: {email_message['Date']}")
- if metadata:
- yield "\n".join(metadata)
- # Extract and yield email body
- if email_message.is_multipart():
- for part in email_message.walk():
- if part.get_content_type() == "text/plain":
- text = part.get_content()
- if text.strip():
- yield text.strip()
- elif part.get_content_type() == "text/html":
- # Could add HTML parsing here if needed
- continue
- else:
- body = email_message.get_content()
- if body.strip():
- yield body.strip()
|