123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- """
- Tests for the user commands in the CLI.
- - create
- - list
- - retrieve
- - me
- x list-collections
- x add-to-collection
- x remove-from-collection
- """
- import json
- import uuid
- import pytest
- from click.testing import CliRunner
- from cli.commands.users import (
- add_to_collection,
- create,
- list,
- list_collections,
- me,
- remove_from_collection,
- retrieve,
- )
- from r2r import R2RAsyncClient
- from tests.cli.async_invoke import async_invoke
- def extract_json_block(output: str) -> dict:
- """Extract and parse the first valid JSON object found in the output."""
- start = output.find("{")
- if start == -1:
- raise ValueError("No JSON object start found in output")
- brace_count = 0
- for i, char in enumerate(output[start:], start=start):
- if char == "{":
- brace_count += 1
- elif char == "}":
- brace_count -= 1
- if brace_count == 0:
- json_str = output[start : i + 1].strip()
- return json.loads(json_str)
- raise ValueError("No complete JSON object found in output")
- @pytest.mark.asyncio
- async def test_user_lifecycle():
- """Test the complete lifecycle of a user: create, retrieve, list, collections."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- # Create test user with random email
- test_email = f"test_{uuid.uuid4()}@example.com"
- test_password = "TestPassword123!"
- # Create user
- create_result = await async_invoke(
- runner, create, test_email, test_password, obj=client
- )
- assert create_result.exit_code == 0, create_result.stdout_bytes.decode()
- output = create_result.stdout_bytes.decode()
- create_response = extract_json_block(output)
- user_id = create_response["results"]["id"]
- try:
- # List users and verify our new user is included
- list_result = await async_invoke(runner, list, obj=client)
- assert list_result.exit_code == 0, list_result.stdout_bytes.decode()
- list_output = list_result.stdout_bytes.decode()
- assert test_email in list_output
- # Retrieve specific user
- retrieve_result = await async_invoke(
- runner, retrieve, user_id, obj=client
- )
- assert (
- retrieve_result.exit_code == 0
- ), retrieve_result.stdout_bytes.decode()
- retrieve_output = retrieve_result.stdout_bytes.decode()
- retrieve_response = extract_json_block(retrieve_output)
- assert retrieve_response["results"]["email"] == test_email
- # Test me endpoint
- me_result = await async_invoke(runner, me, obj=client)
- assert me_result.exit_code == 0, me_result.stdout_bytes.decode()
- # List collections for user
- collections_result = await async_invoke(
- runner, list_collections, user_id, obj=client
- )
- assert (
- collections_result.exit_code == 0
- ), collections_result.stdout_bytes.decode()
- finally:
- # We don't delete the user since there's no delete command
- pass
- # FIXME: This should be returning 'User not found' but returns an empty list instead.
- # @pytest.mark.asyncio
- # async def test_retrieve_nonexistent_user():
- # """Test retrieving a user that doesn't exist."""
- # client = R2RAsyncClient(base_url="http://localhost:7272")
- # runner = CliRunner(mix_stderr=False)
- # nonexistent_id = str(uuid.uuid4())
- # result = await async_invoke(runner, retrieve, nonexistent_id, obj=client)
- # assert result.exit_code != 0
- # error_output = result.stderr_bytes.decode()
- # assert "User not found" in error_output
- # FIXME: This is returning with a status of 0 but has a 400 on the server side?
- # @pytest.mark.asyncio
- # async def test_create_duplicate_user():
- # """Test creating a user with an email that already exists."""
- # client = R2RAsyncClient(base_url="http://localhost:7272")
- # runner = CliRunner(mix_stderr=False)
- # test_email = f"test_{uuid.uuid4()}@example.com"
- # test_password = "TestPassword123!"
- # # Create first user
- # first_result = await async_invoke(
- # runner, create, test_email, test_password, obj=client
- # )
- # assert first_result.exit_code == 0
- # # Try to create second user with same email
- # second_result = await async_invoke(
- # runner, create, test_email, test_password, obj=client
- # )
- # print(f"SECOND RESULT: {second_result}")
- # assert second_result.exit_code != 0
- # error_output = second_result.stderr_bytes.decode()
- # assert "already exists" in error_output.lower()
|