import logging import textwrap from datetime import datetime, timezone import psutil from fastapi import Depends from core.base import R2RException from core.base.api.models import ( GenericMessageResponse, WrappedGenericMessageResponse, WrappedServerStatsResponse, WrappedSettingsResponse, ) from ...abstractions import R2RProviders, R2RServices from ...config import R2RConfig from .base_router import BaseRouterV3 class SystemRouter(BaseRouterV3): def __init__( self, providers: R2RProviders, services: R2RServices, config: R2RConfig, ): logging.info("Initializing SystemRouter") super().__init__(providers, services, config) self.start_time = datetime.now(timezone.utc) def _setup_routes(self): @self.router.get( "/health", openapi_extra={ "x-codeSamples": [ { "lang": "Python", "source": textwrap.dedent(""" from r2r import R2RClient client = R2RClient() # when using auth, do client.login(...) result = client.system.health() """), }, { "lang": "JavaScript", "source": textwrap.dedent(""" const { r2rClient } = require("r2r-js"); const client = new r2rClient(); function main() { const response = await client.system.health(); } main(); """), }, { "lang": "cURL", "source": textwrap.dedent(""" curl -X POST "https://api.example.com/v3/health"\\ -H "Content-Type: application/json" \\ -H "Authorization: Bearer YOUR_API_KEY" \\ """), }, ] }, ) @self.base_endpoint async def health_check() -> WrappedGenericMessageResponse: return GenericMessageResponse(message="ok") # type: ignore @self.router.get( "/system/settings", dependencies=[Depends(self.rate_limit_dependency)], openapi_extra={ "x-codeSamples": [ { "lang": "Python", "source": textwrap.dedent(""" from r2r import R2RClient client = R2RClient() # when using auth, do client.login(...) result = client.system.settings() """), }, { "lang": "JavaScript", "source": textwrap.dedent(""" const { r2rClient } = require("r2r-js"); const client = new r2rClient(); function main() { const response = await client.system.settings(); } main(); """), }, { "lang": "cURL", "source": textwrap.dedent(""" curl -X POST "https://api.example.com/v3/system/settings" \\ -H "Content-Type: application/json" \\ -H "Authorization: Bearer YOUR_API_KEY" \\ """), }, ] }, ) @self.base_endpoint async def app_settings( auth_user=Depends(self.providers.auth.auth_wrapper()), ) -> WrappedSettingsResponse: if not auth_user.is_superuser: raise R2RException( "Only a superuser can call the `system/settings` endpoint.", 403, ) return await self.services.management.app_settings() @self.router.get( "/system/status", dependencies=[Depends(self.rate_limit_dependency)], openapi_extra={ "x-codeSamples": [ { "lang": "Python", "source": textwrap.dedent(""" from r2r import R2RClient client = R2RClient() # when using auth, do client.login(...) result = client.system.status() """), }, { "lang": "JavaScript", "source": textwrap.dedent(""" const { r2rClient } = require("r2r-js"); const client = new r2rClient(); function main() { const response = await client.system.status(); } main(); """), }, { "lang": "cURL", "source": textwrap.dedent(""" curl -X POST "https://api.example.com/v3/system/status" \\ -H "Content-Type: application/json" \\ -H "Authorization: Bearer YOUR_API_KEY" \\ """), }, ] }, ) @self.base_endpoint async def server_stats( auth_user=Depends(self.providers.auth.auth_wrapper()), ) -> WrappedServerStatsResponse: if not auth_user.is_superuser: raise R2RException( "Only an authorized user can call the `system/status` endpoint.", 403, ) return { # type: ignore "start_time": self.start_time.isoformat(), "uptime_seconds": ( datetime.now(timezone.utc) - self.start_time ).total_seconds(), "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, }