async_timeout.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # Workaround for https://github.com/python/cpython/issues/86296
  2. #
  3. # From https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
  4. # Licensed under the Apache License (Apache-2.0)
  5. import asyncio
  6. import enum
  7. import sys
  8. import warnings
  9. from types import TracebackType
  10. from typing import Any, Optional, Type
  # Prefer the standard-library implementation when the interpreter ships it;
  # otherwise fall back to the vendored factory defined below.
  if sys.version_info >= (3, 11):
      from asyncio import timeout as asyncio_timeout
  else:

      def asyncio_timeout(delay: Optional[float]) -> "Timeout":
          """Create a timeout context manager for the running event loop.

          Handy when a block of code needs timeout logic applied around it,
          or when asyncio.wait_for is not a good fit. For example:

          >>> async with asyncio_timeout(0.001):
          ...     async with aiohttp.get('https://github.com') as r:
          ...         await r.text()

          delay - number of seconds from now until the deadline, or None
                  to create a Timeout without any deadline.

          Must be called while an event loop is running, because the loop
          is looked up with asyncio.get_running_loop().
          """
          running_loop = asyncio.get_running_loop()
          if delay is None:
              # No delay requested: the Timeout gets no deadline at all.
              return Timeout(None, running_loop)
          # Convert the relative delay into an absolute loop-clock deadline.
          return Timeout(running_loop.time() + delay, running_loop)
  26. class _State(enum.Enum):
  27. INIT = "INIT"
  28. ENTER = "ENTER"
  29. TIMEOUT = "TIMEOUT"
  30. EXIT = "EXIT"
  class Timeout:
      """Context manager that cancels the current task at a deadline.

      This is an internal class: obtain instances through the module's
      factory function rather than constructing them directly.
      """

      # Implementation notes
      # --------------------
      # `async with` is the preferred way to use this object; plain `with`
      # still works but emits a DeprecationWarning.  Nothing in the
      # implementation actually awaits, yet `async with` makes it explicit
      # that the manager belongs inside a coroutine, which prevents a whole
      # class of misuse.
      #
      # When the deadline is already in the past at scheduling time, the
      # timeout callback is queued with loop.call_soon() instead of
      # loop.call_at(), so the timeout fires as early as the loop allows.

      __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler")

      def __init__(
          self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
      ) -> None:
          """Bind the timeout to *loop* with an optional absolute *deadline*.

          deadline - absolute time on the loop.time() clock, or None for
                     no deadline.
          """
          self._loop = loop
          self._state = _State.INIT
          self._timeout_handler = None  # type: Optional[asyncio.Handle]
          if deadline is not None:
              # In INIT state update() only records the value; nothing is
              # scheduled until the context manager is entered.
              self.update(deadline)
          else:
              self._deadline = None  # type: Optional[float]

      def __enter__(self) -> "Timeout":
          """Synchronous entry point, kept for compatibility (deprecated)."""
          warnings.warn(
              "with timeout() is deprecated, use async with timeout()",
              DeprecationWarning,
              stacklevel=2,
          )
          self._do_enter()
          return self

      def __exit__(
          self,
          exc_type: Optional[Type[BaseException]],
          exc_val: Optional[BaseException],
          exc_tb: Optional[TracebackType],
      ) -> Optional[bool]:
          """Leave the block; may raise asyncio.TimeoutError (see _do_exit)."""
          self._do_exit(exc_type)
          # Returning None never suppresses the exception from the block.
          return None

      async def __aenter__(self) -> "Timeout":
          """Asynchronous entry point; schedules the timeout callback."""
          self._do_enter()
          return self

      async def __aexit__(
          self,
          exc_type: Optional[Type[BaseException]],
          exc_val: Optional[BaseException],
          exc_tb: Optional[TracebackType],
      ) -> Optional[bool]:
          """Leave the block; may raise asyncio.TimeoutError (see _do_exit)."""
          self._do_exit(exc_type)
          # Returning None never suppresses the exception from the block.
          return None

      @property
      def expired(self) -> bool:
          """True when the internal state is TIMEOUT (callback has fired)."""
          return self._state is _State.TIMEOUT

      @property
      def deadline(self) -> Optional[float]:
          """Absolute deadline on the loop clock, or None if there is none."""
          return self._deadline

      def reject(self) -> None:
          """Cancel the scheduled timeout callback, if there is one.

          Raise RuntimeError unless the manager is in INIT or ENTER state.
          """
          # Named `reject` rather than `cancel`: in asyncio, task.cancel()
          # is associated with raising CancelledError.
          state = self._state
          if state is not _State.INIT and state is not _State.ENTER:
              raise RuntimeError(f"invalid state {self._state.value}")
          self._reject()

      def _reject(self) -> None:
          """Cancel and forget the pending loop handle (no state checks)."""
          handle = self._timeout_handler
          if handle is None:
              return
          handle.cancel()
          self._timeout_handler = None

      def shift(self, delay: float) -> None:
          """Move the deadline by *delay* seconds (may be negative).

          Raise RuntimeError if no deadline is currently set.
          """
          current = self._deadline
          if current is None:
              raise RuntimeError(
                  "cannot shift timeout if deadline is not scheduled")
          self.update(current + delay)

      def update(self, deadline: float) -> None:
          """Replace the deadline with an absolute value.

          deadline - a point on the same clock as loop.time().  It is not
                     POSIX time; the clock's starting base is undefined
                     (e.g. the time of the system power on).

          If the manager has already been entered, the callback is
          rescheduled; a deadline in the past is scheduled to fire as soon
          as possible.

          Raise RuntimeError after the context manager has exited or once
          the timeout has expired.
          """
          state = self._state
          if state is _State.EXIT:
              raise RuntimeError(
                  "cannot reschedule after exit from context manager")
          if state is _State.TIMEOUT:
              raise RuntimeError("cannot reschedule expired timeout")
          pending = self._timeout_handler
          if pending is not None:
              pending.cancel()
          self._deadline = deadline
          # Before entry there is nothing to schedule; _do_enter() does it.
          if state is not _State.INIT:
              self._reschedule()

      def _reschedule(self) -> None:
          """(Re)install the loop callback for the current deadline."""
          assert self._state == _State.ENTER
          when = self._deadline
          if when is None:
              # No deadline: nothing to schedule.
              return

          loop = self._loop
          now = loop.time()

          stale = self._timeout_handler
          if stale is not None:
              stale.cancel()

          # The callback cancels whichever task is running right now.
          current = asyncio.current_task()
          if when <= now:
              # Deadline already passed: fire on the next loop iteration.
              self._timeout_handler = loop.call_soon(self._on_timeout, current)
          else:
              self._timeout_handler = loop.call_at(
                  when, self._on_timeout, current)

      def _do_enter(self) -> None:
          """Switch INIT -> ENTER and schedule the timeout callback.

          Raise RuntimeError if the manager is not in INIT state.
          """
          if self._state is not _State.INIT:
              raise RuntimeError(f"invalid state {self._state.value}")
          self._state = _State.ENTER
          self._reschedule()

      def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
          """Finish the managed block.

          If the timeout callback fired and the block is exiting with
          CancelledError, raise asyncio.TimeoutError instead.  Otherwise
          mark the manager as exited and cancel any pending callback.
          """
          fired = self._state is _State.TIMEOUT
          if fired and exc_type is asyncio.CancelledError:
              self._timeout_handler = None
              raise asyncio.TimeoutError
          # timeout was not reported: close the manager normally
          self._state = _State.EXIT
          self._reject()
          return None

      def _on_timeout(self, task: "Optional[asyncio.Task[Any]]") -> None:
          """Loop callback: cancel *task* (if any) and record the timeout."""
          if task:
              task.cancel()
          self._state = _State.TIMEOUT
          # release the handle reference as early as possible
          self._timeout_handler = None
  163. self._state = _State.TIMEOUT
  164. # drop the reference early
  165. self._timeout_handler = None