async_client.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import json
  2. from io import BytesIO
  3. from typing import Any, AsyncGenerator
  4. import httpx
  5. from shared.abstractions import R2RException
  6. from .base.base_client import BaseClient
  7. from .v2 import (
  8. AuthMixins,
  9. IngestionMixins,
  10. KGMixins,
  11. ManagementMixins,
  12. RetrievalMixins,
  13. ServerMixins,
  14. )
  15. from .v3 import (
  16. ChunksSDK,
  17. CollectionsSDK,
  18. ConversationsSDK,
  19. DocumentsSDK,
  20. GraphsSDK,
  21. IndicesSDK,
  22. PromptsSDK,
  23. RetrievalSDK,
  24. SystemSDK,
  25. UsersSDK,
  26. )
  class R2RAsyncClient(
      BaseClient,
      AuthMixins,
      IngestionMixins,
      KGMixins,
      ManagementMixins,
      RetrievalMixins,
      ServerMixins,
  ):
      """Asynchronous client for interacting with the R2R API.

      The class combines ``BaseClient`` with the v2 mixins and exposes the v3
      SDK objects as attributes (``chunks``, ``collections``,
      ``conversations``, ``documents``, ``graphs``, ``indices``, ``prompts``,
      ``retrieval``, ``system`` and ``users``).

      It can be used as an async context manager; leaving the context closes
      the underlying HTTP client.
      """

      def __init__(
          self,
          base_url: str = "http://localhost:7272",
          prefix: str = "/v2",
          custom_client=None,
          timeout: float = 300.0,
      ):
          """Create the client and attach the v3 SDK helpers.

          Args:
              base_url: Forwarded to ``BaseClient.__init__``.
              prefix: Forwarded to ``BaseClient.__init__``.
              custom_client: Optional pre-built client used for non-streaming
                  requests. When falsy, a new ``httpx.AsyncClient`` is created
                  with ``timeout``.
              timeout: Forwarded to ``BaseClient.__init__`` and used as the
                  timeout of the default ``httpx.AsyncClient``.
          """
          super().__init__(base_url, prefix, timeout)
          self.client = custom_client or httpx.AsyncClient(timeout=timeout)

          # Each v3 SDK object receives this client so it can issue requests
          # through it.
          self.chunks = ChunksSDK(self)
          self.collections = CollectionsSDK(self)
          self.conversations = ConversationsSDK(self)
          self.documents = DocumentsSDK(self)
          self.graphs = GraphsSDK(self)
          self.indices = IndicesSDK(self)
          self.prompts = PromptsSDK(self)
          self.retrieval = RetrievalSDK(self)
          self.system = SystemSDK(self)
          self.users = UsersSDK(self)

      async def _make_request(
          self, method: str, endpoint: str, version: str = "v2", **kwargs
      ) -> Any:
          """Send a non-streaming request and decode the response body.

          Args:
              method: HTTP method passed to ``self.client.request``.
              endpoint: Endpoint passed to ``_get_full_url`` and
                  ``_prepare_request_args``.
              version: API version passed to ``_get_full_url``.
              **kwargs: Passed to ``_prepare_request_args``.

          Returns:
              For a response whose ``Content-Type`` contains
              ``application/json``: the decoded JSON, or ``None`` when the
              body is empty. For any other response: the raw body wrapped in
              a ``BytesIO``.

          Raises:
              R2RException: With the response status code when the status is
                  >= 400, or with status 500 when ``httpx`` reports a request
                  error.
          """
          url = self._get_full_url(endpoint, version)
          request_args = self._prepare_request_args(endpoint, **kwargs)

          try:
              response = await self.client.request(method, url, **request_args)
              await self._handle_response(response)

              content_type = response.headers.get("Content-Type", "")
              if "application/json" in content_type:
                  return response.json() if response.content else None

              # Non-JSON payloads are handed back as an in-memory binary
              # stream.
              return BytesIO(response.content)
          except httpx.RequestError as e:
              raise R2RException(
                  status_code=500,
                  message=f"Request failed: {str(e)}",
              ) from e

      async def _make_streaming_request(
          self, method: str, endpoint: str, version: str = "v2", **kwargs
      ) -> AsyncGenerator[Any, None]:
          """Send a request and yield the response line by line.

          A dedicated ``httpx.AsyncClient`` is opened for the duration of the
          stream (``self.client`` is not used here).

          Args:
              method: HTTP method passed to ``client.stream``.
              endpoint: Endpoint passed to ``_get_full_url`` and
                  ``_prepare_request_args``.
              version: API version passed to ``_get_full_url``.
              **kwargs: Passed to ``_prepare_request_args``.

          Yields:
              For every non-blank line: the decoded JSON value when the line
              is valid JSON, otherwise the line itself as a string.

          Raises:
              R2RException: When the response status is >= 400.
          """
          url = self._get_full_url(endpoint, version)
          request_args = self._prepare_request_args(endpoint, **kwargs)

          async with httpx.AsyncClient(timeout=self.timeout) as client:
              async with client.stream(method, url, **request_args) as response:
                  if response.status_code >= 400:
                      # The body of a streamed response is not loaded
                      # automatically; read it so the error handler can
                      # inspect it instead of hitting ResponseNotRead.
                      await response.aread()
                  await self._handle_response(response)

                  async for line in response.aiter_lines():
                      if not line.strip():
                          continue  # Ignore empty lines

                      # Only the parse is guarded. The yield stays outside the
                      # try block so that GeneratorExit (early close) and
                      # exceptions thrown in by the consumer are never
                      # swallowed.
                      try:
                          item = json.loads(line)
                      except ValueError:  # includes json.JSONDecodeError
                          item = line
                      yield item

      async def _handle_response(self, response) -> None:
          """Raise ``R2RException`` for responses with status >= 400.

          The message is taken from the JSON body when possible:
          ``detail["message"]`` if ``detail`` is a dict, else ``detail`` if
          that key is present, else the string form of the whole body. When
          the body is not valid JSON the response text is used.

          Args:
              response: Response object exposing ``status_code``, ``json()``
                  and ``text``.

          Raises:
              R2RException: Carrying the response status code and the
                  extracted message.
          """
          if response.status_code < 400:
              return

          try:
              error_content = response.json()
          except ValueError:
              # Covers json.JSONDecodeError and UnicodeDecodeError alike.
              message = response.text
          else:
              if not isinstance(error_content, dict):
                  message = str(error_content)
              else:
                  detail = error_content.get("detail")
                  if isinstance(detail, dict):
                      message = detail.get("message", str(error_content))
                  elif "detail" in error_content:
                      message = detail
                  else:
                      message = str(error_content)

          raise R2RException(
              status_code=response.status_code, message=message
          )

      async def close(self) -> None:
          """Close the underlying HTTP client (``self.client``)."""
          await self.client.aclose()

      async def __aenter__(self):
          """Return this client for use in an ``async with`` block."""
          return self

      async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
          """Close the client when the ``async with`` block exits."""
          await self.close()

      def set_api_key(self, api_key: str) -> None:
          """Store ``api_key`` on the client.

          Raises:
              ValueError: If ``self.access_token`` is already set, because
                  the two credentials are mutually exclusive.
          """
          # NOTE(review): access_token is not assigned in this class;
          # presumably a base class initializes it. Confirm.
          if self.access_token:
              raise ValueError("Cannot have both access token and api key.")
          self.api_key = api_key

      def unset_api_key(self) -> None:
          """Clear the stored API key."""
          self.api_key = None