1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import asyncio
- import contextlib
- import logging
- from pathlib import Path
- import aiofiles
- from fastapi import Depends, WebSocket
- from fastapi.requests import Request
- from fastapi.templating import Jinja2Templates
- from ...abstractions import R2RProviders, R2RServices
- from .base_router import BaseRouterV3
- class LogsRouter(BaseRouterV3):
- def __init__(
- self,
- providers: R2RProviders,
- services: R2RServices,
- ):
- super().__init__(providers, services)
- CURRENT_DIR = Path(__file__).resolve().parent
- TEMPLATES_DIR = CURRENT_DIR.parent / "templates"
- self.templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
- self.services = services
- self.log_file = Path.cwd() / "logs" / "app.log"
- self.log_file.parent.mkdir(exist_ok=True)
- if not self.log_file.exists():
- self.log_file.touch(mode=0o666)
- # Start from the beginning of the file
- self.last_position = 0
- async def read_full_file(self) -> str:
- """Read the entire log file from the start."""
- if not self.log_file.exists():
- return "Initializing logging system..."
- try:
- async with aiofiles.open(self.log_file, mode="r") as f:
- # Start at beginning
- await f.seek(0)
- full_content = await f.read()
- # Move last_position to end of file after reading full content
- self.last_position = await f.tell()
- return full_content
- except Exception as e:
- logging.error(f"Error reading full logs: {str(e)}")
- return f"Error accessing full log file: {str(e)}"
- async def read_new_logs(self) -> str:
- """Read new logs appended after last_position."""
- if not self.log_file.exists():
- return "Initializing logging system..."
- try:
- async with aiofiles.open(self.log_file, mode="r") as f:
- await f.seek(self.last_position)
- new_content = await f.read()
- self.last_position = await f.tell()
- return new_content or ""
- except Exception as e:
- logging.error(f"Error reading logs: {str(e)}")
- return f"Error accessing log file: {str(e)}"
- def _setup_routes(self):
- @self.router.websocket(
- "/logs/stream",
- dependencies=[Depends(self.websocket_rate_limit_dependency)],
- )
- async def stream_logs(websocket: WebSocket):
- await websocket.accept()
- try:
- # Send the entire file content upon initial connection
- full_logs = await self.read_full_file()
- if full_logs:
- await websocket.send_text(full_logs)
- # Now send incremental updates only
- while True:
- new_logs = await self.read_new_logs()
- if new_logs:
- await websocket.send_text(new_logs)
- await asyncio.sleep(0.5)
- except Exception as e:
- logging.error(f"WebSocket error: {str(e)}")
- finally:
- with contextlib.suppress(Exception):
- await websocket.close()
- @self.router.get(
- "/logs/viewer",
- dependencies=[Depends(self.rate_limit_dependency)],
- )
- async def get_log_viewer(request: Request):
- return self.templates.TemplateResponse(
- "log_viewer.html", {"request": request}
- )
|