123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- import json
- import os
- import tempfile
- import uuid
- from urllib.parse import urlparse
- import asyncclick as click
- import requests
- from asyncclick import pass_context
- from cli.utils.param_types import JSON
- from cli.utils.timer import timer
- from r2r import R2RAsyncClient
- @click.group()
- def documents():
- """Documents commands."""
- pass
- @documents.command()
- @click.argument(
- "file_paths", nargs=-1, required=True, type=click.Path(exists=True)
- )
- @click.option("--ids", multiple=True, help="Document IDs for ingestion")
- @click.option(
- "--metadatas", type=JSON, help="Metadatas for ingestion as a JSON string"
- )
- @click.option(
- "--run-without-orchestration", is_flag=True, help="Run with orchestration"
- )
- @pass_context
- async def create(ctx, file_paths, ids, metadatas, run_without_orchestration):
- """Ingest files into R2R."""
- client: R2RAsyncClient = ctx.obj
- run_with_orchestration = not run_without_orchestration
- responses = []
- for idx, file_path in enumerate(file_paths):
- with timer():
- current_id = [ids[idx]] if ids and idx < len(ids) else None
- current_metadata = (
- metadatas[idx] if metadatas and idx < len(metadatas) else None
- )
- click.echo(
- f"Processing file {idx + 1}/{len(file_paths)}: {file_path}"
- )
- response = await client.documents.create(
- file_path=file_path,
- metadata=current_metadata,
- id=current_id,
- run_with_orchestration=run_with_orchestration,
- )
- responses.append(response)
- click.echo(json.dumps(response, indent=2))
- click.echo("-" * 40)
- click.echo(f"\nProcessed {len(responses)} files successfully.")
- @documents.command()
- @click.argument("file_path", required=True, type=click.Path(exists=True))
- @click.option("--id", required=True, help="Existing document ID to update")
- @click.option(
- "--metadata", type=JSON, help="Metadatas for ingestion as a JSON string"
- )
- @click.option(
- "--run-without-orchestration", is_flag=True, help="Run with orchestration"
- )
- @pass_context
- async def update(ctx, file_path, id, metadata, run_without_orchestration):
- """Update an existing file in R2R."""
- client: R2RAsyncClient = ctx.obj
- run_with_orchestration = not run_without_orchestration
- responses = []
- with timer():
- click.echo(f"Updating file {id}: {file_path}")
- response = await client.documents.update(
- file_path=file_path,
- metadata=metadata,
- id=id,
- run_with_orchestration=run_with_orchestration,
- )
- responses.append(response)
- click.echo(json.dumps(response, indent=2))
- click.echo("-" * 40)
- click.echo(f"Updated file {id} file successfully.")
- @documents.command()
- @click.argument("id", required=True, type=str)
- @pass_context
- async def retrieve(ctx, id):
- """Retrieve a document by ID."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.retrieve(id=id)
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.argument("id", required=True, type=str)
- @pass_context
- async def delete(ctx, id):
- """Delete a document by ID."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.delete(id=id)
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.argument("id", required=True, type=str)
- @click.option(
- "--offset",
- default=0,
- help="The offset to start from. Defaults to 0.",
- )
- @click.option(
- "--limit",
- default=100,
- help="The maximum number of nodes to return. Defaults to 100.",
- )
- @pass_context
- async def list_chunks(ctx, id, offset, limit):
- """List collections for a specific document."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.list_chunks(
- id=id,
- offset=offset,
- limit=limit,
- )
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.argument("id", required=True, type=str)
- @click.option(
- "--offset",
- default=0,
- help="The offset to start from. Defaults to 0.",
- )
- @click.option(
- "--limit",
- default=100,
- help="The maximum number of nodes to return. Defaults to 100.",
- )
- @pass_context
- async def list_collections(ctx, id, offset, limit):
- """List collections for a specific document."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.list_collections(
- id=id,
- offset=offset,
- limit=limit,
- )
- click.echo(json.dumps(response, indent=2))
- # TODO
- async def ingest_files_from_urls(client, urls):
- """Download and ingest files from given URLs."""
- files_to_ingest = []
- metadatas = []
- document_ids = []
- temp_files = []
- try:
- for url in urls:
- filename = os.path.basename(urlparse(url).path)
- is_pdf = filename.lower().endswith(".pdf")
- temp_file = tempfile.NamedTemporaryFile(
- mode="wb" if is_pdf else "w+",
- delete=False,
- suffix=f"_{filename}",
- )
- temp_files.append(temp_file)
- response = requests.get(url)
- response.raise_for_status()
- if is_pdf:
- temp_file.write(response.content)
- else:
- temp_file.write(response.text)
- temp_file.close()
- files_to_ingest.append(temp_file.name)
- metadatas.append({"title": filename})
- # TODO: use the utils function generate_document_id
- document_ids.append(str(uuid.uuid5(uuid.NAMESPACE_DNS, url)))
- for it, file in enumerate(files_to_ingest):
- click.echo(f"Ingesting file: {file}")
- response = await client.documents.create(
- file, metadata=metadatas[it], id=document_ids[it]
- )
- return response["results"]
- finally:
- # Clean up temporary files
- for temp_file in temp_files:
- os.unlink(temp_file.name)
- # Missing CLI Commands
- @documents.command()
- @click.argument("id", required=True, type=str)
- @click.option("--run-type", help="Extraction run type (estimate or run)")
- @click.option("--settings", type=JSON, help="Extraction settings as JSON")
- @click.option(
- "--run-without-orchestration",
- is_flag=True,
- help="Run without orchestration",
- )
- @pass_context
- async def extract(ctx, id, run_type, settings, run_without_orchestration):
- """Extract entities and relationships from a document."""
- client: R2RAsyncClient = ctx.obj
- run_with_orchestration = not run_without_orchestration
- with timer():
- response = await client.documents.extract(
- id=id,
- run_type=run_type,
- settings=settings,
- run_with_orchestration=run_with_orchestration,
- )
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.argument("id", required=True, type=str)
- @click.option(
- "--offset",
- default=0,
- help="The offset to start from. Defaults to 0.",
- )
- @click.option(
- "--limit",
- default=100,
- help="The maximum number of items to return. Defaults to 100.",
- )
- @click.option(
- "--include-embeddings",
- is_flag=True,
- help="Include embeddings in response",
- )
- @pass_context
- async def list_entities(ctx, id, offset, limit, include_embeddings):
- """List entities extracted from a document."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.list_entities(
- id=id,
- offset=offset,
- limit=limit,
- include_embeddings=include_embeddings,
- )
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.argument("id", required=True, type=str)
- @click.option(
- "--offset",
- default=0,
- help="The offset to start from. Defaults to 0.",
- )
- @click.option(
- "--limit",
- default=100,
- help="The maximum number of items to return. Defaults to 100.",
- )
- @click.option(
- "--entity-names",
- multiple=True,
- help="Filter by entity names",
- )
- @click.option(
- "--relationship-types",
- multiple=True,
- help="Filter by relationship types",
- )
- @pass_context
- async def list_relationships(
- ctx, id, offset, limit, entity_names, relationship_types
- ):
- """List relationships extracted from a document."""
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await client.documents.list_relationships(
- id=id,
- offset=offset,
- limit=limit,
- entity_names=list(entity_names) if entity_names else None,
- relationship_types=(
- list(relationship_types) if relationship_types else None
- ),
- )
- click.echo(json.dumps(response, indent=2))
- @documents.command()
- @click.option(
- "--v2", is_flag=True, help="use aristotle_v2.txt (a smaller file)"
- )
- @click.option(
- "--v3", is_flag=True, help="use aristotle_v3.txt (a larger file)"
- )
- @pass_context
- async def create_sample(ctx, v2=True, v3=False):
- """Ingest the first sample file into R2R."""
- sample_file_url = f"https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/aristotle.txt"
- client: R2RAsyncClient = ctx.obj
- with timer():
- response = await ingest_files_from_urls(client, [sample_file_url])
- click.echo(
- f"Sample file ingestion completed. Ingest files response:\n\n{response}"
- )
- @documents.command()
- @pass_context
- async def create_samples(ctx):
- """Ingest multiple sample files into R2R."""
- client: R2RAsyncClient = ctx.obj
- urls = [
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/pg_essay_3.html",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/pg_essay_4.html",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/pg_essay_5.html",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/lyft_2021.pdf",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/uber_2021.pdf",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/got.txt",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/pg_essay_1.html",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/pg_essay_2.html",
- "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/aristotle.txt",
- ]
- with timer():
- response = await ingest_files_from_urls(client, urls)
- click.echo(
- f"Sample files ingestion completed. Ingest files response:\n\n{response}"
- )
- @documents.command()
- @click.option("--ids", multiple=True, help="Document IDs to fetch")
- @click.option(
- "--offset",
- default=0,
- help="The offset to start from. Defaults to 0.",
- )
- @click.option(
- "--limit",
- default=100,
- help="The maximum number of nodes to return. Defaults to 100.",
- )
- @pass_context
- async def list(ctx, ids, offset, limit):
- """Get an overview of documents."""
- client: R2RAsyncClient = ctx.obj
- ids = list(ids) if ids else None
- with timer():
- response = await client.documents.list(
- ids=ids,
- offset=offset,
- limit=limit,
- )
- for document in response["results"]:
- click.echo(document)
|