diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae72fb6b..2d718739 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,36 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
---
+## v26.06.10 (2026-06-05)
+
+### Fixed
+
+- **`SqlAlchemyEventStore` optimistic-concurrency hole.** `append()` read the
+ current version via `latest_version()` on a **separate connection before** the
+ write transaction, so two concurrent writers could both pass the
+ `expected_version` check; the loser then violated `UNIQUE(aggregate_id,
+ sequence)` and surfaced a **raw `IntegrityError`** — never the documented
+ `ConcurrencyError` — so retry-on-`ConcurrencyError` callers missed the
+ collision. The version check now runs **inside** the write transaction, and a
+ `UNIQUE` violation is translated to `ConcurrencyError`. (`InMemoryEventStore`
+ was already correct — atomic under its lock.)
+- **`EventUpcaster` was dead code.** `EventUpcaster`/`NoOpUpcaster` were exported
+ and documented but **never invoked** by any read path. Both event stores now
+ accept `upcasters=...` and apply them in `load()` and `stream_all()`, so stored
+ events are upcast to the current schema on read (default: no upcasters → identity).
+
+### Changed
+
+- **`EventHandlerException` is now exported from `pyfly.eventsourcing`** (it was
+ only reachable via the private `pyfly.eventsourcing.aggregate` submodule, unlike
+ its sibling `ConcurrencyError`). Users can now `from pyfly.eventsourcing import
+ EventHandlerException` to catch missing-handler failures.
+- **`TransactionalOutbox.dead_letters()`** added — surfaces records that exhausted
+ `max_attempts` (retained, but excluded from `pending()`) for inspection / manual
+ retry.
+
+---
+
## v26.06.09 (2026-06-05)
### Fixed
diff --git a/README.md b/README.md
index 879b9ebd..0342642f 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
-
+
diff --git a/pyproject.toml b/pyproject.toml
index 21bcbf26..e31cef95 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
-version = "26.6.9"
+version = "26.6.10"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py
index a8f9f186..850ec771 100644
--- a/src/pyfly/__init__.py
+++ b/src/pyfly/__init__.py
@@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""
-__version__ = "26.06.09"
+__version__ = "26.06.10"
diff --git a/src/pyfly/eventsourcing/__init__.py b/src/pyfly/eventsourcing/__init__.py
index 35e8aeb2..9615d7c8 100644
--- a/src/pyfly/eventsourcing/__init__.py
+++ b/src/pyfly/eventsourcing/__init__.py
@@ -27,7 +27,7 @@
from __future__ import annotations
-from pyfly.eventsourcing.aggregate import AggregateRoot
+from pyfly.eventsourcing.aggregate import AggregateRoot, EventHandlerException
from pyfly.eventsourcing.event import DomainEvent, StoredEventEnvelope, domain_event
from pyfly.eventsourcing.outbox import OutboxRecord, TransactionalOutbox
from pyfly.eventsourcing.projection import Projection, ProjectionRunner
@@ -48,6 +48,7 @@
"AggregateRoot",
"ConcurrencyError",
"DomainEvent",
+ "EventHandlerException",
"EventStore",
"EventUpcaster",
"InMemoryEventStore",
diff --git a/src/pyfly/eventsourcing/outbox.py b/src/pyfly/eventsourcing/outbox.py
index 0ffa45fe..d1fa29a8 100644
--- a/src/pyfly/eventsourcing/outbox.py
+++ b/src/pyfly/eventsourcing/outbox.py
@@ -90,6 +90,16 @@ async def pending(self) -> list[OutboxRecord]:
async with self._lock:
return [r for r in self._records.values() if not r.delivered and r.attempts < self._max_attempts]
+ async def dead_letters(self) -> list[OutboxRecord]:
+ """Records that exhausted ``max_attempts`` without being delivered.
+
+ Retained for inspection / manual retry — at-least-once delivery holds up
+ to ``max_attempts``; ``pending()`` deliberately excludes these so the
+ publish loop stops re-attempting them.
+ """
+ async with self._lock:
+ return [r for r in self._records.values() if not r.delivered and r.attempts >= self._max_attempts]
+
async def _loop(self) -> None:
while not self._stop.is_set():
try:
diff --git a/src/pyfly/eventsourcing/store.py b/src/pyfly/eventsourcing/store.py
index 343cef2f..7e84d1b5 100644
--- a/src/pyfly/eventsourcing/store.py
+++ b/src/pyfly/eventsourcing/store.py
@@ -16,15 +16,29 @@
from __future__ import annotations
import asyncio
+from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable
from pyfly.eventsourcing.event import StoredEventEnvelope
+from pyfly.eventsourcing.upcaster import EventUpcaster
class ConcurrencyError(Exception):
"""Optimistic-locking failure: expected version did not match the store's."""
+def _apply_upcasters(envelope: StoredEventEnvelope, upcasters: Sequence[EventUpcaster]) -> StoredEventEnvelope:
+ """Apply each registered upcaster (in order) that handles this envelope.
+
+ Read paths (``load`` / ``stream_all``) run stored events through the
+ configured upcasters so consumers always see current-schema events.
+ """
+ for upcaster in upcasters:
+ if upcaster.applies_to(envelope):
+ envelope = upcaster.upcast(envelope)
+ return envelope
+
+
@runtime_checkable
class EventStore(Protocol):
"""Append, load and stream events for aggregates."""
@@ -48,10 +62,11 @@ async def latest_version(self, aggregate_id: str) -> int: ...
class InMemoryEventStore:
"""Default zero-dep adapter: list per aggregate, global event log."""
- def __init__(self) -> None:
+ def __init__(self, upcasters: Sequence[EventUpcaster] = ()) -> None:
self._by_aggregate: dict[str, list[StoredEventEnvelope]] = {}
self._all: list[StoredEventEnvelope] = []
self._lock = asyncio.Lock()
+ self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters)
async def append(
self,
@@ -77,16 +92,19 @@ async def append(
async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]:
async with self._lock:
events = self._by_aggregate.get(aggregate_id, [])
- return [e for e in events if e.sequence > after_sequence]
+ return [_apply_upcasters(e, self._upcasters) for e in events if e.sequence > after_sequence]
async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]:
async with self._lock:
if after_event_id is None:
- return list(self._all[:limit])
- for idx, evt in enumerate(self._all):
- if evt.event_id == after_event_id:
- return list(self._all[idx + 1 : idx + 1 + limit])
- return []
+ raw = list(self._all[:limit])
+ else:
+ raw = []
+ for idx, evt in enumerate(self._all):
+ if evt.event_id == after_event_id:
+ raw = list(self._all[idx + 1 : idx + 1 + limit])
+ break
+ return [_apply_upcasters(e, self._upcasters) for e in raw]
async def latest_version(self, aggregate_id: str) -> int:
async with self._lock:
@@ -116,8 +134,9 @@ class SqlAlchemyEventStore:
)
"""
- def __init__(self, engine: Any) -> None:
+ def __init__(self, engine: Any, upcasters: Sequence[EventUpcaster] = ()) -> None:
self._engine = engine
+ self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters)
async def initialize(self) -> None:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
@@ -134,38 +153,54 @@ async def append(
expected_version: int,
) -> None:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
-
- latest = await self.latest_version(aggregate_id)
- if latest != expected_version:
- msg = f"expected version {expected_version}, found {latest}"
- raise ConcurrencyError(msg)
- async with self._engine.begin() as conn:
- for i, evt in enumerate(events, start=1):
- evt.aggregate_id = aggregate_id
- evt.aggregate_type = aggregate_type
- evt.sequence = expected_version + i
- await conn.execute(
- text(
- """
- INSERT INTO pyfly_event_store
- (event_id, aggregate_id, aggregate_type, sequence,
- event_type, payload, metadata, occurred_at, version, tenant_id)
- VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant)
- """
- ),
- {
- "eid": evt.event_id,
- "aid": evt.aggregate_id,
- "atype": evt.aggregate_type,
- "seq": evt.sequence,
- "etype": evt.event_type,
- "payload": evt.to_json(),
- "meta": "{}",
- "occurred": evt.occurred_at,
- "ver": evt.version,
- "tenant": evt.tenant_id,
- },
+ from sqlalchemy.exc import IntegrityError # type: ignore[import-not-found, unused-ignore]
+
+ try:
+ async with self._engine.begin() as conn:
+ # Read the current version INSIDE the write transaction (same
+ # connection) so the check-then-insert is not a TOCTOU race.
+ result = await conn.execute(
+ text("SELECT COALESCE(MAX(sequence), 0) FROM pyfly_event_store WHERE aggregate_id = :aid"),
+ {"aid": aggregate_id},
)
+ latest = int(result.scalar() or 0)
+ if latest != expected_version:
+ msg = f"expected version {expected_version}, found {latest}"
+ raise ConcurrencyError(msg)
+ for i, evt in enumerate(events, start=1):
+ evt.aggregate_id = aggregate_id
+ evt.aggregate_type = aggregate_type
+ evt.sequence = expected_version + i
+ await conn.execute(
+ text(
+ """
+ INSERT INTO pyfly_event_store
+ (event_id, aggregate_id, aggregate_type, sequence,
+ event_type, payload, metadata, occurred_at, version, tenant_id)
+ VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant)
+ """
+ ),
+ {
+ "eid": evt.event_id,
+ "aid": evt.aggregate_id,
+ "atype": evt.aggregate_type,
+ "seq": evt.sequence,
+ "etype": evt.event_type,
+ "payload": evt.to_json(),
+ "meta": "{}",
+ "occurred": evt.occurred_at,
+ "ver": evt.version,
+ "tenant": evt.tenant_id,
+ },
+ )
+ except IntegrityError as exc:
+ # A concurrent writer committed the same (aggregate_id, sequence)
+ # between our in-transaction check and insert; the UNIQUE constraint
+ # is the backstop. Surface as the documented optimistic-lock failure
+ # so retry-on-ConcurrencyError callers see it.
+ raise ConcurrencyError(
+ f"concurrent append for aggregate {aggregate_id!r} at version {expected_version}"
+ ) from exc
async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
@@ -181,7 +216,7 @@ async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[Stor
{"aid": aggregate_id, "after": after_sequence},
)
).fetchall()
- return [StoredEventEnvelope.from_json(r[0]) for r in rows]
+ return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows]
async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
@@ -207,7 +242,7 @@ async def stream_all(self, *, after_event_id: str | None = None, limit: int = 10
{"eid": after_event_id, "limit": limit},
)
).fetchall()
- return [StoredEventEnvelope.from_json(r[0]) for r in rows]
+ return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows]
async def latest_version(self, aggregate_id: str) -> int:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
diff --git a/tests/eventsourcing/test_eventsourcing_fixes.py b/tests/eventsourcing/test_eventsourcing_fixes.py
new file mode 100644
index 00000000..262a2c0e
--- /dev/null
+++ b/tests/eventsourcing/test_eventsourcing_fixes.py
@@ -0,0 +1,127 @@
+# Copyright 2026 Firefly Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Regression tests for event-sourcing fixes (v26.06.10).
+
+- EventHandlerException is exported from the package (API symmetry w/ ConcurrencyError).
+- Upcasters are applied on read (load + stream_all) — previously dead code.
+- TransactionalOutbox exposes dead-lettered (exhausted) records.
+- SqlAlchemyEventStore append translates a concurrent UNIQUE collision into
+ ConcurrencyError instead of leaking a raw IntegrityError (TOCTOU fix).
+"""
+
+from __future__ import annotations
+
+import asyncio
+import dataclasses
+from pathlib import Path
+
+import pytest
+
+from pyfly.eventsourcing.event import StoredEventEnvelope
+from pyfly.eventsourcing.outbox import TransactionalOutbox
+from pyfly.eventsourcing.store import ConcurrencyError, InMemoryEventStore, SqlAlchemyEventStore
+
+
+def _env(aggregate_id: str, event_type: str, **payload: object) -> StoredEventEnvelope:
+ return StoredEventEnvelope(
+ aggregate_id=aggregate_id, aggregate_type="Account", event_type=event_type, payload=dict(payload)
+ )
+
+
+def test_event_handler_exception_is_exported_from_package() -> None:
+ # Regression: EventHandlerException was defined in the submodule but, unlike
+ # its sibling ConcurrencyError, never re-exported from pyfly.eventsourcing.
+ from pyfly.eventsourcing import EventHandlerException
+ from pyfly.eventsourcing.aggregate import EventHandlerException as Submodule
+
+ assert EventHandlerException is Submodule
+
+
+class _RenameUpcaster:
+ """Upcasts the legacy event name to the current one (returns a copy)."""
+
+ def applies_to(self, envelope: StoredEventEnvelope) -> bool:
+ return envelope.event_type == "legacy.opened"
+
+ def upcast(self, envelope: StoredEventEnvelope) -> StoredEventEnvelope:
+ return dataclasses.replace(envelope, event_type="account.opened", payload={**envelope.payload, "upcast": True})
+
+
+class TestUpcastersAppliedOnRead:
+ @pytest.mark.asyncio
+ async def test_load_and_stream_apply_upcasters(self) -> None:
+ store = InMemoryEventStore(upcasters=[_RenameUpcaster()])
+ await store.append("acc-1", "Account", [_env("acc-1", "legacy.opened", owner="Ada")], expected_version=0)
+
+ loaded = await store.load("acc-1")
+ assert [e.event_type for e in loaded] == ["account.opened"]
+ assert loaded[0].payload["upcast"] is True
+
+ streamed = await store.stream_all()
+ assert [e.event_type for e in streamed] == ["account.opened"]
+
+ @pytest.mark.asyncio
+ async def test_no_upcasters_is_identity(self) -> None:
+ store = InMemoryEventStore()
+ await store.append("acc-1", "Account", [_env("acc-1", "legacy.opened")], expected_version=0)
+ assert (await store.load("acc-1"))[0].event_type == "legacy.opened"
+
+
+class TestOutboxDeadLetters:
+ @pytest.mark.asyncio
+ async def test_exhausted_records_are_surfaced(self) -> None:
+ async def always_fail(_env: StoredEventEnvelope) -> None:
+ raise RuntimeError("upstream down")
+
+ outbox = TransactionalOutbox(publish=always_fail, max_attempts=2, poll_interval_s=0.02)
+ record = await outbox.enqueue(_env("acc-1", "account.opened"))
+ await outbox.start()
+ for _ in range(100):
+ await asyncio.sleep(0.02)
+ if record.attempts >= 2:
+ break
+ await outbox.stop()
+
+ assert record.attempts >= 2
+ assert record.delivered is False
+ assert await outbox.pending() == [] # excluded from the publish loop
+ assert record in await outbox.dead_letters() # but surfaced for inspection
+
+
+class TestSqlAlchemyConcurrency:
+ @pytest.mark.asyncio
+ async def test_concurrent_append_raises_concurrency_error_not_raw_db_error(self, tmp_path: Path) -> None:
+ pytest.importorskip("sqlalchemy")
+ pytest.importorskip("aiosqlite")
+ from sqlalchemy.ext.asyncio import create_async_engine
+
+ engine = create_async_engine(f"sqlite+aiosqlite:///{tmp_path / 'es.db'}", connect_args={"timeout": 30})
+ try:
+ store = SqlAlchemyEventStore(engine)
+ await store.initialize()
+ await store.append("acc-1", "Account", [_env("acc-1", "account.opened")], expected_version=0)
+
+ async def writer(event_type: str) -> None:
+ await store.append("acc-1", "Account", [_env("acc-1", event_type)], expected_version=1)
+
+ results = await asyncio.gather(writer("a.deposited"), writer("b.deposited"), return_exceptions=True)
+ errors = [r for r in results if isinstance(r, BaseException)]
+
+ # exactly one writer wins; the loser sees ConcurrencyError, NOT a raw
+ # IntegrityError / OperationalError leaking the DB constraint.
+ assert len(errors) == 1, results
+ assert isinstance(errors[0], ConcurrencyError), f"got {type(errors[0]).__name__}: {errors[0]}"
+ assert await store.latest_version("acc-1") == 2
+ finally:
+ await engine.dispose()
diff --git a/uv.lock b/uv.lock
index 3359e0de..3e88ca2a 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1967,7 +1967,7 @@ wheels = [
[[package]]
name = "pyfly"
-version = "26.6.9"
+version = "26.6.10"
source = { editable = "." }
dependencies = [
{ name = "pydantic" },