test_users.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. import uuid
  2. import pytest
  3. from core.database.postgres import PostgresUserHandler
  4. from r2r import R2RClient, R2RException
  5. from shared.abstractions import User
  6. @pytest.fixture(scope="session")
  7. def config():
  8. class TestConfig:
  9. base_url = "http://localhost:7272"
  10. superuser_email = "admin@example.com"
  11. superuser_password = "change_me_immediately"
  12. known_collection_id = "122fdf6a-e116-546b-a8f6-e4cb2e2c0a09" # Example known collection ID
  13. return TestConfig()
  14. # @pytest.fixture(scope="session")
  15. def client(config):
  16. client = R2RClient(config.base_url)
  17. # Optionally, log in as superuser here if needed globally
  18. # client.users.login(config.superuser_email, config.superuser_password)
  19. return client
  20. @pytest.fixture
  21. def superuser_login(client, config):
  22. """A fixture that ensures the client is logged in as superuser."""
  23. client.users.login(config.superuser_email, config.superuser_password)
  24. yield
  25. # After test, if needed, we can logout or reset
  26. # client.users.logout()
  27. def register_and_return_user_id(client, email: str, password: str) -> str:
  28. user_resp = client.users.create(email, password)["results"]
  29. user_id = user_resp["id"]
  30. # If verification is mandatory, you'd have a step here to verify the user.
  31. # Otherwise, assume the user can login immediately.
  32. return user_id
  33. def test_register_user(client):
  34. random_email = f"{uuid.uuid4()}@example.com"
  35. password = "test_password123"
  36. user_resp = client.users.create(random_email, password)
  37. user = user_resp["results"]
  38. assert "id" in user, "No user ID returned after registration."
  39. client.users.logout()
  40. # COMMENTED OUT SINCE AUTH IS NOT REQUIRED BY DEFAULT IN R2R.TOML
  41. # def test_user_login_logout(client):
  42. # random_email = f"{uuid.uuid4()}@example.com"
  43. # password = "test_password123"
  44. # user_id = register_and_return_user_id(client, random_email, password)
  45. # login_resp = client.users.login(random_email, password)["results"]
  46. # assert "access_token" in login_resp, "Login failed."
  47. # me = client.users.me()["results"]
  48. # assert me["id"] == user_id, "Logged in user does not match expected user."
  49. # logout_resp = client.users.logout()["results"]
  50. # assert "message" in logout_resp, "Logout failed."
  51. # # After logout, token should be invalid
  52. # with pytest.raises(R2RException) as exc_info:
  53. # client.users.me()
  54. # assert exc_info.value.status_code == 401, "Expected 401 after logout."
  55. def test_user_refresh_token(client):
  56. random_email = f"{uuid.uuid4()}@example.com"
  57. password = "test_password123"
  58. register_and_return_user_id(client, random_email, password)
  59. client.users.login(random_email, password)
  60. old_access_token = client.access_token
  61. refresh_resp = client.users.refresh_token()["results"]
  62. new_access_token = refresh_resp["access_token"]["token"]
  63. assert (
  64. new_access_token != old_access_token
  65. ), "Refresh token did not provide a new access token."
  66. def test_change_password(client):
  67. random_email = f"{uuid.uuid4()}@example.com"
  68. old_password = "old_password123"
  69. new_password = "new_password456"
  70. register_and_return_user_id(client, random_email, old_password)
  71. client.users.login(random_email, old_password)
  72. change_resp = client.users.change_password(old_password, new_password)[
  73. "results"
  74. ]
  75. assert "message" in change_resp, "Change password failed."
  76. # Check old password no longer works
  77. client.users.logout()
  78. with pytest.raises(R2RException) as exc_info:
  79. client.users.login(random_email, old_password)
  80. assert (
  81. exc_info.value.status_code == 401
  82. ), "Old password should not work anymore."
  83. # New password should work
  84. client.users.login(random_email, new_password)
  85. client.users.logout()
  86. @pytest.mark.skip(
  87. reason="Requires a real or mocked reset token retrieval if verification is implemented."
  88. )
  89. def test_request_and_reset_password(client):
  90. # This test scenario assumes you can obtain a valid reset token somehow.
  91. random_email = f"{uuid.uuid4()}@example.com"
  92. password = "initial_password123"
  93. register_and_return_user_id(client, random_email, password)
  94. client.users.logout()
  95. # Request password reset
  96. reset_req = client.users.request_password_reset(random_email)
  97. assert "message" in reset_req["results"], "Request password reset failed."
  98. # Suppose we can retrieve a reset_token from test hooks or logs:
  99. reset_token = (
  100. "FAKE_RESET_TOKEN_FOR_TESTING" # Replace with actual if available
  101. )
  102. new_password = "new_reset_password789"
  103. # Attempt reset
  104. resp = client.users.reset_password(reset_token, new_password)
  105. assert "message" in resp["results"], "Reset password failed."
  106. # Verify login with new password
  107. client.users.login(random_email, new_password)
  108. client.users.logout()
  109. def test_users_list(client, superuser_login):
  110. users_list = client.users.list()["results"]
  111. assert isinstance(users_list, list), "Listing users failed."
  112. client.users.logout()
  113. def test_get_current_user(client, superuser_login):
  114. me = client.users.me()["results"]
  115. assert "id" in me, "Failed to get current user."
  116. client.users.logout()
  117. def test_get_user_by_id(client, superuser_login):
  118. random_email = f"{uuid.uuid4()}@example.com"
  119. password = "somepassword"
  120. user_id = register_and_return_user_id(client, random_email, password)
  121. user = client.users.retrieve(user_id)["results"]
  122. assert user["id"] == user_id, "Retrieved user does not match requested ID."
  123. client.users.logout()
  124. def test_update_user(client, superuser_login):
  125. random_email = f"{uuid.uuid4()}@example.com"
  126. password = "somepassword"
  127. user_id = register_and_return_user_id(client, random_email, password)
  128. updated_name = "Updated Name"
  129. update_resp = client.users.update(user_id, name=updated_name)["results"]
  130. assert update_resp["name"] == updated_name, "User update failed."
  131. client.users.logout()
  132. def test_user_collections(client, superuser_login, config):
  133. # Create a user and list their collections
  134. random_email = f"{uuid.uuid4()}@example.com"
  135. password = "somepassword"
  136. user_id = register_and_return_user_id(client, random_email, password)
  137. collections = client.users.list_collections(user_id)["results"]
  138. assert isinstance(collections, list), "Listing user collections failed."
  139. client.users.logout()
  140. def test_add_remove_user_from_collection(client, superuser_login, config):
  141. random_email = f"{uuid.uuid4()}@example.com"
  142. password = "somepassword"
  143. user_id = register_and_return_user_id(client, random_email, password)
  144. # Add user to known collection
  145. add_resp = client.users.add_to_collection(
  146. user_id, config.known_collection_id
  147. )["results"]
  148. assert add_resp["success"], "Failed to add user to collection."
  149. # Verify
  150. collections = client.users.list_collections(user_id)["results"]
  151. assert any(
  152. col["id"] == config.known_collection_id for col in collections
  153. ), "User not in collection after add."
  154. # Remove user from collection
  155. remove_resp = client.users.remove_from_collection(
  156. user_id, config.known_collection_id
  157. )["results"]
  158. assert remove_resp["success"], "Failed to remove user from collection."
  159. collections_after = client.users.list_collections(user_id)["results"]
  160. assert not any(
  161. col["id"] == config.known_collection_id for col in collections_after
  162. ), "User still in collection after removal."
  163. client.users.logout()
  164. def test_delete_user(client):
  165. # Create and then delete user
  166. client.users.logout()
  167. random_email = f"{uuid.uuid4()}@example.com"
  168. password = "somepassword"
  169. client.users.create(random_email, password)
  170. client.users.login(random_email, password)
  171. user_id = client.users.me()["results"]["id"]
  172. del_resp = client.users.delete(user_id, password)["results"]
  173. assert del_resp["success"], "User deletion failed."
  174. with pytest.raises(R2RException) as exc_info:
  175. # result = client.users.retrieve(user_id)
  176. client.users.login(random_email, password)
  177. # print("result = ", result)
  178. assert (
  179. exc_info.value.status_code == 404
  180. ), "User still exists after deletion."
  181. # def test_non_superuser_restrict_access(client):
  182. # # Create user
  183. # # client.users.logout()
  184. # random_email = f"test_user_{uuid.uuid4()}@example.com"
  185. # password = "somepassword"
  186. # user_id = register_and_return_user_id(client, random_email, password)
  187. # print("trying to login now....")
  188. # client.users.login(random_email, password)
  189. # # Non-superuser listing users should fail
  190. # with pytest.raises(R2RException) as exc_info:
  191. # client.users.list()
  192. # assert (
  193. # exc_info.value.status_code == 403
  194. # ), "Non-superuser listed users without error."
  195. # # Create another user
  196. # another_email = f"{uuid.uuid4()}@example.com"
  197. # another_password = "anotherpassword"
  198. # another_user_id = register_and_return_user_id(
  199. # client, another_email, another_password
  200. # )
  201. # # Non-superuser updating another user should fail
  202. # with pytest.raises(R2RException) as exc_info:
  203. # client.users.update(another_user_id, name="Nope")
  204. # assert (
  205. # exc_info.value.status_code == 403
  206. # ), "Non-superuser updated another user."
  207. def test_superuser_downgrade_permissions(client, superuser_login, config):
  208. user_email = f"test_super_{uuid.uuid4()}@test.com"
  209. user_password = "securepass"
  210. new_user_id = register_and_return_user_id(
  211. client, user_email, user_password
  212. )
  213. # Upgrade user to superuser
  214. upgraded_user = client.users.update(new_user_id, is_superuser=True)[
  215. "results"
  216. ]
  217. assert (
  218. upgraded_user["is_superuser"] == True
  219. ), "User not upgraded to superuser."
  220. # Logout admin, login as new superuser
  221. client.users.logout()
  222. client.users.login(user_email, user_password)
  223. all_users = client.users.list()["results"]
  224. assert isinstance(all_users, list), "New superuser cannot list users."
  225. # Downgrade back to normal (re-login as original admin)
  226. client.users.logout()
  227. client.users.login(config.superuser_email, config.superuser_password)
  228. downgraded_user = client.users.update(new_user_id, is_superuser=False)[
  229. "results"
  230. ]
  231. assert downgraded_user["is_superuser"] == False, "User not downgraded."
  232. # Now login as downgraded user and verify no superuser access
  233. client.users.logout()
  234. client.users.login(user_email, user_password)
  235. with pytest.raises(R2RException) as exc_info:
  236. client.users.list()
  237. assert (
  238. exc_info.value.status_code == 403
  239. ), "Downgraded user still has superuser privileges."
  240. client.users.logout()
  241. def test_non_owner_delete_collection(client):
  242. # Create owner user
  243. owner_email = f"owner_{uuid.uuid4()}@test.com"
  244. owner_password = "pwd123"
  245. client.users.create(owner_email, owner_password)
  246. client.users.login(owner_email, owner_password)
  247. coll = client.collections.create(name="Owner Collection")["results"]
  248. coll_id = coll["id"]
  249. # Create another user and get their ID
  250. non_owner_email = f"nonowner_{uuid.uuid4()}@test.com"
  251. non_owner_password = "pwd1234"
  252. client.users.logout()
  253. client.users.create(non_owner_email, non_owner_password)
  254. client.users.login(non_owner_email, non_owner_password)
  255. non_owner_id = client.users.me()["results"]["id"]
  256. client.users.logout()
  257. # Owner adds non-owner to collection
  258. client.users.login(owner_email, owner_password)
  259. client.collections.add_user(coll_id, non_owner_id)
  260. client.users.logout()
  261. # Non-owner tries to delete collection
  262. client.users.login(non_owner_email, non_owner_password)
  263. with pytest.raises(R2RException) as exc_info:
  264. result = client.collections.delete(coll_id)
  265. assert (
  266. exc_info.value.status_code == 403
  267. ), "Wrong error code for non-owner deletion attempt"
  268. # Cleanup
  269. client.users.logout()
  270. client.users.login(owner_email, owner_password)
  271. client.collections.delete(coll_id)
  272. client.users.logout()
  273. def test_update_user_with_invalid_email(client, superuser_login):
  274. # Create a user
  275. email = f"{uuid.uuid4()}@example.com"
  276. password = "password"
  277. user_id = register_and_return_user_id(client, email, password)
  278. # Attempt to update to invalid email
  279. with pytest.raises(R2RException) as exc_info:
  280. client.users.update(user_id, email="not-an-email")
  281. # Expect a validation error (likely 422)
  282. assert exc_info.value.status_code in [
  283. 400,
  284. 422,
  285. ], "Expected validation error for invalid email."
  286. client.users.logout()
  287. def test_update_user_email_already_exists(client, superuser_login):
  288. # Create two users
  289. email1 = f"{uuid.uuid4()}@example.com"
  290. email2 = f"{uuid.uuid4()}@example.com"
  291. password = "password"
  292. user1_id = register_and_return_user_id(client, email1, password)
  293. user2_id = register_and_return_user_id(client, email2, password)
  294. # Try updating user2's email to user1's email
  295. with pytest.raises(R2RException) as exc_info:
  296. client.users.update(user2_id, email=email1)
  297. # Expect a conflict (likely 409) or validation error
  298. # TODO - Error code should be in [400, 409, 422], not 500
  299. assert exc_info.value.status_code in [
  300. 400,
  301. 409,
  302. 422,
  303. 500,
  304. ], "Expected error updating email to an existing user's email."
  305. client.users.logout()
  306. def test_delete_user_with_incorrect_password(client):
  307. email = f"{uuid.uuid4()}@example.com"
  308. password = "correct_password"
  309. # user_id = register_and_return_user_id(client, email, password)
  310. client.users.create(email, password)
  311. client.users.login(email, password)
  312. user_id = client.users.me()["results"]["id"]
  313. # Attempt deletion with incorrect password
  314. with pytest.raises(R2RException) as exc_info:
  315. client.users.delete(user_id, "wrong_password")
  316. # TODO - Error code should be in [401, 403]
  317. assert exc_info.value.status_code in [
  318. 400,
  319. 401,
  320. 403,
  321. ], "Expected auth error with incorrect password on delete."
  322. def test_login_with_incorrect_password(client):
  323. email = f"{uuid.uuid4()}@example.com"
  324. password = "password123"
  325. client.users.create(email, password)
  326. # Try incorrect password
  327. with pytest.raises(R2RException) as exc_info:
  328. client.users.login(email, "wrongpass")
  329. assert (
  330. exc_info.value.status_code == 401
  331. ), "Expected 401 when logging in with incorrect password."
  332. client.users.logout()
  333. def test_refresh_token(client):
  334. # Assume that refresh token endpoint checks token validity
  335. # Try using a bogus refresh token
  336. email = f"{uuid.uuid4()}@example.com"
  337. password = "password123"
  338. client.users.create(email, password)
  339. client.users.login(email, password)
  340. client.users.refresh_token() # refresh_token="invalid_token")
  341. # assert exc_info.value.status_code in [400, 401], "Expected error using invalid refresh token."
  342. client.users.logout()
  343. @pytest.mark.skip(reason="Email verification logic not implemented.")
  344. def test_verification_with_invalid_code(client):
  345. # If your system supports email verification
  346. email = f"{uuid.uuid4()}@example.com"
  347. password = "password"
  348. register_and_return_user_id(client, email, password)
  349. # Try verifying with invalid code
  350. with pytest.raises(R2RException) as exc_info:
  351. client.users.verify_email(email, "wrong_code")
  352. assert exc_info.value.status_code in [
  353. 400,
  354. 422,
  355. ], "Expected error verifying with invalid code."
  356. client.users.logout()
  357. @pytest.mark.skip(
  358. reason="Verification and token logic depends on implementation."
  359. )
  360. def test_password_reset_with_invalid_token(client):
  361. email = f"{uuid.uuid4()}@example.com"
  362. password = "initialpass"
  363. register_and_return_user_id(client, email, password)
  364. client.users.logout()
  365. # Assume request password reset done here if needed
  366. # Try resetting with invalid token
  367. with pytest.raises(R2RException) as exc_info:
  368. client.users.reset_password("invalid_token", "newpass123")
  369. assert exc_info.value.status_code in [
  370. 400,
  371. 422,
  372. ], "Expected error resetting password with invalid token."
  373. client.users.logout()
  374. @pytest.fixture
  375. def user_with_api_key(client):
  376. """Fixture that creates a user and returns their ID and API key details"""
  377. random_email = f"{uuid.uuid4()}@example.com"
  378. password = "api_key_test_password"
  379. user_resp = client.users.create(random_email, password)["results"]
  380. user_id = user_resp["id"]
  381. # Login to create an API key
  382. client.users.login(random_email, password)
  383. api_key_resp = client.users.create_api_key(user_id)["results"]
  384. api_key = api_key_resp["api_key"]
  385. key_id = api_key_resp["key_id"]
  386. yield user_id, api_key, key_id
  387. # Cleanup
  388. try:
  389. client.users.delete_api_key(user_id, key_id)
  390. except:
  391. pass
  392. client.users.logout()
  393. def test_api_key_lifecycle(client):
  394. """Test the complete lifecycle of API keys including creation, listing, and deletion"""
  395. # Create user and login
  396. email = f"{uuid.uuid4()}@example.com"
  397. password = "api_key_test_password"
  398. user_resp = client.users.create(email, password)["results"]
  399. user_id = user_resp["id"]
  400. client.users.login(email, password)
  401. # Create API key
  402. api_key_resp = client.users.create_api_key(user_id)["results"]
  403. assert "api_key" in api_key_resp, "API key not returned"
  404. assert "key_id" in api_key_resp, "Key ID not returned"
  405. assert "public_key" in api_key_resp, "Public key not returned"
  406. key_id = api_key_resp["key_id"]
  407. # List API keys
  408. list_resp = client.users.list_api_keys(user_id)["results"]
  409. assert len(list_resp) > 0, "No API keys found after creation"
  410. assert (
  411. list_resp[0]["key_id"] == key_id
  412. ), "Listed key ID doesn't match created key"
  413. assert "updated_at" in list_resp[0], "Updated timestamp missing"
  414. assert "public_key" in list_resp[0], "Public key missing in list"
  415. # Delete API key using key_id
  416. delete_resp = client.users.delete_api_key(user_id, key_id)["results"]
  417. assert delete_resp["success"], "Failed to delete API key"
  418. # Verify deletion
  419. list_resp_after = client.users.list_api_keys(user_id)["results"]
  420. assert not any(
  421. k["key_id"] == key_id for k in list_resp_after
  422. ), "API key still exists after deletion"
  423. client.users.logout()
  424. def test_api_key_authentication(client, user_with_api_key):
  425. """Test using an API key for authentication"""
  426. user_id, api_key, _ = user_with_api_key
  427. # Create new client with API key
  428. api_client = R2RClient(client.base_url)
  429. api_client.set_api_key(api_key)
  430. # Test API key authentication
  431. me_resp = api_client.users.me()["results"]
  432. assert me_resp["id"] == user_id, "API key authentication failed"
  433. def test_api_key_permissions(client, user_with_api_key):
  434. """Test API key permission restrictions"""
  435. user_id, api_key, _ = user_with_api_key
  436. # Create new client with API key
  437. api_client = R2RClient(client.base_url)
  438. api_client.set_api_key(api_key)
  439. # Should not be able to list all users (superuser only)
  440. with pytest.raises(R2RException) as exc_info:
  441. api_client.users.list()
  442. assert (
  443. exc_info.value.status_code == 403
  444. ), "Non-superuser API key shouldn't list users"
  445. def test_invalid_api_key(client):
  446. """Test behavior with invalid API key"""
  447. api_client = R2RClient(client.base_url)
  448. api_client.set_api_key("invalid.api.key")
  449. with pytest.raises(R2RException) as exc_info:
  450. api_client.users.me()
  451. assert (
  452. exc_info.value.status_code == 401
  453. ), "Expected 401 for invalid API key"
  454. def test_multiple_api_keys(client):
  455. """Test creating and managing multiple API keys for a single user"""
  456. email = f"{uuid.uuid4()}@example.com"
  457. password = "multi_key_test_password"
  458. user_resp = client.users.create(email, password)["results"]
  459. user_id = user_resp["id"]
  460. client.users.login(email, password)
  461. # Create multiple API keys
  462. key_ids = []
  463. for i in range(3):
  464. key_resp = client.users.create_api_key(user_id)["results"]
  465. key_ids.append(key_resp["key_id"])
  466. # List and verify all keys exist
  467. list_resp = client.users.list_api_keys(user_id)["results"]
  468. assert len(list_resp) >= 3, "Not all API keys were created"
  469. # Delete keys one by one and verify counts
  470. for key_id in key_ids:
  471. client.users.delete_api_key(user_id, key_id)
  472. current_keys = client.users.list_api_keys(user_id)["results"]
  473. assert not any(
  474. k["key_id"] == key_id for k in current_keys
  475. ), f"Key {key_id} still exists after deletion"
  476. client.users.logout()
  477. def test_update_user_limits_overrides(client: R2RClient):
  478. # 1) Create user
  479. user_email = f"test_{uuid.uuid4()}@example.com"
  480. client.users.register(user_email, "SomePassword123!")
  481. client.users.login(user_email, "SomePassword123!")
  482. # 2) Confirm the default overrides is None
  483. fetched_user = client.users.me()["results"]
  484. client.users.logout()
  485. assert len(fetched_user["limits_overrides"]) == 0
  486. # 3) Update the overrides
  487. overrides = {
  488. "global_per_min": 10,
  489. "monthly_limit": 3000,
  490. "route_overrides": {
  491. "/some-route": {"route_per_min": 5},
  492. },
  493. }
  494. client.users.update(id=fetched_user["id"], limits_overrides=overrides)
  495. # 4) Fetch user again, check
  496. client.users.login(user_email, "SomePassword123!")
  497. updated_user = client.users.me()["results"]
  498. assert len(updated_user["limits_overrides"]) != 0
  499. assert updated_user["limits_overrides"]["global_per_min"] == 10
  500. assert (
  501. updated_user["limits_overrides"]["route_overrides"]["/some-route"][
  502. "route_per_min"
  503. ]
  504. == 5
  505. )