# sync_ingestion.py
  1. from __future__ import annotations # for Python 3.10+
  2. import json
  3. import os
  4. from contextlib import ExitStack
  5. from typing import Optional, Union
  6. from uuid import UUID
  7. from typing_extensions import deprecated
  8. from shared.abstractions import IndexMeasure, IndexMethod, VectorTableName
  9. class SyncIngestionMixins:
  10. @deprecated("Use client.documents.create() instead")
  11. def ingest_files(
  12. self,
  13. file_paths: list[str],
  14. document_ids: Optional[list[Union[str, UUID]]] = None,
  15. metadatas: Optional[list[dict]] = None,
  16. ingestion_config: Optional[dict] = None,
  17. collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
  18. run_with_orchestration: Optional[bool] = None,
  19. ) -> dict:
  20. """
  21. Ingest files into your R2R deployment
  22. Args:
  23. file_paths (List[str]): List of file paths to ingest.
  24. document_ids (Optional[List[str]]): List of document IDs.
  25. metadatas (Optional[List[dict]]): List of metadata dictionaries for each file.
  26. ingestion_config (Optional[Union[dict]]): Custom chunking configuration.
  27. Returns:
  28. dict: Ingestion results containing processed, failed, and skipped documents.
  29. """
  30. if document_ids is not None and len(file_paths) != len(document_ids):
  31. raise ValueError(
  32. "Number of file paths must match number of document IDs."
  33. )
  34. if metadatas is not None and len(file_paths) != len(metadatas):
  35. raise ValueError(
  36. "Number of metadatas must match number of document IDs."
  37. )
  38. with ExitStack() as stack:
  39. all_file_paths: list[str] = []
  40. for path in file_paths:
  41. if os.path.isdir(path):
  42. for root, _, files in os.walk(path):
  43. all_file_paths.extend(
  44. os.path.join(root, file) for file in files
  45. )
  46. else:
  47. all_file_paths.append(path)
  48. with ExitStack() as stack:
  49. files_tuples = [
  50. (
  51. "files",
  52. (
  53. os.path.basename(file),
  54. stack.enter_context(open(file, "rb")),
  55. "application/octet-stream",
  56. ),
  57. )
  58. for file in all_file_paths
  59. ]
  60. data = {}
  61. if document_ids:
  62. data["document_ids"] = json.dumps(
  63. [str(doc_id) for doc_id in document_ids]
  64. )
  65. if metadatas:
  66. data["metadatas"] = json.dumps(metadatas)
  67. if ingestion_config:
  68. data["ingestion_config"] = json.dumps(ingestion_config)
  69. if run_with_orchestration is not None:
  70. data["run_with_orchestration"] = str(
  71. run_with_orchestration
  72. )
  73. if collection_ids:
  74. data["collection_ids"] = json.dumps(
  75. [
  76. [
  77. str(collection_id)
  78. for collection_id in doc_collection_ids
  79. ]
  80. for doc_collection_ids in collection_ids
  81. ]
  82. )
  83. return self._make_request( # type: ignore
  84. "POST", "ingest_files", data=data, files=files_tuples
  85. )
  86. @deprecated("Use client.documents.update() instead")
  87. def update_files(
  88. self,
  89. file_paths: list[str],
  90. document_ids: Optional[list[Union[str, UUID]]] = None,
  91. metadatas: Optional[list[dict]] = None,
  92. ingestion_config: Optional[dict] = None,
  93. collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
  94. run_with_orchestration: Optional[bool] = None,
  95. ) -> dict:
  96. """
  97. Update existing files in your R2R deployment.
  98. Args:
  99. file_paths (List[str]): List of file paths to update.
  100. document_ids (List[str]): List of document IDs to update.
  101. metadatas (Optional[List[dict]]): List of updated metadata dictionaries for each file.
  102. ingestion_config (Optional[Union[dict]]): Custom chunking configuration.
  103. Returns:
  104. dict: Update results containing processed, failed, and skipped documents.
  105. """
  106. if document_ids is not None and len(file_paths) != len(document_ids):
  107. raise ValueError(
  108. "Number of file paths must match number of document IDs."
  109. )
  110. if metadatas is not None and len(file_paths) != len(metadatas):
  111. raise ValueError(
  112. "Number of file paths must match number of document IDs."
  113. )
  114. with ExitStack() as stack:
  115. files = [
  116. (
  117. "files",
  118. (
  119. os.path.basename(file),
  120. stack.enter_context(open(file, "rb")),
  121. "application/octet-stream",
  122. ),
  123. )
  124. for file in file_paths
  125. ]
  126. data = {}
  127. if document_ids:
  128. data["document_ids"] = json.dumps(
  129. [str(doc_id) for doc_id in document_ids]
  130. )
  131. if metadatas:
  132. data["metadatas"] = json.dumps(metadatas)
  133. if ingestion_config:
  134. data["ingestion_config"] = json.dumps(ingestion_config)
  135. if run_with_orchestration is not None:
  136. data["run_with_orchestration"] = str(run_with_orchestration)
  137. if collection_ids:
  138. data["collection_ids"] = json.dumps(
  139. [
  140. [
  141. str(collection_id)
  142. for collection_id in doc_collection_ids
  143. ]
  144. for doc_collection_ids in collection_ids
  145. ]
  146. )
  147. return self._make_request( # type: ignore
  148. "POST", "update_files", data=data, files=files
  149. )
  150. @deprecated("Use client.chunks.create() instead")
  151. def ingest_chunks(
  152. self,
  153. chunks: list[dict],
  154. document_id: Optional[UUID] = None,
  155. metadata: Optional[dict] = None,
  156. collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
  157. run_with_orchestration: Optional[bool] = None,
  158. ) -> dict:
  159. """
  160. Ingest files into your R2R deployment
  161. Args:
  162. chunks (List[dict]): List of dictionaries containing chunk data.
  163. document_id (Optional[UUID]): The ID of the document to ingest chunks into.
  164. metadata (Optional[dict]): Metadata dictionary for the document
  165. Returns:
  166. dict: Ingestion results containing processed, failed, and skipped documents.
  167. """
  168. data = {
  169. "chunks": chunks,
  170. "document_id": document_id,
  171. "metadata": metadata,
  172. }
  173. if run_with_orchestration is not None:
  174. data["run_with_orchestration"] = str(run_with_orchestration) # type: ignore
  175. if collection_ids:
  176. data["collection_ids"] = json.dumps( # type: ignore
  177. [
  178. [
  179. str(collection_id)
  180. for collection_id in doc_collection_ids
  181. ]
  182. for doc_collection_ids in collection_ids
  183. ]
  184. )
  185. return self._make_request("POST", "ingest_chunks", json=data) # type: ignore
  186. @deprecated("Use client.chunks.update() instead")
  187. def update_chunks(
  188. self,
  189. document_id: UUID,
  190. chunk_id: UUID,
  191. text: str,
  192. metadata: Optional[dict] = None,
  193. run_with_orchestration: Optional[bool] = None,
  194. ) -> dict:
  195. """
  196. Update the content of an existing chunk.
  197. Args:
  198. document_id (UUID): The ID of the document containing the chunk.
  199. chunk_id (UUID): The ID of the chunk to update.
  200. text (str): The new text content of the chunk.
  201. metadata (Optional[dict]): Metadata dictionary for the chunk.
  202. run_with_orchestration (Optional[bool]): Whether to run the update through orchestration.
  203. Returns:
  204. dict: Update results containing processed, failed, and skipped documents.
  205. """
  206. data = {
  207. "text": text,
  208. "metadata": metadata,
  209. "run_with_orchestration": run_with_orchestration,
  210. }
  211. # Remove None values from payload
  212. data = {k: v for k, v in data.items() if v is not None}
  213. return self._make_request("PUT", f"update_chunk/{document_id}/{chunk_id}", json=data) # type: ignore
  214. @deprecated("Use client.indices.create() instead")
  215. def create_vector_index(
  216. self,
  217. table_name: VectorTableName = VectorTableName.CHUNKS,
  218. index_method: IndexMethod = IndexMethod.hnsw,
  219. index_measure: IndexMeasure = IndexMeasure.cosine_distance,
  220. index_arguments: Optional[dict] = None,
  221. index_name: Optional[str] = None,
  222. index_column: Optional[list[str]] = None,
  223. concurrently: bool = True,
  224. ) -> dict:
  225. """
  226. Create a vector index for a given table.
  227. Args:
  228. table_name (VectorTableName): Name of the table to create index on
  229. index_method (IndexMethod): Method to use for indexing (hnsw or ivf_flat)
  230. index_measure (IndexMeasure): Distance measure to use
  231. index_arguments (Optional[dict]): Additional arguments for the index
  232. index_name (Optional[str]): Custom name for the index
  233. concurrently (bool): Whether to create the index concurrently
  234. Returns:
  235. dict: Response containing the creation status
  236. """
  237. data = {
  238. "table_name": table_name,
  239. "index_method": index_method,
  240. "index_measure": index_measure,
  241. "index_arguments": index_arguments,
  242. "index_name": index_name,
  243. "index_column": index_column,
  244. "concurrently": concurrently,
  245. }
  246. return self._make_request( # type: ignore
  247. "POST", "create_vector_index", json=data
  248. )
  249. @deprecated("Use client.indices.list() instead")
  250. def list_vector_indices(
  251. self,
  252. table_name: VectorTableName = VectorTableName.CHUNKS,
  253. ) -> dict:
  254. """
  255. List all vector indices for a given table.
  256. Args:
  257. table_name (VectorTableName): Name of the table to list indices from
  258. Returns:
  259. dict: Response containing the list of indices
  260. """
  261. params = {"table_name": table_name}
  262. return self._make_request( # type: ignore
  263. "GET", "list_vector_indices", params=params
  264. )
  265. @deprecated("Use client.indices.delete() instead")
  266. def delete_vector_index(
  267. self,
  268. index_name: str,
  269. table_name: VectorTableName = VectorTableName.CHUNKS,
  270. concurrently: bool = True,
  271. ) -> dict:
  272. """
  273. Delete a vector index from a given table.
  274. Args:
  275. index_name (str): Name of the index to delete
  276. table_name (VectorTableName): Name of the table containing the index
  277. concurrently (bool): Whether to delete the index concurrently
  278. Returns:
  279. dict: Response containing the deletion status
  280. """
  281. data = {
  282. "index_name": index_name,
  283. "table_name": table_name,
  284. "concurrently": concurrently,
  285. }
  286. return self._make_request( # type: ignore
  287. "DELETE", "delete_vector_index", json=data
  288. )
  289. @deprecated("Use client.documents.update() instead")
  290. def update_document_metadata(
  291. self,
  292. document_id: Union[str, UUID],
  293. metadata: dict,
  294. ) -> dict:
  295. """
  296. Update the metadata of an existing document.
  297. Args:
  298. document_id (Union[str, UUID]): The ID of the document to update.
  299. metadata (dict): The new metadata to merge with existing metadata.
  300. run_with_orchestration (Optional[bool]): Whether to run the update through orchestration.
  301. Returns:
  302. dict: Update results containing the status of the metadata update.
  303. """
  304. data = {
  305. "metadata": metadata,
  306. }
  307. # Remove None values from payload
  308. data = {k: v for k, v in data.items() if v is not None}
  309. return self._make_request( # type: ignore
  310. "POST", f"update_document_metadata/{document_id}", json=metadata
  311. )