Source code for stdlibx.streams._subject
1from __future__ import annotations
2
3from typing import TYPE_CHECKING, TypeVar, cast
4
5from stdlibx.streams._abc import ObservableBase
6
7if TYPE_CHECKING:
8 from collections.abc import Callable
9
10 from stdlibx.streams._types import Disposable
11
12T = TypeVar("T")
13
14
[docs]
15class Subject(ObservableBase[T]):
16 def __init__(self, initial: T, /) -> None:
17 super().__init__()
18
19 self.__value = initial
20
[docs]
21 def subscribe(self, func: Callable[[T], None]) -> Disposable:
22 func(self.__value)
23 return super().subscribe(func)
24
[docs]
25 def push(self, val: T | Callable[[T], T]) -> None:
26 if callable(val):
27 self.__value = cast("T", val(self.__value))
28 else:
29 self.__value = cast("T", val)
30
31 for subscriber in self._subscribers:
32 subscriber(self.__value)