# loadTester.py (7.8 KB)
  1. import asyncio
  2. import random
  3. import statistics
  4. import string
  5. import time
  6. from dataclasses import dataclass
  7. from glob import glob
  8. from typing import Dict, List
  9. from r2r import R2RAsyncClient
  10. # Configuration
  11. NUM_USERS = 25
  12. QUERIES_PER_SECOND = 5
  13. TEST_DURATION_SECONDS = 30
  14. RAMP_UP_SECONDS = 5
  15. STEADY_STATE_SECONDS = 20
  16. RAMP_DOWN_SECONDS = 5
  17. # Adjust timeouts as needed
  18. REQUEST_TIMEOUT = 10 # seconds
  19. LOGIN_TIMEOUT = 5
  20. REGISTER_TIMEOUT = 5
  21. DOC_UPLOAD_TIMEOUT = 10
  22. # Test queries
  23. QUERIES = [
  24. "Aristotle",
  25. "Plato",
  26. "Socrates",
  27. "Confucius",
  28. "Kant",
  29. "Nietzsche",
  30. "Descartes",
  31. "Hume",
  32. "Hegel",
  33. "Aquinas",
  34. ]
  35. @dataclass
  36. class Metrics:
  37. start_time: float
  38. end_time: float
  39. status: str
  40. duration_ms: float
  41. class LoadTester:
  42. def __init__(self, base_url: str):
  43. self.base_url = base_url
  44. self.metrics: List[Metrics] = []
  45. self.users: List[Dict] = []
  46. self.running = True
  47. print("making an async client...")
  48. self.client = R2RAsyncClient(base_url)
  49. async def safe_call(self, coro, timeout, operation_desc="operation"):
  50. """Safely call an async function with a timeout and handle exceptions."""
  51. try:
  52. return await asyncio.wait_for(coro, timeout=timeout)
  53. except asyncio.TimeoutError:
  54. print(
  55. f"[TIMEOUT] {operation_desc} took longer than {timeout} seconds"
  56. )
  57. except Exception as e:
  58. print(f"[ERROR] Exception during {operation_desc}: {e}")
  59. return None
  60. async def register_login_ingest_user(self, user_email: str, password: str):
  61. """Register and login a single user with robust error handling."""
  62. # Register user
  63. reg_result = await self.safe_call(
  64. self.client.users.register(user_email, password),
  65. timeout=REGISTER_TIMEOUT,
  66. operation_desc=f"register user {user_email}",
  67. )
  68. if reg_result is None:
  69. print(
  70. f"Registration may have failed or user {user_email} already exists."
  71. )
  72. # Login user
  73. login_result = await self.safe_call(
  74. self.client.users.login(user_email, password),
  75. timeout=LOGIN_TIMEOUT,
  76. operation_desc=f"login user {user_email}",
  77. )
  78. user = (
  79. {"email": user_email, "password": password}
  80. if login_result
  81. else None
  82. )
  83. # Ingest documents for user
  84. files = glob("core/examples/data/*")
  85. for file in files:
  86. with open(file, "r"):
  87. try:
  88. pass
  89. # await self.client.documents.create(file_path=file)
  90. # await self.safe_call(
  91. # self.client.documents.create(file_path=file, run_with_orchestration=False),
  92. # timeout=DOC_UPLOAD_TIMEOUT,
  93. # operation_desc=f"document ingestion {file} for {user_email}"
  94. # )
  95. except:
  96. pass
  97. return user
  98. async def setup_users(self):
  99. """Initialize users and their documents"""
  100. print("Setting up users...")
  101. setup_tasks = []
  102. for i in range(NUM_USERS):
  103. user_email = f"user_{i}@test.com"
  104. password = "password"
  105. task = self.register_login_ingest_user(user_email, password)
  106. setup_tasks.append(task)
  107. # Wait for all user setups to complete
  108. user_results = await asyncio.gather(*setup_tasks)
  109. self.users = [user for user in user_results if user is not None]
  110. print(f"Setup complete! Successfully set up {len(self.users)} users")
  111. async def run_user_queries(self, user: Dict):
  112. """Run queries for a single user, with timeouts and error handling."""
  113. while self.running:
  114. # Login before query
  115. login_res = await self.safe_call(
  116. self.client.users.login(user["email"], user["password"]),
  117. timeout=LOGIN_TIMEOUT,
  118. operation_desc=f"login for querying {user['email']}",
  119. )
  120. if login_res is None:
  121. # Could not login, wait and try again
  122. await asyncio.sleep(1)
  123. continue
  124. # Perform random search
  125. query_1 = random.choice(QUERIES)
  126. query_2 = random.choice(QUERIES)
  127. query_3 = random.choice(QUERIES)
  128. query = f"{query_1} {query_2} {query_3}"
  129. start_time = time.time()
  130. search_res = await self.safe_call(
  131. self.client.retrieval.search(query),
  132. timeout=REQUEST_TIMEOUT,
  133. operation_desc=f"search '{query}' for {user['email']}",
  134. )
  135. end_time = time.time()
  136. duration_ms = (end_time - start_time) * 1000
  137. if search_res is not None:
  138. status = "success"
  139. else:
  140. status = "error"
  141. # Record metrics
  142. self.metrics.append(
  143. Metrics(
  144. start_time=start_time,
  145. end_time=end_time,
  146. status=status,
  147. duration_ms=duration_ms,
  148. )
  149. )
  150. # Wait according to queries per second rate
  151. await asyncio.sleep(max(0, 1 / QUERIES_PER_SECOND))
  152. def calculate_statistics(self):
  153. """Calculate and print test statistics"""
  154. durations = [m.duration_ms for m in self.metrics]
  155. successful_requests = len(
  156. [m for m in self.metrics if m.status == "success"]
  157. )
  158. failed_requests = len([m for m in self.metrics if m.status == "error"])
  159. print("\nTest Results:")
  160. print(f"Total Requests: {len(self.metrics)}")
  161. print(f"Successful Requests: {successful_requests}")
  162. print(f"Failed Requests: {failed_requests}")
  163. if durations:
  164. print(f"\nLatency Statistics (ms):")
  165. print(f"Min: {min(durations)/1000.:.2f}")
  166. print(f"Max: {max(durations)/1000.:.2f}")
  167. print(f"Mean: {statistics.mean(durations)/1000.:.2f}")
  168. print(f"Median: {statistics.median(durations)/1000.:.2f}")
  169. try:
  170. print(
  171. f"95th Percentile: {statistics.quantiles(durations, n=20)[-1]/1000.:.2f}"
  172. )
  173. except Exception:
  174. pass
  175. print(
  176. f"\nRequests per second: {len(self.metrics) / TEST_DURATION_SECONDS:.2f}"
  177. )
  178. async def run_load_test(self):
  179. """Main load test execution"""
  180. await self.setup_users()
  181. if not self.users:
  182. print("No users were successfully set up. Exiting test.")
  183. return
  184. print(f"Starting load test with {len(self.users)} users...")
  185. print(f"Ramp up: {RAMP_UP_SECONDS}s")
  186. print(f"Steady state: {STEADY_STATE_SECONDS}s")
  187. print(f"Ramp down: {RAMP_DOWN_SECONDS}s")
  188. tasks = [
  189. asyncio.create_task(self.run_user_queries(user))
  190. for user in self.users
  191. ]
  192. # Run for specified duration
  193. await asyncio.sleep(TEST_DURATION_SECONDS)
  194. self.running = False
  195. # Give tasks some time to exit gracefully
  196. try:
  197. await asyncio.wait_for(asyncio.gather(*tasks), timeout=20)
  198. except asyncio.TimeoutError:
  199. print(
  200. "[WARNING] Not all tasks finished promptly after stopping. Cancelling tasks."
  201. )
  202. for t in tasks:
  203. if not t.done():
  204. t.cancel()
  205. # Wait again for tasks to cancel
  206. await asyncio.gather(*tasks, return_exceptions=True)
  207. self.calculate_statistics()
  208. def main():
  209. load_tester = LoadTester("http://localhost:7280")
  210. asyncio.run(load_tester.run_load_test())
  211. if __name__ == "__main__":
  212. main()