async_invoke.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import asyncio
  2. from types import TracebackType
  3. from typing import Any, Tuple, Type, cast
  4. import asyncclick as click
  5. from click import Abort
  6. from click.testing import CliRunner, Result
  7. async def async_invoke(
  8. runner: CliRunner, cmd: click.Command, *args: str, **kwargs: Any
  9. ) -> Result:
  10. """Helper function to invoke async Click commands in tests."""
  11. exit_code = 0
  12. exception: BaseException | None = None
  13. exc_info: (
  14. Tuple[Type[BaseException], BaseException, TracebackType] | None
  15. ) = None
  16. with runner.isolation() as out_err:
  17. stdout, stderr = out_err
  18. try:
  19. # Get current event loop instead of creating new one
  20. loop = asyncio.get_event_loop()
  21. # Run the command using create_task
  22. task = loop.create_task(
  23. cmd.main(args=args, standalone_mode=False, **kwargs)
  24. )
  25. return_value = await task
  26. except Abort as e:
  27. exit_code = 1
  28. exception = cast(BaseException, e)
  29. if e.__traceback__:
  30. exc_info = (BaseException, exception, e.__traceback__)
  31. return_value = None
  32. except Exception as e:
  33. exit_code = 1
  34. exception = cast(BaseException, e)
  35. if e.__traceback__:
  36. exc_info = (BaseException, exception, e.__traceback__)
  37. return_value = None
  38. stdout_bytes = stdout.getvalue() or b""
  39. stderr_bytes = stderr.getvalue() if stderr else b""
  40. return Result(
  41. runner=runner,
  42. stdout_bytes=stdout_bytes,
  43. stderr_bytes=stderr_bytes,
  44. return_value=return_value,
  45. exit_code=exit_code,
  46. exception=exception,
  47. exc_info=exc_info,
  48. )