123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- import json
- from io import BytesIO
- from typing import Any, Optional
- from uuid import UUID
- from shared.api.models.base import WrappedBooleanResponse
- from shared.api.models.ingestion.responses import WrappedIngestionResponse
- from shared.api.models.management.responses import (
- WrappedChunksResponse,
- WrappedCollectionsResponse,
- WrappedDocumentResponse,
- WrappedDocumentsResponse,
- )
- from ..models import IngestionMode, SearchMode, SearchSettings
- class DocumentsSDK:
- """
- SDK for interacting with documents in the v3 API.
- """
- def __init__(self, client):
- self.client = client
- async def create(
- self,
- file_path: Optional[str] = None,
- raw_text: Optional[str] = None,
- chunks: Optional[list[str]] = None,
- id: Optional[str | UUID] = None,
- ingestion_mode: Optional[str] = None,
- collection_ids: Optional[list[str | UUID]] = None,
- metadata: Optional[dict] = None,
- ingestion_config: Optional[dict | IngestionMode] = None,
- run_with_orchestration: Optional[bool] = True,
- ) -> WrappedIngestionResponse:
- """
- Create a new document from either a file or content.
- Args:
- file_path (Optional[str]): The file to upload, if any
- content (Optional[str]): Optional text content to upload, if no file path is provided
- id (Optional[Union[str, UUID]]): Optional ID to assign to the document
- collection_ids (Optional[list[Union[str, UUID]]]): Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.
- metadata (Optional[dict]): Optional metadata to assign to the document
- ingestion_config (Optional[dict]): Optional ingestion configuration to use
- run_with_orchestration (Optional[bool]): Whether to run with orchestration
- """
- if not file_path and not raw_text and not chunks:
- raise ValueError(
- "Either `file_path`, `raw_text` or `chunks` must be provided"
- )
- if (
- (file_path and raw_text)
- or (file_path and chunks)
- or (raw_text and chunks)
- ):
- raise ValueError(
- "Only one of `file_path`, `raw_text` or `chunks` may be provided"
- )
- data = {}
- files = None
- if id:
- data["id"] = str(id) # json.dumps(str(id))
- if metadata:
- data["metadata"] = json.dumps(metadata)
- if ingestion_config:
- if not isinstance(ingestion_config, dict):
- ingestion_config = ingestion_config.model_dump()
- ingestion_config["app"] = {}
- data["ingestion_config"] = json.dumps(ingestion_config)
- if collection_ids:
- collection_ids = [str(collection_id) for collection_id in collection_ids] # type: ignore
- data["collection_ids"] = json.dumps(collection_ids)
- if run_with_orchestration is not None:
- data["run_with_orchestration"] = str(run_with_orchestration)
- if ingestion_mode is not None:
- data["ingestion_mode"] = ingestion_mode
- if file_path:
- # Create a new file instance that will remain open during the request
- file_instance = open(file_path, "rb")
- files = [
- (
- "file",
- (file_path, file_instance, "application/octet-stream"),
- )
- ]
- try:
- result = await self.client._make_request(
- "POST",
- "documents",
- data=data,
- files=files,
- version="v3",
- )
- finally:
- # Ensure we close the file after the request is complete
- file_instance.close()
- return result
- elif raw_text:
- data["raw_text"] = raw_text # type: ignore
- return await self.client._make_request(
- "POST",
- "documents",
- data=data,
- version="v3",
- )
- else:
- data["chunks"] = json.dumps(chunks)
- return await self.client._make_request(
- "POST",
- "documents",
- data=data,
- version="v3",
- )
- async def retrieve(
- self,
- id: str | UUID,
- ) -> WrappedDocumentResponse:
- """
- Get a specific document by ID.
- Args:
- id (Union[str, UUID]): ID of document to retrieve
- Returns:
- dict: Document information
- """
- return await self.client._make_request(
- "GET",
- f"documents/{str(id)}",
- version="v3",
- )
- # you could do something like:
- async def download(
- self,
- id: str | UUID,
- ) -> BytesIO:
- response = await self.client._make_request(
- "GET",
- f"documents/{str(id)}/download",
- version="v3",
- # No json parsing here, if possible
- )
- if not isinstance(response, BytesIO):
- raise ValueError("Expected BytesIO response")
- return response
- async def delete(
- self,
- id: str | UUID,
- ) -> WrappedBooleanResponse:
- """
- Delete a specific document.
- Args:
- id (Union[str, UUID]): ID of document to delete
- """
- return await self.client._make_request(
- "DELETE",
- f"documents/{str(id)}",
- version="v3",
- )
- async def list_chunks(
- self,
- id: str | UUID,
- include_vectors: Optional[bool] = False,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- ) -> WrappedChunksResponse:
- """
- Get chunks for a specific document.
- Args:
- id (Union[str, UUID]): ID of document to retrieve chunks for
- include_vectors (Optional[bool]): Whether to include vector embeddings in the response
- offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
- limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
- Returns:
- dict: List of document chunks and pagination information
- """
- params = {
- "offset": offset,
- "limit": limit,
- "include_vectors": include_vectors,
- }
- return await self.client._make_request(
- "GET",
- f"documents/{str(id)}/chunks",
- params=params,
- version="v3",
- )
- async def list_collections(
- self,
- id: str | UUID,
- include_vectors: Optional[bool] = False,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- ) -> WrappedCollectionsResponse:
- """
- List collections for a specific document.
- Args:
- id (Union[str, UUID]): ID of document to retrieve collections for
- offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
- limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
- Returns:
- dict: List of document chunks and pagination information
- """
- params = {
- "offset": offset,
- "limit": limit,
- }
- return await self.client._make_request(
- "GET",
- f"documents/{str(id)}/collections",
- params=params,
- version="v3",
- )
- async def delete_by_filter(
- self,
- filters: dict,
- ) -> WrappedBooleanResponse:
- """
- Delete documents based on filters.
- Args:
- filters (dict): Filters to apply when selecting documents to delete
- """
- filters_json = json.dumps(filters)
- return await self.client._make_request(
- "DELETE",
- "documents/by-filter",
- data=filters_json,
- # params={"filters": filters_json},
- # data=filters,
- version="v3",
- )
- async def extract(
- self,
- id: str | UUID,
- run_type: Optional[str] = None,
- settings: Optional[dict] = None,
- run_with_orchestration: Optional[bool] = True,
- ) -> dict:
- """
- Extract entities and relationships from a document.
- Args:
- id (Union[str, UUID]): ID of document to extract from
- run_type (Optional[str]): Whether to return an estimate or run extraction
- settings (Optional[dict]): Settings for extraction process
- run_with_orchestration (Optional[bool]): Whether to run with orchestration
- Returns:
- dict: Extraction results or cost estimate
- """
- data = {}
- if run_type:
- data["run_type"] = run_type
- if settings:
- data["settings"] = json.dumps(settings)
- if run_with_orchestration is not None:
- data["run_with_orchestration"] = str(run_with_orchestration)
- return await self.client._make_request(
- "POST",
- f"documents/{str(id)}/extract",
- params=data,
- version="v3",
- )
- async def list_entities(
- self,
- id: str | UUID,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- include_embeddings: Optional[bool] = False,
- ) -> dict:
- """
- List entities extracted from a document.
- Args:
- id (Union[str, UUID]): ID of document to get entities from
- offset (Optional[int]): Number of items to skip
- limit (Optional[int]): Max number of items to return
- include_embeddings (Optional[bool]): Whether to include embeddings
- Returns:
- dict: List of entities and pagination info
- """
- params = {
- "offset": offset,
- "limit": limit,
- "include_embeddings": include_embeddings,
- }
- return await self.client._make_request(
- "GET",
- f"documents/{str(id)}/entities",
- params=params,
- version="v3",
- )
- async def list_relationships(
- self,
- id: str | UUID,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- entity_names: Optional[list[str]] = None,
- relationship_types: Optional[list[str]] = None,
- ) -> dict:
- """
- List relationships extracted from a document.
- Args:
- id (Union[str, UUID]): ID of document to get relationships from
- offset (Optional[int]): Number of items to skip
- limit (Optional[int]): Max number of items to return
- entity_names (Optional[list[str]]): Filter by entity names
- relationship_types (Optional[list[str]]): Filter by relationship types
- Returns:
- dict: List of relationships and pagination info
- """
- params: dict[str, Any] = {
- "offset": offset,
- "limit": limit,
- }
- if entity_names:
- params["entity_names"] = entity_names
- if relationship_types:
- params["relationship_types"] = relationship_types
- return await self.client._make_request(
- "GET",
- f"documents/{str(id)}/relationships",
- params=params,
- version="v3",
- )
- # async def extract(
- # self,
- # id: str | UUID,
- # run_type: Optional[str] = None,
- # run_with_orchestration: Optional[bool] = True,
- # ):
- # data = {}
- # if run_type:
- # data["run_type"] = run_type
- # if run_with_orchestration is not None:
- # data["run_with_orchestration"] = str(run_with_orchestration)
- # return await self.client._make_request(
- # "POST",
- # f"documents/{str(id)}/extract",
- # params=data,
- # version="v3",
- # )
- # Be sure to put at bottom of the page...
- async def list(
- self,
- ids: Optional[list[str | UUID]] = None,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- ) -> WrappedDocumentsResponse:
- """
- List documents with pagination.
- Args:
- ids (Optional[list[Union[str, UUID]]]): Optional list of document IDs to filter by
- offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
- limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
- Returns:
- dict: List of documents and pagination information
- """
- params = {
- "offset": offset,
- "limit": limit,
- }
- if ids:
- params["ids"] = [str(doc_id) for doc_id in ids] # type: ignore
- return await self.client._make_request(
- "GET",
- "documents",
- params=params,
- version="v3",
- )
- async def search(
- self,
- query: str,
- search_mode: Optional[str | SearchMode] = "custom",
- search_settings: Optional[dict | SearchSettings] = None,
- ):
- """
- Conduct a vector and/or KG search.
- Args:
- query (str): The query to search for.
- search_settings (Optional[dict, SearchSettings]]): Vector search settings.
- Returns:
- CombinedSearchResponse: The search response.
- """
- # if search_mode and not isinstance(search_mode, str):
- # search_mode = search_mode.value
- if search_settings and not isinstance(search_settings, dict):
- search_settings = search_settings.model_dump()
- data = {
- "query": query,
- "search_settings": search_settings,
- }
- if search_mode:
- data["search_mode"] = search_mode
- return await self.client._make_request(
- "POST",
- "documents/search",
- json=data,
- version="v3",
- )
|