diff --git a/src/danom/_stream.py b/src/danom/_stream.py index 7a4af15..fac555e 100644 --- a/src/danom/_stream.py +++ b/src/danom/_stream.py @@ -4,7 +4,7 @@ import itertools import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Callable, Iterable, Sequence +from collections.abc import Awaitable, Callable, Iterable from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from copy import deepcopy from enum import Enum @@ -481,13 +481,10 @@ def _apply_fns_worker[T](args: tuple[tuple[T], tuple[PlannedOps, ...]]) -> tuple @attrs.define(frozen=True, hash=True, eq=True) class _Tap: - fns: Sequence[Callable] + fn: Callable - def __call__(self, initial: T) -> T: - return reduce(self._apply, self.fns, initial) - - def _apply[T](self, value: T, fn: Callable[[T], None]) -> T: - deepcopy(fn(value)) + def __call__(self, value: T) -> T: + deepcopy(self.fn(value)) return value @@ -499,7 +496,7 @@ def _apply_fns[T](elements: tuple[T], ops: tuple[PlannedOps, ...]) -> tuple[T, . elif op == _FILTER: pipeline = filter(fn, pipeline) elif op == _TAP: - pipeline = map(_Tap((fn,)), pipeline) + pipeline = map(_Tap(fn), pipeline) else: raise RuntimeError("Invalid operation selected. Valid options [map, filter, tap]")