management_service.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. import logging
  2. import os
  3. from collections import defaultdict
  4. from copy import copy
  5. from typing import Any, BinaryIO, Optional, Tuple
  6. from uuid import UUID
  7. import toml
  8. from fastapi.responses import StreamingResponse
  9. from core.base import (
  10. CollectionResponse,
  11. DocumentResponse,
  12. GenerationConfig,
  13. KGEnrichmentStatus,
  14. Message,
  15. Prompt,
  16. R2RException,
  17. RunManager,
  18. User,
  19. )
  20. from core.base.logger.base import RunType
  21. from core.base.utils import validate_uuid
  22. from core.telemetry.telemetry_decorator import telemetry_event
  23. from ..abstractions import R2RAgents, R2RPipelines, R2RPipes, R2RProviders
  24. from ..config import R2RConfig
  25. from .base import Service
  # Module-scoped logger. Naming it after the module (instead of grabbing the
  # root logger) lets log records identify where they came from and lets the
  # level be tuned per module; records still propagate to the root handlers.
  logger = logging.getLogger(__name__)
  27. class ManagementService(Service):
  def __init__(
      self,
      config: R2RConfig,
      providers: R2RProviders,
      pipes: R2RPipes,
      pipelines: R2RPipelines,
      agents: R2RAgents,
      run_manager: RunManager,
  ):
      """Build the service by handing every dependency to the base ``Service``.

      All six arguments are forwarded positionally, in the order received;
      nothing else is initialised here.
      """
      dependencies = (
          config,
          providers,
          pipes,
          pipelines,
          agents,
          run_manager,
      )
      super().__init__(*dependencies)
  @telemetry_event("AppSettings")
  async def app_settings(self):
      """Return the application settings as a plain dictionary.

      Returns:
          A dict with three keys: ``config`` (the service configuration,
          serialised to TOML and parsed back), ``prompts`` (whatever the
          prompts handler returns from ``get_all_prompts``) and
          ``r2r_project_name`` (read from the environment).

      Raises:
          KeyError: if the ``R2R_PROJECT_NAME`` environment variable is unset.
      """
      stored_prompts = (
          await self.providers.database.prompts_handler.get_all_prompts()
      )
      # Round-trip through TOML so the configuration comes back as dict data.
      parsed_config = toml.loads(self.config.to_toml())
      project_name = os.environ["R2R_PROJECT_NAME"]
      return {
          "config": parsed_config,
          "prompts": stored_prompts,
          "r2r_project_name": project_name,
          # "r2r_version": get_version("r2r"),
      }
  @telemetry_event("UsersOverview")
  async def users_overview(
      self,
      offset: int,
      limit: int,
      user_ids: Optional[list[UUID]] = None,
      *args,
      **kwargs,
  ):
      """Return a page of the users overview from the users handler.

      ``offset``, ``limit`` and ``user_ids`` are passed through as keyword
      arguments; any extra positional or keyword arguments are ignored.
      """
      users_handler = self.providers.database.users_handler
      query = {"offset": offset, "limit": limit, "user_ids": user_ids}
      return await users_handler.get_users_overview(**query)
  @telemetry_event("Delete")
  async def delete(
      self,
      filters: dict[str, Any],
      *args,
      **kwargs,
  ):
      """
      Delete entries matching ``filters`` from both vector and relational databases.

      Takes a mapping of filters like
      "{key: {operator: value}, key: {operator: value}, ...}".

      NOTE: This method is not atomic and may result in orphaned entries in the documents overview table.
      NOTE: This method assumes that filters delete entire contents of any touched documents.

      Args:
          filters: Filter mapping. Top-level keys must be one of ``id``,
              ``collection_ids``, ``chunk_id``, ``$and`` or ``$or``.

      Returns:
          None.

      Raises:
          R2RException: status 422 when ``filters`` is empty, names a
              top-level field outside the allow-list, or carries an invalid
              UUID; status 404 when no document could be identified for
              deletion.
      """
      ### TODO - FIX THIS, ENSURE THAT DOCUMENTS OVERVIEW IS CLEARED
      database = self.providers.database

      def validate_filters(filters: dict[str, Any]) -> None:
          ALLOWED_FILTERS = {
              "id",
              "collection_ids",
              "chunk_id",
              # TODO - Modify these checks such that they can be used PROPERLY for nested filters
              "$and",
              "$or",
          }

          if not filters:
              raise R2RException(
                  status_code=422, message="No filters provided"
              )

          for field in filters:
              if field not in ALLOWED_FILTERS:
                  raise R2RException(
                      status_code=422,
                      message=f"Invalid filter field: {field}",
                  )

          # NOTE: "document_id" and "owner_id" cannot reach this check today
          # because the allow-list above rejects them; only "chunk_id" does.
          for field in ["document_id", "owner_id", "chunk_id"]:
              if field in filters:
                  # Only the first operator of the condition is inspected.
                  op = next(iter(filters[field].keys()))
                  try:
                      validate_uuid(filters[field][op])
                  except ValueError as err:
                      raise R2RException(
                          status_code=422,
                          message=f"Invalid UUID: {filters[field][op]}",
                      ) from err

          if "collection_ids" in filters:
              op = next(iter(filters["collection_ids"].keys()))
              for id_str in filters["collection_ids"][op]:
                  try:
                      validate_uuid(id_str)
                  except ValueError as err:
                      raise R2RException(
                          status_code=422, message=f"Invalid UUID: {id_str}"
                      ) from err

      def transform_chunk_id_to_id(filters: dict[str, Any]) -> dict[str, Any]:
          # Recursively rename every "chunk_id" key to "id", descending into
          # "$and"/"$or" lists and nested dicts; non-dict values pass through.
          if isinstance(filters, dict):
              transformed = {}
              for key, value in filters.items():
                  if key == "chunk_id":
                      transformed["id"] = value
                  elif key in ["$and", "$or"]:
                      transformed[key] = [
                          transform_chunk_id_to_id(item) for item in value
                      ]
                  else:
                      transformed[key] = transform_chunk_id_to_id(value)
              return transformed
          return filters

      # TODO: This might be appropriate to move elsewhere and revisit filter logic in other methods
      def extract_filters(filters: dict[str, Any]) -> dict[str, list[str]]:
          relational_filters: dict = {}

          def process_filter(filter_dict: dict[str, Any]):
              if "document_id" in filter_dict:
                  relational_filters.setdefault(
                      "filter_document_ids", []
                  ).append(filter_dict["document_id"]["$eq"])
              if "owner_id" in filter_dict:
                  relational_filters.setdefault(
                      "filter_user_ids", []
                  ).append(filter_dict["owner_id"]["$eq"])
              if "collection_ids" in filter_dict:
                  relational_filters.setdefault(
                      "filter_collection_ids", []
                  ).extend(filter_dict["collection_ids"]["$in"])

          # Handle nested conditions
          if "$and" in filters:
              for condition in filters["$and"]:
                  process_filter(condition)
          elif "$or" in filters:
              for condition in filters["$or"]:
                  process_filter(condition)
          else:
              process_filter(filters)

          return relational_filters

      validate_filters(filters)

      logger.info(f"Deleting entries with filters: {filters}")

      try:
          filters_xf = transform_chunk_id_to_id(copy(filters))
          # The chunks are deleted exactly once, with the translated filters.
          # The mapping returned here is the only record of which documents
          # were touched, so it must not be preceded by another delete of the
          # same rows whose result is thrown away.
          vector_delete_results = await database.chunks_handler.delete(
              filters_xf
          )
      except Exception as e:
          # Best effort: a vector-store failure is logged and the relational
          # lookup below still gets a chance to identify documents.
          logger.error(f"Error deleting from vector database: {e}")
          vector_delete_results = {}

      document_ids_to_purge: set[UUID] = set()
      if vector_delete_results:
          document_ids_to_purge.update(
              UUID(result.get("document_id"))
              for result in vector_delete_results.values()
              if result.get("document_id")
          )

      relational_filters = extract_filters(filters)
      if relational_filters:
          try:
              documents_overview = (
                  await database.documents_handler.get_documents_overview(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
                      offset=0,
                      limit=1000,
                      **relational_filters,  # type: ignore
                  )
              )["results"]
          except Exception as e:
              logger.error(
                  f"Error fetching documents from relational database: {e}"
              )
              documents_overview = []

          if documents_overview:
              document_ids_to_purge.update(
                  doc.id for doc in documents_overview
              )

      if not document_ids_to_purge:
          raise R2RException(
              status_code=404, message="No entries found for deletion."
          )

      for document_id in document_ids_to_purge:
          remaining_chunks = await database.chunks_handler.list_document_chunks(  # FIXME: This was using the pagination defaults from before... We need to review if this is as intended.
              document_id=document_id,
              offset=0,
              limit=1000,
          )
          # A document is purged only once none of its chunks remain.
          if remaining_chunks["total_entries"] != 0:
              continue

          try:
              await database.chunks_handler.delete(
                  {"document_id": {"$eq": document_id}}
              )
              logger.info(
                  f"Deleted document ID {document_id} from documents_overview."
              )
          except Exception as e:
              logger.error(
                  f"Error deleting document ID {document_id} from documents_overview: {e}"
              )

          await database.graphs_handler.entities.delete(
              parent_id=document_id,
              store_type="documents",  # type: ignore
          )
          await database.graphs_handler.relationships.delete(
              parent_id=document_id,
              store_type="documents",  # type: ignore
          )
          await database.documents_handler.delete(document_id=document_id)

          collections = (
              await database.collections_handler.get_collections_overview(
                  offset=0, limit=1000, filter_document_ids=[document_id]
              )
          )
          # TODO - Loop over all collections
          # Every collection returned for the document has both graph
          # statuses marked as outdated.
          for collection in collections["results"]:
              await database.documents_handler.set_workflow_status(
                  id=collection.id,
                  status_type="graph_sync_status",
                  status=KGEnrichmentStatus.OUTDATED,
              )
              await database.documents_handler.set_workflow_status(
                  id=collection.id,
                  status_type="graph_cluster_status",
                  status=KGEnrichmentStatus.OUTDATED,
              )

      return None
  @telemetry_event("DownloadFile")
  async def download_file(
      self, document_id: UUID
  ) -> Optional[Tuple[str, BinaryIO, int]]:
      """Retrieve the stored file for ``document_id`` from the files handler.

      Returns:
          The handler's result when it is truthy, otherwise ``None``.
      """
      retrieved = await self.providers.database.files_handler.retrieve_file(
          document_id
      )
      if not retrieved:
          return None
      return retrieved
  @telemetry_event("DocumentsOverview")
  async def documents_overview(
      self,
      offset: int,
      limit: int,
      user_ids: Optional[list[UUID]] = None,
      collection_ids: Optional[list[UUID]] = None,
      document_ids: Optional[list[UUID]] = None,
      *args: Any,
      **kwargs: Any,
  ):
      """Return a page of the documents overview from the documents handler.

      The id lists are forwarded as ``filter_user_ids``,
      ``filter_collection_ids`` and ``filter_document_ids``; extra positional
      or keyword arguments are ignored.
      """
      documents_handler = self.providers.database.documents_handler
      query = {
          "offset": offset,
          "limit": limit,
          "filter_document_ids": document_ids,
          "filter_user_ids": user_ids,
          "filter_collection_ids": collection_ids,
      }
      return await documents_handler.get_documents_overview(**query)
  @telemetry_event("DocumentChunks")
  async def list_document_chunks(
      self,
      document_id: UUID,
      offset: int,
      limit: int,
      include_vectors: bool = False,
      *args,
      **kwargs,
  ):
      """Return a page of chunks for ``document_id`` from the chunks handler.

      All four named arguments are passed through as keyword arguments;
      extra positional or keyword arguments are ignored.
      """
      chunks_handler = self.providers.database.chunks_handler
      page = await chunks_handler.list_document_chunks(
          document_id=document_id,
          offset=offset,
          limit=limit,
          include_vectors=include_vectors,
      )
      return page
  @telemetry_event("AssignDocumentToCollection")
  async def assign_document_to_collection(
      self, document_id: UUID, collection_id: UUID
  ):
      """Attach a document to a collection and flag the graph as outdated.

      The assignment is recorded through the chunks handler first, then
      through the collections handler. Afterwards both ``graph_sync_status``
      and ``graph_cluster_status`` are set to ``KGEnrichmentStatus.OUTDATED``
      for ``collection_id``.

      Returns:
          A dict holding a fixed success message.
      """
      database = self.providers.database
      await database.chunks_handler.assign_document_chunks_to_collection(
          document_id, collection_id
      )
      await database.collections_handler.assign_document_to_collection_relational(
          document_id, collection_id
      )
      for status_type in ("graph_sync_status", "graph_cluster_status"):
          await database.documents_handler.set_workflow_status(
              id=collection_id,
              status_type=status_type,
              status=KGEnrichmentStatus.OUTDATED,
          )
      return {"message": "Document assigned to collection successfully"}
  @telemetry_event("RemoveDocumentFromCollection")
  async def remove_document_from_collection(
      self, document_id: UUID, collection_id: UUID
  ):
      """Detach a document from a collection.

      Three handler calls run in order, each receiving
      ``(document_id, collection_id)``: the collections handler, the chunks
      handler, and finally the graphs handler.

      Returns:
          None.
      """
      database = self.providers.database
      pair = (document_id, collection_id)
      await database.collections_handler.remove_document_from_collection_relational(
          *pair
      )
      await database.chunks_handler.remove_document_from_collection_vector(
          *pair
      )
      await database.graphs_handler.delete_node_via_document_id(*pair)
      return None
  334. def _process_relationships(
  335. self, relationships: list[Tuple[str, str, str]]
  336. ) -> Tuple[dict[str, list[str]], dict[str, dict[str, list[str]]]]:
  337. graph = defaultdict(list)
  338. grouped: dict[str, dict[str, list[str]]] = defaultdict(
  339. lambda: defaultdict(list)
  340. )
  341. for subject, relation, obj in relationships:
  342. graph[subject].append(obj)
  343. grouped[subject][relation].append(obj)
  344. if obj not in graph:
  345. graph[obj] = []
  346. return dict(graph), dict(grouped)
  347. def generate_output(
  348. self,
  349. grouped_relationships: dict[str, dict[str, list[str]]],
  350. graph: dict[str, list[str]],
  351. descriptions_dict: dict[str, str],
  352. print_descriptions: bool = True,
  353. ) -> list[str]:
  354. output = []
  355. # Print grouped relationships
  356. for subject, relations in grouped_relationships.items():
  357. output.append(f"\n== {subject} ==")
  358. if print_descriptions and subject in descriptions_dict:
  359. output.append(f"\tDescription: {descriptions_dict[subject]}")
  360. for relation, objects in relations.items():
  361. output.append(f" {relation}:")
  362. for obj in objects:
  363. output.append(f" - {obj}")
  364. if print_descriptions and obj in descriptions_dict:
  365. output.append(
  366. f" Description: {descriptions_dict[obj]}"
  367. )
  368. # Print basic graph statistics
  369. output.extend(
  370. [
  371. "\n== Graph Statistics ==",
  372. f"Number of nodes: {len(graph)}",
  373. f"Number of edges: {sum(len(neighbors) for neighbors in graph.values())}",
  374. f"Number of connected components: {self._count_connected_components(graph)}",
  375. ]
  376. )
  377. # Find central nodes
  378. central_nodes = self._get_central_nodes(graph)
  379. output.extend(
  380. [
  381. "\n== Most Central Nodes ==",
  382. *(
  383. f" {node}: {centrality:.4f}"
  384. for node, centrality in central_nodes
  385. ),
  386. ]
  387. )
  388. return output
  389. def _count_connected_components(self, graph: dict[str, list[str]]) -> int:
  390. visited = set()
  391. components = 0
  392. def dfs(node):
  393. visited.add(node)
  394. for neighbor in graph[node]:
  395. if neighbor not in visited:
  396. dfs(neighbor)
  397. for node in graph:
  398. if node not in visited:
  399. dfs(node)
  400. components += 1
  401. return components
  402. def _get_central_nodes(
  403. self, graph: dict[str, list[str]]
  404. ) -> list[Tuple[str, float]]:
  405. degree = {node: len(neighbors) for node, neighbors in graph.items()}
  406. total_nodes = len(graph)
  407. centrality = {
  408. node: deg / (total_nodes - 1) for node, deg in degree.items()
  409. }
  410. return sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:5]
  @telemetry_event("CreateCollection")
  async def create_collection(
      self,
      owner_id: UUID,
      name: Optional[str] = None,
      description: str = "",
  ) -> CollectionResponse:
      """Create a collection and a graph keyed by the new collection's id.

      Returns:
          The value returned by the collections handler; the graph handler's
          return value is not used.
      """
      database = self.providers.database
      collection = await database.collections_handler.create_collection(
          owner_id=owner_id,
          name=name,
          description=description,
      )
      # The graph is created with the same name and description.
      await database.graphs_handler.create(
          collection_id=collection.id,
          name=name,
          description=description,
      )
      return collection
  @telemetry_event("UpdateCollection")
  async def update_collection(
      self,
      collection_id: UUID,
      name: Optional[str] = None,
      description: Optional[str] = None,
      generate_description: bool = False,
  ) -> CollectionResponse:
      """Update a collection's name and/or description.

      When ``generate_description`` is true, the supplied ``description`` is
      replaced by the output of ``summarize_collection`` (offset 0, limit
      100) before the update is sent to the collections handler.
      """
      new_description = description
      if generate_description:
          new_description = await self.summarize_collection(
              id=collection_id, offset=0, limit=100
          )
      collections_handler = self.providers.database.collections_handler
      return await collections_handler.update_collection(
          collection_id=collection_id,
          name=name,
          description=new_description,
      )
  @telemetry_event("DeleteCollection")
  async def delete_collection(self, collection_id: UUID) -> bool:
      """Delete a collection via the collections handler, then the chunks handler.

      Returns:
          Always ``True`` once both handler calls have completed.
      """
      database = self.providers.database
      await database.collections_handler.delete_collection_relational(
          collection_id
      )
      await database.chunks_handler.delete_collection_vector(collection_id)
      return True
  @telemetry_event("ListCollections")
  async def collections_overview(
      self,
      offset: int,
      limit: int,
      user_ids: Optional[list[UUID]] = None,
      document_ids: Optional[list[UUID]] = None,
      collection_ids: Optional[list[UUID]] = None,
  ) -> dict[str, list[CollectionResponse] | int]:
      """Return a page of the collections overview from the collections handler.

      The id lists are forwarded as ``filter_user_ids``,
      ``filter_document_ids`` and ``filter_collection_ids``.
      """
      collections_handler = self.providers.database.collections_handler
      query = {
          "offset": offset,
          "limit": limit,
          "filter_user_ids": user_ids,
          "filter_document_ids": document_ids,
          "filter_collection_ids": collection_ids,
      }
      return await collections_handler.get_collections_overview(**query)
  @telemetry_event("AddUserToCollection")
  async def add_user_to_collection(
      self, user_id: UUID, collection_id: UUID
  ) -> bool:
      """Add a user to a collection and return the users handler's result."""
      users_handler = self.providers.database.users_handler
      outcome = await users_handler.add_user_to_collection(
          user_id, collection_id
      )
      return outcome
  @telemetry_event("RemoveUserFromCollection")
  async def remove_user_from_collection(
      self, user_id: UUID, collection_id: UUID
  ) -> bool:
      """Remove a user from a collection and return the users handler's result."""
      users_handler = self.providers.database.users_handler
      return await users_handler.remove_user_from_collection(
          user_id, collection_id
      )
  @telemetry_event("GetUsersInCollection")
  async def get_users_in_collection(
      self, collection_id: UUID, offset: int = 0, limit: int = 100
  ) -> dict[str, list[User] | int]:
      """Return a page of the users in ``collection_id`` from the users handler."""
      users_handler = self.providers.database.users_handler
      page = await users_handler.get_users_in_collection(
          collection_id, offset=offset, limit=limit
      )
      return page
  @telemetry_event("GetDocumentsInCollection")
  async def documents_in_collection(
      self, collection_id: UUID, offset: int = 0, limit: int = 100
  ) -> dict[str, list[DocumentResponse] | int]:
      """Return a page of the documents in ``collection_id`` from the collections handler."""
      collections_handler = self.providers.database.collections_handler
      page = await collections_handler.documents_in_collection(
          collection_id, offset=offset, limit=limit
      )
      return page
  @telemetry_event("SummarizeCollection")
  async def summarize_collection(
      self, id: UUID, offset: int, limit: int
  ) -> str:
      """Generate a summary of a collection from its documents' summaries.

      A page of the collection's documents is fetched, their summaries are
      joined with blank lines, and the result is sent to the LLM using the
      collection-summary prompts named in the database configuration.

      Args:
          id: Collection to summarise.
          offset: Offset of the document page to read.
          limit: Maximum number of documents to read.

      Returns:
          The content of the first completion choice.

      Raises:
          ValueError: if the completion content is empty.
      """
      documents_in_collection_response = await self.documents_in_collection(
          collection_id=id,
          offset=offset,
          limit=limit,
      )

      # Documents without a summary are skipped: ``str.join`` raises
      # TypeError on a non-string item.
      # NOTE(review): presumably ``summary`` is None until a document has
      # been summarised - confirm against DocumentResponse.
      document_summaries = [
          document.summary
          for document in documents_in_collection_response["results"]
          if document.summary
      ]

      logger.info(
          f"Summarizing collection {id} with {len(document_summaries)} of {documents_in_collection_response['total_entries']} documents."
      )

      formatted_summaries = "\n\n".join(document_summaries)

      messages = await self.providers.database.prompts_handler.get_message_payload(
          system_prompt_name=self.config.database.collection_summary_system_prompt,
          task_prompt_name=self.config.database.collection_summary_task_prompt,
          task_inputs={"document_summaries": formatted_summaries},
      )

      response = await self.providers.llm.aget_completion(
          messages=messages,
          generation_config=GenerationConfig(
              model=self.config.ingestion.document_summary_model
          ),
      )

      collection_summary = response.choices[0].message.content
      if not collection_summary:
          raise ValueError("Expected a generated response.")
      return collection_summary
  @telemetry_event("AddPrompt")
  async def add_prompt(
      self, name: str, template: str, input_types: dict[str, str]
  ) -> dict:
      """Store a new prompt through the prompts handler.

      Returns:
          A confirmation string (despite the ``dict`` annotation).

      Raises:
          R2RException: status 400, carrying the handler's message, when the
              handler raises ``ValueError``.
      """
      try:
          prompts_handler = self.providers.database.prompts_handler
          await prompts_handler.add_prompt(name, template, input_types)
      except ValueError as e:
          raise R2RException(status_code=400, message=str(e))
      else:
          return f"Prompt '{name}' added successfully."  # type: ignore
  @telemetry_event("GetPrompt")
  async def get_cached_prompt(
      self,
      prompt_name: str,
      inputs: Optional[dict[str, Any]] = None,
      prompt_override: Optional[str] = None,
  ) -> dict:
      """Fetch a prompt via the handler's ``get_cached_prompt``.

      Returns:
          ``{"message": <handler result>}``.

      Raises:
          R2RException: status 404, carrying the handler's message, when the
              handler raises ``ValueError``.
      """
      try:
          prompts_handler = self.providers.database.prompts_handler
          rendered = await prompts_handler.get_cached_prompt(
              prompt_name, inputs, prompt_override
          )
      except ValueError as e:
          raise R2RException(status_code=404, message=str(e))
      else:
          return {"message": rendered}
  @telemetry_event("GetPrompt")
  async def get_prompt(
      self,
      prompt_name: str,
      inputs: Optional[dict[str, Any]] = None,
      prompt_override: Optional[str] = None,
  ) -> dict:
      """Fetch a prompt via the handler's ``get_prompt`` and return its result.

      Raises:
          R2RException: status 404, carrying the handler's message, when the
              handler raises ``ValueError``.
      """
      try:
          prompts_handler = self.providers.database.prompts_handler
          found = await prompts_handler.get_prompt(  # type: ignore
              name=prompt_name,
              inputs=inputs,
              prompt_override=prompt_override,
          )
      except ValueError as e:
          raise R2RException(status_code=404, message=str(e))
      else:
          return found
  @telemetry_event("GetAllPrompts")
  async def get_all_prompts(self) -> dict[str, Prompt]:
      """Return every prompt known to the prompts handler."""
      prompts_handler = self.providers.database.prompts_handler
      all_prompts = await prompts_handler.get_all_prompts()
      return all_prompts
  @telemetry_event("UpdatePrompt")
  async def update_prompt(
      self,
      name: str,
      template: Optional[str] = None,
      input_types: Optional[dict[str, str]] = None,
  ) -> dict:
      """Update an existing prompt through the prompts handler.

      Returns:
          A confirmation string (despite the ``dict`` annotation).

      Raises:
          R2RException: status 404, carrying the handler's message, when the
              handler raises ``ValueError``.
      """
      try:
          prompts_handler = self.providers.database.prompts_handler
          await prompts_handler.update_prompt(name, template, input_types)
      except ValueError as e:
          raise R2RException(status_code=404, message=str(e))
      else:
          return f"Prompt '{name}' updated successfully."  # type: ignore
  @telemetry_event("DeletePrompt")
  async def delete_prompt(self, name: str) -> dict:
      """Delete a prompt through the prompts handler.

      Returns:
          ``{"message": ...}`` confirming the deletion.

      Raises:
          R2RException: status 404, carrying the handler's message, when the
              handler raises ``ValueError``.
      """
      try:
          prompts_handler = self.providers.database.prompts_handler
          await prompts_handler.delete_prompt(name)
      except ValueError as e:
          raise R2RException(status_code=404, message=str(e))
      else:
          return {"message": f"Prompt '{name}' deleted successfully."}
  @telemetry_event("GetConversation")
  async def get_conversation(
      self,
      conversation_id: str,
      auth_user=None,
  ) -> Tuple[str, list[Message], list[dict]]:
      """Return the conversations handler's result for ``conversation_id``.

      ``auth_user`` is accepted but not used by this method.
      """
      conversations_handler = self.providers.database.conversations_handler
      conversation = await conversations_handler.get_conversation(  # type: ignore
          conversation_id
      )
      return conversation
  async def verify_conversation_access(
      self, conversation_id: str, user_id: UUID
  ) -> bool:
      """Return the conversations handler's access check for the given user and conversation."""
      conversations_handler = self.providers.database.conversations_handler
      allowed = await conversations_handler.verify_conversation_access(
          conversation_id, user_id
      )
      return allowed
  @telemetry_event("CreateConversation")
  async def create_conversation(
      self, user_id: Optional[UUID] = None, auth_user=None
  ) -> dict:
      """Create a conversation for ``user_id`` and return the handler's result.

      ``auth_user`` is accepted but not used by this method.
      """
      conversations_handler = self.providers.database.conversations_handler
      created = await conversations_handler.create_conversation(  # type: ignore
          user_id=user_id
      )
      return created
  @telemetry_event("ConversationsOverview")
  async def conversations_overview(
      self,
      offset: int,
      limit: int,
      conversation_ids: Optional[list[UUID]] = None,
      user_ids: Optional[UUID | list[UUID]] = None,
      auth_user=None,
  ) -> dict[str, list[dict] | int]:
      """Return a page of the conversations overview from the conversations handler.

      ``auth_user`` is accepted but not used by this method.
      """
      conversations_handler = self.providers.database.conversations_handler
      query = {
          "offset": offset,
          "limit": limit,
          "user_ids": user_ids,
          "conversation_ids": conversation_ids,
      }
      return await conversations_handler.get_conversations_overview(**query)
  @telemetry_event("AddMessage")
  async def add_message(
      self,
      conversation_id: str,
      content: Message,
      parent_id: Optional[str] = None,
      metadata: Optional[dict] = None,
      auth_user=None,
  ) -> str:
      """Add a message to a conversation and return the handler's result.

      ``conversation_id``, ``content``, ``parent_id`` and ``metadata`` are
      passed positionally in that order; ``auth_user`` is not used.
      """
      conversations_handler = self.providers.database.conversations_handler
      message_args = (conversation_id, content, parent_id, metadata)
      return await conversations_handler.add_message(*message_args)
  @telemetry_event("EditMessage")
  async def edit_message(
      self,
      message_id: str,
      new_content: str,
      additional_metadata: dict,
      auth_user=None,
  ) -> Tuple[str, str]:
      """Edit a message through the conversations handler and return its result.

      ``auth_user`` is accepted but not used by this method.
      """
      conversations_handler = self.providers.database.conversations_handler
      edited = await conversations_handler.edit_message(
          message_id, new_content, additional_metadata
      )
      return edited
  @telemetry_event("updateMessageMetadata")
  async def update_message_metadata(
      self, message_id: str, metadata: dict, auth_user=None
  ):
      """Update a message's metadata through the conversations handler.

      Nothing is returned; ``auth_user`` is accepted but not used.
      """
      conversations_handler = self.providers.database.conversations_handler
      await conversations_handler.update_message_metadata(message_id, metadata)
  @telemetry_event("DeleteConversation")
  async def delete_conversation(self, conversation_id: str, auth_user=None):
      """Delete a conversation through the conversations handler.

      Nothing is returned; ``auth_user`` is accepted but not used.
      """
      conversations_handler = self.providers.database.conversations_handler
      await conversations_handler.delete_conversation(conversation_id)