diff --git a/CHANGELOG.md b/CHANGELOG.md index ae72fb6b..2d718739 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,36 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.10 (2026-06-05) + +### Fixed + +- **`SqlAlchemyEventStore` optimistic-concurrency hole.** `append()` read the + current version via `latest_version()` on a **separate connection before** the + write transaction, so two concurrent writers could both pass the + `expected_version` check; the loser then violated `UNIQUE(aggregate_id, + sequence)` and surfaced a **raw `IntegrityError`** — never the documented + `ConcurrencyError` — so retry-on-`ConcurrencyError` callers missed the + collision. The version check now runs **inside** the write transaction, and a + `UNIQUE` violation is translated to `ConcurrencyError`. (`InMemoryEventStore` + was already correct — atomic under its lock.) +- **`EventUpcaster` was dead code.** `EventUpcaster`/`NoOpUpcaster` were exported + and documented but **never invoked** by any read path. Both event stores now + accept `upcasters=...` and apply them in `load()` and `stream_all()`, so stored + events are upcast to the current schema on read (default: no upcasters → identity). + +### Changed + +- **`EventHandlerException` is now exported from `pyfly.eventsourcing`** (it was + only reachable via the private `pyfly.eventsourcing.aggregate` submodule, unlike + its sibling `ConcurrencyError`). Users can now `from pyfly.eventsourcing import + EventHandlerException` to catch missing-handler failures. +- **`TransactionalOutbox.dead_letters()`** added — surfaces records that exhausted + `max_attempts` (retained, but excluded from `pending()`) for inspection / manual + retry. + +--- + ## v26.06.09 (2026-06-05) ### Fixed diff --git a/README.md b/README.md index 879b9ebd..0342642f 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.09 + Version: 26.06.10 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/pyproject.toml b/pyproject.toml index 21bcbf26..e31cef95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.9" +version = "26.6.10" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index a8f9f186..850ec771 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.09" +__version__ = "26.06.10" diff --git a/src/pyfly/eventsourcing/__init__.py b/src/pyfly/eventsourcing/__init__.py index 35e8aeb2..9615d7c8 100644 --- a/src/pyfly/eventsourcing/__init__.py +++ b/src/pyfly/eventsourcing/__init__.py @@ -27,7 +27,7 @@ from __future__ import annotations -from pyfly.eventsourcing.aggregate import AggregateRoot +from pyfly.eventsourcing.aggregate import AggregateRoot, EventHandlerException from pyfly.eventsourcing.event import DomainEvent, StoredEventEnvelope, domain_event from pyfly.eventsourcing.outbox import OutboxRecord, TransactionalOutbox from pyfly.eventsourcing.projection import Projection, ProjectionRunner @@ -48,6 +48,7 @@ "AggregateRoot", "ConcurrencyError", "DomainEvent", + "EventHandlerException", "EventStore", "EventUpcaster", "InMemoryEventStore", diff --git a/src/pyfly/eventsourcing/outbox.py b/src/pyfly/eventsourcing/outbox.py index 0ffa45fe..d1fa29a8 100644 --- a/src/pyfly/eventsourcing/outbox.py +++ b/src/pyfly/eventsourcing/outbox.py @@ -90,6 +90,16 @@ async def pending(self) -> list[OutboxRecord]: async with self._lock: return [r for r in self._records.values() if not r.delivered and r.attempts < self._max_attempts] + async def dead_letters(self) -> list[OutboxRecord]: + """Records that exhausted ``max_attempts`` without being delivered. + + Retained for inspection / manual retry — at-least-once delivery holds up + to ``max_attempts``; ``pending()`` deliberately excludes these so the + publish loop stops re-attempting them. + """ + async with self._lock: + return [r for r in self._records.values() if not r.delivered and r.attempts >= self._max_attempts] + async def _loop(self) -> None: while not self._stop.is_set(): try: diff --git a/src/pyfly/eventsourcing/store.py b/src/pyfly/eventsourcing/store.py index 343cef2f..7e84d1b5 100644 --- a/src/pyfly/eventsourcing/store.py +++ b/src/pyfly/eventsourcing/store.py @@ -16,15 +16,29 @@ from __future__ import annotations import asyncio +from collections.abc import Sequence from typing import Any, Protocol, runtime_checkable from pyfly.eventsourcing.event import StoredEventEnvelope +from pyfly.eventsourcing.upcaster import EventUpcaster class ConcurrencyError(Exception): """Optimistic-locking failure: expected version did not match the store's.""" +def _apply_upcasters(envelope: StoredEventEnvelope, upcasters: Sequence[EventUpcaster]) -> StoredEventEnvelope: + """Apply each registered upcaster (in order) that handles this envelope. + + Read paths (``load`` / ``stream_all``) run stored events through the + configured upcasters so consumers always see current-schema events. + """ + for upcaster in upcasters: + if upcaster.applies_to(envelope): + envelope = upcaster.upcast(envelope) + return envelope + + @runtime_checkable class EventStore(Protocol): """Append, load and stream events for aggregates.""" @@ -48,10 +62,11 @@ async def latest_version(self, aggregate_id: str) -> int: ... class InMemoryEventStore: """Default zero-dep adapter: list per aggregate, global event log.""" - def __init__(self) -> None: + def __init__(self, upcasters: Sequence[EventUpcaster] = ()) -> None: self._by_aggregate: dict[str, list[StoredEventEnvelope]] = {} self._all: list[StoredEventEnvelope] = [] self._lock = asyncio.Lock() + self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters) async def append( self, @@ -77,16 +92,19 @@ async def append( async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]: async with self._lock: events = self._by_aggregate.get(aggregate_id, []) - return [e for e in events if e.sequence > after_sequence] + return [_apply_upcasters(e, self._upcasters) for e in events if e.sequence > after_sequence] async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]: async with self._lock: if after_event_id is None: - return list(self._all[:limit]) - for idx, evt in enumerate(self._all): - if evt.event_id == after_event_id: - return list(self._all[idx + 1 : idx + 1 + limit]) - return [] + raw = list(self._all[:limit]) + else: + raw = [] + for idx, evt in enumerate(self._all): + if evt.event_id == after_event_id: + raw = list(self._all[idx + 1 : idx + 1 + limit]) + break + return [_apply_upcasters(e, self._upcasters) for e in raw] async def latest_version(self, aggregate_id: str) -> int: async with self._lock: @@ -116,8 +134,9 @@ class SqlAlchemyEventStore: ) """ - def __init__(self, engine: Any) -> None: + def __init__(self, engine: Any, upcasters: Sequence[EventUpcaster] = ()) -> None: self._engine = engine + self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters) async def initialize(self) -> None: from sqlalchemy import text # type: ignore[import-not-found, unused-ignore] @@ -134,38 +153,54 @@ async def append( expected_version: int, ) -> None: from sqlalchemy import text # type: ignore[import-not-found, unused-ignore] - - latest = await self.latest_version(aggregate_id) - if latest != expected_version: - msg = f"expected version {expected_version}, found {latest}" - raise ConcurrencyError(msg) - async with self._engine.begin() as conn: - for i, evt in enumerate(events, start=1): - evt.aggregate_id = aggregate_id - evt.aggregate_type = aggregate_type - evt.sequence = expected_version + i - await conn.execute( - text( - """ - INSERT INTO pyfly_event_store - (event_id, aggregate_id, aggregate_type, sequence, - event_type, payload, metadata, occurred_at, version, tenant_id) - VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant) - """ - ), - { - "eid": evt.event_id, - "aid": evt.aggregate_id, - "atype": evt.aggregate_type, - "seq": evt.sequence, - "etype": evt.event_type, - "payload": evt.to_json(), - "meta": "{}", - "occurred": evt.occurred_at, - "ver": evt.version, - "tenant": evt.tenant_id, - }, + from sqlalchemy.exc import IntegrityError # type: ignore[import-not-found, unused-ignore] + + try: + async with self._engine.begin() as conn: + # Read the current version INSIDE the write transaction (same + # connection) so the check-then-insert is not a TOCTOU race. + result = await conn.execute( + text("SELECT COALESCE(MAX(sequence), 0) FROM pyfly_event_store WHERE aggregate_id = :aid"), + {"aid": aggregate_id}, ) + latest = int(result.scalar() or 0) + if latest != expected_version: + msg = f"expected version {expected_version}, found {latest}" + raise ConcurrencyError(msg) + for i, evt in enumerate(events, start=1): + evt.aggregate_id = aggregate_id + evt.aggregate_type = aggregate_type + evt.sequence = expected_version + i + await conn.execute( + text( + """ + INSERT INTO pyfly_event_store + (event_id, aggregate_id, aggregate_type, sequence, + event_type, payload, metadata, occurred_at, version, tenant_id) + VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant) + """ + ), + { + "eid": evt.event_id, + "aid": evt.aggregate_id, + "atype": evt.aggregate_type, + "seq": evt.sequence, + "etype": evt.event_type, + "payload": evt.to_json(), + "meta": "{}", + "occurred": evt.occurred_at, + "ver": evt.version, + "tenant": evt.tenant_id, + }, + ) + except IntegrityError as exc: + # A concurrent writer committed the same (aggregate_id, sequence) + # between our in-transaction check and insert; the UNIQUE constraint + # is the backstop. Surface as the documented optimistic-lock failure + # so retry-on-ConcurrencyError callers see it. + raise ConcurrencyError( + f"concurrent append for aggregate {aggregate_id!r} at version {expected_version}" + ) from exc async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]: from sqlalchemy import text # type: ignore[import-not-found, unused-ignore] @@ -181,7 +216,7 @@ async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[Stor {"aid": aggregate_id, "after": after_sequence}, ) ).fetchall() - return [StoredEventEnvelope.from_json(r[0]) for r in rows] + return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows] async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]: from sqlalchemy import text # type: ignore[import-not-found, unused-ignore] @@ -207,7 +242,7 @@ async def stream_all(self, *, after_event_id: str | None = None, limit: int = 10 {"eid": after_event_id, "limit": limit}, ) ).fetchall() - return [StoredEventEnvelope.from_json(r[0]) for r in rows] + return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows] async def latest_version(self, aggregate_id: str) -> int: from sqlalchemy import text # type: ignore[import-not-found, unused-ignore] diff --git a/tests/eventsourcing/test_eventsourcing_fixes.py b/tests/eventsourcing/test_eventsourcing_fixes.py new file mode 100644 index 00000000..262a2c0e --- /dev/null +++ b/tests/eventsourcing/test_eventsourcing_fixes.py @@ -0,0 +1,127 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Regression tests for event-sourcing fixes (v26.06.10). + +- EventHandlerException is exported from the package (API symmetry w/ ConcurrencyError). +- Upcasters are applied on read (load + stream_all) — previously dead code. +- TransactionalOutbox exposes dead-lettered (exhausted) records. +- SqlAlchemyEventStore append translates a concurrent UNIQUE collision into + ConcurrencyError instead of leaking a raw IntegrityError (TOCTOU fix). +""" + +from __future__ import annotations + +import asyncio +import dataclasses +from pathlib import Path + +import pytest + +from pyfly.eventsourcing.event import StoredEventEnvelope +from pyfly.eventsourcing.outbox import TransactionalOutbox +from pyfly.eventsourcing.store import ConcurrencyError, InMemoryEventStore, SqlAlchemyEventStore + + +def _env(aggregate_id: str, event_type: str, **payload: object) -> StoredEventEnvelope: + return StoredEventEnvelope( + aggregate_id=aggregate_id, aggregate_type="Account", event_type=event_type, payload=dict(payload) + ) + + +def test_event_handler_exception_is_exported_from_package() -> None: + # Regression: EventHandlerException was defined in the submodule but, unlike + # its sibling ConcurrencyError, never re-exported from pyfly.eventsourcing. + from pyfly.eventsourcing import EventHandlerException + from pyfly.eventsourcing.aggregate import EventHandlerException as Submodule + + assert EventHandlerException is Submodule + + +class _RenameUpcaster: + """Upcasts the legacy event name to the current one (returns a copy).""" + + def applies_to(self, envelope: StoredEventEnvelope) -> bool: + return envelope.event_type == "legacy.opened" + + def upcast(self, envelope: StoredEventEnvelope) -> StoredEventEnvelope: + return dataclasses.replace(envelope, event_type="account.opened", payload={**envelope.payload, "upcast": True}) + + +class TestUpcastersAppliedOnRead: + @pytest.mark.asyncio + async def test_load_and_stream_apply_upcasters(self) -> None: + store = InMemoryEventStore(upcasters=[_RenameUpcaster()]) + await store.append("acc-1", "Account", [_env("acc-1", "legacy.opened", owner="Ada")], expected_version=0) + + loaded = await store.load("acc-1") + assert [e.event_type for e in loaded] == ["account.opened"] + assert loaded[0].payload["upcast"] is True + + streamed = await store.stream_all() + assert [e.event_type for e in streamed] == ["account.opened"] + + @pytest.mark.asyncio + async def test_no_upcasters_is_identity(self) -> None: + store = InMemoryEventStore() + await store.append("acc-1", "Account", [_env("acc-1", "legacy.opened")], expected_version=0) + assert (await store.load("acc-1"))[0].event_type == "legacy.opened" + + +class TestOutboxDeadLetters: + @pytest.mark.asyncio + async def test_exhausted_records_are_surfaced(self) -> None: + async def always_fail(_env: StoredEventEnvelope) -> None: + raise RuntimeError("upstream down") + + outbox = TransactionalOutbox(publish=always_fail, max_attempts=2, poll_interval_s=0.02) + record = await outbox.enqueue(_env("acc-1", "account.opened")) + await outbox.start() + for _ in range(100): + await asyncio.sleep(0.02) + if record.attempts >= 2: + break + await outbox.stop() + + assert record.attempts >= 2 + assert record.delivered is False + assert await outbox.pending() == [] # excluded from the publish loop + assert record in await outbox.dead_letters() # but surfaced for inspection + + +class TestSqlAlchemyConcurrency: + @pytest.mark.asyncio + async def test_concurrent_append_raises_concurrency_error_not_raw_db_error(self, tmp_path: Path) -> None: + pytest.importorskip("sqlalchemy") + pytest.importorskip("aiosqlite") + from sqlalchemy.ext.asyncio import create_async_engine + + engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path / 'es.db'}", connect_args={"timeout": 30}) + try: + store = SqlAlchemyEventStore(engine) + await store.initialize() + await store.append("acc-1", "Account", [_env("acc-1", "account.opened")], expected_version=0) + + async def writer(event_type: str) -> None: + await store.append("acc-1", "Account", [_env("acc-1", event_type)], expected_version=1) + + results = await asyncio.gather(writer("a.deposited"), writer("b.deposited"), return_exceptions=True) + errors = [r for r in results if isinstance(r, BaseException)] + + # exactly one writer wins; the loser sees ConcurrencyError, NOT a raw + # IntegrityError / OperationalError leaking the DB constraint. + assert len(errors) == 1, results + assert isinstance(errors[0], ConcurrencyError), f"got {type(errors[0]).__name__}: {errors[0]}" + assert await store.latest_version("acc-1") == 2 + finally: + await engine.dispose() diff --git a/uv.lock b/uv.lock index 3359e0de..3e88ca2a 100644 --- a/uv.lock +++ b/uv.lock @@ -1967,7 +1967,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.9" +version = "26.6.10" source = { editable = "." } dependencies = [ { name = "pydantic" },