thread_executor.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import atexit
  2. from concurrent.futures import Executor, ThreadPoolExecutor
  3. import concurrent
  4. import concurrent.futures
  5. from typing import List
  6. def get_executor_for_config(worker_num: int, thread_name_prefix: str) -> Executor:
  7. """
  8. Returns a generator that yields a ThreadPoolExecutor with the specified number of workers.
  9. Args:
  10. worker_num (int): The number of worker threads in the ThreadPoolExecutor.
  11. thread_name_prefix (str): thread name perfix.
  12. Yields:
  13. Executor: A ThreadPoolExecutor instance.
  14. """
  15. executor = ThreadPoolExecutor(max_workers=worker_num, thread_name_prefix=thread_name_prefix)
  16. atexit.register(executor.shutdown, wait=False)
  17. return executor
  18. def run_with_executor(executor: Executor, func, tasks: List, timeout: int):
  19. """
  20. Executes the given function with the provided executor and tasks.
  21. Args:
  22. executor (Executor): The executor to use for running the tasks.
  23. func: The function to be executed.
  24. tasks (List): The list of tasks to be executed.
  25. timeout (int): The maximum time to wait for the tasks to complete.
  26. Returns:
  27. List: The results of the executed tasks.
  28. Raises:
  29. Exception: If any of the tasks raise an exception.
  30. """
  31. futures = [executor.submit(lambda args: func(*args), task) for task in tasks]
  32. done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION, timeout=timeout)
  33. results = []
  34. for future in done:
  35. if future.exception():
  36. raise future.exception()
  37. if future.done():
  38. results.append(future.result())
  39. return results