# chunks_router.py — v3 API router for chunk search, retrieval, update, deletion and listing.
  1. import json
  2. import logging
  3. import textwrap
  4. from typing import Optional
  5. from uuid import UUID
  6. from fastapi import Body, Depends, Path, Query
  7. from core.base import (
  8. ChunkResponse,
  9. GraphSearchSettings,
  10. R2RException,
  11. SearchSettings,
  12. UpdateChunk,
  13. select_search_filters,
  14. )
  15. from core.base.api.models import (
  16. GenericBooleanResponse,
  17. WrappedBooleanResponse,
  18. WrappedChunkResponse,
  19. WrappedChunksResponse,
  20. WrappedVectorSearchResponse,
  21. )
  22. from ...abstractions import R2RProviders, R2RServices
  23. from ...config import R2RConfig
  24. from .base_router import BaseRouterV3
  25. logger = logging.getLogger()
  26. MAX_CHUNKS_PER_REQUEST = 1024 * 100
  27. class ChunksRouter(BaseRouterV3):
  28. def __init__(
  29. self, providers: R2RProviders, services: R2RServices, config: R2RConfig
  30. ):
  31. logging.info("Initializing ChunksRouter")
  32. super().__init__(providers, services, config)
  33. def _setup_routes(self):
  34. @self.router.post(
  35. "/chunks/search",
  36. summary="Search Chunks",
  37. dependencies=[Depends(self.rate_limit_dependency)],
  38. openapi_extra={
  39. "x-codeSamples": [
  40. {
  41. "lang": "Python",
  42. "source": textwrap.dedent("""
  43. from r2r import R2RClient
  44. client = R2RClient()
  45. response = client.chunks.search(
  46. query="search query",
  47. search_settings={
  48. "limit": 10
  49. }
  50. )
  51. """),
  52. }
  53. ]
  54. },
  55. )
  56. @self.base_endpoint
  57. async def search_chunks(
  58. query: str = Body(...),
  59. search_settings: SearchSettings = Body(
  60. default_factory=SearchSettings,
  61. ),
  62. auth_user=Depends(self.providers.auth.auth_wrapper()),
  63. ) -> WrappedVectorSearchResponse: # type: ignore
  64. # TODO - Deduplicate this code by sharing the code on the retrieval router
  65. """Perform a semantic search query over all stored chunks.
  66. This endpoint allows for complex filtering of search results using PostgreSQL-based queries.
  67. Filters can be applied to various fields such as document_id, and internal metadata values.
  68. Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`.
  69. """
  70. search_settings.filters = select_search_filters(
  71. auth_user, search_settings
  72. )
  73. search_settings.graph_settings = GraphSearchSettings(enabled=False)
  74. results = await self.services.retrieval.search(
  75. query=query,
  76. search_settings=search_settings,
  77. )
  78. return results.chunk_search_results # type: ignore
  79. @self.router.get(
  80. "/chunks/{id}",
  81. summary="Retrieve Chunk",
  82. dependencies=[Depends(self.rate_limit_dependency)],
  83. openapi_extra={
  84. "x-codeSamples": [
  85. {
  86. "lang": "Python",
  87. "source": textwrap.dedent("""
  88. from r2r import R2RClient
  89. client = R2RClient()
  90. response = client.chunks.retrieve(
  91. id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  92. )
  93. """),
  94. },
  95. {
  96. "lang": "JavaScript",
  97. "source": textwrap.dedent("""
  98. const { r2rClient } = require("r2r-js");
  99. const client = new r2rClient();
  100. function main() {
  101. const response = await client.chunks.retrieve({
  102. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  103. });
  104. }
  105. main();
  106. """),
  107. },
  108. ]
  109. },
  110. )
  111. @self.base_endpoint
  112. async def retrieve_chunk(
  113. id: UUID = Path(...),
  114. auth_user=Depends(self.providers.auth.auth_wrapper()),
  115. ) -> WrappedChunkResponse:
  116. """Get a specific chunk by its ID.
  117. Returns the chunk's content, metadata, and associated
  118. document/collection information. Users can only retrieve chunks
  119. they own or have access to through collections.
  120. """
  121. chunk = await self.services.ingestion.get_chunk(id)
  122. if not chunk:
  123. raise R2RException("Chunk not found", 404)
  124. # TODO - Add collection ID check
  125. if not auth_user.is_superuser and str(auth_user.id) != str(
  126. chunk["owner_id"]
  127. ):
  128. raise R2RException("Not authorized to access this chunk", 403)
  129. return ChunkResponse( # type: ignore
  130. id=chunk["id"],
  131. document_id=chunk["document_id"],
  132. owner_id=chunk["owner_id"],
  133. collection_ids=chunk["collection_ids"],
  134. text=chunk["text"],
  135. metadata=chunk["metadata"],
  136. # vector = chunk["vector"] # TODO - Add include vector flag
  137. )
  138. @self.router.post(
  139. "/chunks/{id}",
  140. summary="Update Chunk",
  141. dependencies=[Depends(self.rate_limit_dependency)],
  142. openapi_extra={
  143. "x-codeSamples": [
  144. {
  145. "lang": "Python",
  146. "source": textwrap.dedent("""
  147. from r2r import R2RClient
  148. client = R2RClient()
  149. response = client.chunks.update(
  150. {
  151. "id": "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
  152. "text": "Updated content",
  153. "metadata": {"key": "new value"}
  154. }
  155. )
  156. """),
  157. },
  158. {
  159. "lang": "JavaScript",
  160. "source": textwrap.dedent("""
  161. const { r2rClient } = require("r2r-js");
  162. const client = new r2rClient();
  163. function main() {
  164. const response = await client.chunks.update({
  165. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa",
  166. text: "Updated content",
  167. metadata: {key: "new value"}
  168. });
  169. }
  170. main();
  171. """),
  172. },
  173. ]
  174. },
  175. )
  176. @self.base_endpoint
  177. async def update_chunk(
  178. id: UUID = Path(...),
  179. chunk_update: UpdateChunk = Body(...),
  180. # TODO: Run with orchestration?
  181. auth_user=Depends(self.providers.auth.auth_wrapper()),
  182. ) -> WrappedChunkResponse:
  183. """Update an existing chunk's content and/or metadata.
  184. The chunk's vectors will be automatically recomputed based on the
  185. new content. Users can only update chunks they own unless they are
  186. superusers.
  187. """
  188. # Get the existing chunk to get its chunk_id
  189. existing_chunk = await self.services.ingestion.get_chunk(
  190. chunk_update.id
  191. )
  192. if existing_chunk is None:
  193. raise R2RException(f"Chunk {chunk_update.id} not found", 404)
  194. workflow_input = {
  195. "document_id": str(existing_chunk["document_id"]),
  196. "id": str(chunk_update.id),
  197. "text": chunk_update.text,
  198. "metadata": chunk_update.metadata
  199. or existing_chunk["metadata"],
  200. "user": auth_user.model_dump_json(),
  201. }
  202. logger.info("Running chunk ingestion without orchestration.")
  203. from core.main.orchestration import simple_ingestion_factory
  204. # TODO - CLEAN THIS UP
  205. simple_ingestor = simple_ingestion_factory(self.services.ingestion)
  206. await simple_ingestor["update-chunk"](workflow_input)
  207. return ChunkResponse( # type: ignore
  208. id=chunk_update.id,
  209. document_id=existing_chunk["document_id"],
  210. owner_id=existing_chunk["owner_id"],
  211. collection_ids=existing_chunk["collection_ids"],
  212. text=chunk_update.text,
  213. metadata=chunk_update.metadata or existing_chunk["metadata"],
  214. # vector = existing_chunk.get('vector')
  215. )
  216. @self.router.delete(
  217. "/chunks/{id}",
  218. summary="Delete Chunk",
  219. dependencies=[Depends(self.rate_limit_dependency)],
  220. openapi_extra={
  221. "x-codeSamples": [
  222. {
  223. "lang": "Python",
  224. "source": textwrap.dedent("""
  225. from r2r import R2RClient
  226. client = R2RClient()
  227. response = client.chunks.delete(
  228. id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  229. )
  230. """),
  231. },
  232. {
  233. "lang": "JavaScript",
  234. "source": textwrap.dedent("""
  235. const { r2rClient } = require("r2r-js");
  236. const client = new r2rClient();
  237. function main() {
  238. const response = await client.chunks.delete({
  239. id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa"
  240. });
  241. }
  242. main();
  243. """),
  244. },
  245. ]
  246. },
  247. )
  248. @self.base_endpoint
  249. async def delete_chunk(
  250. id: UUID = Path(...),
  251. auth_user=Depends(self.providers.auth.auth_wrapper()),
  252. ) -> WrappedBooleanResponse:
  253. """Delete a specific chunk by ID.
  254. This permanently removes the chunk and its associated vector
  255. embeddings. The parent document remains unchanged. Users can only
  256. delete chunks they own unless they are superusers.
  257. """
  258. # Get the existing chunk to get its chunk_id
  259. existing_chunk = await self.services.ingestion.get_chunk(id)
  260. if existing_chunk is None:
  261. raise R2RException(
  262. message=f"Chunk {id} not found", status_code=404
  263. )
  264. filters = {
  265. "$and": [
  266. {"owner_id": {"$eq": str(auth_user.id)}},
  267. {"chunk_id": {"$eq": str(id)}},
  268. ]
  269. }
  270. await (
  271. self.services.management.delete_documents_and_chunks_by_filter(
  272. filters=filters
  273. )
  274. )
  275. return GenericBooleanResponse(success=True) # type: ignore
  276. @self.router.get(
  277. "/chunks",
  278. dependencies=[Depends(self.rate_limit_dependency)],
  279. summary="List Chunks",
  280. openapi_extra={
  281. "x-codeSamples": [
  282. {
  283. "lang": "Python",
  284. "source": textwrap.dedent("""
  285. from r2r import R2RClient
  286. client = R2RClient()
  287. response = client.chunks.list(
  288. metadata_filter={"key": "value"},
  289. include_vectors=False,
  290. offset=0,
  291. limit=10,
  292. )
  293. """),
  294. },
  295. {
  296. "lang": "JavaScript",
  297. "source": textwrap.dedent("""
  298. const { r2rClient } = require("r2r-js");
  299. const client = new r2rClient();
  300. function main() {
  301. const response = await client.chunks.list({
  302. metadataFilter: {key: "value"},
  303. includeVectors: false,
  304. offset: 0,
  305. limit: 10,
  306. });
  307. }
  308. main();
  309. """),
  310. },
  311. ]
  312. },
  313. )
  314. @self.base_endpoint
  315. async def list_chunks(
  316. metadata_filter: Optional[str] = Query(
  317. None, description="Filter by metadata"
  318. ),
  319. include_vectors: bool = Query(
  320. False, description="Include vector data in response"
  321. ),
  322. offset: int = Query(
  323. 0,
  324. ge=0,
  325. description="Specifies the number of objects to skip. Defaults to 0.",
  326. ),
  327. limit: int = Query(
  328. 100,
  329. ge=1,
  330. le=1000,
  331. description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.",
  332. ),
  333. auth_user=Depends(self.providers.auth.auth_wrapper()),
  334. ) -> WrappedChunksResponse:
  335. """List chunks with pagination support.
  336. Returns a paginated list of chunks that the user has access to.
  337. Results can be filtered and sorted based on various parameters.
  338. Vector embeddings are only included if specifically requested.
  339. Regular users can only list chunks they own or have access to
  340. through collections. Superusers can list all chunks in the system.
  341. """ # Build filters
  342. filters = {}
  343. # Add user access control filter
  344. if not auth_user.is_superuser:
  345. filters["owner_id"] = {"$eq": str(auth_user.id)}
  346. # Add metadata filters if provided
  347. if metadata_filter:
  348. metadata_filter = json.loads(metadata_filter)
  349. # Get chunks using the vector handler's list_chunks method
  350. results = await self.services.ingestion.list_chunks(
  351. filters=filters,
  352. include_vectors=include_vectors,
  353. offset=offset,
  354. limit=limit,
  355. )
  356. # Convert to response format
  357. chunks = [
  358. ChunkResponse(
  359. id=chunk["id"],
  360. document_id=chunk["document_id"],
  361. owner_id=chunk["owner_id"],
  362. collection_ids=chunk["collection_ids"],
  363. text=chunk["text"],
  364. metadata=chunk["metadata"],
  365. vector=chunk.get("vector") if include_vectors else None,
  366. )
  367. for chunk in results["results"]
  368. ]
  369. return (chunks, {"total_entries": results["total_entries"]}) # type: ignore