logs_router.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import asyncio
  2. import contextlib
  3. import logging
  4. from pathlib import Path
  5. import aiofiles
  6. from fastapi import Depends, WebSocket
  7. from fastapi.requests import Request
  8. from fastapi.templating import Jinja2Templates
  9. from ...abstractions import R2RProviders, R2RServices
  10. from .base_router import BaseRouterV3
  11. class LogsRouter(BaseRouterV3):
  12. def __init__(
  13. self,
  14. providers: R2RProviders,
  15. services: R2RServices,
  16. ):
  17. super().__init__(providers, services)
  18. CURRENT_DIR = Path(__file__).resolve().parent
  19. TEMPLATES_DIR = CURRENT_DIR.parent / "templates"
  20. self.templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
  21. self.services = services
  22. self.log_file = Path.cwd() / "logs" / "app.log"
  23. self.log_file.parent.mkdir(exist_ok=True)
  24. if not self.log_file.exists():
  25. self.log_file.touch(mode=0o666)
  26. # Start from the beginning of the file
  27. self.last_position = 0
  28. async def read_full_file(self) -> str:
  29. """Read the entire log file from the start."""
  30. if not self.log_file.exists():
  31. return "Initializing logging system..."
  32. try:
  33. async with aiofiles.open(self.log_file, mode="r") as f:
  34. # Start at beginning
  35. await f.seek(0)
  36. full_content = await f.read()
  37. # Move last_position to end of file after reading full content
  38. self.last_position = await f.tell()
  39. return full_content
  40. except Exception as e:
  41. logging.error(f"Error reading full logs: {str(e)}")
  42. return f"Error accessing full log file: {str(e)}"
  43. async def read_new_logs(self) -> str:
  44. """Read new logs appended after last_position."""
  45. if not self.log_file.exists():
  46. return "Initializing logging system..."
  47. try:
  48. async with aiofiles.open(self.log_file, mode="r") as f:
  49. await f.seek(self.last_position)
  50. new_content = await f.read()
  51. self.last_position = await f.tell()
  52. return new_content or ""
  53. except Exception as e:
  54. logging.error(f"Error reading logs: {str(e)}")
  55. return f"Error accessing log file: {str(e)}"
  56. def _setup_routes(self):
  57. @self.router.websocket(
  58. "/logs/stream",
  59. dependencies=[Depends(self.websocket_rate_limit_dependency)],
  60. )
  61. async def stream_logs(websocket: WebSocket):
  62. await websocket.accept()
  63. try:
  64. # Send the entire file content upon initial connection
  65. full_logs = await self.read_full_file()
  66. if full_logs:
  67. await websocket.send_text(full_logs)
  68. # Now send incremental updates only
  69. while True:
  70. new_logs = await self.read_new_logs()
  71. if new_logs:
  72. await websocket.send_text(new_logs)
  73. await asyncio.sleep(0.5)
  74. except Exception as e:
  75. logging.error(f"WebSocket error: {str(e)}")
  76. finally:
  77. with contextlib.suppress(Exception):
  78. await websocket.close()
  79. @self.router.get(
  80. "/logs/viewer",
  81. dependencies=[Depends(self.rate_limit_dependency)],
  82. )
  83. async def get_log_viewer(request: Request):
  84. return self.templates.TemplateResponse(
  85. "log_viewer.html", {"request": request}
  86. )