apscheduler.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import logging
  2. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  3. from core.base import SchedulerConfig, SchedulerProvider
  4. logger = logging.getLogger(__name__)
  5. class APSchedulerProvider(SchedulerProvider):
  6. """Implementation using APScheduler"""
  7. def __init__(self, config: SchedulerConfig):
  8. super().__init__(config)
  9. self.scheduler = AsyncIOScheduler()
  10. async def add_job(self, func, trigger, **kwargs):
  11. logger.info(
  12. f"Adding job {func.__name__} with trigger {trigger} and kwargs {kwargs}"
  13. )
  14. self.scheduler.add_job(func, trigger, **kwargs)
  15. async def start(self):
  16. self.scheduler.start()
  17. logger.info("Scheduler started")
  18. async def shutdown(self):
  19. if self.scheduler.running:
  20. self.scheduler.shutdown()
  21. logger.info("Scheduler shutdown")
  22. async def __aenter__(self):
  23. await self.start()
  24. return self
  25. async def __aexit__(self, exc_type, exc, tb):
  26. await self.shutdown()