# p7s_parser.py
# type: ignore
import email
import logging
from base64 import b64decode
from datetime import datetime, timezone
from email.message import Message
from typing import AsyncGenerator

from core.base.parsers.base_parser import AsyncParser
from core.base.providers import (
    CompletionProvider,
    DatabaseProvider,
    IngestionConfig,
)
# Module-level logger named after this module; used to report
# certificate-extraction problems as warnings.
logger = logging.getLogger(__name__)
  15. class P7SParser(AsyncParser[str | bytes]):
  16. """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file."""
  17. def __init__(
  18. self,
  19. config: IngestionConfig,
  20. database_provider: DatabaseProvider,
  21. llm_provider: CompletionProvider,
  22. ):
  23. self.database_provider = database_provider
  24. self.llm_provider = llm_provider
  25. self.config = config
  26. try:
  27. from cryptography import x509
  28. from cryptography.hazmat.primitives.serialization import pkcs7
  29. from cryptography.x509.oid import NameOID
  30. self.x509 = x509
  31. self.pkcs7 = pkcs7
  32. self.NameOID = NameOID
  33. except ImportError:
  34. raise ImportError(
  35. "Error: 'cryptography' is required to run P7SParser. "
  36. "Please install it using pip: pip install cryptography"
  37. )
  38. def _format_datetime(self, dt: datetime) -> str:
  39. """Format datetime in a readable way."""
  40. return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
  41. def _get_name_attribute(self, name, oid):
  42. """Safely get name attribute."""
  43. try:
  44. return name.get_attributes_for_oid(oid)[0].value
  45. except (IndexError, ValueError):
  46. return None
  47. def _extract_cert_info(self, cert) -> dict:
  48. """Extract relevant information from a certificate."""
  49. try:
  50. subject = cert.subject
  51. issuer = cert.issuer
  52. info = {
  53. "common_name": self._get_name_attribute(
  54. subject, self.NameOID.COMMON_NAME
  55. ),
  56. "organization": self._get_name_attribute(
  57. subject, self.NameOID.ORGANIZATION_NAME
  58. ),
  59. "email": self._get_name_attribute(
  60. subject, self.NameOID.EMAIL_ADDRESS
  61. ),
  62. "issuer_common_name": self._get_name_attribute(
  63. issuer, self.NameOID.COMMON_NAME
  64. ),
  65. "issuer_organization": self._get_name_attribute(
  66. issuer, self.NameOID.ORGANIZATION_NAME
  67. ),
  68. "serial_number": hex(cert.serial_number)[2:],
  69. "not_valid_before": self._format_datetime(
  70. cert.not_valid_before
  71. ),
  72. "not_valid_after": self._format_datetime(cert.not_valid_after),
  73. "version": cert.version.name,
  74. }
  75. return {k: v for k, v in info.items() if v is not None}
  76. except Exception as e:
  77. logger.warning(f"Error extracting certificate info: {e}")
  78. return {}
  79. def _try_parse_signature(self, data: bytes):
  80. """Try to parse the signature data as PKCS7 containing certificates."""
  81. exceptions = []
  82. # Try DER format PKCS7
  83. try:
  84. certs = self.pkcs7.load_der_pkcs7_certificates(data)
  85. if certs is not None:
  86. return certs
  87. except Exception as e:
  88. exceptions.append(f"DER PKCS7 parsing failed: {str(e)}")
  89. # Try PEM format PKCS7
  90. try:
  91. certs = self.pkcs7.load_pem_pkcs7_certificates(data)
  92. if certs is not None:
  93. return certs
  94. except Exception as e:
  95. exceptions.append(f"PEM PKCS7 parsing failed: {str(e)}")
  96. raise ValueError(
  97. "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n"
  98. + "\n".join(exceptions)
  99. )
  100. def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes:
  101. """Extract the raw PKCS#7 signature data from a MIME message."""
  102. msg: Message = email.message_from_bytes(raw_data)
  103. # If the message is multipart, find the part with application/x-pkcs7-signature
  104. if msg.is_multipart():
  105. for part in msg.walk():
  106. ctype = part.get_content_type()
  107. if ctype == "application/x-pkcs7-signature":
  108. # Get the base64 encoded data from the payload
  109. payload = part.get_payload(decode=False)
  110. # payload at this stage is a base64 string
  111. try:
  112. return b64decode(payload)
  113. except Exception as e:
  114. raise ValueError(
  115. f"Failed to decode base64 PKCS#7 signature: {str(e)}"
  116. )
  117. # If we reach here, no PKCS#7 part was found
  118. raise ValueError(
  119. "No application/x-pkcs7-signature part found in the MIME message."
  120. )
  121. else:
  122. # Not multipart, try to parse directly if it's just a raw P7S
  123. # This scenario is less common; usually it's multipart.
  124. if msg.get_content_type() == "application/x-pkcs7-signature":
  125. payload = msg.get_payload(decode=False)
  126. return b64decode(payload)
  127. raise ValueError(
  128. "The provided data does not contain a valid S/MIME signed message."
  129. )
  130. async def ingest(
  131. self, data: str | bytes, **kwargs
  132. ) -> AsyncGenerator[str, None]:
  133. """Ingest an S/MIME message and extract the PKCS#7 signature information."""
  134. # If data is a string, it might be base64 encoded, or it might be the raw MIME text.
  135. # We should assume it's raw MIME text here because the input includes MIME headers.
  136. if isinstance(data, str):
  137. # Convert to bytes (raw MIME)
  138. data = data.encode("utf-8")
  139. try:
  140. # Extract the raw PKCS#7 data (der/pem) from the MIME message
  141. p7s_data = self._extract_p7s_data_from_mime(data)
  142. # Parse the PKCS#7 data for certificates
  143. certificates = self._try_parse_signature(p7s_data)
  144. if not certificates:
  145. yield "No certificates found in the provided P7S file."
  146. return
  147. # Process each certificate
  148. for i, cert in enumerate(certificates, 1):
  149. if cert_info := self._extract_cert_info(cert):
  150. yield f"Certificate {i}:"
  151. for key, value in cert_info.items():
  152. if value:
  153. yield f"{key.replace('_', ' ').title()}: {value}"
  154. yield "" # Empty line between certificates
  155. else:
  156. yield f"Certificate {i}: No detailed information extracted."
  157. except Exception as e:
  158. raise ValueError(f"Error processing P7S file: {str(e)}")