Source code for stdlibx.streams.fn.base
1from __future__ import annotations
2
3import operator
4from functools import partial
5from typing import (
6 TYPE_CHECKING,
7 TypeVar,
8 Union,
9)
10
11from stdlibx.streams import methods
12from typing_extensions import TypeGuard, TypeVarTuple, Unpack
13
14if TYPE_CHECKING:
15 from collections.abc import Callable, Iterable
16
17 from stdlibx.streams._types import (
18 Observable,
19 Operation,
20 )
21
22T = TypeVar("T")
23U = TypeVar("U")
24Ts = TypeVarTuple("Ts")
25
26
[docs]
27def map_(func: Callable[[T], U]) -> Operation[Observable[T], Observable[U]]:
28 return partial(methods.map_, func=func)
29
30
[docs]
31def if_(func: Callable[[T], bool]) -> Operation[Observable[T], Observable[T]]:
32 return partial(methods.if_, func=func)
33
34
[docs]
35def is_(
36 func: Callable[[Union[T, U]], TypeGuard[U]],
37) -> Operation[Observable[Union[T, U]], Observable[U]]:
38 return partial(methods.is_, func=func)
39
40
[docs]
41def is_not_none() -> Operation[Observable[Union[T, None]], Observable[T]]:
42 return methods.is_not_none
43
44
[docs]
45def as_tuple() -> Operation[Observable[T], Observable[tuple[T]]]:
46 return methods.as_tuple
47
48
[docs]
49def combine(
50 other: Observable[U],
51) -> Operation[Observable[tuple[Unpack[Ts]]], Observable[tuple[Unpack[Ts], U]]]:
52 return partial(methods.combine, other=other)
53
54
[docs]
55def start_with(value: T) -> Operation[Observable[T], Observable[T]]:
56 return partial(methods.start_with, value=value)
57
58
[docs]
59def for_(
60 func: Callable[[T], U],
61) -> Operation[Observable[Iterable[T]], Observable[Iterable[U]]]:
62 return partial(methods.for_, func=func)
63
64
[docs]
65def distinct(
66 equal_fn: Callable[[T, T], bool] = operator.eq,
67) -> Operation[Observable[T], Observable[T]]:
68 return partial(methods.distinct, equal_fn=equal_fn)