sync/async iterable streams for Python
stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress observation, and error handling.
Install (no dependencies):

```shell
pip install streamable
```
```python
from streamable import stream
```

Create a stream[T] from an Iterable[T] or AsyncIterable[T]:

```python
ints: stream[int] = stream(range(10))
```

Chain lazy operations (accepting sync and async functions):
```python
import logging
from datetime import timedelta

import httpx
from httpx import Response, HTTPStatusError

from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)
```

Source elements will be processed on-the-fly during iteration.
A stream[T] is Iterable[T] and AsyncIterable[T]:
```python
>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
>>> [pokemon async for pokemon in pokemons]
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
```

📖 operations (docs)
- mapping
- (un)grouping
- filtering
- control
A stream exposes operations to manipulate its elements, but consuming them and performing the I/O is not its responsibility: it's meant to be combined with dedicated libraries like pyarrow, psycopg2, or dlt (ETL example).
- A stream can be iterated several times if its source allows it.
- Operations return a new stream.
- Operations allow iteration to resume after an exception.
- Operations accept both sync and async functions.
- A sync iteration over a stream involving async functions runs an async iteration under the hood, using the current event loop (or setting a new one if needed).
- Operations adapt their behavior to the type of iteration.
see examples:

- `.throttle` sleeps via `time.sleep` during sync iteration and via `asyncio.sleep` during async iteration.
- `.buffer`'s background task is a `threading.Thread` during sync iteration and an `asyncio.Task` during async iteration.
- concurrent `.map` of a sync function uses `loop.run_in_executor` during async iteration to avoid blocking the event loop.
- ...
Transform elements:
```python
int_chars: stream[str] = stream(range(10)).map(str)
assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
```

Set the `concurrency` param to apply the transformation concurrently:

- Only `concurrency` upstream elements are in-flight for processing.
- Upstream order is preserved unless you set `as_completed=True`.
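A simplified model of this behavior (an illustrative sketch, not the library's code): submit work to a thread pool while keeping at most `concurrency` futures pending, and yield results in submission order.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def concurrent_map(func, iterable, concurrency):
    # at most `concurrency` elements are in-flight; order is preserved
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        pending = deque()
        for element in iterable:
            pending.append(pool.submit(func, element))
            if len(pending) == concurrency:
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()

assert list(concurrent_map(lambda n: n * n, range(5), concurrency=2)) == [0, 1, 4, 9, 16]
```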
If `concurrency` > 1, the transformation is applied via `concurrency` threads:

```python
pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
```

If `concurrency` > 1 and the transformation is async, it is applied via `concurrency` async tasks:
```python
# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']
```

```python
# sync context
import asyncio

with asyncio.Runner() as runner:  # Python 3.11+
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())
```

`concurrency` can also be a concurrent.futures.Executor; pass a ProcessPoolExecutor to apply the transformations via processes:
```python
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # ints are mapped in the worker processes
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []
```

Perform side effects:
```python
state: list[int] = []
storing_ints: stream[int] = stream(range(10)).do(state.append)
assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

`.do` accepts the same parameters as `.map`.
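Conceptually, a side-effect operation is just a map that calls the function and passes the element through unchanged. A minimal stdlib sketch of that semantic:

```python
def do(func, iterable):
    # call func for its side effect, yield the element unchanged
    for element in iterable:
        func(element)
        yield element

state: list[int] = []
assert list(do(state.append, range(3))) == [0, 1, 2]
assert state == [0, 1, 2]
```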
Group elements into batches...
... up_to a given batch size:

```python
int_batches: stream[list[int]] = stream(range(10)).group(5)
assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```

... within a given time interval:
```python
from datetime import timedelta

int_1s_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)
assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
```

... by a given key, yielding (key, elements) pairs:
```python
ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)
assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
```

You can combine these parameters.
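The by-key behavior can be approximated with the stdlib. This is a simplified sketch that ignores the up_to and within parameters: accumulate per-key lists, then yield (key, elements) pairs once the source is exhausted.

```python
from collections import defaultdict

def group_by(key, iterable):
    # accumulate elements per key, yield pairs in first-seen key order
    groups = defaultdict(list)
    for element in iterable:
        groups[key(element)].append(element)
    yield from groups.items()

assert list(group_by(lambda n: "odd" if n % 2 else "even", range(6))) == [
    ("even", [0, 2, 4]),
    ("odd", [1, 3, 5]),
]
```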
Explode upstream elements (Iterable or AsyncIterable):
```python
chars: stream[str] = stream(["hel", "lo!"]).flatten()
assert list(chars) == ["h", "e", "l", "l", "o", "!"]
```

Explode `concurrency` iterables at a time:
```python
chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)
assert list(chars) == ["h", "l", "e", "o", "l", "!"]
```

Filter elements satisfying a predicate:
```python
even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)
assert list(even_ints) == [0, 2, 4, 6, 8]
```

Take a given number of elements:
```python
first_5_ints: stream[int] = stream(range(10)).take(5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
```

... or take until a predicate is satisfied:
```python
first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
```

Skip a given number of elements:
```python
ints_after_5: stream[int] = stream(range(10)).skip(5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
```

... or skip until a predicate is satisfied:
```python
ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
```

Catch exceptions of a given type:
```python
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
```

... where a predicate is satisfied:
```python
from http import HTTPStatus

urls = [
    "https://github.com/ebonnal",
    "https://github.com/ebonnal/streamable",
    "https://github.com/ebonnal/foo",
]
responses: stream[httpx.Response] = (
    stream(urls)
    .map(httpx.get)
    .do(httpx.Response.raise_for_status)
    .catch(
        httpx.HTTPStatusError,
        where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
    )
)
assert len(list(responses)) == 2
```

... do a side effect on catch:
```python
errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1
```

... replace with a value:
```python
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
```

... set stop=True to stop the iteration if an exception is caught:
```python
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)
assert list(inverses) == []
```

You can combine these parameters.
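When an exception is not caught, it propagates to the consumer, yet iteration can resume past it (the "resume after an exception" property listed earlier). A plain-iterator sketch of why this is possible — unlike a generator, an iterator class is not closed by a raise inside `__next__`:

```python
class resumable_map:
    # a minimal iterator: raising inside __next__ doesn't end the iteration
    def __init__(self, func, iterable):
        self.func = func
        self.iterator = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        return self.func(next(self.iterator))

inverses = resumable_map(lambda n: 1 / n, [1, 0, 2])
assert next(inverses) == 1.0
try:
    next(inverses)
except ZeroDivisionError:
    pass
assert next(inverses) == 0.5  # iteration resumed past the exception
```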
Limit the number of emissions per time interval (sliding window):
```python
from datetime import timedelta

throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))
# takes 3 seconds
assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):
```python
import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
time.sleep(1e-3)
assert pulled == [0, 1, 2, 3, 4, 5]
```

Observe the iteration progress:
```python
observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

logs:
```
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10
```
Logs are produced when the counts reach powers of 2. Set every to produce them periodically:
```python
# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))
```

Observations are logged via logging.getLogger("streamable").info. Set do to do something else with the streamable.Observation:
```python
observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)
```

Concatenate a stream with another iterable:
```python
concatenated_ints: stream[int] = stream(range(5)) + range(5, 10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Provide a type hint for elements:
```python
import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs
```

Iterate as an Iterable until exhaustion, without collecting its elements:
```python
state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)
pipeline()
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Iterate as an AsyncIterable until exhaustion, without collecting its elements:
```python
state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)
await pipeline
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:
```python
import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")
```

A stream can also be instantiated from a function (sync or async) that will be called sequentially to get the next source element during iteration.
e.g. stream from a Queue:
```python
queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)
```

The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.
```python
from streamable import star

pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
```

To collect distinct elements you can set(a_stream).
To deduplicate in the middle of a stream, .filter out already-seen values and .do add new ones into a set (or a fancier cache):
```python
seen: set[str] = set()
unique_ints: stream[int] = (
    stream("001000111")
    .filter(lambda _: _ not in seen)
    .do(seen.add)
    .map(int)
)
assert list(unique_ints) == [0, 1]
```

There is zero overhead during iteration compared to builtins.map and builtins.filter:
```python
odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)
```

iter(odd_int_chars) visits the operation lineage and returns exactly:

```python
map(str, filter(lambda n: n % 2, range(N)))
```

Throughput of each operation compared to builtins.map:
| operation | x times slower than `map(lambda _: _, range(N))` |
|---|---|
| `stream(range(N)).map(lambda _: _)` | 1.00x (same throughput) |
| `stream(range(N)).filter(lambda _: _)` | 1.05x |
| `stream(range(100) for _ in range(N // 100)).flatten()` | 1.7x |
| `stream(range(N)).do(lambda _: _)` | 1.8x |
| `stream(range(N)).skip(N)` | 1.9x |
| `stream(range(N)).catch(ValueError)` | 2.0x |
| `stream(range(N)).group(5)` | 2.4x |
| `stream(range(N)).take(N)` | 2.9x |
| `stream(range(N)).observe('ints', do=bool)` | 4.1x |
| `stream(range(N)).observe('ints', do=bool, every=N)` | 4.5x |
| `stream(range(N)).observe('ints', do=bool, every=timedelta(...))` | 5.5x |
| `stream(range(N)).throttle(N, per=timedelta(...))` | 5.8x |
| `stream(range(N)).group(by=bool, up_to=5)` | 7.1x |
| `stream(range(N)).buffer(N)` | 50x |
| `stream(range(N)).map(lambda _: _, concurrency=2)` | 310x |
| `stream(range(N)).map(lambda _: _, concurrency=2, as_completed=True)` | 335x |
| `stream(range(N)).group(within=timedelta(...))` | 340x |
| `stream(range(100) for _ in range(N // 100)).flatten(concurrency=2)` | 370x |
(source: pytest -s tests/benchmark.py)
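For context, a relative-throughput measurement of this shape can be sketched with the stdlib alone. This is a hypothetical harness, not tests/benchmark.py: time the fastest full consumption of each pipeline and divide.

```python
import time

def best_seconds(make_iterable, runs=3):
    # fastest full consumption over a few runs
    best = float("inf")
    for _ in range(runs):
        start = time.perf_counter()
        for _ in make_iterable():
            pass
        best = min(best, time.perf_counter() - start)
    return best

N = 100_000
baseline = best_seconds(lambda: map(lambda _: _, range(N)))
candidate = best_seconds(lambda: filter(lambda _: _, range(N)))
slowdown = candidate / baseline  # "x times slower than map"
```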
e.g. ETL via dlt
A stream is an expressive way to declare a dlt.resource:
```python
from datetime import timedelta
from http import HTTPStatus
from itertools import count

import dlt
import httpx
from httpx import Response, HTTPStatusError
from dlt.destinations import filesystem

from streamable import stream

def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND

@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )

# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        write_disposition="merge",
        columns={
            "id": {"primary_key": True},
            "color__name": {"partition": True},
        },
    )
```