search_pipe.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import logging
  2. from abc import abstractmethod
  3. from typing import Any, AsyncGenerator, Optional, Union
  4. from uuid import UUID
  5. from core.base import AsyncPipe, AsyncState, ChunkSearchResult
  6. logger = logging.getLogger()
  7. class SearchPipe(AsyncPipe[ChunkSearchResult]):
  8. class SearchConfig(AsyncPipe.PipeConfig):
  9. name: str = "default_vector_search"
  10. filters: dict = {}
  11. limit: int = 10
  12. class Input(AsyncPipe.Input):
  13. message: Union[AsyncGenerator[str, None], str]
  14. def __init__(
  15. self,
  16. config: AsyncPipe.PipeConfig,
  17. *args,
  18. **kwargs,
  19. ):
  20. super().__init__(
  21. config,
  22. *args,
  23. **kwargs,
  24. )
  25. @abstractmethod
  26. async def search(
  27. self,
  28. query: str,
  29. search_settings: Any,
  30. *args: Any,
  31. **kwargs: Any,
  32. ) -> AsyncGenerator[ChunkSearchResult, None]:
  33. pass
  34. @abstractmethod
  35. async def _run_logic(
  36. self,
  37. input: AsyncPipe.Input,
  38. state: AsyncState,
  39. run_id: UUID,
  40. *args: Any,
  41. **kwargs,
  42. ) -> AsyncGenerator[ChunkSearchResult, None]:
  43. pass