123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
import logging
import time
from datetime import datetime
from typing import Any

from ..abstractions import R2RProviders
from ..config import R2RConfig
from .base import Service
# Module-level logger named after this module, shared by the service below.
logger = logging.getLogger(__name__)
class MaintenanceService(Service):
    """Schedule and run database vacuum maintenance.

    ``initialize`` starts the scheduler provider and registers a
    cron-triggered job that calls ``vacuum_database``. ``vacuum_table``
    vacuums a single table on demand. Vacuum failures are logged with
    their traceback and are not re-raised.
    """

    def __init__(
        self,
        config: R2RConfig,
        providers: R2RProviders,
    ) -> None:
        """Store the config and providers; no jobs are scheduled yet."""
        super().__init__(
            config,
            providers,
        )
        # Job objects returned by the scheduler provider's add_job().
        self.scheduled_jobs: list[Any] = []

    async def initialize(self) -> None:
        """Start the scheduler and register the vacuum job from config.

        Reads ``vacuum_schedule``, ``vacuum_full`` and ``vacuum_analyze``
        from ``self.config.database.maintenance`` and appends the job
        returned by the scheduler to ``self.scheduled_jobs``.
        """
        logger.info("Initializing database maintenance service")
        await self.providers.scheduler.start()

        maintenance_config = self.config.database.maintenance

        # Translate the cron string into per-field trigger kwargs.
        schedule_parts = self._parse_cron_schedule(
            maintenance_config.vacuum_schedule
        )

        # ``kwargs`` is forwarded to vacuum_database() on every run.
        job = await self.providers.scheduler.add_job(
            self.vacuum_database,
            trigger="cron",
            **schedule_parts,
            kwargs={
                "full": maintenance_config.vacuum_full,
                "analyze": maintenance_config.vacuum_analyze,
            },
        )
        self.scheduled_jobs.append(job)

    def _parse_cron_schedule(self, cron_schedule: str) -> dict[str, Any]:
        """Parse a cron schedule string into kwargs for APScheduler.

        Args:
            cron_schedule: Whitespace-separated cron expression with five
                fields (minute hour day month day_of_week) or six fields
                (the same, preceded by second).

        Returns:
            A dict mapping each field name to its string value. For any
            other field count (including an empty or ``None`` schedule) a
            warning is logged and ``{"hour": 3, "minute": 0}`` is returned.
        """
        field_names = (
            "second",
            "minute",
            "hour",
            "day",
            "month",
            "day_of_week",
        )
        # Treat None like an empty schedule so a missing config value
        # falls back to the defaults instead of raising AttributeError.
        parts = (cron_schedule or "").split()

        if len(parts) == len(field_names):
            # Six fields: expression includes the leading seconds field.
            return dict(zip(field_names, parts))
        if len(parts) == len(field_names) - 1:
            # Five fields: standard cron, no seconds.
            return dict(zip(field_names[1:], parts))

        logger.warning(
            f"Invalid cron format: {cron_schedule}. Using defaults."
        )
        return {"hour": 3, "minute": 0}

    async def vacuum_database(
        self, full: bool = False, analyze: bool = True
    ) -> None:
        """Run vacuum on the entire database.

        Args:
            full: Forwarded as ``full`` to ``vacuum_all_tables``.
            analyze: Forwarded as ``analyze`` to ``vacuum_all_tables``.

        Any ``Exception`` raised by the handler is logged with its
        traceback and suppressed, so a failed run does not propagate to
        the caller (e.g. the scheduler).
        """
        logger.info(
            f"Running database vacuum (full={full}, analyze={analyze})"
        )
        # perf_counter is unaffected by wall-clock/DST adjustments, which
        # matters for jobs that run in the middle of the night.
        started = time.perf_counter()
        try:
            await (
                self.providers.database.maintenance_handler.vacuum_all_tables(
                    analyze=analyze, full=full
                )
            )
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Database vacuum failed: {e}")
        else:
            elapsed = time.perf_counter() - started
            logger.info(
                f"Database vacuum completed successfully in {elapsed:.2f} seconds"
            )

    async def vacuum_table(
        self, table_name: str, full: bool = False, analyze: bool = True
    ) -> None:
        """Run vacuum on a specific table.

        Args:
            table_name: Name of the table, forwarded to the handler.
            full: Forwarded as ``full`` to the handler's ``vacuum_table``.
            analyze: Forwarded as ``analyze`` to the handler's
                ``vacuum_table``.

        Any ``Exception`` raised by the handler is logged with its
        traceback and suppressed.
        """
        logger.info(
            f"Running vacuum on table {table_name} (full={full}, analyze={analyze})"
        )
        started = time.perf_counter()
        try:
            await self.providers.database.maintenance_handler.vacuum_table(
                table_name=table_name, analyze=analyze, full=full
            )
        except Exception as e:
            logger.exception(f"Table vacuum failed for {table_name}: {e}")
        else:
            elapsed = time.perf_counter() - started
            logger.info(
                f"Table vacuum completed successfully in {elapsed:.2f} seconds"
            )
|