123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- from __future__ import annotations # for Python 3.10+
- import json
- import os
- from contextlib import ExitStack
- from typing import Optional, Union
- from uuid import UUID
- from typing_extensions import deprecated
- from shared.abstractions import IndexMeasure, IndexMethod, VectorTableName
- class IngestionMixins:
- @deprecated("Use client.documents.create() instead")
- async def ingest_files(
- self,
- file_paths: list[str],
- document_ids: Optional[list[Union[str, UUID]]] = None,
- metadatas: Optional[list[dict]] = None,
- ingestion_config: Optional[dict] = None,
- collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
- run_with_orchestration: Optional[bool] = None,
- ) -> dict:
- """
- Ingest files into your R2R deployment
- Args:
- file_paths (List[str]): List of file paths to ingest.
- document_ids (Optional[List[str]]): List of document IDs.
- metadatas (Optional[List[dict]]): List of metadata dictionaries for each file.
- ingestion_config (Optional[Union[dict]]): Custom chunking configuration.
- Returns:
- dict: Ingestion results containing processed, failed, and skipped documents.
- """
- if document_ids is not None and len(file_paths) != len(document_ids):
- raise ValueError(
- "Number of file paths must match number of document IDs."
- )
- if metadatas is not None and len(file_paths) != len(metadatas):
- raise ValueError(
- "Number of metadatas must match number of document IDs."
- )
- with ExitStack() as stack:
- all_file_paths: list[str] = []
- for path in file_paths:
- if os.path.isdir(path):
- for root, _, files in os.walk(path):
- all_file_paths.extend(
- os.path.join(root, file) for file in files
- )
- else:
- all_file_paths.append(path)
- with ExitStack() as stack:
- files_tuples = [
- (
- "files",
- (
- os.path.basename(file),
- stack.enter_context(open(file, "rb")),
- "application/octet-stream",
- ),
- )
- for file in all_file_paths
- ]
- data = {}
- if document_ids:
- data["document_ids"] = json.dumps(
- [str(doc_id) for doc_id in document_ids]
- )
- if metadatas:
- data["metadatas"] = json.dumps(metadatas)
- if ingestion_config:
- data["ingestion_config"] = json.dumps(ingestion_config)
- if run_with_orchestration is not None:
- data["run_with_orchestration"] = str(
- run_with_orchestration
- )
- if collection_ids:
- data["collection_ids"] = json.dumps(
- [
- [
- str(collection_id)
- for collection_id in doc_collection_ids
- ]
- for doc_collection_ids in collection_ids
- ]
- )
- return await self._make_request( # type: ignore
- "POST", "ingest_files", data=data, files=files_tuples
- )
- @deprecated("Use client.documents.update() instead")
- async def update_files(
- self,
- file_paths: list[str],
- document_ids: Optional[list[Union[str, UUID]]] = None,
- metadatas: Optional[list[dict]] = None,
- ingestion_config: Optional[dict] = None,
- collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
- run_with_orchestration: Optional[bool] = None,
- ) -> dict:
- """
- Update existing files in your R2R deployment.
- Args:
- file_paths (List[str]): List of file paths to update.
- document_ids (List[str]): List of document IDs to update.
- metadatas (Optional[List[dict]]): List of updated metadata dictionaries for each file.
- ingestion_config (Optional[Union[dict]]): Custom chunking configuration.
- Returns:
- dict: Update results containing processed, failed, and skipped documents.
- """
- if document_ids is not None and len(file_paths) != len(document_ids):
- raise ValueError(
- "Number of file paths must match number of document IDs."
- )
- if metadatas is not None and len(file_paths) != len(metadatas):
- raise ValueError(
- "Number of file paths must match number of document IDs."
- )
- with ExitStack() as stack:
- files = [
- (
- "files",
- (
- os.path.basename(file),
- stack.enter_context(open(file, "rb")),
- "application/octet-stream",
- ),
- )
- for file in file_paths
- ]
- data = {}
- if document_ids:
- data["document_ids"] = json.dumps(
- [str(doc_id) for doc_id in document_ids]
- )
- if metadatas:
- data["metadatas"] = json.dumps(metadatas)
- if ingestion_config:
- data["ingestion_config"] = json.dumps(ingestion_config)
- if run_with_orchestration is not None:
- data["run_with_orchestration"] = str(run_with_orchestration)
- if collection_ids:
- data["collection_ids"] = json.dumps(
- [
- [
- str(collection_id)
- for collection_id in doc_collection_ids
- ]
- for doc_collection_ids in collection_ids
- ]
- )
- return await self._make_request( # type: ignore
- "POST", "update_files", data=data, files=files
- )
- @deprecated("Use client.chunks.create() instead")
- async def ingest_chunks(
- self,
- chunks: list[dict],
- document_id: Optional[UUID] = None,
- metadata: Optional[dict] = None,
- collection_ids: Optional[list[list[Union[str, UUID]]]] = None,
- run_with_orchestration: Optional[bool] = None,
- ) -> dict:
- """
- Ingest files into your R2R deployment
- Args:
- chunks (List[dict]): List of dictionaries containing chunk data.
- document_id (Optional[UUID]): The ID of the document to ingest chunks into.
- metadata (Optional[dict]): Metadata dictionary for the document
- Returns:
- dict: Ingestion results containing processed, failed, and skipped documents.
- """
- data = {
- "chunks": chunks,
- "document_id": document_id,
- "metadata": metadata,
- }
- if run_with_orchestration is not None:
- data["run_with_orchestration"] = str(run_with_orchestration) # type: ignore
- if collection_ids:
- data["collection_ids"] = json.dumps( # type: ignore
- [
- [
- str(collection_id)
- for collection_id in doc_collection_ids
- ]
- for doc_collection_ids in collection_ids
- ]
- )
- return await self._make_request("POST", "ingest_chunks", json=data) # type: ignore
- @deprecated("Use client.chunks.update() instead")
- async def update_chunks(
- self,
- document_id: UUID,
- chunk_id: UUID,
- text: str,
- metadata: Optional[dict] = None,
- run_with_orchestration: Optional[bool] = None,
- ) -> dict:
- """
- Update the content of an existing chunk.
- Args:
- document_id (UUID): The ID of the document containing the chunk.
- chunk_id (UUID): The ID of the chunk to update.
- text (str): The new text content of the chunk.
- metadata (Optional[dict]): Metadata dictionary for the chunk.
- run_with_orchestration (Optional[bool]): Whether to run the update through orchestration.
- Returns:
- dict: Update results containing processed, failed, and skipped documents.
- """
- data = {
- "text": text,
- "metadata": metadata,
- "run_with_orchestration": run_with_orchestration,
- }
- # Remove None values from payload
- data = {k: v for k, v in data.items() if v is not None}
- return await self._make_request("PUT", f"update_chunk/{document_id}/{chunk_id}", json=data) # type: ignore
- @deprecated("Use client.indices.create() instead")
- async def create_vector_index(
- self,
- table_name: VectorTableName = VectorTableName.CHUNKS,
- index_method: IndexMethod = IndexMethod.hnsw,
- index_measure: IndexMeasure = IndexMeasure.cosine_distance,
- index_arguments: Optional[dict] = None,
- index_name: Optional[str] = None,
- index_column: Optional[list[str]] = None,
- concurrently: bool = True,
- ) -> dict:
- """
- Create a vector index for a given table.
- Args:
- table_name (VectorTableName): Name of the table to create index on
- index_method (IndexMethod): Method to use for indexing (hnsw or ivf_flat)
- index_measure (IndexMeasure): Distance measure to use
- index_arguments (Optional[dict]): Additional arguments for the index
- index_name (Optional[str]): Custom name for the index
- concurrently (bool): Whether to create the index concurrently
- Returns:
- dict: Response containing the creation status
- """
- data = {
- "table_name": table_name,
- "index_method": index_method,
- "index_measure": index_measure,
- "index_arguments": index_arguments,
- "index_name": index_name,
- "index_column": index_column,
- "concurrently": concurrently,
- }
- return await self._make_request( # type: ignore
- "POST", "create_vector_index", json=data
- )
- @deprecated("Use client.indices.list() instead")
- async def list_vector_indices(
- self,
- table_name: VectorTableName = VectorTableName.CHUNKS,
- ) -> dict:
- """
- List all vector indices for a given table.
- Args:
- table_name (VectorTableName): Name of the table to list indices from
- Returns:
- dict: Response containing the list of indices
- """
- params = {"table_name": table_name}
- return await self._make_request( # type: ignore
- "GET", "list_vector_indices", params=params
- )
- @deprecated("Use client.indices.delete() instead")
- async def delete_vector_index(
- self,
- index_name: str,
- table_name: VectorTableName = VectorTableName.CHUNKS,
- concurrently: bool = True,
- ) -> dict:
- """
- Delete a vector index from a given table.
- Args:
- index_name (str): Name of the index to delete
- table_name (VectorTableName): Name of the table containing the index
- concurrently (bool): Whether to delete the index concurrently
- Returns:
- dict: Response containing the deletion status
- """
- data = {
- "index_name": index_name,
- "table_name": table_name,
- "concurrently": concurrently,
- }
- return await self._make_request( # type: ignore
- "DELETE", "delete_vector_index", json=data
- )
- @deprecated("Use client.documents.update() instead")
- async def update_document_metadata(
- self,
- document_id: Union[str, UUID],
- metadata: dict,
- ) -> dict:
- """
- Update the metadata of an existing document.
- Args:
- document_id (Union[str, UUID]): The ID of the document to update.
- metadata (dict): The new metadata to merge with existing metadata.
- run_with_orchestration (Optional[bool]): Whether to run the update through orchestration.
- Returns:
- dict: Update results containing the status of the metadata update.
- """
- data = {
- "metadata": metadata,
- }
- # Remove None values from payload
- data = {k: v for k, v in data.items() if v is not None}
- return await self._make_request( # type: ignore
- "POST", f"update_document_metadata/{document_id}", json=metadata
- )
|