"""Synchronous wrapper around the asynchronous R2R client."""

import asyncio
import threading
from typing import Any, Optional

from r2r import R2RAsyncClient

from app.libs.util import verify_jwt_expiration
from config.llm import tool_settings


class R2R:
    """Blocking facade over ``R2RAsyncClient``.

    All client I/O is executed on one private event loop that runs in a
    daemon thread.  This lets the synchronous methods return real results
    whether or not the calling thread already has a running event loop, and
    keeps every request of the client on the same loop.
    """

    client: Optional[R2RAsyncClient]

    def __init__(self):
        # Store a real boolean rather than the password string itself.
        self.auth_enabled = bool(
            tool_settings.R2R_USERNAME and tool_settings.R2R_PASSWORD
        )
        self.client = None
        # The worker loop is created lazily so that importing this module
        # (which instantiates ``R2R``) does not start a thread.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_lock = threading.Lock()

    def _get_loop(self) -> asyncio.AbstractEventLoop:
        """Return the private worker loop, starting its thread on first use."""
        with self._loop_lock:
            if self._loop is None:
                loop = asyncio.new_event_loop()
                threading.Thread(
                    target=loop.run_forever,
                    name="r2r-client-loop",
                    daemon=True,
                ).start()
                self._loop = loop
            return self._loop

    def _submit(self, coro):
        """Schedule *coro* on the worker loop; return a concurrent Future."""
        return asyncio.run_coroutine_threadsafe(coro, self._get_loop())

    def _run(self, coro):
        """Run *coro* on the worker loop and block until its result is ready."""
        return self._submit(coro).result()

    async def _login(self) -> None:
        """Create the client if it is missing and log in when auth is enabled.

        Always executed on the worker loop.
        """
        if self.client is None:
            self.client = R2RAsyncClient(tool_settings.R2R_BASE_URL, "/v3")
        if self.auth_enabled:
            await self.client.users.login(
                tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD
            )
            # The access token is deliberately not printed or logged.

    async def init(self) -> None:
        """Create the client and, if credentials are configured, log in.

        The work is delegated to the worker loop; the caller's event loop
        only awaits its completion.  The client is created even when
        authentication is disabled so that later calls have one to use.
        """
        await asyncio.wrap_future(self._submit(self._login()))

    def ingest_file(self, file_path: str, metadata: Optional[dict] = None):
        """Upload a file as a document and return the response's ``results``.

        Args:
            file_path: Path of the file handed to ``documents.create``.
            metadata: Optional metadata; an empty or missing value is sent
                as ``None``.

        Returns:
            The value stored under ``"results"`` in the response.

        Note:
            This call blocks the calling thread until the request finishes.
            When invoked from inside a running event loop, that loop is
            blocked for the duration of the request.
        """
        self._check_login()
        ingest_response = self._run(
            self.client.documents.create(
                file_path=file_path,
                metadata=metadata if metadata else None,
                id=None,
            )
        )
        return ingest_response.get("results")

    def search(self, query: str, filters: dict[str, Any]):
        """Run a retrieval search and return its chunk search results.

        Args:
            query: Search query text.
            filters: Filters passed through in ``search_settings``.

        Returns:
            The ``"chunk_search_results"`` entry of the response's
            ``"results"``.

        Note:
            This call blocks the calling thread until the request finishes.
            When invoked from inside a running event loop, that loop is
            blocked for the duration of the request.
        """
        self._check_login()
        search_response = self._run(
            self.client.retrieval.search(
                query=query,
                search_settings={
                    "filters": filters,
                    "limit": tool_settings.R2R_SEARCH_LIMIT,
                },
            )
        )
        return search_response.get("results").get("chunk_search_results")

    def _needs_login(self) -> bool:
        """Return True when the client must be created or (re)authenticated."""
        if self.client is None:
            return True
        if not self.auth_enabled:
            return False
        token = self.client.access_token
        # NOTE(review): assumes verify_jwt_expiration() returns a truthy
        # value while the token is still valid -- confirm in app.libs.util.
        return not (token and verify_jwt_expiration(token))

    def _check_login(self) -> None:
        """Ensure a usable client exists before a request is issued.

        Blocks until the login has completed, so the following request
        never goes out with a missing or expired token.
        """
        if self._needs_login():
            self._run(self._login())


# Shared module-level R2R instance.
r2r = R2R()


# Call this once when the application starts.
async def initialize_r2r():
    """Initialise the shared ``r2r`` instance."""
    await r2r.init()