Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,7 @@

| Version | Supported |
|---------|--------------------|
| 0.7.x | :white_check_mark: |
| 0.6.x | :x: |
| 0.5.x | :x: |
| 0.4.x | :x: |
| 0.3.x | :x: |
| 0.2.x | :x: |
| 0.1.x | :x: |
| latest | :white_check_mark: |

## Reporting Security Issues

Expand Down
9 changes: 5 additions & 4 deletions docs/configuration/Chapter_15.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ These two events bracket every invocation. They are produced by the queue layer,

| Event Type | Description | `data` payload |
|------------|-------------|----------------|
| `SESSION_START` | First event on the queue — emitted before any agent activity | Serialised `SessionManifest`: agents, orchestrations, entry point, model info, session manager locations |
| `SESSION_START` | First event on the queue — emitted before any agent activity | `{"session_id": "<id or null>", "manifest": {SessionManifest}}` — agents, orchestrations, entry point, model info, session manager locations |
| `SESSION_END` | Last typed event before the stream closes | `{"session_id": "<id or null>"}` |

The `SESSION_START` payload is the full wired topology at invocation time. Use it to restore conversation history, render an architecture diagram, or audit which models are in use — before any agent has run.
The `SESSION_START` payload wraps the full wired topology snapshot together with the effective session id. Use the `manifest` key to restore conversation history, render an architecture diagram, or audit which models are in use — before any agent has run.

### Per-agent events

Expand Down Expand Up @@ -87,8 +87,9 @@ A typical consumer pattern that handles the session lifecycle:
```python
while (event := await queue.get()) is not None:
if event.type == "session_start":
manifest = event.data # full topology snapshot
entry = manifest["entry"] # {"name": "...", "kind": "agent|orchestration"}
session_id = event.data.get("session_id")
manifest = event.data["manifest"] # full topology snapshot
entry = manifest["entry"] # {"name": "...", "kind": "agent|orchestration"}
elif event.type == "session_end":
session_id = event.data.get("session_id")
else:
Expand Down
2 changes: 1 addition & 1 deletion examples/12_streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ bracketing all per-agent activity.

| Type | When it fires | `data` |
|------|---------------|--------|
| `session_start` | Before any agent runs — first event on the queue | Serialised `SessionManifest` (agents, orchestrations, entry, model info) |
| `session_start` | Before any agent runs — first event on the queue | `{"session_id": null \| "...", "manifest": {agents, orchestrations, entry, model info}}` |
| `agent_start` | Agent begins processing | — |
| `token` | Streaming text chunk | `{"text": "..."}` |
| `reasoning` | Streaming reasoning chunk | `{"text": "..."}` |
Expand Down
10 changes: 4 additions & 6 deletions src/strands_compose/config/resolvers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from ...manifest import build_manifest, first_session_id
from ...manifest import build_manifest
from ...mcp.lifecycle import MCPLifecycle
from ...wire import make_event_queue
from .mcp import resolve_mcp_client, resolve_mcp_server
Expand Down Expand Up @@ -44,6 +44,7 @@ class ResolvedConfig:
def wire_event_queue(
self,
*,
session_id: str | None = None,
tool_labels: dict[str, str] | None = None,
) -> EventQueue:
"""Wire all agents and orchestrators for event streaming.
Expand All @@ -58,17 +59,14 @@ def wire_event_queue(
3. Emits a SESSION_START event carrying the manifest as the first
event on the queue.

The effective session id is the first non-``None`` ``session_id``
found in the manifest (agents first, then orchestrations); it is
included in the SESSION_END event payload.

.. warning::

This **mutates** the agents and orchestrators stored on this
instance by adding hooks and overwriting ``callback_handler``.
Call it only once per ``ResolvedConfig`` instance.

Args:
session_id: Optional session ID to embed in events.
tool_labels: Optional tool name → display label mapping.

Returns:
Expand All @@ -84,7 +82,7 @@ def wire_event_queue(
orchestrators=self.orchestrators,
tool_labels=tool_labels,
entry_name=manifest.entry.name,
session_id=first_session_id(manifest),
session_id=session_id,
)
event_queue.emit_session_start(manifest)
return event_queue
Expand Down
5 changes: 1 addition & 4 deletions src/strands_compose/hooks/event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ def _on_agent_start(self, event: BeforeInvocationEvent) -> None:
StreamEvent(
type=EventType.AGENT_START,
agent_name=self._agent_name,
data={"type": "agent"},
),
)

Expand Down Expand Up @@ -246,14 +245,12 @@ def _on_complete(self, event: AfterInvocationEvent) -> None:
)
return

metrics = event.agent.event_loop_metrics

# Usage from the latest invocation (current turn only).
metrics = event.agent.event_loop_metrics
invocation = metrics.latest_agent_invocation
usage = invocation.usage if invocation else metrics.accumulated_usage

data: dict[str, Any] = {
"type": "agent",
"usage": {
"input_tokens": usage.get("inputTokens", 0),
"output_tokens": usage.get("outputTokens", 0),
Expand Down
21 changes: 0 additions & 21 deletions src/strands_compose/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,24 +304,3 @@ def build_manifest(
],
entry=_resolve_entry(entry, agents, orchestrators),
)


def first_session_id(manifest: SessionManifest) -> str | None:
"""Return the first non-None ``session_id`` found in the manifest.

Iterates ``manifest.agents`` first, then ``manifest.orchestrations``,
returning the first ``session_manager.session_id`` that is set. Used by
:meth:`ResolvedConfig.wire_event_queue` to determine the effective session
id for the SESSION_END event payload.

Args:
manifest: The session manifest.

Returns:
The first session id found, or ``None`` when no descriptor in the
manifest has a session manager set.
"""
for descriptor in (*manifest.agents, *manifest.orchestrations):
if descriptor.session_manager is not None:
return descriptor.session_manager.session_id
return None
2 changes: 1 addition & 1 deletion src/strands_compose/renderers/ansi.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def flush(self) -> None: # noqa: D102

def _handle_session_start(self, event: StreamEvent) -> None:
self._break()
manifest = SessionManifest.model_validate(event.data)
manifest = SessionManifest.model_validate(event.data["manifest"])
agent_names = ", ".join(a.name for a in manifest.agents) or "—"
orch_names = ", ".join(o.name for o in manifest.orchestrations) or "—"
self._out.write(self._separator(manifest.entry.name, "SESSION START", color=self._cyan))
Expand Down
35 changes: 13 additions & 22 deletions src/strands_compose/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import asyncio
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from strands import Agent
from strands.multiagent import Swarm
Expand Down Expand Up @@ -63,22 +63,6 @@ def __init__(
SESSION_END events emitted by :meth:`emit_session_start` and
:meth:`close` respectively.

Example::

events = make_event_queue(config.agents)


async def _run():
try:
await config.entry.invoke_async(prompt)
finally:
await events.close()


asyncio.create_task(_run())
while (event := await events.get()) is not None:
yield event.asdict()

Args:
queue: The underlying asyncio.Queue to wrap.
entry_name: The configured name of the entry node. Used as
Expand Down Expand Up @@ -132,8 +116,8 @@ def emit_session_start(self, manifest: SessionManifest) -> None:

Args:
manifest: The :class:`SessionManifest` describing the wired
session. Serialised via ``.model_dump()`` into the event
payload.
session. Placed in the event payload as
``{"session_id": <session_id>, "manifest": manifest.model_dump()}``.
"""
if self._session_start_emitted:
return
Expand All @@ -142,7 +126,10 @@ def emit_session_start(self, manifest: SessionManifest) -> None:
StreamEvent(
type=EventType.SESSION_START,
agent_name=self._entry_name,
data=manifest.model_dump(),
data={
"session_id": self._session_id,
"manifest": manifest.model_dump(),
},
)
)
logger.debug("entry=<%s> | session_start emitted", self._entry_name)
Expand All @@ -164,7 +151,7 @@ def put_event(self, event: StreamEvent) -> None:
"""
self._put(event)

async def close(self) -> None:
async def close(self, data: dict[str, Any] = {}) -> None:
"""Signal end-of-stream.

Emits a SESSION_END event before placing the sentinel on the queue.
Expand All @@ -175,14 +162,18 @@ async def close(self) -> None:

Typically called in a ``finally`` block after the agent invocation
finishes.

Args:
data: Additional data to include in the SESSION_END event.

"""
if not self._session_end_emitted:
self._session_end_emitted = True
self._put(
StreamEvent(
type=EventType.SESSION_END,
agent_name=self._entry_name,
data={"session_id": self._session_id},
data={"session_id": self._session_id, **data},
)
)
logger.debug(
Expand Down
4 changes: 2 additions & 2 deletions tasks/test.just
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ test: test-coverage

# check code coverage
[group('test')]
test-coverage numprocesses="auto" cov_fail_under="90":
uv run python -m pytest --numprocesses={{numprocesses}} --cov={{SOURCES}} --cov-fail-under={{cov_fail_under}} {{TESTS}}
test-coverage cov_fail_under="90":
uv run python -m pytest --numprocesses=2 --cov={{SOURCES}} --cov-fail-under={{cov_fail_under}} {{TESTS}}

# run mutation testing (requires: pip install mutmut)
[group('test')]
Expand Down
10 changes: 6 additions & 4 deletions tests/integration/test_session_lifecycle_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ async def test_session_start_payload_contains_manifest(self, fixture_path):
assert event is not None
assert event.type == EventType.SESSION_START

manifest = event.data
assert isinstance(event.data, dict)
assert event.data["session_id"] is None
manifest = event.data["manifest"]
assert isinstance(manifest, dict)
assert "agents" in manifest
assert "orchestrations" in manifest
Expand Down Expand Up @@ -109,7 +111,7 @@ async def test_session_start_session_end_multiple_agents(self, fixture_path):
assert events[-1].type == EventType.SESSION_END
assert events[-1].agent_name == "coordinator"

manifest = events[0].data
manifest = events[0].data["manifest"]
agent_names = {agent["name"] for agent in manifest["agents"]}
assert "researcher" in agent_names
assert "writer" in agent_names
Expand Down Expand Up @@ -143,7 +145,7 @@ async def test_session_start_session_end_swarm(self, fixture_path):
assert events[-1].type == EventType.SESSION_END
assert events[-1].agent_name == "team"

manifest = events[0].data
manifest = events[0].data["manifest"]
assert len(manifest["orchestrations"]) > 0
swarm = manifest["orchestrations"][0]
assert swarm["kind"] == "swarm"
Expand Down Expand Up @@ -179,7 +181,7 @@ async def test_session_start_session_end_graph(self, fixture_path):
assert events[-1].type == EventType.SESSION_END
assert events[-1].agent_name == "pipeline"

manifest = events[0].data
manifest = events[0].data["manifest"]
assert len(manifest["orchestrations"]) > 0
graph = manifest["orchestrations"][0]
assert graph["kind"] == "graph"
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/hooks/test_event_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_agent_start_event(self):
assert len(events) == 1
assert events[0].type == EventType.AGENT_START
assert events[0].agent_name == "test"
assert events[0].data == {"type": "agent"}
assert events[0].data == {}

def test_tool_start_and_end_events(self):
events = []
Expand Down Expand Up @@ -258,7 +258,7 @@ def test_model_error_sets_errored_flag(self) -> None:
model_event.exception = ValueError("bad request")
pub._on_model_error(model_event)

assert pub._errored is True
assert pub._errored

def test_successful_model_call_does_not_emit_error(self) -> None:
"""AfterModelCallEvent without exception emits nothing."""
Expand Down Expand Up @@ -322,7 +322,7 @@ def test_errored_flag_resets_on_next_invocation(self) -> None:
# Second invocation: agent_start resets the flag
agent_start_event = MagicMock()
pub._on_agent_start(agent_start_event)
assert pub._errored is False
assert not pub._errored

def test_register_hooks_includes_model_error(self) -> None:
"""register_hooks registers AfterModelCallEvent callback."""
Expand Down
Loading