factory.py 26 KB


  1. import logging
  2. import os
  3. from typing import Any, Optional, Union
  4. from core.agent import R2RRAGAgent, R2RStreamingRAGAgent
  5. from core.base import (
  6. AsyncPipe,
  7. AuthConfig,
  8. CompletionConfig,
  9. CompletionProvider,
  10. CryptoConfig,
  11. DatabaseConfig,
  12. EmailConfig,
  13. EmbeddingConfig,
  14. EmbeddingProvider,
  15. IngestionConfig,
  16. OrchestrationConfig,
  17. )
  18. from core.pipelines import RAGPipeline, SearchPipeline
  19. from core.pipes import GeneratorPipe, MultiSearchPipe, SearchPipe
  20. from core.providers.email.sendgrid import SendGridEmailProvider
  21. from ..abstractions import R2RAgents, R2RPipelines, R2RPipes, R2RProviders
  22. from ..config import R2RConfig
  23. logger = logging.getLogger()
  24. from core.database import PostgresDatabaseProvider
  25. from core.providers import ( # PostgresDatabaseProvider,
  26. AsyncSMTPEmailProvider,
  27. BCryptConfig,
  28. BCryptProvider,
  29. ConsoleMockEmailProvider,
  30. HatchetOrchestrationProvider,
  31. LiteLLMCompletionProvider,
  32. LiteLLMEmbeddingProvider,
  33. OllamaEmbeddingProvider,
  34. OpenAICompletionProvider,
  35. OpenAIEmbeddingProvider,
  36. R2RAuthProvider,
  37. R2RIngestionConfig,
  38. R2RIngestionProvider,
  39. SimpleOrchestrationProvider,
  40. SupabaseAuthProvider,
  41. UnstructuredIngestionConfig,
  42. UnstructuredIngestionProvider,
  43. )
  44. class R2RProviderFactory:
  45. def __init__(self, config: R2RConfig):
  46. self.config = config
  47. @staticmethod
  48. async def create_auth_provider(
  49. auth_config: AuthConfig,
  50. crypto_provider: BCryptProvider,
  51. database_provider: PostgresDatabaseProvider,
  52. email_provider: Union[
  53. AsyncSMTPEmailProvider,
  54. ConsoleMockEmailProvider,
  55. SendGridEmailProvider,
  56. ],
  57. *args,
  58. **kwargs,
  59. ) -> Union[R2RAuthProvider, SupabaseAuthProvider]:
  60. if auth_config.provider == "r2r":
  61. r2r_auth = R2RAuthProvider(
  62. auth_config, crypto_provider, database_provider, email_provider
  63. )
  64. await r2r_auth.initialize()
  65. return r2r_auth
  66. elif auth_config.provider == "supabase":
  67. return SupabaseAuthProvider(
  68. auth_config, crypto_provider, database_provider, email_provider
  69. )
  70. else:
  71. raise ValueError(
  72. f"Auth provider {auth_config.provider} not supported."
  73. )
  74. @staticmethod
  75. def create_crypto_provider(
  76. crypto_config: CryptoConfig, *args, **kwargs
  77. ) -> BCryptProvider:
  78. if crypto_config.provider == "bcrypt":
  79. return BCryptProvider(BCryptConfig(**crypto_config.dict()))
  80. else:
  81. raise ValueError(
  82. f"Crypto provider {crypto_config.provider} not supported."
  83. )
  84. @staticmethod
  85. def create_ingestion_provider(
  86. ingestion_config: IngestionConfig,
  87. database_provider: PostgresDatabaseProvider,
  88. llm_provider: Union[
  89. LiteLLMCompletionProvider, OpenAICompletionProvider
  90. ],
  91. *args,
  92. **kwargs,
  93. ) -> Union[R2RIngestionProvider, UnstructuredIngestionProvider]:
  94. config_dict = (
  95. ingestion_config.model_dump()
  96. if isinstance(ingestion_config, IngestionConfig)
  97. else ingestion_config
  98. )
  99. extra_fields = config_dict.pop("extra_fields", {})
  100. if config_dict["provider"] == "r2r":
  101. r2r_ingestion_config = R2RIngestionConfig(
  102. **config_dict, **extra_fields
  103. )
  104. return R2RIngestionProvider(
  105. r2r_ingestion_config, database_provider, llm_provider
  106. )
  107. elif config_dict["provider"] in [
  108. "unstructured_local",
  109. "unstructured_api",
  110. ]:
  111. unstructured_ingestion_config = UnstructuredIngestionConfig(
  112. **config_dict, **extra_fields
  113. )
  114. return UnstructuredIngestionProvider(
  115. unstructured_ingestion_config, database_provider, llm_provider
  116. )
  117. else:
  118. raise ValueError(
  119. f"Ingestion provider {ingestion_config.provider} not supported"
  120. )
  121. @staticmethod
  122. def create_orchestration_provider(
  123. config: OrchestrationConfig, *args, **kwargs
  124. ) -> Union[HatchetOrchestrationProvider, SimpleOrchestrationProvider]:
  125. if config.provider == "hatchet":
  126. orchestration_provider = HatchetOrchestrationProvider(config)
  127. orchestration_provider.get_worker("r2r-worker")
  128. return orchestration_provider
  129. elif config.provider == "simple":
  130. from core.providers import SimpleOrchestrationProvider
  131. return SimpleOrchestrationProvider(config)
  132. else:
  133. raise ValueError(
  134. f"Orchestration provider {config.provider} not supported"
  135. )
  136. async def create_database_provider(
  137. self,
  138. db_config: DatabaseConfig,
  139. crypto_provider: BCryptProvider,
  140. *args,
  141. **kwargs,
  142. ) -> PostgresDatabaseProvider:
  143. if not self.config.embedding.base_dimension:
  144. raise ValueError(
  145. "Embedding config must have a base dimension to initialize database."
  146. )
  147. dimension = self.config.embedding.base_dimension
  148. quantization_type = (
  149. self.config.embedding.quantization_settings.quantization_type
  150. )
  151. if db_config.provider == "postgres":
  152. from ...database.postgres import PostgresDatabaseProvider
  153. database_provider = PostgresDatabaseProvider(
  154. db_config,
  155. dimension,
  156. crypto_provider=crypto_provider,
  157. quantization_type=quantization_type,
  158. )
  159. await database_provider.initialize()
  160. return database_provider
  161. else:
  162. raise ValueError(
  163. f"Database provider {db_config.provider} not supported"
  164. )
  165. @staticmethod
  166. def create_embedding_provider(
  167. embedding: EmbeddingConfig, *args, **kwargs
  168. ) -> Union[
  169. LiteLLMEmbeddingProvider,
  170. OllamaEmbeddingProvider,
  171. OpenAIEmbeddingProvider,
  172. ]:
  173. embedding_provider: Optional[EmbeddingProvider] = None
  174. if embedding.provider == "openai":
  175. if not os.getenv("OPENAI_API_KEY"):
  176. raise ValueError(
  177. "Must set OPENAI_API_KEY in order to initialize OpenAIEmbeddingProvider."
  178. )
  179. from core.providers import OpenAIEmbeddingProvider
  180. embedding_provider = OpenAIEmbeddingProvider(embedding)
  181. elif embedding.provider == "litellm":
  182. from core.providers import LiteLLMEmbeddingProvider
  183. embedding_provider = LiteLLMEmbeddingProvider(embedding)
  184. elif embedding.provider == "ollama":
  185. from core.providers import OllamaEmbeddingProvider
  186. embedding_provider = OllamaEmbeddingProvider(embedding)
  187. else:
  188. raise ValueError(
  189. f"Embedding provider {embedding.provider} not supported"
  190. )
  191. return embedding_provider
  192. @staticmethod
  193. def create_llm_provider(
  194. llm_config: CompletionConfig, *args, **kwargs
  195. ) -> Union[LiteLLMCompletionProvider, OpenAICompletionProvider]:
  196. llm_provider: Optional[CompletionProvider] = None
  197. if llm_config.provider == "openai":
  198. llm_provider = OpenAICompletionProvider(llm_config)
  199. elif llm_config.provider == "litellm":
  200. llm_provider = LiteLLMCompletionProvider(llm_config)
  201. else:
  202. raise ValueError(
  203. f"Language model provider {llm_config.provider} not supported"
  204. )
  205. if not llm_provider:
  206. raise ValueError("Language model provider not found")
  207. return llm_provider
  208. @staticmethod
  209. async def create_email_provider(
  210. email_config: Optional[EmailConfig] = None, *args, **kwargs
  211. ) -> Union[
  212. AsyncSMTPEmailProvider, ConsoleMockEmailProvider, SendGridEmailProvider
  213. ]:
  214. """Creates an email provider based on configuration."""
  215. if not email_config:
  216. raise ValueError(
  217. f"No email configuration provided for email provider, please add `[email]` to your `r2r.toml`."
  218. )
  219. if email_config.provider == "smtp":
  220. return AsyncSMTPEmailProvider(email_config)
  221. elif email_config.provider == "console_mock":
  222. return ConsoleMockEmailProvider(email_config)
  223. elif email_config.provider == "sendgrid":
  224. return SendGridEmailProvider(email_config)
  225. else:
  226. raise ValueError(
  227. f"Email provider {email_config.provider} not supported."
  228. )
  229. async def create_providers(
  230. self,
  231. auth_provider_override: Optional[
  232. Union[R2RAuthProvider, SupabaseAuthProvider]
  233. ] = None,
  234. crypto_provider_override: Optional[BCryptProvider] = None,
  235. database_provider_override: Optional[PostgresDatabaseProvider] = None,
  236. email_provider_override: Optional[
  237. Union[
  238. AsyncSMTPEmailProvider,
  239. ConsoleMockEmailProvider,
  240. SendGridEmailProvider,
  241. ]
  242. ] = None,
  243. embedding_provider_override: Optional[
  244. Union[
  245. LiteLLMEmbeddingProvider,
  246. OpenAIEmbeddingProvider,
  247. OllamaEmbeddingProvider,
  248. ]
  249. ] = None,
  250. ingestion_provider_override: Optional[
  251. Union[R2RIngestionProvider, UnstructuredIngestionProvider]
  252. ] = None,
  253. llm_provider_override: Optional[
  254. Union[OpenAICompletionProvider, LiteLLMCompletionProvider]
  255. ] = None,
  256. orchestration_provider_override: Optional[Any] = None,
  257. *args,
  258. **kwargs,
  259. ) -> R2RProviders:
  260. embedding_provider = (
  261. embedding_provider_override
  262. or self.create_embedding_provider(
  263. self.config.embedding, *args, **kwargs
  264. )
  265. )
  266. llm_provider = llm_provider_override or self.create_llm_provider(
  267. self.config.completion, *args, **kwargs
  268. )
  269. crypto_provider = (
  270. crypto_provider_override
  271. or self.create_crypto_provider(self.config.crypto, *args, **kwargs)
  272. )
  273. database_provider = (
  274. database_provider_override
  275. or await self.create_database_provider(
  276. self.config.database, crypto_provider, *args, **kwargs
  277. )
  278. )
  279. ingestion_provider = (
  280. ingestion_provider_override
  281. or self.create_ingestion_provider(
  282. self.config.ingestion,
  283. database_provider,
  284. llm_provider,
  285. *args,
  286. **kwargs,
  287. )
  288. )
  289. email_provider = (
  290. email_provider_override
  291. or await self.create_email_provider(
  292. self.config.email, crypto_provider, *args, **kwargs
  293. )
  294. )
  295. auth_provider = (
  296. auth_provider_override
  297. or await self.create_auth_provider(
  298. self.config.auth,
  299. crypto_provider,
  300. database_provider,
  301. email_provider,
  302. *args,
  303. **kwargs,
  304. )
  305. )
  306. orchestration_provider = (
  307. orchestration_provider_override
  308. or self.create_orchestration_provider(self.config.orchestration)
  309. )
  310. return R2RProviders(
  311. auth=auth_provider,
  312. database=database_provider,
  313. embedding=embedding_provider,
  314. ingestion=ingestion_provider,
  315. llm=llm_provider,
  316. email=email_provider,
  317. orchestration=orchestration_provider,
  318. )
  319. class R2RPipeFactory:
  320. def __init__(self, config: R2RConfig, providers: R2RProviders):
  321. self.config = config
  322. self.providers = providers
  323. def create_pipes(
  324. self,
  325. parsing_pipe_override: Optional[AsyncPipe] = None,
  326. embedding_pipe_override: Optional[AsyncPipe] = None,
  327. kg_relationships_extraction_pipe_override: Optional[AsyncPipe] = None,
  328. kg_storage_pipe_override: Optional[AsyncPipe] = None,
  329. kg_search_pipe_override: Optional[AsyncPipe] = None,
  330. vector_storage_pipe_override: Optional[AsyncPipe] = None,
  331. vector_search_pipe_override: Optional[AsyncPipe] = None,
  332. rag_pipe_override: Optional[AsyncPipe] = None,
  333. streaming_rag_pipe_override: Optional[AsyncPipe] = None,
  334. kg_entity_description_pipe: Optional[AsyncPipe] = None,
  335. kg_clustering_pipe: Optional[AsyncPipe] = None,
  336. kg_entity_deduplication_pipe: Optional[AsyncPipe] = None,
  337. kg_entity_deduplication_summary_pipe: Optional[AsyncPipe] = None,
  338. kg_community_summary_pipe: Optional[AsyncPipe] = None,
  339. *args,
  340. **kwargs,
  341. ) -> R2RPipes:
  342. return R2RPipes(
  343. parsing_pipe=parsing_pipe_override
  344. or self.create_parsing_pipe(
  345. self.config.ingestion.excluded_parsers,
  346. *args,
  347. **kwargs,
  348. ),
  349. embedding_pipe=embedding_pipe_override
  350. or self.create_embedding_pipe(*args, **kwargs),
  351. kg_relationships_extraction_pipe=kg_relationships_extraction_pipe_override
  352. or self.create_kg_relationships_extraction_pipe(*args, **kwargs),
  353. kg_storage_pipe=kg_storage_pipe_override
  354. or self.create_kg_storage_pipe(*args, **kwargs),
  355. vector_storage_pipe=vector_storage_pipe_override
  356. or self.create_vector_storage_pipe(*args, **kwargs),
  357. vector_search_pipe=vector_search_pipe_override
  358. or self.create_vector_search_pipe(*args, **kwargs),
  359. kg_search_pipe=kg_search_pipe_override
  360. or self.create_kg_search_pipe(*args, **kwargs),
  361. rag_pipe=rag_pipe_override
  362. or self.create_rag_pipe(*args, **kwargs),
  363. streaming_rag_pipe=streaming_rag_pipe_override
  364. or self.create_rag_pipe(True, *args, **kwargs),
  365. kg_entity_description_pipe=kg_entity_description_pipe
  366. or self.create_kg_entity_description_pipe(*args, **kwargs),
  367. kg_clustering_pipe=kg_clustering_pipe
  368. or self.create_kg_clustering_pipe(*args, **kwargs),
  369. kg_entity_deduplication_pipe=kg_entity_deduplication_pipe
  370. or self.create_kg_entity_deduplication_pipe(*args, **kwargs),
  371. kg_entity_deduplication_summary_pipe=kg_entity_deduplication_summary_pipe
  372. or self.create_kg_entity_deduplication_summary_pipe(
  373. *args, **kwargs
  374. ),
  375. kg_community_summary_pipe=kg_community_summary_pipe
  376. or self.create_kg_community_summary_pipe(*args, **kwargs),
  377. )
  378. def create_parsing_pipe(self, *args, **kwargs) -> Any:
  379. from core.pipes import ParsingPipe
  380. return ParsingPipe(
  381. ingestion_provider=self.providers.ingestion,
  382. database_provider=self.providers.database,
  383. config=AsyncPipe.PipeConfig(name="parsing_pipe"),
  384. )
  385. def create_embedding_pipe(self, *args, **kwargs) -> Any:
  386. if self.config.embedding.provider is None:
  387. return None
  388. from core.pipes import EmbeddingPipe
  389. return EmbeddingPipe(
  390. embedding_provider=self.providers.embedding,
  391. database_provider=self.providers.database,
  392. embedding_batch_size=self.config.embedding.batch_size,
  393. config=AsyncPipe.PipeConfig(name="embedding_pipe"),
  394. )
  395. def create_vector_storage_pipe(self, *args, **kwargs) -> Any:
  396. if self.config.embedding.provider is None:
  397. return None
  398. from core.pipes import VectorStoragePipe
  399. return VectorStoragePipe(
  400. database_provider=self.providers.database,
  401. config=AsyncPipe.PipeConfig(name="vector_storage_pipe"),
  402. )
  403. def create_default_vector_search_pipe(self, *args, **kwargs) -> Any:
  404. if self.config.embedding.provider is None:
  405. return None
  406. from core.pipes import VectorSearchPipe
  407. return VectorSearchPipe(
  408. database_provider=self.providers.database,
  409. embedding_provider=self.providers.embedding,
  410. config=SearchPipe.SearchConfig(name="vector_search_pipe"),
  411. )
  412. def create_multi_search_pipe(
  413. self,
  414. inner_search_pipe: SearchPipe,
  415. use_rrf: bool = False,
  416. expansion_technique: str = "hyde",
  417. expansion_factor: int = 3,
  418. *args,
  419. **kwargs,
  420. ) -> MultiSearchPipe:
  421. from core.pipes import QueryTransformPipe
  422. multi_search_config = MultiSearchPipe.PipeConfig(
  423. use_rrf=use_rrf, expansion_factor=expansion_factor
  424. )
  425. query_transform_pipe = QueryTransformPipe(
  426. llm_provider=self.providers.llm,
  427. database_provider=self.providers.database,
  428. config=QueryTransformPipe.QueryTransformConfig(
  429. name="multi_query_transform",
  430. task_prompt=expansion_technique,
  431. ),
  432. )
  433. return MultiSearchPipe(
  434. query_transform_pipe=query_transform_pipe,
  435. inner_search_pipe=inner_search_pipe,
  436. config=multi_search_config,
  437. )
  438. def create_vector_search_pipe(self, *args, **kwargs) -> Any:
  439. if self.config.embedding.provider is None:
  440. return None
  441. vanilla_vector_search_pipe = self.create_default_vector_search_pipe(
  442. *args, **kwargs
  443. )
  444. hyde_search_pipe = self.create_multi_search_pipe(
  445. vanilla_vector_search_pipe,
  446. False,
  447. "hyde",
  448. *args,
  449. **kwargs,
  450. )
  451. rag_fusion_pipe = self.create_multi_search_pipe(
  452. vanilla_vector_search_pipe,
  453. True,
  454. "rag_fusion",
  455. *args,
  456. **kwargs,
  457. )
  458. from core.pipes import RoutingSearchPipe
  459. return RoutingSearchPipe(
  460. search_pipes={
  461. "vanilla": vanilla_vector_search_pipe,
  462. "hyde": hyde_search_pipe,
  463. "rag_fusion": rag_fusion_pipe,
  464. },
  465. default_strategy="hyde",
  466. config=AsyncPipe.PipeConfig(name="routing_search_pipe"),
  467. )
  468. def create_kg_relationships_extraction_pipe(self, *args, **kwargs) -> Any:
  469. from core.pipes import KGExtractionPipe
  470. return KGExtractionPipe(
  471. llm_provider=self.providers.llm,
  472. database_provider=self.providers.database,
  473. config=AsyncPipe.PipeConfig(
  474. name="kg_relationships_extraction_pipe"
  475. ),
  476. )
  477. def create_kg_storage_pipe(self, *args, **kwargs) -> Any:
  478. from core.pipes import KGStoragePipe
  479. return KGStoragePipe(
  480. database_provider=self.providers.database,
  481. config=AsyncPipe.PipeConfig(name="kg_storage_pipe"),
  482. )
  483. def create_kg_search_pipe(self, *args, **kwargs) -> Any:
  484. from core.pipes import KGSearchSearchPipe
  485. return KGSearchSearchPipe(
  486. database_provider=self.providers.database,
  487. llm_provider=self.providers.llm,
  488. embedding_provider=self.providers.embedding,
  489. config=GeneratorPipe.PipeConfig(
  490. name="kg_rag_pipe", task_prompt="kg_search"
  491. ),
  492. )
  493. def create_rag_pipe(self, stream: bool = False, *args, **kwargs) -> Any:
  494. if stream:
  495. from core.pipes import StreamingSearchRAGPipe
  496. return StreamingSearchRAGPipe(
  497. llm_provider=self.providers.llm,
  498. database_provider=self.providers.database,
  499. config=GeneratorPipe.PipeConfig(
  500. name="streaming_rag_pipe", task_prompt="default_rag"
  501. ),
  502. )
  503. else:
  504. from core.pipes import SearchRAGPipe
  505. return SearchRAGPipe(
  506. llm_provider=self.providers.llm,
  507. database_provider=self.providers.database,
  508. config=GeneratorPipe.PipeConfig(
  509. name="search_rag_pipe", task_prompt="default_rag"
  510. ),
  511. )
  512. def create_kg_entity_description_pipe(self, *args, **kwargs) -> Any:
  513. from core.pipes import KGEntityDescriptionPipe
  514. return KGEntityDescriptionPipe(
  515. database_provider=self.providers.database,
  516. llm_provider=self.providers.llm,
  517. embedding_provider=self.providers.embedding,
  518. config=AsyncPipe.PipeConfig(name="kg_entity_description_pipe"),
  519. )
  520. def create_kg_clustering_pipe(self, *args, **kwargs) -> Any:
  521. from core.pipes import KGClusteringPipe
  522. return KGClusteringPipe(
  523. database_provider=self.providers.database,
  524. llm_provider=self.providers.llm,
  525. embedding_provider=self.providers.embedding,
  526. config=AsyncPipe.PipeConfig(name="kg_clustering_pipe"),
  527. )
  528. def create_kg_deduplication_summary_pipe(self, *args, **kwargs) -> Any:
  529. from core.pipes import KGEntityDeduplicationSummaryPipe
  530. return KGEntityDeduplicationSummaryPipe(
  531. database_provider=self.providers.database,
  532. llm_provider=self.providers.llm,
  533. embedding_provider=self.providers.embedding,
  534. config=AsyncPipe.PipeConfig(name="kg_deduplication_summary_pipe"),
  535. )
  536. def create_kg_community_summary_pipe(self, *args, **kwargs) -> Any:
  537. from core.pipes import KGCommunitySummaryPipe
  538. return KGCommunitySummaryPipe(
  539. database_provider=self.providers.database,
  540. llm_provider=self.providers.llm,
  541. embedding_provider=self.providers.embedding,
  542. config=AsyncPipe.PipeConfig(name="kg_community_summary_pipe"),
  543. )
  544. def create_kg_entity_deduplication_pipe(self, *args, **kwargs) -> Any:
  545. from core.pipes import KGEntityDeduplicationPipe
  546. return KGEntityDeduplicationPipe(
  547. database_provider=self.providers.database,
  548. llm_provider=self.providers.llm,
  549. embedding_provider=self.providers.embedding,
  550. config=AsyncPipe.PipeConfig(name="kg_entity_deduplication_pipe"),
  551. )
  552. def create_kg_entity_deduplication_summary_pipe(
  553. self, *args, **kwargs
  554. ) -> Any:
  555. from core.pipes import KGEntityDeduplicationSummaryPipe
  556. return KGEntityDeduplicationSummaryPipe(
  557. database_provider=self.providers.database,
  558. llm_provider=self.providers.llm,
  559. embedding_provider=self.providers.embedding,
  560. config=AsyncPipe.PipeConfig(
  561. name="kg_entity_deduplication_summary_pipe"
  562. ),
  563. )
  564. class R2RPipelineFactory:
  565. def __init__(
  566. self, config: R2RConfig, providers: R2RProviders, pipes: R2RPipes
  567. ):
  568. self.config = config
  569. self.providers = providers
  570. self.pipes = pipes
  571. def create_search_pipeline(self, *args, **kwargs) -> SearchPipeline:
  572. """factory method to create an ingestion pipeline."""
  573. search_pipeline = SearchPipeline()
  574. # Add vector search pipes if embedding provider and vector provider is set
  575. if (
  576. self.config.embedding.provider is not None
  577. and self.config.database.provider is not None
  578. ):
  579. search_pipeline.add_pipe(
  580. self.pipes.vector_search_pipe, vector_search_pipe=True
  581. )
  582. search_pipeline.add_pipe(
  583. self.pipes.kg_search_pipe, kg_search_pipe=True
  584. )
  585. return search_pipeline
  586. def create_rag_pipeline(
  587. self,
  588. search_pipeline: SearchPipeline,
  589. stream: bool = False,
  590. *args,
  591. **kwargs,
  592. ) -> RAGPipeline:
  593. rag_pipe = (
  594. self.pipes.streaming_rag_pipe if stream else self.pipes.rag_pipe
  595. )
  596. rag_pipeline = RAGPipeline()
  597. rag_pipeline.set_search_pipeline(search_pipeline)
  598. rag_pipeline.add_pipe(rag_pipe)
  599. return rag_pipeline
  600. def create_pipelines(
  601. self,
  602. search_pipeline: Optional[SearchPipeline] = None,
  603. rag_pipeline: Optional[RAGPipeline] = None,
  604. streaming_rag_pipeline: Optional[RAGPipeline] = None,
  605. *args,
  606. **kwargs,
  607. ) -> R2RPipelines:
  608. search_pipeline = search_pipeline or self.create_search_pipeline(
  609. *args, **kwargs
  610. )
  611. return R2RPipelines(
  612. search_pipeline=search_pipeline,
  613. rag_pipeline=rag_pipeline
  614. or self.create_rag_pipeline(
  615. search_pipeline,
  616. False,
  617. *args,
  618. **kwargs,
  619. ),
  620. streaming_rag_pipeline=streaming_rag_pipeline
  621. or self.create_rag_pipeline(
  622. search_pipeline,
  623. True,
  624. *args,
  625. **kwargs,
  626. ),
  627. )
  628. class R2RAgentFactory:
  629. def __init__(
  630. self,
  631. config: R2RConfig,
  632. providers: R2RProviders,
  633. pipelines: R2RPipelines,
  634. ):
  635. self.config = config
  636. self.providers = providers
  637. self.pipelines = pipelines
  638. def create_agents(
  639. self,
  640. rag_agent_override: Optional[R2RRAGAgent] = None,
  641. stream_rag_agent_override: Optional[R2RStreamingRAGAgent] = None,
  642. *args,
  643. **kwargs,
  644. ) -> R2RAgents:
  645. return R2RAgents(
  646. rag_agent=rag_agent_override
  647. or self.create_rag_agent(*args, **kwargs),
  648. streaming_rag_agent=stream_rag_agent_override
  649. or self.create_streaming_rag_agent(*args, **kwargs),
  650. )
  651. def create_streaming_rag_agent(
  652. self, *args, **kwargs
  653. ) -> R2RStreamingRAGAgent:
  654. if not self.providers.llm or not self.providers.database:
  655. raise ValueError(
  656. "LLM and database providers are required for RAG Agent"
  657. )
  658. return R2RStreamingRAGAgent(
  659. database_provider=self.providers.database,
  660. llm_provider=self.providers.llm,
  661. config=self.config.agent,
  662. search_pipeline=self.pipelines.search_pipeline,
  663. )
  664. def create_rag_agent(self, *args, **kwargs) -> R2RRAGAgent:
  665. if not self.providers.llm or not self.providers.database:
  666. raise ValueError(
  667. "LLM and database providers are required for RAG Agent"
  668. )
  669. return R2RRAGAgent(
  670. database_provider=self.providers.database,
  671. llm_provider=self.providers.llm,
  672. config=self.config.agent,
  673. search_pipeline=self.pipelines.search_pipeline,
  674. )