1from __future__ import annotations
2
3import operator
4from functools import partial
5from typing import (
6 TYPE_CHECKING,
7 Generic,
8 TypeVar,
9 Union,
10)
11
12from typing_extensions import TypeGuard, TypeVarTuple, Unpack
13
14if TYPE_CHECKING:
15 from collections.abc import Callable, Iterable
16
17 from stdlibx.streams._types import (
18 Disposable,
19 Observable,
20 Observer,
21 Operation,
22 Subscriber,
23 )
24
25T = TypeVar("T")
26U = TypeVar("U")
27Ts = TypeVarTuple("Ts")
28
29
[docs]
def map_(source: Observable[T], func: Callable[[T], U]) -> Observable[U]:
    """Return an observable that emits ``func(value)`` for each value of *source*.

    Args:
        source: Observable whose values are transformed.
        func: Transformation applied to every value before it is pushed on.

    Returns:
        A new observable. Subscribing to it subscribes to *source*; the
        disposable handed back is the one returned by ``source.subscribe``.
    """

    def _subscribe(other: Observer[U]) -> Disposable:
        def _forward(val: T) -> None:
            mapped = func(val)
            other.push(mapped)

        return source.subscribe(_forward)

    return _Observable(_subscribe)
35
36
[docs]
def if_(source: Observable[T], func: Callable[[T], bool]) -> Observable[T]:
    """Return an observable that only emits values of *source* accepted by *func*.

    Args:
        source: Observable whose values are filtered.
        func: Predicate called with every value; the value is pushed on when
            the result is truthy and dropped otherwise.

    Returns:
        A new observable. Subscribing to it subscribes to *source*; the
        disposable handed back is the one returned by ``source.subscribe``.
    """

    def _subscribe(other: Observer[T]) -> Disposable:
        def _forward_if_accepted(val: T) -> None:
            # Test truthiness rather than identity with ``True``: a predicate
            # returning a bool-like object (e.g. ``numpy.bool_``) would
            # otherwise have every value silently dropped.
            if func(val):
                other.push(val)

        return source.subscribe(_forward_if_accepted)

    return _Observable(_subscribe)
44
45
[docs]
def is_(
    source: Observable[Union[T, U]], func: Callable[[Union[T, U]], TypeGuard[U]]
) -> Observable[U]:
    """Return an observable that emits only the values of *source* narrowed by *func*.

    Args:
        source: Observable emitting values of either type.
        func: Type guard called with every value; the value is pushed on when
            the guard returns a truthy result and dropped otherwise.

    Returns:
        A new observable. Subscribing to it subscribes to *source*; the
        disposable handed back is the one returned by ``source.subscribe``.
    """

    def _subscribe(other: Observer[U]) -> Disposable:
        def _forward_if_guarded(val: Union[T, U]) -> None:
            if not func(val):
                return
            other.push(val)

        return source.subscribe(_forward_if_guarded)

    return _Observable(_subscribe)
53
54
[docs]
def is_not_none(source: Observable[Union[T, None]]) -> Observable[T]:
    """Return an observable that drops every ``None`` emitted by *source*.

    Delegates to ``is_`` with a guard that checks ``val is not None``.
    """

    def _has_value(val: Union[T, None]) -> bool:
        return val is not None

    return is_(source, _has_value)  # type: ignore
57
58
[docs]
def as_tuple(source: Observable[T]) -> Observable[tuple[T]]:
    """Return an observable that wraps each value of *source* in a 1-tuple.

    Delegates to ``map_``.
    """

    def _wrap(v: T) -> tuple[T]:
        return (v,)

    return map_(source, _wrap)
61
62
[docs]
def combine(
    source: Observable[tuple[Unpack[Ts]]], other: Observable[U]
) -> Observable[tuple[Unpack[Ts], U]]:
    """Combine the latest values of *source* and *other* into one flat tuple.

    Nothing is emitted until both observables have produced at least one
    value. After that, every new value from either side emits a tuple built
    from the most recent value of each: if the latest *source* value is a
    tuple it is unpacked and the latest *other* value appended, otherwise a
    plain 2-tuple ``(first, second)`` is emitted.

    Args:
        source: Observable supplying the leading element(s) of the result.
        other: Observable supplying the trailing element of the result.

    Returns:
        A new observable. Each subscription subscribes to *source* first and
        then to *other*; the returned disposable disposes both.
    """

    def _subscribe(observer: Observer[tuple]) -> Disposable:
        # The latest-value state is created per subscription. Keeping it in
        # the enclosing ``combine`` scope would let independent subscribers
        # see each other's (possibly stale) values and emit early or twice.
        latest: dict = {}

        def _update(key: str, value: object) -> None:
            latest[key] = value

            # Wait until both sides have produced at least one value.
            if "first" not in latest or "second" not in latest:
                return

            first, second = latest["first"], latest["second"]
            if isinstance(first, tuple):
                # Flatten so chained combines yield (a, b, c), not ((a, b), c).
                observer.push((*first, second))  # type: ignore
            else:
                observer.push((first, second))  # type: ignore

        return _CompositeDisposable(
            [
                source.subscribe(lambda v: _update("first", v)),
                other.subscribe(lambda v: _update("second", v)),
            ]
        )

    return _Observable(_subscribe)
90
91
[docs]
def start_with(source: Observable[T], value: T) -> Observable[T]:
    """Return an observable that emits *value* first, then everything from *source*.

    Args:
        source: Observable whose values follow the initial one.
        value: Value pushed to each observer before *source* is subscribed.

    Returns:
        A new observable. On subscription *value* is pushed immediately, then
        the observer is subscribed to *source*; the disposable handed back is
        the one returned by ``source.subscribe``.
    """

    def _subscribe(other: Observer[T]) -> Disposable:
        emit = other.push
        emit(value)
        return source.subscribe(emit)

    return _Observable(_subscribe)
98
99
[docs]
def for_(
    source: Observable[Iterable[T]], func: Callable[[T], U]
) -> Observable[Iterable[U]]:
    """Return an observable that applies *func* to every item of each emitted iterable.

    Each iterable emitted by *source* is consumed eagerly and re-emitted as a
    list of the converted items. Delegates to ``map_``.
    """

    def _convert_all(items: Iterable[T]) -> list[U]:
        converted = []
        for item in items:
            converted.append(func(item))
        return converted

    return map_(source, _convert_all)
104
105
[docs]
def distinct(
    source: Observable[T], equal_fn: Callable[[T, T], bool] = operator.eq
) -> Observable[T]:
    """Return an observable that suppresses consecutive equal values of *source*.

    A value is pushed on when it is the first one seen by the subscription,
    or when ``equal_fn(last_pushed, value)`` is falsy. Only the most recently
    pushed value is remembered, so a value may reappear later once a different
    value has been emitted in between.

    Args:
        source: Observable whose consecutive duplicates are dropped.
        equal_fn: Called as ``equal_fn(previous, current)``; a truthy result
            means the current value is a duplicate. Defaults to
            ``operator.eq``.

    Returns:
        A new observable. Each subscription tracks its own last value; the
        disposable handed back is the one returned by ``source.subscribe``.
    """

    def _subscribe(other: Observer[T]) -> Disposable:
        # Dedicated sentinel for "nothing pushed yet". Using ``None`` for this
        # would make a stream of ``None`` values look perpetually new and
        # defeat the de-duplication.
        unset = object()
        last_val: object = unset

        def _on_value(val: T) -> None:
            nonlocal last_val
            # Truthiness rather than ``is False`` so comparators returning
            # bool-like objects still let changed values through.
            if last_val is unset or not equal_fn(last_val, val):  # type: ignore
                other.push(val)
                last_val = val

        return source.subscribe(_on_value)

    return _Observable(_subscribe)
121
122
class _Observable(Generic[T]):
    """Observable backed by a subscribe function.

    The wrapped function receives an observer and returns a disposable; it is
    invoked once per call to ``subscribe``.
    """

    def __init__(self, subscribe: Subscriber[T]) -> None:
        # Function called with a fresh observer on every subscription.
        self.__subscribe = subscribe

    def subscribe(self, func: Callable[[T], None]) -> Disposable:
        """Wrap *func* in an observer and hand it to the subscribe function."""
        observer: _CallbackObserver[T] = _CallbackObserver(func)
        return self.__subscribe(observer)

    def apply(self, func: Operation[Observable[T], U]) -> U:
        """Call *func* with this observable and return its result."""
        result = func(self)
        return result

    def __or__(self, func: Operation[Observable[T], U]) -> U:
        """Pipe syntax: ``observable | func`` is ``observable.apply(func)``."""
        return self.apply(func)
135
136
class _CallbackObserver(Generic[T]):
    """Observer that forwards every pushed value to a plain callable."""

    def __init__(self, func: Callable[[T], None]) -> None:
        # Callable invoked with each pushed value.
        self.__func = func

    def push(self, val: T) -> None:
        """Invoke the wrapped callable with *val*; its result is discarded."""
        callback = self.__func
        callback(val)
143
144
class _CompositeDisposable:
    """Disposable that groups several disposables and disposes them together."""

    def __init__(self, disposables: list[Disposable]) -> None:
        # Kept by reference; disposed in list order.
        self.__disposables = disposables

    def dispose(self) -> None:
        """Call ``dispose()`` on every grouped disposable, in order."""
        for member in self.__disposables:
            member.dispose()