# p7s_parser.py
# type: ignore
import email
import logging
from base64 import b64decode
from datetime import datetime, timezone
from email.message import Message
from typing import AsyncGenerator

from cryptography import x509
from cryptography.hazmat.primitives.serialization import pkcs7
from cryptography.x509.oid import NameOID

from core.base.parsers.base_parser import AsyncParser
from core.base.providers import (
    CompletionProvider,
    DatabaseProvider,
    IngestionConfig,
)
  17. logger = logging.getLogger(__name__)
  18. class P7SParser(AsyncParser[str | bytes]):
  19. """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file."""
  20. def __init__(
  21. self,
  22. config: IngestionConfig,
  23. database_provider: DatabaseProvider,
  24. llm_provider: CompletionProvider,
  25. ):
  26. self.database_provider = database_provider
  27. self.llm_provider = llm_provider
  28. self.config = config
  29. self.x509 = x509
  30. self.pkcs7 = pkcs7
  31. self.NameOID = NameOID
  32. def _format_datetime(self, dt: datetime) -> str:
  33. """Format datetime in a readable way."""
  34. return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
  35. def _get_name_attribute(self, name, oid):
  36. """Safely get name attribute."""
  37. try:
  38. return name.get_attributes_for_oid(oid)[0].value
  39. except (IndexError, ValueError):
  40. return None
  41. def _extract_cert_info(self, cert) -> dict:
  42. """Extract relevant information from a certificate."""
  43. try:
  44. subject = cert.subject
  45. issuer = cert.issuer
  46. info = {
  47. "common_name": self._get_name_attribute(
  48. subject, self.NameOID.COMMON_NAME
  49. ),
  50. "organization": self._get_name_attribute(
  51. subject, self.NameOID.ORGANIZATION_NAME
  52. ),
  53. "email": self._get_name_attribute(
  54. subject, self.NameOID.EMAIL_ADDRESS
  55. ),
  56. "issuer_common_name": self._get_name_attribute(
  57. issuer, self.NameOID.COMMON_NAME
  58. ),
  59. "issuer_organization": self._get_name_attribute(
  60. issuer, self.NameOID.ORGANIZATION_NAME
  61. ),
  62. "serial_number": hex(cert.serial_number)[2:],
  63. "not_valid_before": self._format_datetime(
  64. cert.not_valid_before
  65. ),
  66. "not_valid_after": self._format_datetime(cert.not_valid_after),
  67. "version": cert.version.name,
  68. }
  69. return {k: v for k, v in info.items() if v is not None}
  70. except Exception as e:
  71. logger.warning(f"Error extracting certificate info: {e}")
  72. return {}
  73. def _try_parse_signature(self, data: bytes):
  74. """Try to parse the signature data as PKCS7 containing certificates."""
  75. exceptions = []
  76. # Try DER format PKCS7
  77. try:
  78. certs = self.pkcs7.load_der_pkcs7_certificates(data)
  79. if certs is not None:
  80. return certs
  81. except Exception as e:
  82. exceptions.append(f"DER PKCS7 parsing failed: {str(e)}")
  83. # Try PEM format PKCS7
  84. try:
  85. certs = self.pkcs7.load_pem_pkcs7_certificates(data)
  86. if certs is not None:
  87. return certs
  88. except Exception as e:
  89. exceptions.append(f"PEM PKCS7 parsing failed: {str(e)}")
  90. raise ValueError(
  91. "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n"
  92. + "\n".join(exceptions)
  93. )
  94. def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes:
  95. """Extract the raw PKCS#7 signature data from a MIME message."""
  96. msg: Message = email.message_from_bytes(raw_data)
  97. # If the message is multipart, find the part with application/x-pkcs7-signature
  98. if msg.is_multipart():
  99. for part in msg.walk():
  100. ctype = part.get_content_type()
  101. if ctype == "application/x-pkcs7-signature":
  102. # Get the base64 encoded data from the payload
  103. payload = part.get_payload(decode=False)
  104. # payload at this stage is a base64 string
  105. try:
  106. return b64decode(payload)
  107. except Exception as e:
  108. raise ValueError(
  109. f"Failed to decode base64 PKCS#7 signature: {str(e)}"
  110. ) from e
  111. # If we reach here, no PKCS#7 part was found
  112. raise ValueError(
  113. "No application/x-pkcs7-signature part found in the MIME message."
  114. )
  115. else:
  116. # Not multipart, try to parse directly if it's just a raw P7S
  117. # This scenario is less common; usually it's multipart.
  118. if msg.get_content_type() == "application/x-pkcs7-signature":
  119. payload = msg.get_payload(decode=False)
  120. return b64decode(payload)
  121. raise ValueError(
  122. "The provided data does not contain a valid S/MIME signed message."
  123. )
  124. async def ingest(
  125. self, data: str | bytes, **kwargs
  126. ) -> AsyncGenerator[str, None]:
  127. """Ingest an S/MIME message and extract the PKCS#7 signature
  128. information."""
  129. # If data is a string, it might be base64 encoded, or it might be the raw MIME text.
  130. # We should assume it's raw MIME text here because the input includes MIME headers.
  131. if isinstance(data, str):
  132. # Convert to bytes (raw MIME)
  133. data = data.encode("utf-8")
  134. try:
  135. # Extract the raw PKCS#7 data (der/pem) from the MIME message
  136. p7s_data = self._extract_p7s_data_from_mime(data)
  137. # Parse the PKCS#7 data for certificates
  138. certificates = self._try_parse_signature(p7s_data)
  139. if not certificates:
  140. yield "No certificates found in the provided P7S file."
  141. return
  142. # Process each certificate
  143. for i, cert in enumerate(certificates, 1):
  144. if cert_info := self._extract_cert_info(cert):
  145. yield f"Certificate {i}:"
  146. for key, value in cert_info.items():
  147. if value:
  148. yield f"{key.replace('_', ' ').title()}: {value}"
  149. yield "" # Empty line between certificates
  150. else:
  151. yield f"Certificate {i}: No detailed information extracted."
  152. except Exception as e:
  153. raise ValueError(f"Error processing P7S file: {str(e)}") from e