from typing import Optional, Any from r2r import R2RAsyncClient from fastapi import UploadFile from app.libs.util import verify_jwt_expiration from config.llm import tool_settings class R2R: client: R2RAsyncClient def __init__(self): self.auth_enabled = tool_settings.R2R_USERNAME and tool_settings.R2R_PASSWORD self.client = None async def init(self): if not self.auth_enabled: return if not self.client: self.client = R2RAsyncClient(tool_settings.R2R_BASE_URL, "/v3") print( "1111111111111111111111111111111122222vvdgdfdf" + tool_settings.R2R_USERNAME ) print(tool_settings.R2R_USERNAME) print(tool_settings.R2R_PASSWORD) await self.client.users.login( tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD ) print(self.client.access_token) async def ingest_file(self, file_path: str, metadata: Optional[dict]): await self._check_login() return await self.client.documents.create( file_path=file_path, metadata=metadata if metadata else None, id=None, ) async def ingest_fileinfo(self, file: UploadFile, metadata: Optional[dict]): await self._check_login() return await self.client.documents.create( file=file, metadata=metadata if metadata else None, id=None, ) async def search(self, query: str, filters: dict[str, Any]): await self._check_login() print( "aaaaaaaaaaaaaaaaaaaaaaaaaaaasssssssssssssssssssssssssssssssssssssssssgggggggggggggggggggg" ) search_response = await self.client.retrieval.search( query=query, search_settings={ "filters": filters, "limit": tool_settings.R2R_SEARCH_LIMIT, }, ) print("vvvvvvvvvvvvvvvvvvmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm") print(search_response.get("results")) return search_response.get("results").get("chunk_search_results") async def list( self, ids: Optional[list[str]] = None, offset: Optional[int] = 0, limit: Optional[int] = 100, ): await self._check_login() """ listed = mutable_client.documents.list(limit=2, offset=0) results = listed.results assert len(results) == 2, "Expected 2 results for paginated listing" """ print("listlistlistlistlistlistlistlistlistlistlistlistlistlistlistlistlist") if len(ids) > 0: listed = await self.client.documents.list( ids=ids, limit=limit, offset=offset ) print(listed.get("results")) return listed.get("results") else: return [] async def list_documents( self, id: Optional[str] = "", offset: Optional[int] = 0, limit: Optional[int] = 100, ): await self._check_login() """ docs = client.collections.list_documents(empty_coll_id).results assert len(docs) == 0, "Expected no documents in a new empty collection" """ print( "collectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollections" ) if id != "": listed = await self.client.collections.list_documents( ids=id, limit=limit, offset=offset ) print(listed.get("results")) return listed.get("results") else: return [] async def _check_login(self): if not self.auth_enabled: return if not self.client.access_token and verify_jwt_expiration( self.client.access_token ): return else: await self.init() # 创建 R2R 实例 r2r = R2R() # 在您的应用程序启动时调用 initialize_r2r() async def initialize_r2r(): await r2r.init()