# loadTester.py -- async load tester for an R2R server: registers test users,
# then issues concurrent random searches and reports latency and throughput.
  1. import asyncio
  2. import random
  3. import statistics
  4. import time
  5. from dataclasses import dataclass
  6. from glob import glob
  7. from r2r import R2RAsyncClient
  8. # Configuration
  9. NUM_USERS = 25
  10. QUERIES_PER_SECOND = 5
  11. TEST_DURATION_SECONDS = 30
  12. RAMP_UP_SECONDS = 5
  13. STEADY_STATE_SECONDS = 20
  14. RAMP_DOWN_SECONDS = 5
  15. # Adjust timeouts as needed
  16. REQUEST_TIMEOUT = 10 # seconds
  17. LOGIN_TIMEOUT = 5
  18. REGISTER_TIMEOUT = 5
  19. DOC_UPLOAD_TIMEOUT = 10
  20. # Test queries
  21. QUERIES = [
  22. "Aristotle",
  23. "Plato",
  24. "Socrates",
  25. "Confucius",
  26. "Kant",
  27. "Nietzsche",
  28. "Descartes",
  29. "Hume",
  30. "Hegel",
  31. "Aquinas",
  32. ]
  33. @dataclass
  34. class Metrics:
  35. start_time: float
  36. end_time: float
  37. status: str
  38. duration_ms: float
  39. class LoadTester:
  40. def __init__(self, base_url: str):
  41. self.base_url = base_url
  42. self.metrics: list[Metrics] = []
  43. self.users: list[dict] = []
  44. self.running = True
  45. print("making an async client...")
  46. self.client = R2RAsyncClient(base_url)
  47. async def safe_call(self, coro, timeout, operation_desc="operation"):
  48. """Safely call an async function with a timeout and handle exceptions."""
  49. try:
  50. return await asyncio.wait_for(coro, timeout=timeout)
  51. except asyncio.TimeoutError:
  52. print(
  53. f"[TIMEOUT] {operation_desc} took longer than {timeout} seconds"
  54. )
  55. except Exception as e:
  56. print(f"[ERROR] Exception during {operation_desc}: {e}")
  57. return None
  58. async def register_login_ingest_user(self, user_email: str, password: str):
  59. """Register and login a single user with robust error handling."""
  60. # Register user
  61. reg_result = await self.safe_call(
  62. self.client.users.register(user_email, password),
  63. timeout=REGISTER_TIMEOUT,
  64. operation_desc=f"register user {user_email}",
  65. )
  66. if reg_result is None:
  67. print(
  68. f"Registration may have failed or user {user_email} already exists."
  69. )
  70. # Login user
  71. login_result = await self.safe_call(
  72. self.client.users.login(user_email, password),
  73. timeout=LOGIN_TIMEOUT,
  74. operation_desc=f"login user {user_email}",
  75. )
  76. user = (
  77. {"email": user_email, "password": password}
  78. if login_result
  79. else None
  80. )
  81. # Ingest documents for user
  82. files = glob("core/examples/data/*")
  83. for file in files:
  84. with open(file, "r"):
  85. try:
  86. pass
  87. # await self.client.documents.create(file_path=file)
  88. # await self.safe_call(
  89. # self.client.documents.create(file_path=file, run_with_orchestration=False),
  90. # timeout=DOC_UPLOAD_TIMEOUT,
  91. # operation_desc=f"document ingestion {file} for {user_email}"
  92. # )
  93. except:
  94. pass
  95. return user
  96. async def setup_users(self):
  97. """Initialize users and their documents"""
  98. print("Setting up users...")
  99. setup_tasks = []
  100. for i in range(NUM_USERS):
  101. user_email = f"user_{i}@test.com"
  102. password = "password"
  103. task = self.register_login_ingest_user(user_email, password)
  104. setup_tasks.append(task)
  105. # Wait for all user setups to complete
  106. user_results = await asyncio.gather(*setup_tasks)
  107. self.users = [user for user in user_results if user is not None]
  108. print(f"Setup complete! Successfully set up {len(self.users)} users")
  109. async def run_user_queries(self, user: dict):
  110. """Run queries for a single user, with timeouts and error handling."""
  111. while self.running:
  112. # Login before query
  113. login_res = await self.safe_call(
  114. self.client.users.login(user["email"], user["password"]),
  115. timeout=LOGIN_TIMEOUT,
  116. operation_desc=f"login for querying {user['email']}",
  117. )
  118. if login_res is None:
  119. # Could not login, wait and try again
  120. await asyncio.sleep(1)
  121. continue
  122. # Perform random search
  123. query_1 = random.choice(QUERIES)
  124. query_2 = random.choice(QUERIES)
  125. query_3 = random.choice(QUERIES)
  126. query = f"{query_1} {query_2} {query_3}"
  127. start_time = time.time()
  128. search_res = await self.safe_call(
  129. self.client.retrieval.search(query),
  130. timeout=REQUEST_TIMEOUT,
  131. operation_desc=f"search '{query}' for {user['email']}",
  132. )
  133. end_time = time.time()
  134. duration_ms = (end_time - start_time) * 1000
  135. if search_res is not None:
  136. status = "success"
  137. else:
  138. status = "error"
  139. # Record metrics
  140. self.metrics.append(
  141. Metrics(
  142. start_time=start_time,
  143. end_time=end_time,
  144. status=status,
  145. duration_ms=duration_ms,
  146. )
  147. )
  148. # Wait according to queries per second rate
  149. await asyncio.sleep(max(0, 1 / QUERIES_PER_SECOND))
  150. def calculate_statistics(self):
  151. """Calculate and print test statistics"""
  152. durations = [m.duration_ms for m in self.metrics]
  153. successful_requests = len(
  154. [m for m in self.metrics if m.status == "success"]
  155. )
  156. failed_requests = len([m for m in self.metrics if m.status == "error"])
  157. print("\nTest Results:")
  158. print(f"Total Requests: {len(self.metrics)}")
  159. print(f"Successful Requests: {successful_requests}")
  160. print(f"Failed Requests: {failed_requests}")
  161. if durations:
  162. print(f"\nLatency Statistics (ms):")
  163. print(f"Min: {min(durations)/1000.:.2f}")
  164. print(f"Max: {max(durations)/1000.:.2f}")
  165. print(f"Mean: {statistics.mean(durations)/1000.:.2f}")
  166. print(f"Median: {statistics.median(durations)/1000.:.2f}")
  167. try:
  168. print(
  169. f"95th Percentile: {statistics.quantiles(durations, n=20)[-1]/1000.:.2f}"
  170. )
  171. except Exception:
  172. pass
  173. print(
  174. f"\nRequests per second: {len(self.metrics) / TEST_DURATION_SECONDS:.2f}"
  175. )
  176. async def run_load_test(self):
  177. """Main load test execution"""
  178. await self.setup_users()
  179. if not self.users:
  180. print("No users were successfully set up. Exiting test.")
  181. return
  182. print(f"Starting load test with {len(self.users)} users...")
  183. print(f"Ramp up: {RAMP_UP_SECONDS}s")
  184. print(f"Steady state: {STEADY_STATE_SECONDS}s")
  185. print(f"Ramp down: {RAMP_DOWN_SECONDS}s")
  186. tasks = [
  187. asyncio.create_task(self.run_user_queries(user))
  188. for user in self.users
  189. ]
  190. # Run for specified duration
  191. await asyncio.sleep(TEST_DURATION_SECONDS)
  192. self.running = False
  193. # Give tasks some time to exit gracefully
  194. try:
  195. await asyncio.wait_for(asyncio.gather(*tasks), timeout=20)
  196. except asyncio.TimeoutError:
  197. print(
  198. "[WARNING] Not all tasks finished promptly after stopping. Cancelling tasks."
  199. )
  200. for t in tasks:
  201. if not t.done():
  202. t.cancel()
  203. # Wait again for tasks to cancel
  204. await asyncio.gather(*tasks, return_exceptions=True)
  205. self.calculate_statistics()
  206. def main():
  207. load_tester = LoadTester("http://localhost:7280")
  208. asyncio.run(load_tester.run_load_test())
  209. if __name__ == "__main__":
  210. main()