base_router.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import functools
  2. import logging
  3. from abc import abstractmethod
  4. from typing import Callable
  5. from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket
  6. from fastapi.responses import StreamingResponse
  7. from core.base import R2RException, manage_run
  8. from ...abstractions import R2RProviders, R2RServices
  9. logger = logging.getLogger()
  10. class BaseRouterV3:
  11. def __init__(self, providers: R2RProviders, services: R2RServices):
  12. self.providers = providers
  13. self.services = services
  14. self.router = APIRouter()
  15. self.openapi_extras = self._load_openapi_extras()
  16. self._setup_routes()
  17. self._register_workflows()
  18. def get_router(self):
  19. return self.router
  20. def base_endpoint(self, func: Callable):
  21. @functools.wraps(func)
  22. async def wrapper(*args, **kwargs):
  23. async with manage_run(
  24. self.services.ingestion.run_manager, func.__name__
  25. ) as run_id:
  26. auth_user = kwargs.get("auth_user")
  27. if auth_user:
  28. await self.services.ingestion.run_manager.log_run_info( # TODO - this is a bit of a hack
  29. user=auth_user,
  30. )
  31. try:
  32. func_result = await func(*args, **kwargs)
  33. if (
  34. isinstance(func_result, tuple)
  35. and len(func_result) == 2
  36. ):
  37. results, outer_kwargs = func_result
  38. else:
  39. results, outer_kwargs = func_result, {}
  40. if isinstance(results, StreamingResponse):
  41. return results
  42. return {"results": results, **outer_kwargs}
  43. except R2RException:
  44. raise
  45. except Exception as e:
  46. logger.error(
  47. f"Error in base endpoint {func.__name__}() - \n\n{str(e)}",
  48. exc_info=True,
  49. )
  50. raise HTTPException(
  51. status_code=500,
  52. detail={
  53. "message": f"An error '{e}' occurred during {func.__name__}",
  54. "error": str(e),
  55. "error_type": type(e).__name__,
  56. },
  57. ) from e
  58. return wrapper
  59. @classmethod
  60. def build_router(cls, engine):
  61. return cls(engine).router
  62. def _register_workflows(self):
  63. pass
  64. def _load_openapi_extras(self):
  65. return {}
  66. @abstractmethod
  67. def _setup_routes(self):
  68. pass
  69. import functools
  70. import logging
  71. from abc import abstractmethod
  72. from typing import Callable, Optional
  73. from fastapi import APIRouter, Depends, HTTPException, Request
  74. from fastapi.responses import StreamingResponse
  75. from core.base import R2RException, manage_run
  76. from ...abstractions import R2RProviders, R2RServices
  77. logger = logging.getLogger()
  78. class BaseRouterV3:
  79. def __init__(self, providers: R2RProviders, services: R2RServices):
  80. self.providers = providers
  81. self.services = services
  82. self.router = APIRouter()
  83. self.openapi_extras = self._load_openapi_extras()
  84. self.set_rate_limiting()
  85. self._setup_routes()
  86. self._register_workflows()
  87. def get_router(self):
  88. return self.router
  89. def base_endpoint(self, func: Callable):
  90. @functools.wraps(func)
  91. async def wrapper(*args, **kwargs):
  92. async with manage_run(
  93. self.services.ingestion.run_manager, func.__name__
  94. ) as run_id:
  95. auth_user = kwargs.get("auth_user")
  96. if auth_user:
  97. await self.services.ingestion.run_manager.log_run_info(
  98. user=auth_user,
  99. )
  100. try:
  101. func_result = await func(*args, **kwargs)
  102. if (
  103. isinstance(func_result, tuple)
  104. and len(func_result) == 2
  105. ):
  106. results, outer_kwargs = func_result
  107. else:
  108. results, outer_kwargs = func_result, {}
  109. if isinstance(results, StreamingResponse):
  110. return results
  111. return {"results": results, **outer_kwargs}
  112. except R2RException:
  113. raise
  114. except Exception as e:
  115. logger.error(
  116. f"Error in base endpoint {func.__name__}() - \n\n{str(e)}",
  117. exc_info=True,
  118. )
  119. raise HTTPException(
  120. status_code=500,
  121. detail={
  122. "message": f"An error '{e}' occurred during {func.__name__}",
  123. "error": str(e),
  124. "error_type": type(e).__name__,
  125. },
  126. ) from e
  127. return wrapper
  128. @classmethod
  129. def build_router(cls, engine):
  130. return cls(engine).router
  131. def _register_workflows(self):
  132. pass
  133. def _load_openapi_extras(self):
  134. return {}
  135. @abstractmethod
  136. def _setup_routes(self):
  137. pass
  138. def set_rate_limiting(self):
  139. """
  140. Set up a yield dependency for rate limiting and logging.
  141. """
  142. async def rate_limit_dependency(
  143. request: Request,
  144. auth_user=Depends(self.providers.auth.auth_wrapper()),
  145. ):
  146. user_id = auth_user.id
  147. route = request.scope["path"]
  148. # Check the limits before proceeding
  149. try:
  150. if not auth_user.is_superuser:
  151. await self.providers.database.limits_handler.check_limits(
  152. user_id, route
  153. )
  154. except ValueError as e:
  155. raise HTTPException(status_code=429, detail=str(e))
  156. request.state.user_id = user_id
  157. request.state.route = route
  158. # Yield to run the route
  159. try:
  160. yield
  161. finally:
  162. # After the route completes successfully, log the request
  163. await self.providers.database.limits_handler.log_request(
  164. user_id, route
  165. )
  166. async def websocket_rate_limit_dependency(
  167. websocket: WebSocket,
  168. ):
  169. route = websocket.scope["path"]
  170. try:
  171. return True
  172. except ValueError as e:
  173. await websocket.close(code=4429, reason="Rate limit exceeded")
  174. return False
  175. self.rate_limit_dependency = rate_limit_dependency
  176. self.websocket_rate_limit_dependency = websocket_rate_limit_dependency