123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- """
- Tests for the document commands in the CLI.
- - create
- - retrieve
- - list
- - delete
- - list-chunks
- - list-collections
- x ingest-files-from-url
- x extract
- x list-entities
- x list-relationships
- x create-sample
- x create-samples
- """
- import contextlib
- import json
- import os
- import tempfile
- import uuid
- import pytest
- from click.testing import CliRunner
- from cli.commands.documents import (
- create,
- delete,
- list,
- list_chunks,
- list_collections,
- retrieve,
- )
- from r2r import R2RAsyncClient
- from tests.cli.async_invoke import async_invoke
@pytest.fixture
def temp_text_file():
    """Yield the path of a temporary ``.txt`` file containing sample text.

    The file is created with ``delete=False`` so it survives being closed;
    after the test finishes it is removed if it still exists.
    """
    handle = tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False
    )
    with handle:
        handle.write("This is test content for document testing.")
    path = handle.name

    yield path

    # Teardown: the file was created with delete=False, so remove it here.
    if os.path.exists(path):
        os.unlink(path)
@pytest.fixture
def temp_json_file():
    """Yield the path of a temporary ``.json`` file containing a small object.

    The file is created with ``delete=False`` so it survives being closed;
    after the test finishes it is removed if it still exists.
    """
    handle = tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    )
    with handle:
        json.dump({"test": "content", "for": "document testing"}, handle)
    path = handle.name

    yield path

    # Teardown: the file was created with delete=False, so remove it here.
    if os.path.exists(path):
        os.unlink(path)
def extract_json_block(output: str) -> dict:
    """Extract and parse the first valid JSON object found in the output.

    The text may contain arbitrary non-JSON content before and after the
    object. Each ``{`` is tried in turn as the start of a JSON document and
    handed to the real JSON parser, so braces inside string values are handled
    correctly and a stray ``{`` in surrounding text does not abort the search.

    Args:
        output: Text that is expected to contain at least one JSON object.

    Returns:
        The first JSON object in ``output`` that parses successfully.

    Raises:
        ValueError: If ``output`` contains no ``{`` at all, or if no ``{``
            starts a parseable JSON object.
    """
    start = output.find("{")
    if start == -1:
        raise ValueError("No JSON object start found in output")

    decoder = json.JSONDecoder()
    while start != -1:
        try:
            # raw_decode parses one JSON document from the beginning of the
            # string and ignores whatever text follows it.
            parsed, _ = decoder.raw_decode(output[start:])
        except json.JSONDecodeError:
            # Not a JSON object at this brace; try the next candidate.
            start = output.find("{", start + 1)
        else:
            return parsed

    raise ValueError("No complete JSON object found in output")
@pytest.mark.asyncio
async def test_document_lifecycle(temp_text_file):
    """Run one document through create, retrieve, list-chunks, list-collections and delete.

    The client targets ``http://localhost:7272``; presumably a server must be
    reachable there for the test to pass.
    """
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)

    async def run_ok(command, *cli_args):
        """Invoke ``command`` and fail with its stdout if the exit code is non-zero."""
        result = await async_invoke(runner, command, *cli_args, obj=client)
        assert result.exit_code == 0, result.stdout_bytes.decode()
        return result

    # Create the document and pull its ID out of the printed JSON.
    created = await run_ok(create, temp_text_file)
    payload = extract_json_block(created.stdout_bytes.decode())
    document_id = payload["results"]["document_id"]

    try:
        # Retrieve: the output is a table rather than JSON, so just check
        # that the ID shows up in it.
        retrieved = await run_ok(retrieve, document_id)
        assert document_id in retrieved.stdout_bytes.decode()

        # Listing chunks and collections only needs to succeed.
        await run_ok(list_chunks, document_id)
        await run_ok(list_collections, document_id)
    finally:
        # Always delete the document created above.
        await run_ok(delete, document_id)
@pytest.mark.asyncio
async def test_create_multiple_documents(temp_text_file, temp_json_file):
    """Test creating multiple documents with metadata.

    Creates two documents in a single ``create`` invocation, checks that two
    result objects are printed, verifies each document can be retrieved, and
    finally deletes both documents.
    """
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)

    metadatas = json.dumps(
        [
            {"description": "Test document 1"},
            {"description": "Test document 2"},
        ]
    )

    create_result = await async_invoke(
        runner,
        create,
        temp_text_file,
        temp_json_file,
        "--metadatas",
        metadatas,
        obj=client,
    )
    assert create_result.exit_code == 0, create_result.stdout_bytes.decode()
    output = create_result.stdout_bytes.decode()

    # The command may print several JSON objects separated by lines of dashes
    # and status text. Split on the dash separator and parse one object from
    # each chunk that looks like it carries a "results" payload.
    json_objects = []
    for block in output.split("-" * 40):
        block = block.strip()
        if '"results"' in block and "{" in block and "}" in block:
            # A chunk that fails to parse is skipped; the count assertion
            # below reports the problem together with the full output.
            with contextlib.suppress(ValueError):
                json_objects.append(extract_json_block(block))

    assert (
        len(json_objects) == 2
    ), f"Expected 2 JSON objects, got {len(json_objects)}: {output}"

    document_ids = [obj["results"]["document_id"] for obj in json_objects]

    try:
        # List all documents (`list` is the CLI command imported at the top
        # of the file, not the builtin).
        list_result = await async_invoke(runner, list, obj=client)
        assert list_result.exit_code == 0, list_result.stdout_bytes.decode()

        # Verify both documents were created
        for doc_id in document_ids:
            retrieve_result = await async_invoke(
                runner, retrieve, doc_id, obj=client
            )
            assert (
                retrieve_result.exit_code == 0
            ), retrieve_result.stdout_bytes.decode()
    finally:
        # Cleanup: attempt every deletion first so that one failed delete
        # does not leave the remaining documents behind, then assert.
        delete_results = []
        for doc_id in document_ids:
            delete_results.append(
                await async_invoke(runner, delete, doc_id, obj=client)
            )
        for delete_result in delete_results:
            assert (
                delete_result.exit_code == 0
            ), delete_result.stdout_bytes.decode()
@pytest.mark.asyncio
async def test_create_with_custom_id():
    """Create a document with a caller-supplied ID and check the ID is echoed back."""
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)
    custom_id = str(uuid.uuid4())

    # Write a small source file; delete=False keeps it on disk after closing.
    handle = tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False
    )
    with handle:
        handle.write("Test content")
    temp_path = handle.name

    try:
        create_result = await async_invoke(
            runner, create, temp_path, "--ids", custom_id, obj=client
        )
        stdout = create_result.stdout_bytes.decode()
        assert create_result.exit_code == 0, stdout

        reported_id = extract_json_block(stdout)["results"]["document_id"]
        assert reported_id == custom_id
    finally:
        # Remove the temporary file, then delete the document; the delete
        # result is intentionally not checked.
        if os.path.exists(temp_path):
            os.unlink(temp_path)
        await async_invoke(runner, delete, custom_id, obj=client)
@pytest.mark.asyncio
async def test_retrieve_nonexistent_document():
    """Retrieving a random, unused ID must report "Document not found"."""
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)
    missing_id = str(uuid.uuid4())
    expected = "Document not found"

    result = await async_invoke(runner, retrieve, missing_id, obj=client)

    # The message may be written to either stream, so accept both.
    stderr = result.stderr_bytes.decode()
    assert expected in stderr or expected in result.stdout_bytes.decode()
@pytest.mark.asyncio
async def test_list_chunks_nonexistent_document():
    """Listing chunks for a random, unused ID must report that none were found."""
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)
    missing_id = str(uuid.uuid4())
    expected = "No chunks found for the given document ID."

    result = await async_invoke(runner, list_chunks, missing_id, obj=client)

    # The message may be written to either stream, so accept both.
    stderr = result.stderr_bytes.decode()
    assert expected in stderr or expected in result.stdout_bytes.decode()
- # FIXME: This should be returning 'Document not found' but returns an empty list instead.
- # @pytest.mark.asyncio
- # async def test_list_collections_nonexistent_document():
- # """Test listing collections for a document that doesn't exist."""
- # client = R2RAsyncClient(base_url="http://localhost:7272")
- # runner = CliRunner(mix_stderr=False)
- # nonexistent_id = str(uuid.uuid4())
- # result = await async_invoke(
- # runner, list_collections, nonexistent_id, obj=client
- # )
- # stderr = result.stderr_bytes.decode()
- # assert (
- # "Document not found" in stderr
- # or "Document not found" in result.stdout_bytes.decode()
- # )
@pytest.mark.asyncio
async def test_delete_nonexistent_document():
    """Deleting a random, unused ID must report that nothing was deleted."""
    client = R2RAsyncClient(base_url="http://localhost:7272")
    runner = CliRunner(mix_stderr=False)
    missing_id = str(uuid.uuid4())
    expected = "No entries found for deletion"

    result = await async_invoke(runner, delete, missing_id, obj=client)

    # The message may be written to either stream, so accept both.
    stderr = result.stderr_bytes.decode()
    assert expected in stderr or expected in result.stdout_bytes.decode()
|