123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import uuid
- import asyncio
- import time
- from typing import AsyncGenerator
- import pytest
- from r2r import R2RAsyncClient, R2RClient, R2RException
- class RetryableR2RAsyncClient(R2RAsyncClient):
- """R2RAsyncClient with automatic retry logic for timeouts"""
- async def _make_request(self, method, endpoint, version="v3", **kwargs):
- retries = 0
- max_retries = 3
- delay = 1.0
- while True:
- try:
- return await super()._make_request(method, endpoint, version, **kwargs)
- except R2RException as e:
- if "Request failed" in str(e) and retries < max_retries:
- retries += 1
- wait_time = delay * (2 ** (retries - 1))
- print(f"Request timed out. Retrying ({retries}/{max_retries}) after {wait_time:.2f}s...")
- await asyncio.sleep(wait_time)
- elif "429" in str(e) and retries < max_retries:
- retries += 1
- wait_time = delay * (3 ** (retries - 1))
- print(f"Rate limited. Retrying ({retries}/{max_retries}) after {wait_time:.2f}s...")
- await asyncio.sleep(wait_time)
- else:
- raise
- class RetryableR2RClient(R2RClient):
- """R2RClient with automatic retry logic for timeouts"""
- def _make_request(self, method, endpoint, version="v3", **kwargs):
- retries = 0
- max_retries = 3
- delay = 1.0
- while True:
- try:
- return super()._make_request(method, endpoint, version, **kwargs)
- except R2RException as e:
- if ("Request failed" in str(e) or "timed out" in str(e)) and retries < max_retries:
- retries += 1
- wait_time = delay * (2 ** (retries - 1))
- print(f"Request timed out. Retrying ({retries}/{max_retries}) after {wait_time:.2f}s...")
- time.sleep(wait_time)
- elif "429" in str(e) and retries < max_retries:
- retries += 1
- wait_time = delay * (3 ** (retries - 1))
- print(f"Rate limited. Retrying ({retries}/{max_retries}) after {wait_time:.2f}s...")
- time.sleep(wait_time)
- else:
- raise
- class TestConfig:
- def __init__(self):
- self.base_url = "http://localhost:7272"
- self.index_wait_time = 1.0
- self.chunk_creation_wait_time = 1.0
- self.superuser_email = "admin@example.com"
- self.superuser_password = "change_me_immediately"
- self.test_timeout = 30 # seconds
- # Change this to session scope to match the client fixture
- @pytest.fixture(scope="session")
- def config() -> TestConfig:
- return TestConfig()
- @pytest.fixture(scope="session")
- async def client(config) -> AsyncGenerator[R2RClient, None]:
- """Create a shared client instance for the test session."""
- yield RetryableR2RClient(config.base_url)
- @pytest.fixture
- def mutable_client(config) -> R2RClient:
- """Create a shared client instance for the test session."""
- return RetryableR2RClient(config.base_url)
- @pytest.fixture
- async def aclient(config) -> AsyncGenerator[R2RAsyncClient, None]:
- """Create a retryable client instance for the test session."""
- yield RetryableR2RAsyncClient(config.base_url)
- @pytest.fixture
- async def superuser_client(
- mutable_client: R2RClient,
- config: TestConfig) -> AsyncGenerator[R2RClient, None]:
- """Creates a superuser client for tests requiring elevated privileges."""
- await mutable_client.users.login(config.superuser_email, config.superuser_password)
- yield mutable_client
- await mutable_client.users.logout()
- @pytest.fixture(scope="session")
- def test_document(client: R2RClient):
- """Create and yield a test document, then clean up."""
- random_suffix = str(uuid.uuid4())
- doc_id = client.documents.create(
- raw_text=f"{random_suffix} Test doc for collections",
- run_with_orchestration=False,
- ).results.document_id
- yield doc_id
- # Cleanup: Try deleting the document if it still exists
- try:
- client.documents.delete(id=doc_id)
- except R2RException:
- pass
- @pytest.fixture(scope="session")
- def test_collection(client: R2RClient, test_document):
- """Create a test collection with sample documents and clean up after
- tests."""
- collection_name = f"Test Collection {uuid.uuid4()}"
- collection_id = client.collections.create(name=collection_name).results.id
- docs = [
- {
- "text":
- f"Aristotle was a Greek philosopher who studied under Plato {str(uuid.uuid4())}.",
- "metadata": {
- "rating": 5,
- "tags": ["philosophy", "greek"],
- "category": "ancient",
- },
- },
- {
- "text":
- f"Socrates is considered a founder of Western philosophy {str(uuid.uuid4())}.",
- "metadata": {
- "rating": 3,
- "tags": ["philosophy", "classical"],
- "category": "ancient",
- },
- },
- {
- "text":
- f"Rene Descartes was a French philosopher. unique_philosopher {str(uuid.uuid4())}",
- "metadata": {
- "rating": 8,
- "tags": ["rationalism", "french"],
- "category": "modern",
- },
- },
- {
- "text":
- f"Immanuel Kant, a German philosopher, influenced Enlightenment thought {str(uuid.uuid4())}.",
- "metadata": {
- "rating": 7,
- "tags": ["enlightenment", "german"],
- "category": "modern",
- },
- },
- ]
- doc_ids = []
- for doc in docs:
- doc_id = client.documents.create(
- raw_text=doc["text"], metadata=doc["metadata"]).results.document_id
- doc_ids.append(doc_id)
- client.collections.add_document(collection_id, doc_id)
- client.collections.add_document(collection_id, test_document)
- yield {"collection_id": collection_id, "document_ids": doc_ids}
- # Cleanup after tests
- try:
- # Remove and delete all documents
- for doc_id in doc_ids:
- try:
- client.documents.delete(id=doc_id)
- except R2RException:
- pass
- # Delete the collection
- try:
- client.collections.delete(collection_id)
- except R2RException:
- pass
- except Exception as e:
- print(f"Error during test_collection cleanup: {e}")
|