# events.py (2.7 KB)
import uuid
from typing import Any, Optional
  3. class BaseTelemetryEvent:
  4. def __init__(self, event_type: str, properties: dict[str, Any]):
  5. self.event_type = event_type
  6. self.properties = properties
  7. self.event_id = str(uuid.uuid4())
  8. class DailyActiveUserEvent(BaseTelemetryEvent):
  9. def __init__(self, user_id: str):
  10. super().__init__("DailyActiveUser", {"user_id": user_id})
  11. class FeatureUsageEvent(BaseTelemetryEvent):
  12. def __init__(
  13. self,
  14. user_id: str,
  15. feature: str,
  16. properties: Optional[dict[str, Any]] = None,
  17. ):
  18. super().__init__(
  19. "FeatureUsage",
  20. {
  21. "user_id": user_id,
  22. "feature": feature,
  23. "properties": properties or {},
  24. },
  25. )
  26. class ErrorEvent(BaseTelemetryEvent):
  27. def __init__(
  28. self,
  29. user_id: str,
  30. endpoint: str,
  31. error_message: str,
  32. properties: Optional[dict[str, Any]] = None,
  33. ):
  34. super().__init__(
  35. "Error",
  36. {
  37. "user_id": user_id,
  38. "endpoint": endpoint,
  39. "error_message": error_message,
  40. "properties": properties or {},
  41. },
  42. )
  43. class RequestLatencyEvent(BaseTelemetryEvent):
  44. def __init__(
  45. self,
  46. endpoint: str,
  47. latency: float,
  48. properties: Optional[dict[str, Any]] = None,
  49. ):
  50. super().__init__(
  51. "RequestLatency",
  52. {
  53. "endpoint": endpoint,
  54. "latency": latency,
  55. "properties": properties or {},
  56. },
  57. )
  58. class GeographicDistributionEvent(BaseTelemetryEvent):
  59. def __init__(
  60. self,
  61. user_id: str,
  62. country: str,
  63. properties: Optional[dict[str, Any]] = None,
  64. ):
  65. super().__init__(
  66. "GeographicDistribution",
  67. {
  68. "user_id": user_id,
  69. "country": country,
  70. "properties": properties or {},
  71. },
  72. )
  73. class SessionDurationEvent(BaseTelemetryEvent):
  74. def __init__(
  75. self,
  76. user_id: str,
  77. duration: float,
  78. properties: Optional[dict[str, Any]] = None,
  79. ):
  80. super().__init__(
  81. "SessionDuration",
  82. {
  83. "user_id": user_id,
  84. "duration": duration,
  85. "properties": properties or {},
  86. },
  87. )
  88. class UserPathEvent(BaseTelemetryEvent):
  89. def __init__(
  90. self,
  91. user_id: str,
  92. path: str,
  93. properties: Optional[dict[str, Any]] = None,
  94. ):
  95. super().__init__(
  96. "UserPath",
  97. {"user_id": user_id, "path": path, "properties": properties or {}},
  98. )