Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,36 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## v26.06.10 (2026-06-05)

### Fixed

- **`SqlAlchemyEventStore` optimistic-concurrency hole.** `append()` read the
current version via `latest_version()` on a **separate connection before** the
write transaction, so two concurrent writers could both pass the
`expected_version` check; the loser then violated `UNIQUE(aggregate_id,
sequence)` and surfaced a **raw `IntegrityError`** — never the documented
`ConcurrencyError` — so retry-on-`ConcurrencyError` callers missed the
collision. The version check now runs **inside** the write transaction, and a
`UNIQUE` violation is translated to `ConcurrencyError`. (`InMemoryEventStore`
was already correct — atomic under its lock.)
- **`EventUpcaster` was dead code.** `EventUpcaster`/`NoOpUpcaster` were exported
and documented but **never invoked** by any read path. Both event stores now
accept `upcasters=...` and apply them in `load()` and `stream_all()`, so stored
events are upcast to the current schema on read (default: no upcasters → identity).

### Changed

- **`EventHandlerException` is now exported from `pyfly.eventsourcing`** (it was
only reachable via the private `pyfly.eventsourcing.aggregate` submodule, unlike
its sibling `ConcurrencyError`). Users can now `from pyfly.eventsourcing import
EventHandlerException` to catch missing-handler failures.
- **`TransactionalOutbox.dead_letters()`** added — surfaces records that exhausted
`max_attempts` (retained, but excluded from `pending()`) for inspection / manual
retry.

---

## v26.06.09 (2026-06-05)

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<a href="https://github.com/fireflyframework"><img src="https://img.shields.io/badge/Firefly_Framework-official-ff6600?logo=data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCI+PHBhdGggZmlsbD0id2hpdGUiIGQ9Ik0xMiAyQzYuNDggMiAyIDYuNDggMiAxMnM0LjQ4IDEwIDEwIDEwIDEwLTQuNDggMTAtMTBTMTcuNTIgMiAxMiAyeiIvPjwvc3ZnPg==" alt="Firefly Framework"></a>
<a href="https://www.python.org/"><img src="https://img.shields.io/badge/python-3.12%2B-blue?logo=python&logoColor=white" alt="Python 3.12+"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-green" alt="License: Apache 2.0"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.09-brightgreen" alt="Version: 26.06.09"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.10-brightgreen" alt="Version: 26.06.10"></a>
<a href="#"><img src="https://img.shields.io/badge/type--checked-mypy%20strict-blue?logo=python&logoColor=white" alt="Type Checked: mypy strict"></a>
<a href="#"><img src="https://img.shields.io/badge/code%20style-ruff-purple?logo=ruff&logoColor=white" alt="Code Style: Ruff"></a>
<a href="#"><img src="https://img.shields.io/badge/async-first-brightgreen" alt="Async First"></a>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
version = "26.6.9"
version = "26.6.10"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/pyfly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""

__version__ = "26.06.09"
__version__ = "26.06.10"
3 changes: 2 additions & 1 deletion src/pyfly/eventsourcing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from __future__ import annotations

from pyfly.eventsourcing.aggregate import AggregateRoot
from pyfly.eventsourcing.aggregate import AggregateRoot, EventHandlerException
from pyfly.eventsourcing.event import DomainEvent, StoredEventEnvelope, domain_event
from pyfly.eventsourcing.outbox import OutboxRecord, TransactionalOutbox
from pyfly.eventsourcing.projection import Projection, ProjectionRunner
Expand All @@ -48,6 +48,7 @@
"AggregateRoot",
"ConcurrencyError",
"DomainEvent",
"EventHandlerException",
"EventStore",
"EventUpcaster",
"InMemoryEventStore",
Expand Down
10 changes: 10 additions & 0 deletions src/pyfly/eventsourcing/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ async def pending(self) -> list[OutboxRecord]:
async with self._lock:
return [r for r in self._records.values() if not r.delivered and r.attempts < self._max_attempts]

async def dead_letters(self) -> list[OutboxRecord]:
"""Records that exhausted ``max_attempts`` without being delivered.

Retained for inspection / manual retry — at-least-once delivery holds up
to ``max_attempts``; ``pending()`` deliberately excludes these so the
publish loop stops re-attempting them.
"""
async with self._lock:
return [r for r in self._records.values() if not r.delivered and r.attempts >= self._max_attempts]

async def _loop(self) -> None:
while not self._stop.is_set():
try:
Expand Down
117 changes: 76 additions & 41 deletions src/pyfly/eventsourcing/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,29 @@
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable

from pyfly.eventsourcing.event import StoredEventEnvelope
from pyfly.eventsourcing.upcaster import EventUpcaster


class ConcurrencyError(Exception):
"""Optimistic-locking failure: expected version did not match the store's."""


def _apply_upcasters(envelope: StoredEventEnvelope, upcasters: Sequence[EventUpcaster]) -> StoredEventEnvelope:
"""Apply each registered upcaster (in order) that handles this envelope.

Read paths (``load`` / ``stream_all``) run stored events through the
configured upcasters so consumers always see current-schema events.
"""
for upcaster in upcasters:
if upcaster.applies_to(envelope):
envelope = upcaster.upcast(envelope)
return envelope


@runtime_checkable
class EventStore(Protocol):
"""Append, load and stream events for aggregates."""
Expand All @@ -48,10 +62,11 @@ async def latest_version(self, aggregate_id: str) -> int: ...
class InMemoryEventStore:
"""Default zero-dep adapter: list per aggregate, global event log."""

def __init__(self) -> None:
def __init__(self, upcasters: Sequence[EventUpcaster] = ()) -> None:
self._by_aggregate: dict[str, list[StoredEventEnvelope]] = {}
self._all: list[StoredEventEnvelope] = []
self._lock = asyncio.Lock()
self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters)

async def append(
self,
Expand All @@ -77,16 +92,19 @@ async def append(
async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]:
async with self._lock:
events = self._by_aggregate.get(aggregate_id, [])
return [e for e in events if e.sequence > after_sequence]
return [_apply_upcasters(e, self._upcasters) for e in events if e.sequence > after_sequence]

async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]:
async with self._lock:
if after_event_id is None:
return list(self._all[:limit])
for idx, evt in enumerate(self._all):
if evt.event_id == after_event_id:
return list(self._all[idx + 1 : idx + 1 + limit])
return []
raw = list(self._all[:limit])
else:
raw = []
for idx, evt in enumerate(self._all):
if evt.event_id == after_event_id:
raw = list(self._all[idx + 1 : idx + 1 + limit])
break
return [_apply_upcasters(e, self._upcasters) for e in raw]

async def latest_version(self, aggregate_id: str) -> int:
async with self._lock:
Expand Down Expand Up @@ -116,8 +134,9 @@ class SqlAlchemyEventStore:
)
"""

def __init__(self, engine: Any) -> None:
def __init__(self, engine: Any, upcasters: Sequence[EventUpcaster] = ()) -> None:
self._engine = engine
self._upcasters: tuple[EventUpcaster, ...] = tuple(upcasters)

async def initialize(self) -> None:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
Expand All @@ -134,38 +153,54 @@ async def append(
expected_version: int,
) -> None:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]

latest = await self.latest_version(aggregate_id)
if latest != expected_version:
msg = f"expected version {expected_version}, found {latest}"
raise ConcurrencyError(msg)
async with self._engine.begin() as conn:
for i, evt in enumerate(events, start=1):
evt.aggregate_id = aggregate_id
evt.aggregate_type = aggregate_type
evt.sequence = expected_version + i
await conn.execute(
text(
"""
INSERT INTO pyfly_event_store
(event_id, aggregate_id, aggregate_type, sequence,
event_type, payload, metadata, occurred_at, version, tenant_id)
VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant)
"""
),
{
"eid": evt.event_id,
"aid": evt.aggregate_id,
"atype": evt.aggregate_type,
"seq": evt.sequence,
"etype": evt.event_type,
"payload": evt.to_json(),
"meta": "{}",
"occurred": evt.occurred_at,
"ver": evt.version,
"tenant": evt.tenant_id,
},
from sqlalchemy.exc import IntegrityError # type: ignore[import-not-found, unused-ignore]

try:
async with self._engine.begin() as conn:
# Read the current version INSIDE the write transaction (same
# connection) so the check-then-insert is not a TOCTOU race.
result = await conn.execute(
text("SELECT COALESCE(MAX(sequence), 0) FROM pyfly_event_store WHERE aggregate_id = :aid"),
{"aid": aggregate_id},
)
latest = int(result.scalar() or 0)
if latest != expected_version:
msg = f"expected version {expected_version}, found {latest}"
raise ConcurrencyError(msg)
for i, evt in enumerate(events, start=1):
evt.aggregate_id = aggregate_id
evt.aggregate_type = aggregate_type
evt.sequence = expected_version + i
await conn.execute(
text(
"""
INSERT INTO pyfly_event_store
(event_id, aggregate_id, aggregate_type, sequence,
event_type, payload, metadata, occurred_at, version, tenant_id)
VALUES (:eid, :aid, :atype, :seq, :etype, :payload, :meta, :occurred, :ver, :tenant)
"""
),
{
"eid": evt.event_id,
"aid": evt.aggregate_id,
"atype": evt.aggregate_type,
"seq": evt.sequence,
"etype": evt.event_type,
"payload": evt.to_json(),
"meta": "{}",
"occurred": evt.occurred_at,
"ver": evt.version,
"tenant": evt.tenant_id,
},
)
except IntegrityError as exc:
# A concurrent writer committed the same (aggregate_id, sequence)
# between our in-transaction check and insert; the UNIQUE constraint
# is the backstop. Surface as the documented optimistic-lock failure
# so retry-on-ConcurrencyError callers see it.
raise ConcurrencyError(
f"concurrent append for aggregate {aggregate_id!r} at version {expected_version}"
) from exc

async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[StoredEventEnvelope]:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
Expand All @@ -181,7 +216,7 @@ async def load(self, aggregate_id: str, *, after_sequence: int = 0) -> list[Stor
{"aid": aggregate_id, "after": after_sequence},
)
).fetchall()
return [StoredEventEnvelope.from_json(r[0]) for r in rows]
return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows]

async def stream_all(self, *, after_event_id: str | None = None, limit: int = 100) -> list[StoredEventEnvelope]:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
Expand All @@ -207,7 +242,7 @@ async def stream_all(self, *, after_event_id: str | None = None, limit: int = 10
{"eid": after_event_id, "limit": limit},
)
).fetchall()
return [StoredEventEnvelope.from_json(r[0]) for r in rows]
return [_apply_upcasters(StoredEventEnvelope.from_json(r[0]), self._upcasters) for r in rows]

async def latest_version(self, aggregate_id: str) -> int:
from sqlalchemy import text # type: ignore[import-not-found, unused-ignore]
Expand Down
Loading