python_parser.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # type: ignore
  2. import re
  3. from typing import AsyncGenerator
  4. from core.base.parsers.base_parser import AsyncParser
  5. from core.base.providers import (
  6. CompletionProvider,
  7. DatabaseProvider,
  8. IngestionConfig,
  9. )
  10. class PythonParser(AsyncParser[str | bytes]):
  11. """A parser for Python source code files."""
  12. def __init__(
  13. self,
  14. config: IngestionConfig,
  15. database_provider: DatabaseProvider,
  16. llm_provider: CompletionProvider,
  17. ):
  18. self.database_provider = database_provider
  19. self.llm_provider = llm_provider
  20. self.config = config
  21. async def ingest(
  22. self, data: str | bytes, *args, **kwargs
  23. ) -> AsyncGenerator[str, None]:
  24. """Ingest Python source code and yield structured text representation.
  25. Extracts docstrings, function/class definitions, and comments while
  26. preserving the code structure in a text format suitable for analysis.
  27. :param data: The Python source code to parse.
  28. :param kwargs: Additional keyword arguments.
  29. """
  30. if isinstance(data, bytes):
  31. data = data.decode("utf-8", errors="ignore")
  32. # Process the Python code
  33. processed_text = self._process_python_code(data)
  34. # Yield the processed text
  35. yield processed_text
  36. def _process_python_code(self, code: str) -> str:
  37. """Process Python code into a more structured text representation.
  38. This method:
  39. 1. Preserves module-level docstrings
  40. 2. Extracts class and function definitions with their docstrings
  41. 3. Preserves comments and code structure
  42. 4. Removes unnecessary whitespace
  43. """
  44. # Split into lines for processing
  45. lines = code.splitlines()
  46. result = []
  47. # Extract module docstring if present
  48. module_docstring = self._extract_module_docstring(code)
  49. if module_docstring:
  50. result.append("MODULE DOCSTRING:")
  51. result.append(module_docstring)
  52. result.append("")
  53. # Extract imports
  54. imports = self._extract_imports(lines)
  55. if imports:
  56. result.append("IMPORTS:")
  57. result.extend(imports)
  58. result.append("")
  59. # Extract class and function definitions with docstrings
  60. definitions = self._extract_definitions(code)
  61. if definitions:
  62. result.append("DEFINITIONS:")
  63. result.extend(definitions)
  64. return "\n".join(result)
  65. def _extract_module_docstring(self, code: str) -> str:
  66. """Extract the module-level docstring if present."""
  67. module_docstring_pattern = r'^"""(.*?)"""'
  68. match = re.search(module_docstring_pattern, code, re.DOTALL)
  69. if match:
  70. return match.group(1).strip()
  71. # Try single quotes if double quotes not found
  72. module_docstring_pattern = r"^'''(.*?)'''"
  73. match = re.search(module_docstring_pattern, code, re.DOTALL)
  74. if match:
  75. return match.group(1).strip()
  76. return ""
  77. def _extract_imports(self, lines: list[str]) -> list[str]:
  78. """Extract import statements from the code."""
  79. imports = []
  80. for line in lines:
  81. line = line.strip()
  82. if line.startswith(("import ", "from ")) and not line.startswith(
  83. "#"
  84. ):
  85. imports.append(line)
  86. return imports
  87. def _extract_definitions(self, code: str) -> list[str]:
  88. """Extract class and function definitions with their docstrings."""
  89. definitions = []
  90. # Pattern for class and function definitions
  91. def_pattern = r'((?:def|class)\s+\w+(?:\(.*?\))?\s*(?:->.*?)?:)(?:\s*"""(.*?)"""|\s*\'\'\'(.*?)\'\'\')?'
  92. matches = re.finditer(def_pattern, code, re.DOTALL)
  93. for match in matches:
  94. definition = match.group(1).strip()
  95. docstring = match.group(2) or match.group(3)
  96. definitions.append(definition)
  97. if docstring:
  98. # Format the docstring with indentation
  99. formatted_docstring = "\n".join(
  100. f" {line.strip()}"
  101. for line in docstring.strip().split("\n")
  102. )
  103. definitions.append(formatted_docstring)
  104. definitions.append("") # Add empty line for readability
  105. return definitions