12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130 |
- import base64
- import json
- import logging
- import mimetypes
- import textwrap
- from datetime import datetime
- from io import BytesIO
- from typing import Any, Optional
- from uuid import UUID
- from fastapi import Body, Depends, File, Form, Path, Query, UploadFile
- from fastapi.background import BackgroundTasks
- from fastapi.responses import FileResponse, StreamingResponse
- from pydantic import Json
- from core.base import (
- IngestionConfig,
- IngestionMode,
- R2RException,
- SearchMode,
- SearchSettings,
- UnprocessedChunk,
- Workflow,
- generate_document_id,
- generate_id,
- select_search_filters,
- )
- from core.base.abstractions import KGCreationSettings, KGRunType, StoreType
- from core.base.api.models import (
- GenericBooleanResponse,
- WrappedBooleanResponse,
- WrappedChunksResponse,
- WrappedCollectionsResponse,
- WrappedDocumentResponse,
- WrappedDocumentsResponse,
- WrappedEntitiesResponse,
- WrappedGenericMessageResponse,
- WrappedIngestionResponse,
- WrappedRelationshipsResponse,
- )
- from core.utils import update_settings_from_dict
- from ...abstractions import R2RProviders, R2RServices
- from .base_router import BaseRouterV3
# Module-scoped logger (named after this module) instead of the root logger, so
# this router's output can be configured/filtered on its own; records still
# propagate to the root logger's handlers unless logging config says otherwise.
logger = logging.getLogger(__name__)

# Upper bound on the number of pre-chunked text segments accepted in a single
# create-document request (1024 * 100 = 102,400).
MAX_CHUNKS_PER_REQUEST = 1024 * 100
def merge_search_settings(
    base: SearchSettings, overrides: SearchSettings
) -> SearchSettings:
    """Return a new ``SearchSettings`` with ``overrides`` layered onto ``base``.

    Only the fields that were explicitly set on ``overrides`` (dumped with
    ``exclude_unset=True``) are applied. The merge is shallow: each such
    top-level field replaces the value taken from ``base`` wholesale. Neither
    input object is modified; a freshly constructed instance is returned.

    NOTE(review): for nested settings objects, the override replaces the whole
    nested value, so nested fields the caller did not set fall back to the
    model's own defaults rather than ``base``'s values — confirm intended.
    """
    merged = base.model_dump()
    merged.update(overrides.model_dump(exclude_unset=True))
    return SearchSettings(**merged)
def merge_ingestion_config(
    base: IngestionConfig, overrides: IngestionConfig
) -> IngestionConfig:
    """Return a new ``IngestionConfig`` with ``overrides`` layered onto ``base``.

    Fields explicitly set on ``overrides`` (``exclude_unset=True``) take
    precedence; every other field keeps its value from ``base``. The merge is
    shallow (top-level keys only) and neither input is modified.
    """
    base_values = base.model_dump()
    explicit_values = overrides.model_dump(exclude_unset=True)
    return IngestionConfig(**{**base_values, **explicit_values})
- class DocumentsRouter(BaseRouterV3):
def __init__(self, providers: R2RProviders, services: R2RServices):
    """Initialise the documents router and register its ingestion workflows.

    Args:
        providers: Provider bundle, forwarded unchanged to ``BaseRouterV3``.
        services: Service bundle, forwarded unchanged to ``BaseRouterV3``.
    """
    super().__init__(providers, services)
    # Must run after the base initialiser: workflow registration reads
    # ``self.providers`` / ``self.services`` (presumably stored by
    # BaseRouterV3.__init__ — confirm there).
    self._register_workflows()
def _prepare_search_settings(
    self,
    auth_user: Any,
    search_mode: SearchMode,
    search_settings: Optional[SearchSettings],
) -> SearchSettings:
    """Resolve the search settings that a request should actually run with.

    * ``custom`` mode: ``search_settings`` is used as given, or a plain
      ``SearchSettings()`` when nothing was supplied.
    * any other mode: the mode's defaults (``SearchSettings.get_default``)
      are the starting point, and explicitly-set fields of
      ``search_settings`` are layered on top via ``merge_search_settings``.

    Afterwards ``filters`` is always reassigned to the value returned by
    ``select_search_filters(auth_user, settings)``. This assignment is done
    in place, so in ``custom`` mode it modifies the caller's own
    ``search_settings`` object when one was supplied.
    """
    if search_mode == SearchMode.custom:
        resolved = search_settings or SearchSettings()
    else:
        resolved = SearchSettings.get_default(search_mode.value)
        if search_settings:
            resolved = merge_search_settings(resolved, search_settings)

    # Filters are recomputed for this specific user (the helper also receives
    # the settings resolved so far).
    resolved.filters = select_search_filters(auth_user, resolved)
    return resolved
# TODO - Remove this legacy method
def _register_workflows(self):
    """Register this router's ingestion workflows with the orchestrator.

    Every workflow name has two status strings: one used when the
    orchestration provider is ``"simple"`` and one used for any other
    provider. Only the string matching the configured provider is passed to
    ``register_workflows`` (presumably it becomes the message reported when
    the workflow is launched — confirm in the orchestration provider).
    """
    orchestration = self.providers.orchestration
    use_simple_messages = orchestration.config.provider == "simple"

    # workflow name -> (message for non-"simple" providers, message for "simple")
    message_variants = {
        "ingest-files": (
            "Ingest files task queued successfully.",
            "Document created and ingested successfully.",
        ),
        "ingest-chunks": (
            "Ingest chunks task queued successfully.",
            "Document created and ingested successfully.",
        ),
        "update-files": (
            "Update file task queued successfully.",
            # NOTE(review): unlike the other "simple" variants this one still
            # says "queued"; kept verbatim — confirm whether that is intended.
            "Update task queued successfully.",
        ),
        "update-chunk": (
            "Update chunk task queued successfully.",
            "Chunk update completed successfully.",
        ),
        "update-document-metadata": (
            "Update document metadata task queued successfully.",
            "Document metadata update completed successfully.",
        ),
        "create-vector-index": (
            "Vector index creation task queued successfully.",
            "Vector index creation task completed successfully.",
        ),
        "delete-vector-index": (
            "Vector index deletion task queued successfully.",
            "Vector index deletion task completed successfully.",
        ),
        "select-vector-index": (
            "Vector index selection task queued successfully.",
            "Vector index selection task completed successfully.",
        ),
    }
    workflow_messages = {
        name: simple_msg if use_simple_messages else queued_msg
        for name, (queued_msg, simple_msg) in message_variants.items()
    }

    orchestration.register_workflows(
        Workflow.INGESTION,
        self.services.ingestion,
        workflow_messages,
    )
def _prepare_ingestion_config(
    self,
    ingestion_mode: IngestionMode,
    ingestion_config: Optional[IngestionConfig],
) -> IngestionConfig:
    """Resolve the ingestion config that a request should actually use.

    * ``custom`` mode: ``ingestion_config`` is used as given, or a new
      ``IngestionConfig`` bound to ``self.providers.auth.config.app`` when
      nothing was supplied.
    * any other mode: the mode's defaults (``IngestionConfig.get_default``,
      bound to the same app config) are the starting point, and
      explicitly-set fields of ``ingestion_config`` are layered on top via
      ``merge_ingestion_config``.

    The resolved config has ``validate_config()`` called on it before it is
    returned, so any exception that method raises propagates to the caller.
    """
    if ingestion_mode == IngestionMode.custom:
        resolved = ingestion_config or IngestionConfig(
            app=self.providers.auth.config.app
        )
    else:
        resolved = IngestionConfig.get_default(
            ingestion_mode.value, app=self.providers.auth.config.app
        )
        if ingestion_config:
            resolved = merge_ingestion_config(resolved, ingestion_config)

    resolved.validate_config()
    return resolved
- def _setup_routes(self):
- @self.router.post(
- "/documents",
- dependencies=[Depends(self.rate_limit_dependency)],
- status_code=202,
- summary="Create a new document",
- openapi_extra={
- "x-codeSamples": [
- {
- "lang": "Python",
- "source": textwrap.dedent(
- """
- from r2r import R2RClient
- client = R2RClient()
- # when using auth, do client.login(...)
- response = client.documents.create(
- file_path="pg_essay_1.html",
- metadata={"metadata_1":"some random metadata"},
- id=None
- )
- """
- ),
- },
- {
- "lang": "JavaScript",
- "source": textwrap.dedent(
- """
- const { r2rClient } = require("r2r-js");
- const client = new r2rClient();
- function main() {
- const response = await client.documents.create({
- file: { path: "examples/data/marmeladov.txt", name: "marmeladov.txt" },
- metadata: { title: "marmeladov.txt" },
- });
- }
- main();
- """
- ),
- },
- {
- "lang": "CLI",
- "source": textwrap.dedent(
- """
- r2r documents create /path/to/file.txt
- """
- ),
- },
- {
- "lang": "cURL",
- "source": textwrap.dedent(
- """
- curl -X POST "https://api.example.com/v3/documents" \\
- -H "Content-Type: multipart/form-data" \\
- -H "Authorization: Bearer YOUR_API_KEY" \\
- -F "file=@pg_essay_1.html;type=text/html" \\
- -F 'metadata={}' \\
- -F 'id=null'
- """
- ),
- },
- ]
- },
- )
- @self.base_endpoint
- async def create_document(
- file: Optional[UploadFile] = File(
- None,
- description="The file to ingest. Exactly one of file, raw_text, or chunks must be provided.",
- ),
- raw_text: Optional[str] = Form(
- None,
- description="Raw text content to ingest. Exactly one of file, raw_text, or chunks must be provided.",
- ),
- chunks: Optional[Json[list[str]]] = Form(
- None,
- description="Pre-processed text chunks to ingest. Exactly one of file, raw_text, or chunks must be provided.",
- ),
- id: Optional[UUID] = Form(
- None,
- description="The ID of the document. If not provided, a new ID will be generated.",
- ),
- collection_ids: Optional[Json[list[UUID]]] = Form(
- None,
- description="Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.",
- ),
- metadata: Optional[Json[dict]] = Form(
- None,
- description="Metadata to associate with the document, such as title, description, or custom fields.",
- ),
- ingestion_mode: IngestionMode = Form(
- default=IngestionMode.custom,
- description=(
- "Ingestion modes:\n"
- "- `hi-res`: Thorough ingestion with full summaries and enrichment.\n"
- "- `fast`: Quick ingestion with minimal enrichment and no summaries.\n"
- "- `custom`: Full control via `ingestion_config`.\n\n"
- "If `filters` or `limit` (in `ingestion_config`) are provided alongside `hi-res` or `fast`, "
- "they will override the default settings for that mode."
- ),
- ),
- ingestion_config: Optional[Json[IngestionConfig]] = Form(
- None,
- description="An optional dictionary to override the default chunking configuration for the ingestion process. If not provided, the system will use the default server-side chunking configuration.",
- ),
- run_with_orchestration: Optional[bool] = Form(
- True,
- description="Whether or not ingestion runs with orchestration, default is `True`. When set to `False`, the ingestion process will run synchronous and directly return the result.",
- ),
- auth_user=Depends(self.providers.auth.auth_wrapper()),
- ) -> WrappedIngestionResponse:
- """
- Creates a new Document object from an input file, text content, or chunks. The chosen `ingestion_mode` determines
- how the ingestion process is configured:
- **Ingestion Modes:**
- - `hi-res`: Comprehensive parsing and enrichment, including summaries and possibly more thorough parsing.
- - `fast`: Speed-focused ingestion that skips certain enrichment steps like summaries.
- - `custom`: Provide a full `ingestion_config` to customize the entire ingestion process.
- Either a file or text content must be provided, but not both. Documents are shared through `Collections` which allow for tightly specified cross-user interactions.
- The ingestion process runs asynchronously and its progress can be tracked using the returned
- task_id.
- """
- if not auth_user.is_superuser:
- user_document_count = (
- await self.services.management.documents_overview(
- user_ids=[auth_user.id],
- offset=0,
- limit=1,
- )
- )["total_entries"]
- user_max_documents = (
- await self.services.management.get_user_max_documents(
- auth_user.id
- )
- )
- if user_document_count >= user_max_documents:
- raise R2RException(
- status_code=403,
- message=f"User has reached the maximum number of documents allowed ({user_max_documents}).",
- )
- # Get chunks using the vector handler's list_chunks method
- user_chunk_count = (
- await self.services.ingestion.list_chunks(
- filters={"owner_id": {"$eq": str(auth_user.id)}},
- offset=0,
- limit=1,
- )
- )["page_info"]["total_entries"]
- user_max_chunks = (
- await self.services.management.get_user_max_chunks(
- auth_user.id
- )
- )
- if user_chunk_count >= user_max_chunks:
- raise R2RException(
- status_code=403,
- message=f"User has reached the maximum number of chunks allowed ({user_max_chunks}).",
- )
- user_collections_count = (
- await self.services.management.collections_overview(
- user_ids=[auth_user.id],
- offset=0,
- limit=1,
- )
- )["total_entries"]
- user_max_collections = (
- await self.services.management.get_user_max_collections(
- auth_user.id
- )
- )
- if user_collections_count >= user_max_collections:
- raise R2RException(
- status_code=403,
- message=f"User has reached the maximum number of collections allowed ({user_max_collections}).",
- )
- effective_ingestion_config = self._prepare_ingestion_config(
- ingestion_mode=ingestion_mode,
- ingestion_config=ingestion_config,
- )
- if not file and not raw_text and not chunks:
- raise R2RException(
- status_code=422,
- message="Either a `file`, `raw_text`, or `chunks` must be provided.",
- )
- if (
- (file and raw_text)
- or (file and chunks)
- or (raw_text and chunks)
- ):
- raise R2RException(
- status_code=422,
- message="Only one of `file`, `raw_text`, or `chunks` may be provided.",
- )
- # Check if the user is a superuser
- metadata = metadata or {}
- if chunks:
- if len(chunks) == 0:
- raise R2RException("Empty list of chunks provided", 400)
- if len(chunks) > MAX_CHUNKS_PER_REQUEST:
- raise R2RException(
- f"Maximum of {MAX_CHUNKS_PER_REQUEST} chunks per request",
- 400,
- )
- document_id = generate_document_id(
- json.dumps(chunks), auth_user.id
- )
- # FIXME: Metadata doesn't seem to be getting passed through
- raw_chunks_for_doc = [
- UnprocessedChunk(
- text=chunk,
- metadata=metadata,
- id=generate_id(),
- )
- for chunk in chunks
- ]
- # Prepare workflow input
- workflow_input = {
- "document_id": str(document_id),
- "chunks": [
- chunk.model_dump(mode="json")
- for chunk in raw_chunks_for_doc
- ],
- "metadata": metadata, # Base metadata for the document
- "user": auth_user.model_dump_json(),
- "ingestion_config": effective_ingestion_config.model_dump(
- mode="json"
- ),
- }
- # TODO - Modify create_chunks so that we can add chunks to existing document
- if run_with_orchestration:
- # Run ingestion with orchestration
- raw_message = (
- await self.providers.orchestration.run_workflow(
- "ingest-chunks",
- {"request": workflow_input},
- options={
- "additional_metadata": {
- "document_id": str(document_id),
- }
- },
- )
- )
- raw_message["document_id"] = str(document_id)
- return raw_message # type: ignore
- else:
- logger.info(
- "Running chunk ingestion without orchestration."
- )
- from core.main.orchestration import (
- simple_ingestion_factory,
- )
- simple_ingestor = simple_ingestion_factory(
- self.services.ingestion
- )
- await simple_ingestor["ingest-chunks"](workflow_input)
- return { # type: ignore
- "message": "Document created and ingested successfully.",
- "document_id": str(document_id),
- "task_id": None,
- }
- else:
- if file:
- file_data = await self._process_file(file)
- content_length = len(file_data["content"])
- file_content = BytesIO(
- base64.b64decode(file_data["content"])
- )
- file_data.pop("content", None)
- document_id = id or generate_document_id(
- file_data["filename"], auth_user.id
- )
- elif raw_text:
- content_length = len(raw_text)
- file_content = BytesIO(raw_text.encode("utf-8"))
- document_id = id or generate_document_id(
- raw_text, auth_user.id
- )
- file_data = {
- "filename": "N/A",
- "content_type": "text/plain",
- }
- else:
- raise R2RException(
- status_code=422,
- message="Either a file or content must be provided.",
- )
- workflow_input = {
- "file_data": file_data,
- "document_id": str(document_id),
- "collection_ids": (
- [str(cid) for cid in collection_ids]
- if collection_ids
- else None
- ),
- "metadata": metadata,
- "ingestion_config": effective_ingestion_config.model_dump(
- mode="json"
- ),
- "user": auth_user.model_dump_json(),
- "size_in_bytes": content_length,
- }
- file_name = file_data["filename"]
- await self.providers.database.files_handler.store_file(
- document_id,
- file_name,
- file_content,
- file_data["content_type"],
- )
- if run_with_orchestration:
- raw_message: dict[str, str | None] = await self.providers.orchestration.run_workflow( # type: ignore
- "ingest-files",
- {"request": workflow_input},
- options={
- "additional_metadata": {
- "document_id": str(document_id),
- }
- },
- )
- raw_message["document_id"] = str(document_id)
- return raw_message # type: ignore
- else:
- logger.info(
- f"Running ingestion without orchestration for file {file_name} and document_id {document_id}."
- )
- # TODO - Clean up implementation logic here to be more explicitly `synchronous`
- from core.main.orchestration import simple_ingestion_factory
- simple_ingestor = simple_ingestion_factory(
- self.services.ingestion
- )
- await simple_ingestor["ingest-files"](workflow_input)
- return { # type: ignore
- "message": "Document created and ingested successfully.",
- "document_id": str(document_id),
- "task_id": None,
- }
- @self.router.post(
- "/documents/export",
- summary="Export documents to CSV",
- dependencies=[Depends(self.rate_limit_dependency)],
- openapi_extra={
- "x-codeSamples": [
- {
- "lang": "Python",
- "source": textwrap.dedent(
- """
- from r2r import R2RClient
- client = R2RClient("http://localhost:7272")
- # when using auth, do client.login(...)
- response = client.documents.export(
- output_path="export.csv",
- columns=["id", "title", "created_at"],
- include_header=True,
- )
- """
- ),
- },
- {
- "lang": "JavaScript",
- "source": textwrap.dedent(
- """
- const { r2rClient } = require("r2r-js");
- const client = new r2rClient("http://localhost:7272");
- function main() {
- await client.documents.export({
- outputPath: "export.csv",
- columns: ["id", "title", "created_at"],
- includeHeader: true,
- });
- }
- main();
- """
- ),
- },
- {
- "lang": "CLI",
- "source": textwrap.dedent(
- """
- """
- ),
- },
- {
- "lang": "cURL",
- "source": textwrap.dedent(
- """
- curl -X POST "http://127.0.0.1:7272/v3/documents/export" \
- -H "Authorization: Bearer YOUR_API_KEY" \
- -H "Content-Type: application/json" \
- -H "Accept: text/csv" \
- -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \
- --output export.csv
- """
- ),
- },
- ]
- },
- )
- @self.base_endpoint
- async def export_documents(
- background_tasks: BackgroundTasks,
- columns: Optional[list[str]] = Body(
- None, description="Specific columns to export"
- ),
- filters: Optional[dict] = Body(
- None, description="Filters to apply to the export"
- ),
- include_header: Optional[bool] = Body(
- True, description="Whether to include column headers"
- ),
- auth_user=Depends(self.providers.auth.auth_wrapper()),
- ) -> FileResponse:
- """
- Export documents as a downloadable CSV file.
- """
- if not auth_user.is_superuser:
- raise R2RException(
- "Only a superuser can export data.",
- 403,
- )
- csv_file_path, temp_file = (
- await self.services.management.export_documents(
- columns=columns,
- filters=filters,
- include_header=include_header,
- )
- )
- background_tasks.add_task(temp_file.close)
- return FileResponse(
- path=csv_file_path,
- media_type="text/csv",
- filename="documents_export.csv",
- )
- @self.router.get(
- "/documents/download_zip",
- dependencies=[Depends(self.rate_limit_dependency)],
- response_class=StreamingResponse,
- summary="Export multiple documents as zip",
- openapi_extra={
- "x-codeSamples": [
- {
- "lang": "Python",
- "source": textwrap.dedent(
- """
- client.documents.download_zip(
- document_ids=["uuid1", "uuid2"],
- start_date="2024-01-01",
- end_date="2024-12-31"
- )
- """
- ),
- },
- {
- "lang": "cURL",
- "source": textwrap.dedent(
- """
- curl -X GET "https://api.example.com/v3/documents/download_zip?document_ids=uuid1,uuid2&start_date=2024-01-01&end_date=2024-12-31" \\
- -H "Authorization: Bearer YOUR_API_KEY"
- """
- ),
- },
- ]
- },
- )
- @self.base_endpoint
- async def export_files(
- document_ids: Optional[list[UUID]] = Query(
- None,
- description="List of document IDs to include in the export. If not provided, all accessible documents will be included.",
- ),
- start_date: Optional[datetime] = Query(
- None,
- description="Filter documents created on or after this date.",
- ),
- end_date: Optional[datetime] = Query(
- None,
- description="Filter documents created before this date.",
- ),
- auth_user=Depends(self.providers.auth.auth_wrapper()),
- ) -> StreamingResponse:
- """
- Export multiple documents as a zip file. Documents can be filtered by IDs and/or date range.
- The endpoint allows downloading:
- - Specific documents by providing their IDs
- - Documents within a date range
- - All accessible documents if no filters are provided
- Files are streamed as a zip archive to handle potentially large downloads efficiently.
- """
- if not auth_user.is_superuser:
- # For non-superusers, verify access to requested documents
- if document_ids:
- documents_overview = (
- await self.services.management.documents_overview(
- user_ids=[auth_user.id],
- document_ids=document_ids,
- offset=0,
- limit=len(document_ids),
- )
- )
- if len(documents_overview["results"]) != len(document_ids):
- raise R2RException(
- status_code=403,
- message="You don't have access to one or more requested documents.",
- )
- if not document_ids:
- raise R2RException(
- status_code=403,
- message="Non-superusers must provide document IDs to export.",
- )
- zip_name, zip_content, zip_size = (
- await self.services.management.export_files(
- document_ids=document_ids,
- start_date=start_date,
- end_date=end_date,
- )
- )
- async def stream_file():
- yield zip_content.getvalue()
- return StreamingResponse(
- stream_file(),
- media_type="application/zip",
- headers={
- "Content-Disposition": f'attachment; filename="{zip_name}"',
- "Content-Length": str(zip_size),
- },
- )
- @self.router.get(
- "/documents",
- dependencies=[Depends(self.rate_limit_dependency)],
- summary="List documents",
- openapi_extra={
- "x-codeSamples": [
- {
- "lang": "Python",
- "source": textwrap.dedent(
- """
- from r2r import R2RClient
- client = R2RClient()
- # when using auth, do client.login(...)
- response = client.documents.list(
- limit=10,
- offset=0
- )
- """
- ),
- },
- {
- "lang": "JavaScript",
- "source": textwrap.dedent(
- """
- const { r2rClient } = require("r2r-js");
- const client = new r2rClient();
- function main() {
- const response = await client.documents.list({
- limit: 10,
- offset: 0,
- });
- }
- main();
- """
- ),
- },
- {
- "lang": "CLI",
- "source": textwrap.dedent(
- """
- r2r documents create /path/to/file.txt
- """
- ),
- },
- {
- "lang": "cURL",
- "source": textwrap.dedent(
- """
- curl -X GET "https://api.example.com/v3/documents" \\
- -H "Authorization: Bearer YOUR_API_KEY"
- """
- ),
- },
- ]
- },
- )
- @self.base_endpoint
- async def get_documents(
- ids: list[str] = Query(
- [],
- description="A list of document IDs to retrieve. If not provided, all documents will be returned.",
- ),
- offset: int = Query(
- 0,
- ge=0,
- description="Specifies the number of objects to skip. Defaults to 0.",
- ),
- limit: int = Query(
- 100,
- ge=1,
- le=1000,
- description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.",
- ),
- include_summary_embeddings: int = Query(
- False,
- description="Specifies whether or not to include embeddings of each document summary.",
- ),
- auth_user=Depends(self.providers.auth.auth_wrapper()),
- ) -> WrappedDocumentsResponse:
- """
- Returns a paginated list of documents the authenticated user has access to.
- Results can be filtered by providing specific document IDs. Regular users will only see
- documents they own or have access to through collections. Superusers can see all documents.
- The documents are returned in order of last modification, with most recent first.
- """
- requesting_user_id = (
- None if auth_user.is_superuser else [auth_user.id]
- )
- filter_collection_ids = (
- None if auth_user.is_superuser else auth_user.collection_ids
- )
- document_uuids = [UUID(document_id) for document_id in ids]
- documents_overview_response = (
- await self.services.management.documents_overview(
- user_ids=requesting_user_id,
- collection_ids=filter_collection_ids,
- document_ids=document_uuids,
- offset=offset,
- limit=limit,
- )
- )
- if not include_summary_embeddings:
- for document in documents_overview_response["results"]:
- document.summary_embedding = None
- return ( # type: ignore
- documents_overview_response["results"],
- {
- "total_entries": documents_overview_response[
- "total_entries"
- ]
- },
- )
@self.router.get(
    "/documents/{id}",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Retrieve a document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.retrieve(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.retrieve({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents retrieve b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedDocumentResponse:
    """
    Retrieves detailed information about a specific document by its ID.

    This endpoint returns the document's metadata, status, and system information. It does not
    return the document's content - use the `/documents/{id}/download` endpoint for that.

    Users can only retrieve documents they own or have access to through collections.
    Superusers can retrieve any document.
    """
    # Non-superusers are scoped to their own user ID and collections;
    # superusers pass None for both (presumably "no restriction").
    request_user_ids = None if auth_user.is_superuser else [auth_user.id]
    filter_collection_ids = (
        None if auth_user.is_superuser else auth_user.collection_ids
    )

    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=request_user_ids,
            collection_ids=filter_collection_ids,
            document_ids=[id],
            offset=0,
            # Exactly one document ID is requested, so a single row suffices.
            limit=1,
        )
    )
    results = documents_overview_response["results"]
    # An empty result covers both "does not exist" and "not visible to caller".
    if not results:
        raise R2RException("Document not found.", 404)
    return results[0]
@self.router.get(
    "/documents/{id}/chunks",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document chunks",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_chunks(
                        id="32b6a70f-a995-5c51-85d2-834f06283a1e"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.listChunks({
                            id: "32b6a70f-a995-5c51-85d2-834f06283a1e",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-chunks b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/chunks" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def list_chunks(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve chunks for.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    include_vectors: Optional[bool] = Query(
        False,
        description="Whether to include vector embeddings in the response.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedChunksResponse:
    """
    Retrieves the text chunks that were generated from a document during ingestion.
    Chunks represent semantic sections of the document and are used for retrieval
    and analysis.

    Users can only access chunks from documents they own or have access to through
    collections. Vector embeddings are only included if specifically requested.

    Results are returned in chunk sequence order, representing their position in
    the original document.
    """
    list_document_chunks = (
        await self.services.management.list_document_chunks(
            id, offset, limit, include_vectors
        )
    )
    if not list_document_chunks["results"]:
        raise R2RException(
            "No chunks found for the given document ID.", 404
        )

    # The owner is read from the first returned chunk.
    is_owner = str(
        list_document_chunks["results"][0].get("owner_id")
    ) == str(auth_user.id)

    # Superusers and owners are always allowed, so the collection lookup is
    # only performed for everyone else (saves a collections_overview call).
    if not auth_user.is_superuser and not is_owner:
        document_collections = (
            await self.services.management.collections_overview(
                offset=0,
                limit=-1,
                document_ids=[id],
            )
        )
        # Compare as strings so UUID and str IDs match each other.
        document_collection_ids = {
            str(ele.id) for ele in document_collections["results"]
        }
        user_collection_ids = {
            str(cid) for cid in auth_user.collection_ids
        }
        if not user_collection_ids & document_collection_ids:
            raise R2RException(
                "Not authorized to access this document's chunks.", 403
            )

    return (  # type: ignore
        list_document_chunks["results"],
        {"total_entries": list_document_chunks["total_entries"]},
    )
@self.router.get(
    "/documents/{id}/download",
    dependencies=[Depends(self.rate_limit_dependency)],
    response_class=StreamingResponse,
    summary="Download document content",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.download(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.download({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/download" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document_file(
    id: str = Path(..., description="Document ID"),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> StreamingResponse:
    """
    Downloads the original file content of a document.

    For uploaded files, returns the original file with its proper MIME type.
    For text-only documents, returns the content as plain text.

    Users can only download documents they own or have access to through collections.
    """
    from urllib.parse import quote

    try:
        document_uuid = UUID(id)
    except ValueError:
        # Suppress the ValueError context; the 422 fully describes the problem.
        raise R2RException(
            status_code=422, message="Invalid document ID format."
        ) from None

    # Fetch the document with no user/collection filters; access is checked
    # explicitly below, so a missing document is reported as 404 first.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=None,
            collection_ids=None,
            document_ids=[document_uuid],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)
    document = documents_overview_response["results"][0]

    # Superusers and owners pass; everyone else must share at least one
    # collection with the document.
    is_owner = str(document.owner_id) == str(auth_user.id)
    if not auth_user.is_superuser and not is_owner:
        document_collections = (
            await self.services.management.collections_overview(
                offset=0,
                limit=-1,
                document_ids=[document_uuid],
            )
        )
        document_collection_ids = {
            str(ele.id) for ele in document_collections["results"]
        }
        user_collection_ids = {
            str(cid) for cid in auth_user.collection_ids
        }
        if not user_collection_ids & document_collection_ids:
            raise R2RException(
                "Not authorized to access this document.", 403
            )

    file_tuple = await self.services.management.download_file(
        document_uuid
    )
    if not file_tuple:
        raise R2RException(status_code=404, message="File not found.")
    file_name, file_content, file_size = file_tuple

    mime_type, _ = mimetypes.guess_type(file_name)
    if not mime_type:
        mime_type = "application/octet-stream"

    # Response header values must be latin-1 encodable, and quotes or CR/LF
    # inside the quoted filename would corrupt the header. Plain printable
    # ASCII names keep the simple form. Anything else gets an ASCII fallback
    # plus an RFC 6266 / RFC 5987 `filename*` with the percent-encoded UTF-8 name.
    ascii_file_name = "".join(
        ch for ch in file_name if 32 <= ord(ch) < 127 and ch not in '"\\'
    )
    if ascii_file_name == file_name:
        content_disposition = f'inline; filename="{file_name}"'
    else:
        fallback_name = ascii_file_name or "download"
        encoded_name = quote(file_name, safe="")
        content_disposition = (
            f'inline; filename="{fallback_name}"; '
            f"filename*=UTF-8''{encoded_name}"
        )

    async def file_stream():
        # Read the file object in fixed-size pieces instead of all at once.
        chunk_size = 1024 * 1024  # 1MB
        while True:
            data = file_content.read(chunk_size)
            if not data:
                break
            yield data

    return StreamingResponse(
        file_stream(),
        media_type=mime_type,
        headers={
            "Content-Disposition": content_disposition,
            "Content-Length": str(file_size),
        },
    )
@self.router.delete(
    "/documents/by-filter",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Delete documents by filter",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.delete_by_filter(
                        filters={"document_type": {"$eq": "txt"}}
                    )
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X DELETE "https://api.example.com/v3/documents/by-filter?filters=%7B%22document_type%22%3A%7B%22%24eq%22%3A%22text%22%7D%2C%22created_at%22%3A%7B%22%24lt%22%3A%222023-01-01T00%3A00%3A00Z%22%7D%7D" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def delete_document_by_filter(
    filters: Json[dict] = Body(
        ..., description="JSON-encoded filters"
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedBooleanResponse:
    """
    Delete documents based on provided filters. Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`. Deletion requests are limited to a user's own documents.
    """
    # The caller's filters are always AND-ed with an ownership clause, so the
    # deletion can only match documents owned by the requesting user. This
    # applies to superusers too.
    ownership_clause = {"owner_id": {"$eq": str(auth_user.id)}}
    scoped_filters = {"$and": [ownership_clause, filters]}

    await self.services.management.delete_documents_and_chunks_by_filter(
        filters=scoped_filters
    )
    return GenericBooleanResponse(success=True)  # type: ignore
@self.router.delete(
    "/documents/{id}",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Delete a document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.delete(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.delete({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents delete b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X DELETE "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def delete_document_by_id(
    id: UUID = Path(..., description="Document ID"),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedBooleanResponse:
    """
    Delete a specific document. All chunks corresponding to the document are deleted, and all other references to the document are removed.

    NOTE - Deletions do not yet impact the knowledge graph or other derived data. This feature is planned for a future release.
    """
    filters = {"document_id": {"$eq": str(id)}}
    # Regular users may only delete documents they own; superusers may
    # delete any document.
    if not auth_user.is_superuser:
        filters = {
            "$and": [{"owner_id": {"$eq": str(auth_user.id)}}, filters]
        }

    await self.services.management.delete_documents_and_chunks_by_filter(
        filters=filters
    )
    return GenericBooleanResponse(success=True)  # type: ignore
@self.router.get(
    "/documents/{id}/collections",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document collections",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_collections(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", offset=0, limit=10
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.listCollections({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-collections b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/collections" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_document_collections(
    id: str = Path(..., description="Document ID"),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedCollectionsResponse:
    """
    Retrieves all collections that contain the specified document. This endpoint is restricted
    to superusers only and provides a system-wide view of document organization.

    Collections are used to organize documents and manage access control. A document can belong
    to multiple collections, and users can access documents through collection membership.

    The results are paginated and ordered by collection creation date, with the most recently
    created collections appearing first.

    NOTE - This endpoint is only available to superusers, it will be extended to regular users in a future release.
    """
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can get the collections belonging to a document.",
            403,
        )

    # `id` is declared as str, so validate it here. A malformed ID becomes a
    # 422 instead of an unhandled ValueError (mirrors get_document_file).
    try:
        document_uuid = UUID(id)
    except ValueError:
        raise R2RException(
            status_code=422, message="Invalid document ID format."
        ) from None

    collections_response = (
        await self.services.management.collections_overview(
            offset=offset,
            limit=limit,
            document_ids=[document_uuid],
        )
    )
    return collections_response["results"], {  # type: ignore
        "total_entries": collections_response["total_entries"]
    }
@self.router.post(
    "/documents/{id}/extract",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Extract entities and relationships",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.extract(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
        ],
    },
)
@self.base_endpoint
async def extract(
    id: UUID = Path(
        ...,
        description="The ID of the document to extract entities and relationships from.",
    ),
    run_type: KGRunType = Body(
        default=KGRunType.RUN,
        description="Whether to return an estimate of the creation cost or to actually extract the document.",
    ),
    settings: Optional[KGCreationSettings] = Body(
        default=None,
        description="Settings for the entities and relationships extraction process.",
    ),
    run_with_orchestration: Optional[bool] = Body(
        default=True,
        description="Whether to run the entities and relationships extraction process with orchestration.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedGenericMessageResponse:
    """
    Extracts entities and relationships from a document.

    The entities and relationships extraction process involves:
    1. Parsing documents into semantic chunks
    2. Extracting entities and relationships using LLMs
    3. Storing the created entities and relationships in the knowledge graph
    4. Preserving the document's metadata and content, and associating the elements with collections the document belongs to
    """
    settings = settings.dict() if settings else None  # type: ignore
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can extract entities and relationships from a document.",
            403,
        )

    # An explicitly falsy run type (e.g. null) falls back to an estimate.
    if not run_type:
        run_type = KGRunType.ESTIMATE

    # Start from the server-side configuration and apply any per-request
    # overrides on top of it.
    server_graph_creation_settings = (
        self.providers.database.config.graph_creation_settings
    )
    if settings:
        server_graph_creation_settings = update_settings_from_dict(
            server_settings=server_graph_creation_settings,
            settings_dict=settings,  # type: ignore
        )

    if run_type is KGRunType.ESTIMATE:
        return {  # type: ignore
            "message": "Estimate retrieved successfully",
            "task_id": None,
            "id": id,
            "estimate": await self.services.graph.get_creation_estimate(
                document_id=id,
                graph_creation_settings=server_graph_creation_settings,
            ),
        }

    # Shared by both execution paths. It must be built before branching,
    # otherwise the non-orchestrated path hits an unbound local.
    workflow_input = {
        "document_id": str(id),
        "graph_creation_settings": server_graph_creation_settings.model_dump_json(),
        "user": auth_user.json(),
    }

    if run_with_orchestration:
        return await self.providers.orchestration.run_workflow(  # type: ignore
            "extract-triples", {"request": workflow_input}, {}
        )

    from core.main.orchestration import simple_kg_factory

    logger.info("Running extract-triples without orchestration.")
    simple_kg = simple_kg_factory(self.services.graph)
    # NOTE(review): the orchestrated path wraps the payload under "request"
    # while this path passes it bare; confirm the simple workflow expects that.
    await simple_kg["extract-triples"](workflow_input)
    return {  # type: ignore
        "message": "Graph created successfully.",
        "task_id": None,
    }
@self.router.get(
    "/documents/{id}/entities",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Lists the entities from the document",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_entities(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
                    )
                    """
                ),
            },
        ],
    },
)
@self.base_endpoint
async def get_entities(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve entities from.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    include_embeddings: Optional[bool] = Query(
        False,
        description="Whether to include vector embeddings in the response.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedEntitiesResponse:
    """
    Retrieves the entities that were extracted from a document. These represent
    important semantic elements like people, places, organizations, concepts, etc.

    Users can only access entities from documents they own or have access to through
    collections. Entity embeddings are only included if specifically requested.

    Results are returned in the order they were extracted from the document.
    """
    # Look the document up scoped to the caller (superusers unrestricted).
    # An empty result means it is missing or not visible, reported as 404.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=(
                None if auth_user.is_superuser else [auth_user.id]
            ),
            collection_ids=(
                None
                if auth_user.is_superuser
                else auth_user.collection_ids
            ),
            document_ids=[id],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)

    # Entities stored against this document (document-level store).
    (
        entities,
        count,
    ) = await self.providers.database.graphs_handler.entities.get(
        parent_id=id,
        store_type=StoreType.DOCUMENTS,
        offset=offset,
        limit=limit,
        include_embeddings=include_embeddings,
    )
    return entities, {"total_entries": count}  # type: ignore
@self.router.post(
    "/documents/{id}/entities/export",
    summary="Export document entities to CSV",
    dependencies=[Depends(self.rate_limit_dependency)],
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient("http://localhost:7272")
                    # when using auth, do client.login(...)
                    response = client.documents.export_entities(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        output_path="export.csv",
                        columns=["id", "title", "created_at"],
                        include_header=True,
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient("http://localhost:7272");
                    async function main() {
                        await client.documents.exportEntities({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                            outputPath: "export.csv",
                            columns: ["id", "title", "created_at"],
                            includeHeader: true,
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X POST "http://127.0.0.1:7272/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/entities/export" \\
                        -H "Authorization: Bearer YOUR_API_KEY" \\
                        -H "Content-Type: application/json" \\
                        -H "Accept: text/csv" \\
                        -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \\
                        --output export.csv
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def export_entities(
    background_tasks: BackgroundTasks,
    id: UUID = Path(
        ...,
        description="The ID of the document to export entities from.",
    ),
    columns: Optional[list[str]] = Body(
        None, description="Specific columns to export"
    ),
    filters: Optional[dict] = Body(
        None, description="Filters to apply to the export"
    ),
    include_header: Optional[bool] = Body(
        True, description="Whether to include column headers"
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> FileResponse:
    """
    Export a document's entities as a downloadable CSV file.
    """
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can export data.",
            403,
        )

    csv_file_path, temp_file = (
        await self.services.management.export_document_entities(
            id=id,
            columns=columns,
            filters=filters,
            include_header=include_header,
        )
    )

    # Close the temporary file only after the response has been sent.
    background_tasks.add_task(temp_file.close)

    return FileResponse(
        path=csv_file_path,
        media_type="text/csv",
        filename="documents_export.csv",
    )
@self.router.get(
    "/documents/{id}/relationships",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="List document relationships",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient()
                    # when using auth, do client.login(...)
                    response = client.documents.list_relationships(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        offset=0,
                        limit=100
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient();
                    async function main() {
                        const response = await client.documents.listRelationships({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                            offset: 0,
                            limit: 100,
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    r2r documents list-relationships b4ac4dd6-5f27-596e-a55b-7cf242ca30aa
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/relationships" \\
                        -H "Authorization: Bearer YOUR_API_KEY"
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def get_relationships(
    id: UUID = Path(
        ...,
        description="The ID of the document to retrieve relationships for.",
    ),
    offset: int = Query(
        0,
        ge=0,
        description="Specifies the number of objects to skip. Defaults to 0.",
    ),
    limit: int = Query(
        100,
        ge=1,
        le=1000,
        description="Specifies a limit on the number of objects to return, ranging between 1 and 1000. Defaults to 100.",
    ),
    entity_names: Optional[list[str]] = Query(
        None,
        description="Filter relationships by specific entity names.",
    ),
    relationship_types: Optional[list[str]] = Query(
        None,
        description="Filter relationships by specific relationship types.",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> WrappedRelationshipsResponse:
    """
    Retrieves the relationships between entities that were extracted from a document. These represent
    connections and interactions between entities found in the text.

    Users can only access relationships from documents they own or have access to through
    collections. Results can be filtered by entity names and relationship types.

    Results are returned in the order they were extracted from the document.
    """
    # Look the document up scoped to the caller (superusers unrestricted).
    # An empty result means it is missing or not visible, reported as 404.
    documents_overview_response = (
        await self.services.management.documents_overview(
            user_ids=(
                None if auth_user.is_superuser else [auth_user.id]
            ),
            collection_ids=(
                None
                if auth_user.is_superuser
                else auth_user.collection_ids
            ),
            document_ids=[id],
            offset=0,
            limit=1,
        )
    )
    if not documents_overview_response["results"]:
        raise R2RException("Document not found.", 404)

    # Relationships stored against this document, with optional filters.
    (
        relationships,
        count,
    ) = await self.providers.database.graphs_handler.relationships.get(
        parent_id=id,
        store_type=StoreType.DOCUMENTS,
        entity_names=entity_names,
        relationship_types=relationship_types,
        offset=offset,
        limit=limit,
    )
    return relationships, {"total_entries": count}  # type: ignore
@self.router.post(
    "/documents/{id}/relationships/export",
    summary="Export document relationships to CSV",
    dependencies=[Depends(self.rate_limit_dependency)],
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "Python",
                "source": textwrap.dedent(
                    """
                    from r2r import R2RClient
                    client = R2RClient("http://localhost:7272")
                    # when using auth, do client.login(...)
                    response = client.documents.export_relationships(
                        id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                        output_path="export.csv",
                        columns=["id", "title", "created_at"],
                        include_header=True,
                    )
                    """
                ),
            },
            {
                "lang": "JavaScript",
                "source": textwrap.dedent(
                    """
                    const { r2rClient } = require("r2r-js");
                    const client = new r2rClient("http://localhost:7272");
                    async function main() {
                        await client.documents.exportRelationships({
                            id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
                            outputPath: "export.csv",
                            columns: ["id", "title", "created_at"],
                            includeHeader: true,
                        });
                    }
                    main();
                    """
                ),
            },
            {
                "lang": "CLI",
                "source": textwrap.dedent(
                    """
                    """
                ),
            },
            {
                "lang": "cURL",
                "source": textwrap.dedent(
                    """
                    curl -X POST "http://127.0.0.1:7272/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/relationships/export" \\
                        -H "Authorization: Bearer YOUR_API_KEY" \\
                        -H "Content-Type: application/json" \\
                        -H "Accept: text/csv" \\
                        -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \\
                        --output export.csv
                    """
                ),
            },
        ]
    },
)
@self.base_endpoint
async def export_relationships(
    background_tasks: BackgroundTasks,
    id: UUID = Path(
        ...,
        description="The ID of the document to export relationships from.",
    ),
    columns: Optional[list[str]] = Body(
        None, description="Specific columns to export"
    ),
    filters: Optional[dict] = Body(
        None, description="Filters to apply to the export"
    ),
    include_header: Optional[bool] = Body(
        True, description="Whether to include column headers"
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
) -> FileResponse:
    """
    Export a document's relationships as a downloadable CSV file.
    """
    if not auth_user.is_superuser:
        raise R2RException(
            "Only a superuser can export data.",
            403,
        )

    csv_file_path, temp_file = (
        await self.services.management.export_document_relationships(
            id=id,
            columns=columns,
            filters=filters,
            include_header=include_header,
        )
    )

    # Close the temporary file only after the response has been sent.
    background_tasks.add_task(temp_file.close)

    return FileResponse(
        path=csv_file_path,
        media_type="text/csv",
        filename="documents_export.csv",
    )
@self.router.post(
    "/documents/search",
    dependencies=[Depends(self.rate_limit_dependency)],
    summary="Search document summaries",
)
@self.base_endpoint
async def search_documents(
    query: str = Body(
        ...,
        description="The search query to perform.",
    ),
    search_mode: SearchMode = Body(
        default=SearchMode.custom,
        description=(
            "Default value of `custom` allows full control over search settings.\n\n"
            "Pre-configured search modes:\n"
            "`basic`: A simple semantic-based search.\n"
            "`advanced`: A more powerful hybrid search combining semantic and full-text.\n"
            "`custom`: Full control via `search_settings`.\n\n"
            "If `filters` or `limit` are provided alongside `basic` or `advanced`, "
            "they will override the default settings for that mode."
        ),
    ),
    search_settings: SearchSettings = Body(
        default_factory=SearchSettings,
        description="Settings for document search",
    ),
    auth_user=Depends(self.providers.auth.auth_wrapper()),
):  # -> WrappedDocumentSearchResponse:  # type: ignore
    """
    Perform a search query on the automatically generated document summaries in the system.
    This endpoint allows for complex filtering of search results using PostgreSQL-based queries.
    Filters can be applied to various fields such as document_id, and internal metadata values.
    Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`.
    """
    # Combine the requested mode with any explicit settings for this caller.
    resolved_settings = self._prepare_search_settings(
        auth_user, search_mode, search_settings
    )

    # Embed the query text once, then hand both the raw text and its vector
    # to the retrieval service.
    embedded_query = await self.providers.embedding.async_get_embedding(
        query
    )
    return await self.services.retrieval.search_documents(
        query=query,
        query_embedding=embedded_query,
        settings=resolved_settings,
    )
@staticmethod
async def _process_file(file):
    """Read an uploaded file and package it as a JSON-safe dict.

    Returns a dict with the file's ``filename``, its bytes base64-encoded
    into a UTF-8 string under ``content``, and its ``content_type``.
    """
    import base64

    raw_bytes = await file.read()
    encoded_payload = base64.b64encode(raw_bytes).decode("utf-8")
    return dict(
        filename=file.filename,
        content=encoded_payload,
        content_type=file.content_type,
    )
|