Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/strands_compose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .tools import (
node_as_async_tool,
node_as_tool,
serialize_multiagent_result,
)
from .types import EventType, StreamEvent
from .utils import cli_errors
Expand Down Expand Up @@ -61,4 +62,5 @@
"node_as_async_tool",
"node_as_tool",
"resolve_infra",
"serialize_multiagent_result",
]
2 changes: 1 addition & 1 deletion src/strands_compose/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _render_check_success_ansi(app_config: AppConfig) -> None:

# Collect rows as (label, value) pairs, then align on the colon.
rows: list[tuple[str, str]] = [
("entry", str(app_config.entry)),
("entry", app_config.entry),
("agents", agent_str),
]
if app_config.models:
Expand Down
3 changes: 3 additions & 0 deletions src/strands_compose/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
- Loading ``@tool``-decorated functions from files, modules, and directories.
- Wrapping ``Agent`` / ``MultiAgentBase`` nodes as ``AgentTool`` instances
(``node_as_tool``, ``node_as_async_tool``) for delegation.
- Serializing multi-agent results with full execution metadata.
"""

from __future__ import annotations

from .extractors import serialize_multiagent_result
from .loaders import (
load_tool_function,
load_tools_from_directory,
Expand All @@ -30,4 +32,5 @@
"node_as_tool",
"resolve_tool_spec",
"resolve_tool_specs",
"serialize_multiagent_result",
]
141 changes: 84 additions & 57 deletions src/strands_compose/tools/extractors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
"""Message extraction utilities for agent and multi-agent results.

Key Features:
- Extract the last message from strands Agent and MultiAgent results
- Extract text from messages when a string-only fallback is needed
- Support for SwarmResult and GraphResult node resolution
- Recursive extraction through nested orchestration results
"""
"""Message extraction and serialization utilities for agent and multi-agent results."""

from __future__ import annotations

Expand All @@ -24,73 +17,43 @@ def _message_from_text(text: str) -> Message:
return {"role": "assistant", "content": [{"text": text}]}


def _extract_last_message_from_multi_agent_result(result: MultiAgentResult) -> Message:
"""Extract the final message from a ``MultiAgentResult``."""
last_node_id = resolve_last_node_id(result)

if last_node_id and last_node_id in result.results:
message = extract_last_message(result.results[last_node_id])
if message is not None:
return message

for node_result in reversed(list(result.results.values())):
message = extract_last_message(node_result)
if message is not None:
return message

logger.warning("status=<%s> | no message extracted from MultiAgentResult", result.status)
return _message_from_text(
f"[orchestration completed with status {result.status.value} but produced no message output]"
)


def extract_text_from_message(message: Message | None) -> str | None:
"""Extract the last text block from a message.

Strands ``ContentBlock`` uses ``{"text": "..."}`` for text blocks (no
``"type"`` wrapper). This helper scans content blocks in reverse and
returns the last text block. Use it only when a caller explicitly needs
plain text; ``extract_last_message`` preserves the complete message.

Args:
message: A strands ``Message`` dict (e.g. ``AgentResult.message``).

Returns:
The last text string, or ``None`` if no text blocks exist.
"""
def extract_text(message: Message | None) -> str:
"""Return the last text block from a message, or an empty string."""
if not message:
return None
content = message.get("content", [])
for block in reversed(content):
return ""
for block in reversed(message.get("content", [])):
if isinstance(block, dict) and "text" in block:
return block["text"]
return None
return ""


def extract_last_message(result: Any) -> Message:
"""Extract the final message from an agent, orchestration, or node result.

Dispatches to the appropriate extractor based on the result type:
- ``AgentResult`` returns ``result.message`` directly.
- ``MultiAgentResult`` drills into the last executing node's message.
- ``NodeResult`` unwraps the inner payload and dispatches recursively.
- Unknown types fall back to an assistant text message containing
``str(result)``.

Args:
result: An ``AgentResult``, ``MultiAgentResult``, ``NodeResult``,
or any object.

Returns:
The extracted ``Message``. This can be wrapped in a one-item list and
passed to ``Agent.invoke_async`` as ``Messages`` when richer content
such as images or documents must be preserved.
The extracted ``Message``.
"""
if isinstance(result, AgentResult):
return result.message

if isinstance(result, MultiAgentResult):
return _extract_last_message_from_multi_agent_result(result)
last_node_id = resolve_last_node_id(result)
if last_node_id and last_node_id in result.results:
message = extract_last_message(result.results[last_node_id])
if message is not None:
return message
for node_result in reversed(list(result.results.values())):
message = extract_last_message(node_result)
if message is not None:
return message
logger.warning("status=<%s> | no message extracted from MultiAgentResult", result.status)
return _message_from_text(
f"[orchestration completed with status {result.status.value} but produced no message output]"
)

if isinstance(result, NodeResult):
inner = result.result
Expand Down Expand Up @@ -125,3 +88,67 @@ def resolve_last_node_id(result: MultiAgentResult) -> str | None:
return str(execution_order[-1].node_id)

return None


def serialize_multiagent_result(result: MultiAgentResult) -> dict[str, Any]:
"""Serialize a ``MultiAgentResult`` with execution metadata omitted by ``to_dict()``.

Extends ``result.to_dict()`` with fields only available on the live object:

- ``last_node_id`` — id of the truly last executing node, derived from
``node_history`` / ``execution_order`` (not dict insertion order).
- ``response`` — plain-text answer from that node, ready to use
directly without any further extraction.
- ``swarm.node_history`` — full ordered execution trace including repeated
visits (``SwarmResult`` only).
- ``graph.execution_order``, ``graph.edges``, ``graph.entry_points``, and
node counts (``GraphResult`` only).

Args:
result: A live ``MultiAgentResult``, ``SwarmResult``, or ``GraphResult``
returned directly by ``invoke_async``.

Returns:
A JSON-serializable dict extending ``result.to_dict()``.
"""
data = result.to_dict()

last_node_id = resolve_last_node_id(result)
data["last_node_id"] = last_node_id

final_message = extract_last_message(result)
data["response"] = extract_text(final_message)

# SwarmResult extras — node_history is a list[SwarmNode]
node_history: list[Any] | None = getattr(result, "node_history", None)
if node_history is not None:
data["swarm"] = {
"node_history": [str(n.node_id) for n in node_history],
}

# GraphResult extras — execution_order, edges, node counts
execution_order: list[Any] | None = getattr(result, "execution_order", None)
if execution_order is not None:
edges_raw: list[Any] = getattr(result, "edges", []) or []
entry_points_raw: list[Any] = getattr(result, "entry_points", []) or []

edges: list[list[str]] = []
for edge in edges_raw:
if isinstance(edge, tuple) and len(edge) == 2:
edges.append([str(edge[0].node_id), str(edge[1].node_id)])
else:
# GraphEdge dataclass with from_node / to_node attributes
from_id = str(getattr(getattr(edge, "from_node", None), "node_id", edge))
to_id = str(getattr(getattr(edge, "to_node", None), "node_id", edge))
edges.append([from_id, to_id])

data["graph"] = {
"execution_order": [str(n.node_id) for n in execution_order],
"edges": edges,
"entry_points": [str(getattr(ep, "node_id", ep)) for ep in entry_points_raw],
"completed_nodes": getattr(result, "completed_nodes", 0),
"failed_nodes": getattr(result, "failed_nodes", 0),
"interrupted_nodes": getattr(result, "interrupted_nodes", 0),
}

return data
4 changes: 2 additions & 2 deletions src/strands_compose/tools/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from strands.tools.decorator import DecoratedFunctionTool, tool
from strands.types.content import Message

from .extractors import extract_last_message, extract_text_from_message
from .extractors import extract_last_message, extract_text

if TYPE_CHECKING:
from ..types import Node
Expand Down Expand Up @@ -75,7 +75,7 @@ def _message_to_tool_result(message: Message) -> dict[str, Any]:
if content:
return {"status": "success", "content": content}

return {"status": "success", "content": [{"text": extract_text_from_message(message) or ""}]}
return {"status": "success", "content": [{"text": extract_text(message)}]}


def node_as_tool(
Expand Down
Loading