123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- # To add this server to your local machine, run e.g. `mcp install r2r/mcp.py -v R2R_API_URL=http://localhost:7272`.
- from r2r import R2RClient
def id_to_shorthand(id: str) -> str:
    """Return the first seven characters of the string form of ``id``.

    The argument is passed through ``str()`` first, so non-string
    identifiers are accepted as well. Values shorter than seven characters
    are returned unchanged.
    """
    prefix_length = 7
    full_text = str(id)
    return full_text[:prefix_length]
def _format_chunk_results(chunk_hits) -> list:
    """Render vector (chunk) hits: a section header, then an ID line and text line per hit."""
    section = ["Vector Search Results:"]
    for hit in chunk_hits:
        section.append(f"Source ID [{id_to_shorthand(hit.id)}]:")
        # A missing/empty text is emitted as an empty line, never as "None".
        section.append(hit.text or "")
    return section


def _format_graph_results(graph_hits) -> list:
    """Render graph hits, choosing the layout from the attributes present on ``content``."""
    section = ["Graph Search Results:"]
    for hit in graph_hits:
        section.append(f"Source ID [{id_to_shorthand(hit.id)}]:")
        content = hit.content
        # The checks are ordered: anything exposing ``summary`` is treated as a
        # community, then name + description as an entity, then
        # subject/predicate/object as a relationship. Content matching none of
        # these contributes only its "Source ID" line.
        if hasattr(content, "summary"):
            section.append(f"Community Name: {content.name}")
            section.append(f"ID: {content.id}")
            section.append(f"Summary: {content.summary}")
        elif hasattr(content, "name") and hasattr(content, "description"):
            section.append(f"Entity Name: {content.name}")
            section.append(f"Description: {content.description}")
        elif all(
            hasattr(content, attr)
            for attr in ("subject", "predicate", "object")
        ):
            section.append(
                f"Relationship: {content.subject}-{content.predicate}-{content.object}"
            )
    return section


def _format_web_results(web_hits) -> list:
    """Render web hits: a section header, then ID, title, link and snippet lines per hit."""
    section = ["Web Search Results:"]
    for hit in web_hits:
        section.append(f"Source ID [{id_to_shorthand(hit.id)}]:")
        section.append(f"Title: {hit.title}")
        section.append(f"Link: {hit.link}")
        section.append(f"Snippet: {hit.snippet}")
    return section


def _format_document_results(document_hits) -> list:
    """Render document hits: IDs, title, optional summary, then each contained chunk."""
    section = ["Local Context Documents:"]
    for document in document_hits:
        doc_title = document.title or "Untitled Document"
        doc_id = document.id
        summary = document.summary

        section.append(f"Full Document ID: {doc_id}")
        section.append(f"Shortened Document ID: {id_to_shorthand(doc_id)}")
        section.append(f"Document Title: {doc_title}")
        if summary:
            section.append(f"Summary: {summary}")

        if document.chunks:
            # Document chunks are accessed by key ('id', 'text'), unlike the
            # top-level chunk hits, which are accessed by attribute.
            for chunk in document.chunks:
                section.append(
                    f"\nChunk ID {id_to_shorthand(chunk['id'])}:\n{chunk['text']}"
                )
    return section


def format_search_results_for_llm(
    results,
) -> str:
    """
    Format aggregated search results as plain text for an LLM prompt.

    Sections are emitted in a fixed order (vector chunks, graph results, web
    results, local context documents), and a section is skipped entirely when
    its result list is empty or None. Sources are labelled with a shortened
    form of their own ID (see `id_to_shorthand`), not with a running counter.

    Args:
        results: Object exposing the attributes `chunk_search_results`,
            `graph_search_results`, `web_search_results` and
            `document_search_results`.

    Returns:
        The formatted lines joined with newlines; an empty string when every
        section is empty.
    """
    lines = []

    # 1) Chunk search
    if results.chunk_search_results:
        lines.extend(_format_chunk_results(results.chunk_search_results))

    # 2) Graph search
    if results.graph_search_results:
        lines.extend(_format_graph_results(results.graph_search_results))

    # 3) Web search
    if results.web_search_results:
        lines.extend(_format_web_results(results.web_search_results))

    # 4) Local context docs
    if results.document_search_results:
        lines.extend(_format_document_results(results.document_search_results))

    return "\n".join(lines)
# Create a FastMCP server.
# Only the import is guarded: a failure while constructing the server should
# surface as its own error rather than being reported as a missing package.
try:
    from mcp.server.fastmcp import FastMCP
except ImportError as e:
    raise ImportError(
        "MCP is not installed. Please run `pip install mcp`"
    ) from e

# Single server instance that the tool functions below register against.
mcp = FastMCP("R2R Retrieval System")
# Search tool
@mcp.tool()
async def search(query: str) -> str:
    """
    Perform a search over the knowledge base and return the formatted results

    Args:
        query: The query to search the knowledge base for

    Returns:
        The search results for the query, formatted as plain text
    """
    client = R2RClient()

    # Call the search endpoint
    # NOTE(review): this call is not awaited, so it appears to run
    # synchronously inside an async tool — confirm that is acceptable here.
    search_response = client.retrieval.search(
        query=query,
    )
    return format_search_results_for_llm(search_response.results)
# RAG query tool
@mcp.tool()
async def rag(query: str) -> str:
    """
    Perform a Retrieval-Augmented Generation query

    Args:
        query: The question to answer using the knowledge base

    Returns:
        A response generated based on relevant context from the knowledge base
    """
    # NOTE(review): the docstring above is presumably surfaced as the tool
    # description — keep its wording stable unless that is intended to change.

    # Send the question to the RAG endpoint using a fresh client
    response = R2RClient().retrieval.rag(query=query)

    # Only the generated answer text is handed back to the caller
    answer = response.results.generated_answer  # type: ignore
    return answer
# Start the MCP server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()
|