Source code for stdlibx.cancel._lib
1from __future__ import annotations
2
3import threading
4from functools import partial
5from typing import TYPE_CHECKING
6
7from stdlibx.cancel._errors import (
8 CancellationTokenCancelledError,
9 CancellationTokenTimeoutError,
10)
11from typing_extensions import TypeGuard
12
13if TYPE_CHECKING:
14 from collections.abc import Callable
15
16 from stdlibx.cancel._types import CancellationToken, CancelledToken
17
18
[docs]
def default_token() -> CancellationToken:
    """Return a placeholder token that can never be cancelled.

    Use it wherever an API requires a token but the caller has no
    cancellation logic of its own.

    :returns: A token whose ``is_cancelled()`` is always ``False``.
    """
    placeholder = _DefaultToken()
    return placeholder
27
28
[docs]
def with_cancel(
    parent: CancellationToken,
) -> tuple[CancellationToken, Callable[[], None]]:
    """Create a new cancellable token linked to a parent token.

    The returned token can be cancelled independently, and is also cancelled
    when the parent token is cancelled. If the parent is already cancelled
    when this function is called, the returned token is cancelled right away
    with the parent's error.

    :param parent: The parent token to link cancellation from.
    :returns: A tuple containing the new token and a callable that cancels
        the token manually.
    """
    token = _CancellableToken()
    parent.register(token.cancel)

    # Registering only covers cancellations that happen from now on. A parent
    # that was cancelled before the link existed may never invoke the
    # callback, so propagate its state explicitly. ``cancel`` ignores repeated
    # calls, which makes this safe even if the callback has already fired.
    if parent.is_cancelled():
        token.cancel(parent.get_error())

    return token, partial(token.cancel, None)
44
45
[docs]
def with_timeout(
    parent: CancellationToken, timeout: float
) -> tuple[CancellationToken, Callable[[], None]]:
    """Create a new cancellable token with a timeout, linked to a parent token.

    The token is cancelled when the parent token is cancelled, when the
    timeout elapses, or when the returned callable is invoked, whichever
    happens first. If the parent is already cancelled when this function is
    called, the returned token is cancelled right away with the parent's
    error and no timer thread is started.

    :param parent: The parent token to link cancellation from.
    :param timeout: Timeout duration in seconds before automatic cancellation.
    :returns: A tuple containing the new token and a callable that cancels
        the token manually.
    """
    token = _CancellableToken()
    timer = threading.Timer(
        timeout, partial(token.cancel, CancellationTokenTimeoutError())
    )
    # Daemon thread: a pending timeout must not keep the interpreter alive.
    timer.daemon = True

    def _cancel(error: Exception | None) -> None:
        # Cancel the token first, then stop the timer so it does not fire
        # later for a token that is already cancelled.
        token.cancel(error)
        timer.cancel()

    parent.register(_cancel)

    if parent.is_cancelled():
        # Registering only covers future cancellations; a parent cancelled
        # before the link existed must be propagated explicitly. Both calls
        # inside ``_cancel`` are idempotent, so this is safe even if the
        # callback has already fired.
        _cancel(parent.get_error())
    else:
        timer.start()

    return token, partial(_cancel, None)
72
73
[docs]
def is_token_cancelled(token: CancellationToken) -> TypeGuard[CancelledToken]:
    """Report whether ``token`` has been cancelled.

    Acts as a type guard: when it returns ``True``, type checkers narrow
    ``token`` to ``CancelledToken``.

    :param token: The token to check.
    :returns: Whatever ``token.is_cancelled()`` reports.
    """
    cancelled = token.is_cancelled()
    return cancelled
84
85
class _DefaultToken:
    """Token that is never cancelled; every operation is a no-op."""

    def register(self, fn: Callable[[Exception], None]) -> None:
        """Discard ``fn``: this token never cancels, so it is never called."""
        return None

    def is_cancelled(self) -> bool:
        """Always report the token as not cancelled."""
        return False

    def get_error(self) -> Exception | None:
        """Return ``None``: there is never a cancellation error."""
        return None

    def raise_if_cancelled(self) -> None:
        """Do nothing: there is never an error to raise."""
        return None

    def wait(self, timeout: float | None) -> Exception | None:
        """Return ``None`` immediately, without blocking for ``timeout``."""
        return None
101
102
class _CancellableToken:
    """Token that can be cancelled exactly once, with an associated error.

    Callbacks registered before cancellation run when :meth:`cancel` is first
    called. Callbacks registered afterwards run immediately with the stored
    error, so a late registration is never silently dropped.
    """

    def __init__(self) -> None:
        # Re-entrant so a callback may call back into this token from the
        # thread that is currently running ``cancel``.
        self.__lock = threading.RLock()
        # Set once every registered callback has been given a chance to run.
        self.__signal = threading.Event()

        self.__callbacks: list[Callable[[Exception], None]] = []
        # Assigned exactly once; a non-None value means cancellation started.
        self.__error: Exception | None = None

    def cancel(self, error: Exception | None = None) -> None:
        """Cancel the token and notify every registered callback.

        Only the first call has any effect; later calls, including
        re-entrant calls made from a callback, return without doing anything.

        :param error: The error to store and pass to callbacks. Defaults to
            a new ``CancellationTokenCancelledError``.
        :raises Exception: The first exception raised by a callback. It is
            re-raised only after all callbacks have run and the token has
            been marked as cancelled.
        """
        with self.__lock:
            # Guard on the stored error rather than the signal: the signal is
            # only set after the callbacks ran, so it cannot stop a callback
            # from re-entering ``cancel`` and running every callback again.
            if self.__error is not None:
                return

            if error is None:
                error = CancellationTokenCancelledError()
            self.__error = error

            # Work on a snapshot and drop the stored references; callbacks
            # registered from now on are invoked directly by ``register``.
            pending = list(self.__callbacks)
            self.__callbacks.clear()

            failure: Exception | None = None
            try:
                for callback in pending:
                    try:
                        callback(error)
                    except Exception as exc:
                        # One faulty callback must not stop the remaining
                        # ones (e.g. linked child tokens) from being told.
                        if failure is None:
                            failure = exc
            finally:
                # Always wake waiters, even if a callback raised.
                self.__signal.set()

        if failure is not None:
            raise failure

    def register(self, fn: Callable[[Exception], None]) -> None:
        """Register ``fn`` to be called with the error on cancellation.

        If cancellation has already started, ``fn`` is called immediately
        with the stored error instead of being stored.

        :param fn: Callback that receives the cancellation error.
        """
        with self.__lock:
            error = self.__error
            if error is None:
                self.__callbacks.append(fn)
                return

        # Already cancelled: the state is final, so no lock is needed here.
        fn(error)

    def is_cancelled(self) -> bool:
        """Return ``True`` once cancellation has completed."""
        return self.__signal.is_set()

    def get_error(self) -> Exception | None:
        """Return the stored cancellation error, or ``None`` if there is none."""
        return self.__error

    def raise_if_cancelled(self) -> None:
        """Raise the stored cancellation error if the token is cancelled."""
        if self.__signal.is_set() and self.__error is not None:
            raise self.__error

    def wait(self, timeout: float | None) -> Exception | None:
        """Block until the token is cancelled or ``timeout`` elapses.

        :param timeout: Maximum number of seconds to wait, or ``None`` to
            wait without a limit.
        :returns: The stored cancellation error, or ``None`` if there is none.
        """
        self.__signal.wait(timeout)
        return self.__error