ebonnal/streamable

streamable

sync/async iterable streams for Python

stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress observation, and error handling.

Python 3.8+

1. install

pip install streamable

(no dependencies)

2. import

from streamable import stream

3. init

Create a stream[T] from an Iterable[T] or AsyncIterable[T]:

ints: stream[int] = stream(range(10))

4. operate

Chain lazy operations (accepting sync and async functions):

import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

Source elements will be processed on-the-fly during iteration.

5. iterate

A stream[T] is Iterable[T] and AsyncIterable[T]:

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
>>> [pokemon async for pokemon in pokemons]
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']

📖 operations (docs)

A stream exposes operations to manipulate its elements, but consuming them and performing the I/O is not its responsibility: it's meant to be combined with dedicated libraries like pyarrow, psycopg2, dlt (ETL example) ...

A stream can be iterated several times if its source allows it.

Operations return a new stream.

Operations allow iteration to resume after an exception.

Operations accept both sync and async functions.

A sync iteration over a stream involving async functions runs an async iteration under the hood, using the current event loop (or setting a new one if needed).

Operations adapt their behavior to the type of iteration.

see examples
  • .throttle sleeps via time.sleep during sync iteration and via asyncio.sleep during async iteration.

  • .buffer's background task is a threading.Thread during sync iteration and an asyncio.Task during async iteration.

  • concurrent .map of a sync function uses loop.run_in_executor during async iteration to not block the event loop.

  • ...

.map

Transform elements:

int_chars: stream[str] = stream(range(10)).map(str)

assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

concurrency

Set the concurrency parameter to apply the transformation concurrently.

At most concurrency upstream elements are in flight at any time.

Upstream order is preserved unless you set as_completed=True, which yields elements as their processing completes.

via threads

If concurrency > 1, the transformation will be applied via concurrency threads:

pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

via async

If concurrency > 1 and the transformation is async, it will be applied via concurrency async tasks:

import asyncio
import httpx

# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']
# sync context
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())

via processes

concurrency can also be a concurrent.futures.Executor; pass a ProcessPoolExecutor to apply the transformation via processes:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # `state.append` runs in worker processes and returns None
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []

.do

Perform side effects:

state: list[int] = []
storing_ints: stream[int] = stream(range(10)).do(state.append)

assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Same as .map.

.group

Group elements into batches...

... up_to a given batch size:

int_batches: stream[list[int]] = stream(range(10)).group(5)

assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... within a given time interval:

from datetime import timedelta

int_1s_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)

assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

... by a given key, yielding (key, elements) pairs:

ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)

assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

You can combine these parameters.

.flatten

Explode upstream elements (Iterable or AsyncIterable):

chars: stream[str] = stream(["hel", "lo!"]).flatten()

assert list(chars) == ["h", "e", "l", "l", "o", "!"]

concurrency

Explode up to concurrency iterables at a time:

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)

assert list(chars) == ["h", "l", "e", "o", "l", "!"]

.filter

Filter elements satisfying a predicate:

even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)

assert list(even_ints) == [0, 2, 4, 6, 8]

.take

Take a given number of elements:

first_5_ints: stream[int] = stream(range(10)).take(5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

... or take until a predicate is satisfied:

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

.skip

Skip a given number of elements:

ints_after_5: stream[int] = stream(range(10)).skip(5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

... or skip until a predicate is satisfied:

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

.catch

Catch exceptions of a given type:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... where a predicate is satisfied:

from http import HTTPStatus

urls = [
    "https://github.com/ebonnal",
    "https://github.com/ebonnal/streamable",
    "https://github.com/ebonnal/foo",
]
responses: stream[httpx.Response] = (
    stream(urls)
    .map(httpx.get)
    .do(httpx.Response.raise_for_status)
    .catch(
        httpx.HTTPStatusError,
        where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
    )
)
assert len(list(responses)) == 2

... do a side effect on catch:

errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

... replace with a value:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... stop=True to stop the iteration if an exception is caught:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)

assert list(inverses) == []

You can combine these parameters.

.throttle

Limit the number of emissions per time interval (sliding window):

from datetime import timedelta

throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))

# takes 3 seconds
assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.buffer

Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):

import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
time.sleep(1e-3)
assert pulled == [0, 1, 2, 3, 4, 5]

.observe

Observe the iteration progress:

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

logs:

2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10

Logs are produced when the counts reach powers of 2. Set every to produce them periodically:

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))

Observations are logged via logging.getLogger("streamable").info. Set do to handle the streamable.Observation differently:

observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)

+

Concatenate a stream with another iterable:

concatenated_ints: stream[int] = stream(range(5)) + range(5, 10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.cast

Provide a type hint for elements:

import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs

.__call__

Iterate as an Iterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

pipeline()

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

await

Iterate as an AsyncIterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

await pipeline

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.pipe

Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:

import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")

••• other notes

function as source

A stream can also be instantiated from a function (sync or async) that will be called sequentially to get the next source element during iteration.

e.g. stream from a Queue:

queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)

starmap

The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.

from streamable import star

pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']

distinct

To collect distinct elements, you can call set(a_stream).

To deduplicate in the middle of a stream, .filter out already-seen values and .do add new ones into a set (or a fancier cache):

seen: set[str] = set()

unique_ints: stream[int] = (
    stream("001000111")
    .filter(lambda _: _ not in seen)
    .do(seen.add)
    .map(int)
)

assert list(unique_ints) == [0, 1]

performance

There is zero overhead during iteration compared to builtins.map and builtins.filter:

odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)

iter(odd_int_chars) visits the operation lineage and returns exactly:

map(str, filter(lambda n: n % 2, range(N)))

Throughput of each operation compared to builtins.map:

operation x times slower than map(lambda _: _, range(N))
stream(range(N)).map(lambda _: _) 1.00x (same throughput)
stream(range(N)).filter(lambda _: _) 1.05x
stream(range(100) for _ in range(N // 100)).flatten() 1.7x
stream(range(N)).do(lambda _: _) 1.8x
stream(range(N)).skip(N) 1.9x
stream(range(N)).catch(ValueError) 2.0x
stream(range(N)).group(5) 2.4x
stream(range(N)).take(N) 2.9x
stream(range(N)).observe('ints', do=bool) 4.1x
stream(range(N)).observe('ints', do=bool, every=N) 4.5x
stream(range(N)).observe('ints', do=bool, every=timedelta(...)) 5.5x
stream(range(N)).throttle(N, per=timedelta(...)) 5.8x
stream(range(N)).group(by=bool, up_to=5) 7.1x
stream(range(N)).buffer(N) 50x
stream(range(N)).map(lambda _: _, concurrency=2) 310x
stream(range(N)).map(lambda _: _, concurrency=2, as_completed=True) 335x
stream(range(N)).group(within=timedelta(...)) 340x
stream(range(100) for _ in range(N // 100)).flatten(concurrency=2) 370x

(source: pytest -s tests/benchmark.py)

e.g. ETL via dlt

A stream is an expressive way to declare a dlt.resource:

from datetime import timedelta
from http import HTTPStatus
from itertools import count

import dlt
import httpx
from httpx import Response, HTTPStatusError
from dlt.destinations import filesystem
from streamable import stream

def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND

@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )

# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        write_disposition='merge',
        columns={
            "id": {"primary_key": True},
            "color__name": {"partition": True},
        },
    )

⭐ links