123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
import logging
from typing import Optional, Any

import nest_asyncio
from fastapi import UploadFile
from r2r import R2RAsyncClient
from r2r import R2RClient

from app.libs.util import verify_jwt_expiration
from config.llm import tool_settings
# Patch asyncio so that async code can be nested inside an event loop
# that is already running.
nest_asyncio.apply()

# Module-level clients, both pointed at the configured R2R base URL.
# The async client is built first, then the sync one.
client, client_sync = (
    R2RAsyncClient(tool_settings.R2R_BASE_URL),
    R2RClient(tool_settings.R2R_BASE_URL),
)
logger = logging.getLogger(__name__)


class R2R:
    """Wrapper around the module-level async and sync R2R clients.

    When both ``tool_settings.R2R_USERNAME`` and
    ``tool_settings.R2R_PASSWORD`` are set, every public operation logs in
    before talking to the server. When either is missing, the clients are
    used as-is without logging in.

    NOTE(review): a token-reuse check based on ``verify_jwt_expiration`` was
    commented out in the original code, so each call re-authenticates.
    That behaviour is kept here; confirm whether it is still intended.
    """

    # Annotations are strings so defining the class does not require the
    # annotated names to be resolvable at class-creation time.
    client: "R2RAsyncClient"
    client_sync: "R2RClient"

    def __init__(self):
        """Bind the shared module-level clients and detect whether auth is configured."""
        # Store a real bool rather than leaking a credential string into
        # the attribute value.
        self.auth_enabled = bool(
            tool_settings.R2R_USERNAME and tool_settings.R2R_PASSWORD
        )
        self.client = client
        self.client_sync = client_sync

    def init_sync(self):
        """Log the synchronous client in with the configured credentials.

        Returns:
            The synchronous client after login, or ``None`` when
            authentication is not configured.
        """
        if not self.auth_enabled:
            return None
        # Credentials are deliberately never printed or logged.
        self.client_sync.users.login(
            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD
        )
        return self.client_sync

    async def init(self):
        """Log the asynchronous client in with the configured credentials.

        Returns:
            The asynchronous client after login, or ``None`` when
            authentication is not configured.
        """
        if not self.auth_enabled:
            return None
        # Credentials and the resulting access token are deliberately never
        # printed or logged.
        await self.client.users.login(
            tool_settings.R2R_USERNAME, tool_settings.R2R_PASSWORD
        )
        return self.client

    async def ingest_file(self, file_path: str, metadata: Optional[dict] = None):
        """Create a document from a file path using the "fast" ingestion mode.

        Args:
            file_path: Path of the file to ingest.
            metadata: Optional metadata; an empty dict is sent as ``None``.

        Returns:
            Whatever ``documents.create`` returns.
        """
        r2r_client = await self._check_login()
        return await r2r_client.documents.create(
            file_path=file_path,
            metadata=metadata or None,
            ingestion_mode="fast",
            id=None,
        )

    async def ingest_fileinfo(
        self, file: "UploadFile", metadata: Optional[dict] = None
    ):
        """Create a document from an uploaded file object.

        Args:
            file: The uploaded file, passed through to ``documents.create``.
            metadata: Optional metadata; an empty dict is sent as ``None``.

        Returns:
            Whatever ``documents.create`` returns.
        """
        r2r_client = await self._check_login()
        return await r2r_client.documents.create(
            file=file,
            metadata=metadata or None,
            id=None,
        )

    def search(self, query: str, filters: dict[str, Any]):
        """Run a retrieval search and return the chunk search results.

        Args:
            query: The search query text.
            filters: Filters forwarded in ``search_settings``.

        Returns:
            ``results.chunk_search_results`` of the search response. The
            number of hits is capped by ``tool_settings.R2R_SEARCH_LIMIT``.
        """
        r2r_client = self._check_login_sync()
        response = r2r_client.retrieval.search(
            query=query,
            search_settings={
                "filters": filters,
                "limit": tool_settings.R2R_SEARCH_LIMIT,
            },
        )
        return response.results.chunk_search_results

    def list_chunks(self, ids: Optional[list[str]] = None):
        """Collect the chunks of several documents into one flat list.

        Args:
            ids: Document ids to list chunks for; ``None`` or empty yields
                an empty list.

        Returns:
            The concatenated ``results`` of ``documents.list_chunks`` for
            each id, in the order the ids were given.
        """
        r2r_client = self._check_login_sync()
        chunks = []
        # ``ids`` defaults to None instead of a shared mutable list.
        for document_id in ids or []:
            chunks.extend(r2r_client.documents.list_chunks(id=document_id).results)
        return chunks

    def list_documents(
        self,
        id: Optional[str] = "",
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ):
        """List the documents that belong to a collection.

        Args:
            id: Collection id; an empty or ``None`` id yields an empty list.
            offset: Offset forwarded to ``collections.list_documents``.
            limit: Limit forwarded to ``collections.list_documents``.

        Returns:
            The ``results`` of the listing, or an empty list when no id is
            given or the listing call fails.
        """
        # Nothing to look up: skip the login round-trip entirely.
        if not id:
            return []
        r2r_client = self._check_login_sync()
        try:
            listed = r2r_client.collections.list_documents(
                id=id, limit=limit, offset=offset
            )
        except Exception:
            # Best-effort listing, as in the original: report the failure
            # and fall back to an empty result instead of raising.
            logger.exception("Failed to list documents for collection %s", id)
            return []
        return listed.results

    async def _check_login(self):
        """Return the async client, logging in first when auth is configured."""
        if self.auth_enabled:
            await self.init()
        # Always hand back a usable client; returning None here made every
        # caller fail with AttributeError when auth was disabled.
        return self.client

    def _check_login_sync(self):
        """Return the sync client, logging in first when auth is configured."""
        if self.auth_enabled:
            self.init_sync()
        # Always hand back a usable client; returning None here made every
        # caller fail with AttributeError when auth was disabled.
        return self.client_sync
# Create the R2R instance.
# Call initialize_r2r() when your application starts.
# async def initialize_r2r():
#     await r2r.init()
|