# Source code for stdlibx.streams._types
from __future__ import annotations

from typing import Callable, Protocol, TypeVar, runtime_checkable

from typing_extensions import TypeAlias
6
7T = TypeVar("T")
8U = TypeVar("U")
9
10Operation: TypeAlias = Callable[[T], U]
11
12
13T_co = TypeVar("T_co", covariant=True)
14U_contra = TypeVar("U_contra", contravariant=True)
15
16
class Disposable(Protocol):
    """Structural type for any object that exposes a ``dispose()`` method.

    This is the type returned by ``Observable.subscribe`` and by
    ``Subscriber`` callables in this module.
    """

    def dispose(self) -> None:
        """Dispose of this object.

        What is released or cancelled is up to the implementation; the
        protocol only fixes the signature (no arguments, returns ``None``).
        """
        ...
19
20
@runtime_checkable
class Observable(Protocol[T_co]):
    """Structural type for a source of ``T_co`` values that can be subscribed to.

    The protocol is ``runtime_checkable``, so ``isinstance(obj, Observable)``
    works; such a check only verifies that the methods exist, not their
    signatures.
    """

    # ``U`` is a method-scoped type variable here.  Variance is only meaningful
    # for class-scoped parameters, and type checkers reject a contravariant
    # type variable in return position, so the invariant ``U`` is used.
    def apply(self, func: Operation[Observable[T_co], U]) -> U:
        """Take a callable over this observable and return a ``U``.

        :param func: callable accepting an ``Observable[T_co]`` and returning
            a value of type ``U``.
        :return: a value of the type ``func`` returns (presumably
            ``func(self)`` -- the protocol itself does not enforce this).
        """
        ...

    def __or__(self, func: Operation[Observable[T_co], U]) -> U:
        """Operator form, ``observable | func``, with the same signature as ``apply``."""
        ...

    def subscribe(self, func: Callable[[T_co], None]) -> Disposable:
        """Register ``func`` as a callback that accepts ``T_co`` values.

        :param func: callable taking a single value and returning ``None``.
        :return: a ``Disposable`` associated with the subscription.
        """
        ...
28
29
class Observer(Protocol[U_contra]):
    """Structural type for a sink that accepts values of type ``U_contra``."""

    def push(self, val: U_contra) -> None:
        """Hand a single value to this observer.

        :param val: the value being delivered.
        """
        ...
32
33
# A callable that is given an ``Observer[T]`` and returns a ``Disposable``.
Subscriber: TypeAlias = Callable[[Observer[T]], Disposable]