""" Tests for the document commands in the CLI. - create - retrieve - list - delete - list-chunks - list-collections x ingest-files-from-url x extract x list-entities x list-relationships x create-sample x create-samples """ import contextlib import json import os import tempfile import uuid import pytest from click.testing import CliRunner from cli.commands.documents import ( create, delete, list, list_chunks, list_collections, retrieve, ) from r2r import R2RAsyncClient from tests.cli.async_invoke import async_invoke @pytest.fixture def temp_text_file(): """Create a temporary text file for testing.""" with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False ) as f: f.write("This is test content for document testing.") temp_path = f.name yield temp_path # Cleanup temp file if os.path.exists(temp_path): os.unlink(temp_path) @pytest.fixture def temp_json_file(): """Create a temporary JSON file for testing.""" with tempfile.NamedTemporaryFile( mode="w", suffix=".json", delete=False ) as f: json.dump({"test": "content", "for": "document testing"}, f) temp_path = f.name yield temp_path # Cleanup temp file if os.path.exists(temp_path): os.unlink(temp_path) def extract_json_block(output: str) -> dict: """Extract and parse the first valid JSON object found in the output.""" # We assume the output contains at least one JSON object printed with json.dumps(indent=2). # We'll find the first '{' and the matching closing '}' that forms a valid JSON object. start = output.find("{") if start == -1: raise ValueError("No JSON object start found in output") # Track braces to find the matching '}' brace_count = 0 for i, char in enumerate(output[start:], start=start): if char == "{": brace_count += 1 elif char == "}": brace_count -= 1 if brace_count == 0: # Found the matching closing brace json_str = output[start : i + 1].strip() return json.loads(json_str) raise ValueError("No complete JSON object found in output") @pytest.mark.asyncio async def test_document_lifecycle(temp_text_file): """Test the complete lifecycle of a document: create, retrieve, delete.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) # Create document create_result = await async_invoke( runner, create, temp_text_file, obj=client ) assert create_result.exit_code == 0, create_result.stdout_bytes.decode() output = create_result.stdout_bytes.decode() create_response = extract_json_block(output) document_id = create_response["results"]["document_id"] try: # Retrieve document retrieve_result = await async_invoke( runner, retrieve, document_id, obj=client ) assert ( retrieve_result.exit_code == 0 ), retrieve_result.stdout_bytes.decode() # Instead of parsing JSON, verify the ID appears in the table output retrieve_output = retrieve_result.stdout_bytes.decode() assert document_id in retrieve_output # List chunks list_chunks_result = await async_invoke( runner, list_chunks, document_id, obj=client ) assert ( list_chunks_result.exit_code == 0 ), list_chunks_result.stdout_bytes.decode() # List collections list_collections_result = await async_invoke( runner, list_collections, document_id, obj=client ) assert ( list_collections_result.exit_code == 0 ), list_collections_result.stdout_bytes.decode() finally: # Delete document delete_result = await async_invoke( runner, delete, document_id, obj=client ) assert ( delete_result.exit_code == 0 ), delete_result.stdout_bytes.decode() @pytest.mark.asyncio async def test_create_multiple_documents(temp_text_file, temp_json_file): """Test creating multiple documents with metadata.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) metadatas = json.dumps( [ {"description": "Test document 1"}, {"description": "Test document 2"}, ] ) create_result = await async_invoke( runner, create, temp_text_file, temp_json_file, "--metadatas", metadatas, obj=client, ) assert create_result.exit_code == 0, create_result.stdout_bytes.decode() output = create_result.stdout_bytes.decode() # The command may print multiple JSON objects separated by dashes and status lines. # Extract all JSON objects. json_objects = [] start_idx = 0 while True: try: # Attempt to extract a JSON object from output[start_idx:] block = extract_json_block(output[start_idx:]) json_objects.append(block) # Move start_idx beyond this block to find the next one next_start = output[start_idx:].find("{") start_idx += output[start_idx:].find("{") + 1 # Move past the first '{' we found # Actually, let's break after one extraction to avoid infinite loops if the output is large. # Instead, we find multiple objects by splitting on the line of dashes: break except ValueError: break # Alternatively, if multiple objects are separated by "----------", we can split and parse each: # This assumes each block between "----------" lines contains exactly one JSON object. blocks = output.split("-" * 40) json_objects = [] for block in blocks: block = block.strip() if '"results"' in block and "{" in block and "}" in block: with contextlib.suppress(ValueError): json_objects.append(extract_json_block(block)) assert ( len(json_objects) == 2 ), f"Expected 2 JSON objects, got {len(json_objects)}: {output}" document_ids = [obj["results"]["document_id"] for obj in json_objects] try: # List all documents list_result = await async_invoke(runner, list, obj=client) assert list_result.exit_code == 0, list_result.stdout_bytes.decode() # Verify both documents were created for doc_id in document_ids: retrieve_result = await async_invoke( runner, retrieve, doc_id, obj=client ) assert ( retrieve_result.exit_code == 0 ), retrieve_result.stdout_bytes.decode() finally: # Cleanup - delete all created documents for doc_id in document_ids: delete_result = await async_invoke( runner, delete, doc_id, obj=client ) assert ( delete_result.exit_code == 0 ), delete_result.stdout_bytes.decode() @pytest.mark.asyncio async def test_create_with_custom_id(): """Test creating a document with a custom ID.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) custom_id = str(uuid.uuid4()) with tempfile.NamedTemporaryFile( mode="w", suffix=".txt", delete=False ) as f: f.write("Test content") temp_path = f.name try: create_result = await async_invoke( runner, create, temp_path, "--ids", custom_id, obj=client ) assert ( create_result.exit_code == 0 ), create_result.stdout_bytes.decode() output = create_result.stdout_bytes.decode() create_response = extract_json_block(output) assert create_response["results"]["document_id"] == custom_id finally: if os.path.exists(temp_path): os.unlink(temp_path) await async_invoke(runner, delete, custom_id, obj=client) @pytest.mark.asyncio async def test_retrieve_nonexistent_document(): """Test retrieving a document that doesn't exist.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) nonexistent_id = str(uuid.uuid4()) result = await async_invoke(runner, retrieve, nonexistent_id, obj=client) stderr = result.stderr_bytes.decode() assert ( "Document not found" in stderr or "Document not found" in result.stdout_bytes.decode() ) @pytest.mark.asyncio async def test_list_chunks_nonexistent_document(): """Test listing chunks for a document that doesn't exist.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) nonexistent_id = str(uuid.uuid4()) result = await async_invoke( runner, list_chunks, nonexistent_id, obj=client ) stderr = result.stderr_bytes.decode() assert ( "No chunks found for the given document ID." in stderr or "No chunks found for the given document ID." in result.stdout_bytes.decode() ) # FIXME: This should be returning 'Document not found' but returns an empty list instead. # @pytest.mark.asyncio # async def test_list_collections_nonexistent_document(): # """Test listing collections for a document that doesn't exist.""" # client = R2RAsyncClient(base_url="http://localhost:7272") # runner = CliRunner(mix_stderr=False) # nonexistent_id = str(uuid.uuid4()) # result = await async_invoke( # runner, list_collections, nonexistent_id, obj=client # ) # stderr = result.stderr_bytes.decode() # assert ( # "Document not found" in stderr # or "Document not found" in result.stdout_bytes.decode() # ) @pytest.mark.asyncio async def test_delete_nonexistent_document(): """Test deleting a document that doesn't exist.""" client = R2RAsyncClient(base_url="http://localhost:7272") runner = CliRunner(mix_stderr=False) nonexistent_id = str(uuid.uuid4()) result = await async_invoke(runner, delete, nonexistent_id, obj=client) stderr = result.stderr_bytes.decode() assert ( "No entries found for deletion" in stderr or "No entries found for deletion" in result.stdout_bytes.decode() )