Source code for stdlibx.streams.fn.base

 1from __future__ import annotations
 2
 3import operator
 4from functools import partial
 5from typing import (
 6    TYPE_CHECKING,
 7    TypeVar,
 8    Union,
 9)
10
11from stdlibx.streams import methods
12from typing_extensions import TypeGuard, TypeVarTuple, Unpack
13
14if TYPE_CHECKING:
15    from collections.abc import Callable, Iterable
16
17    from stdlibx.streams._types import (
18        Observable,
19        Operation,
20    )
21
# Generic placeholders used by the annotations in this module.
# ``T`` and ``U`` stand for single element types.
T = TypeVar("T")
U = TypeVar("U")

# ``Ts`` is variadic: it stands for the element types of a tuple of any
# length (used as ``tuple[Unpack[Ts]]`` in ``combine``).
Ts = TypeVarTuple("Ts")
25
26
def map_(func: Callable[[T], U]) -> Operation[Observable[T], Observable[U]]:
    """Return ``methods.map_`` with ``func`` pre-bound as a keyword argument.

    Args:
        func: Callable annotated as taking a ``T`` and returning a ``U``.

    Returns:
        A ``functools.partial`` wrapping ``methods.map_``. Whatever arguments
        ``methods.map_`` still needs (presumably the source observable --
        confirm against ``methods.map_``) are supplied when the returned
        operation is called.
    """
    operation = partial(methods.map_, func=func)
    return operation
29 30
def if_(func: Callable[[T], bool]) -> Operation[Observable[T], Observable[T]]:
    """Return ``methods.if_`` with ``func`` pre-bound as a keyword argument.

    Args:
        func: Predicate annotated as taking a ``T`` and returning ``bool``.

    Returns:
        A ``functools.partial`` wrapping ``methods.if_``; the element type is
        annotated as unchanged (``Observable[T]`` in and out). How the
        predicate result is used is decided by ``methods.if_`` (presumably
        filtering -- confirm there).
    """
    bound = {"func": func}
    return partial(methods.if_, **bound)
33 34
def is_(
    func: Callable[[Union[T, U]], TypeGuard[U]],
) -> Operation[Observable[Union[T, U]], Observable[U]]:
    """Return ``methods.is_`` with the type guard ``func`` pre-bound.

    Args:
        func: Callable annotated as a ``TypeGuard`` that narrows a
            ``Union[T, U]`` value to ``U``.

    Returns:
        A ``functools.partial`` wrapping ``methods.is_``, annotated as turning
        an ``Observable[Union[T, U]]`` into an ``Observable[U]``.
    """
    narrowing_operation = partial(methods.is_, func=func)
    return narrowing_operation
39 40
def is_not_none() -> Operation[Observable[Union[T, None]], Observable[T]]:
    """Return ``methods.is_not_none`` itself as a pipeable operation.

    Takes no arguments, so nothing needs binding and the function object
    from ``methods`` is handed back unchanged (no ``partial`` wrapper).

    Returns:
        ``methods.is_not_none``, annotated as turning an
        ``Observable[Union[T, None]]`` into an ``Observable[T]``.
    """
    operation = methods.is_not_none
    return operation
43 44
def as_tuple() -> Operation[Observable[T], Observable[tuple[T]]]:
    """Return ``methods.as_tuple`` itself as a pipeable operation.

    Takes no arguments, so the function object from ``methods`` is handed
    back unchanged (no ``partial`` wrapper).

    Returns:
        ``methods.as_tuple``, annotated as turning an ``Observable[T]`` into
        an ``Observable[tuple[T]]``.
    """
    operation = methods.as_tuple
    return operation
47 48
def combine(
    other: Observable[U],
) -> Operation[Observable[tuple[Unpack[Ts]]], Observable[tuple[Unpack[Ts], U]]]:
    """Return ``methods.combine`` with ``other`` pre-bound as a keyword argument.

    Args:
        other: Observable whose element type ``U`` is appended to the tuple
            type in the annotated result.

    Returns:
        A ``functools.partial`` wrapping ``methods.combine``, annotated as
        turning an observable of ``tuple[*Ts]`` into an observable of
        ``tuple[*Ts, U]``.
    """
    bound = {"other": other}
    return partial(methods.combine, **bound)
53 54
def start_with(value: T) -> Operation[Observable[T], Observable[T]]:
    """Return ``methods.start_with`` with ``value`` pre-bound as a keyword argument.

    Args:
        value: A ``T`` forwarded to ``methods.start_with`` (presumably the
            initial value of the resulting observable -- confirm there).

    Returns:
        A ``functools.partial`` wrapping ``methods.start_with``; the element
        type is annotated as unchanged.
    """
    operation = partial(methods.start_with, value=value)
    return operation
57 58
def for_(
    func: Callable[[T], U],
) -> Operation[Observable[Iterable[T]], Observable[Iterable[U]]]:
    """Return ``methods.for_`` with ``func`` pre-bound as a keyword argument.

    Args:
        func: Callable annotated as taking a ``T`` and returning a ``U``.

    Returns:
        A ``functools.partial`` wrapping ``methods.for_``, annotated as
        turning an ``Observable[Iterable[T]]`` into an
        ``Observable[Iterable[U]]``.
    """
    bound = {"func": func}
    return partial(methods.for_, **bound)
63 64
def distinct(
    equal_fn: Callable[[T, T], bool] = operator.eq,
) -> Operation[Observable[T], Observable[T]]:
    """Return ``methods.distinct`` with ``equal_fn`` pre-bound as a keyword argument.

    Args:
        equal_fn: Two-argument callable returning ``bool``; defaults to
            ``operator.eq``. Presumably used by ``methods.distinct`` to
            decide whether two values count as equal -- confirm there.

    Returns:
        A ``functools.partial`` wrapping ``methods.distinct``; the element
        type is annotated as unchanged.
    """
    operation = partial(methods.distinct, equal_fn=equal_fn)
    return operation