Source code for stdlibx.streams._types

 1from __future__ import annotations
 2
 3from typing import Callable, Protocol, TypeVar, runtime_checkable
 4
 5from typing_extensions import TypeAlias
 6
 7T = TypeVar("T")
 8U = TypeVar("U")
 9
10Operation: TypeAlias = Callable[[T], U]
11
12
13T_co = TypeVar("T_co", covariant=True)
14U_contra = TypeVar("U_contra", contravariant=True)
15
16
[docs] 17class Disposable(Protocol):
[docs] 18 def dispose(self) -> None: ...
19 20
[docs] 21@runtime_checkable 22class Observable(Protocol[T_co]):
[docs] 23 def apply(self, func: Operation[Observable[T_co], U_contra]) -> U_contra: ...
24 25 def __or__(self, func: Operation[Observable[T_co], U_contra]) -> U_contra: ... 26
[docs] 27 def subscribe(self, func: Callable[[T_co], None]) -> Disposable: ...
28 29
[docs] 30class Observer(Protocol[U_contra]):
[docs] 31 def push(self, val: U_contra) -> None: ...
32 33 34Subscriber: TypeAlias = Callable[[Observer[T]], Disposable]