chunks_router.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. import json
  2. import logging
  3. import textwrap
  4. from typing import Optional
  5. from uuid import UUID
  6. from fastapi import Body, Depends, Path, Query
  7. from core.base import (
  8. ChunkResponse,
  9. GraphSearchSettings,
  10. R2RException,
  11. SearchSettings,
  12. UpdateChunk,
  13. select_search_filters,
  14. )
  15. from core.base.api.models import (
  16. GenericBooleanResponse,
  17. WrappedBooleanResponse,
  18. WrappedChunkResponse,
  19. WrappedChunksResponse,
  20. WrappedVectorSearchResponse,
  21. )
  22. from core.providers import (
  23. HatchetOrchestrationProvider,
  24. SimpleOrchestrationProvider,
  25. )
  26. from ...abstractions import R2RProviders, R2RServices
  27. from .base_router import BaseRouterV3
# NOTE: logging.getLogger() called without a name returns the root logger,
# so records emitted from this module are attributed to the root logger.
logger = logging.getLogger()
# 1024 * 100 == 102,400. Not referenced by any route defined below;
# presumably an upper bound on chunks per request — TODO confirm where (or
# whether) it is enforced.
MAX_CHUNKS_PER_REQUEST = 1024 * 100
  30. class ChunksRouter(BaseRouterV3):
  31. def __init__(
  32. self,
  33. providers: R2RProviders,
  34. services: R2RServices,
  35. ):
  36. super().__init__(providers, services)
  37. def _setup_routes(self):
  38. @self.router.post(
  39. "/chunks/search",
  40. summary="Search Chunks",
  41. dependencies=[Depends(self.rate_limit_dependency)],
  42. openapi_extra={
  43. "x-codeSamples": [
  44. {
  45. "lang": "Python",
  46. "source": textwrap.dedent(
  47. """
  48. from r2r import R2RClient
  49. client = R2RClient("http://localhost:7272")
  50. response = client.chunks.search(
  51. query="search query",
  52. search_settings={
  53. "limit": 10
  54. }
  55. )
  56. """
  57. ),
  58. }
  59. ]
  60. },
  61. )
  62. @self.base_endpoint
  63. async def search_chunks(
  64. query: str = Body(...),
  65. search_settings: SearchSettings = Body(
  66. default_factory=SearchSettings,
  67. ),
  68. auth_user=Depends(self.providers.auth.auth_wrapper()),
  69. ) -> WrappedVectorSearchResponse: # type: ignore
  70. # TODO - Deduplicate this code by sharing the code on the retrieval router
  71. """
  72. Perform a semantic search query over all stored chunks.
  73. This endpoint allows for complex filtering of search results using PostgreSQL-based queries.
  74. Filters can be applied to various fields such as document_id, and internal metadata values.
  75. Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`.
  76. """
  77. search_settings.filters = select_search_filters(
  78. auth_user, search_settings
  79. )
  80. search_settings.graph_settings = GraphSearchSettings(enabled=False)
  81. results = await self.services.retrieval.search(
  82. query=query,
  83. search_settings=search_settings,
  84. )
  85. return results["chunk_search_results"]
  86. @self.router.get(
  87. "/chunks/{id}",
  88. summary="Retrieve Chunk",
  89. dependencies=[Depends(self.rate_limit_dependency)],
  90. openapi_extra={
  91. "x-codeSamples": [
  92. {
  93. "lang": "Python",
  94. "source": textwrap.dedent(
  95. """
  96. from r2r import R2RClient
  97. client = R2RClient("http://localhost:7272")
  98. response = client.chunks.retrieve(
  99. id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  100. )
  101. """
  102. ),
  103. },
  104. {
  105. "lang": "JavaScript",
  106. "source": textwrap.dedent(
  107. """
  108. const { r2rClient } = require("r2r-js");
  109. const client = new r2rClient("http://localhost:7272");
  110. function main() {
  111. const response = await client.chunks.retrieve({
  112. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  113. });
  114. }
  115. main();
  116. """
  117. ),
  118. },
  119. ]
  120. },
  121. )
  122. @self.base_endpoint
  123. async def retrieve_chunk(
  124. id: UUID = Path(...),
  125. auth_user=Depends(self.providers.auth.auth_wrapper()),
  126. ) -> WrappedChunkResponse:
  127. """
  128. Get a specific chunk by its ID.
  129. Returns the chunk's content, metadata, and associated document/collection information.
  130. Users can only retrieve chunks they own or have access to through collections.
  131. """
  132. chunk = await self.services.ingestion.get_chunk(id)
  133. if not chunk:
  134. raise R2RException("Chunk not found", 404)
  135. # # Check access rights
  136. # document = await self.services.management.get_document(chunk.document_id)
  137. # TODO - Add collection ID check
  138. if not auth_user.is_superuser and str(auth_user.id) != str(
  139. chunk["owner_id"]
  140. ):
  141. raise R2RException("Not authorized to access this chunk", 403)
  142. return ChunkResponse( # type: ignore
  143. id=chunk["id"],
  144. document_id=chunk["document_id"],
  145. owner_id=chunk["owner_id"],
  146. collection_ids=chunk["collection_ids"],
  147. text=chunk["text"],
  148. metadata=chunk["metadata"],
  149. # vector = chunk["vector"] # TODO - Add include vector flag
  150. )
  151. @self.router.post(
  152. "/chunks/{id}",
  153. summary="Update Chunk",
  154. dependencies=[Depends(self.rate_limit_dependency)],
  155. openapi_extra={
  156. "x-codeSamples": [
  157. {
  158. "lang": "Python",
  159. "source": textwrap.dedent(
  160. """
  161. from r2r import R2RClient
  162. client = R2RClient("http://localhost:7272")
  163. response = client.chunks.update(
  164. {
  165. "id": "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
  166. "text": "Updated content",
  167. "metadata": {"key": "new value"}
  168. }
  169. )
  170. """
  171. ),
  172. },
  173. {
  174. "lang": "JavaScript",
  175. "source": textwrap.dedent(
  176. """
  177. const { r2rClient } = require("r2r-js");
  178. const client = new r2rClient("http://localhost:7272");
  179. function main() {
  180. const response = await client.chunks.update({
  181. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
  182. text: "Updated content",
  183. metadata: {key: "new value"}
  184. });
  185. }
  186. main();
  187. """
  188. ),
  189. },
  190. ]
  191. },
  192. )
  193. @self.base_endpoint
  194. async def update_chunk(
  195. id: UUID = Path(...),
  196. chunk_update: UpdateChunk = Body(...),
  197. # TODO: Run with orchestration?
  198. auth_user=Depends(self.providers.auth.auth_wrapper()),
  199. ) -> WrappedChunkResponse:
  200. """
  201. Update an existing chunk's content and/or metadata.
  202. The chunk's vectors will be automatically recomputed based on the new content.
  203. Users can only update chunks they own unless they are superusers.
  204. """
  205. # Get the existing chunk to get its chunk_id
  206. existing_chunk = await self.services.ingestion.get_chunk(
  207. chunk_update.id
  208. )
  209. if existing_chunk is None:
  210. raise R2RException(f"Chunk {chunk_update.id} not found", 404)
  211. workflow_input = {
  212. "document_id": str(existing_chunk["document_id"]),
  213. "id": str(chunk_update.id),
  214. "text": chunk_update.text,
  215. "metadata": chunk_update.metadata
  216. or existing_chunk["metadata"],
  217. "user": auth_user.model_dump_json(),
  218. }
  219. logger.info("Running chunk ingestion without orchestration.")
  220. from core.main.orchestration import simple_ingestion_factory
  221. # TODO - CLEAN THIS UP
  222. simple_ingestor = simple_ingestion_factory(self.services.ingestion)
  223. await simple_ingestor["update-chunk"](workflow_input)
  224. return ChunkResponse( # type: ignore
  225. id=chunk_update.id,
  226. document_id=existing_chunk["document_id"],
  227. owner_id=existing_chunk["owner_id"],
  228. collection_ids=existing_chunk["collection_ids"],
  229. text=chunk_update.text,
  230. metadata=chunk_update.metadata or existing_chunk["metadata"],
  231. # vector = existing_chunk.get('vector')
  232. )
  233. @self.router.delete(
  234. "/chunks/{id}",
  235. summary="Delete Chunk",
  236. dependencies=[Depends(self.rate_limit_dependency)],
  237. openapi_extra={
  238. "x-codeSamples": [
  239. {
  240. "lang": "Python",
  241. "source": textwrap.dedent(
  242. """
  243. from r2r import R2RClient
  244. client = R2RClient("http://localhost:7272")
  245. response = client.chunks.delete(
  246. id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  247. )
  248. """
  249. ),
  250. },
  251. {
  252. "lang": "JavaScript",
  253. "source": textwrap.dedent(
  254. """
  255. const { r2rClient } = require("r2r-js");
  256. const client = new r2rClient("http://localhost:7272");
  257. function main() {
  258. const response = await client.chunks.delete({
  259. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  260. });
  261. }
  262. main();
  263. """
  264. ),
  265. },
  266. ]
  267. },
  268. )
  269. @self.base_endpoint
  270. async def delete_chunk(
  271. id: UUID = Path(...),
  272. auth_user=Depends(self.providers.auth.auth_wrapper()),
  273. ) -> WrappedBooleanResponse:
  274. """
  275. Delete a specific chunk by ID.
  276. This permanently removes the chunk and its associated vector embeddings.
  277. The parent document remains unchanged. Users can only delete chunks they
  278. own unless they are superusers.
  279. """
  280. # Get the existing chunk to get its chunk_id
  281. existing_chunk = await self.services.ingestion.get_chunk(id)
  282. if existing_chunk is None:
  283. raise R2RException(
  284. message=f"Chunk {id} not found", status_code=404
  285. )
  286. filters = {
  287. "$and": [
  288. {"owner_id": {"$eq": str(auth_user.id)}},
  289. {"chunk_id": {"$eq": str(id)}},
  290. ]
  291. }
  292. await self.services.management.delete_documents_and_chunks_by_filter(
  293. filters=filters
  294. )
  295. return GenericBooleanResponse(success=True) # type: ignore
  296. @self.router.get(
  297. "/chunks",
  298. dependencies=[Depends(self.rate_limit_dependency)],
  299. summary="List Chunks",
  300. openapi_extra={
  301. "x-codeSamples": [
  302. {
  303. "lang": "Python",
  304. "source": textwrap.dedent(
  305. """
  306. from r2r import R2RClient
  307. client = R2RClient("http://localhost:7272")
  308. response = client.chunks.list(
  309. metadata_filter={"key": "value"},
  310. include_vectors=False,
  311. offset=0,
  312. limit=10,
  313. )
  314. """
  315. ),
  316. },
  317. {
  318. "lang": "JavaScript",
  319. "source": textwrap.dedent(
  320. """
  321. const { r2rClient } = require("r2r-js");
  322. const client = new r2rClient("http://localhost:7272");
  323. function main() {
  324. const response = await client.chunks.list({
  325. metadataFilter: {key: "value"},
  326. includeVectors: false,
  327. offset: 0,
  328. limit: 10,
  329. });
  330. }
  331. main();
  332. """
  333. ),
  334. },
  335. ]
  336. },
  337. )
  338. @self.base_endpoint
  339. async def list_chunks(
  340. metadata_filter: Optional[str] = Query(
  341. None, description="Filter by metadata"
  342. ),
  343. include_vectors: bool = Query(
  344. False, description="Include vector data in response"
  345. ),
  346. offset: int = Query(
  347. 0,
  348. ge=0,
  349. description="Specifies the number of objects to skip. Defaults to 0.",
  350. ),
  351. limit: int = Query(
  352. 100,
  353. ge=1,
  354. le=1000,
  355. description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.",
  356. ),
  357. auth_user=Depends(self.providers.auth.auth_wrapper()),
  358. ) -> WrappedChunksResponse:
  359. """
  360. List chunks with pagination support.
  361. Returns a paginated list of chunks that the user has access to.
  362. Results can be filtered and sorted based on various parameters.
  363. Vector embeddings are only included if specifically requested.
  364. Regular users can only list chunks they own or have access to through
  365. collections. Superusers can list all chunks in the system.
  366. """ # Build filters
  367. filters = {}
  368. # Add user access control filter
  369. if not auth_user.is_superuser:
  370. filters["owner_id"] = {"$eq": str(auth_user.id)}
  371. # Add metadata filters if provided
  372. if metadata_filter:
  373. metadata_filter = json.loads(metadata_filter)
  374. # Get chunks using the vector handler's list_chunks method
  375. results = await self.services.ingestion.list_chunks(
  376. filters=filters,
  377. include_vectors=include_vectors,
  378. offset=offset,
  379. limit=limit,
  380. )
  381. # Convert to response format
  382. chunks = [
  383. ChunkResponse(
  384. id=chunk["id"],
  385. document_id=chunk["document_id"],
  386. owner_id=chunk["owner_id"],
  387. collection_ids=chunk["collection_ids"],
  388. text=chunk["text"],
  389. metadata=chunk["metadata"],
  390. vector=chunk.get("vector") if include_vectors else None,
  391. )
  392. for chunk in results["results"]
  393. ]
  394. return (chunks, results["page_info"]) # type: ignore