search_pipe.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import logging
  2. from abc import abstractmethod
  3. from typing import Any, AsyncGenerator
  4. from uuid import UUID
  5. from core.base import AsyncPipe, AsyncState, ChunkSearchResult
  6. logger = logging.getLogger()
  7. class SearchPipe(AsyncPipe[ChunkSearchResult]):
  8. class SearchConfig(AsyncPipe.PipeConfig):
  9. name: str = "default_vector_search"
  10. filters: dict = {}
  11. limit: int = 10
  12. class Input(AsyncPipe.Input):
  13. message: AsyncGenerator[str, None] | str
  14. def __init__(
  15. self,
  16. config: AsyncPipe.PipeConfig,
  17. *args,
  18. **kwargs,
  19. ):
  20. super().__init__(
  21. config,
  22. *args,
  23. **kwargs,
  24. )
  25. @abstractmethod
  26. async def search(
  27. self,
  28. query: str,
  29. search_settings: Any,
  30. *args: Any,
  31. **kwargs: Any,
  32. ) -> AsyncGenerator[ChunkSearchResult, None]:
  33. pass
  34. @abstractmethod
  35. async def _run_logic(
  36. self,
  37. input: AsyncPipe.Input,
  38. state: AsyncState,
  39. run_id: UUID,
  40. *args: Any,
  41. **kwargs,
  42. ) -> AsyncGenerator[ChunkSearchResult, None]:
  43. pass