epub_parser.py

# type: ignore
import logging
from typing import AsyncGenerator

from core.base.parsers.base_parser import AsyncParser
from core.base.providers import (
    CompletionProvider,
    DatabaseProvider,
    IngestionConfig,
)

logger = logging.getLogger(__name__)

class EPUBParser(AsyncParser[str | bytes]):
    """Parser for EPUB electronic book files."""

    def __init__(
        self,
        config: IngestionConfig,
        database_provider: DatabaseProvider,
        llm_provider: CompletionProvider,
    ):
        self.database_provider = database_provider
        self.llm_provider = llm_provider
        self.config = config

        try:
            import epub

            self.epub = epub
        except ImportError:
            raise ImportError(
                "Error: 'epub' is required to run EPUBParser. "
                "Please install it using pip: pip install epub"
            )

    def _safe_get_metadata(self, book, field: str) -> str | None:
        """Safely extract a metadata field from the epub book."""
        try:
            return getattr(book, field, None) or getattr(
                book.opf, field, None
            )
        except Exception as e:
            logger.debug(f"Error getting {field} metadata: {e}")
            return None

    def _clean_text(self, content: bytes) -> str:
        """Clean HTML content and return plain text."""
        try:
            import re

            text = content.decode("utf-8", errors="ignore")
            # Remove HTML tags
            text = re.sub(r"<[^>]+>", " ", text)
            # Normalize whitespace
            text = re.sub(r"\s+", " ", text)
            # Remove any remaining HTML entities
            text = re.sub(r"&[^;]+;", " ", text)
            return text.strip()
        except Exception as e:
            logger.warning(f"Error cleaning text: {e}")
            return ""

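    # Illustrative trace of _clean_text (comment only, nothing is executed here):
    # for b"<h1>Chapter&nbsp;1</h1><p>It was a dark night.</p>" the tag,
    # whitespace, and entity passes above reduce the markup to
    # "Chapter 1 It was a dark night."
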
    async def ingest(
        self, data: str | bytes, **kwargs
    ) -> AsyncGenerator[str, None]:
        """Ingest EPUB data and yield book content."""
        if isinstance(data, str):
            raise ValueError("EPUB data must be in bytes format.")

        from io import BytesIO

        file_obj = BytesIO(data)

        try:
            book = self.epub.open_epub(file_obj)

            # Safely extract metadata
            metadata = []
            for field, label in [
                ("title", "Title"),
                ("creator", "Author"),
                ("language", "Language"),
                ("publisher", "Publisher"),
                ("date", "Date"),
            ]:
                if value := self._safe_get_metadata(book, field):
                    metadata.append(f"{label}: {value}")

            if metadata:
                yield "\n".join(metadata)

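            # For reference, the block yielded above is a single newline-joined
            # string built from the labels in the loop, e.g. (values are
            # illustrative; they depend on the book's OPF metadata):
            #     Title: Example Book
            #     Author: Jane Doe
            #     Language: en
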
            # Extract content from items
            try:
                manifest = getattr(book.opf, "manifest", {}) or {}
                for item in manifest.values():
                    try:
                        if (
                            getattr(item, "mime_type", "")
                            == "application/xhtml+xml"
                        ):
                            if content := book.read_item(item):
                                if cleaned_text := self._clean_text(content):
                                    yield cleaned_text
                    except Exception as e:
                        logger.warning(f"Error processing item: {e}")
                        continue
            except Exception as e:
                logger.warning(f"Error accessing manifest: {e}")
                # Fallback: try to get content directly
                if hasattr(book, "read_item"):
                    for item_id in getattr(book, "items", []):
                        try:
                            if content := book.read_item(item_id):
                                if cleaned_text := self._clean_text(content):
                                    yield cleaned_text
                        except Exception as e:
                            logger.warning(f"Error in fallback reading: {e}")
                            continue
        except Exception as e:
            logger.error(f"Error processing EPUB file: {str(e)}")
            raise ValueError(f"Error processing EPUB file: {str(e)}")
        finally:
            try:
                file_obj.close()
            except Exception as e:
                logger.warning(f"Error closing file: {e}")
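

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the parser): one way a caller
# might drive EPUBParser. The IngestionConfig and provider objects are assumed
# to be constructed elsewhere by the host application and are passed in as-is;
# nothing below is required by the class itself.
# ---------------------------------------------------------------------------
async def _example_parse_epub(
    path: str,
    config: IngestionConfig,
    database_provider: DatabaseProvider,
    llm_provider: CompletionProvider,
) -> list[str]:
    """Collect every text chunk EPUBParser yields for a single EPUB file."""
    parser = EPUBParser(config, database_provider, llm_provider)
    with open(path, "rb") as f:
        data = f.read()  # ingest() expects raw bytes, not a str path
    return [chunk async for chunk in parser.ingest(data)]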