telemetry_decorator.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import asyncio
  2. import logging
  3. import os
  4. import uuid
  5. from concurrent.futures import ThreadPoolExecutor
  6. from functools import wraps
  7. from pathlib import Path
  8. from typing import Optional
  9. import toml
  10. from core.telemetry.events import ErrorEvent, FeatureUsageEvent
  11. from core.telemetry.posthog import telemetry_client
  12. logger = logging.getLogger()
  13. class ProductTelemetryClient:
  14. USER_ID_PATH = str(Path.home() / ".cache" / "r2r" / "telemetry_user_id")
  15. UNKNOWN_USER_ID = "UNKNOWN"
  16. _curr_user_id = None
  17. _version = None
  18. @property
  19. def version(self) -> str:
  20. if self._version is None:
  21. try:
  22. pyproject_path = (
  23. Path(__file__).parent.parent.parent / "pyproject.toml"
  24. )
  25. pyproject_data = toml.load(pyproject_path)
  26. self._version = pyproject_data["tool"]["poetry"]["version"]
  27. except Exception as e:
  28. logger.error(
  29. f"Error reading version from pyproject.toml: {str(e)}"
  30. )
  31. self._version = "UNKNOWN"
  32. return self._version
  33. @property
  34. def user_id(self) -> str:
  35. if self._curr_user_id:
  36. return self._curr_user_id
  37. try:
  38. if not os.path.exists(self.USER_ID_PATH):
  39. os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
  40. with open(self.USER_ID_PATH, "w") as f:
  41. new_user_id = str(uuid.uuid4())
  42. f.write(new_user_id)
  43. self._curr_user_id = new_user_id
  44. else:
  45. with open(self.USER_ID_PATH, "r") as f:
  46. self._curr_user_id = f.read().strip()
  47. except Exception:
  48. self._curr_user_id = self.UNKNOWN_USER_ID
  49. return self._curr_user_id
  50. product_telemetry_client = ProductTelemetryClient()
  51. def get_project_metadata():
  52. import platform
  53. return {
  54. "os": platform.system(),
  55. "python_version": platform.python_version(),
  56. "version": product_telemetry_client.version,
  57. }
  58. # Create a thread pool with a fixed number of workers
  59. telemetry_thread_pool: Optional[ThreadPoolExecutor] = None
  60. if os.getenv("TELEMETRY_ENABLED", "true").lower() in ("true", "1"):
  61. telemetry_thread_pool = ThreadPoolExecutor(max_workers=2)
  62. def telemetry_event(event_name):
  63. def decorator(func):
  64. def log_telemetry(event_type, user_id, metadata, error_message=None):
  65. if telemetry_thread_pool is None:
  66. return
  67. try:
  68. if event_type == "feature":
  69. telemetry_client.capture(
  70. FeatureUsageEvent(
  71. user_id=user_id,
  72. properties=metadata,
  73. feature=event_name,
  74. )
  75. )
  76. elif event_type == "error":
  77. telemetry_client.capture(
  78. ErrorEvent(
  79. user_id=user_id,
  80. properties=metadata,
  81. endpoint=event_name,
  82. error_message=error_message,
  83. )
  84. )
  85. except Exception as e:
  86. logger.error(f"Error in telemetry event logging: {str(e)}")
  87. @wraps(func)
  88. async def async_wrapper(*args, **kwargs):
  89. if telemetry_thread_pool is None:
  90. return await func(*args, **kwargs)
  91. metadata = get_project_metadata()
  92. user_id = product_telemetry_client.user_id
  93. try:
  94. result = await func(*args, **kwargs)
  95. telemetry_thread_pool.submit(
  96. log_telemetry, "feature", user_id, metadata
  97. )
  98. return result
  99. except Exception as e:
  100. telemetry_thread_pool.submit(
  101. log_telemetry, "error", user_id, metadata, str(e)
  102. )
  103. raise
  104. @wraps(func)
  105. def sync_wrapper(*args, **kwargs):
  106. loop = asyncio.get_event_loop()
  107. if loop.is_running():
  108. future = asyncio.run_coroutine_threadsafe(
  109. async_wrapper(*args, **kwargs), loop
  110. )
  111. return future.result()
  112. else:
  113. return loop.run_until_complete(async_wrapper(*args, **kwargs))
  114. return (
  115. async_wrapper
  116. if asyncio.iscoroutinefunction(func)
  117. else sync_wrapper
  118. )
  119. return decorator