test_users.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. import uuid
  2. import pytest
  3. from r2r import R2RClient, R2RException
  4. @pytest.fixture(scope="session")
  5. def config():
  6. class TestConfig:
  7. base_url = "http://localhost:7272"
  8. superuser_email = "admin@example.com"
  9. superuser_password = "change_me_immediately"
  10. known_collection_id = "122fdf6a-e116-546b-a8f6-e4cb2e2c0a09" # Example known collection ID
  11. return TestConfig()
  12. @pytest.fixture(scope="session")
  13. def client(config):
  14. client = R2RClient(config.base_url)
  15. # Optionally, log in as superuser here if needed globally
  16. # client.users.login(config.superuser_email, config.superuser_password)
  17. return client
  18. @pytest.fixture
  19. def superuser_login(client, config):
  20. """A fixture that ensures the client is logged in as superuser."""
  21. client.users.login(config.superuser_email, config.superuser_password)
  22. yield
  23. # After test, if needed, we can logout or reset
  24. # client.users.logout()
  25. def register_and_return_user_id(client, email: str, password: str) -> str:
  26. print('email = ', email)
  27. print('making request.....')
  28. user_resp = client.users.create(email, password)["results"]
  29. print('user_resp = ', user_resp)
  30. user_id = user_resp["id"]
  31. # If verification is mandatory, you'd have a step here to verify the user.
  32. # Otherwise, assume the user can login immediately.
  33. return user_id
  34. def test_register_user(client):
  35. random_email = f"{uuid.uuid4()}@example.com"
  36. password = "test_password123"
  37. user_resp = client.users.create(random_email, password)
  38. user = user_resp["results"]
  39. assert "id" in user, "No user ID returned after registration."
  40. # COMMENTED OUT SINCE AUTH IS NOT REQUIRED BY DEFAULT IN R2R.TOML
  41. # def test_user_login_logout(client):
  42. # random_email = f"{uuid.uuid4()}@example.com"
  43. # password = "test_password123"
  44. # user_id = register_and_return_user_id(client, random_email, password)
  45. # login_resp = client.users.login(random_email, password)["results"]
  46. # assert "access_token" in login_resp, "Login failed."
  47. # me = client.users.me()["results"]
  48. # assert me["id"] == user_id, "Logged in user does not match expected user."
  49. # logout_resp = client.users.logout()["results"]
  50. # assert "message" in logout_resp, "Logout failed."
  51. # # After logout, token should be invalid
  52. # with pytest.raises(R2RException) as exc_info:
  53. # client.users.me()
  54. # assert exc_info.value.status_code == 401, "Expected 401 after logout."
  55. def test_user_refresh_token(client):
  56. random_email = f"{uuid.uuid4()}@example.com"
  57. password = "test_password123"
  58. register_and_return_user_id(client, random_email, password)
  59. client.users.login(random_email, password)
  60. old_access_token = client.access_token
  61. refresh_resp = client.users.refresh_token()["results"]
  62. new_access_token = refresh_resp["access_token"]["token"]
  63. assert (
  64. new_access_token != old_access_token
  65. ), "Refresh token did not provide a new access token."
  66. def test_change_password(client):
  67. random_email = f"{uuid.uuid4()}@example.com"
  68. old_password = "old_password123"
  69. new_password = "new_password456"
  70. register_and_return_user_id(client, random_email, old_password)
  71. client.users.login(random_email, old_password)
  72. change_resp = client.users.change_password(old_password, new_password)[
  73. "results"
  74. ]
  75. assert "message" in change_resp, "Change password failed."
  76. # Check old password no longer works
  77. client.users.logout()
  78. with pytest.raises(R2RException) as exc_info:
  79. client.users.login(random_email, old_password)
  80. assert (
  81. exc_info.value.status_code == 401
  82. ), "Old password should not work anymore."
  83. # New password should work
  84. client.users.login(random_email, new_password)
  85. @pytest.mark.skip(
  86. reason="Requires a real or mocked reset token retrieval if verification is implemented."
  87. )
  88. def test_request_and_reset_password(client):
  89. # This test scenario assumes you can obtain a valid reset token somehow.
  90. random_email = f"{uuid.uuid4()}@example.com"
  91. password = "initial_password123"
  92. register_and_return_user_id(client, random_email, password)
  93. client.users.logout()
  94. # Request password reset
  95. reset_req = client.users.request_password_reset(random_email)
  96. assert "message" in reset_req["results"], "Request password reset failed."
  97. # Suppose we can retrieve a reset_token from test hooks or logs:
  98. reset_token = (
  99. "FAKE_RESET_TOKEN_FOR_TESTING" # Replace with actual if available
  100. )
  101. new_password = "new_reset_password789"
  102. # Attempt reset
  103. resp = client.users.reset_password(reset_token, new_password)
  104. assert "message" in resp["results"], "Reset password failed."
  105. # Verify login with new password
  106. client.users.login(random_email, new_password)
  107. def test_users_list(client, superuser_login):
  108. users_list = client.users.list()["results"]
  109. assert isinstance(users_list, list), "Listing users failed."
  110. def test_get_current_user(client, superuser_login):
  111. me = client.users.me()["results"]
  112. assert "id" in me, "Failed to get current user."
  113. def test_get_user_by_id(client, superuser_login):
  114. random_email = f"{uuid.uuid4()}@example.com"
  115. password = "somepassword"
  116. user_id = register_and_return_user_id(client, random_email, password)
  117. user = client.users.retrieve(user_id)["results"]
  118. assert user["id"] == user_id, "Retrieved user does not match requested ID."
  119. def test_update_user(client, superuser_login):
  120. random_email = f"{uuid.uuid4()}@example.com"
  121. password = "somepassword"
  122. user_id = register_and_return_user_id(client, random_email, password)
  123. updated_name = "Updated Name"
  124. update_resp = client.users.update(user_id, name=updated_name)["results"]
  125. assert update_resp["name"] == updated_name, "User update failed."
  126. def test_user_collections(client, superuser_login, config):
  127. # Create a user and list their collections
  128. random_email = f"{uuid.uuid4()}@example.com"
  129. password = "somepassword"
  130. user_id = register_and_return_user_id(client, random_email, password)
  131. collections = client.users.list_collections(user_id)["results"]
  132. assert isinstance(collections, list), "Listing user collections failed."
  133. def test_add_remove_user_from_collection(client, superuser_login, config):
  134. random_email = f"{uuid.uuid4()}@example.com"
  135. password = "somepassword"
  136. user_id = register_and_return_user_id(client, random_email, password)
  137. # Add user to known collection
  138. add_resp = client.users.add_to_collection(
  139. user_id, config.known_collection_id
  140. )["results"]
  141. assert add_resp["success"], "Failed to add user to collection."
  142. # Verify
  143. collections = client.users.list_collections(user_id)["results"]
  144. assert any(
  145. col["id"] == config.known_collection_id for col in collections
  146. ), "User not in collection after add."
  147. # Remove user from collection
  148. remove_resp = client.users.remove_from_collection(
  149. user_id, config.known_collection_id
  150. )["results"]
  151. assert remove_resp["success"], "Failed to remove user from collection."
  152. collections_after = client.users.list_collections(user_id)["results"]
  153. assert not any(
  154. col["id"] == config.known_collection_id for col in collections_after
  155. ), "User still in collection after removal."
  156. def test_delete_user(client):
  157. # Create and then delete user
  158. client.users.logout()
  159. random_email = f"{uuid.uuid4()}@example.com"
  160. password = "somepassword"
  161. user_id = register_and_return_user_id(client, random_email, password)
  162. client.users.login(random_email, password)
  163. del_resp = client.users.delete(user_id, password)["results"]
  164. assert del_resp["success"], "User deletion failed."
  165. with pytest.raises(R2RException) as exc_info:
  166. result = client.users.retrieve(user_id)
  167. print("result = ", result)
  168. assert (
  169. exc_info.value.status_code == 404
  170. ), "User still exists after deletion."
  171. # def test_non_superuser_restrict_access(client):
  172. # # Create user
  173. # client.users.logout()
  174. # random_email = f"test_user_{uuid.uuid4()}@example.com"
  175. # password = "somepassword"
  176. # user_id = register_and_return_user_id(client, random_email, password)
  177. # print('trying to login now....')
  178. # # client.users.login(random_email, password)
  179. # # Non-superuser listing users should fail
  180. # with pytest.raises(R2RException) as exc_info:
  181. # client.users.list()
  182. # assert (
  183. # exc_info.value.status_code == 403
  184. # ), "Non-superuser listed users without error."
  185. # # # Create another user
  186. # # another_email = f"{uuid.uuid4()}@example.com"
  187. # # another_password = "anotherpassword"
  188. # # another_user_id = register_and_return_user_id(
  189. # # client, another_email, another_password
  190. # # )
  191. # # # Non-superuser updating another user should fail
  192. # # with pytest.raises(R2RException) as exc_info:
  193. # # client.users.update(another_user_id, name="Nope")
  194. # # assert (
  195. # # exc_info.value.status_code == 403
  196. # # ), "Non-superuser updated another user."
  197. def test_superuser_downgrade_permissions(client, superuser_login, config):
  198. user_email = f"test_super_{uuid.uuid4()}@test.com"
  199. user_password = "securepass"
  200. new_user_id = register_and_return_user_id(
  201. client, user_email, user_password
  202. )
  203. # Upgrade user to superuser
  204. upgraded_user = client.users.update(new_user_id, is_superuser=True)[
  205. "results"
  206. ]
  207. assert (
  208. upgraded_user["is_superuser"] == True
  209. ), "User not upgraded to superuser."
  210. # Logout admin, login as new superuser
  211. client.users.logout()
  212. client.users.login(user_email, user_password)
  213. all_users = client.users.list()["results"]
  214. assert isinstance(all_users, list), "New superuser cannot list users."
  215. # Downgrade back to normal (re-login as original admin)
  216. client.users.logout()
  217. client.users.login(config.superuser_email, config.superuser_password)
  218. downgraded_user = client.users.update(new_user_id, is_superuser=False)[
  219. "results"
  220. ]
  221. assert downgraded_user["is_superuser"] == False, "User not downgraded."
  222. # Now login as downgraded user and verify no superuser access
  223. client.users.logout()
  224. client.users.login(user_email, user_password)
  225. with pytest.raises(R2RException) as exc_info:
  226. client.users.list()
  227. assert (
  228. exc_info.value.status_code == 403
  229. ), "Downgraded user still has superuser privileges."