123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- """
- Tests for the conversations commands in the CLI.
- - create
- - list
- - retrieve
- - delete
- - list-users
- """
- import json
- import uuid
- import pytest
- from click.testing import CliRunner
- from cli.commands.conversations import (
- create,
- delete,
- list,
- list_users,
- retrieve,
- )
- from r2r import R2RAsyncClient
- from tests.cli.async_invoke import async_invoke
- def extract_json_block(output: str) -> dict:
- """Extract and parse the first valid JSON object found in the output."""
- start = output.find("{")
- if start == -1:
- raise ValueError("No JSON object start found in output")
- brace_count = 0
- for i, char in enumerate(output[start:], start=start):
- if char == "{":
- brace_count += 1
- elif char == "}":
- brace_count -= 1
- if brace_count == 0:
- json_str = output[start : i + 1].strip()
- return json.loads(json_str)
- raise ValueError("No complete JSON object found in output")
- @pytest.mark.asyncio
- async def test_conversation_lifecycle():
- """Test the complete lifecycle of a conversation: create, retrieve, delete."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- # Create conversation
- create_result = await async_invoke(
- runner,
- create,
- obj=client,
- )
- assert create_result.exit_code == 0, create_result.stdout_bytes.decode()
- output = create_result.stdout_bytes.decode()
- create_response = extract_json_block(output)
- conversation_id = create_response["results"]["id"]
- try:
- # Retrieve conversation
- retrieve_result = await async_invoke(
- runner, retrieve, conversation_id, obj=client
- )
- assert retrieve_result.exit_code == 0
- retrieve_output = retrieve_result.stdout_bytes.decode()
- # FIXME: This assertion fails, we need to sync Conversation and ConversationResponse
- # assert conversation_id in retrieve_output
- assert "results" in retrieve_output
- # List users in conversation
- list_users_result = await async_invoke(
- runner, list_users, conversation_id, obj=client
- )
- assert list_users_result.exit_code == 0
- finally:
- # Delete conversation
- delete_result = await async_invoke(
- runner, delete, conversation_id, obj=client
- )
- assert delete_result.exit_code == 0
- @pytest.mark.asyncio
- async def test_list_conversations():
- """Test listing conversations with various parameters."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- # Create test conversation first
- create_result = await async_invoke(runner, create, obj=client)
- response = extract_json_block(create_result.stdout_bytes.decode())
- conversation_id = response["results"]["id"]
- try:
- # Test basic list
- list_result = await async_invoke(runner, list, obj=client)
- assert list_result.exit_code == 0
- # Test paginated results
- list_paginated = await async_invoke(
- runner, list, "--offset", "0", "--limit", "2", obj=client
- )
- assert list_paginated.exit_code == 0
- finally:
- # Cleanup
- await async_invoke(runner, delete, conversation_id, obj=client)
- @pytest.mark.asyncio
- async def test_nonexistent_conversation():
- """Test operations on a nonexistent conversation."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- nonexistent_id = str(uuid.uuid4())
- # Test retrieve
- retrieve_result = await async_invoke(
- runner, retrieve, nonexistent_id, obj=client
- )
- assert "not found" in retrieve_result.stderr_bytes.decode().lower()
- # Test list_users
- list_users_result = await async_invoke(
- runner, list_users, nonexistent_id, obj=client
- )
- assert "not found" in list_users_result.stderr_bytes.decode().lower()
- @pytest.mark.asyncio
- async def test_list_conversations_pagination():
- """Test pagination functionality of list conversations."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- # Create multiple conversations
- conversation_ids = []
- for _ in range(3):
- create_result = await async_invoke(runner, create, obj=client)
- response = extract_json_block(create_result.stdout_bytes.decode())
- conversation_ids.append(response["results"]["id"])
- try:
- # Test with different pagination parameters
- list_first_page = await async_invoke(
- runner, list, "--offset", "0", "--limit", "2", obj=client
- )
- assert list_first_page.exit_code == 0
- first_page_output = list_first_page.stdout_bytes.decode()
- list_second_page = await async_invoke(
- runner, list, "--offset", "2", "--limit", "2", obj=client
- )
- assert list_second_page.exit_code == 0
- second_page_output = list_second_page.stdout_bytes.decode()
- # Verify different results on different pages
- assert first_page_output != second_page_output
- finally:
- # Cleanup
- for conversation_id in conversation_ids:
- await async_invoke(runner, delete, conversation_id, obj=client)
|