import logging
from typing import Optional, Any

from r2r import R2RAsyncClient
from r2r import R2RClient
from fastapi import UploadFile

from app.libs.util import verify_jwt_expiration
from config.llm import tool_settings

# import nest_asyncio  # lets async code be nested inside an already-running event loop
# nest_asyncio.apply()

logger = logging.getLogger(__name__)

# Module-level clients; every R2R instance created below reuses these two
# objects instead of constructing its own.
client = R2RAsyncClient(tool_settings.R2R_BASE_URL)
client_sync = R2RClient(tool_settings.R2R_BASE_URL)


class R2R:
    """Thin wrapper around the async and sync R2R clients.

    Authentication is considered enabled only when both
    ``tool_settings.R2R_USERNAME`` and ``tool_settings.R2R_PASSWORD`` are set.
    When it is enabled, each operation first makes sure the relevant client
    holds a usable access token and logs in again if it does not.
    """

    client: R2RAsyncClient
    client_sync: R2RClient

    def __init__(self):
        # bool() keeps the flag a real boolean; a bare `a and b` would store
        # the password string itself on the instance.
        self.auth_enabled = bool(
            tool_settings.R2R_USERNAME and tool_settings.R2R_PASSWORD
        )
        self.client = client
        self.client_sync = client_sync

    def init_sync(self):
        """Log the sync client in.

        Returns:
            The sync client after login, or ``None`` when authentication is
            disabled (no login is attempted in that case).
        """
        if not self.auth_enabled:
            return None
        self.client_sync.users.login(
            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD
        )
        # Never log the password or the token, only the account name.
        logger.debug("R2R sync client logged in as %s", tool_settings.R2R_USERNAME)
        return self.client_sync

    async def init(self):
        """Log the async client in.

        Returns:
            The async client after login, or ``None`` when authentication is
            disabled (no login is attempted in that case).
        """
        if not self.auth_enabled:
            return None
        await self.client.users.login(
            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD
        )
        # Never log the password or the token, only the account name.
        logger.debug("R2R async client logged in as %s", tool_settings.R2R_USERNAME)
        return self.client

    async def ingest_file(self, file_path: str, metadata: Optional[dict]):
        """Create a document from a file path using the async client.

        Args:
            file_path: Path handed to ``documents.create``.
            metadata: Optional metadata; an empty or missing dict is sent as
                ``None``.

        Returns:
            Whatever ``documents.create`` returns.
        """
        r2r_client = await self._check_login()
        return await r2r_client.documents.create(
            file_path=file_path,
            metadata=metadata or None,
            ingestion_mode="fast",
            id=None,
            run_with_orchestration=False,
        )

    async def ingest_fileinfo(self, file: UploadFile, metadata: Optional[dict]):
        """Create a document from an uploaded file using the async client.

        Args:
            file: The uploaded file, passed through as ``file=``.
            metadata: Optional metadata; an empty or missing dict is sent as
                ``None``.

        Returns:
            Whatever ``documents.create`` returns.
        """
        r2r_client = await self._check_login()
        # NOTE(review): confirm the installed r2r SDK accepts a `file` keyword
        # on documents.create; the sibling methods use `file_path` instead.
        return await r2r_client.documents.create(
            file=file,
            metadata=metadata or None,
            id=None,
        )

    def search(self, query: str, filters: dict[str, Any]):
        """Run a retrieval search with the sync client.

        Args:
            query: The search query text.
            filters: Filter dict forwarded under ``search_settings["filters"]``.

        Returns:
            ``results.chunk_search_results`` of the search response. The
            result count is limited by ``tool_settings.R2R_SEARCH_LIMIT``.
        """
        r2r_client = self._check_login_sync()
        search_response = r2r_client.retrieval.search(
            query=query,
            search_settings={
                "filters": filters,
                "limit": tool_settings.R2R_SEARCH_LIMIT,
            },
        )
        return search_response.results.chunk_search_results

    def ingest_file_sync(self, file_path: str, metadata: Optional[dict]):
        """Create a document from a file path using the sync client.

        Args:
            file_path: Path handed to ``documents.create``.
            metadata: Optional metadata; an empty or missing dict is sent as
                ``None``.

        Returns:
            Whatever ``documents.create`` returns.
        """
        r2r_client = self._check_login_sync()
        return r2r_client.documents.create(
            file_path=file_path,
            metadata=metadata or None,
            ingestion_mode="fast",
            id=None,
            run_with_orchestration=False,
        )

    def list_chunks(self, ids: Optional[list[str]] = None):
        """Collect the chunks of several documents into one flat list.

        Args:
            ids: Document ids to look up. ``None`` or an empty list yields an
                empty result.

        Returns:
            A list with the ``results`` of ``documents.list_chunks`` for every
            id, concatenated in the order the ids were given.
        """
        r2r_client = self._check_login_sync()
        chunks = []
        # `ids or []` replaces the former mutable default argument.
        for doc_id in ids or []:
            chunks.extend(r2r_client.documents.list_chunks(id=doc_id).results)
        return chunks

    def list_documents(
        self,
        id: Optional[str] = "",
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ):
        """List the documents of one collection, best-effort.

        Args:
            id: Collection id. An empty or ``None`` id returns ``[]`` without
                querying the collection.
            offset: Offset forwarded to ``collections.list_documents``.
            limit: Limit forwarded to ``collections.list_documents``.

        Returns:
            The ``results`` of the listing, or ``[]`` when the id is empty or
            the request fails (the failure is logged, not raised).
        """
        r2r_client = self._check_login_sync()
        if not id:
            return []
        try:
            return r2r_client.collections.list_documents(
                id=id, limit=limit, offset=offset
            ).results
        except Exception:
            # Deliberately best-effort: callers get an empty listing rather
            # than an exception, but the failure is kept in the log.
            logger.exception("Failed to list documents for collection %s", id)
            return []

    @staticmethod
    def _token_is_valid(token) -> bool:
        """Return True when *token* is set and passes ``verify_jwt_expiration``.

        A failure inside the verification is logged and treated as an invalid
        token, so the caller falls back to a fresh login.
        """
        if not token:
            return False
        try:
            return bool(verify_jwt_expiration(token))
        except Exception:
            logger.exception("Could not verify the R2R access token")
            return False

    async def _check_login(self):
        """Return an async client that is ready to use.

        With authentication disabled the client is returned as-is; otherwise
        it is returned directly when its token is still valid and logged in
        again when it is not.
        """
        if not self.auth_enabled:
            # Returning None here made every caller fail on `client.<attr>`.
            return self.client
        if self._token_is_valid(getattr(self.client, "access_token", None)):
            return self.client
        return await self.init()

    def _check_login_sync(self):
        """Return a sync client that is ready to use.

        With authentication disabled the client is returned as-is; otherwise
        it is returned directly when its token is still valid and logged in
        again when it is not.
        """
        if not self.auth_enabled:
            # Returning None here made every caller fail on `client.<attr>`.
            return self.client_sync
        if self._token_is_valid(getattr(self.client_sync, "access_token", None)):
            return self.client_sync
        return self.init_sync()


# Create the R2R instance.
# Call initialize_r2r() when your application starts:
# async def initialize_r2r():
#     await r2r.init()