ingestion_service.py

import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, AsyncGenerator, Optional, Sequence
from uuid import UUID

from fastapi import HTTPException

from core.base import (
    Document,
    DocumentChunk,
    DocumentResponse,
    DocumentType,
    GenerationConfig,
    IngestionStatus,
    R2RException,
    RawChunk,
    RunManager,
    UnprocessedChunk,
    Vector,
    VectorEntry,
    VectorType,
    decrement_version,
)
from core.base.abstractions import (
    ChunkEnrichmentSettings,
    ChunkEnrichmentStrategy,
    IndexMeasure,
    IndexMethod,
    VectorTableName,
)
from core.base.api.models import User
from core.telemetry.telemetry_decorator import telemetry_event

from ..abstractions import R2RAgents, R2RPipelines, R2RPipes, R2RProviders
from ..config import R2RConfig
from .base import Service

logger = logging.getLogger()

MB_CONVERSION_FACTOR = 1024 * 1024
STARTING_VERSION = "v0"
MAX_FILES_PER_INGESTION = 100
OVERVIEW_FETCH_PAGE_SIZE = 1_000


class IngestionService(Service):
    def __init__(
        self,
        config: R2RConfig,
        providers: R2RProviders,
        pipes: R2RPipes,
        pipelines: R2RPipelines,
        agents: R2RAgents,
        run_manager: RunManager,
    ) -> None:
        super().__init__(
            config,
            providers,
            pipes,
            pipelines,
            agents,
            run_manager,
        )

    @telemetry_event("IngestFile")
    async def ingest_file_ingress(
        self,
        file_data: dict,
        user: User,
        document_id: UUID,
        size_in_bytes,
        metadata: Optional[dict] = None,
        version: Optional[str] = None,
        collection_ids: Optional[list[UUID]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> dict:
        try:
            if not file_data:
                raise R2RException(
                    status_code=400, message="No files provided for ingestion."
                )
            if not file_data.get("filename"):
                raise R2RException(
                    status_code=400, message="File name not provided."
                )

            metadata = metadata or {}
            version = version or STARTING_VERSION

            document_info = self._create_document_info_from_file(
                document_id,
                user,
                file_data["filename"],
                metadata,
                version,
                size_in_bytes,
            )

            existing_document_info = (
                await self.providers.database.documents_handler.get_documents_overview(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
                    offset=0,
                    limit=100,
                    filter_user_ids=[user.id],
                    filter_document_ids=[document_id],
                )
            )["results"]

            if len(existing_document_info) > 0:
                existing_doc = existing_document_info[0]
                if existing_doc.ingestion_status == IngestionStatus.SUCCESS:
                    raise R2RException(
                        status_code=409,
                        message=f"Document {document_id} already exists. Submit a DELETE request to `/documents/{document_id}` to delete this document and allow for re-ingestion.",
                    )
                elif existing_doc.ingestion_status != IngestionStatus.FAILED:
                    raise R2RException(
                        status_code=409,
                        message=f"Document {document_id} is currently ingesting with status {existing_doc.ingestion_status}.",
                    )

            await self.providers.database.documents_handler.upsert_documents_overview(
                document_info
            )

            return {
                "info": document_info,
            }
        except R2RException as e:
            logger.error(f"R2RException in ingest_file_ingress: {str(e)}")
            raise
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"Error during ingestion: {str(e)}"
            )
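
    # Hedged usage sketch (not part of the original file): roughly how an
    # orchestration layer might invoke ingest_file_ingress. The `service`,
    # `file_payload`, and resulting variable names are hypothetical
    # placeholders used only for illustration.
    #
    #     parsed = IngestionServiceAdapter.parse_ingest_file_input(file_payload)
    #     result = await service.ingest_file_ingress(
    #         file_data=parsed["file_data"],
    #         user=parsed["user"],
    #         document_id=parsed["document_id"],
    #         size_in_bytes=parsed["size_in_bytes"],
    #         metadata=parsed["metadata"],
    #         version=parsed["version"],
    #     )
    #     document_info = result["info"]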

    def _create_document_info_from_file(
        self,
        document_id: UUID,
        user: User,
        file_name: str,
        metadata: dict,
        version: str,
        size_in_bytes: int,
    ) -> DocumentResponse:
        file_extension = (
            file_name.split(".")[-1].lower() if file_name != "N/A" else "txt"
        )
        if file_extension.upper() not in DocumentType.__members__:
            raise R2RException(
                status_code=415,
                message=f"'{file_extension}' is not a valid DocumentType.",
            )

        metadata = metadata or {}
        metadata["version"] = version

        return DocumentResponse(
            id=document_id,
            owner_id=user.id,
            collection_ids=metadata.get("collection_ids", []),
            document_type=DocumentType[file_extension.upper()],
            title=(
                metadata.get("title", file_name.split("/")[-1])
                if file_name != "N/A"
                else "N/A"
            ),
            metadata=metadata,
            version=version,
            size_in_bytes=size_in_bytes,
            ingestion_status=IngestionStatus.PENDING,
            created_at=datetime.now(),
            updated_at=datetime.now(),
        )

    def _create_document_info_from_chunks(
        self,
        document_id: UUID,
        user: User,
        chunks: list[RawChunk],
        metadata: dict,
        version: str,
    ) -> DocumentResponse:
        metadata = metadata or {}
        metadata["version"] = version

        return DocumentResponse(
            id=document_id,
            owner_id=user.id,
            collection_ids=metadata.get("collection_ids", []),
            document_type=DocumentType.TXT,
            title=metadata.get("title", f"Ingested Chunks - {document_id}"),
            metadata=metadata,
            version=version,
            size_in_bytes=sum(
                len(chunk.text.encode("utf-8")) for chunk in chunks
            ),
            ingestion_status=IngestionStatus.PENDING,
            created_at=datetime.now(),
            updated_at=datetime.now(),
        )

    async def parse_file(
        self, document_info: DocumentResponse, ingestion_config: dict
    ) -> AsyncGenerator[DocumentChunk, None]:
        return await self.pipes.parsing_pipe.run(
            input=self.pipes.parsing_pipe.Input(
                message=Document(
                    id=document_info.id,
                    collection_ids=document_info.collection_ids,
                    owner_id=document_info.owner_id,
                    metadata={
                        "document_type": document_info.document_type.value,
                        **document_info.metadata,
                    },
                    document_type=document_info.document_type,
                )
            ),
            state=None,
            run_manager=self.run_manager,
            ingestion_config=ingestion_config,
        )

    async def augment_document_info(
        self,
        document_info: DocumentResponse,
        chunked_documents: list[dict],
    ) -> None:
        if not self.config.ingestion.skip_document_summary:
            document = f"Document Title: {document_info.title}\n"
            if document_info.metadata != {}:
                document += f"Document Metadata: {json.dumps(document_info.metadata)}\n"

            document += "Document Text:\n"
            for chunk in chunked_documents[
                0 : self.config.ingestion.chunks_for_document_summary
            ]:
                document += chunk["data"]

            messages = await self.providers.database.prompts_handler.get_message_payload(
                system_prompt_name=self.config.ingestion.document_summary_system_prompt,
                task_prompt_name=self.config.ingestion.document_summary_task_prompt,
                task_inputs={"document": document},
            )

            response = await self.providers.llm.aget_completion(
                messages=messages,
                generation_config=GenerationConfig(
                    model=self.config.ingestion.document_summary_model
                ),
            )

            document_info.summary = response.choices[0].message.content  # type: ignore
            if not document_info.summary:
                raise ValueError("Expected a generated response.")

            embedding = await self.providers.embedding.async_get_embedding(
                text=document_info.summary,
            )
            document_info.summary_embedding = embedding
        return

    async def embed_document(
        self,
        chunked_documents: list[dict],
    ) -> AsyncGenerator[VectorEntry, None]:
        return await self.pipes.embedding_pipe.run(
            input=self.pipes.embedding_pipe.Input(
                message=[
                    DocumentChunk.from_dict(chunk)
                    for chunk in chunked_documents
                ]
            ),
            state=None,
            run_manager=self.run_manager,
        )

    async def store_embeddings(
        self,
        embeddings: Sequence[dict | VectorEntry],
    ) -> AsyncGenerator[str, None]:
        vector_entries = [
            (
                embedding
                if isinstance(embedding, VectorEntry)
                else VectorEntry.from_dict(embedding)
            )
            for embedding in embeddings
        ]

        return await self.pipes.vector_storage_pipe.run(
            input=self.pipes.vector_storage_pipe.Input(message=vector_entries),
            state=None,
            run_manager=self.run_manager,
        )
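
    # Hedged sketch (not part of the original file) of how the stages above
    # are typically chained by a caller; the actual orchestration lives
    # outside this module. `service`, `document_info`, and `ingestion_config`
    # are assumed to already be in scope.
    #
    #     chunks_gen = await service.parse_file(document_info, ingestion_config)
    #     chunks = [chunk.model_dump() async for chunk in chunks_gen]
    #     await service.augment_document_info(document_info, chunks)
    #     embeddings_gen = await service.embed_document(chunks)
    #     embeddings = [e.model_dump() async for e in embeddings_gen]
    #     async for _ in await service.store_embeddings(embeddings):
    #         pass
    #     await service.update_document_status(
    #         document_info, IngestionStatus.SUCCESS
    #     )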

    async def finalize_ingestion(
        self, document_info: DocumentResponse
    ) -> None:
        async def empty_generator():
            yield document_info

        return empty_generator()

    async def update_document_status(
        self,
        document_info: DocumentResponse,
        status: IngestionStatus,
    ) -> None:
        document_info.ingestion_status = status
        await self._update_document_status_in_db(document_info)

    async def _update_document_status_in_db(
        self, document_info: DocumentResponse
    ):
        try:
            await self.providers.database.documents_handler.upsert_documents_overview(
                document_info
            )
        except Exception as e:
            logger.error(
                f"Failed to update document status: {document_info.id}. Error: {str(e)}"
            )

    async def _collect_results(self, result_gen: Any) -> list[dict]:
        results = []
        async for res in result_gen:
            results.append(res.model_dump_json())
        return results

    @telemetry_event("IngestChunks")
    async def ingest_chunks_ingress(
        self,
        document_id: UUID,
        metadata: Optional[dict],
        chunks: list[RawChunk],
        user: User,
        *args: Any,
        **kwargs: Any,
    ) -> DocumentResponse:
        if not chunks:
            raise R2RException(
                status_code=400, message="No chunks provided for ingestion."
            )

        metadata = metadata or {}
        version = STARTING_VERSION

        document_info = self._create_document_info_from_chunks(
            document_id,
            user,
            chunks,
            metadata,
            version,
        )

        existing_document_info = (
            await self.providers.database.documents_handler.get_documents_overview(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
                offset=0,
                limit=100,
                filter_user_ids=[user.id],
                filter_document_ids=[document_id],
            )
        )["results"]

        if len(existing_document_info) > 0:
            existing_doc = existing_document_info[0]
            if existing_doc.ingestion_status != IngestionStatus.FAILED:
                raise R2RException(
                    status_code=409,
                    message=f"Document {document_id} was already ingested and is not in a failed state.",
                )

        await self.providers.database.documents_handler.upsert_documents_overview(
            document_info
        )

        return document_info

    @telemetry_event("UpdateChunk")
    async def update_chunk_ingress(
        self,
        document_id: UUID,
        chunk_id: UUID,
        text: str,
        user: User,
        metadata: Optional[dict] = None,
        *args: Any,
        **kwargs: Any,
    ) -> dict:
        # Verify chunk exists and user has access
        existing_chunks = await self.providers.database.chunks_handler.list_document_chunks(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
            document_id=document_id,
            offset=0,
            limit=1,
        )
        if not existing_chunks["results"]:
            raise R2RException(
                status_code=404,
                message=f"Chunk with chunk_id {chunk_id} not found.",
            )

        existing_chunk = (
            await self.providers.database.chunks_handler.get_chunk(chunk_id)
        )
        if not existing_chunk:
            raise R2RException(
                status_code=404,
                message=f"Chunk with id {chunk_id} not found",
            )

        if (
            str(existing_chunk["owner_id"]) != str(user.id)
            and not user.is_superuser
        ):
            raise R2RException(
                status_code=403,
                message="You don't have permission to modify this chunk.",
            )

        # Handle metadata merging
        if metadata is not None:
            merged_metadata = {
                **existing_chunk["metadata"],
                **metadata,
            }
        else:
            merged_metadata = existing_chunk["metadata"]

        # Create updated extraction
        extraction_data = {
            "id": chunk_id,
            "document_id": document_id,
            "collection_ids": kwargs.get(
                "collection_ids", existing_chunk["collection_ids"]
            ),
            "owner_id": existing_chunk["owner_id"],
            "data": text or existing_chunk["text"],
            "metadata": merged_metadata,
        }
        extraction = DocumentChunk(**extraction_data).model_dump()

        embedding_generator = await self.embed_document([extraction])
        embeddings = [
            embedding.model_dump() async for embedding in embedding_generator
        ]

        storage_generator = await self.store_embeddings(embeddings)
        async for _ in storage_generator:
            pass

        return extraction
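
    # Hedged example of a chunk-update call (illustration only; the `service`
    # reference, UUID literals, and metadata values are hypothetical):
    #
    #     updated = await service.update_chunk_ingress(
    #         document_id=UUID("..."),
    #         chunk_id=UUID("..."),
    #         text="Corrected chunk text.",
    #         user=current_user,
    #         metadata={"reviewed": True},
    #     )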

    async def _get_enriched_chunk_text(
        self,
        chunk_idx: int,
        chunk: dict,
        document_id: UUID,
        chunk_enrichment_settings: ChunkEnrichmentSettings,
        list_document_chunks: list[dict],
        document_chunks_dict: dict,
    ) -> VectorEntry:
        # get chunks in context
        context_chunk_ids: list[UUID] = []
        for enrichment_strategy in chunk_enrichment_settings.strategies:
            if enrichment_strategy == ChunkEnrichmentStrategy.NEIGHBORHOOD:
                context_chunk_ids.extend(
                    list_document_chunks[chunk_idx - prev]["chunk_id"]
                    for prev in range(
                        1, chunk_enrichment_settings.backward_chunks + 1
                    )
                    if chunk_idx - prev >= 0
                )
                context_chunk_ids.extend(
                    list_document_chunks[chunk_idx + next]["chunk_id"]
                    for next in range(
                        1, chunk_enrichment_settings.forward_chunks + 1
                    )
                    if chunk_idx + next < len(list_document_chunks)
                )
            elif enrichment_strategy == ChunkEnrichmentStrategy.SEMANTIC:
                semantic_neighbors = await self.providers.database.chunks_handler.get_semantic_neighbors(
                    offset=0,
                    limit=chunk_enrichment_settings.semantic_neighbors,
                    document_id=document_id,
                    chunk_id=chunk["chunk_id"],
                    similarity_threshold=chunk_enrichment_settings.semantic_similarity_threshold,
                )
                context_chunk_ids.extend(
                    neighbor["chunk_id"] for neighbor in semantic_neighbors
                )

        # weird behavior, sometimes we get UUIDs
        # FIXME: figure out why
        context_chunk_ids_str = list(
            set(
                [
                    str(context_chunk_id)
                    for context_chunk_id in context_chunk_ids
                ]
            )
        )
        context_chunk_ids_uuid = [
            UUID(context_chunk_id)
            for context_chunk_id in context_chunk_ids_str
        ]

        context_chunk_texts = [
            (
                document_chunks_dict[context_chunk_id]["text"],
                document_chunks_dict[context_chunk_id]["metadata"][
                    "chunk_order"
                ],
            )
            for context_chunk_id in context_chunk_ids_uuid
        ]

        # sort by chunk_order
        context_chunk_texts.sort(key=lambda x: x[1])

        # enrich chunk
        try:
            updated_chunk_text = (
                (
                    await self.providers.llm.aget_completion(
                        messages=await self.providers.database.prompts_handler.get_message_payload(
                            task_prompt_name="chunk_enrichment",
                            task_inputs={
                                "context_chunks": "\n".join(
                                    text for text, _ in context_chunk_texts
                                ),
                                "chunk": chunk["text"],
                            },
                        ),
                        generation_config=chunk_enrichment_settings.generation_config,
                    )
                )
                .choices[0]
                .message.content
            )
        except Exception as e:
            updated_chunk_text = chunk["text"]
            chunk["metadata"]["chunk_enrichment_status"] = "failed"
        else:
            if not updated_chunk_text:
                updated_chunk_text = chunk["text"]
                chunk["metadata"]["chunk_enrichment_status"] = "failed"
            else:
                chunk["metadata"]["chunk_enrichment_status"] = "success"

        data = await self.providers.embedding.async_get_embedding(
            updated_chunk_text or chunk["text"]
        )
        chunk["metadata"]["original_text"] = chunk["text"]

        return VectorEntry(
            id=uuid.uuid5(uuid.NAMESPACE_DNS, str(chunk["chunk_id"])),
            vector=Vector(data=data, type=VectorType.FIXED, length=len(data)),
            document_id=document_id,
            owner_id=chunk["owner_id"],
            collection_ids=chunk["collection_ids"],
            text=updated_chunk_text or chunk["text"],
            metadata=chunk["metadata"],
        )

    async def chunk_enrichment(
        self,
        document_id: UUID,
        chunk_enrichment_settings: ChunkEnrichmentSettings,
    ) -> int:
        # just call the pipe on every chunk of the document
        # TODO: Why is the config not recognized as an ingestionconfig but as a providerconfig?
        chunk_enrichment_settings = (
            self.providers.ingestion.config.chunk_enrichment_settings  # type: ignore
        )

        # get all list_document_chunks
        list_document_chunks = (
            await self.providers.database.chunks_handler.list_document_chunks(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
                document_id=document_id,
                offset=0,
                limit=100,
            )
        )["results"]

        new_vector_entries = []
        document_chunks_dict = {
            chunk["chunk_id"]: chunk for chunk in list_document_chunks
        }

        tasks = []
        total_completed = 0
        for chunk_idx, chunk in enumerate(list_document_chunks):
            tasks.append(
                self._get_enriched_chunk_text(
                    chunk_idx,
                    chunk,
                    document_id,
                    chunk_enrichment_settings,
                    list_document_chunks,
                    document_chunks_dict,
                )
            )
            if len(tasks) == 128:
                new_vector_entries.extend(await asyncio.gather(*tasks))
                total_completed += 128
                logger.info(
                    f"Completed {total_completed} out of {len(list_document_chunks)} chunks for document {document_id}"
                )
                tasks = []

        new_vector_entries.extend(await asyncio.gather(*tasks))
        logger.info(
            f"Completed enrichment of {len(list_document_chunks)} chunks for document {document_id}"
        )

        # delete old chunks from vector db
        await self.providers.database.chunks_handler.delete(
            filters={
                "document_id": document_id,
            },
        )

        # embed and store the enriched chunk
        await self.providers.database.chunks_handler.upsert_entries(
            new_vector_entries
        )

        return len(new_vector_entries)
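
    # Hedged sketch of invoking enrichment after initial ingestion
    # (illustration only; `service` and `document_info` are assumed to be in
    # scope). Note that the settings are currently re-read from the ingestion
    # provider config inside the method (see the TODO above), so the passed-in
    # value is effectively a placeholder.
    #
    #     enriched_count = await service.chunk_enrichment(
    #         document_id=document_info.id,
    #         chunk_enrichment_settings=ChunkEnrichmentSettings(),
    #     )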

    # TODO - This should return a typed object
    async def list_chunks(
        self,
        offset: int,
        limit: int,
        filters: Optional[dict[str, Any]] = None,
        include_vectors: bool = False,
        *args: Any,
        **kwargs: Any,
    ) -> dict:
        return await self.providers.database.chunks_handler.list_chunks(
            offset=offset,
            limit=limit,
            filters=filters,
            include_vectors=include_vectors,
        )

    # TODO - This should return a typed object
    async def get_chunk(
        self,
        # document_id: UUID,
        chunk_id: UUID,
        *args: Any,
        **kwargs: Any,
    ) -> dict:
        return await self.providers.database.chunks_handler.get_chunk(chunk_id)

    async def update_document_metadata(
        self,
        document_id: UUID,
        metadata: dict,
        user: User,
    ) -> None:
        # Verify document exists and user has access
        existing_document = await self.providers.database.documents_handler.get_documents_overview(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
            offset=0,
            limit=100,
            filter_document_ids=[document_id],
            filter_user_ids=[user.id],
        )

        if not existing_document["results"]:
            raise R2RException(
                status_code=404,
                message=f"Document with id {document_id} not found or you don't have access.",
            )

        existing_document = existing_document["results"][0]

        # Merge metadata
        merged_metadata = {
            **existing_document.metadata,  # type: ignore
            **metadata,
        }

        # Update document metadata
        existing_document.metadata = merged_metadata  # type: ignore
        await self.providers.database.documents_handler.upsert_documents_overview(
            existing_document  # type: ignore
        )


class IngestionServiceAdapter:
    @staticmethod
    def _parse_user_data(user_data) -> User:
        if isinstance(user_data, str):
            try:
                user_data = json.loads(user_data)
            except json.JSONDecodeError as e:
                raise ValueError(
                    f"Invalid user data format: {user_data}"
                ) from e
        return User.from_dict(user_data)

    @staticmethod
    def _parse_chunk_enrichment_settings(
        chunk_enrichment_settings: dict,
    ) -> ChunkEnrichmentSettings:
        if isinstance(chunk_enrichment_settings, str):
            try:
                chunk_enrichment_settings = json.loads(
                    chunk_enrichment_settings
                )
            except json.JSONDecodeError as e:
                raise ValueError(
                    f"Invalid chunk enrichment settings format: {chunk_enrichment_settings}"
                ) from e
        return ChunkEnrichmentSettings.from_dict(chunk_enrichment_settings)

    @staticmethod
    def parse_ingest_file_input(data: dict) -> dict:
        return {
            "user": IngestionServiceAdapter._parse_user_data(data["user"]),
            "metadata": data["metadata"],
            "document_id": (
                UUID(data["document_id"]) if data["document_id"] else None
            ),
            "version": data.get("version"),
            "ingestion_config": data["ingestion_config"] or {},
            "file_data": data["file_data"],
            "size_in_bytes": data["size_in_bytes"],
            "collection_ids": data.get("collection_ids", []),
        }

    @staticmethod
    def parse_ingest_chunks_input(data: dict) -> dict:
        return {
            "user": IngestionServiceAdapter._parse_user_data(data["user"]),
            "metadata": data["metadata"],
            "document_id": data["document_id"],
            "chunks": [
                UnprocessedChunk.from_dict(chunk) for chunk in data["chunks"]
            ],
            "id": data.get("id"),
        }

    @staticmethod
    def parse_update_chunk_input(data: dict) -> dict:
        return {
            "user": IngestionServiceAdapter._parse_user_data(data["user"]),
            "document_id": UUID(data["document_id"]),
            "id": UUID(data["id"]),
            "text": data["text"],
            "metadata": data.get("metadata"),
            "collection_ids": data.get("collection_ids", []),
        }

    @staticmethod
    def parse_update_files_input(data: dict) -> dict:
        return {
            "user": IngestionServiceAdapter._parse_user_data(data["user"]),
            "document_ids": [UUID(doc_id) for doc_id in data["document_ids"]],
            "metadatas": data["metadatas"],
            "ingestion_config": data["ingestion_config"],
            "file_sizes_in_bytes": data["file_sizes_in_bytes"],
            "file_datas": data["file_datas"],
        }

    @staticmethod
    def parse_create_vector_index_input(data: dict) -> dict:
        return {
            "table_name": VectorTableName(data["table_name"]),
            "index_method": IndexMethod(data["index_method"]),
            "index_measure": IndexMeasure(data["index_measure"]),
            "index_name": data["index_name"],
            "index_column": data["index_column"],
            "index_arguments": data["index_arguments"],
            "concurrently": data["concurrently"],
        }

    @staticmethod
    def parse_list_vector_indices_input(input_data: dict) -> dict:
        return {"table_name": input_data["table_name"]}

    @staticmethod
    def parse_delete_vector_index_input(input_data: dict) -> dict:
        return {
            "index_name": input_data["index_name"],
            "table_name": input_data.get("table_name"),
            "concurrently": input_data.get("concurrently", True),
        }

    @staticmethod
    def parse_select_vector_index_input(input_data: dict) -> dict:
        return {
            "index_name": input_data["index_name"],
            "table_name": input_data.get("table_name"),
        }

    @staticmethod
    def parse_update_document_metadata_input(data: dict) -> dict:
        return {
            "document_id": data["document_id"],
            "metadata": data["metadata"],
            "user": IngestionServiceAdapter._parse_user_data(data["user"]),
        }
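
# Hedged sketch (illustration only, not part of the original file): the
# adapter converts serialized workflow payloads into keyword arguments for
# the service methods above. The `payload` dict and `service` reference are
# hypothetical.
#
#     kwargs = IngestionServiceAdapter.parse_update_document_metadata_input(payload)
#     await service.update_document_metadata(**kwargs)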