# loadTester.py - asyncio load-test script that drives search traffic through R2RAsyncClient.
  1. import asyncio
  2. import random
  3. import statistics
  4. import time
  5. from dataclasses import dataclass
  6. from glob import glob
  7. from r2r import R2RAsyncClient
  8. # Configuration
  9. NUM_USERS = 25
  10. QUERIES_PER_SECOND = 5
  11. TEST_DURATION_SECONDS = 30
  12. RAMP_UP_SECONDS = 5
  13. STEADY_STATE_SECONDS = 20
  14. RAMP_DOWN_SECONDS = 5
  15. # Adjust timeouts as needed
  16. REQUEST_TIMEOUT = 10 # seconds
  17. LOGIN_TIMEOUT = 5
  18. REGISTER_TIMEOUT = 5
  19. DOC_UPLOAD_TIMEOUT = 10
  20. # Test queries
  21. QUERIES = [
  22. "Aristotle",
  23. "Plato",
  24. "Socrates",
  25. "Confucius",
  26. "Kant",
  27. "Nietzsche",
  28. "Descartes",
  29. "Hume",
  30. "Hegel",
  31. "Aquinas",
  32. ]
  33. @dataclass
  34. class Metrics:
  35. start_time: float
  36. end_time: float
  37. status: str
  38. duration_ms: float
  39. class LoadTester:
  40. def __init__(self, base_url: str):
  41. self.base_url = base_url
  42. self.metrics: list[Metrics] = []
  43. self.users: list[dict] = []
  44. self.running = True
  45. print("making an async client...")
  46. self.client = R2RAsyncClient(base_url)
  47. async def safe_call(self, coro, timeout, operation_desc="operation"):
  48. """Safely call an async function with a timeout and handle
  49. exceptions."""
  50. try:
  51. return await asyncio.wait_for(coro, timeout=timeout)
  52. except asyncio.TimeoutError:
  53. print(
  54. f"[TIMEOUT] {operation_desc} took longer than {timeout} seconds"
  55. )
  56. except Exception as e:
  57. print(f"[ERROR] Exception during {operation_desc}: {e}")
  58. return None
  59. async def register_login_ingest_user(self, user_email: str, password: str):
  60. """Register and login a single user with robust error handling."""
  61. # Register user
  62. reg_result = await self.safe_call(
  63. self.client.users.create(user_email, password),
  64. timeout=REGISTER_TIMEOUT,
  65. operation_desc=f"register user {user_email}",
  66. )
  67. if reg_result is None:
  68. print(
  69. f"Registration may have failed or user {user_email} already exists."
  70. )
  71. # Login user
  72. login_result = await self.safe_call(
  73. self.client.users.login(user_email, password),
  74. timeout=LOGIN_TIMEOUT,
  75. operation_desc=f"login user {user_email}",
  76. )
  77. user = ({
  78. "email": user_email,
  79. "password": password
  80. } if login_result else None)
  81. # Ingest documents for user
  82. files = glob("core/examples/data/*")
  83. for file in files:
  84. with open(file, "r"):
  85. try:
  86. pass
  87. # await self.client.documents.create(file_path=file)
  88. # await self.safe_call(
  89. # self.client.documents.create(file_path=file, run_with_orchestration=False),
  90. # timeout=DOC_UPLOAD_TIMEOUT,
  91. # operation_desc=f"document ingestion {file} for {user_email}"
  92. # )
  93. except:
  94. pass
  95. return user
  96. async def setup_users(self):
  97. """Initialize users and their documents."""
  98. print("Setting up users...")
  99. setup_tasks = []
  100. for i in range(NUM_USERS):
  101. user_email = f"user_{i}@test.com"
  102. password = "password"
  103. task = self.register_login_ingest_user(user_email, password)
  104. setup_tasks.append(task)
  105. # Wait for all user setups to complete
  106. user_results = await asyncio.gather(*setup_tasks)
  107. self.users = [user for user in user_results if user is not None]
  108. print(f"Setup complete! Successfully set up {len(self.users)} users")
  109. async def run_user_queries(self, user: dict):
  110. """Run queries for a single user, with timeouts and error handling."""
  111. while self.running:
  112. # Login before query
  113. login_res = await self.safe_call(
  114. self.client.users.login(user["email"], user["password"]),
  115. timeout=LOGIN_TIMEOUT,
  116. operation_desc=f"login for querying {user['email']}",
  117. )
  118. if login_res is None:
  119. # Could not login, wait and try again
  120. await asyncio.sleep(1)
  121. continue
  122. # Perform random search
  123. query_1 = random.choice(QUERIES)
  124. query_2 = random.choice(QUERIES)
  125. query_3 = random.choice(QUERIES)
  126. query = f"{query_1} {query_2} {query_3}"
  127. start_time = time.time()
  128. search_res = await self.safe_call(
  129. self.client.retrieval.search(query),
  130. timeout=REQUEST_TIMEOUT,
  131. operation_desc=f"search '{query}' for {user['email']}",
  132. )
  133. end_time = time.time()
  134. duration_ms = (end_time - start_time) * 1000
  135. if search_res is not None:
  136. status = "success"
  137. else:
  138. status = "error"
  139. # Record metrics
  140. self.metrics.append(
  141. Metrics(
  142. start_time=start_time,
  143. end_time=end_time,
  144. status=status,
  145. duration_ms=duration_ms,
  146. ))
  147. # Wait according to queries per second rate
  148. await asyncio.sleep(max(0, 1 / QUERIES_PER_SECOND))
  149. def calculate_statistics(self):
  150. """Calculate and print test statistics."""
  151. durations = [m.duration_ms for m in self.metrics]
  152. successful_requests = len(
  153. [m for m in self.metrics if m.status == "success"])
  154. failed_requests = len([m for m in self.metrics if m.status == "error"])
  155. print("\nTest Results:")
  156. print(f"Total Requests: {len(self.metrics)}")
  157. print(f"Successful Requests: {successful_requests}")
  158. print(f"Failed Requests: {failed_requests}")
  159. if durations:
  160. print("\nLatency Statistics (ms):")
  161. print(f"Min: {min(durations) / 1000.0:.2f}")
  162. print(f"Max: {max(durations) / 1000.0:.2f}")
  163. print(f"Mean: {statistics.mean(durations) / 1000.0:.2f}")
  164. print(f"Median: {statistics.median(durations) / 1000.0:.2f}")
  165. try:
  166. print(
  167. f"95th Percentile: {statistics.quantiles(durations, n=20)[-1] / 1000.0:.2f}"
  168. )
  169. except Exception:
  170. pass
  171. print(
  172. f"\nRequests per second: {len(self.metrics) / TEST_DURATION_SECONDS:.2f}"
  173. )
  174. async def run_load_test(self):
  175. """Main load test execution."""
  176. await self.setup_users()
  177. if not self.users:
  178. print("No users were successfully set up. Exiting test.")
  179. return
  180. print(f"Starting load test with {len(self.users)} users...")
  181. print(f"Ramp up: {RAMP_UP_SECONDS}s")
  182. print(f"Steady state: {STEADY_STATE_SECONDS}s")
  183. print(f"Ramp down: {RAMP_DOWN_SECONDS}s")
  184. tasks = [
  185. asyncio.create_task(self.run_user_queries(user))
  186. for user in self.users
  187. ]
  188. # Run for specified duration
  189. await asyncio.sleep(TEST_DURATION_SECONDS)
  190. self.running = False
  191. # Give tasks some time to exit gracefully
  192. try:
  193. await asyncio.wait_for(asyncio.gather(*tasks), timeout=20)
  194. except asyncio.TimeoutError:
  195. print(
  196. "[WARNING] Not all tasks finished promptly after stopping. Cancelling tasks."
  197. )
  198. for t in tasks:
  199. if not t.done():
  200. t.cancel()
  201. # Wait again for tasks to cancel
  202. await asyncio.gather(*tasks, return_exceptions=True)
  203. self.calculate_statistics()
  204. def main():
  205. load_tester = LoadTester("http://localhost:7280")
  206. asyncio.run(load_tester.run_load_test())
  207. if __name__ == "__main__":
  208. main()