123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- # type: ignore
- import email
- import logging
- from base64 import b64decode
- from datetime import datetime
- from email.message import Message
- from typing import AsyncGenerator
- from core.base.parsers.base_parser import AsyncParser
- from core.base.providers import (
- CompletionProvider,
- DatabaseProvider,
- IngestionConfig,
- )
- logger = logging.getLogger(__name__)
class P7SParser(AsyncParser[str | bytes]):
    """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file.

    The parser locates the PKCS#7 signature part inside a MIME message,
    base64-decodes it, loads the certificates embedded in the PKCS#7
    structure and yields a human-readable, line-by-line description of each
    certificate.
    """

    # Content types treated as a detached PKCS#7 signature: the registered
    # media type and the legacy "x-" prefixed alias. Compared against
    # ``Message.get_content_type()``, which always returns lower case.
    _SIGNATURE_CONTENT_TYPES = (
        "application/pkcs7-signature",
        "application/x-pkcs7-signature",
    )

    def __init__(
        self,
        config: IngestionConfig,
        database_provider: DatabaseProvider,
        llm_provider: CompletionProvider,
    ):
        """Store the providers/config and lazily import ``cryptography``.

        Args:
            config: Ingestion configuration, stored on the instance.
            database_provider: Database provider, stored on the instance.
            llm_provider: Completion provider, stored on the instance.

        Raises:
            ImportError: If the ``cryptography`` package is not installed.
        """
        self.database_provider = database_provider
        self.llm_provider = llm_provider
        self.config = config

        # Imported here (not at module level) so that the module can be
        # imported even when the optional dependency is missing.
        try:
            from cryptography import x509
            from cryptography.hazmat.primitives.serialization import pkcs7
            from cryptography.x509.oid import NameOID
        except ImportError as e:
            raise ImportError(
                "Error: 'cryptography' is required to run P7SParser. "
                "Please install it using pip: pip install cryptography"
            ) from e

        self.x509 = x509
        self.pkcs7 = pkcs7
        self.NameOID = NameOID

    def _format_datetime(self, dt: datetime) -> str:
        """Format datetime in a readable way (``YYYY-MM-DD HH:MM:SS UTC``)."""
        return dt.strftime("%Y-%m-%d %H:%M:%S UTC")

    def _get_name_attribute(self, name, oid):
        """Return the first value for ``oid`` in ``name``, or ``None`` if absent."""
        try:
            return name.get_attributes_for_oid(oid)[0].value
        except (IndexError, ValueError):
            return None

    def _get_validity_bound(self, cert, attr: str) -> datetime:
        """Return a certificate validity bound, preferring the ``*_utc`` accessor.

        Recent ``cryptography`` releases deprecate ``not_valid_before`` /
        ``not_valid_after`` in favour of ``*_utc`` variants. The new accessor
        is used when present; older releases fall back to the original one.

        Args:
            cert: Certificate object exposing the validity attributes.
            attr: Base attribute name, e.g. ``"not_valid_before"``.
        """
        bound = getattr(cert, f"{attr}_utc", None)
        if bound is not None:
            return bound
        return getattr(cert, attr)

    def _extract_cert_info(self, cert) -> dict:
        """Extract relevant information from a certificate.

        Returns:
            A dict of the subject/issuer name fields, serial number (hex
            without the ``0x`` prefix), validity bounds and version. Entries
            whose value is ``None`` are omitted. An empty dict is returned
            (and a warning logged) if anything goes wrong.
        """
        try:
            subject = cert.subject
            issuer = cert.issuer
            info = {
                "common_name": self._get_name_attribute(
                    subject, self.NameOID.COMMON_NAME
                ),
                "organization": self._get_name_attribute(
                    subject, self.NameOID.ORGANIZATION_NAME
                ),
                "email": self._get_name_attribute(
                    subject, self.NameOID.EMAIL_ADDRESS
                ),
                "issuer_common_name": self._get_name_attribute(
                    issuer, self.NameOID.COMMON_NAME
                ),
                "issuer_organization": self._get_name_attribute(
                    issuer, self.NameOID.ORGANIZATION_NAME
                ),
                "serial_number": hex(cert.serial_number)[2:],
                "not_valid_before": self._format_datetime(
                    self._get_validity_bound(cert, "not_valid_before")
                ),
                "not_valid_after": self._format_datetime(
                    self._get_validity_bound(cert, "not_valid_after")
                ),
                "version": cert.version.name,
            }
            return {k: v for k, v in info.items() if v is not None}
        except Exception as e:
            # Deliberately best-effort: one unreadable certificate must not
            # abort processing of the remaining ones.
            logger.warning("Error extracting certificate info: %s", e)
            return {}

    def _try_parse_signature(self, data: bytes):
        """Try to parse the signature data as PKCS7 containing certificates.

        DER is attempted first, then PEM.

        Returns:
            The certificates returned by the first loader that succeeds.

        Raises:
            ValueError: If neither loader could parse ``data``; the message
                lists the failure reported by each attempt.
        """
        loaders = (
            ("DER", self.pkcs7.load_der_pkcs7_certificates),
            ("PEM", self.pkcs7.load_pem_pkcs7_certificates),
        )
        failures = []
        for label, loader in loaders:
            try:
                certs = loader(data)
            except Exception as e:
                # Any failure just means "not this encoding"; remember the
                # reason and move on to the next loader.
                failures.append(f"{label} PKCS7 parsing failed: {e}")
            else:
                if certs is not None:
                    return certs

        raise ValueError(
            "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n"
            + "\n".join(failures)
        )

    def _decode_signature_payload(self, part: Message) -> bytes:
        """Base64-decode the payload of a PKCS#7 signature MIME part.

        Raises:
            ValueError: If the payload cannot be base64-decoded.
        """
        # decode=False returns the payload as stored in the message; it is
        # then base64-decoded explicitly below.
        payload = part.get_payload(decode=False)
        try:
            return b64decode(payload)
        except (TypeError, ValueError) as e:
            # binascii.Error is a ValueError subclass; TypeError covers a
            # payload that is not str/bytes.
            raise ValueError(
                f"Failed to decode base64 PKCS#7 signature: {e}"
            ) from e

    def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes:
        """Extract the raw PKCS#7 signature data from a MIME message.

        Args:
            raw_data: The complete MIME message as bytes.

        Returns:
            The base64-decoded bytes of the first part whose content type is
            one of ``_SIGNATURE_CONTENT_TYPES``.

        Raises:
            ValueError: If no signature part is present or its payload
                cannot be base64-decoded.
        """
        msg: Message = email.message_from_bytes(raw_data)

        if not msg.is_multipart():
            # Not multipart: only usable when the message itself is the
            # signature part.
            if msg.get_content_type() in self._SIGNATURE_CONTENT_TYPES:
                return self._decode_signature_payload(msg)
            raise ValueError(
                "The provided data does not contain a valid S/MIME signed message."
            )

        for part in msg.walk():
            if part.get_content_type() in self._SIGNATURE_CONTENT_TYPES:
                return self._decode_signature_payload(part)

        # If we reach here, no PKCS#7 part was found.
        raise ValueError(
            "No application/pkcs7-signature or application/x-pkcs7-signature "
            "part found in the MIME message."
        )

    async def ingest(
        self, data: str | bytes, **kwargs
    ) -> AsyncGenerator[str, None]:
        """Ingest an S/MIME message and extract the PKCS#7 signature information.

        Args:
            data: The MIME message. A ``str`` is treated as raw MIME text and
                encoded as UTF-8 before parsing.
            **kwargs: Accepted but unused.

        Yields:
            For each certificate, a ``"Certificate N:"`` header followed by
            one ``"Field Name: value"`` line per extracted field and an empty
            separator line. A single explanatory line is yielded instead when
            no certificates, or no details for a certificate, are available.

        Raises:
            ValueError: If the message cannot be processed.
        """
        if isinstance(data, str):
            # Assume raw MIME text (the input includes MIME headers), not a
            # bare base64 blob.
            data = data.encode("utf-8")

        try:
            # Extract the raw PKCS#7 data (DER/PEM) from the MIME message.
            p7s_data = self._extract_p7s_data_from_mime(data)

            # Parse the PKCS#7 data for certificates.
            certificates = self._try_parse_signature(p7s_data)

            if not certificates:
                yield "No certificates found in the provided P7S file."
                return

            for i, cert in enumerate(certificates, 1):
                cert_info = self._extract_cert_info(cert)
                if not cert_info:
                    yield f"Certificate {i}: No detailed information extracted."
                    continue

                yield f"Certificate {i}:"
                for key, value in cert_info.items():
                    if value:
                        yield f"{key.replace('_', ' ').title()}: {value}"
                yield ""  # Empty line between certificates
        except Exception as e:
            raise ValueError(f"Error processing P7S file: {str(e)}") from e
|