Source code for stdlibx.streams.methods.base

  1from __future__ import annotations
  2
  3import operator
  4from functools import partial
  5from typing import (
  6    TYPE_CHECKING,
  7    Generic,
  8    TypeVar,
  9    Union,
 10)
 11
 12from typing_extensions import TypeGuard, TypeVarTuple, Unpack
 13
 14if TYPE_CHECKING:
 15    from collections.abc import Callable, Iterable
 16
 17    from stdlibx.streams._types import (
 18        Disposable,
 19        Observable,
 20        Observer,
 21        Operation,
 22        Subscriber,
 23    )
 24
 25T = TypeVar("T")
 26U = TypeVar("U")
 27Ts = TypeVarTuple("Ts")
 28
 29
[docs] 30def map_(source: Observable[T], func: Callable[[T], U]) -> Observable[U]: 31 def _subscribe(other: Observer[U]) -> Disposable: 32 return source.subscribe(lambda val: other.push(func(val))) 33 34 return _Observable(_subscribe)
35 36
[docs] 37def if_(source: Observable[T], func: Callable[[T], bool]) -> Observable[T]: 38 def _subscribe(other: Observer[T]) -> Disposable: 39 return source.subscribe( 40 lambda val: other.push(val) if func(val) is True else None 41 ) 42 43 return _Observable(_subscribe)
44 45
[docs] 46def is_( 47 source: Observable[Union[T, U]], func: Callable[[Union[T, U]], TypeGuard[U]] 48) -> Observable[U]: 49 def _subscribe(other: Observer[U]) -> Disposable: 50 return source.subscribe(lambda val: other.push(val) if func(val) else None) 51 52 return _Observable(_subscribe)
53 54
[docs] 55def is_not_none(source: Observable[Union[T, None]]) -> Observable[T]: 56 return is_(source, lambda val: val is not None) # type: ignore
57 58
[docs] 59def as_tuple(source: Observable[T]) -> Observable[tuple[T]]: 60 return map_(source, lambda v: (v,))
61 62
[docs] 63def combine( 64 source: Observable[tuple[Unpack[Ts]]], other: Observable[U] 65) -> Observable[tuple[Unpack[Ts], U]]: 66 values = {} 67 68 def _update(observer: Observer[tuple[Unpack[Ts]]], **kwargs) -> None: 69 nonlocal values 70 values.update(kwargs) 71 72 if ("first" not in values) or ("second" not in values): 73 return 74 75 if isinstance(values["first"], tuple): 76 observer.push((*values["first"], values["second"])) # type: ignore 77 else: 78 observer.push((values["first"], values["second"])) # type: ignore 79 80 def _subscribe(observer: Observer[tuple]) -> Disposable: 81 _updater = partial(_update, observer) 82 return _CompositeDisposable( 83 [ 84 source.subscribe(lambda v: _updater(first=v)), 85 other.subscribe(lambda v: _updater(second=v)), 86 ] 87 ) 88 89 return _Observable(_subscribe)
90 91
[docs] 92def start_with(source: Observable[T], value: T) -> Observable[T]: 93 def _subscribe(other: Observer[T]) -> Disposable: 94 other.push(value) 95 return source.subscribe(other.push) 96 97 return _Observable(_subscribe)
98 99
[docs] 100def for_( 101 source: Observable[Iterable[T]], func: Callable[[T], U] 102) -> Observable[Iterable[U]]: 103 return map_(source, lambda items: [func(item) for item in items])
104 105
[docs] 106def distinct( 107 source: Observable[T], equal_fn: Callable[[T, T], bool] = operator.eq 108) -> Observable[T]: 109 def _subscribe(other: Observer[T]) -> Disposable: 110 curr_val: "Union[T, None]" = None 111 112 def _subscriber(other: Observer[T], val: T) -> None: 113 nonlocal curr_val 114 if curr_val is None or equal_fn(curr_val, val) is False: 115 other.push(val) 116 curr_val = val 117 118 return source.subscribe(partial(_subscriber, other)) 119 120 return _Observable(_subscribe)
121 122 123class _Observable(Generic[T]): 124 def __init__(self, subscribe: Subscriber[T]) -> None: 125 self.__subscribe = subscribe 126 127 def apply(self, func: Operation[Observable[T], U]) -> U: 128 return func(self) 129 130 def __or__(self, func: Operation[Observable[T], U]) -> U: 131 return self.apply(func) 132 133 def subscribe(self, func: Callable[[T], None]) -> Disposable: 134 return self.__subscribe(_CallbackObserver(func)) 135 136 137class _CallbackObserver(Generic[T]): 138 def __init__(self, func: Callable[[T], None]) -> None: 139 self.__func = func 140 141 def push(self, val: T) -> None: 142 self.__func(val) 143 144 145class _CompositeDisposable: 146 def __init__(self, disposables: list[Disposable]) -> None: 147 self.__disposables = disposables 148 149 def dispose(self) -> None: 150 for disposable in self.__disposables: 151 disposable.dispose()