From f95f7f0832c7b79dc9c98eee7457c8e623ac3c05 Mon Sep 17 00:00:00 2001 From: ed cuss Date: Sat, 4 Apr 2026 18:06:41 +0100 Subject: [PATCH] refactor: make _Tap take a single fn instead of a sequence --- src/danom/_stream.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/danom/_stream.py b/src/danom/_stream.py index 7a4af15..fac555e 100644 --- a/src/danom/_stream.py +++ b/src/danom/_stream.py @@ -4,7 +4,7 @@ import itertools import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Callable, Iterable, Sequence +from collections.abc import Awaitable, Callable, Iterable from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from copy import deepcopy from enum import Enum @@ -481,13 +481,10 @@ def _apply_fns_worker[T](args: tuple[tuple[T], tuple[PlannedOps, ...]]) -> tuple @attrs.define(frozen=True, hash=True, eq=True) class _Tap: - fns: Sequence[Callable] + fn: Callable - def __call__(self, initial: T) -> T: - return reduce(self._apply, self.fns, initial) - - def _apply[T](self, value: T, fn: Callable[[T], None]) -> T: - deepcopy(fn(value)) + def __call__(self, value: T) -> T: + deepcopy(self.fn(value)) return value @@ -499,7 +496,7 @@ def _apply_fns[T](elements: tuple[T], ops: tuple[PlannedOps, ...]) -> tuple[T, . elif op == _FILTER: pipeline = filter(fn, pipeline) elif op == _TAP: - pipeline = map(_Tap((fn,)), pipeline) + pipeline = map(_Tap(fn), pipeline) else: raise RuntimeError("Invalid operation selected. Valid options [map, filter, tap]")