12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697 |
- import base64
- import json
- import logging
- import mimetypes
- import textwrap
- from io import BytesIO
- from typing import Any, Optional
- from uuid import UUID
- from fastapi import Body, Depends, File, Form, Path, Query, UploadFile
- from fastapi.responses import StreamingResponse
- from pydantic import Json
- from core.base import (
- IngestionConfig,
- IngestionMode,
- R2RException,
- SearchMode,
- SearchSettings,
- UnprocessedChunk,
- Workflow,
- generate_document_id,
- generate_id,
- select_search_filters,
- )
- from core.base.abstractions import KGCreationSettings, KGRunType
- from core.base.api.models import (
- GenericBooleanResponse,
- WrappedBooleanResponse,
- WrappedChunksResponse,
- WrappedCollectionsResponse,
- WrappedDocumentResponse,
- WrappedDocumentsResponse,
- WrappedEntitiesResponse,
- WrappedGenericMessageResponse,
- WrappedIngestionResponse,
- WrappedRelationshipsResponse,
- )
- from core.utils import update_settings_from_dict
- from ...abstractions import R2RProviders, R2RServices
- from .base_router import BaseRouterV3
# Module-scoped logger so records are attributed to this router module; they
# still propagate to the root logger's handlers by default.
logger = logging.getLogger(__name__)

# Upper bound on the number of pre-chunked text segments accepted by a single
# `POST /documents` request (checked in `create_document`).
MAX_CHUNKS_PER_REQUEST = 1024 * 100
def merge_search_settings(
    base: SearchSettings, overrides: SearchSettings
) -> SearchSettings:
    """
    Return a new ``SearchSettings`` that layers ``overrides`` onto ``base``.

    Only fields explicitly set on ``overrides`` (``exclude_unset=True``)
    replace the corresponding values dumped from ``base``; all other fields
    keep the value from ``base``.
    """
    merged = {
        **base.model_dump(),
        # Explicitly-set override fields win over the base values.
        **overrides.model_dump(exclude_unset=True),
    }
    return SearchSettings(**merged)
def merge_ingestion_config(
    base: IngestionConfig, overrides: IngestionConfig
) -> IngestionConfig:
    """
    Return a new ``IngestionConfig`` that layers ``overrides`` onto ``base``.

    Fields explicitly set on ``overrides`` (``exclude_unset=True``) take
    precedence; every other field keeps the value dumped from ``base``.
    """
    merged = {
        **base.model_dump(),
        **overrides.model_dump(exclude_unset=True),
    }
    return IngestionConfig(**merged)
- class DocumentsRouter(BaseRouterV3):
def __init__(
    self,
    providers: R2RProviders,
    services: R2RServices,
) -> None:
    """
    Initialise the documents router.

    Delegates to the ``BaseRouterV3`` initialiser with the shared providers
    and services, then registers the legacy ingestion workflows.
    """
    super().__init__(providers, services)
    # Legacy registration step; see `_register_workflows`.
    self._register_workflows()
def _prepare_search_settings(
    self,
    auth_user: Any,
    search_mode: SearchMode,
    search_settings: Optional[SearchSettings],
) -> SearchSettings:
    """
    Build the search settings a request should actually run with.

    * ``custom`` mode: use ``search_settings`` as given, or a default
      ``SearchSettings()`` when none were supplied.
    * any other mode: start from ``SearchSettings.get_default(mode)`` and
      layer any caller-supplied ``search_settings`` on top.

    In both cases the filters are then replaced with the result of
    ``select_search_filters(auth_user, settings)``.
    """
    if search_mode == SearchMode.custom:
        # The caller's settings are authoritative in custom mode.
        settings = search_settings or SearchSettings()
    else:
        settings = SearchSettings.get_default(search_mode.value)
        if search_settings:
            settings = merge_search_settings(settings, search_settings)

    # Scope the filters to what this user is allowed to search.
    settings.filters = select_search_filters(auth_user, settings)
    return settings
# TODO - Remove this legacy method
def _register_workflows(self):
    """
    Register the ingestion workflows and their user-facing status messages.

    Each workflow has two messages: one used when a real orchestrator queues
    the task, and one used when the orchestration provider is ``"simple"``.
    """
    orchestration = self.providers.orchestration
    runs_inline = orchestration.config.provider == "simple"

    # workflow name -> (message when queued, message for the "simple" provider)
    workflow_messages = {
        "ingest-files": (
            "Ingest files task queued successfully.",
            "Document created and ingested successfully.",
        ),
        "ingest-chunks": (
            "Ingest chunks task queued successfully.",
            "Document created and ingested successfully.",
        ),
        "update-files": (
            "Update file task queued successfully.",
            "Update task queued successfully.",
        ),
        "update-chunk": (
            "Update chunk task queued successfully.",
            "Chunk update completed successfully.",
        ),
        "update-document-metadata": (
            "Update document metadata task queued successfully.",
            "Document metadata update completed successfully.",
        ),
        "create-vector-index": (
            "Vector index creation task queued successfully.",
            "Vector index creation task completed successfully.",
        ),
        "delete-vector-index": (
            "Vector index deletion task queued successfully.",
            "Vector index deletion task completed successfully.",
        ),
        "select-vector-index": (
            "Vector index selection task queued successfully.",
            "Vector index selection task completed successfully.",
        ),
    }

    orchestration.register_workflows(
        Workflow.INGESTION,
        self.services.ingestion,
        {
            name: inline_msg if runs_inline else queued_msg
            for name, (queued_msg, inline_msg) in workflow_messages.items()
        },
    )
def _prepare_ingestion_config(
    self,
    ingestion_mode: IngestionMode,
    ingestion_config: Optional[IngestionConfig],
) -> IngestionConfig:
    """
    Resolve the ingestion configuration a request should run with.

    * ``custom`` mode: use ``ingestion_config`` directly, falling back to
      ``IngestionConfig(app=...)`` when none was supplied.
    * any other mode: start from ``IngestionConfig.get_default(mode)`` and
      let fields explicitly set on ``ingestion_config`` override it.

    The resolved config is checked with ``validate_config()`` before it is
    returned.
    """
    if ingestion_mode == IngestionMode.custom:
        resolved = ingestion_config or IngestionConfig(
            app=self.providers.auth.config.app
        )
    else:
        resolved = IngestionConfig.get_default(
            ingestion_mode.value, app=self.providers.auth.config.app
        )
        if ingestion_config:
            resolved = merge_ingestion_config(resolved, ingestion_config)

    resolved.validate_config()
    return resolved
- def _setup_routes(self):
@self.router.post(
    "/documents",
    dependencies=[Depends(self.rate_limit_dependency)],
    status_code=202,
    summary="Create a new document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient

                    client = R2RClient()
                    # when using auth, do client.login(...)

                    response = client.documents.create(
                        file_path="pg_essay_1.html",
                        metadata={"metadata_1":"some random metadata"},
                        id=None
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");

                    const client = new r2rClient();

                    async function main() {
                        const response = await client.documents.create({
                            file: { path: "examples/data/marmeladov.txt", name: "marmeladov.txt" },
                            metadata: { title: "marmeladov.txt" },
                        });
                    }

                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents create /path/to/file.txt
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X POST "https://api.example.com/v3/documents" \\
                        -H "Content-Type: multipart/form-data" \\
                        -H "Authorization: Bearer YOUR_API_KEY" \\
                        -F "file=@pg_essay_1.html;type=text/html" \\
                        -F 'metadata={}'
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def create_document(
    file: Optional[UploadFile] = File(
        None,
        description="The file to ingest. Exactly one of file, raw_text, or chunks must be provided.",
    ),
    raw_text: Optional[str] = Form(
        None,
        description="Raw text content to ingest. Exactly one of file, raw_text, or chunks must be provided.",
    ),
    chunks: Optional[Json[list[str]]] = Form(
        None,
        description="Pre-processed text chunks to ingest. Exactly one of file, raw_text, or chunks must be provided.",
    ),
    id: Optional[UUID] = Form(
        None,
        description="The ID of the document. If not provided, a new ID will be generated.",
    ),
    collection_ids: Optional[Json[list[UUID]]] = Form(
        None,
        description="Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.",
    ),
    metadata: Optional[Json[dict]] = Form(
        None,
        description="Metadata to associate with the document, such as title, description, or custom fields.",
    ),
    ingestion_mode: IngestionMode = Form(
        default=IngestionMode.custom,
        description=(
            "Ingestion modes:\n"
            "- `hi-res`: Thorough ingestion with full summaries and enrichment.\n"
            "- `fast`: Quick ingestion with minimal enrichment and no summaries.\n"
            "- `custom`: Full control via `ingestion_config`.\n\n"
            "If `ingestion_config` is provided alongside `hi-res` or `fast`, "
            "any fields it sets override the default settings for that mode."
        ),
    ),
    ingestion_config: Optional[Json[IngestionConfig]] = Form(
        None,
        description="An optional dictionary to override the default chunking configuration for the ingestion process. If not provided, the system will use the default server-side chunking configuration.",
    ),
    run_with_orchestration: Optional[bool] = Form(
        True,
        description="Whether or not ingestion runs with orchestration, default is `True`. When set to `False`, the ingestion process will run synchronous and directly return the result.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedIngestionResponse:
    """
    Creates a new Document object from an input file, raw text, or pre-processed chunks. The chosen `ingestion_mode` determines
    how the ingestion process is configured:

    **Ingestion Modes:**
    - `hi-res`: Comprehensive parsing and enrichment, including summaries and possibly more thorough parsing.
    - `fast`: Speed-focused ingestion that skips certain enrichment steps like summaries.
    - `custom`: Provide a full `ingestion_config` to customize the entire ingestion process.

    Exactly one of `file`, `raw_text`, or `chunks` must be provided. Documents are shared through `Collections` which allow for tightly specified cross-user interactions.

    By default (`run_with_orchestration=true`) ingestion is queued and its progress can be tracked using the returned
    `task_id`. When `run_with_orchestration` is false, ingestion completes before the response is returned and `task_id` is `null`.

    Non-superusers are rejected with a 403 once they have reached their document, chunk, or collection limit.
    """
    if not auth_user.is_superuser:
        # Quota checks: each overview call fetches a single row only to read
        # the total count.
        user_document_count = (
            await self.services.management.documents_overview(
                user_ids=[auth_user.id],
                offset=0,
                limit=1,
            )
        )["total_entries"]
        user_max_documents = (
            await self.services.management.get_user_max_documents(
                auth_user.id
            )
        )
        if user_document_count >= user_max_documents:
            raise R2RException(
                status_code=403,
                message=f"User has reached the maximum number of documents allowed ({user_max_documents}).",
            )

        # Get chunks using the vector handler's list_chunks method
        user_chunk_count = (
            await self.services.ingestion.list_chunks(
                filters={"owner_id": {"$eq": str(auth_user.id)}},
                offset=0,
                limit=1,
            )
        )["page_info"]["total_entries"]
        user_max_chunks = (
            await self.services.management.get_user_max_chunks(auth_user.id)
        )
        if user_chunk_count >= user_max_chunks:
            raise R2RException(
                status_code=403,
                message=f"User has reached the maximum number of chunks allowed ({user_max_chunks}).",
            )

        # NOTE(review): nothing in this endpoint visibly creates a collection,
        # yet document creation is refused once the user's collection count
        # reaches its limit — confirm this is intended.
        user_collections_count = (
            await self.services.management.collections_overview(
                user_ids=[auth_user.id],
                offset=0,
                limit=1,
            )
        )["total_entries"]
        user_max_collections = (
            await self.services.management.get_user_max_collections(
                auth_user.id
            )
        )
        if user_collections_count >= user_max_collections:
            raise R2RException(
                status_code=403,
                message=f"User has reached the maximum number of collections allowed ({user_max_collections}).",
            )

    effective_ingestion_config = self._prepare_ingestion_config(
        ingestion_mode=ingestion_mode,
        ingestion_config=ingestion_config,
    )

    # Exactly one content source is allowed. Sources are tested for
    # truthiness, so an empty string or an empty chunk list counts as absent.
    provided_sources = sum(1 for source in (file, raw_text, chunks) if source)
    if provided_sources == 0:
        raise R2RException(
            status_code=422,
            message="Either a `file`, `raw_text`, or `chunks` must be provided.",
        )
    if provided_sources > 1:
        raise R2RException(
            status_code=422,
            message="Only one of `file`, `raw_text`, or `chunks` may be provided.",
        )

    metadata = metadata or {}

    if chunks:
        if len(chunks) > MAX_CHUNKS_PER_REQUEST:
            raise R2RException(
                f"Maximum of {MAX_CHUNKS_PER_REQUEST} chunks per request",
                400,
            )

        # Honour a caller-supplied `id` exactly as the file / raw-text path
        # below does; otherwise derive the ID from the chunk contents.
        document_id = id or generate_document_id(
            json.dumps(chunks), auth_user.id
        )

        # FIXME: Metadata doesn't seem to be getting passed through
        raw_chunks_for_doc = [
            UnprocessedChunk(
                text=chunk,
                metadata=metadata,
                id=generate_id(),
            )
            for chunk in chunks
        ]

        # NOTE(review): `collection_ids` is not forwarded on the chunks path,
        # unlike the file / raw-text path — confirm whether the
        # "ingest-chunks" workflow should receive it.
        workflow_input = {
            "document_id": str(document_id),
            "chunks": [
                chunk.model_dump(mode="json") for chunk in raw_chunks_for_doc
            ],
            "metadata": metadata,  # Base metadata for the document
            "user": auth_user.model_dump_json(),
            "ingestion_config": effective_ingestion_config.model_dump(
                mode="json"
            ),
        }

        # TODO - Modify create_chunks so that we can add chunks to existing document
        if run_with_orchestration:
            raw_message = await self.providers.orchestration.run_workflow(
                "ingest-chunks",
                {"request": workflow_input},
                options={
                    "additional_metadata": {
                        "document_id": str(document_id),
                    }
                },
            )
            raw_message["document_id"] = str(document_id)
            return raw_message

        logger.info("Running chunk ingestion without orchestration.")
        from core.main.orchestration import simple_ingestion_factory

        simple_ingestor = simple_ingestion_factory(self.services.ingestion)
        await simple_ingestor["ingest-chunks"](workflow_input)
        return {  # type: ignore
            "message": "Document created and ingested successfully.",
            "document_id": str(document_id),
            "task_id": None,
        }

    if file:
        file_data = await self._process_file(file)
        # The processed payload is base64-encoded (it is decoded here); measure
        # the decoded bytes so `size_in_bytes` is the real file size rather
        # than the longer base64 text length.
        file_bytes = base64.b64decode(file_data.pop("content"))
        content_length = len(file_bytes)
        file_content = BytesIO(file_bytes)
        document_id = id or generate_document_id(
            file_data["filename"], auth_user.id
        )
    elif raw_text:
        # Count UTF-8 bytes, not characters, so non-ASCII text is sized
        # correctly.
        raw_bytes = raw_text.encode("utf-8")
        content_length = len(raw_bytes)
        file_content = BytesIO(raw_bytes)
        document_id = id or generate_document_id(raw_text, auth_user.id)
        file_data = {
            "filename": "N/A",
            "content_type": "text/plain",
        }
    else:
        # Unreachable after the validation above; kept as a safeguard.
        raise R2RException(
            status_code=422,
            message="Either a file or content must be provided.",
        )

    workflow_input = {
        "file_data": file_data,
        "document_id": str(document_id),
        "collection_ids": (
            [str(cid) for cid in collection_ids] if collection_ids else None
        ),
        "metadata": metadata,
        "ingestion_config": effective_ingestion_config.model_dump(
            mode="json"
        ),
        "user": auth_user.model_dump_json(),
        "size_in_bytes": content_length,
    }

    file_name = file_data["filename"]
    await self.providers.database.files_handler.store_file(
        document_id,
        file_name,
        file_content,
        file_data["content_type"],
    )

    if run_with_orchestration:
        raw_message: dict[str, str | None] = await self.providers.orchestration.run_workflow(  # type: ignore
            "ingest-files",
            {"request": workflow_input},
            options={
                "additional_metadata": {
                    "document_id": str(document_id),
                }
            },
        )
        raw_message["document_id"] = str(document_id)
        return raw_message  # type: ignore

    logger.info(
        f"Running ingestion without orchestration for file {file_name} and document_id {document_id}."
    )
    # TODO - Clean up implementation logic here to be more explicitly `synchronous`
    from core.main.orchestration import simple_ingestion_factory

    simple_ingestor = simple_ingestion_factory(self.services.ingestion)
    await simple_ingestor["ingest-files"](workflow_input)
    return {  # type: ignore
        "message": "Document created and ingested successfully.",
        "document_id": str(document_id),
        "task_id": None,
    }
@self.router.get(
    "/documents",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List documents",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient

                    client = R2RClient()
                    # when using auth, do client.login(...)

                    response = client.documents.list(
                        limit=10,
                        offset=0
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");

                    const client = new r2rClient();

                    async function main() {
                        const response = await client.documents.list({
                            limit: 10,
                            offset: 0,
                        });
                    }

                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_documents(
    ids: list[str] = Query(
        [],
        description="A list of document IDs to retrieve. If not provided, all documents the user can access are returned (subject to pagination).",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    include_summary_embeddings: bool = Query(
        False,
        description="Specifies whether or not to include embeddings of each document summary.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedDocumentsResponse:
    """
    Returns a paginated list of documents the authenticated user has access to.

    Results can be filtered by providing specific document IDs. Regular users will only see
    documents they own or have access to through collections. Superusers can see all documents.

    The documents are returned in order of last modification, with most recent first.
    """
    # Superusers are unrestricted; everyone else is scoped to their own
    # documents and the collections they belong to.
    requesting_user_id = None if auth_user.is_superuser else [auth_user.id]
    filter_collection_ids = (
        None if auth_user.is_superuser else auth_user.collection_ids
    )

    # Reject malformed IDs with a 422 rather than surfacing the ValueError
    # raised by `UUID()`.
    try:
        document_uuids = [UUID(document_id) for document_id in ids]
    except ValueError:
        raise R2RException(
            status_code=422, message="Invalid document ID format."
        ) from None

    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=requesting_user_id,
            collection_ids=filter_collection_ids,
            document_ids=document_uuids,
            offset=offset,
            limit=limit,
        )
    )

    # Summary embeddings are large; strip them unless explicitly requested.
    if not include_summary_embeddings:
        for document in documents_overview_response["results"]:
            document.summary_embedding = None

    return (  # type: ignore
        documents_overview_response["results"],
        {"total_entries": documents_overview_response["total_entries"]},
    )
@self.router.get(
    "/documents/{id}",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Retrieve a document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient

                    client = R2RClient()
                    # when using auth, do client.login(...)

                    response = client.documents.retrieve(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");

                    const client = new r2rClient();

                    async function main() {
                        const response = await client.documents.retrieve({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }

                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents retrieve b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedDocumentResponse:
    """
    Retrieves detailed information about a specific document by its ID.

    This endpoint returns the document's metadata, status, and system information. It does not
    return the document's content - use the `/documents/{id}/download` endpoint for that.

    Users can only retrieve documents they own or have access to through collections.
    Superusers can retrieve any document.
    """
    # Superusers are unrestricted; everyone else is scoped to their own
    # documents and the collections they belong to.
    request_user_ids = None if auth_user.is_superuser else [auth_user.id]
    filter_collection_ids = (
        None if auth_user.is_superuser else auth_user.collection_ids
    )

    # FIXME: This was using the pagination defaults from before... We need to
    # review if this is as intended.
    documents_overview_response = await self.services.management.documents_overview(
        user_ids=request_user_ids,
        collection_ids=filter_collection_ids,
        document_ids=[id],
        offset=0,
        limit=100,
    )

    results = documents_overview_response["results"]
    if not results:
        # Covers both "does not exist" and "exists but not visible to you".
        raise R2RException("Document not found.", 404)

    return results[0]
@self.router.get(
    "/documents/{id}/chunks",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document chunks",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient

                    client = R2RClient()
                    # when using auth, do client.login(...)

                    response = client.documents.list_chunks(
                        id="32b6a70f-a995-5c51-85d2-834f06283a1e"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");

                    const client = new r2rClient();

                    async function main() {
                        const response = await client.documents.listChunks({
                            id: "32b6a70f-a995-5c51-85d2-834f06283a1e",
                        });
                    }

                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-chunks b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/chunks" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def list_chunks(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve chunks for.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    include_vectors: Optional[bool] = Query(
        False,
        description="Whether to include vector embeddings in the response.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedChunksResponse:
    """
    Retrieves the text chunks that were generated from a document during ingestion.
    Chunks represent semantic sections of the document and are used for retrieval
    and analysis.

    Users can only access chunks from documents they own or have access to through
    collections. Vector embeddings are only included if specifically requested.

    Results are returned in chunk sequence order, representing their position in
    the original document.
    """
    list_document_chunks = (
        await self.services.management.list_document_chunks(
            id, offset, limit, include_vectors
        )
    )

    if not list_document_chunks["results"]:
        raise R2RException(
            "No chunks found for the given document ID.", 404
        )

    # Ownership is read from the first returned chunk.
    is_owner = str(
        list_document_chunks["results"][0].get("owner_id")
    ) == str(auth_user.id)

    # Superusers and owners are always allowed; only look up the document's
    # collections when access has to come through a shared collection.
    if not auth_user.is_superuser and not is_owner:
        document_collections = (
            await self.services.management.collections_overview(
                offset=0,
                limit=-1,
                document_ids=[id],
            )
        )
        # Compare as strings so UUID / str representations always match.
        document_collection_ids = {
            str(ele.id) for ele in document_collections["results"]
        }
        user_collection_ids = {str(cid) for cid in auth_user.collection_ids}
        if not user_collection_ids & document_collection_ids:
            raise R2RException(
                "Not authorized to access this document's chunks.", 403
            )

    return (  # type: ignore
        list_document_chunks["results"],
        {"total_entries": list_document_chunks["total_entries"]},
    )
@self.router.get(
    "/documents/{id}/download",
    dependencies=[Depends(self.rate_limit_dependency)],
    response_class=StreamingResponse,
    summary="Download document content",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient

                    client = R2RClient()
                    # when using auth, do client.login(...)

                    response = client.documents.download(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");

                    const client = new r2rClient();

                    async function main() {
                        const response = await client.documents.download({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }

                    main();
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/download" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document_file(
    id: str = Path(..., description="Document ID"),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> StreamingResponse:
    """
    Downloads the original file content of a document.

    The response's MIME type is guessed from the stored file name and falls back to
    `application/octet-stream` when it cannot be determined.

    Users can only download documents they own or have access to through collections.
    Superusers can download any document.
    """
    from urllib.parse import quote

    try:
        document_uuid = UUID(id)
    except ValueError:
        raise R2RException(
            status_code=422, message="Invalid document ID format."
        ) from None

    # Look the document up without user scoping; access is checked below.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=None,
            collection_ids=None,
            document_ids=[document_uuid],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)

    document = documents_overview_response["results"][0]
    is_owner = str(document.owner_id) == str(auth_user.id)

    if not auth_user.is_superuser and not is_owner:
        document_collections = (
            await self.services.management.collections_overview(
                offset=0,
                limit=-1,
                document_ids=[document_uuid],
            )
        )
        document_collection_ids = {
            str(ele.id) for ele in document_collections["results"]
        }
        user_collection_ids = {str(cid) for cid in auth_user.collection_ids}
        if not user_collection_ids & document_collection_ids:
            raise R2RException(
                "Not authorized to access this document.", 403
            )

    file_tuple = await self.services.management.download_file(document_uuid)
    if not file_tuple:
        raise R2RException(status_code=404, message="File not found.")

    file_name, file_content, file_size = file_tuple

    mime_type, _ = mimetypes.guess_type(file_name)
    if not mime_type:
        mime_type = "application/octet-stream"

    # Header values must be latin-1 encodable, and a `"` would terminate the
    # quoted filename early. Send an ASCII-safe `filename` fallback plus an
    # RFC 5987 `filename*` carrying the exact (percent-encoded) name.
    ascii_name = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in file_name
    )
    content_disposition = (
        f'inline; filename="{ascii_name}"; '
        f"filename*=UTF-8''{quote(file_name, safe='')}"
    )

    async def file_stream():
        chunk_size = 1024 * 1024  # 1MB
        try:
            while data := file_content.read(chunk_size):
                yield data
        finally:
            # Release the underlying handle once streaming ends or the client
            # disconnects.
            close = getattr(file_content, "close", None)
            if callable(close):
                close()

    return StreamingResponse(
        file_stream(),
        media_type=mime_type,
        headers={
            "Content-Disposition": content_disposition,
            "Content-Length": str(file_size),
        },
    )
@self.router.delete(
    "/documents/by-filter",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Delete documents by filter",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.delete_by_filter(
                        filters={"document_type": {"$eq": "txt"}}
                    )
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X DELETE "https://api.example.com/v3/documents/by-filter" \\
                        -H "Authorization: Bearer YOUR_API_KEY" \\
                        -d '{"document_type": {"$eq": "txt"}, "created_at": {"$lt": "2023-01-01T00:00:00Z"}}'
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def delete_document_by_filter(
    filters: Json[dict] = Body(
        ..., description="JSON-encoded filters"
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedBooleanResponse:
    """
    Delete documents based on provided filters. Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`. Deletion requests are limited to a user's own documents.

    The filters are read from the request body, not the query string.
    """
    # The caller's filter is always AND-ed with an owner constraint, so even
    # superusers can only delete their own documents through this route.
    filters_dict = {
        "$and": [{"owner_id": {"$eq": str(auth_user.id)}}, filters]
    }
    await self.services.management.delete_documents_and_chunks_by_filter(
        filters=filters_dict
    )
    return GenericBooleanResponse(success=True)  # type: ignore
@self.router.delete(
    "/documents/{id}",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Delete a document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.delete(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.delete({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents delete b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X DELETE "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def delete_document_by_id(
    id: UUID = Path(..., description="Document ID"),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedBooleanResponse:
    """
    Delete a specific document. All chunks corresponding to the document are deleted, and all other references to the document are removed.

    NOTE - Deletions do not yet impact the knowledge graph or other derived data. This feature is planned for a future release.
    """
    filters = {"document_id": {"$eq": str(id)}}
    # Regular users may only delete documents they own; superusers are
    # unrestricted.
    if not auth_user.is_superuser:
        filters = {
            "$and": [{"owner_id": {"$eq": str(auth_user.id)}}, filters]
        }

    await self.services.management.delete_documents_and_chunks_by_filter(
        filters=filters
    )
    return GenericBooleanResponse(success=True)  # type: ignore
@self.router.get(
    "/documents/{id}/collections",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document collections",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_collections(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", offset=0, limit=10
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.listCollections({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-collections b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/collections" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document_collections(
    id: str = Path(..., description="Document ID"),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedCollectionsResponse:
    """
    Retrieves all collections that contain the specified document. This endpoint is restricted
    to superusers only and provides a system-wide view of document organization.

    Collections are used to organize documents and manage access control. A document can belong
    to multiple collections, and users can access documents through collection membership.

    The results are paginated and ordered by collection creation date, with the most recently
    created collections appearing first.

    NOTE - This endpoint is only available to superusers, it will be extended to regular users in a future release.
    """
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can get the collections belonging to a document.",
            403,
        )

    # A malformed ID is a client error (422), not an unhandled ValueError.
    try:
        document_uuid = UUID(id)
    except ValueError:
        raise R2RException(
            status_code=422, message="Invalid document ID format."
        ) from None

    collections_response = (
        await self.services.management.collections_overview(
            offset=offset,
            limit=limit,
            document_ids=[document_uuid],
        )
    )

    return collections_response["results"], {  # type: ignore
        "total_entries": collections_response["total_entries"]
    }
@self.router.post(
    "/documents/{id}/extract",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Extract entities and relationships",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.extract(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
        ],
    },
)
@self.base_endpoint
async def extract(
    id: UUID = Path(
        ...,
        description="The ID of the document to extract entities and relationships from.",
    ),
    run_type: KGRunType = Body(
        default=KGRunType.RUN,
        description="Whether to return an estimate of the creation cost or to actually extract the document.",
    ),
    settings: Optional[KGCreationSettings] = Body(
        default=None,
        description="Settings for the entities and relationships extraction process.",
    ),
    run_with_orchestration: Optional[bool] = Body(
        default=True,
        description="Whether to run the entities and relationships extraction process with orchestration.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedGenericMessageResponse:
    """
    Extracts entities and relationships from a document.

    The entities and relationships extraction process involves:
    1. Parsing documents into semantic chunks
    2. Extracting entities and relationships using LLMs
    3. Storing the created entities and relationships in the knowledge graph
    4. Preserving the document's metadata and content, and associating the elements with collections the document belongs to
    """
    settings = settings.dict() if settings else None  # type: ignore
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can extract entities and relationships from a document.",
            403,
        )

    # Defensive fallback: `run_type` defaults to RUN, so this only applies
    # when a falsy value reaches the handler.
    if not run_type:
        run_type = KGRunType.ESTIMATE

    # Apply runtime settings overrides on top of the server configuration.
    server_graph_creation_settings = (
        self.providers.database.config.graph_creation_settings
    )
    if settings:
        server_graph_creation_settings = update_settings_from_dict(
            server_settings=server_graph_creation_settings,
            settings_dict=settings,  # type: ignore
        )

    if run_type is KGRunType.ESTIMATE:
        return {  # type: ignore
            "message": "Estimate retrieved successfully",
            "task_id": None,
            "id": id,
            "estimate": await self.services.graph.get_creation_estimate(
                document_id=id,
                graph_creation_settings=server_graph_creation_settings,
            ),
        }

    # Both execution paths consume the same input. Building it once here
    # fixes the non-orchestrated path, which previously referenced an
    # unassigned `workflow_input`.
    workflow_input = {
        "document_id": str(id),
        "graph_creation_settings": server_graph_creation_settings.model_dump_json(),
        "user": auth_user.json(),
    }

    if run_with_orchestration:
        return await self.providers.orchestration.run_workflow(
            "extract-triples", {"request": workflow_input}, {}
        )

    from core.main.orchestration import simple_kg_factory

    logger.info("Running extract-triples without orchestration.")
    simple_kg = simple_kg_factory(self.services.graph)
    await simple_kg["extract-triples"](workflow_input)
    return {  # type: ignore
        "message": "Graph created successfully.",
        "task_id": None,
    }
@self.router.get(
    "/documents/{id}/entities",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Lists the entities from the document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_entities(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
        ],
    },
)
@self.base_endpoint
async def get_entities(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve entities from.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    include_embeddings: Optional[bool] = Query(
        False,
        description="Whether to include vector embeddings in the response.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedEntitiesResponse:
    """
    Retrieves the entities that were extracted from a document. These represent
    important semantic elements like people, places, organizations, concepts, etc.

    Users can only access entities from documents they own or have access to through
    collections. Entity embeddings are only included if specifically requested.

    Results are returned in the order they were extracted from the document.
    """
    # Confirm the document exists and is visible to the caller. Superusers
    # see everything; other users are scoped by their own id and collection
    # memberships.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=(
                None if auth_user.is_superuser else [auth_user.id]
            ),
            collection_ids=(
                None
                if auth_user.is_superuser
                else auth_user.collection_ids
            ),
            document_ids=[id],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)

    # Fetch the entities stored against this document.
    (
        entities,
        count,
    ) = await self.providers.database.graphs_handler.entities.get(
        parent_id=id,
        store_type="documents",
        offset=offset,
        limit=limit,
        include_embeddings=include_embeddings,
    )

    return entities, {"total_entries": count}  # type: ignore
@self.router.get(
    "/documents/{id}/relationships",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document relationships",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_relationships(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        offset=0,
                        limit=100
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.listRelationships({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                            offset: 0,
                            limit: 100,
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-relationships b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/relationships" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_relationships(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve relationships for.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    entity_names: Optional[list[str]] = Query(
        None,
        description="Filter relationships by specific entity names.",
    ),
    relationship_types: Optional[list[str]] = Query(
        None,
        description="Filter relationships by specific relationship types.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedRelationshipsResponse:
    """
    Retrieves the relationships between entities that were extracted from a document. These represent
    connections and interactions between entities found in the text.

    Users can only access relationships from documents they own or have access to through
    collections. Results can be filtered by entity names and relationship types.

    Results are returned in the order they were extracted from the document.
    """
    # Confirm the document exists and is visible to the caller. Superusers
    # see everything; other users are scoped by their own id and collection
    # memberships.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=(
                None if auth_user.is_superuser else [auth_user.id]
            ),
            collection_ids=(
                None
                if auth_user.is_superuser
                else auth_user.collection_ids
            ),
            document_ids=[id],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)

    # Fetch the relationships stored against this document, optionally
    # narrowed by entity names and relationship types.
    (
        relationships,
        count,
    ) = await self.providers.database.graphs_handler.relationships.get(
        parent_id=id,
        store_type="documents",
        entity_names=entity_names,
        relationship_types=relationship_types,
        offset=offset,
        limit=limit,
    )

    return relationships, {"total_entries": count}  # type: ignore
@self.router.post(
    "/documents/search",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Search document summaries",
)
@self.base_endpoint
async def search_documents(
    query: str = Body(..., description="The search query to perform."),
    search_mode: SearchMode = Body(
        default=SearchMode.custom,
        description=(
            "Default value of `custom` allows full control over search settings.\n\n"
            "Pre-configured search modes:\n"
            "`basic`: A simple semantic-based search.\n"
            "`advanced`: A more powerful hybrid search combining semantic and full-text.\n"
            "`custom`: Full control via `search_settings`.\n\n"
            "If `filters` or `limit` are provided alongside `basic` or `advanced`, "
            "they will override the default settings for that mode."
        ),
    ),
    search_settings: SearchSettings = Body(
        default_factory=SearchSettings,
        description="Settings for document search",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
):  # -> WrappedDocumentSearchResponse:  # type: ignore
    """
    Perform a search query on the automatically generated document summaries in the system.

    This endpoint allows for complex filtering of search results using PostgreSQL-based queries.
    Filters can be applied to various fields such as document_id, and internal metadata values.

    Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`.
    """
    # Resolve the requested mode and the caller's identity into concrete
    # settings, embed the query text, then hand both to the retrieval service.
    resolved_settings = self._prepare_search_settings(
        auth_user, search_mode, search_settings
    )
    embedded_query = await self.providers.embedding.async_get_embedding(
        query
    )
    return await self.services.retrieval.search_documents(
        query=query,
        query_embedding=embedded_query,
        settings=resolved_settings,
    )
@staticmethod
async def _process_file(file):
    """Read an uploaded file and package it as a JSON-friendly dict.

    The raw bytes returned by ``await file.read()`` are base64-encoded and
    decoded to text, so they can be embedded in a JSON payload.

    Returns:
        dict with keys ``filename``, ``content`` (base64 text) and
        ``content_type``, copied from the corresponding attributes of *file*.
    """
    from base64 import b64encode

    raw_bytes = await file.read()
    encoded_text = b64encode(raw_bytes).decode("utf-8")
    return dict(
        filename=file.filename,
        content=encoded_text,
        content_type=file.content_type,
    )
|