graph_enrichment.py 780 B

12345678910111213141516171819202122232425262728293031
  1. import logging
  2. from typing import Optional
  3. from ..base.logger.run_manager import RunManager
  4. from ..base.pipeline.base_pipeline import AsyncPipeline
  5. from ..base.pipes.base_pipe import AsyncPipe
  6. logger = logging.getLogger()
  7. class KGEnrichmentPipeline(AsyncPipeline):
  8. """A pipeline for enhancing the graph with communities, connected components etc."""
  9. pipeline_type: str = "other"
  10. def __init__(
  11. self,
  12. run_manager: Optional[RunManager] = None,
  13. ):
  14. super().__init__(run_manager)
  15. def add_pipe(
  16. self,
  17. pipe: AsyncPipe,
  18. *args,
  19. **kwargs,
  20. ) -> None:
  21. logger.debug(
  22. f"Adding pipe {pipe.config.name} to the KGEnrichmentPipeline"
  23. )
  24. super().add_pipe(pipe, *args, **kwargs)