# org_parser.py (2.2 KB)
  1. # type: ignore
  2. from typing import AsyncGenerator
  3. from core.base.parsers.base_parser import AsyncParser
  4. from core.base.providers import (
  5. CompletionProvider,
  6. DatabaseProvider,
  7. IngestionConfig,
  8. )
  9. class ORGParser(AsyncParser[str | bytes]):
  10. """Parser for ORG (Emacs Org-mode) files."""
  11. def __init__(
  12. self,
  13. config: IngestionConfig,
  14. database_provider: DatabaseProvider,
  15. llm_provider: CompletionProvider,
  16. ):
  17. self.database_provider = database_provider
  18. self.llm_provider = llm_provider
  19. self.config = config
  20. try:
  21. import orgparse
  22. self.orgparse = orgparse
  23. except ImportError:
  24. raise ImportError(
  25. "Error: 'orgparse' is required to run ORGParser. "
  26. "Please install it using pip: pip install orgparse"
  27. )
  28. def _process_node(self, node) -> list[str]:
  29. """Process an org-mode node and return its content."""
  30. contents = []
  31. # Add heading with proper level of asterisks
  32. if node.level > 0:
  33. contents.append(f"{'*' * node.level} {node.heading}")
  34. # Add body content if exists
  35. if node.body:
  36. contents.append(node.body.strip())
  37. return contents
  38. async def ingest(
  39. self, data: str | bytes, **kwargs
  40. ) -> AsyncGenerator[str, None]:
  41. """Ingest ORG data and yield document content."""
  42. if isinstance(data, bytes):
  43. data = data.decode("utf-8")
  44. try:
  45. # Create a temporary file-like object for orgparse
  46. from io import StringIO
  47. file_obj = StringIO(data)
  48. # Parse the org file
  49. root = self.orgparse.load(file_obj)
  50. # Process root node if it has content
  51. if root.body:
  52. yield root.body.strip()
  53. # Process all nodes
  54. for node in root[1:]: # Skip root node in iteration
  55. contents = self._process_node(node)
  56. for content in contents:
  57. if content.strip():
  58. yield content.strip()
  59. except Exception as e:
  60. raise ValueError(f"Error processing ORG file: {str(e)}")
  61. finally:
  62. file_obj.close()