# maintenance_service.py
import logging
import time
from datetime import datetime
from typing import Any

from ..abstractions import R2RProviders
from ..config import R2RConfig
from .base import Service

  7. logger = logging.getLogger(__name__)
  8. class MaintenanceService(Service):
  9. def __init__(
  10. self,
  11. config: R2RConfig,
  12. providers: R2RProviders,
  13. ):
  14. super().__init__(
  15. config,
  16. providers,
  17. )
  18. self.scheduled_jobs: list[Any] = []
  19. async def initialize(self):
  20. """Initialize and schedule maintenance tasks from configuration"""
  21. logger.info("Initializing database maintenance service")
  22. await self.providers.scheduler.start()
  23. maintenance_config = self.config.database.maintenance
  24. # Parse the cron schedule
  25. schedule_parts = self._parse_cron_schedule(
  26. maintenance_config.vacuum_schedule
  27. )
  28. # Schedule the vacuum job
  29. job = await self.providers.scheduler.add_job(
  30. self.vacuum_database,
  31. trigger="cron",
  32. **schedule_parts,
  33. kwargs={
  34. "full": maintenance_config.vacuum_full,
  35. "analyze": maintenance_config.vacuum_analyze,
  36. },
  37. )
  38. self.scheduled_jobs.append(job)
  39. def _parse_cron_schedule(self, cron_schedule: str) -> dict:
  40. """Parse a cron schedule string into kwargs for APScheduler"""
  41. parts = cron_schedule.split()
  42. # Handle both 5-part and 6-part cron expressions
  43. if len(parts) == 6:
  44. # With seconds field
  45. second, minute, hour, day, month, day_of_week = parts
  46. return {
  47. "second": second,
  48. "minute": minute,
  49. "hour": hour,
  50. "day": day,
  51. "month": month,
  52. "day_of_week": day_of_week,
  53. }
  54. elif len(parts) == 5:
  55. # Standard cron (no seconds)
  56. minute, hour, day, month, day_of_week = parts
  57. return {
  58. "minute": minute,
  59. "hour": hour,
  60. "day": day,
  61. "month": month,
  62. "day_of_week": day_of_week,
  63. }
  64. else:
  65. logger.warning(
  66. f"Invalid cron format: {cron_schedule}. Using defaults."
  67. )
  68. return {"hour": 3, "minute": 0}
  69. async def vacuum_database(self, full: bool = False, analyze: bool = True):
  70. """Run vacuum on the entire database"""
  71. start_time = datetime.now()
  72. try:
  73. await (
  74. self.providers.database.maintenance_handler.vacuum_all_tables(
  75. analyze=analyze, full=full
  76. )
  77. )
  78. duration = datetime.now() - start_time
  79. logger.info(
  80. f"Database vacuum completed successfully in {duration.total_seconds():.2f} seconds"
  81. )
  82. except Exception as e:
  83. logger.error(f"Database vacuum failed: {str(e)}")
  84. async def vacuum_table(
  85. self, table_name: str, full: bool = False, analyze: bool = True
  86. ):
  87. """Run vacuum on a specific table"""
  88. start_time = datetime.now()
  89. logger.info(
  90. f"Running vacuum on table {table_name} (full={full}, analyze={analyze})"
  91. )
  92. try:
  93. await self.providers.database.maintenance_handler.vacuum_table(
  94. table_name=table_name, analyze=analyze, full=full
  95. )
  96. duration = datetime.now() - start_time
  97. logger.info(
  98. f"Table vacuum completed successfully in {duration.total_seconds():.2f} seconds"
  99. )
  100. except Exception as e:
  101. logger.error(f"Table vacuum failed for {table_name}: {str(e)}")