From 0a2dc3d961a78bcfbcab72b54649213452975f8b Mon Sep 17 00:00:00 2001 From: galuszkm Date: Mon, 22 Jun 2026 12:01:45 +0200 Subject: [PATCH 1/3] feat(events)!: make session id explicit in event stream Let callers pass the session id into wire_event_queue instead of inferring it from manifest ordering, and give SESSION_START/SESSION_END a stable, predictable shape. BREAKING CHANGE: first_session_id() removed; wire_event_queue no longer auto-derives the session id. SESSION_START data is now {session_id, manifest} and AGENT_START/agent-stop payloads drop the "type" field. --- SECURITY.md | 8 +-- docs/configuration/Chapter_15.md | 9 +-- examples/12_streaming/README.md | 2 +- .../config/resolvers/config.py | 10 ++-- src/strands_compose/hooks/event_publisher.py | 5 +- src/strands_compose/manifest.py | 21 ------- src/strands_compose/wire.py | 35 ++++------- tasks/test.just | 4 +- .../test_session_lifecycle_events.py | 10 ++-- tests/unit/hooks/test_event_publisher.py | 6 +- tests/unit/test_manifest.py | 59 ------------------- uv.lock | 2 +- 12 files changed, 37 insertions(+), 134 deletions(-) diff --git a/SECURITY.md b/SECURITY.md index 2149052..b3eb328 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -4,13 +4,7 @@ | Version | Supported | |---------|--------------------| -| 0.7.x | :white_check_mark: | -| 0.6.x | :x: | -| 0.5.x | :x: | -| 0.4.x | :x: | -| 0.3.x | :x: | -| 0.2.x | :x: | -| 0.1.x | :x: | +| latest | :white_check_mark: | ## Reporting Security Issues diff --git a/docs/configuration/Chapter_15.md b/docs/configuration/Chapter_15.md index 1d8d258..9ca7f60 100644 --- a/docs/configuration/Chapter_15.md +++ b/docs/configuration/Chapter_15.md @@ -40,10 +40,10 @@ These two events bracket every invocation. They are produced by the queue layer, | Event Type | Description | `data` payload | |------------|-------------|----------------| -| `SESSION_START` | First event on the queue — emitted before any agent activity | Serialised `SessionManifest`: agents, orchestrations, entry point, model info, session manager locations | +| `SESSION_START` | First event on the queue — emitted before any agent activity | `{"session_id": "", "manifest": {SessionManifest}}` — agents, orchestrations, entry point, model info, session manager locations | | `SESSION_END` | Last typed event before the stream closes | `{"session_id": ""}` | -The `SESSION_START` payload is the full wired topology at invocation time. Use it to restore conversation history, render an architecture diagram, or audit which models are in use — before any agent has run. +The `SESSION_START` payload wraps the full wired topology snapshot together with the effective session id. Use the `manifest` key to restore conversation history, render an architecture diagram, or audit which models are in use — before any agent has run. ### Per-agent events @@ -87,8 +87,9 @@ A typical consumer pattern that handles the session lifecycle: ```python while (event := await queue.get()) is not None: if event.type == "session_start": - manifest = event.data # full topology snapshot - entry = manifest["entry"] # {"name": "...", "kind": "agent|orchestration"} + session_id = event.data.get("session_id") + manifest = event.data["manifest"] # full topology snapshot + entry = manifest["entry"] # {"name": "...", "kind": "agent|orchestration"} elif event.type == "session_end": session_id = event.data.get("session_id") else: diff --git a/examples/12_streaming/README.md b/examples/12_streaming/README.md index 6775394..55007f5 100644 --- a/examples/12_streaming/README.md +++ b/examples/12_streaming/README.md @@ -36,7 +36,7 @@ bracketing all per-agent activity. | Type | When it fires | `data` | |------|---------------|--------| -| `session_start` | Before any agent runs — first event on the queue | Serialised `SessionManifest` (agents, orchestrations, entry, model info) | +| `session_start` | Before any agent runs — first event on the queue | `{"session_id": null \| "...", "manifest": {agents, orchestrations, entry, model info}}` | | `agent_start` | Agent begins processing | — | | `token` | Streaming text chunk | `{"text": "..."}` | | `reasoning` | Streaming reasoning chunk | `{"text": "..."}` | diff --git a/src/strands_compose/config/resolvers/config.py b/src/strands_compose/config/resolvers/config.py index cf74693..e986147 100644 --- a/src/strands_compose/config/resolvers/config.py +++ b/src/strands_compose/config/resolvers/config.py @@ -6,7 +6,7 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING -from ...manifest import build_manifest, first_session_id +from ...manifest import build_manifest from ...mcp.lifecycle import MCPLifecycle from ...wire import make_event_queue from .mcp import resolve_mcp_client, resolve_mcp_server @@ -44,6 +44,7 @@ class ResolvedConfig: def wire_event_queue( self, *, + session_id: str | None = None, tool_labels: dict[str, str] | None = None, ) -> EventQueue: """Wire all agents and orchestrators for event streaming. @@ -58,10 +59,6 @@ def wire_event_queue( 3. Emits a SESSION_START event carrying the manifest as the first event on the queue. - The effective session id is the first non-``None`` ``session_id`` - found in the manifest (agents first, then orchestrations); it is - included in the SESSION_END event payload. - .. warning:: This **mutates** the agents and orchestrators stored on this @@ -69,6 +66,7 @@ def wire_event_queue( Call it only once per ``ResolvedConfig`` instance. Args: + session_id: Optional session ID to embed in events. tool_labels: Optional tool name → display label mapping. Returns: @@ -84,7 +82,7 @@ def wire_event_queue( orchestrators=self.orchestrators, tool_labels=tool_labels, entry_name=manifest.entry.name, - session_id=first_session_id(manifest), + session_id=session_id, ) event_queue.emit_session_start(manifest) return event_queue diff --git a/src/strands_compose/hooks/event_publisher.py b/src/strands_compose/hooks/event_publisher.py index 50c0b29..3b3aa11 100644 --- a/src/strands_compose/hooks/event_publisher.py +++ b/src/strands_compose/hooks/event_publisher.py @@ -175,7 +175,6 @@ def _on_agent_start(self, event: BeforeInvocationEvent) -> None: StreamEvent( type=EventType.AGENT_START, agent_name=self._agent_name, - data={"type": "agent"}, ), ) @@ -246,14 +245,12 @@ def _on_complete(self, event: AfterInvocationEvent) -> None: ) return - metrics = event.agent.event_loop_metrics - # Usage from the latest invocation (current turn only). + metrics = event.agent.event_loop_metrics invocation = metrics.latest_agent_invocation usage = invocation.usage if invocation else metrics.accumulated_usage data: dict[str, Any] = { - "type": "agent", "usage": { "input_tokens": usage.get("inputTokens", 0), "output_tokens": usage.get("outputTokens", 0), diff --git a/src/strands_compose/manifest.py b/src/strands_compose/manifest.py index 5e05592..88dc71b 100644 --- a/src/strands_compose/manifest.py +++ b/src/strands_compose/manifest.py @@ -304,24 +304,3 @@ def build_manifest( ], entry=_resolve_entry(entry, agents, orchestrators), ) - - -def first_session_id(manifest: SessionManifest) -> str | None: - """Return the first non-None ``session_id`` found in the manifest. - - Iterates ``manifest.agents`` first, then ``manifest.orchestrations``, - returning the first ``session_manager.session_id`` that is set. Used by - :meth:`ResolvedConfig.wire_event_queue` to determine the effective session - id for the SESSION_END event payload. - - Args: - manifest: The session manifest. - - Returns: - The first session id found, or ``None`` when no descriptor in the - manifest has a session manager set. - """ - for descriptor in (*manifest.agents, *manifest.orchestrations): - if descriptor.session_manager is not None: - return descriptor.session_manager.session_id - return None diff --git a/src/strands_compose/wire.py b/src/strands_compose/wire.py index cff0615..25ea9c3 100644 --- a/src/strands_compose/wire.py +++ b/src/strands_compose/wire.py @@ -21,7 +21,7 @@ import asyncio import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from strands import Agent from strands.multiagent import Swarm @@ -63,22 +63,6 @@ def __init__( SESSION_END events emitted by :meth:`emit_session_start` and :meth:`close` respectively. - Example:: - - events = make_event_queue(config.agents) - - - async def _run(): - try: - await config.entry.invoke_async(prompt) - finally: - await events.close() - - - asyncio.create_task(_run()) - while (event := await events.get()) is not None: - yield event.asdict() - Args: queue: The underlying asyncio.Queue to wrap. entry_name: The configured name of the entry node. Used as @@ -132,8 +116,8 @@ def emit_session_start(self, manifest: SessionManifest) -> None: Args: manifest: The :class:`SessionManifest` describing the wired - session. Serialised via ``.model_dump()`` into the event - payload. + session. Placed in the event payload as + ``{"session_id": , "manifest": manifest.model_dump()}``. """ if self._session_start_emitted: return @@ -142,7 +126,10 @@ def emit_session_start(self, manifest: SessionManifest) -> None: StreamEvent( type=EventType.SESSION_START, agent_name=self._entry_name, - data=manifest.model_dump(), + data={ + "session_id": self._session_id, + "manifest": manifest.model_dump(), + }, ) ) logger.debug("entry=<%s> | session_start emitted", self._entry_name) @@ -164,7 +151,7 @@ def put_event(self, event: StreamEvent) -> None: """ self._put(event) - async def close(self) -> None: + async def close(self, data: dict[str, Any] = {}) -> None: """Signal end-of-stream. Emits a SESSION_END event before placing the sentinel on the queue. @@ -175,6 +162,10 @@ async def close(self) -> None: Typically called in a ``finally`` block after the agent invocation finishes. + + Args: + data: Additional data to include in the SESSION_END event. + """ if not self._session_end_emitted: self._session_end_emitted = True @@ -182,7 +173,7 @@ async def close(self) -> None: StreamEvent( type=EventType.SESSION_END, agent_name=self._entry_name, - data={"session_id": self._session_id}, + data={"session_id": self._session_id, **data}, ) ) logger.debug( diff --git a/tasks/test.just b/tasks/test.just index 5985027..05a0511 100644 --- a/tasks/test.just +++ b/tasks/test.just @@ -4,8 +4,8 @@ test: test-coverage # check code coverage [group('test')] -test-coverage numprocesses="auto" cov_fail_under="90": - uv run python -m pytest --numprocesses={{numprocesses}} --cov={{SOURCES}} --cov-fail-under={{cov_fail_under}} {{TESTS}} +test-coverage cov_fail_under="90": + uv run python -m pytest --numprocesses=2 --cov={{SOURCES}} --cov-fail-under={{cov_fail_under}} {{TESTS}} # run mutation testing (requires: pip install mutmut) [group('test')] diff --git a/tests/integration/test_session_lifecycle_events.py b/tests/integration/test_session_lifecycle_events.py index 7813048..a1ba335 100644 --- a/tests/integration/test_session_lifecycle_events.py +++ b/tests/integration/test_session_lifecycle_events.py @@ -61,7 +61,9 @@ async def test_session_start_payload_contains_manifest(self, fixture_path): assert event is not None assert event.type == EventType.SESSION_START - manifest = event.data + assert isinstance(event.data, dict) + assert event.data["session_id"] is None + manifest = event.data["manifest"] assert isinstance(manifest, dict) assert "agents" in manifest assert "orchestrations" in manifest @@ -109,7 +111,7 @@ async def test_session_start_session_end_multiple_agents(self, fixture_path): assert events[-1].type == EventType.SESSION_END assert events[-1].agent_name == "coordinator" - manifest = events[0].data + manifest = events[0].data["manifest"] agent_names = {agent["name"] for agent in manifest["agents"]} assert "researcher" in agent_names assert "writer" in agent_names @@ -143,7 +145,7 @@ async def test_session_start_session_end_swarm(self, fixture_path): assert events[-1].type == EventType.SESSION_END assert events[-1].agent_name == "team" - manifest = events[0].data + manifest = events[0].data["manifest"] assert len(manifest["orchestrations"]) > 0 swarm = manifest["orchestrations"][0] assert swarm["kind"] == "swarm" @@ -179,7 +181,7 @@ async def test_session_start_session_end_graph(self, fixture_path): assert events[-1].type == EventType.SESSION_END assert events[-1].agent_name == "pipeline" - manifest = events[0].data + manifest = events[0].data["manifest"] assert len(manifest["orchestrations"]) > 0 graph = manifest["orchestrations"][0] assert graph["kind"] == "graph" diff --git a/tests/unit/hooks/test_event_publisher.py b/tests/unit/hooks/test_event_publisher.py index 9b1a1a3..c73dd34 100644 --- a/tests/unit/hooks/test_event_publisher.py +++ b/tests/unit/hooks/test_event_publisher.py @@ -45,7 +45,7 @@ def test_agent_start_event(self): assert len(events) == 1 assert events[0].type == EventType.AGENT_START assert events[0].agent_name == "test" - assert events[0].data == {"type": "agent"} + assert events[0].data == {} def test_tool_start_and_end_events(self): events = [] @@ -258,7 +258,7 @@ def test_model_error_sets_errored_flag(self) -> None: model_event.exception = ValueError("bad request") pub._on_model_error(model_event) - assert pub._errored is True + assert pub._errored def test_successful_model_call_does_not_emit_error(self) -> None: """AfterModelCallEvent without exception emits nothing.""" @@ -322,7 +322,7 @@ def test_errored_flag_resets_on_next_invocation(self) -> None: # Second invocation: agent_start resets the flag agent_start_event = MagicMock() pub._on_agent_start(agent_start_event) - assert pub._errored is False + assert not pub._errored def test_register_hooks_includes_model_error(self) -> None: """register_hooks registers AfterModelCallEvent callback.""" diff --git a/tests/unit/test_manifest.py b/tests/unit/test_manifest.py index 21103c5..008994d 100644 --- a/tests/unit/test_manifest.py +++ b/tests/unit/test_manifest.py @@ -13,18 +13,12 @@ from strands_compose.manifest import ( build_manifest, build_session_manager_descriptor, - first_session_id, ) from strands_compose.types import ( AgentCoreProviderDescriptor, - AgentDescriptor, CustomProviderDescriptor, - EntryDescriptor, FileProviderDescriptor, - ModelDescriptor, S3ProviderDescriptor, - SessionManagerDescriptor, - SessionManifest, ) # ── Helpers ────────────────────────────────────────────────────────────────── @@ -614,56 +608,3 @@ def test_manifest_non_delegate_orchestration_not_added_to_agents(self): assert len(manifest.agents) == 1 assert manifest.agents[0].name == "agent1" - - -# ── first_session_id ───────────────────────────────────────────────────────── - - -def _file_descriptor(session_id: str) -> FileProviderDescriptor: - return FileProviderDescriptor(provider="file", session_id=session_id, storage_dir="/tmp") - - -class TestFirstSessionId: - """Tests for first_session_id.""" - - def _empty_entry(self) -> EntryDescriptor: - return EntryDescriptor(name="x", kind="agent") - - def _agent(self, sm: SessionManagerDescriptor | None) -> AgentDescriptor: - return AgentDescriptor( - name="a", - description=None, - model=ModelDescriptor(model_id=None, provider="P"), - session_manager=sm, - ) - - def test_returns_none_when_no_managers(self): - manifest = SessionManifest(entry=self._empty_entry()) - assert first_session_id(manifest) is None - - def test_returns_first_agent_session_id(self): - manifest = SessionManifest( - agents=[ - self._agent(None), - self._agent(_file_descriptor("sess-1")), - self._agent(_file_descriptor("sess-2")), - ], - entry=self._empty_entry(), - ) - assert first_session_id(manifest) == "sess-1" - - def test_falls_back_to_orchestration_when_no_agents_have_sm(self): - from strands_compose.types import OrchestrationDescriptor - - manifest = SessionManifest( - agents=[self._agent(None)], - orchestrations=[ - OrchestrationDescriptor( - name="o", - kind="swarm", - session_manager=_file_descriptor("orch-sess"), - ), - ], - entry=self._empty_entry(), - ) - assert first_session_id(manifest) == "orch-sess" diff --git a/uv.lock b/uv.lock index 005d918..5fb4db7 100644 --- a/uv.lock +++ b/uv.lock @@ -1802,7 +1802,7 @@ openai = [ [[package]] name = "strands-compose" -version = "0.6.0" +version = "0.7.0" source = { editable = "." } dependencies = [ { name = "mcp" }, From 5fd2d8282c35d7634d02fa57bd6adf8e553368dd Mon Sep 17 00:00:00 2001 From: galuszkm Date: Mon, 22 Jun 2026 12:13:14 +0200 Subject: [PATCH 2/3] fix(renderers): extract manifest from session start event data - Update AnsiRenderer to access manifest from event.data["manifest"] instead of directly from event.data - Aligns with explicit session id structure in event stream --- src/strands_compose/renderers/ansi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/strands_compose/renderers/ansi.py b/src/strands_compose/renderers/ansi.py index b7188cd..babd1d3 100644 --- a/src/strands_compose/renderers/ansi.py +++ b/src/strands_compose/renderers/ansi.py @@ -115,7 +115,7 @@ def flush(self) -> None: # noqa: D102 def _handle_session_start(self, event: StreamEvent) -> None: self._break() - manifest = SessionManifest.model_validate(event.data) + manifest = SessionManifest.model_validate(event.data["manifest"]) agent_names = ", ".join(a.name for a in manifest.agents) or "—" orch_names = ", ".join(o.name for o in manifest.orchestrations) or "—" self._out.write(self._separator(manifest.entry.name, "SESSION START", color=self._cyan)) From 788ec2b2291aa2d836587f569507d82b635dd30d Mon Sep 17 00:00:00 2001 From: galuszkm Date: Mon, 22 Jun 2026 12:18:54 +0200 Subject: [PATCH 3/3] test(renderers): add session lifecycle tests for AnsiRenderer --- tests/unit/renderers/test_ansi.py | 128 ++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/tests/unit/renderers/test_ansi.py b/tests/unit/renderers/test_ansi.py index 5313db0..c880a07 100644 --- a/tests/unit/renderers/test_ansi.py +++ b/tests/unit/renderers/test_ansi.py @@ -5,6 +5,9 @@ import io import time +import pytest +from pydantic import ValidationError + from strands_compose.renderers import AnsiRenderer from strands_compose.types import EventType from strands_compose.wire import StreamEvent @@ -250,3 +253,128 @@ def test_typewriter_delay_applies_elapsed_time(self) -> None: r.render(_event(EventType.TOKEN, text=text)) elapsed = time.monotonic() - start assert elapsed >= delay * len(text) + + +class TestAnsiRendererSessionLifecycle: + """AnsiRenderer renders SESSION_START and SESSION_END correctly.""" + + def _renderer(self) -> tuple[AnsiRenderer, io.StringIO]: + buf = io.StringIO() + return AnsiRenderer(file=buf), buf + + def _session_start_event( + self, + entry_name: str = "coordinator", + entry_kind: str = "agent", + agents: list[str] | None = None, + orchestrations: list[str] | None = None, + ) -> StreamEvent: + """Build a SESSION_START event with the correct envelope shape.""" + return StreamEvent( + type=EventType.SESSION_START, + agent_name=entry_name, + data={ + "session_id": None, + "manifest": { + "agents": [ + { + "name": n, + "description": None, + "model": {"model_id": None, "provider": "mock"}, + "session_manager": None, + } + for n in (agents or [entry_name]) + ], + "orchestrations": [ + { + "name": n, + "kind": "swarm", + "session_manager": None, + "nodes": [], + "edges": None, + "entry_node_id": None, + } + for n in (orchestrations or []) + ], + "entry": {"name": entry_name, "kind": entry_kind}, + }, + }, + ) + + def test_session_start_renders_session_start_header(self) -> None: + """SESSION_START event renders a SESSION START header.""" + r, buf = self._renderer() + r.render(self._session_start_event()) + assert "SESSION START" in buf.getvalue() + + def test_session_start_renders_entry_name(self) -> None: + """SESSION_START event renders the entry point name.""" + r, buf = self._renderer() + r.render(self._session_start_event(entry_name="coordinator")) + assert "coordinator" in buf.getvalue() + + def test_session_start_renders_agent_names(self) -> None: + """SESSION_START event renders all agent names.""" + r, buf = self._renderer() + r.render(self._session_start_event(agents=["researcher", "writer"])) + output = buf.getvalue() + assert "researcher" in output + assert "writer" in output + + def test_session_start_renders_orchestration_names(self) -> None: + """SESSION_START event renders orchestration names when present.""" + r, buf = self._renderer() + r.render(self._session_start_event(agents=["a"], orchestrations=["pipeline"])) + assert "pipeline" in buf.getvalue() + + def test_session_start_omits_orchestrations_line_when_none(self) -> None: + """SESSION_START event omits the orchestrations line when there are none.""" + r, buf = self._renderer() + r.render(self._session_start_event(agents=["a"], orchestrations=[])) + assert "orchestrations" not in buf.getvalue() + + def test_session_start_raises_on_flat_manifest_payload(self) -> None: + """SESSION_START with old flat manifest (pre-envelope) raises a validation error. + + Guards against regression to the old payload shape where event.data was + the raw SessionManifest dict instead of {"session_id": ..., "manifest": ...}. + """ + + r, buf = self._renderer() + flat_event = StreamEvent( + type=EventType.SESSION_START, + agent_name="agent", + data={ + "agents": [], + "orchestrations": [], + "entry": {"name": "agent", "kind": "agent"}, + }, + ) + with pytest.raises((KeyError, ValidationError)): + r.render(flat_event) + + def test_session_end_renders_session_end_header(self) -> None: + """SESSION_END event renders a SESSION END header.""" + r, buf = self._renderer() + r.render(_event(EventType.SESSION_END, session_id="abc-123")) + assert "SESSION END" in buf.getvalue() + + def test_session_end_renders_session_id(self) -> None: + """SESSION_END event renders the session id.""" + r, buf = self._renderer() + r.render(_event(EventType.SESSION_END, session_id="abc-123")) + assert "abc-123" in buf.getvalue() + + def test_session_end_renders_dash_when_session_id_is_none(self) -> None: + """SESSION_END event renders '—' when session_id is None.""" + r, buf = self._renderer() + r.render(_event(EventType.SESSION_END, session_id=None)) + assert "—" in buf.getvalue() + + def test_session_start_breaks_token_stream(self) -> None: + """SESSION_START must break an active inline token stream.""" + r, buf = self._renderer() + r.render(_event(EventType.TOKEN, text="partial")) + r.render(self._session_start_event()) + output = buf.getvalue() + assert "partial\n" in output