| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 | from typing import Optional, Anyfrom r2r import R2RAsyncClientfrom r2r import R2RClientfrom fastapi import UploadFilefrom app.libs.util import verify_jwt_expirationfrom config.llm import tool_settingsimport nest_asyncio# 使得异步代码可以在已运行的事件循环中嵌套nest_asyncio.apply()class R2R:    client: R2RAsyncClient    client_sync: R2RClient    def __init__(self):        self.auth_enabled = tool_settings.R2R_USERNAME and tool_settings.R2R_PASSWORD        self.client = None        self.client_sync = None    def init_sync(self):        if not self.auth_enabled:            return        if not self.client_sync:            self.client_sync = R2RClient(tool_settings.R2R_BASE_URL, "/v3")            self.client_sync.users.login(                tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD            )        print(            "1111111111111111111111111111111122222vvdgdfdf" + tool_settings.R2R_USERNAME        )        print(tool_settings.R2R_USERNAME)        print(tool_settings.R2R_PASSWORD)        print(self.client_sync.access_token)    async def init(self):        if not self.auth_enabled:            return        if not self.client:            self.client = R2RAsyncClient(tool_settings.R2R_BASE_URL, "/v3")        print(            "1111111111111111111111111111111122222vvdgdfdf" + tool_settings.R2R_USERNAME        )        print(tool_settings.R2R_USERNAME)        print(tool_settings.R2R_PASSWORD)        await self.client.users.login(            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD        )        print(self.client.access_token)    async def ingest_file(self, file_path: str, metadata: Optional[dict]):        await self._check_login()        return await self.client.documents.create(            file_path=file_path,            metadata=metadata if metadata else None,            id=None,        )    async def ingest_fileinfo(self, file: UploadFile, metadata: Optional[dict]):        await self._check_login()        return await self.client.documents.create(            file=file,            metadata=metadata if metadata else None,            id=None,        )    def search(self, query: str, filters: dict[str, Any]):        self._check_login_sync()        print(            "aaaaaaaaaaaaaaaaaaaaaaaaaaaasssssssssssssssssssssssssssssssssssssssssgggggggggggggggggggg"        )        search_response = self.client_sync.retrieval.search(            query=query,            search_settings={                "filters": filters,                "limit": tool_settings.R2R_SEARCH_LIMIT,            },        )        print("vvvvvvvvvvvvvvvvvvmmmmmmmmmmmmmmmmmmmmmmmmmmmmmm")        print(search_response.get("results"))        return search_response.get("results").get("chunk_search_results")    def list(        self,        ids: Optional[list[str]] = None,        offset: Optional[int] = 0,        limit: Optional[int] = 100,    ):        self._check_login_sync()        print("aaaaaaaaaaaaaaaaaaaaaaaaaaaaassssssssssssssssssssssssssssssssssss")        print(ids)        """            listed = mutable_client.documents.list(limit=2, offset=0)    results = listed.results    assert len(results) == 2, "Expected 2 results for paginated listing"        """        print("listlistlistlistlistlistlistlistlistlistlistlistlistlistlistlistlist")        if len(ids) > 0:            listed = self.client_sync.documents.list(                ids=ids, limit=limit, offset=offset            )            print(listed.get("results"))            return listed.get("results")        else:            return []    def list_documents(        self,        id: Optional[str] = "",        offset: Optional[int] = 0,        limit: Optional[int] = 100,    ):        self._check_login_sync()        """            docs = client.collections.list_documents(empty_coll_id).results            assert len(docs) == 0, "Expected no documents in a new empty collection"        """        print(            "collectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollectionscollections"        )        if id != "":            try:                listed = self.client_sync.collections.list_documents(                    id=id, limit=limit, offset=offset                )                print(listed.get("results"))                return listed.get("results")            except Exception as e:                print(e)                listed = []                return listed        else:            return []    async def _check_login(self):        if not self.auth_enabled:            return        if not self.client.access_token and verify_jwt_expiration(            self.client.access_token        ):            return        else:            await self.init()    def _check_login_sync(self):        if not self.auth_enabled:            return        try:            if not self.client_sync.access_token and verify_jwt_expiration(                self.client_sync.access_token            ):                return        except Exception as e:            print(e)        self.client_sync.users.login(            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD        )# 创建 R2R 实例r2r = R2R()# 在您的应用程序启动时调用 initialize_r2r()async def initialize_r2r():    await r2r.init()
 |