import asyncio
import json
import logging
import re
from copy import deepcopy
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Iterable,
    Optional,
    TypeVar,
)
from uuid import NAMESPACE_DNS, UUID, uuid4, uuid5

from ..abstractions.search import (
    AggregateSearchResult,
    KGCommunityResult,
    KGEntityResult,
    KGRelationshipResult,
)
from ..abstractions.vector import VectorQuantizationType

logger = logging.getLogger()

def format_search_results_for_llm(results: AggregateSearchResult) -> str:
    formatted_results = []
    source_counter = 1

    if results.chunk_search_results:
        formatted_results.append("Vector Search Results:")
        for result in results.chunk_search_results:
            formatted_results.extend(
                (f"Source [{source_counter}]:", f"{result.text}")
            )
            source_counter += 1

    if results.graph_search_results:
        formatted_results.append("KG Search Results:")
        for kg_result in results.graph_search_results:
            try:
                formatted_results.extend((f"Source [{source_counter}]:",))
            except AttributeError:
                raise ValueError(f"Invalid KG search result: {kg_result}")
            # formatted_results.extend(
            #     (
            #         f"Source [{source_counter}]:",
            #         f"Type: {kg_result.content.type}",
            #     )
            # )

            if isinstance(kg_result.content, KGCommunityResult):
                formatted_results.extend(
                    (
                        f"Name: {kg_result.content.name}",
                        f"Summary: {kg_result.content.summary}",
                        # f"Rating: {kg_result.content.rating}",
                        # f"Rating Explanation: {kg_result.content.rating_explanation}",
                        # "Findings:",
                    )
                )
                # formatted_results.append(
                #     f"- {finding}" for finding in kg_result.content.findings
                # )
            elif isinstance(
                kg_result.content,
                KGEntityResult,
            ):
                formatted_results.extend(
                    [
                        f"Name: {kg_result.content.name}",
                        f"Description: {kg_result.content.description}",
                    ]
                )
            elif isinstance(kg_result.content, KGRelationshipResult):
                formatted_results.append(
                    f"Relationship: {kg_result.content.subject} - {kg_result.content.predicate} - {kg_result.content.object}",
                    # f"Description: {kg_result.content.description}"
                )

            if kg_result.metadata:
                formatted_results.append("Metadata:")
                formatted_results.extend(
                    f"- {key}: {value}"
                    for key, value in kg_result.metadata.items()
                )

            source_counter += 1

    if results.web_search_results:
        formatted_results.append("Web Search Results:")
        for result in results.web_search_results:
            formatted_results.extend(
                (
                    f"Source [{source_counter}]:",
                    f"Title: {result.title}",
                    f"Link: {result.link}",
                    f"Snippet: {result.snippet}",
                )
            )
            if result.date:
                formatted_results.append(f"Date: {result.date}")
            source_counter += 1

    return "\n".join(formatted_results)
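
# Illustrative output (not part of the original module): with one chunk hit and
# one web hit, format_search_results_for_llm yields numbered sources such as
#
#   Vector Search Results:
#   Source [1]:
#   <chunk text>
#   Web Search Results:
#   Source [2]:
#   Title: <page title>
#   Link: <url>
#   Snippet: <snippet>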

def format_search_results_for_stream(result: AggregateSearchResult) -> str:
    CHUNK_SEARCH_STREAM_MARKER = "chunk_search"
    GRAPH_SEARCH_STREAM_MARKER = "graph_search"
    WEB_SEARCH_STREAM_MARKER = "web_search"

    context = ""

    if result.chunk_search_results:
        context += f"<{CHUNK_SEARCH_STREAM_MARKER}>"
        vector_results_list = [
            chunk_result.as_dict()
            for chunk_result in result.chunk_search_results
        ]
        context += json.dumps(vector_results_list, default=str)
        context += f"</{CHUNK_SEARCH_STREAM_MARKER}>"

    if result.graph_search_results:
        context += f"<{GRAPH_SEARCH_STREAM_MARKER}>"
        kg_results_list = [
            graph_result.dict()
            for graph_result in result.graph_search_results
        ]
        context += json.dumps(kg_results_list, default=str)
        context += f"</{GRAPH_SEARCH_STREAM_MARKER}>"

    if result.web_search_results:
        context += f"<{WEB_SEARCH_STREAM_MARKER}>"
        web_results_list = [
            web_result.to_dict()
            for web_result in result.web_search_results
        ]
        context += json.dumps(web_results_list, default=str)
        context += f"</{WEB_SEARCH_STREAM_MARKER}>"

    return context
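
# Illustrative output (not part of the original module): each populated result
# type is serialized to JSON and wrapped in its marker tag, e.g.
#
#   <chunk_search>[{...}, {...}]</chunk_search><graph_search>[{...}]</graph_search>
#
# so a streaming client can split the payload on these markers.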

if TYPE_CHECKING:
    from ..pipeline.base_pipeline import AsyncPipeline

def _generate_id_from_label(label: str) -> UUID:
    # Deterministic UUIDv5 derived from the label: equal labels map to equal ids.
    return uuid5(NAMESPACE_DNS, label)

def generate_id(label: Optional[str] = None) -> UUID:
    """
    Generates an id: deterministic when a label is given, random otherwise.
    """
    return _generate_id_from_label(label if label is not None else str(uuid4()))

def generate_document_id(filename: str, user_id: UUID) -> UUID:
    """
    Generates a unique document id from a given filename and user id
    """
    return _generate_id_from_label(f'{filename.split("/")[-1]}-{str(user_id)}')
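
# Illustrative usage (not part of the original module): ids are deterministic,
# and only the basename of the path contributes to the label, so re-ingesting
# the same file for the same user yields the same document id.
#
#   >>> uid = generate_user_id("alice@example.com")  # hypothetical email
#   >>> generate_document_id("docs/report.pdf", uid) == generate_document_id("report.pdf", uid)
#   True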

def generate_extraction_id(
    document_id: UUID, iteration: int = 0, version: str = "0"
) -> UUID:
    """
    Generates a unique extraction id from a given document id and iteration
    """
    return _generate_id_from_label(f"{str(document_id)}-{iteration}-{version}")


def generate_default_user_collection_id(user_id: UUID) -> UUID:
    """
    Generates a unique collection id from a given user id
    """
    return _generate_id_from_label(str(user_id))


def generate_user_id(email: str) -> UUID:
    """
    Generates a unique user id from a given email
    """
    return _generate_id_from_label(email)


def generate_default_prompt_id(prompt_name: str) -> UUID:
    """
    Generates a unique prompt id
    """
    return _generate_id_from_label(prompt_name)

def generate_entity_document_id() -> UUID:
    """
    Generates a unique document id for inserting entities into a graph
    """
    generation_time = datetime.now().isoformat()
    return _generate_id_from_label(f"entity-{generation_time}")

async def to_async_generator(
    iterable: Iterable[Any],
) -> AsyncGenerator[Any, None]:
    for item in iterable:
        yield item

def run_pipeline(pipeline: "AsyncPipeline", input: Any, *args, **kwargs):
    if not isinstance(input, AsyncGenerator):
        if not isinstance(input, list):
            input = to_async_generator([input])
        else:
            input = to_async_generator(input)

    async def _run_pipeline(input, *args, **kwargs):
        return await pipeline.run(input, *args, **kwargs)

    return asyncio.run(_run_pipeline(input, *args, **kwargs))
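
# Illustrative usage (not part of the original module): run_pipeline wraps a
# plain value or list in an async generator and drives the pipeline with
# asyncio.run, so synchronous callers can simply write:
#
#   >>> run_pipeline(my_pipeline, ["doc one", "doc two"])  # my_pipeline is hypothetical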

def increment_version(version: str) -> str:
    # Bump the trailing numeric suffix, e.g. "v1" -> "v2", "v19" -> "v20".
    match = re.search(r"(\d+)$", version)
    if not match:
        raise ValueError(f"Version '{version}' does not end in a number.")
    return f"{version[: match.start(1)]}{int(match.group(1)) + 1}"


def decrement_version(version: str) -> str:
    # Lower the trailing numeric suffix, clamping at zero, e.g. "v2" -> "v1".
    match = re.search(r"(\d+)$", version)
    if not match:
        raise ValueError(f"Version '{version}' does not end in a number.")
    return f"{version[: match.start(1)]}{max(0, int(match.group(1)) - 1)}"
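
# Illustrative behaviour (not part of the original module):
#
#   >>> increment_version("v1")
#   'v2'
#   >>> increment_version("v19")
#   'v20'
#   >>> decrement_version("v0")
#   'v0'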

def llm_cost_per_million_tokens(
    model: str, input_output_ratio: float = 2
) -> float:
    """
    Returns the blended cost per million tokens for a given model and
    input/output ratio, where the ratio is input tokens per output token.
    """
    # TODO: use the provider prefix instead of discarding it
    model = model.split("/")[-1]  # simplifying assumption
    cost_dict = {
        "gpt-4o-mini": (0.15, 0.6),  # (input $/M tokens, output $/M tokens)
        "gpt-4o": (2.5, 10),
    }
    if model not in cost_dict:
        # use gpt-4o as default
        logger.warning(f"Unknown model: {model}. Using gpt-4o as default.")
        model = "gpt-4o"
    input_cost, output_cost = cost_dict[model]
    # Weighted average over input_output_ratio input tokens per output token.
    return (input_cost * input_output_ratio + output_cost) / (
        1 + input_output_ratio
    )
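
# Worked example (not part of the original module): with the default ratio of
# two input tokens per output token and the rates in cost_dict, gpt-4o-mini
# costs (2 * 0.15 + 0.6) / 3 = 0.30 per million tokens, and gpt-4o costs
# (2 * 2.5 + 10) / 3 = 5.00 per million tokens.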

def validate_uuid(uuid_str: str) -> UUID:
    return UUID(uuid_str)

def update_settings_from_dict(server_settings, settings_dict: dict):
    """
    Updates a settings object with values from a dictionary.
    """
    settings = deepcopy(server_settings)
    for key, value in settings_dict.items():
        if value is not None:
            if isinstance(value, dict):
                for k, v in value.items():
                    if isinstance(getattr(settings, key), dict):
                        getattr(settings, key)[k] = v
                    else:
                        setattr(getattr(settings, key), k, v)
            else:
                setattr(settings, key, value)

    return settings
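
# Illustrative usage (not part of the original module): nested dict values are
# merged attribute-by-attribute on a deep copy instead of replacing the whole
# sub-object. Assuming a settings object with a hypothetical `chunking`
# attribute,
#
#   >>> new = update_settings_from_dict(settings, {"chunking": {"chunk_size": 512}})
#
# changes only chunking.chunk_size and leaves every other field untouched.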

def _decorate_vector_type(
    input_str: str,
    quantization_type: VectorQuantizationType = VectorQuantizationType.FP32,
) -> str:
    return f"{quantization_type.db_type}{input_str}"


def _get_str_estimation_output(x: tuple[Any, Any]) -> str:
    if isinstance(x[0], int) and isinstance(x[1], int):
        return " - ".join(map(str, x))
    else:
        return " - ".join(f"{round(a, 2)}" for a in x)

KeyType = TypeVar("KeyType")


def deep_update(
    mapping: dict[KeyType, Any], *updating_mappings: dict[KeyType, Any]
) -> dict[KeyType, Any]:
    """
    Taken from Pydantic v1:
    https://github.com/pydantic/pydantic/blob/fd2991fe6a73819b48c906e3c3274e8e47d0f761/pydantic/utils.py#L200
    """
    updated_mapping = mapping.copy()
    for updating_mapping in updating_mappings:
        for k, v in updating_mapping.items():
            if (
                k in updated_mapping
                and isinstance(updated_mapping[k], dict)
                and isinstance(v, dict)
            ):
                updated_mapping[k] = deep_update(updated_mapping[k], v)
            else:
                updated_mapping[k] = v
    return updated_mapping
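
# Illustrative usage (not part of the original module): nested dicts are merged
# recursively rather than replaced wholesale.
#
#   >>> deep_update({"a": {"x": 1}, "b": 2}, {"a": {"y": 3}})
#   {'a': {'x': 1, 'y': 3}, 'b': 2}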
|