123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import asyncio
- import logging
- import os
- import uuid
- from concurrent.futures import ThreadPoolExecutor
- from functools import wraps
- from pathlib import Path
- from typing import Optional
- import toml
- from core.telemetry.events import ErrorEvent, FeatureUsageEvent
- from core.telemetry.posthog import telemetry_client
- logger = logging.getLogger()
- class ProductTelemetryClient:
- USER_ID_PATH = str(Path.home() / ".cache" / "r2r" / "telemetry_user_id")
- UNKNOWN_USER_ID = "UNKNOWN"
- _curr_user_id = None
- _version = None
- @property
- def version(self) -> str:
- if self._version is None:
- try:
- pyproject_path = (
- Path(__file__).parent.parent.parent / "pyproject.toml"
- )
- pyproject_data = toml.load(pyproject_path)
- self._version = pyproject_data["tool"]["poetry"]["version"]
- except Exception as e:
- logger.error(
- f"Error reading version from pyproject.toml: {str(e)}"
- )
- self._version = "UNKNOWN"
- return self._version
- @property
- def user_id(self) -> str:
- if self._curr_user_id:
- return self._curr_user_id
- try:
- if not os.path.exists(self.USER_ID_PATH):
- os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
- with open(self.USER_ID_PATH, "w") as f:
- new_user_id = str(uuid.uuid4())
- f.write(new_user_id)
- self._curr_user_id = new_user_id
- else:
- with open(self.USER_ID_PATH, "r") as f:
- self._curr_user_id = f.read().strip()
- except Exception:
- self._curr_user_id = self.UNKNOWN_USER_ID
- return self._curr_user_id
- product_telemetry_client = ProductTelemetryClient()
- def get_project_metadata():
- import platform
- return {
- "os": platform.system(),
- "python_version": platform.python_version(),
- "version": product_telemetry_client.version,
- }
- # Create a thread pool with a fixed number of workers
- telemetry_thread_pool: Optional[ThreadPoolExecutor] = None
- if os.getenv("TELEMETRY_ENABLED", "true").lower() in ("true", "1"):
- telemetry_thread_pool = ThreadPoolExecutor(max_workers=2)
- def telemetry_event(event_name):
- def decorator(func):
- def log_telemetry(event_type, user_id, metadata, error_message=None):
- if telemetry_thread_pool is None:
- return
- try:
- if event_type == "feature":
- telemetry_client.capture(
- FeatureUsageEvent(
- user_id=user_id,
- properties=metadata,
- feature=event_name,
- )
- )
- elif event_type == "error":
- telemetry_client.capture(
- ErrorEvent(
- user_id=user_id,
- properties=metadata,
- endpoint=event_name,
- error_message=error_message,
- )
- )
- except Exception as e:
- logger.error(f"Error in telemetry event logging: {str(e)}")
- @wraps(func)
- async def async_wrapper(*args, **kwargs):
- if telemetry_thread_pool is None:
- return await func(*args, **kwargs)
- metadata = get_project_metadata()
- user_id = product_telemetry_client.user_id
- try:
- result = await func(*args, **kwargs)
- telemetry_thread_pool.submit(
- log_telemetry, "feature", user_id, metadata
- )
- return result
- except Exception as e:
- telemetry_thread_pool.submit(
- log_telemetry, "error", user_id, metadata, str(e)
- )
- raise
- @wraps(func)
- def sync_wrapper(*args, **kwargs):
- loop = asyncio.get_event_loop()
- if loop.is_running():
- future = asyncio.run_coroutine_threadsafe(
- async_wrapper(*args, **kwargs), loop
- )
- return future.result()
- else:
- return loop.run_until_complete(async_wrapper(*args, **kwargs))
- return (
- async_wrapper
- if asyncio.iscoroutinefunction(func)
- else sync_wrapper
- )
- return decorator
|