123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- import logging
- import os
- from typing import Any, Type
- from ..abstractions import R2RProviders, R2RServices
- from ..api.v3.chunks_router import ChunksRouter
- from ..api.v3.collections_router import CollectionsRouter
- from ..api.v3.conversations_router import ConversationsRouter
- from ..api.v3.documents_router import DocumentsRouter
- from ..api.v3.graph_router import GraphRouter
- from ..api.v3.indices_router import IndicesRouter
- from ..api.v3.prompts_router import PromptsRouter
- from ..api.v3.retrieval_router import RetrievalRouter
- from ..api.v3.system_router import SystemRouter
- from ..api.v3.users_router import UsersRouter
- from ..app import R2RApp
- from ..config import R2RConfig
- from ..services.auth_service import AuthService # noqa: F401
- from ..services.graph_service import GraphService # noqa: F401
- from ..services.ingestion_service import IngestionService # noqa: F401
- from ..services.maintenance_service import MaintenanceService # noqa: F401
- from ..services.management_service import ManagementService # noqa: F401
- from ..services.retrieval_service import ( # type: ignore
- RetrievalService, # noqa: F401 # type: ignore
- )
- from .factory import R2RProviderFactory
- from .utils import install_user_tool_dependencies
- logger = logging.getLogger()
- class R2RBuilder:
- _SERVICES = [
- "auth",
- "ingestion",
- "maintenance",
- "management",
- "retrieval",
- "graph",
- ]
- def __init__(self, config: R2RConfig):
- self.config = config
- async def build(self, *args, **kwargs) -> R2RApp:
- provider_factory = R2RProviderFactory
- try:
- user_tools_path = (
- os.getenv("R2R_USER_TOOLS_PATH") or "../docker/user_tools"
- )
- if os.path.exists(user_tools_path) and os.path.isdir(
- user_tools_path
- ):
- logger.info(
- f"Checking and installing dependencies for user tools at: {user_tools_path}"
- )
- install_user_tool_dependencies(user_tools_path)
- except Exception as e:
- logger.error(f"Error {e} while installing user tool dependencies.")
- raise
- try:
- providers = await self._create_providers(
- provider_factory, *args, **kwargs
- )
- except Exception as e:
- logger.error(f"Error {e} while creating R2RProviders.")
- raise
- service_params = {
- "config": self.config,
- "providers": providers,
- }
- services = self._create_services(service_params)
- await services.maintenance.initialize()
- routers = {
- "chunks_router": ChunksRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "collections_router": CollectionsRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "conversations_router": ConversationsRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "documents_router": DocumentsRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "graph_router": GraphRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "indices_router": IndicesRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "prompts_router": PromptsRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "retrieval_router": RetrievalRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "system_router": SystemRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- "users_router": UsersRouter(
- providers=providers,
- services=services,
- config=self.config,
- ).get_router(),
- }
- return R2RApp(
- config=self.config,
- orchestration_provider=providers.orchestration,
- services=services,
- providers=providers,
- **routers,
- )
- async def _create_providers(
- self, provider_factory: Type[R2RProviderFactory], *args, **kwargs
- ) -> R2RProviders:
- factory = provider_factory(self.config)
- return await factory.create_providers(*args, **kwargs)
- def _create_services(self, service_params: dict[str, Any]) -> R2RServices:
- services = R2RBuilder._SERVICES
- service_instances = {}
- for service_type in services:
- service_class = globals()[f"{service_type.capitalize()}Service"]
- service_instances[service_type] = service_class(**service_params)
- return R2RServices(**service_instances)
|