import json
import os
import tempfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Optional
from uuid import UUID

import requests

from shared.abstractions import R2RClientException
from shared.api.models import (
    WrappedBooleanResponse,
    WrappedChunksResponse,
    WrappedCollectionsResponse,
    WrappedDocumentResponse,
    WrappedDocumentSearchResponse,
    WrappedDocumentsResponse,
    WrappedEntitiesResponse,
    WrappedGenericMessageResponse,
    WrappedIngestionResponse,
    WrappedRelationshipsResponse,
)

from ..models import (
    GraphCreationSettings,
    IngestionMode,
    SearchMode,
    SearchSettings,
)


class DocumentsSDK:
    """SDK for interacting with documents in the v3 API."""

    def __init__(self, client):
        self.client = client
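
    # Illustrative usage (a sketch, not executed here): this SDK is normally
    # reached through a configured client object, e.g. an R2RClient whose
    # `documents` attribute is an instance of this class. The client name and
    # base URL below are assumptions for the example.
    #
    #   client = R2RClient("http://localhost:7272")
    #   docs = client.documents  # DocumentsSDK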

    def create(
        self,
        file_path: Optional[str] = None,
        raw_text: Optional[str] = None,
        chunks: Optional[list[str]] = None,
        s3_url: Optional[str] = None,
        id: Optional[str | UUID] = None,
        ingestion_mode: Optional[IngestionMode | str] = None,
        collection_ids: Optional[list[str | UUID]] = None,
        metadata: Optional[dict[str, Any]] = None,
        ingestion_config: Optional[dict | IngestionMode] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedIngestionResponse:
        """Create a new document from a file, raw text, pre-processed chunks,
        or a presigned S3 URL.

        Args:
            file_path (Optional[str]): Path to the file to upload, if any.
            raw_text (Optional[str]): Raw text content to upload, if no file path is provided.
            chunks (Optional[list[str]]): Pre-processed text chunks to ingest.
            s3_url (Optional[str]): Presigned S3 URL to download the file from, if any.
            id (Optional[str | UUID]): Optional ID to assign to the document.
            ingestion_mode (Optional[IngestionMode | str]): Ingestion mode preset ('hi-res', 'ocr', 'fast', 'custom'). Defaults to 'custom'.
            collection_ids (Optional[list[str | UUID]]): Collection IDs to associate. Defaults to the user's default collection if None.
            metadata (Optional[dict]): Optional metadata to assign to the document.
            ingestion_config (Optional[dict | IngestionMode]): Optional ingestion config or preset mode enum. Used when ingestion_mode='custom'.
            run_with_orchestration (Optional[bool]): Whether to run with orchestration (default: True).

        Returns:
            WrappedIngestionResponse
        """
        if (
            sum(x is not None for x in [file_path, raw_text, chunks, s3_url])
            != 1
        ):
            raise ValueError(
                "Exactly one of file_path, raw_text, chunks, or s3_url must be provided."
            )

        data: dict[str, Any] = {}
        files = None

        if id:
            data["id"] = str(id)
        if metadata:
            data["metadata"] = json.dumps(metadata)
        if ingestion_config:
            if isinstance(ingestion_config, IngestionMode):
                ingestion_config = {"mode": ingestion_config.value}
            app_config: dict[str, Any] = (
                {}
                if isinstance(ingestion_config, dict)
                else ingestion_config["app"]
            )
            ingestion_config = dict(ingestion_config)
            ingestion_config["app"] = app_config
            data["ingestion_config"] = json.dumps(ingestion_config)
        if collection_ids:
            collection_ids = [
                str(collection_id) for collection_id in collection_ids
            ]
            data["collection_ids"] = json.dumps(collection_ids)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)
        if ingestion_mode is not None:
            data["ingestion_mode"] = (
                ingestion_mode.value
                if isinstance(ingestion_mode, IngestionMode)
                else ingestion_mode
            )
        if file_path:
            # Open a file handle that stays open for the duration of the request.
            file_instance = open(file_path, "rb")
            filename = os.path.basename(file_path)
            files = [
                (
                    "file",
                    (filename, file_instance, "application/octet-stream"),
                )
            ]
            try:
                response_dict = self.client._make_request(
                    "POST",
                    "documents",
                    data=data,
                    files=files,
                    version="v3",
                )
            finally:
                # Ensure the file is closed once the request completes.
                file_instance.close()
        elif raw_text:
            data["raw_text"] = raw_text
            response_dict = self.client._make_request(
                "POST",
                "documents",
                data=data,
                version="v3",
            )
        elif chunks:
            data["chunks"] = json.dumps(chunks)
            response_dict = self.client._make_request(
                "POST",
                "documents",
                data=data,
                version="v3",
            )
        elif s3_url:
            temp_file_path = None
            try:
                s3_file = requests.get(s3_url)
                # Surface error responses instead of ingesting an error body.
                s3_file.raise_for_status()
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_file_path = temp_file.name
                    temp_file.write(s3_file.content)

                # Derive the filename from the URL, ignoring the query string.
                filename = os.path.basename(s3_url.split("?")[0]) or "s3_file"

                with open(temp_file_path, "rb") as file_instance:
                    files = [
                        (
                            "file",
                            (
                                filename,
                                file_instance,
                                "application/octet-stream",
                            ),
                        )
                    ]
                    response_dict = self.client._make_request(
                        "POST",
                        "documents",
                        data=data,
                        files=files,
                        version="v3",
                    )
            except requests.RequestException as e:
                raise R2RClientException(
                    f"Failed to download file from S3 URL: {s3_url}"
                ) from e
            finally:
                # Clean up the temporary file; temp_file_path stays None if the
                # download failed before the file was created.
                if temp_file_path and os.path.exists(temp_file_path):
                    os.unlink(temp_file_path)

        return WrappedIngestionResponse(**response_dict)
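
    # Illustrative call (a sketch; the file name, metadata, and the response
    # field accessed below are assumptions for the example):
    #
    #   resp = client.documents.create(
    #       file_path="report.pdf",
    #       metadata={"source": "quarterly-report"},
    #       ingestion_mode="hi-res",
    #   )
    #   print(resp.results.document_id)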

    def append_metadata(
        self,
        id: str | UUID,
        metadata: list[dict[str, Any]],
    ) -> WrappedDocumentResponse:
        """Append metadata to a document.

        Args:
            id (str | UUID): ID of the document to append metadata to.
            metadata (list[dict]): Metadata entries to append.

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = self.client._make_request(
            "PATCH",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)

    def replace_metadata(
        self,
        id: str | UUID,
        metadata: list[dict[str, Any]],
    ) -> WrappedDocumentResponse:
        """Replace a document's metadata.

        Args:
            id (str | UUID): ID of the document to replace metadata for.
            metadata (list[dict]): The metadata that will replace the existing metadata.

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = self.client._make_request(
            "PUT",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)
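
    # Illustrative metadata updates (a sketch; the entry format with "key" and
    # "value" fields is an assumption for the example). Both endpoints take a
    # list of entries:
    #
    #   client.documents.append_metadata(doc_id, [{"key": "reviewed", "value": "true"}])
    #   client.documents.replace_metadata(doc_id, [{"key": "status", "value": "final"}])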

    def retrieve(
        self,
        id: str | UUID,
    ) -> WrappedDocumentResponse:
        """Get a specific document by ID.

        Args:
            id (str | UUID): ID of the document to retrieve.

        Returns:
            WrappedDocumentResponse
        """
        response_dict = self.client._make_request(
            "GET",
            f"documents/{str(id)}",
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)

    def download(
        self,
        id: str | UUID,
    ) -> BytesIO:
        """Download a document's original file content.

        Args:
            id (str | UUID): ID of the document to download.

        Returns:
            BytesIO: In-memory bytes buffer containing the document's file content.
        """
        response = self.client._make_request(
            "GET",
            f"documents/{str(id)}/download",
            version="v3",
        )
        if not isinstance(response, BytesIO):
            raise ValueError(
                f"Expected BytesIO response, got {type(response)}"
            )
        return response
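
    # Illustrative download-to-disk (a sketch; the target filename is an
    # assumption for the example):
    #
    #   buffer = client.documents.download(doc_id)
    #   with open("original.pdf", "wb") as f:
    #       f.write(buffer.getvalue())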

    def download_zip(
        self,
        document_ids: Optional[list[str | UUID]] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        output_path: Optional[str | Path] = None,
    ) -> Optional[BytesIO]:
        """Download multiple documents as a zip file.

        Args:
            document_ids (Optional[list[str | UUID]]): IDs to include. May be required for non-superusers.
            start_date (Optional[datetime]): Filter documents created on or after this date.
            end_date (Optional[datetime]): Filter documents created on or before this date.
            output_path (Optional[str | Path]): If provided, save the zip file to this path and return None. Otherwise, return a BytesIO buffer.

        Returns:
            Optional[BytesIO]: BytesIO object with zip content if output_path is None, else None.
        """
        params: dict[str, Any] = {}
        if document_ids:
            params["document_ids"] = [str(doc_id) for doc_id in document_ids]
        if start_date:
            params["start_date"] = start_date.isoformat()
        if end_date:
            params["end_date"] = end_date.isoformat()

        response = self.client._make_request(
            "GET",
            "documents/download_zip",
            params=params,
            version="v3",
        )
        if not isinstance(response, BytesIO):
            raise ValueError(
                f"Expected BytesIO response, got {type(response)}"
            )

        if output_path:
            output_path = (
                Path(output_path)
                if isinstance(output_path, str)
                else output_path
            )
            with open(output_path, "wb") as f:
                f.write(response.getvalue())
            return None

        return response
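
    # Illustrative bulk download (a sketch; the date range and output file are
    # assumptions for the example):
    #
    #   from datetime import datetime
    #   client.documents.download_zip(
    #       start_date=datetime(2024, 1, 1),
    #       end_date=datetime(2024, 12, 31),
    #       output_path="documents.zip",
    #   )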

    def export(
        self,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict[str, Any]] = None,
        include_header: bool = True,
    ) -> None:
        """Export documents to a CSV file, streaming the results directly to
        disk.

        Args:
            output_path (str | Path): Local path where the CSV file should be saved.
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns.
            filters (Optional[dict]): Optional filters to apply when selecting documents.
            include_header (bool): Whether to include column headers in the CSV (default: True).

        Returns:
            None
        """
        # Convert path to string if it's a Path object
        output_path = (
            str(output_path) if isinstance(output_path, Path) else output_path
        )

        # Prepare request data
        data: dict[str, Any] = {"include_header": include_header}
        if columns:
            data["columns"] = columns
        if filters:
            data["filters"] = filters

        # Stream response directly to file
        with open(output_path, "wb") as f:
            response = self.client.client.post(
                f"{self.client.base_url}/v3/documents/export",
                json=data,
                headers={
                    "Accept": "text/csv",
                    **self.client._get_auth_header(),
                },
            )
            if response.status_code != 200:
                raise ValueError(
                    f"Export failed with status {response.status_code}",
                    response,
                )
            for chunk in response.iter_bytes():
                if chunk:
                    f.write(chunk)
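
    # Illustrative CSV export (a sketch; the column names and filter field are
    # assumptions for the example):
    #
    #   client.documents.export(
    #       output_path="documents.csv",
    #       columns=["id", "title", "created_at"],
    #       filters={"document_type": {"$eq": "pdf"}},
    #   )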

    def export_entities(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export a document's entities to a CSV file, streaming the results
        directly to disk.

        Args:
            id (str | UUID): ID of the document whose entities should be exported.
            output_path (str | Path): Local path where the CSV file should be saved.
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns.
            filters (Optional[dict]): Optional filters to apply when selecting entities.
            include_header (bool): Whether to include column headers in the CSV (default: True).

        Returns:
            None
        """
        # Convert path to string if it's a Path object
        output_path = (
            str(output_path) if isinstance(output_path, Path) else output_path
        )

        # Prepare request data
        data: dict[str, Any] = {"include_header": include_header}
        if columns:
            data["columns"] = columns
        if filters:
            data["filters"] = filters

        # Stream response directly to file
        with open(output_path, "wb") as f:
            response = self.client.client.post(
                f"{self.client.base_url}/v3/documents/{str(id)}/entities/export",
                json=data,
                headers={
                    "Accept": "text/csv",
                    **self.client._get_auth_header(),
                },
            )
            if response.status_code != 200:
                raise ValueError(
                    f"Export failed with status {response.status_code}",
                    response,
                )
            for chunk in response.iter_bytes():
                if chunk:
                    f.write(chunk)

    def export_relationships(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export a document's relationships to a CSV file, streaming the
        results directly to disk.

        Args:
            id (str | UUID): ID of the document whose relationships should be exported.
            output_path (str | Path): Local path where the CSV file should be saved.
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns.
            filters (Optional[dict]): Optional filters to apply when selecting relationships.
            include_header (bool): Whether to include column headers in the CSV (default: True).

        Returns:
            None
        """
        # Convert path to string if it's a Path object
        output_path = (
            str(output_path) if isinstance(output_path, Path) else output_path
        )

        # Prepare request data
        data: dict[str, Any] = {"include_header": include_header}
        if columns:
            data["columns"] = columns
        if filters:
            data["filters"] = filters

        # Stream response directly to file
        with open(output_path, "wb") as f:
            response = self.client.client.post(
                f"{self.client.base_url}/v3/documents/{str(id)}/relationships/export",
                json=data,
                headers={
                    "Accept": "text/csv",
                    **self.client._get_auth_header(),
                },
            )
            if response.status_code != 200:
                raise ValueError(
                    f"Export failed with status {response.status_code}",
                    response,
                )
            for chunk in response.iter_bytes():
                if chunk:
                    f.write(chunk)
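
    # Illustrative graph exports (a sketch; the output file names are
    # assumptions for the example):
    #
    #   client.documents.export_entities(doc_id, output_path="entities.csv")
    #   client.documents.export_relationships(doc_id, output_path="relationships.csv")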

    def delete(
        self,
        id: str | UUID,
    ) -> WrappedBooleanResponse:
        """Delete a specific document.

        Args:
            id (str | UUID): ID of the document to delete.

        Returns:
            WrappedBooleanResponse
        """
        response_dict = self.client._make_request(
            "DELETE",
            f"documents/{str(id)}",
            version="v3",
        )
        return WrappedBooleanResponse(**response_dict)

    def list_chunks(
        self,
        id: str | UUID,
        include_vectors: Optional[bool] = False,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedChunksResponse:
        """Get chunks for a specific document.

        Args:
            id (str | UUID): ID of the document to retrieve chunks for.
            include_vectors (Optional[bool]): Whether to include vector embeddings in the response.
            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedChunksResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_vectors": include_vectors,
        }
        response_dict = self.client._make_request(
            "GET",
            f"documents/{str(id)}/chunks",
            params=params,
            version="v3",
        )
        return WrappedChunksResponse(**response_dict)
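
    # Illustrative chunk listing (a sketch; the `results` iteration and the
    # `text` attribute on each chunk are assumptions for the example):
    #
    #   chunks = client.documents.list_chunks(doc_id, limit=10)
    #   for chunk in chunks.results:
    #       print(chunk.text[:80])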

    def list_collections(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedCollectionsResponse:
        """List collections for a specific document.

        Args:
            id (str | UUID): ID of the document to retrieve collections for.
            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedCollectionsResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
        }
        response_dict = self.client._make_request(
            "GET",
            f"documents/{str(id)}/collections",
            params=params,
            version="v3",
        )
        return WrappedCollectionsResponse(**response_dict)

    def delete_by_filter(
        self,
        filters: dict[str, Any],
    ) -> WrappedBooleanResponse:
        """Delete documents based on metadata filters.

        Args:
            filters (dict): Filters to apply (e.g., `{"metadata.year": {"$lt": 2020}}`).

        Returns:
            WrappedBooleanResponse
        """
        filters_json = json.dumps(filters)
        response_dict = self.client._make_request(
            "DELETE",
            "documents/by-filter",
            data=filters_json,
            version="v3",
        )
        return WrappedBooleanResponse(**response_dict)
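
    # Illustrative filtered delete, using the operator syntax shown in the
    # docstring above (the metadata field itself is an assumption):
    #
    #   client.documents.delete_by_filter({"metadata.year": {"$lt": 2020}})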

    def extract(
        self,
        id: str | UUID,
        settings: Optional[dict | GraphCreationSettings] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Extract entities and relationships from a document.

        Args:
            id (str | UUID): ID of the document to extract from.
            settings (Optional[dict | GraphCreationSettings]): Settings for the extraction process.
            run_with_orchestration (Optional[bool]): Whether to run with orchestration.

        Returns:
            WrappedGenericMessageResponse
        """
        data: dict[str, Any] = {}
        if settings:
            # Accept either a plain dict or a GraphCreationSettings model.
            if not isinstance(settings, dict):
                settings = settings.model_dump()
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)

        response_dict = self.client._make_request(
            "POST",
            f"documents/{str(id)}/extract",
            params=data,
            version="v3",
        )
        return WrappedGenericMessageResponse(**response_dict)
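
    # Illustrative extraction kickoff (a sketch; the `results.message` field
    # accessed below is an assumption, and completion timing depends on the
    # orchestration backend):
    #
    #   status = client.documents.extract(doc_id)
    #   print(status.results.message)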

    def list_entities(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        include_embeddings: Optional[bool] = False,
    ) -> WrappedEntitiesResponse:
        """List entities extracted from a document.

        Args:
            id (str | UUID): ID of the document to get entities from.
            offset (Optional[int]): Number of items to skip.
            limit (Optional[int]): Maximum number of items to return.
            include_embeddings (Optional[bool]): Whether to include embeddings.

        Returns:
            WrappedEntitiesResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_embeddings": include_embeddings,
        }
        response_dict = self.client._make_request(
            "GET",
            f"documents/{str(id)}/entities",
            params=params,
            version="v3",
        )
        return WrappedEntitiesResponse(**response_dict)

    def list_relationships(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        entity_names: Optional[list[str]] = None,
        relationship_types: Optional[list[str]] = None,
    ) -> WrappedRelationshipsResponse:
        """List relationships extracted from a document.

        Args:
            id (str | UUID): ID of the document to get relationships from.
            offset (Optional[int]): Number of items to skip.
            limit (Optional[int]): Maximum number of items to return.
            entity_names (Optional[list[str]]): Filter by entity names.
            relationship_types (Optional[list[str]]): Filter by relationship types.

        Returns:
            WrappedRelationshipsResponse
        """
        params: dict[str, Any] = {
            "offset": offset,
            "limit": limit,
        }
        if entity_names:
            params["entity_names"] = entity_names
        if relationship_types:
            params["relationship_types"] = relationship_types

        response_dict = self.client._make_request(
            "GET",
            f"documents/{str(id)}/relationships",
            params=params,
            version="v3",
        )
        return WrappedRelationshipsResponse(**response_dict)
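
    # Illustrative graph inspection (a sketch; the relationship type used as a
    # filter is an assumption for the example):
    #
    #   entities = client.documents.list_entities(doc_id, limit=25)
    #   relationships = client.documents.list_relationships(
    #       doc_id, relationship_types=["works_for"]
    #   )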

    def list(
        self,
        ids: Optional[list[str | UUID]] = None,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        include_summary_embeddings: Optional[bool] = False,
        owner_only: Optional[bool] = False,
    ) -> WrappedDocumentsResponse:
        """List documents with pagination.

        Args:
            ids (Optional[list[str | UUID]]): Optional list of document IDs to filter by.
            offset (int, optional): Number of objects to skip. Defaults to 0.
            limit (int, optional): Maximum number of objects to return (1-1000). Defaults to 100.
            include_summary_embeddings (Optional[bool]): Whether to include summary embeddings (default: False).
            owner_only (Optional[bool]): If True, only returns documents owned by the user rather than all accessible documents.

        Returns:
            WrappedDocumentsResponse
        """
        params: dict[str, Any] = {
            "offset": offset,
            "limit": limit,
            "include_summary_embeddings": include_summary_embeddings,
            "owner_only": owner_only,
        }
        if ids:
            params["ids"] = [str(doc_id) for doc_id in ids]

        response_dict = self.client._make_request(
            "GET",
            "documents",
            params=params,
            version="v3",
        )
        return WrappedDocumentsResponse(**response_dict)
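
    # Illustrative pagination loop (a sketch; the `results` attribute on the
    # response is an assumption for the example):
    #
    #   page_size = 100
    #   offset = 0
    #   while True:
    #       page = client.documents.list(offset=offset, limit=page_size)
    #       if not page.results:
    #           break
    #       offset += page_size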

    def search(
        self,
        query: str,
        search_mode: Optional[str | SearchMode] = SearchMode.custom,
        search_settings: Optional[dict | SearchSettings] = None,
    ) -> WrappedDocumentSearchResponse:
        """Conduct a search query on document summaries.

        Args:
            query (str): The search query.
            search_mode (Optional[str | SearchMode]): Search mode ('basic', 'advanced', 'custom'). Defaults to 'custom'.
            search_settings (Optional[dict | SearchSettings]): Search settings (filters, limits, hybrid options, etc.).

        Returns:
            WrappedDocumentSearchResponse
        """
        if search_settings and not isinstance(search_settings, dict):
            search_settings = search_settings.model_dump()

        data: dict[str, Any] = {
            "query": query,
            "search_settings": search_settings,
        }
        if search_mode:
            data["search_mode"] = search_mode

        response_dict = self.client._make_request(
            "POST",
            "documents/search",
            json=data,
            version="v3",
        )
        return WrappedDocumentSearchResponse(**response_dict)
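
    # Illustrative summary search (a sketch; the query text and the "limit"
    # setting are assumptions for the example):
    #
    #   hits = client.documents.search(
    #       "quarterly revenue trends",
    #       search_settings={"limit": 5},
    #   )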

    def deduplicate(
        self,
        id: str | UUID,
        settings: Optional[dict | GraphCreationSettings] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Deduplicate entities and relationships extracted from a document.

        Args:
            id (str | UUID): ID of the document to deduplicate entities for.
            settings (Optional[dict | GraphCreationSettings]): Settings for the deduplication process.
            run_with_orchestration (Optional[bool]): Whether to run with orchestration (default: True).

        Returns:
            WrappedGenericMessageResponse: Indicates the task status.
        """
        data: dict[str, Any] = {}
        if settings:
            # Accept either a plain dict or a GraphCreationSettings model.
            if not isinstance(settings, dict):
                settings = settings.model_dump()
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = run_with_orchestration

        response_dict = self.client._make_request(
            "POST",
            f"documents/{str(id)}/deduplicate",
            params=data,
            version="v3",
        )
        return WrappedGenericMessageResponse(**response_dict)