import logging import os from typing import Any, Type from ..abstractions import R2RProviders, R2RServices from ..api.v3.chunks_router import ChunksRouter from ..api.v3.collections_router import CollectionsRouter from ..api.v3.conversations_router import ConversationsRouter from ..api.v3.documents_router import DocumentsRouter from ..api.v3.graph_router import GraphRouter from ..api.v3.indices_router import IndicesRouter from ..api.v3.prompts_router import PromptsRouter from ..api.v3.retrieval_router import RetrievalRouter from ..api.v3.system_router import SystemRouter from ..api.v3.users_router import UsersRouter from ..app import R2RApp from ..config import R2RConfig from ..services.auth_service import AuthService # noqa: F401 from ..services.graph_service import GraphService # noqa: F401 from ..services.ingestion_service import IngestionService # noqa: F401 from ..services.maintenance_service import MaintenanceService # noqa: F401 from ..services.management_service import ManagementService # noqa: F401 from ..services.retrieval_service import ( # type: ignore RetrievalService, # noqa: F401 # type: ignore ) from .factory import R2RProviderFactory from .utils import install_user_tool_dependencies logger = logging.getLogger() class R2RBuilder: _SERVICES = [ "auth", "ingestion", "maintenance", "management", "retrieval", "graph", ] def __init__(self, config: R2RConfig): self.config = config async def build(self, *args, **kwargs) -> R2RApp: provider_factory = R2RProviderFactory try: user_tools_path = ( os.getenv("R2R_USER_TOOLS_PATH") or "../docker/user_tools" ) if os.path.exists(user_tools_path) and os.path.isdir( user_tools_path ): logger.info( f"Checking and installing dependencies for user tools at: {user_tools_path}" ) install_user_tool_dependencies(user_tools_path) except Exception as e: logger.error(f"Error {e} while installing user tool dependencies.") raise try: providers = await self._create_providers( provider_factory, *args, **kwargs ) except Exception as e: logger.error(f"Error {e} while creating R2RProviders.") raise service_params = { "config": self.config, "providers": providers, } services = self._create_services(service_params) await services.maintenance.initialize() routers = { "chunks_router": ChunksRouter( providers=providers, services=services, config=self.config, ).get_router(), "collections_router": CollectionsRouter( providers=providers, services=services, config=self.config, ).get_router(), "conversations_router": ConversationsRouter( providers=providers, services=services, config=self.config, ).get_router(), "documents_router": DocumentsRouter( providers=providers, services=services, config=self.config, ).get_router(), "graph_router": GraphRouter( providers=providers, services=services, config=self.config, ).get_router(), "indices_router": IndicesRouter( providers=providers, services=services, config=self.config, ).get_router(), "prompts_router": PromptsRouter( providers=providers, services=services, config=self.config, ).get_router(), "retrieval_router": RetrievalRouter( providers=providers, services=services, config=self.config, ).get_router(), "system_router": SystemRouter( providers=providers, services=services, config=self.config, ).get_router(), "users_router": UsersRouter( providers=providers, services=services, config=self.config, ).get_router(), } return R2RApp( config=self.config, orchestration_provider=providers.orchestration, services=services, providers=providers, **routers, ) async def _create_providers( self, provider_factory: Type[R2RProviderFactory], *args, **kwargs ) -> R2RProviders: factory = provider_factory(self.config) return await factory.create_providers(*args, **kwargs) def _create_services(self, service_params: dict[str, Any]) -> R2RServices: services = R2RBuilder._SERVICES service_instances = {} for service_type in services: service_class = globals()[f"{service_type.capitalize()}Service"] service_instances[service_type] = service_class(**service_params) return R2RServices(**service_instances)