123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- import uuid
- import pytest
- from r2r import R2RClient, R2RException
@pytest.fixture(scope="session")
def config():
    """Provide the settings object shared by every test in the session.

    The returned object exposes ``base_url``, ``superuser_email``,
    ``superuser_password`` and ``known_collection_id`` as attributes.
    """

    class TestConfig:
        # URL handed to ``R2RClient`` by the ``client`` fixture.
        base_url = "http://localhost:7272"
        # Credentials used by the ``superuser_login`` fixture.
        superuser_email = "admin@example.com"
        superuser_password = "change_me_immediately"
        # Example known collection ID
        known_collection_id = "122fdf6a-e116-546b-a8f6-e4cb2e2c0a09"

    settings = TestConfig()
    return settings
@pytest.fixture(scope="session")
def client(config):
    """Build the single ``R2RClient`` reused for the whole test session.

    No login happens here; tests that need superuser rights request the
    ``superuser_login`` fixture instead.
    """
    # A global superuser login could be enabled here if ever needed:
    # client.users.login(config.superuser_email, config.superuser_password)
    return R2RClient(config.base_url)
@pytest.fixture
def superuser_login(client, config):
    """Log the shared client in as the superuser before the test body runs.

    Nothing is undone afterwards: the client stays logged in once the test
    finishes.
    """
    email, password = config.superuser_email, config.superuser_password
    client.users.login(email, password)
    yield
    # No teardown on purpose; a logout could be added here if required:
    # client.users.logout()
def register_and_return_user_id(client, email: str, password: str) -> str:
    """Register a new user and return the ID found in the response.

    Args:
        client: Client whose ``users.create`` call performs the registration.
        email: Email address for the new account.
        password: Password for the new account.

    Returns:
        The ``id`` entry of the ``results`` payload returned by
        ``users.create``.

    Raises:
        KeyError: If the response lacks a ``results`` or ``id`` entry.
    """
    # If verification is mandatory, you'd have a step here to verify the user.
    # Otherwise, assume the user can login immediately.
    return client.users.create(email, password)["results"]["id"]
def test_register_user(client):
    """Registering a fresh random email must return a user record with an ID."""
    credentials = (f"{uuid.uuid4()}@example.com", "test_password123")
    created = client.users.create(*credentials)["results"]
    assert "id" in created, "No user ID returned after registration."
- # COMMENTED OUT SINCE AUTH IS NOT REQUIRED BY DEFAULT IN R2R.TOML
- # def test_user_login_logout(client):
- # random_email = f"{uuid.uuid4()}@example.com"
- # password = "test_password123"
- # user_id = register_and_return_user_id(client, random_email, password)
- # login_resp = client.users.login(random_email, password)["results"]
- # assert "access_token" in login_resp, "Login failed."
- # me = client.users.me()["results"]
- # assert me["id"] == user_id, "Logged in user does not match expected user."
- # logout_resp = client.users.logout()["results"]
- # assert "message" in logout_resp, "Logout failed."
- # # After logout, token should be invalid
- # with pytest.raises(R2RException) as exc_info:
- # client.users.me()
- # assert exc_info.value.status_code == 401, "Expected 401 after logout."
def test_user_refresh_token(client):
    """Refreshing must yield an access token different from the login one."""
    email = f"{uuid.uuid4()}@example.com"
    secret = "test_password123"
    register_and_return_user_id(client, email, secret)
    client.users.login(email, secret)

    # Capture the token before refreshing so the two can be compared.
    token_before = client.access_token
    refreshed = client.users.refresh_token()["results"]
    token_after = refreshed["access_token"]["token"]
    assert token_after != token_before, (
        "Refresh token did not provide a new access token."
    )
def test_change_password(client):
    """Changing the password must reject the old one and accept the new one."""
    email = f"{uuid.uuid4()}@example.com"
    old_password, new_password = "old_password123", "new_password456"
    register_and_return_user_id(client, email, old_password)
    client.users.login(email, old_password)

    outcome = client.users.change_password(old_password, new_password)
    assert "message" in outcome["results"], "Change password failed."

    # After logging out, the old password must be rejected with a 401.
    client.users.logout()
    with pytest.raises(R2RException) as rejected:
        client.users.login(email, old_password)
    assert rejected.value.status_code == 401, (
        "Old password should not work anymore."
    )

    # The new password must be accepted.
    client.users.login(email, new_password)
@pytest.mark.skip(
    reason="Requires a real or mocked reset token retrieval if verification is implemented."
)
def test_request_and_reset_password(client):
    """Request a password reset, apply it, then log in with the new password.

    Skipped: the token used below is a placeholder, because this scenario
    assumes a valid reset token can be obtained somehow.
    """
    email = f"{uuid.uuid4()}@example.com"
    register_and_return_user_id(client, email, "initial_password123")
    client.users.logout()

    # Request password reset
    requested = client.users.request_password_reset(email)
    assert "message" in requested["results"], "Request password reset failed."

    # Placeholder; replace with a token retrieved from test hooks or logs
    # if one is available.
    token = "FAKE_RESET_TOKEN_FOR_TESTING"
    replacement_password = "new_reset_password789"

    # Attempt reset
    applied = client.users.reset_password(token, replacement_password)
    assert "message" in applied["results"], "Reset password failed."

    # Verify login with new password
    client.users.login(email, replacement_password)
def test_users_list(client, superuser_login):
    """With the superuser logged in, the user listing must be a list."""
    listing = client.users.list()
    assert isinstance(listing["results"], list), "Listing users failed."
def test_get_current_user(client, superuser_login):
    """The current-user call must return a record that carries an ID."""
    current = client.users.me()
    assert "id" in current["results"], "Failed to get current user."
def test_get_user_by_id(client, superuser_login):
    """Retrieving a freshly registered user by ID must return that same ID."""
    expected_id = register_and_return_user_id(
        client, f"{uuid.uuid4()}@example.com", "somepassword"
    )
    fetched = client.users.retrieve(expected_id)["results"]
    assert fetched["id"] == expected_id, (
        "Retrieved user does not match requested ID."
    )
def test_update_user(client, superuser_login):
    """Updating a user's name must be reflected in the returned record."""
    target_id = register_and_return_user_id(
        client, f"{uuid.uuid4()}@example.com", "somepassword"
    )
    desired_name = "Updated Name"
    updated = client.users.update(target_id, name=desired_name)["results"]
    assert updated["name"] == desired_name, "User update failed."
def test_user_collections(client, superuser_login, config):
    """Listing the collections of a newly created user must return a list."""
    # Create a user, then ask for the collections they belong to.
    owner_id = register_and_return_user_id(
        client, f"{uuid.uuid4()}@example.com", "somepassword"
    )
    listing = client.users.list_collections(owner_id)
    assert isinstance(listing["results"], list), (
        "Listing user collections failed."
    )
def test_add_remove_user_from_collection(client, superuser_login, config):
    """Add a new user to the known collection, then remove them again.

    Membership is checked through ``list_collections`` after each step.
    """
    member_id = register_and_return_user_id(
        client, f"{uuid.uuid4()}@example.com", "somepassword"
    )

    def is_member():
        # True when the known collection appears among the user's collections.
        memberships = client.users.list_collections(member_id)["results"]
        return any(
            entry["id"] == config.known_collection_id for entry in memberships
        )

    # Add user to known collection
    added = client.users.add_to_collection(
        member_id, config.known_collection_id
    )["results"]
    assert added["success"], "Failed to add user to collection."

    # Verify
    assert is_member(), "User not in collection after add."

    # Remove user from collection
    removed = client.users.remove_from_collection(
        member_id, config.known_collection_id
    )["results"]
    assert removed["success"], "Failed to remove user from collection."

    assert not is_member(), "User still in collection after removal."
def test_delete_user(client):
    """A user who deletes their own account must afterwards yield a 404."""
    # Log out of whatever session the shared client currently holds.
    client.users.logout()
    email = f"{uuid.uuid4()}@example.com"
    secret = "somepassword"

    # Create the user, log in as them, and delete the account.
    doomed_id = register_and_return_user_id(client, email, secret)
    client.users.login(email, secret)
    deletion = client.users.delete(doomed_id, secret)["results"]
    assert deletion["success"], "User deletion failed."

    with pytest.raises(R2RException) as missing:
        # The print only runs if the lookup unexpectedly succeeds, showing
        # what came back before pytest.raises reports the failure.
        print("result = ", client.users.retrieve(doomed_id))
    assert missing.value.status_code == 404, (
        "User still exists after deletion."
    )
- # def test_non_superuser_restrict_access(client):
- # # Create user
- # client.users.logout()
- # random_email = f"test_user_{uuid.uuid4()}@example.com"
- # password = "somepassword"
- # user_id = register_and_return_user_id(client, random_email, password)
- # print('trying to login now....')
- # # client.users.login(random_email, password)
- # # Non-superuser listing users should fail
- # with pytest.raises(R2RException) as exc_info:
- # client.users.list()
- # assert (
- # exc_info.value.status_code == 403
- # ), "Non-superuser listed users without error."
- # # # Create another user
- # # another_email = f"{uuid.uuid4()}@example.com"
- # # another_password = "anotherpassword"
- # # another_user_id = register_and_return_user_id(
- # # client, another_email, another_password
- # # )
- # # # Non-superuser updating another user should fail
- # # with pytest.raises(R2RException) as exc_info:
- # # client.users.update(another_user_id, name="Nope")
- # # assert (
- # # exc_info.value.status_code == 403
- # # ), "Non-superuser updated another user."
def test_superuser_downgrade_permissions(client, superuser_login, config):
    """Promote a user to superuser, confirm access, then demote and expect 403.

    Steps:
        1. As the admin, register a user and set ``is_superuser=True``.
        2. Log in as that user and confirm ``users.list`` returns a list.
        3. Log back in as the admin and set ``is_superuser=False``.
        4. Log in as the user again and expect ``users.list`` to raise an
           ``R2RException`` with status 403.

    The shared client is left logged in as the downgraded user.
    """
    user_email = f"test_super_{uuid.uuid4()}@test.com"
    user_password = "securepass"
    new_user_id = register_and_return_user_id(
        client, user_email, user_password
    )

    # Upgrade user to superuser
    upgraded_user = client.users.update(new_user_id, is_superuser=True)[
        "results"
    ]
    # Identity check: the flag must be the boolean True, not merely equal to it.
    assert (
        upgraded_user["is_superuser"] is True
    ), "User not upgraded to superuser."

    # Logout admin, login as new superuser
    client.users.logout()
    client.users.login(user_email, user_password)
    all_users = client.users.list()["results"]
    assert isinstance(all_users, list), "New superuser cannot list users."

    # Downgrade back to normal (re-login as original admin)
    client.users.logout()
    client.users.login(config.superuser_email, config.superuser_password)
    downgraded_user = client.users.update(new_user_id, is_superuser=False)[
        "results"
    ]
    # Identity check: the flag must be the boolean False, not merely equal to it.
    assert downgraded_user["is_superuser"] is False, "User not downgraded."

    # Now login as downgraded user and verify no superuser access
    client.users.logout()
    client.users.login(user_email, user_password)
    with pytest.raises(R2RException) as exc_info:
        client.users.list()
    assert (
        exc_info.value.status_code == 403
    ), "Downgraded user still has superuser privileges."
|