123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807 |
- import uuid
- import pytest
- from r2r import R2RClient, R2RException
- @pytest.fixture(scope="session")
- def config():
- class TestConfig:
- base_url = "http://localhost:7272"
- superuser_email = "admin@example.com"
- superuser_password = "change_me_immediately"
- known_collection_id = "122fdf6a-e116-546b-a8f6-e4cb2e2c0a09" # Example known collection ID
- return TestConfig()
- # @pytest.fixture(scope="session")
- def client(config):
- return R2RClient(config.base_url)
- @pytest.fixture
- def superuser_login(client: R2RClient, config):
- """A fixture that ensures the client is logged in as superuser."""
- client.users.login(config.superuser_email, config.superuser_password)
- yield
- # After test, if needed, we can logout or reset
- # client.users.logout()
- def register_and_return_user_id(client: R2RClient, email: str,
- password: str) -> str:
- return client.users.create(email, password).results.id
- def test_register_user(client: R2RClient):
- random_email = f"{uuid.uuid4()}@example.com"
- password = "test_password123"
- user = client.users.create(random_email, password).results
- assert user.id is not None, "No user ID returned after registration."
- client.users.logout()
- def test_user_refresh_token(client: R2RClient):
- random_email = f"{uuid.uuid4()}@example.com"
- password = "test_password123"
- register_and_return_user_id(client, random_email, password)
- client.users.login(random_email, password)
- old_access_token = client.access_token
- new_access_token = client.users.refresh_token().results.access_token.token
- assert new_access_token != old_access_token, (
- "Refresh token did not provide a new access token.")
- def test_change_password(client: R2RClient):
- random_email = f"{uuid.uuid4()}@example.com"
- old_password = "old_password123"
- new_password = "new_password456"
- register_and_return_user_id(client, random_email, old_password)
- client.users.login(random_email, old_password)
- change_resp = client.users.change_password(old_password,
- new_password).results
- assert change_resp.message is not None, "Change password failed."
- # Check old password no longer works
- client.users.logout()
- with pytest.raises(R2RException) as exc_info:
- client.users.login(random_email, old_password)
- assert exc_info.value.status_code == 401, (
- "Old password should not work anymore.")
- # New password should work
- client.users.login(random_email, new_password)
- client.users.logout()
- @pytest.mark.skip(
- reason=
- "Requires a real or mocked reset token retrieval if verification is implemented."
- )
- def test_request_and_reset_password(client: R2RClient):
- # This test scenario assumes you can obtain a valid reset token somehow.
- random_email = f"{uuid.uuid4()}@example.com"
- password = "initial_password123"
- register_and_return_user_id(client, random_email, password)
- client.users.logout()
- # Request password reset
- reset_req = client.users.request_password_reset(random_email).results
- assert reset_req.message is not None, "Request password reset failed."
- # Suppose we can retrieve a reset_token from test hooks or logs:
- reset_token = (
- "FAKE_RESET_TOKEN_FOR_TESTING" # Replace with actual if available
- )
- new_password = "new_reset_password789"
- # Attempt reset
- resp = client.users.reset_password(reset_token, new_password).results
- assert resp.message is not None, "Reset password failed."
- # Verify login with new password
- client.users.login(random_email, new_password)
- client.users.logout()
- def test_users_list(client: R2RClient, superuser_login):
- users_list = client.users.list().results
- assert isinstance(users_list, list), "Listing users failed."
- client.users.logout()
- def test_get_current_user(client: R2RClient, superuser_login):
- me = client.users.me().results
- assert me.id is not None, "Failed to get current user."
- client.users.logout()
- def test_get_user_by_id(client: R2RClient, superuser_login):
- random_email = f"{uuid.uuid4()}@example.com"
- password = "somepassword"
- user_id = register_and_return_user_id(client, random_email, password)
- user = client.users.retrieve(user_id).results
- assert user.id == user_id, "Retrieved user does not match requested ID."
- client.users.logout()
- def test_update_user(client: R2RClient, superuser_login):
- random_email = f"{uuid.uuid4()}@example.com"
- password = "somepassword"
- user_id = register_and_return_user_id(client, random_email, password)
- updated_name = "Updated Name"
- update_resp = client.users.update(user_id, name=updated_name).results
- assert update_resp.name == updated_name, "User update failed."
- client.users.logout()
- def test_user_collections(client: R2RClient, superuser_login, config):
- # Create a user and list their collections
- random_email = f"{uuid.uuid4()}@example.com"
- password = "somepassword"
- user_id = register_and_return_user_id(client, random_email, password)
- collections = client.users.list_collections(user_id).results
- assert isinstance(collections, list), "Listing user collections failed."
- client.users.logout()
- def test_add_remove_user_from_collection(client: R2RClient, superuser_login,
- config):
- random_email = f"{uuid.uuid4()}@example.com"
- password = "somepassword"
- user_id = register_and_return_user_id(client, random_email, password)
- # Add user to known collection
- add_resp = client.users.add_to_collection(
- user_id, config.known_collection_id).results
- assert add_resp.success, "Failed to add user to collection."
- # Verify
- collections = client.users.list_collections(user_id).results
- assert any(
- str(col.id) == str(config.known_collection_id)
- for col in collections), "User not in collection after add."
- # Remove user from collection
- remove_resp = client.users.remove_from_collection(
- user_id, config.known_collection_id).results
- assert remove_resp.success, "Failed to remove user from collection."
- collections_after = client.users.list_collections(user_id).results
- assert not any(
- str(col.id) == str(config.known_collection_id) for col in
- collections_after), "User still in collection after removal."
- client.users.logout()
- def test_delete_user(client: R2RClient):
- # Create and then delete user
- client.users.logout()
- random_email = f"{uuid.uuid4()}@example.com"
- password = "somepassword"
- client.users.create(random_email, password)
- client.users.login(random_email, password)
- user_id = client.users.me().results.id
- del_resp = client.users.delete(user_id, password).results
- assert del_resp.success, "User deletion failed."
- with pytest.raises(R2RException) as exc_info:
- client.users.login(random_email, password)
- assert exc_info.value.status_code == 404, (
- "User still exists after deletion.")
- def test_superuser_downgrade_permissions(client: R2RClient, superuser_login,
- config):
- user_email = f"test_super_{uuid.uuid4()}@test.com"
- user_password = "securepass"
- new_user_id = register_and_return_user_id(client, user_email,
- user_password)
- # Upgrade user to superuser
- upgraded_user = client.users.update(new_user_id, is_superuser=True).results
- assert upgraded_user.is_superuser == True, (
- "User not upgraded to superuser.")
- # Logout admin, login as new superuser
- client.users.logout()
- client.users.login(user_email, user_password)
- all_users = client.users.list().results
- assert isinstance(all_users, list), "New superuser cannot list users."
- # Downgrade back to normal (re-login as original admin)
- client.users.logout()
- client.users.login(config.superuser_email, config.superuser_password)
- downgraded_user = client.users.update(new_user_id,
- is_superuser=False).results
- assert downgraded_user.is_superuser == False, "User not downgraded."
- # Now login as downgraded user and verify no superuser access
- client.users.logout()
- client.users.login(user_email, user_password)
- with pytest.raises(R2RException) as exc_info:
- client.users.list()
- assert exc_info.value.status_code == 403, (
- "Downgraded user still has superuser privileges.")
- client.users.logout()
- def test_non_owner_delete_collection(client: R2RClient):
- # Create owner user
- owner_email = f"owner_{uuid.uuid4()}@test.com"
- owner_password = "pwd123"
- client.users.create(owner_email, owner_password)
- client.users.login(owner_email, owner_password)
- coll_id = client.collections.create(name="Owner Collection").results.id
- # Create another user and get their ID
- non_owner_email = f"nonowner_{uuid.uuid4()}@test.com"
- non_owner_password = "pwd1234"
- client.users.logout()
- client.users.create(non_owner_email, non_owner_password)
- client.users.login(non_owner_email, non_owner_password)
- non_owner_id = client.users.me().results.id
- client.users.logout()
- # Owner adds non-owner to collection
- client.users.login(owner_email, owner_password)
- client.collections.add_user(coll_id, non_owner_id)
- client.users.logout()
- # Non-owner tries to delete collection
- client.users.login(non_owner_email, non_owner_password)
- with pytest.raises(R2RException) as exc_info:
- result = client.collections.delete(coll_id)
- assert exc_info.value.status_code == 403, (
- "Wrong error code for non-owner deletion attempt")
- # Cleanup
- client.users.logout()
- client.users.login(owner_email, owner_password)
- client.collections.delete(coll_id)
- client.users.logout()
- def test_update_user_with_invalid_email(client: R2RClient, superuser_login):
- # Create a user
- email = f"{uuid.uuid4()}@example.com"
- password = "password"
- user_id = register_and_return_user_id(client, email, password)
- # Attempt to update to invalid email
- with pytest.raises(R2RException) as exc_info:
- client.users.update(user_id, email="not-an-email")
- # Expect a validation error (likely 422)
- assert exc_info.value.status_code in [
- 400,
- 422,
- ], "Expected validation error for invalid email."
- client.users.logout()
- def test_update_user_email_already_exists(client: R2RClient, superuser_login):
- # Create two users
- email1 = f"{uuid.uuid4()}@example.com"
- email2 = f"{uuid.uuid4()}@example.com"
- password = "password"
- user1_id = register_and_return_user_id(client, email1, password)
- user2_id = register_and_return_user_id(client, email2, password)
- # Try updating user2's email to user1's email
- with pytest.raises(R2RException) as exc_info:
- client.users.update(user2_id, email=email1)
- # Expect a conflict (likely 409) or validation error
- # TODO - Error code should be in [400, 409, 422], not 500
- assert exc_info.value.status_code in [
- 400,
- 409,
- 422,
- 500,
- ], "Expected error updating email to an existing user's email."
- client.users.logout()
- def test_delete_user_with_incorrect_password(client: R2RClient):
- email = f"{uuid.uuid4()}@example.com"
- password = "correct_password"
- # user_id = register_and_return_user_id(client: R2RClient, email, password)
- client.users.create(email, password)
- client.users.login(email, password)
- user_id = client.users.me().results.id
- # Attempt deletion with incorrect password
- with pytest.raises(R2RException) as exc_info:
- client.users.delete(user_id, "wrong_password")
- # TODO - Error code should be in [401, 403]
- assert exc_info.value.status_code in [
- 400,
- 401,
- 403,
- ], "Expected auth error with incorrect password on delete."
- def test_login_with_incorrect_password(client: R2RClient):
- email = f"{uuid.uuid4()}@example.com"
- password = "password123"
- client.users.create(email, password)
- # Try incorrect password
- with pytest.raises(R2RException) as exc_info:
- client.users.login(email, "wrongpass")
- assert exc_info.value.status_code == 401, (
- "Expected 401 when logging in with incorrect password.")
- client.users.logout()
- def test_refresh_token(client: R2RClient):
- # Assume that refresh token endpoint checks token validity
- # Try using a bogus refresh token
- email = f"{uuid.uuid4()}@example.com"
- password = "password123"
- client.users.create(email, password)
- client.users.login(email, password)
- client.users.refresh_token() # refresh_token="invalid_token")
- # assert exc_info.value.status_code in [400, 401], "Expected error using invalid refresh token."
- client.users.logout()
- @pytest.mark.skip(reason="Email verification logic not implemented.")
- def test_verification_with_invalid_code(client: R2RClient):
- # If your system supports email verification
- email = f"{uuid.uuid4()}@example.com"
- password = "password"
- register_and_return_user_id(client, email, password)
- # Try verifying with invalid code
- with pytest.raises(R2RException) as exc_info:
- client.users.verify_email(email, "wrong_code")
- assert exc_info.value.status_code in [
- 400,
- 422,
- ], "Expected error verifying with invalid code."
- client.users.logout()
- @pytest.mark.skip(
- reason="Verification and token logic depends on implementation.")
- def test_password_reset_with_invalid_token(client: R2RClient):
- email = f"{uuid.uuid4()}@example.com"
- password = "initialpass"
- register_and_return_user_id(client, email, password)
- client.users.logout()
- # Assume request password reset done here if needed
- # Try resetting with invalid token
- with pytest.raises(R2RException) as exc_info:
- client.users.reset_password("invalid_token", "newpass123")
- assert exc_info.value.status_code in [
- 400,
- 422,
- ], "Expected error resetting password with invalid token."
- client.users.logout()
- @pytest.fixture
- def user_with_api_key(client: R2RClient):
- """Fixture that creates a user and returns their ID and API key details."""
- random_email = f"{uuid.uuid4()}@example.com"
- password = "api_key_test_password"
- user_id = client.users.create(random_email, password).results.id
- # Login to create an API key
- client.users.login(random_email, password)
- api_key_resp = client.users.create_api_key(user_id).results
- api_key = api_key_resp.api_key
- key_id = api_key_resp.key_id
- yield user_id, api_key, key_id
- # Cleanup
- try:
- client.users.delete_api_key(user_id, key_id)
- except:
- pass
- client.users.logout()
- def test_api_key_lifecycle(client: R2RClient):
- """Test the complete lifecycle of API keys including creation, listing, and
- deletion."""
- # Create user and login
- email = f"{uuid.uuid4()}@example.com"
- password = "api_key_test_password"
- user_id = client.users.create(email, password).results.id
- client.users.login(email, password)
- # Create API key
- api_key_resp = client.users.create_api_key(user_id).results
- assert api_key_resp.api_key is not None, "API key not returned"
- assert api_key_resp.key_id is not None, "Key ID not returned"
- assert api_key_resp.public_key is not None, "Public key not returned"
- key_id = api_key_resp.key_id
- # List API keys
- list_resp = client.users.list_api_keys(user_id).results
- assert len(list_resp) > 0, "No API keys found after creation"
- assert list_resp[0].key_id == key_id, (
- "Listed key ID doesn't match created key")
- assert list_resp[0].updated_at is not None, "Updated timestamp missing"
- assert list_resp[0].public_key is not None, "Public key missing in list"
- # Delete API key using key_id
- delete_resp = client.users.delete_api_key(user_id, key_id).results
- assert delete_resp.success, "Failed to delete API key"
- # Verify deletion
- list_resp_after = client.users.list_api_keys(user_id).results
- assert not any(
- k.key_id == key_id
- for k in list_resp_after), ("API key still exists after deletion")
- client.users.logout()
- def test_api_key_authentication(client: R2RClient, user_with_api_key):
- """Test using an API key for authentication."""
- user_id, api_key, _ = user_with_api_key
- # Create new client with API key
- api_client = R2RClient(client.base_url)
- api_client.set_api_key(api_key)
- # Test API key authentication
- me_id = api_client.users.me().results.id
- assert me_id == user_id, "API key authentication failed"
- def test_api_key_permissions(client: R2RClient, user_with_api_key):
- """Test API key permission restrictions."""
- user_id, api_key, _ = user_with_api_key
- # Create new client with API key
- api_client = R2RClient(client.base_url)
- api_client.set_api_key(api_key)
- # Should not be able to list all users (superuser only)
- with pytest.raises(R2RException) as exc_info:
- api_client.users.list()
- assert exc_info.value.status_code == 403, (
- "Non-superuser API key shouldn't list users")
- def test_invalid_api_key(client: R2RClient):
- """Test behavior with invalid API key."""
- api_client = R2RClient(client.base_url)
- api_client.set_api_key("invalid.api.key")
- with pytest.raises(R2RException) as exc_info:
- api_client.users.me()
- assert exc_info.value.status_code == 401, (
- "Expected 401 for invalid API key")
- def test_multiple_api_keys(client: R2RClient):
- """Test creating and managing multiple API keys for a single user."""
- email = f"{uuid.uuid4()}@example.com"
- password = "multi_key_test_password"
- user_id = client.users.create(email, password).results.id
- client.users.login(email, password)
- # Create multiple API keys
- key_ids = []
- for i in range(3):
- key_resp = client.users.create_api_key(user_id).results
- key_ids.append(key_resp.key_id)
- # List and verify all keys exist
- list_resp = client.users.list_api_keys(user_id).results
- assert len(list_resp) >= 3, "Not all API keys were created"
- # Delete keys one by one and verify counts
- for key_id in key_ids:
- client.users.delete_api_key(user_id, key_id)
- current_keys = client.users.list_api_keys(user_id).results
- assert not any(k.key_id == key_id for k in current_keys), (
- f"Key {key_id} still exists after deletion")
- client.users.logout()
- def test_update_user_limits_overrides(client: R2RClient):
- # 1) Create user
- user_email = f"test_{uuid.uuid4()}@example.com"
- client.users.create(user_email, "SomePassword123!")
- client.users.login(user_email, "SomePassword123!")
- # 2) Confirm the default overrides is None
- fetched_user = client.users.me().results
- client.users.logout()
- assert len(fetched_user.limits_overrides) == 0
- # 3) Update the overrides
- overrides = {
- "global_per_min": 10,
- "monthly_limit": 3000,
- "route_overrides": {
- "/some-route": {
- "route_per_min": 5
- },
- },
- }
- client.users.update(id=fetched_user.id, limits_overrides=overrides)
- # 4) Fetch user again, check
- client.users.login(user_email, "SomePassword123!")
- updated_user = client.users.me().results
- assert len(updated_user.limits_overrides) != 0
- assert updated_user.limits_overrides["global_per_min"] == 10
- assert (updated_user.limits_overrides["route_overrides"]["/some-route"]
- ["route_per_min"] == 5)
- def test_collection_ownership_filtering(client: R2RClient):
- """Test the ownerOnly filter parameter in collections list endpoint."""
- # Create two test users
- user1_email = f"user1_{uuid.uuid4()}@test.com"
- user1_password = "password123"
- user2_email = f"user2_{uuid.uuid4()}@test.com"
- user2_password = "password123"
- # Register users
- client.users.create(user1_email, user1_password)
- client.users.create(user2_email, user2_password)
- # Login as user1 and create a collection
- client.users.login(user1_email, user1_password)
- user1_id = client.users.me().results.id
- user1_collection = client.collections.create(name="User1 Collection").results
- user1_collection_id = user1_collection.id
- # Login as user2 and create a collection
- client.users.logout()
- client.users.login(user2_email, user2_password)
- user2_id = client.users.me().results.id
- user2_collection = client.collections.create(name="User2 Collection").results
- user2_collection_id = user2_collection.id
- # User2 adds user1 to their collection
- client.collections.add_user(user2_collection_id, user1_id)
- # Login as user1 and check collections
- client.users.logout()
- client.users.login(user1_email, user1_password)
- # List all collections
- all_collections = client.collections.list().results
- all_collection_ids = [str(col.id) for col in all_collections]
- # Verify user1 can see their own collection
- assert str(user1_collection_id) in all_collection_ids, "User1 can't see their own collection"
- # Verify user1 can see user2's shared collection
- assert str(user2_collection_id) in all_collection_ids, "User1 can't see shared collection"
- # List only owned collections
- owned_collections = client.collections.list(owner_only=True).results
- owned_collection_ids = [str(col.id) for col in owned_collections]
- # Verify user1's collection is in the owned list
- assert str(user1_collection_id) in owned_collection_ids, "User1's collection not in owned list"
- # Verify user2's collection is NOT in the owned list
- assert str(user2_collection_id) not in owned_collection_ids, "Shared collection should not be in owned list"
- # User1 adds user2 to their collection
- client.collections.add_user(user1_collection_id, user2_id)
- # Login as user2 and check collections
- client.users.logout()
- client.users.login(user2_email, user2_password)
- # List all collections
- all_collections = client.collections.list().results
- all_collection_ids = [str(col.id) for col in all_collections]
- # Verify user2 can see their own collection
- assert str(user2_collection_id) in all_collection_ids, "User2 can't see their own collection"
- # Verify user2 can see user1's shared collection
- assert str(user1_collection_id) in all_collection_ids, "User2 can't see shared collection"
- # List only owned collections
- owned_collections = client.collections.list(owner_only=True).results
- owned_collection_ids = [str(col.id) for col in owned_collections]
- # Verify user2's collection is in the owned list
- assert str(user2_collection_id) in owned_collection_ids, "User2's collection not in owned list"
- # Verify user1's collection is NOT in the owned list
- assert str(user1_collection_id) not in owned_collection_ids, "Shared collection should not be in owned list"
- # Cleanup
- client.users.logout()
- def test_superuser_collection_ownership_filtering(client: R2RClient, superuser_login, config):
- """Test the ownerOnly filter for superusers."""
- # Create a regular user
- user_email = f"regular_{uuid.uuid4()}@test.com"
- user_password = "password123"
- client.users.create(user_email, user_password)
- # Create a collection as superuser
- superuser_collection = client.collections.create(name="Superuser Collection").results
- superuser_id = client.users.me().results.id
- # List all collections as superuser (without filter)
- all_collections_count = len(client.collections.list().results)
- assert all_collections_count > 0, "Superuser should see collections"
- # List only owned collections as superuser
- owned_collections = client.collections.list(owner_only=True).results
- owned_count = len(owned_collections)
- assert owned_count > 0, "Superuser should see owned collections"
- assert owned_count < all_collections_count, "Filtered list should be smaller than all collections"
- # Verify the superuser collection is in the owned list
- assert any(str(col.id) == str(superuser_collection.id) for col in owned_collections), \
- "Superuser collection should be in the owned list"
- # Cleanup
- client.collections.delete(superuser_collection.id)
- client.users.logout()
- def test_collection_filter_invalid_parameters(client: R2RClient):
- """Test error handling for invalid filter parameters."""
- # Create a test user
- user_email = f"test_{uuid.uuid4()}@test.com"
- user_password = "password123"
- client.users.create(user_email, user_password)
- client.users.login(user_email, user_password)
- # Test with invalid owner_only parameter type (should be bool, not string)
- with pytest.raises(R2RException) as exc_info:
- client.collections.list(owner_only="not-a-bool")
- assert exc_info.value.status_code in [400, 422], \
- "Expected validation error for invalid owner_only parameter"
- client.users.logout()
- def test_document_ownership_filtering(client: R2RClient):
- """Test the ownerOnly filter parameter in documents list endpoint."""
- # Create two test users
- user1_email = f"user1_doc_{uuid.uuid4()}@test.com"
- user1_password = "password123"
- user2_email = f"user2_doc_{uuid.uuid4()}@test.com"
- user2_password = "password123"
- # Register users
- client.users.create(user1_email, user1_password)
- client.users.create(user2_email, user2_password)
- # Login as user1 and create a document and collection
- client.users.login(user1_email, user1_password)
- user1_id = client.users.me().results.id
- user1_collection = client.collections.create(name="User1 Doc Collection").results
- user1_collection_id = user1_collection.id
- user1_document = client.documents.create(
- raw_text="User 1 document content",
- metadata={"title": "User 1 Document"}
- ).results
- user1_document_id = user1_document.document_id
- # Wait for processing
- import time
- time.sleep(5)
- # Login as user2 and create a document and collection
- client.users.logout()
- client.users.login(user2_email, user2_password)
- user2_id = client.users.me().results.id
- user2_collection = client.collections.create(name="User2 Doc Collection").results
- user2_collection_id = user2_collection.id
- user2_document = client.documents.create(
- raw_text="User 2 document content",
- metadata={"title": "User 2 Document"}
- ).results
- user2_document_id = user2_document.document_id
- # Wait for processing
- time.sleep(5)
- # Add user1's document to user2's collection
- client.collections.add_document(user2_collection_id, user1_document_id)
- # Login as user1 and check documents
- client.users.logout()
- client.users.login(user1_email, user1_password)
- # List all documents
- all_documents = client.documents.list().results
- all_document_ids = [str(doc.id) for doc in all_documents]
- # Verify user1 can see their own document
- assert str(user1_document_id) in all_document_ids, "User1 can't see their own document"
- # List only owned documents
- owned_documents = client.documents.list(owner_only=True).results
- owned_document_ids = [str(doc.id) for doc in owned_documents]
- # Verify user1's document is in the owned list
- assert str(user1_document_id) in owned_document_ids, "User1's document not in owned list"
- # Add user2's document to user1's collection
- client.collections.add_document(user1_collection_id, user2_document_id)
- # Login as user2 and check documents
- client.users.logout()
- client.users.login(user2_email, user2_password)
- # List all documents
- all_documents = client.documents.list().results
- all_document_ids = [str(doc.id) for doc in all_documents]
- # Verify user2 can see their own document
- assert str(user2_document_id) in all_document_ids, "User2 can't see their own document"
- # Verify user2 can see user1's shared document
- assert str(user1_document_id) in all_document_ids, "User2 can't see shared document"
- # List only owned documents
- owned_documents = client.documents.list(owner_only=True).results
- owned_document_ids = [str(doc.id) for doc in owned_documents]
- # Verify user2's document is in the owned list
- assert str(user2_document_id) in owned_document_ids, "User2's document not in owned list"
- # Verify user1's document is NOT in the owned list
- assert str(user1_document_id) not in owned_document_ids, "Shared document should not be in owned list"
- # Cleanup - login as the right user first
- client.users.logout()
- client.users.login(user1_email, user1_password)
- try:
- client.documents.delete(user1_document_id)
- except Exception as e:
- print(f"Failed to delete user1's document: {e}")
- client.users.logout()
- client.users.login(user2_email, user2_password)
- try:
- client.documents.delete(user2_document_id)
- except Exception as e:
- print(f"Failed to delete user2's document: {e}")
- client.users.logout()
- def test_document_filter_invalid_parameters(client: R2RClient):
- """Test error handling for invalid filter parameters in documents endpoint."""
- # Create a test user
- user_email = f"test_doc_{uuid.uuid4()}@test.com"
- user_password = "password123"
- client.users.create(user_email, user_password)
- client.users.login(user_email, user_password)
- # Test with invalid owner_only parameter type (should be bool, not string)
- with pytest.raises(R2RException) as exc_info:
- client.documents.list(owner_only="not-a-bool")
- assert exc_info.value.status_code in [400, 422], \
- "Expected validation error for invalid owner_only parameter"
- client.users.logout()
|