test_users.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807
  1. import uuid
  2. import pytest
  3. from r2r import R2RClient, R2RException
  4. @pytest.fixture(scope="session")
  5. def config():
  6. class TestConfig:
  7. base_url = "http://localhost:7272"
  8. superuser_email = "admin@example.com"
  9. superuser_password = "change_me_immediately"
  10. known_collection_id = "122fdf6a-e116-546b-a8f6-e4cb2e2c0a09" # Example known collection ID
  11. return TestConfig()
  12. # @pytest.fixture(scope="session")
  13. def client(config):
  14. return R2RClient(config.base_url)
  15. @pytest.fixture
  16. def superuser_login(client: R2RClient, config):
  17. """A fixture that ensures the client is logged in as superuser."""
  18. client.users.login(config.superuser_email, config.superuser_password)
  19. yield
  20. # After test, if needed, we can logout or reset
  21. # client.users.logout()
  22. def register_and_return_user_id(client: R2RClient, email: str,
  23. password: str) -> str:
  24. return client.users.create(email, password).results.id
  25. def test_register_user(client: R2RClient):
  26. random_email = f"{uuid.uuid4()}@example.com"
  27. password = "test_password123"
  28. user = client.users.create(random_email, password).results
  29. assert user.id is not None, "No user ID returned after registration."
  30. client.users.logout()
  31. def test_user_refresh_token(client: R2RClient):
  32. random_email = f"{uuid.uuid4()}@example.com"
  33. password = "test_password123"
  34. register_and_return_user_id(client, random_email, password)
  35. client.users.login(random_email, password)
  36. old_access_token = client.access_token
  37. new_access_token = client.users.refresh_token().results.access_token.token
  38. assert new_access_token != old_access_token, (
  39. "Refresh token did not provide a new access token.")
  40. def test_change_password(client: R2RClient):
  41. random_email = f"{uuid.uuid4()}@example.com"
  42. old_password = "old_password123"
  43. new_password = "new_password456"
  44. register_and_return_user_id(client, random_email, old_password)
  45. client.users.login(random_email, old_password)
  46. change_resp = client.users.change_password(old_password,
  47. new_password).results
  48. assert change_resp.message is not None, "Change password failed."
  49. # Check old password no longer works
  50. client.users.logout()
  51. with pytest.raises(R2RException) as exc_info:
  52. client.users.login(random_email, old_password)
  53. assert exc_info.value.status_code == 401, (
  54. "Old password should not work anymore.")
  55. # New password should work
  56. client.users.login(random_email, new_password)
  57. client.users.logout()
  58. @pytest.mark.skip(
  59. reason=
  60. "Requires a real or mocked reset token retrieval if verification is implemented."
  61. )
  62. def test_request_and_reset_password(client: R2RClient):
  63. # This test scenario assumes you can obtain a valid reset token somehow.
  64. random_email = f"{uuid.uuid4()}@example.com"
  65. password = "initial_password123"
  66. register_and_return_user_id(client, random_email, password)
  67. client.users.logout()
  68. # Request password reset
  69. reset_req = client.users.request_password_reset(random_email).results
  70. assert reset_req.message is not None, "Request password reset failed."
  71. # Suppose we can retrieve a reset_token from test hooks or logs:
  72. reset_token = (
  73. "FAKE_RESET_TOKEN_FOR_TESTING" # Replace with actual if available
  74. )
  75. new_password = "new_reset_password789"
  76. # Attempt reset
  77. resp = client.users.reset_password(reset_token, new_password).results
  78. assert resp.message is not None, "Reset password failed."
  79. # Verify login with new password
  80. client.users.login(random_email, new_password)
  81. client.users.logout()
  82. def test_users_list(client: R2RClient, superuser_login):
  83. users_list = client.users.list().results
  84. assert isinstance(users_list, list), "Listing users failed."
  85. client.users.logout()
  86. def test_get_current_user(client: R2RClient, superuser_login):
  87. me = client.users.me().results
  88. assert me.id is not None, "Failed to get current user."
  89. client.users.logout()
  90. def test_get_user_by_id(client: R2RClient, superuser_login):
  91. random_email = f"{uuid.uuid4()}@example.com"
  92. password = "somepassword"
  93. user_id = register_and_return_user_id(client, random_email, password)
  94. user = client.users.retrieve(user_id).results
  95. assert user.id == user_id, "Retrieved user does not match requested ID."
  96. client.users.logout()
  97. def test_update_user(client: R2RClient, superuser_login):
  98. random_email = f"{uuid.uuid4()}@example.com"
  99. password = "somepassword"
  100. user_id = register_and_return_user_id(client, random_email, password)
  101. updated_name = "Updated Name"
  102. update_resp = client.users.update(user_id, name=updated_name).results
  103. assert update_resp.name == updated_name, "User update failed."
  104. client.users.logout()
  105. def test_user_collections(client: R2RClient, superuser_login, config):
  106. # Create a user and list their collections
  107. random_email = f"{uuid.uuid4()}@example.com"
  108. password = "somepassword"
  109. user_id = register_and_return_user_id(client, random_email, password)
  110. collections = client.users.list_collections(user_id).results
  111. assert isinstance(collections, list), "Listing user collections failed."
  112. client.users.logout()
  113. def test_add_remove_user_from_collection(client: R2RClient, superuser_login,
  114. config):
  115. random_email = f"{uuid.uuid4()}@example.com"
  116. password = "somepassword"
  117. user_id = register_and_return_user_id(client, random_email, password)
  118. # Add user to known collection
  119. add_resp = client.users.add_to_collection(
  120. user_id, config.known_collection_id).results
  121. assert add_resp.success, "Failed to add user to collection."
  122. # Verify
  123. collections = client.users.list_collections(user_id).results
  124. assert any(
  125. str(col.id) == str(config.known_collection_id)
  126. for col in collections), "User not in collection after add."
  127. # Remove user from collection
  128. remove_resp = client.users.remove_from_collection(
  129. user_id, config.known_collection_id).results
  130. assert remove_resp.success, "Failed to remove user from collection."
  131. collections_after = client.users.list_collections(user_id).results
  132. assert not any(
  133. str(col.id) == str(config.known_collection_id) for col in
  134. collections_after), "User still in collection after removal."
  135. client.users.logout()
  136. def test_delete_user(client: R2RClient):
  137. # Create and then delete user
  138. client.users.logout()
  139. random_email = f"{uuid.uuid4()}@example.com"
  140. password = "somepassword"
  141. client.users.create(random_email, password)
  142. client.users.login(random_email, password)
  143. user_id = client.users.me().results.id
  144. del_resp = client.users.delete(user_id, password).results
  145. assert del_resp.success, "User deletion failed."
  146. with pytest.raises(R2RException) as exc_info:
  147. client.users.login(random_email, password)
  148. assert exc_info.value.status_code == 404, (
  149. "User still exists after deletion.")
  150. def test_superuser_downgrade_permissions(client: R2RClient, superuser_login,
  151. config):
  152. user_email = f"test_super_{uuid.uuid4()}@test.com"
  153. user_password = "securepass"
  154. new_user_id = register_and_return_user_id(client, user_email,
  155. user_password)
  156. # Upgrade user to superuser
  157. upgraded_user = client.users.update(new_user_id, is_superuser=True).results
  158. assert upgraded_user.is_superuser == True, (
  159. "User not upgraded to superuser.")
  160. # Logout admin, login as new superuser
  161. client.users.logout()
  162. client.users.login(user_email, user_password)
  163. all_users = client.users.list().results
  164. assert isinstance(all_users, list), "New superuser cannot list users."
  165. # Downgrade back to normal (re-login as original admin)
  166. client.users.logout()
  167. client.users.login(config.superuser_email, config.superuser_password)
  168. downgraded_user = client.users.update(new_user_id,
  169. is_superuser=False).results
  170. assert downgraded_user.is_superuser == False, "User not downgraded."
  171. # Now login as downgraded user and verify no superuser access
  172. client.users.logout()
  173. client.users.login(user_email, user_password)
  174. with pytest.raises(R2RException) as exc_info:
  175. client.users.list()
  176. assert exc_info.value.status_code == 403, (
  177. "Downgraded user still has superuser privileges.")
  178. client.users.logout()
  179. def test_non_owner_delete_collection(client: R2RClient):
  180. # Create owner user
  181. owner_email = f"owner_{uuid.uuid4()}@test.com"
  182. owner_password = "pwd123"
  183. client.users.create(owner_email, owner_password)
  184. client.users.login(owner_email, owner_password)
  185. coll_id = client.collections.create(name="Owner Collection").results.id
  186. # Create another user and get their ID
  187. non_owner_email = f"nonowner_{uuid.uuid4()}@test.com"
  188. non_owner_password = "pwd1234"
  189. client.users.logout()
  190. client.users.create(non_owner_email, non_owner_password)
  191. client.users.login(non_owner_email, non_owner_password)
  192. non_owner_id = client.users.me().results.id
  193. client.users.logout()
  194. # Owner adds non-owner to collection
  195. client.users.login(owner_email, owner_password)
  196. client.collections.add_user(coll_id, non_owner_id)
  197. client.users.logout()
  198. # Non-owner tries to delete collection
  199. client.users.login(non_owner_email, non_owner_password)
  200. with pytest.raises(R2RException) as exc_info:
  201. result = client.collections.delete(coll_id)
  202. assert exc_info.value.status_code == 403, (
  203. "Wrong error code for non-owner deletion attempt")
  204. # Cleanup
  205. client.users.logout()
  206. client.users.login(owner_email, owner_password)
  207. client.collections.delete(coll_id)
  208. client.users.logout()
  209. def test_update_user_with_invalid_email(client: R2RClient, superuser_login):
  210. # Create a user
  211. email = f"{uuid.uuid4()}@example.com"
  212. password = "password"
  213. user_id = register_and_return_user_id(client, email, password)
  214. # Attempt to update to invalid email
  215. with pytest.raises(R2RException) as exc_info:
  216. client.users.update(user_id, email="not-an-email")
  217. # Expect a validation error (likely 422)
  218. assert exc_info.value.status_code in [
  219. 400,
  220. 422,
  221. ], "Expected validation error for invalid email."
  222. client.users.logout()
  223. def test_update_user_email_already_exists(client: R2RClient, superuser_login):
  224. # Create two users
  225. email1 = f"{uuid.uuid4()}@example.com"
  226. email2 = f"{uuid.uuid4()}@example.com"
  227. password = "password"
  228. user1_id = register_and_return_user_id(client, email1, password)
  229. user2_id = register_and_return_user_id(client, email2, password)
  230. # Try updating user2's email to user1's email
  231. with pytest.raises(R2RException) as exc_info:
  232. client.users.update(user2_id, email=email1)
  233. # Expect a conflict (likely 409) or validation error
  234. # TODO - Error code should be in [400, 409, 422], not 500
  235. assert exc_info.value.status_code in [
  236. 400,
  237. 409,
  238. 422,
  239. 500,
  240. ], "Expected error updating email to an existing user's email."
  241. client.users.logout()
  242. def test_delete_user_with_incorrect_password(client: R2RClient):
  243. email = f"{uuid.uuid4()}@example.com"
  244. password = "correct_password"
  245. # user_id = register_and_return_user_id(client: R2RClient, email, password)
  246. client.users.create(email, password)
  247. client.users.login(email, password)
  248. user_id = client.users.me().results.id
  249. # Attempt deletion with incorrect password
  250. with pytest.raises(R2RException) as exc_info:
  251. client.users.delete(user_id, "wrong_password")
  252. # TODO - Error code should be in [401, 403]
  253. assert exc_info.value.status_code in [
  254. 400,
  255. 401,
  256. 403,
  257. ], "Expected auth error with incorrect password on delete."
  258. def test_login_with_incorrect_password(client: R2RClient):
  259. email = f"{uuid.uuid4()}@example.com"
  260. password = "password123"
  261. client.users.create(email, password)
  262. # Try incorrect password
  263. with pytest.raises(R2RException) as exc_info:
  264. client.users.login(email, "wrongpass")
  265. assert exc_info.value.status_code == 401, (
  266. "Expected 401 when logging in with incorrect password.")
  267. client.users.logout()
  268. def test_refresh_token(client: R2RClient):
  269. # Assume that refresh token endpoint checks token validity
  270. # Try using a bogus refresh token
  271. email = f"{uuid.uuid4()}@example.com"
  272. password = "password123"
  273. client.users.create(email, password)
  274. client.users.login(email, password)
  275. client.users.refresh_token() # refresh_token="invalid_token")
  276. # assert exc_info.value.status_code in [400, 401], "Expected error using invalid refresh token."
  277. client.users.logout()
  278. @pytest.mark.skip(reason="Email verification logic not implemented.")
  279. def test_verification_with_invalid_code(client: R2RClient):
  280. # If your system supports email verification
  281. email = f"{uuid.uuid4()}@example.com"
  282. password = "password"
  283. register_and_return_user_id(client, email, password)
  284. # Try verifying with invalid code
  285. with pytest.raises(R2RException) as exc_info:
  286. client.users.verify_email(email, "wrong_code")
  287. assert exc_info.value.status_code in [
  288. 400,
  289. 422,
  290. ], "Expected error verifying with invalid code."
  291. client.users.logout()
  292. @pytest.mark.skip(
  293. reason="Verification and token logic depends on implementation.")
  294. def test_password_reset_with_invalid_token(client: R2RClient):
  295. email = f"{uuid.uuid4()}@example.com"
  296. password = "initialpass"
  297. register_and_return_user_id(client, email, password)
  298. client.users.logout()
  299. # Assume request password reset done here if needed
  300. # Try resetting with invalid token
  301. with pytest.raises(R2RException) as exc_info:
  302. client.users.reset_password("invalid_token", "newpass123")
  303. assert exc_info.value.status_code in [
  304. 400,
  305. 422,
  306. ], "Expected error resetting password with invalid token."
  307. client.users.logout()
  308. @pytest.fixture
  309. def user_with_api_key(client: R2RClient):
  310. """Fixture that creates a user and returns their ID and API key details."""
  311. random_email = f"{uuid.uuid4()}@example.com"
  312. password = "api_key_test_password"
  313. user_id = client.users.create(random_email, password).results.id
  314. # Login to create an API key
  315. client.users.login(random_email, password)
  316. api_key_resp = client.users.create_api_key(user_id).results
  317. api_key = api_key_resp.api_key
  318. key_id = api_key_resp.key_id
  319. yield user_id, api_key, key_id
  320. # Cleanup
  321. try:
  322. client.users.delete_api_key(user_id, key_id)
  323. except:
  324. pass
  325. client.users.logout()
  326. def test_api_key_lifecycle(client: R2RClient):
  327. """Test the complete lifecycle of API keys including creation, listing, and
  328. deletion."""
  329. # Create user and login
  330. email = f"{uuid.uuid4()}@example.com"
  331. password = "api_key_test_password"
  332. user_id = client.users.create(email, password).results.id
  333. client.users.login(email, password)
  334. # Create API key
  335. api_key_resp = client.users.create_api_key(user_id).results
  336. assert api_key_resp.api_key is not None, "API key not returned"
  337. assert api_key_resp.key_id is not None, "Key ID not returned"
  338. assert api_key_resp.public_key is not None, "Public key not returned"
  339. key_id = api_key_resp.key_id
  340. # List API keys
  341. list_resp = client.users.list_api_keys(user_id).results
  342. assert len(list_resp) > 0, "No API keys found after creation"
  343. assert list_resp[0].key_id == key_id, (
  344. "Listed key ID doesn't match created key")
  345. assert list_resp[0].updated_at is not None, "Updated timestamp missing"
  346. assert list_resp[0].public_key is not None, "Public key missing in list"
  347. # Delete API key using key_id
  348. delete_resp = client.users.delete_api_key(user_id, key_id).results
  349. assert delete_resp.success, "Failed to delete API key"
  350. # Verify deletion
  351. list_resp_after = client.users.list_api_keys(user_id).results
  352. assert not any(
  353. k.key_id == key_id
  354. for k in list_resp_after), ("API key still exists after deletion")
  355. client.users.logout()
  356. def test_api_key_authentication(client: R2RClient, user_with_api_key):
  357. """Test using an API key for authentication."""
  358. user_id, api_key, _ = user_with_api_key
  359. # Create new client with API key
  360. api_client = R2RClient(client.base_url)
  361. api_client.set_api_key(api_key)
  362. # Test API key authentication
  363. me_id = api_client.users.me().results.id
  364. assert me_id == user_id, "API key authentication failed"
  365. def test_api_key_permissions(client: R2RClient, user_with_api_key):
  366. """Test API key permission restrictions."""
  367. user_id, api_key, _ = user_with_api_key
  368. # Create new client with API key
  369. api_client = R2RClient(client.base_url)
  370. api_client.set_api_key(api_key)
  371. # Should not be able to list all users (superuser only)
  372. with pytest.raises(R2RException) as exc_info:
  373. api_client.users.list()
  374. assert exc_info.value.status_code == 403, (
  375. "Non-superuser API key shouldn't list users")
  376. def test_invalid_api_key(client: R2RClient):
  377. """Test behavior with invalid API key."""
  378. api_client = R2RClient(client.base_url)
  379. api_client.set_api_key("invalid.api.key")
  380. with pytest.raises(R2RException) as exc_info:
  381. api_client.users.me()
  382. assert exc_info.value.status_code == 401, (
  383. "Expected 401 for invalid API key")
  384. def test_multiple_api_keys(client: R2RClient):
  385. """Test creating and managing multiple API keys for a single user."""
  386. email = f"{uuid.uuid4()}@example.com"
  387. password = "multi_key_test_password"
  388. user_id = client.users.create(email, password).results.id
  389. client.users.login(email, password)
  390. # Create multiple API keys
  391. key_ids = []
  392. for i in range(3):
  393. key_resp = client.users.create_api_key(user_id).results
  394. key_ids.append(key_resp.key_id)
  395. # List and verify all keys exist
  396. list_resp = client.users.list_api_keys(user_id).results
  397. assert len(list_resp) >= 3, "Not all API keys were created"
  398. # Delete keys one by one and verify counts
  399. for key_id in key_ids:
  400. client.users.delete_api_key(user_id, key_id)
  401. current_keys = client.users.list_api_keys(user_id).results
  402. assert not any(k.key_id == key_id for k in current_keys), (
  403. f"Key {key_id} still exists after deletion")
  404. client.users.logout()
  405. def test_update_user_limits_overrides(client: R2RClient):
  406. # 1) Create user
  407. user_email = f"test_{uuid.uuid4()}@example.com"
  408. client.users.create(user_email, "SomePassword123!")
  409. client.users.login(user_email, "SomePassword123!")
  410. # 2) Confirm the default overrides is None
  411. fetched_user = client.users.me().results
  412. client.users.logout()
  413. assert len(fetched_user.limits_overrides) == 0
  414. # 3) Update the overrides
  415. overrides = {
  416. "global_per_min": 10,
  417. "monthly_limit": 3000,
  418. "route_overrides": {
  419. "/some-route": {
  420. "route_per_min": 5
  421. },
  422. },
  423. }
  424. client.users.update(id=fetched_user.id, limits_overrides=overrides)
  425. # 4) Fetch user again, check
  426. client.users.login(user_email, "SomePassword123!")
  427. updated_user = client.users.me().results
  428. assert len(updated_user.limits_overrides) != 0
  429. assert updated_user.limits_overrides["global_per_min"] == 10
  430. assert (updated_user.limits_overrides["route_overrides"]["/some-route"]
  431. ["route_per_min"] == 5)
  432. def test_collection_ownership_filtering(client: R2RClient):
  433. """Test the ownerOnly filter parameter in collections list endpoint."""
  434. # Create two test users
  435. user1_email = f"user1_{uuid.uuid4()}@test.com"
  436. user1_password = "password123"
  437. user2_email = f"user2_{uuid.uuid4()}@test.com"
  438. user2_password = "password123"
  439. # Register users
  440. client.users.create(user1_email, user1_password)
  441. client.users.create(user2_email, user2_password)
  442. # Login as user1 and create a collection
  443. client.users.login(user1_email, user1_password)
  444. user1_id = client.users.me().results.id
  445. user1_collection = client.collections.create(name="User1 Collection").results
  446. user1_collection_id = user1_collection.id
  447. # Login as user2 and create a collection
  448. client.users.logout()
  449. client.users.login(user2_email, user2_password)
  450. user2_id = client.users.me().results.id
  451. user2_collection = client.collections.create(name="User2 Collection").results
  452. user2_collection_id = user2_collection.id
  453. # User2 adds user1 to their collection
  454. client.collections.add_user(user2_collection_id, user1_id)
  455. # Login as user1 and check collections
  456. client.users.logout()
  457. client.users.login(user1_email, user1_password)
  458. # List all collections
  459. all_collections = client.collections.list().results
  460. all_collection_ids = [str(col.id) for col in all_collections]
  461. # Verify user1 can see their own collection
  462. assert str(user1_collection_id) in all_collection_ids, "User1 can't see their own collection"
  463. # Verify user1 can see user2's shared collection
  464. assert str(user2_collection_id) in all_collection_ids, "User1 can't see shared collection"
  465. # List only owned collections
  466. owned_collections = client.collections.list(owner_only=True).results
  467. owned_collection_ids = [str(col.id) for col in owned_collections]
  468. # Verify user1's collection is in the owned list
  469. assert str(user1_collection_id) in owned_collection_ids, "User1's collection not in owned list"
  470. # Verify user2's collection is NOT in the owned list
  471. assert str(user2_collection_id) not in owned_collection_ids, "Shared collection should not be in owned list"
  472. # User1 adds user2 to their collection
  473. client.collections.add_user(user1_collection_id, user2_id)
  474. # Login as user2 and check collections
  475. client.users.logout()
  476. client.users.login(user2_email, user2_password)
  477. # List all collections
  478. all_collections = client.collections.list().results
  479. all_collection_ids = [str(col.id) for col in all_collections]
  480. # Verify user2 can see their own collection
  481. assert str(user2_collection_id) in all_collection_ids, "User2 can't see their own collection"
  482. # Verify user2 can see user1's shared collection
  483. assert str(user1_collection_id) in all_collection_ids, "User2 can't see shared collection"
  484. # List only owned collections
  485. owned_collections = client.collections.list(owner_only=True).results
  486. owned_collection_ids = [str(col.id) for col in owned_collections]
  487. # Verify user2's collection is in the owned list
  488. assert str(user2_collection_id) in owned_collection_ids, "User2's collection not in owned list"
  489. # Verify user1's collection is NOT in the owned list
  490. assert str(user1_collection_id) not in owned_collection_ids, "Shared collection should not be in owned list"
  491. # Cleanup
  492. client.users.logout()
  493. def test_superuser_collection_ownership_filtering(client: R2RClient, superuser_login, config):
  494. """Test the ownerOnly filter for superusers."""
  495. # Create a regular user
  496. user_email = f"regular_{uuid.uuid4()}@test.com"
  497. user_password = "password123"
  498. client.users.create(user_email, user_password)
  499. # Create a collection as superuser
  500. superuser_collection = client.collections.create(name="Superuser Collection").results
  501. superuser_id = client.users.me().results.id
  502. # List all collections as superuser (without filter)
  503. all_collections_count = len(client.collections.list().results)
  504. assert all_collections_count > 0, "Superuser should see collections"
  505. # List only owned collections as superuser
  506. owned_collections = client.collections.list(owner_only=True).results
  507. owned_count = len(owned_collections)
  508. assert owned_count > 0, "Superuser should see owned collections"
  509. assert owned_count < all_collections_count, "Filtered list should be smaller than all collections"
  510. # Verify the superuser collection is in the owned list
  511. assert any(str(col.id) == str(superuser_collection.id) for col in owned_collections), \
  512. "Superuser collection should be in the owned list"
  513. # Cleanup
  514. client.collections.delete(superuser_collection.id)
  515. client.users.logout()
  516. def test_collection_filter_invalid_parameters(client: R2RClient):
  517. """Test error handling for invalid filter parameters."""
  518. # Create a test user
  519. user_email = f"test_{uuid.uuid4()}@test.com"
  520. user_password = "password123"
  521. client.users.create(user_email, user_password)
  522. client.users.login(user_email, user_password)
  523. # Test with invalid owner_only parameter type (should be bool, not string)
  524. with pytest.raises(R2RException) as exc_info:
  525. client.collections.list(owner_only="not-a-bool")
  526. assert exc_info.value.status_code in [400, 422], \
  527. "Expected validation error for invalid owner_only parameter"
  528. client.users.logout()
  529. def test_document_ownership_filtering(client: R2RClient):
  530. """Test the ownerOnly filter parameter in documents list endpoint."""
  531. # Create two test users
  532. user1_email = f"user1_doc_{uuid.uuid4()}@test.com"
  533. user1_password = "password123"
  534. user2_email = f"user2_doc_{uuid.uuid4()}@test.com"
  535. user2_password = "password123"
  536. # Register users
  537. client.users.create(user1_email, user1_password)
  538. client.users.create(user2_email, user2_password)
  539. # Login as user1 and create a document and collection
  540. client.users.login(user1_email, user1_password)
  541. user1_id = client.users.me().results.id
  542. user1_collection = client.collections.create(name="User1 Doc Collection").results
  543. user1_collection_id = user1_collection.id
  544. user1_document = client.documents.create(
  545. raw_text="User 1 document content",
  546. metadata={"title": "User 1 Document"}
  547. ).results
  548. user1_document_id = user1_document.document_id
  549. # Wait for processing
  550. import time
  551. time.sleep(5)
  552. # Login as user2 and create a document and collection
  553. client.users.logout()
  554. client.users.login(user2_email, user2_password)
  555. user2_id = client.users.me().results.id
  556. user2_collection = client.collections.create(name="User2 Doc Collection").results
  557. user2_collection_id = user2_collection.id
  558. user2_document = client.documents.create(
  559. raw_text="User 2 document content",
  560. metadata={"title": "User 2 Document"}
  561. ).results
  562. user2_document_id = user2_document.document_id
  563. # Wait for processing
  564. time.sleep(5)
  565. # Add user1's document to user2's collection
  566. client.collections.add_document(user2_collection_id, user1_document_id)
  567. # Login as user1 and check documents
  568. client.users.logout()
  569. client.users.login(user1_email, user1_password)
  570. # List all documents
  571. all_documents = client.documents.list().results
  572. all_document_ids = [str(doc.id) for doc in all_documents]
  573. # Verify user1 can see their own document
  574. assert str(user1_document_id) in all_document_ids, "User1 can't see their own document"
  575. # List only owned documents
  576. owned_documents = client.documents.list(owner_only=True).results
  577. owned_document_ids = [str(doc.id) for doc in owned_documents]
  578. # Verify user1's document is in the owned list
  579. assert str(user1_document_id) in owned_document_ids, "User1's document not in owned list"
  580. # Add user2's document to user1's collection
  581. client.collections.add_document(user1_collection_id, user2_document_id)
  582. # Login as user2 and check documents
  583. client.users.logout()
  584. client.users.login(user2_email, user2_password)
  585. # List all documents
  586. all_documents = client.documents.list().results
  587. all_document_ids = [str(doc.id) for doc in all_documents]
  588. # Verify user2 can see their own document
  589. assert str(user2_document_id) in all_document_ids, "User2 can't see their own document"
  590. # Verify user2 can see user1's shared document
  591. assert str(user1_document_id) in all_document_ids, "User2 can't see shared document"
  592. # List only owned documents
  593. owned_documents = client.documents.list(owner_only=True).results
  594. owned_document_ids = [str(doc.id) for doc in owned_documents]
  595. # Verify user2's document is in the owned list
  596. assert str(user2_document_id) in owned_document_ids, "User2's document not in owned list"
  597. # Verify user1's document is NOT in the owned list
  598. assert str(user1_document_id) not in owned_document_ids, "Shared document should not be in owned list"
  599. # Cleanup - login as the right user first
  600. client.users.logout()
  601. client.users.login(user1_email, user1_password)
  602. try:
  603. client.documents.delete(user1_document_id)
  604. except Exception as e:
  605. print(f"Failed to delete user1's document: {e}")
  606. client.users.logout()
  607. client.users.login(user2_email, user2_password)
  608. try:
  609. client.documents.delete(user2_document_id)
  610. except Exception as e:
  611. print(f"Failed to delete user2's document: {e}")
  612. client.users.logout()
  613. def test_document_filter_invalid_parameters(client: R2RClient):
  614. """Test error handling for invalid filter parameters in documents endpoint."""
  615. # Create a test user
  616. user_email = f"test_doc_{uuid.uuid4()}@test.com"
  617. user_password = "password123"
  618. client.users.create(user_email, user_password)
  619. client.users.login(user_email, user_password)
  620. # Test with invalid owner_only parameter type (should be bool, not string)
  621. with pytest.raises(R2RException) as exc_info:
  622. client.documents.list(owner_only="not-a-bool")
  623. assert exc_info.value.status_code in [400, 422], \
  624. "Expected validation error for invalid owner_only parameter"
  625. client.users.logout()