thread_executor.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import atexit
  2. from concurrent.futures import Executor, ThreadPoolExecutor
  3. import concurrent
  4. import concurrent.futures
  5. from concurrent.futures import as_completed
  6. from typing import List
  7. def get_executor_for_config(worker_num: int, thread_name_prefix: str) -> Executor:
  8. """
  9. Returns a generator that yields a ThreadPoolExecutor with the specified number of workers.
  10. Args:
  11. worker_num (int): The number of worker threads in the ThreadPoolExecutor.
  12. thread_name_prefix (str): thread name perfix.
  13. Yields:
  14. Executor: A ThreadPoolExecutor instance.
  15. """
  16. executor = ThreadPoolExecutor(
  17. max_workers=worker_num, thread_name_prefix=thread_name_prefix
  18. )
  19. atexit.register(executor.shutdown, wait=False)
  20. return executor
  21. def run_with_executor(executor: Executor, func, tasks: List, timeout: int):
  22. """
  23. Executes the given function with the provided executor and tasks.
  24. Args:
  25. executor (Executor): The executor to use for running the tasks.
  26. func: The function to be executed.
  27. tasks (List): The list of tasks to be executed.
  28. timeout (int): The maximum time to wait for the tasks to complete.
  29. Returns:
  30. List: The results of the executed tasks.
  31. Raises:
  32. Exception: If any of the tasks raise an exception.
  33. futures = [executor.submit(lambda args: func(*args), task) for task in tasks]
  34. done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION, timeout=timeout)
  35. results = []
  36. for future in done:
  37. if future.exception():
  38. raise future.exception()
  39. if future.done():
  40. results.append(future.result())
  41. return results
  42. """
  43. """
  44. results = []
  45. # Iterate over tasks and execute them sequentially
  46. for task in tasks:
  47. future = executor.submit(lambda args: func(*args), task)
  48. # Wait for the task to complete (with timeout)
  49. try:
  50. result = future.result(timeout=timeout)
  51. results.append(result)
  52. except Exception as e:
  53. print(e)
  54. return results
  55. futures = [executor.submit(lambda args: func(*args), task) for task in tasks]
  56. results = []
  57. for future in as_completed(futures, timeout=timeout):
  58. try:
  59. result = future.result(timeout=timeout)
  60. results.append(result)
  61. except Exception as e:
  62. print(e)
  63. return results
  64. """
  65. futures = [executor.submit(lambda args: func(*args), task) for task in tasks]
  66. results = [future.result() for future in futures]
  67. return results