xls_parser.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # type: ignore
  2. from typing import AsyncGenerator
  3. from core.base.parsers.base_parser import AsyncParser
  4. from core.base.providers import (
  5. CompletionProvider,
  6. DatabaseProvider,
  7. IngestionConfig,
  8. )
  9. class XLSParser(AsyncParser[str | bytes]):
  10. """A parser for XLS (Excel 97-2003) data."""
  11. def __init__(
  12. self,
  13. config: IngestionConfig,
  14. database_provider: DatabaseProvider,
  15. llm_provider: CompletionProvider,
  16. ):
  17. self.database_provider = database_provider
  18. self.llm_provider = llm_provider
  19. self.config = config
  20. try:
  21. import xlrd
  22. self.xlrd = xlrd
  23. except ImportError:
  24. raise ImportError(
  25. "Error: 'xlrd' is required to run XLSParser. "
  26. "Please install it using pip: pip install xlrd"
  27. )
  28. async def ingest(
  29. self, data: bytes, *args, **kwargs
  30. ) -> AsyncGenerator[str, None]:
  31. """Ingest XLS data and yield text from each row."""
  32. if isinstance(data, str):
  33. raise ValueError("XLS data must be in bytes format.")
  34. wb = self.xlrd.open_workbook(file_contents=data)
  35. for sheet in wb.sheets():
  36. for row_idx in range(sheet.nrows):
  37. # Get all values in the row
  38. row_values = []
  39. for col_idx in range(sheet.ncols):
  40. cell = sheet.cell(row_idx, col_idx)
  41. # Handle different cell types
  42. if cell.ctype == self.xlrd.XL_CELL_DATE:
  43. try:
  44. value = self.xlrd.xldate_as_datetime(
  45. cell.value, wb.datemode
  46. ).strftime("%Y-%m-%d")
  47. except Exception:
  48. value = str(cell.value)
  49. elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
  50. value = str(bool(cell.value)).lower()
  51. elif cell.ctype == self.xlrd.XL_CELL_ERROR:
  52. value = "#ERROR#"
  53. else:
  54. value = str(cell.value).strip()
  55. row_values.append(value)
  56. # Yield non-empty rows
  57. if any(val.strip() for val in row_values):
  58. yield ", ".join(row_values)
  59. class XLSParserAdvanced(AsyncParser[str | bytes]):
  60. """An advanced parser for XLS data with chunking support."""
  61. def __init__(
  62. self, config: IngestionConfig, llm_provider: CompletionProvider
  63. ):
  64. self.llm_provider = llm_provider
  65. self.config = config
  66. try:
  67. import networkx as nx
  68. import numpy as np
  69. import xlrd
  70. self.nx = nx
  71. self.np = np
  72. self.xlrd = xlrd
  73. except ImportError:
  74. raise ImportError(
  75. "Error: 'networkx', 'numpy', and 'xlrd' are required to run XLSParserAdvanced. "
  76. "Please install them using pip: pip install networkx numpy xlrd"
  77. )
  78. def connected_components(self, arr):
  79. g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
  80. empty_cell_indices = list(zip(*self.np.where(arr == "")))
  81. g.remove_nodes_from(empty_cell_indices)
  82. components = self.nx.connected_components(g)
  83. for component in components:
  84. rows, cols = zip(*component)
  85. min_row, max_row = min(rows), max(rows)
  86. min_col, max_col = min(cols), max(cols)
  87. yield arr[min_row : max_row + 1, min_col : max_col + 1]
  88. def get_cell_value(self, cell, workbook):
  89. """Extract cell value handling different data types."""
  90. if cell.ctype == self.xlrd.XL_CELL_DATE:
  91. try:
  92. return self.xlrd.xldate_as_datetime(
  93. cell.value, workbook.datemode
  94. ).strftime("%Y-%m-%d")
  95. except Exception:
  96. return str(cell.value)
  97. elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
  98. return str(bool(cell.value)).lower()
  99. elif cell.ctype == self.xlrd.XL_CELL_ERROR:
  100. return "#ERROR#"
  101. else:
  102. return str(cell.value).strip()
  103. async def ingest(
  104. self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
  105. ) -> AsyncGenerator[str, None]:
  106. """Ingest XLS data and yield text from each connected component."""
  107. if isinstance(data, str):
  108. raise ValueError("XLS data must be in bytes format.")
  109. workbook = self.xlrd.open_workbook(file_contents=data)
  110. for sheet in workbook.sheets():
  111. # Convert sheet to numpy array with proper value handling
  112. ws_data = self.np.array(
  113. [
  114. [
  115. self.get_cell_value(sheet.cell(row, col), workbook)
  116. for col in range(sheet.ncols)
  117. ]
  118. for row in range(sheet.nrows)
  119. ]
  120. )
  121. for table in self.connected_components(ws_data):
  122. if len(table) <= 1:
  123. continue
  124. num_rows = len(table)
  125. num_rows_per_chunk = num_col_times_num_rows // num_rows
  126. headers = ", ".join(table[0])
  127. for i in range(1, num_rows, num_rows_per_chunk):
  128. chunk = table[i : i + num_rows_per_chunk]
  129. yield headers + "\n" + "\n".join(
  130. [", ".join(row) for row in chunk]
  131. )