generator_pipe.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from abc import abstractmethod
  2. from typing import Any, AsyncGenerator, Optional
  3. from uuid import UUID
  4. from core.base import AsyncState, CompletionProvider, DatabaseProvider
  5. from core.base.abstractions import GenerationConfig
  6. from core.base.pipes.base_pipe import AsyncPipe
  7. class GeneratorPipe(AsyncPipe):
  8. class PipeConfig(AsyncPipe.PipeConfig):
  9. name: str
  10. task_prompt: str
  11. system_prompt: str = "default_system"
  12. def __init__(
  13. self,
  14. llm_provider: CompletionProvider,
  15. database_provider: DatabaseProvider,
  16. config: AsyncPipe.PipeConfig,
  17. *args,
  18. **kwargs,
  19. ):
  20. super().__init__(
  21. config,
  22. *args,
  23. **kwargs,
  24. )
  25. self.llm_provider = llm_provider
  26. self.database_provider = database_provider
  27. @abstractmethod
  28. async def _run_logic(
  29. self,
  30. input: AsyncPipe.Input,
  31. state: AsyncState,
  32. run_id: UUID,
  33. rag_generation_config: GenerationConfig,
  34. *args: Any,
  35. **kwargs: Any,
  36. ) -> AsyncGenerator[Any, None]:
  37. pass