123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- """
- Tests for the graphs commands in the CLI.
- - list
- - retrieve
- - reset
- - update
- - list-entities
- - get-entity
- x remove-entity
- - list-relationships
- - get-relationship
- x remove-relationship
- - build
- - list-communities
- - get-community
- x update-community
- x delete-community
- - pull
- - remove-document
- """
- import json
- import uuid
- import pytest
- from click.testing import CliRunner
- from cli.commands.collections import create as create_collection
- from cli.commands.graphs import (
- build,
- delete_community,
- get_community,
- get_entity,
- get_relationship,
- list,
- list_communities,
- list_entities,
- list_relationships,
- pull,
- remove_document,
- remove_entity,
- remove_relationship,
- reset,
- retrieve,
- update,
- update_community,
- )
- from r2r import R2RAsyncClient
- from tests.cli.async_invoke import async_invoke
- def extract_json_block(output: str) -> dict:
- """Extract and parse the first valid JSON object found in the output."""
- start = output.find("{")
- if start == -1:
- raise ValueError("No JSON object start found in output")
- brace_count = 0
- for i, char in enumerate(output[start:], start=start):
- if char == "{":
- brace_count += 1
- elif char == "}":
- brace_count -= 1
- if brace_count == 0:
- json_str = output[start : i + 1].strip()
- return json.loads(json_str)
- raise ValueError("No complete JSON object found in output")
- async def create_test_collection(
- runner: CliRunner, client: R2RAsyncClient
- ) -> str:
- """Helper function to create a test collection and return its ID."""
- collection_name = f"test-collection-{uuid.uuid4()}"
- create_result = await async_invoke(
- runner, create_collection, collection_name, obj=client
- )
- response = extract_json_block(create_result.stdout_bytes.decode())
- return response["results"]["id"]
- @pytest.mark.asyncio
- async def test_graph_basic_operations():
- """Test basic graph operations: retrieve, reset, update."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- collection_id = await create_test_collection(runner, client)
- try:
- # Retrieve graph
- retrieve_result = await async_invoke(
- runner, retrieve, collection_id, obj=client
- )
- assert retrieve_result.exit_code == 0
- assert collection_id in retrieve_result.stdout_bytes.decode()
- # Update graph
- new_name = "Updated Graph Name"
- new_description = "Updated description"
- update_result = await async_invoke(
- runner,
- update,
- collection_id,
- "--name",
- new_name,
- "--description",
- new_description,
- obj=client,
- )
- assert update_result.exit_code == 0
- # Reset graph
- reset_result = await async_invoke(
- runner, reset, collection_id, obj=client
- )
- assert reset_result.exit_code == 0
- finally:
- # Cleanup will be handled by collection deletion
- pass
- @pytest.mark.asyncio
- async def test_graph_entity_operations():
- """Test entity-related operations in a graph."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- collection_id = await create_test_collection(runner, client)
- try:
- # List entities (empty initially)
- list_entities_result = await async_invoke(
- runner, list_entities, collection_id, obj=client
- )
- assert list_entities_result.exit_code == 0
- # Test with pagination
- paginated_result = await async_invoke(
- runner,
- list_entities,
- collection_id,
- "--offset",
- "0",
- "--limit",
- "2",
- obj=client,
- )
- assert paginated_result.exit_code == 0
- # Test nonexistent entity operations
- nonexistent_entity_id = str(uuid.uuid4())
- get_entity_result = await async_invoke(
- runner,
- get_entity,
- collection_id,
- nonexistent_entity_id,
- obj=client,
- )
- assert "not found" in get_entity_result.stderr_bytes.decode().lower()
- finally:
- # Cleanup will be handled by collection deletion
- pass
- @pytest.mark.asyncio
- async def test_graph_relationship_operations():
- """Test relationship-related operations in a graph."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- collection_id = await create_test_collection(runner, client)
- try:
- # List relationships
- list_rel_result = await async_invoke(
- runner, list_relationships, collection_id, obj=client
- )
- assert list_rel_result.exit_code == 0
- # Test with pagination
- paginated_result = await async_invoke(
- runner,
- list_relationships,
- collection_id,
- "--offset",
- "0",
- "--limit",
- "2",
- obj=client,
- )
- assert paginated_result.exit_code == 0
- # Test nonexistent relationship operations
- nonexistent_rel_id = str(uuid.uuid4())
- get_rel_result = await async_invoke(
- runner,
- get_relationship,
- collection_id,
- nonexistent_rel_id,
- obj=client,
- )
- assert "not found" in get_rel_result.stderr_bytes.decode().lower()
- finally:
- # Cleanup will be handled by collection deletion
- pass
- @pytest.mark.asyncio
- async def test_graph_community_operations():
- """Test community-related operations in a graph."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- collection_id = await create_test_collection(runner, client)
- try:
- # List communities
- list_comm_result = await async_invoke(
- runner, list_communities, collection_id, obj=client
- )
- assert list_comm_result.exit_code == 0
- # Test with pagination
- paginated_result = await async_invoke(
- runner,
- list_communities,
- collection_id,
- "--offset",
- "0",
- "--limit",
- "2",
- obj=client,
- )
- assert paginated_result.exit_code == 0
- # Test nonexistent community operations
- nonexistent_comm_id = str(uuid.uuid4())
- get_comm_result = await async_invoke(
- runner,
- get_community,
- collection_id,
- nonexistent_comm_id,
- obj=client,
- )
- assert "not found" in get_comm_result.stderr_bytes.decode().lower()
- finally:
- # Cleanup will be handled by collection deletion
- pass
- @pytest.mark.asyncio
- async def test_graph_build_and_pull():
- """Test graph building and document pull operations."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- collection_id = await create_test_collection(runner, client)
- try:
- # Test build with minimal settings
- settings = {"some_setting": "value"}
- build_result = await async_invoke(
- runner,
- build,
- collection_id,
- "--settings",
- json.dumps(settings),
- obj=client,
- )
- assert build_result.exit_code == 0
- # Test pull documents
- pull_result = await async_invoke(
- runner, pull, collection_id, obj=client
- )
- assert pull_result.exit_code == 0
- # Test remove document (with nonexistent document)
- nonexistent_doc_id = str(uuid.uuid4())
- remove_doc_result = await async_invoke(
- runner,
- remove_document,
- collection_id,
- nonexistent_doc_id,
- obj=client,
- )
- assert "not found" in remove_doc_result.stderr_bytes.decode().lower()
- finally:
- # Cleanup will be handled by collection deletion
- pass
- @pytest.mark.asyncio
- async def test_list_graphs():
- """Test listing graphs."""
- client = R2RAsyncClient(base_url="http://localhost:7272")
- runner = CliRunner(mix_stderr=False)
- try:
- # Test basic list
- list_result = await async_invoke(runner, list, obj=client)
- assert list_result.exit_code == 0
- finally:
- # Cleanup will be handled by collection deletion
- pass
|