Source code for stdlibx.streams._subject

 1from __future__ import annotations
 2
 3from typing import TYPE_CHECKING, TypeVar, cast
 4
 5from stdlibx.streams._abc import ObservableBase
 6
 7if TYPE_CHECKING:
 8    from collections.abc import Callable
 9
10    from stdlibx.streams._types import Disposable
11
12T = TypeVar("T")
13
14
[docs] 15class Subject(ObservableBase[T]): 16 def __init__(self, initial: T, /) -> None: 17 super().__init__() 18 19 self.__value = initial 20
[docs] 21 def subscribe(self, func: Callable[[T], None]) -> Disposable: 22 func(self.__value) 23 return super().subscribe(func)
24
[docs] 25 def push(self, val: T | Callable[[T], T]) -> None: 26 if callable(val): 27 self.__value = cast("T", val(self.__value)) 28 else: 29 self.__value = cast("T", val) 30 31 for subscriber in self._subscribers: 32 subscriber(self.__value)