Source code for stdlibx.cancel._lib

  1from __future__ import annotations
  2
  3import threading
  4from functools import partial
  5from typing import TYPE_CHECKING
  6
  7from stdlibx.cancel._errors import (
  8    CancellationTokenCancelledError,
  9    CancellationTokenTimeoutError,
 10)
 11from typing_extensions import TypeGuard
 12
 13if TYPE_CHECKING:
 14    from collections.abc import Callable
 15
 16    from stdlibx.cancel._types import CancellationToken, CancelledToken
 17
 18
def default_token() -> CancellationToken:
    """Return a placeholder token that is never cancelled.

    Use it wherever a token is required but the caller has no cancellation
    logic of its own.

    :returns: A new token whose ``is_cancelled()`` always reports ``False``.
    """
    placeholder = _DefaultToken()
    return placeholder
27 28
def with_cancel(
    parent: CancellationToken,
) -> tuple[CancellationToken, Callable[[], None]]:
    """Create a new cancellable token linked to a parent token.

    The returned token can be cancelled independently, and is also cancelled
    when the parent token is cancelled. If the parent is already cancelled
    when this function is called, the returned token is cancelled right away
    with the parent's error.

    :param parent: The parent `Token` to link cancellation from.
    :returns: A tuple containing the new `Token` and a callable that cancels
        the token manually.
    """
    token = _CancellableToken()
    parent.register(token.cancel)

    # A parent that was cancelled before the callback was registered may never
    # invoke it, so propagate that cancellation explicitly. ``token.cancel``
    # ignores every call after the first, so a double delivery is harmless.
    if parent.is_cancelled():
        token.cancel(parent.get_error())

    return token, partial(token.cancel, None)
44 45
def with_timeout(
    parent: CancellationToken, timeout: float
) -> tuple[CancellationToken, Callable[[], None]]:
    """Create a new cancellable token with a timeout, linked to a parent token.

    The token is cancelled when the parent token is cancelled, when the
    timeout elapses (with a `CancellationTokenTimeoutError`), or when the
    returned callable is invoked -- whichever happens first. If the parent is
    already cancelled when this function is called, the returned token is
    cancelled right away and no timer thread is started.

    :param parent: The parent `Token` to link cancellation from.
    :param timeout: Timeout duration in seconds before automatic cancellation.
    :returns: A tuple containing the new `Token` and a callable that cancels
        the token manually (and stops the timer).
    """
    token = _CancellableToken()
    timer = threading.Timer(
        timeout, partial(token.cancel, CancellationTokenTimeoutError())
    )
    # Daemon thread: a pending timeout must not keep the interpreter alive.
    timer.daemon = True

    def _cancel(error: Exception | None) -> None:
        token.cancel(error)
        # Stop the timer so its thread does not linger until the timeout.
        timer.cancel()

    parent.register(_cancel)

    if parent.is_cancelled():
        # A parent that was cancelled before the callback was registered may
        # never invoke it, so propagate explicitly and skip the timer.
        # ``_cancel`` is safe to run more than once.
        _cancel(parent.get_error())
    else:
        timer.start()

    return token, partial(_cancel, None)
72 73
def is_token_cancelled(token: CancellationToken) -> TypeGuard[CancelledToken]:
    """Report whether *token* is cancelled, narrowing its type when it is.

    At runtime this simply forwards to ``token.is_cancelled()``. For type
    checkers, a ``True`` result refines *token* to `CancelledToken`.

    :param token: The token to check.
    :returns: True if the token is cancelled, False otherwise.
    """
    cancelled = token.is_cancelled()
    return cancelled
84 85 86class _DefaultToken: 87 def register(self, fn: Callable[[Exception], None]) -> None: 88 pass 89 90 def is_cancelled(self) -> bool: 91 return False 92 93 def get_error(self) -> Exception | None: 94 return None 95 96 def raise_if_cancelled(self) -> None: 97 pass 98 99 def wait(self, timeout: float | None) -> Exception | None: 100 return None 101 102 103class _CancellableToken: 104 def __init__(self) -> None: 105 self.__lock = threading.RLock() 106 self.__signal = threading.Event() 107 108 self.__callbacks: list[Callable[[Exception], None]] = [] 109 self.__error: Exception | None = None 110 111 def cancel(self, error: Exception | None = None) -> None: 112 with self.__lock: 113 if self.is_cancelled(): 114 return 115 116 self.__error = error or CancellationTokenCancelledError() 117 for callback in self.__callbacks: 118 callback(self.__error) 119 self.__signal.set() 120 121 def register(self, fn: Callable[[Exception], None]) -> None: 122 with self.__lock: 123 self.__callbacks.append(fn) 124 125 def is_cancelled(self) -> bool: 126 return self.__signal.is_set() 127 128 def get_error(self) -> Exception | None: 129 return self.__error 130 131 def raise_if_cancelled(self) -> None: 132 if self.__signal.is_set() and self.__error is not None: 133 raise self.__error 134 135 def wait(self, timeout: float | None) -> Exception | None: 136 self.__signal.wait(timeout) 137 return self.__error