async_client.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import json
  2. from io import BytesIO
  3. from typing import Any, AsyncGenerator
  4. import httpx
  5. from shared.abstractions import R2RException
  6. from .base.base_client import BaseClient
  7. from .v3 import (
  8. ChunksSDK,
  9. CollectionsSDK,
  10. ConversationsSDK,
  11. DocumentsSDK,
  12. GraphsSDK,
  13. IndicesSDK,
  14. PromptsSDK,
  15. RetrievalSDK,
  16. SystemSDK,
  17. UsersSDK,
  18. )
  19. class R2RAsyncClient(BaseClient):
  20. """
  21. Asynchronous client for interacting with the R2R API.
  22. """
  23. def __init__(
  24. self,
  25. base_url: str = "https://api.cloud.sciphi.ai",
  26. prefix: str = "/v2",
  27. custom_client=None,
  28. timeout: float = 300.0,
  29. ):
  30. super().__init__(base_url, prefix, timeout)
  31. self.client = custom_client or httpx.AsyncClient(timeout=timeout)
  32. self.chunks = ChunksSDK(self)
  33. self.collections = CollectionsSDK(self)
  34. self.conversations = ConversationsSDK(self)
  35. self.documents = DocumentsSDK(self)
  36. self.graphs = GraphsSDK(self)
  37. self.indices = IndicesSDK(self)
  38. self.prompts = PromptsSDK(self)
  39. self.retrieval = RetrievalSDK(self)
  40. self.system = SystemSDK(self)
  41. self.users = UsersSDK(self)
  42. async def _make_request(
  43. self, method: str, endpoint: str, version: str = "v2", **kwargs
  44. ):
  45. url = self._get_full_url(endpoint, version)
  46. request_args = self._prepare_request_args(endpoint, **kwargs)
  47. try:
  48. response = await self.client.request(method, url, **request_args)
  49. await self._handle_response(response)
  50. # return response.json() if response.content else None
  51. # In async_client.py, inside _make_request:
  52. if "application/json" in response.headers.get("Content-Type", ""):
  53. return response.json() if response.content else None
  54. else:
  55. # Return raw binary content as BytesIO
  56. return BytesIO(response.content)
  57. except httpx.RequestError as e:
  58. raise R2RException(
  59. status_code=500,
  60. message=f"Request failed: {str(e)}",
  61. ) from e
  62. async def _make_streaming_request(
  63. self, method: str, endpoint: str, version: str = "v2", **kwargs
  64. ) -> AsyncGenerator[Any, None]:
  65. url = self._get_full_url(endpoint, version)
  66. request_args = self._prepare_request_args(endpoint, **kwargs)
  67. async with httpx.AsyncClient(timeout=self.timeout) as client:
  68. async with client.stream(method, url, **request_args) as response:
  69. await self._handle_response(response)
  70. async for line in response.aiter_lines():
  71. if line.strip(): # Ignore empty lines
  72. try:
  73. yield json.loads(line)
  74. except: # json.JSONDecodeError:
  75. yield line
  76. async def _handle_response(self, response):
  77. if response.status_code >= 400:
  78. try:
  79. error_content = response.json()
  80. if isinstance(error_content, dict):
  81. message = (
  82. error_content.get("detail", {}).get(
  83. "message", str(error_content)
  84. )
  85. if isinstance(error_content.get("detail"), dict)
  86. else error_content.get("detail", str(error_content))
  87. )
  88. else:
  89. message = str(error_content)
  90. except json.JSONDecodeError:
  91. message = response.text
  92. raise R2RException(
  93. status_code=response.status_code, message=message
  94. )
  95. async def close(self):
  96. await self.client.aclose()
  97. async def __aenter__(self):
  98. return self
  99. async def __aexit__(self, exc_type, exc_val, exc_tb):
  100. await self.close()
  101. def set_api_key(self, api_key: str) -> None:
  102. if self.access_token:
  103. raise ValueError("Cannot have both access token and api key.")
  104. self.api_key = api_key
  105. def unset_api_key(self) -> None:
  106. self.api_key = None