123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622 |
- import time
- import uuid
- import pytest
- from r2r import R2RClient, R2RException
- @pytest.fixture
- def cleanup_documents(client: R2RClient):
- doc_ids = []
- def _track_document(doc_id):
- doc_ids.append(doc_id)
- return doc_id
- yield _track_document
- # Cleanup all documents
- for doc_id in doc_ids:
- try:
- client.documents.delete(id=doc_id)
- except R2RException:
- pass
- def test_create_document_with_file(client: R2RClient, cleanup_documents):
- results = client.documents.create(
- file_path="core/examples/data/aristotle.txt",
- run_with_orchestration=False,
- ).results
- doc_id = cleanup_documents(results.document_id)
- assert results.document_id, "No document_id returned after file ingestion"
- def test_create_document_with_raw_text(client: R2RClient, cleanup_documents):
- resp = client.documents.create(raw_text="This is raw text content.",
- run_with_orchestration=False)
- results = resp.results
- doc_id = cleanup_documents(results.document_id)
- assert doc_id, "No document_id returned after raw text ingestion"
- # Verify retrieval
- retrieved = client.documents.retrieve(id=doc_id)
- retrieved_results = retrieved.results
- assert retrieved_results.id == doc_id, (
- "Failed to retrieve the ingested raw text document")
- def test_create_document_with_chunks(client: R2RClient, cleanup_documents):
- suffix = str(uuid.uuid4())[:8]
- resp = client.documents.create(
- chunks=[f"Chunk one{suffix}", f"Chunk two{suffix}"],
- run_with_orchestration=False,
- )
- results = resp.results
- doc_id = cleanup_documents(results.document_id)
- assert doc_id, "No document_id returned after chunk ingestion"
- retrieved = client.documents.retrieve(id=doc_id)
- retrieved_results = retrieved.results
- assert retrieved_results.id == doc_id, (
- "Failed to retrieve the chunk-based document")
- def test_create_document_different_modes(client: R2RClient, cleanup_documents):
- # hi-res mode
- hi_res_resp = client.documents.create(
- raw_text="High resolution doc.",
- ingestion_mode="hi-res",
- run_with_orchestration=False,
- ).results
- hi_res_id = cleanup_documents(hi_res_resp.document_id)
- assert hi_res_id, "No doc_id returned for hi-res ingestion"
- # fast mode
- fast_resp = client.documents.create(
- raw_text="Fast mode doc.",
- ingestion_mode="fast",
- run_with_orchestration=False,
- ).results
- fast_id = cleanup_documents(fast_resp.document_id)
- assert fast_id, "No doc_id returned for fast ingestion"
- def test_list_documents(client: R2RClient, test_document):
- results = client.documents.list(offset=0, limit=10).results
- assert isinstance(results, list), "Documents list response is not a list"
- assert len(results) >= 1, "Expected at least one document"
- # test_document is created for this test, so we expect at least that one present.
- def test_retrieve_document(client: R2RClient, test_document):
- retrieved = client.documents.retrieve(id=test_document).results
- assert retrieved.id == test_document, "Retrieved wrong document"
- def test_download_document(client: R2RClient, test_document):
- # For text-only documents, the endpoint returns text as a buffer
- content = client.documents.download(id=test_document)
- assert content, "Failed to download document content"
- data = content.getvalue()
- assert len(data) > 0, "Document content is empty"
- def test_delete_document(client: R2RClient):
- # Create a doc to delete
- resp = client.documents.create(raw_text="This is a temporary doc",
- run_with_orchestration=False).results
- doc_id = resp.document_id
- del_resp = client.documents.delete(id=doc_id).results
- assert del_resp.success, "Failed to delete document"
- # Verify it's gone
- with pytest.raises(R2RException) as exc_info:
- client.documents.retrieve(id=doc_id)
- assert exc_info.value.status_code == 404, "Expected 404 after deletion"
- def test_delete_document_by_filter(client: R2RClient):
- # Create a doc with unique metadata
- resp = client.documents.create(
- raw_text="Document to be filtered out",
- metadata={
- "to_delete": "yes"
- },
- run_with_orchestration=False,
- ).results
- doc_id = resp.document_id
- filters = {"to_delete": {"$eq": "yes"}}
- del_resp = client.documents.delete_by_filter(filters).results
- assert del_resp.success, "Failed to delete documents by filter"
- # Verify deletion
- with pytest.raises(R2RException) as exc_info:
- client.documents.retrieve(id=doc_id)
- assert exc_info.value.status_code == 404, (
- "Document still exists after filter-based deletion")
- # @pytest.mark.skip(reason="Only if superuser-specific logic is implemented")
- def test_list_document_collections(client: R2RClient, test_document):
- # This test assumes the currently logged in user is a superuser
- collections = client.documents.list_collections(id=test_document).results
- assert isinstance(collections,
- list), ("Document collections list is not a list")
- # @pytest.mark.skip(
- # reason="Requires actual entity extraction logic implemented and superuser access"
- # )
- def test_extract_document(client: R2RClient, test_document):
- time.sleep(10)
- run_resp = client.documents.extract(id=test_document,
- run_with_orchestration=False).results
- assert run_resp.message is not None, "No message after extraction run"
- # @pytest.mark.skip(reason="Requires entity extraction results present")
- def test_list_entities(client: R2RClient, test_document):
- # If no entities extracted yet, this could raise an exception
- try:
- entities = client.documents.list_entities(id=test_document).results
- assert isinstance(entities, list), "Entities response not a list"
- except R2RException as e:
- # Possibly no entities extracted yet
- pytest.skip(f"No entities extracted yet: {str(e)}")
- # @pytest.mark.skip(reason="Requires relationship extraction results present")
- def test_list_relationships(client: R2RClient, test_document):
- try:
- relationships = client.documents.list_relationships(
- id=test_document).results
- assert isinstance(relationships,
- list), ("Relationships response not a list")
- except R2RException as e:
- pytest.skip(f"No relationships extracted yet: {str(e)}")
- def test_search_documents(client: R2RClient, test_document):
- # Add some delay if indexing takes time
- time.sleep(1)
- query = "Temporary"
- search_results = client.documents.search(query=query,
- search_mode="custom",
- search_settings={"limit": 5})
- assert search_results.results is not None, "Search results key not found"
- # We cannot guarantee a match, but at least we got a well-formed response
- assert isinstance(search_results.results,
- list), ("Search results not a list")
- def test_list_document_chunks(mutable_client: R2RClient, cleanup_documents):
- temp_user = f"{uuid.uuid4()}@me.com"
- mutable_client.users.create(temp_user, "password")
- mutable_client.users.login(temp_user, "password")
- resp = mutable_client.documents.create(
- chunks=["C1", "C2", "C3"], run_with_orchestration=False).results
- doc_id = cleanup_documents(resp.document_id)
- chunks_resp = mutable_client.documents.list_chunks(id=doc_id)
- results = chunks_resp.results
- assert len(results) == 3, "Expected 3 chunks"
- mutable_client.users.logout()
- def test_search_documents_extended(client: R2RClient, cleanup_documents):
- doc_id = cleanup_documents(
- client.documents.create(
- raw_text="Aristotle was a Greek philosopher.",
- run_with_orchestration=False,
- ).results.document_id)
- time.sleep(1) # If indexing is asynchronous
- search_results = client.documents.search(
- query="Greek philosopher",
- search_mode="basic",
- search_settings={"limit": 1},
- )
- assert search_results.results is not None, (
- "No results key in search response")
- assert len(search_results.results) > 0, "No documents found"
- def test_retrieve_document_not_found(client):
- bad_id = str(uuid.uuid4())
- with pytest.raises(R2RException) as exc_info:
- client.documents.retrieve(id=bad_id)
- assert exc_info.value.status_code == 404, "Wrong error code for not found"
- def test_delete_document_non_existent(client):
- bad_id = str(uuid.uuid4())
- with pytest.raises(R2RException) as exc_info:
- client.documents.delete(id=bad_id)
- assert exc_info.value.status_code == 404, (
- "Wrong error code for delete non-existent")
- # @pytest.mark.skip(reason="If your API restricts this endpoint to superusers")
- def test_get_document_collections_non_superuser(client):
- # Create a non-superuser client
- non_super_client = R2RClient(client.base_url)
- random_string = str(uuid.uuid4())
- non_super_client.users.create(f"{random_string}@me.com", "password")
- non_super_client.users.login(f"{random_string}@me.com", "password")
- document_id = str(uuid.uuid4()) # Some doc ID
- with pytest.raises(R2RException) as exc_info:
- non_super_client.documents.list_collections(id=document_id)
- assert exc_info.value.status_code == 403, (
- "Expected 403 for non-superuser collections access")
- def test_access_document_not_owned(client: R2RClient, cleanup_documents):
- # Create a doc as superuser
- doc_id = cleanup_documents(
- client.documents.create(
- raw_text="Owner doc test",
- run_with_orchestration=False).results.document_id)
- # Now try to access with a non-superuser
- non_super_client = R2RClient(client.base_url)
- random_string = str(uuid.uuid4())
- non_super_client.users.create(f"{random_string}@me.com", "password")
- non_super_client.users.login(f"{random_string}@me.com", "password")
- with pytest.raises(R2RException) as exc_info:
- non_super_client.documents.download(id=doc_id)
- assert exc_info.value.status_code == 403, (
- "Wrong error code for unauthorized access")
- def test_list_documents_with_pagination(mutable_client: R2RClient,
- cleanup_documents):
- temp_user = f"{uuid.uuid4()}@me.com"
- mutable_client.users.create(temp_user, "password")
- mutable_client.users.login(temp_user, "password")
- for i in range(3):
- cleanup_documents(
- mutable_client.documents.create(
- raw_text=f"Doc {i}",
- run_with_orchestration=False).results.document_id)
- listed = mutable_client.documents.list(limit=2, offset=0)
- results = listed.results
- assert len(results) == 2, "Expected 2 results for paginated listing"
- def test_ingest_invalid_chunks(client):
- invalid_chunks = ["Valid chunk", 12345, {"not": "a string"}]
- with pytest.raises(R2RException) as exc_info:
- client.documents.create(chunks=invalid_chunks,
- run_with_orchestration=False)
- assert exc_info.value.status_code in [
- 400,
- 422,
- ], "Expected validation error for invalid chunks"
- def test_ingest_too_many_chunks(client: R2RClient):
- excessive_chunks = ["Chunk"] * (1024 * 100 + 1) # Just over the limit
- with pytest.raises(R2RException) as exc_info:
- client.documents.create(chunks=excessive_chunks,
- run_with_orchestration=False)
- assert exc_info.value.status_code == 400, (
- "Wrong error code for exceeding max chunks")
- def test_chunk_size_and_overlap(client: R2RClient, cleanup_documents):
- test_text = "This is a test document with chunk size and overlap settings that we want to verify."
- document_id = cleanup_documents(
- client.documents.create(
- raw_text=test_text,
- ingestion_config={
- "chunk_size": 10,
- "chunk_overlap": 2,
- },
- run_with_orchestration=False
- ).results.document_id
- )
- time.sleep(1)
- chunks = client.documents.list_chunks(id=document_id).results
- assert len(chunks) > 0, "No chunks were created"
- # Verify each chunk respects the maximum size
- for chunk in chunks:
- assert len(chunk.text) <= 10, f"Chunk exceeds maximum size: '{chunk.text}'"
- long_text = "Here is a longer document that we can use to test larger chunk sizes and overlaps to ensure the chunking algorithm works properly across different configurations."
- document_id2 = cleanup_documents(
- client.documents.create(
- raw_text=long_text,
- ingestion_config={
- "chunk_size": 20,
- "chunk_overlap": 5,
- },
- run_with_orchestration=False
- ).results.document_id
- )
- chunks2 = client.documents.list_chunks(id=document_id2).results
- assert len(chunks2) > 0, "No chunks were created for the second document"
- for chunk in chunks2:
- assert len(chunk.text) <= 20, f"Chunk exceeds maximum size: '{chunk.text}'"
- def test_delete_by_complex_filter(client: R2RClient, cleanup_documents):
- doc1 = cleanup_documents(
- client.documents.create(
- raw_text="Doc with tag A",
- metadata={
- "tag": "A"
- },
- run_with_orchestration=False,
- ).results.document_id)
- doc2 = cleanup_documents(
- client.documents.create(
- raw_text="Doc with tag B",
- metadata={
- "tag": "B"
- },
- run_with_orchestration=False,
- ).results.document_id)
- filters = {"$or": [{"tag": {"$eq": "A"}}, {"tag": {"$eq": "B"}}]}
- del_resp = client.documents.delete_by_filter(filters).results
- assert del_resp.success, "Complex filter deletion failed"
- # Verify both documents are deleted
- for d_id in [doc1, doc2]:
- with pytest.raises(R2RException) as exc_info:
- client.documents.retrieve(d_id)
- assert exc_info.value.status_code == 404, (
- f"Document {d_id} still exists after deletion")
- def test_search_documents_no_match(client: R2RClient, cleanup_documents):
- doc_id = cleanup_documents(
- client.documents.create(
- raw_text="Just a random document",
- metadata={
- "category": "unrelated"
- },
- run_with_orchestration=False,
- ).results.document_id)
- # Search for non-existent category
- search_results = client.documents.search(
- query="nonexistent category",
- search_mode="basic",
- search_settings={
- "filters": {
- "category": {
- "$eq": "doesnotexist"
- }
- },
- "limit": 10,
- },
- )
- assert search_results.results is not None, "Search missing results key"
- assert len(search_results.results) == 0, "Expected zero results"
- import pytest
- def test_delete_by_workflow_metadata(client: R2RClient, cleanup_documents):
- """Test deletion by workflow state metadata."""
- # Create test documents with workflow metadata
- random_suffix = uuid.uuid4()
- docs = []
- try:
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Draft document 1" + str(random_suffix),
- metadata={
- "workflow": {
- "state": "draft",
- "assignee": "user1",
- "review_count": 0,
- }
- },
- run_with_orchestration=False,
- ).results.document_id))
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Draft document 2" + str(random_suffix),
- metadata={
- "workflow": {
- "state": "draft",
- "assignee": "user2",
- "review_count": 1,
- }
- },
- run_with_orchestration=False,
- ).results.document_id))
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Published document" + str(random_suffix),
- metadata={
- "workflow": {
- "state": "published",
- "assignee": "user1",
- "review_count": 2,
- }
- },
- run_with_orchestration=False,
- ).results.document_id))
- # Delete drafts with no reviews
- filters = {
- "$and": [
- {
- "metadata.workflow.state": {
- "$eq": "draft"
- }
- },
- {
- "metadata.workflow.review_count": {
- "$eq": 0
- }
- },
- ]
- }
- response = client.documents.delete_by_filter(filters).results
- assert response.success
- # Verify first draft is deleted
- with pytest.raises(R2RException) as exc:
- client.documents.retrieve(id=docs[0])
- assert exc.value.status_code == 404
- # Verify other documents still exist
- assert client.documents.retrieve(id=docs[1])
- assert client.documents.retrieve(id=docs[2])
- except Exception:
- raise
- def test_delete_by_classification_metadata(client: R2RClient,
- cleanup_documents):
- """Test deletion by document classification metadata."""
- docs = []
- try:
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Confidential document",
- metadata={
- "classification": {
- "level": "confidential",
- "department": "HR",
- "retention_years": 7,
- }
- },
- run_with_orchestration=False,
- ).results.document_id))
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Public document",
- metadata={
- "classification": {
- "level": "public",
- "department": "Marketing",
- "retention_years": 1,
- }
- },
- run_with_orchestration=False,
- ).results.document_id))
- # Delete HR documents with high retention
- filters = {
- "$and": [
- {
- "classification.department": {
- "$eq": "HR"
- }
- },
- {
- "classification.retention_years": {
- "$gt": 5
- }
- },
- ]
- }
- response = client.documents.delete_by_filter(filters).results
- assert response.success
- # Verify confidential HR doc is deleted
- with pytest.raises(R2RException) as exc:
- client.documents.retrieve(id=docs[0])
- assert exc.value.status_code == 404
- # Verify public doc still exists
- assert client.documents.retrieve(id=docs[1])
- except Exception:
- raise
- def test_delete_by_version_metadata(client: R2RClient, cleanup_documents):
- """Test deletion by version and status metadata with array conditions."""
- suffix = uuid.uuid4()
- docs = []
- try:
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Old version document" + str(suffix),
- metadata={
- "version_info": {
- "number": "1.0.0",
- "status": "deprecated",
- "tags": ["legacy", "unsupported"],
- },
- },
- run_with_orchestration=False,
- ).results.document_id))
- docs.append(
- cleanup_documents(
- client.documents.create(
- raw_text="Current version document" + str(suffix),
- metadata={
- "version_info": {
- "number": "2.0.0",
- "status": "current",
- "tags": ["stable", "supported"],
- },
- },
- run_with_orchestration=False,
- ).results.document_id))
- # Delete deprecated documents with legacy tag
- filters = {
- "$and": [
- {
- "metadata.version_info.status": {
- "$eq": "deprecated"
- }
- },
- {
- "metadata.version_info.tags": {
- "$in": ["legacy"]
- }
- },
- ]
- }
- response = client.documents.delete_by_filter(filters).results
- assert response.success
- # Verify deprecated doc is deleted
- with pytest.raises(R2RException) as exc:
- doc = client.documents.retrieve(id=docs[0])
- print('doc = ', doc)
- assert exc.value.status_code == 404
- # Verify current doc still exists
- assert client.documents.retrieve(id=docs[1])
- except Exception:
- raise
|