diff --git a/components/frontend/src/components/session/MessagesTab.tsx b/components/frontend/src/components/session/MessagesTab.tsx index f717d580c..e78007f71 100644 --- a/components/frontend/src/components/session/MessagesTab.tsx +++ b/components/frontend/src/components/session/MessagesTab.tsx @@ -74,13 +74,17 @@ const MessagesTab: React.FC = ({ session, streamMessages, chat // Filter out system messages unless showSystemMessages is true const filteredMessages = streamMessages.filter((msg) => { if (showSystemMessages) return true; - - // Hide system_message type by default - // Check if msg has a type property and if it's a system_message + + // Hide system_message type by default (legacy) if ('type' in msg && msg.type === "system_message") { return false; } - + + // Hide messages with system or developer role (AG-UI protocol) + if ('role' in msg && (msg.role === "system" || msg.role === "developer")) { + return false; + } + return true; }); diff --git a/components/frontend/src/components/ui/message.tsx b/components/frontend/src/components/ui/message.tsx index 51d989630..483e89f2c 100644 --- a/components/frontend/src/components/ui/message.tsx +++ b/components/frontend/src/components/ui/message.tsx @@ -7,7 +7,7 @@ import remarkGfm from "remark-gfm"; import type { Components } from "react-markdown"; import { formatTimestamp } from "@/lib/format-timestamp"; -export type MessageRole = "bot" | "user"; +export type MessageRole = "bot" | "user" | "system"; export type MessageProps = { role: MessageRole; @@ -178,8 +178,9 @@ export const Message = React.forwardRef( ref ) => { const isBot = role === "bot"; - const avatarBg = isBot ? "bg-blue-600" : "bg-green-600"; - const avatarText = isBot ? "AI" : "U"; + const isSystem = role === "system"; + const avatarBg = isBot ? "bg-blue-600" : isSystem ? "bg-gray-500" : "bg-green-600"; + const avatarText = isBot ? "AI" : isSystem ? "SYS" : "U"; const formattedTime = formatTimestamp(timestamp); const isActivelyStreaming = streaming && isBot; @@ -200,25 +201,25 @@ export const Message = React.forwardRef( ) return ( -
-
+
+
{/* Avatar */} - {isBot ? avatar : null} + {(isBot || isSystem) ? avatar : null} {/* Message Content */} -
+
{/* Timestamp */} {formattedTime && ( -
+
{formattedTime}
)}
{/* Content */} -
+
{isLoading ? (
{content}
@@ -252,7 +253,7 @@ export const Message = React.forwardRef(
- {isBot ? null : avatar} + {(isBot || isSystem) ? null : avatar}
); diff --git a/components/frontend/src/components/ui/stream-message.tsx b/components/frontend/src/components/ui/stream-message.tsx index 70c80f42d..844781fda 100644 --- a/components/frontend/src/components/ui/stream-message.tsx +++ b/components/frontend/src/components/ui/stream-message.tsx @@ -64,37 +64,30 @@ export const StreamMessage: React.FC = ({ message, onGoToRes case "user_message": case "agent_message": { const isStreaming = 'streaming' in message && message.streaming; - const isAgent = m.type === "agent_message"; - - // Get content text for feedback context - const getContentText = () => { - if (typeof m.content === "string") return m.content; - if ("text" in m.content) return m.content.text; - if ("thinking" in m.content) return m.content.thinking; - return ""; - }; - - // Feedback buttons for agent text messages (not tool use/result, not streaming) - const feedbackElement = isAgent && !isStreaming ? ( - - ) : undefined; - + + // Check for AG-UI role field + const role = 'role' in m ? m.role : undefined; + + // Determine display role based on AG-UI role + let displayRole: "user" | "bot" | "system" = "user"; + let displayName = "You"; + + if (role === "assistant" || (m.type === "agent_message" && !role)) { + displayRole = "bot"; + displayName = "Claude AI"; + } else if (role === "developer") { + displayRole = "system"; + displayName = "Platform"; + } else if (role === "system") { + displayRole = "system"; + displayName = "System"; + } else if (role === "user" || m.type === "user_message") { + displayRole = "user"; + displayName = "You"; + } + if (typeof m.content === "string") { - return ( - - ); + return ; } switch (m.content.type) { case "thinking_block": @@ -102,9 +95,9 @@ export const StreamMessage: React.FC = ({ message, onGoToRes case "text_block": return ( ; timestamp: string; } +// New AG-UI role-based messages +export type DeveloperMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "developer"; // Platform/internal logging +} +export type SystemRoleMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "system"; // Claude system messages (turn info, etc) +} export type ResultMessage = { type: "result_message"; subtype: string; diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 61258f5f0..09a0aabe9 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -109,6 +109,32 @@ def _timestamp(self) -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() + async def _emit_developer_message( + self, message: str, thread_id: str, run_id: str + ) -> AsyncIterator[BaseEvent]: + """Helper to emit a developer role message (platform logging).""" + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta=message, + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + ) + async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """ Process a run and yield AG-UI events. @@ -213,11 +239,27 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven if not user_message: logger.warning("No user message found in input") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "No user message provided"}, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="No user message provided", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) yield RunFinishedEvent( type=EventType.RUN_FINISHED, @@ -518,6 +560,36 @@ async def restart_session_tool(args: dict) -> dict: include_partial_messages=True, ) + # Enable continue_conversation for session resumption + if not self._first_run or is_continuation: + try: + options.continue_conversation = True + logger.info("Enabled continue_conversation for session resumption") + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="🔄 Continuing conversation from previous state", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + ) + except Exception as e: + logger.warning(f"Failed to set continue_conversation: {e}") + if self._skip_resume_on_restart: self._skip_resume_on_restart = False @@ -594,14 +666,27 @@ def create_sdk_client(opts, disable_continue=False): error_str = str(resume_error).lower() if "no conversation found" in error_str or "session" in error_str: logger.warning(f"Conversation continuation failed: {resume_error}") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, thread_id=thread_id, run_id=run_id, - event={ - "type": "system_log", - "message": "⚠️ Could not continue conversation, starting fresh...", - }, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="⚠️ Could not continue conversation, starting fresh...", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) client = create_sdk_client(options, disable_continue=True) await client.connect() @@ -612,6 +697,31 @@ def create_sdk_client(opts, disable_continue=False): # Store client reference for interrupt support self._active_client = client + if not self._first_run: + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="✅ Continuing conversation", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + ) + logger.info("SDK continuing conversation from local state") + # Process the prompt step_id = str(uuid.uuid4()) yield StepStartedEvent( @@ -1121,7 +1231,151 @@ async def _prepare_workspace(self) -> AsyncIterator[BaseEvent]: else: logger.info("No state hydrated (fresh session)") - # No further preparation needed - init container did the work + repos_cfg = self._get_repos_config() + if repos_cfg: + async for event in self._prepare_multi_repo_workspace(workspace, repos_cfg, reusing_workspace): + yield event + return + + # Single-repo legacy flow + input_repo = os.getenv("INPUT_REPO_URL", "").strip() + if not input_repo: + logger.info("No INPUT_REPO_URL configured, skipping single-repo setup") + return + + input_branch = os.getenv("INPUT_BRANCH", "").strip() or "main" + output_repo = os.getenv("OUTPUT_REPO_URL", "").strip() + + token = await self._fetch_token_for_url(input_repo) + workspace_has_git = (workspace / ".git").exists() + + try: + if not workspace_has_git: + async for event in self._emit_developer_message( + "📥 Cloning input repository...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + clone_url = self._url_with_token(input_repo, token) if token else input_repo + await self._run_cmd(["git", "clone", "--branch", input_branch, "--single-branch", clone_url, str(workspace)], cwd=str(workspace.parent)) + await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(workspace), ignore_errors=True) + elif reusing_workspace: + async for event in self._emit_developer_message( + "✓ Preserving workspace (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace), ignore_errors=True) + else: + async for event in self._emit_developer_message( + "🔄 Resetting workspace to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace)) + await self._run_cmd(["git", "fetch", "origin", input_branch], cwd=str(workspace)) + await self._run_cmd(["git", "checkout", input_branch], cwd=str(workspace)) + await self._run_cmd(["git", "reset", "--hard", f"origin/{input_branch}"], cwd=str(workspace)) + + # Git identity + user_name = os.getenv("GIT_USER_NAME", "").strip() or "Ambient Code Bot" + user_email = os.getenv("GIT_USER_EMAIL", "").strip() or "bot@ambient-code.local" + await self._run_cmd(["git", "config", "user.name", user_name], cwd=str(workspace)) + await self._run_cmd(["git", "config", "user.email", user_email], cwd=str(workspace)) + + if output_repo: + out_url = self._url_with_token(output_repo, token) if token else output_repo + await self._run_cmd(["git", "remote", "remove", "output"], cwd=str(workspace), ignore_errors=True) + await self._run_cmd(["git", "remote", "add", "output", out_url], cwd=str(workspace)) + + except Exception as e: + logger.error(f"Failed to prepare workspace: {e}") + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + + # Create artifacts directory + try: + artifacts_dir = workspace / "artifacts" + artifacts_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + logger.warning(f"Failed to create artifacts directory: {e}") + + async def _prepare_multi_repo_workspace( + self, workspace: Path, repos_cfg: list, reusing_workspace: bool + ) -> AsyncIterator[BaseEvent]: + """Prepare workspace for multi-repo mode.""" + try: + for r in repos_cfg: + name = (r.get('name') or '').strip() + inp = r.get('input') or {} + url = (inp.get('url') or '').strip() + branch = (inp.get('branch') or '').strip() or 'main' + if not name or not url: + continue + + repo_dir = workspace / name + token = await self._fetch_token_for_url(url) + repo_exists = repo_dir.exists() and (repo_dir / ".git").exists() + + if not repo_exists: + async for event in self._emit_developer_message( + f"📥 Cloning {name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + clone_url = self._url_with_token(url, token) if token else url + await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(repo_dir)], cwd=str(workspace)) + await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(repo_dir), ignore_errors=True) + elif reusing_workspace: + async for event in self._emit_developer_message( + f"✓ Preserving {name} (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) + else: + async for event in self._emit_developer_message( + f"🔄 Resetting {name} to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) + await self._run_cmd(["git", "fetch", "origin", branch], cwd=str(repo_dir)) + await self._run_cmd(["git", "checkout", branch], cwd=str(repo_dir)) + await self._run_cmd(["git", "reset", "--hard", f"origin/{branch}"], cwd=str(repo_dir)) + + # Git identity + user_name = os.getenv("GIT_USER_NAME", "").strip() or "Ambient Code Bot" + user_email = os.getenv("GIT_USER_EMAIL", "").strip() or "bot@ambient-code.local" + await self._run_cmd(["git", "config", "user.name", user_name], cwd=str(repo_dir)) + await self._run_cmd(["git", "config", "user.email", user_email], cwd=str(repo_dir)) + + # Configure output remote + out = r.get('output') or {} + out_url_raw = (out.get('url') or '').strip() + if out_url_raw: + out_url = self._url_with_token(out_url_raw, token) if token else out_url_raw + await self._run_cmd(["git", "remote", "remove", "output"], cwd=str(repo_dir), ignore_errors=True) + await self._run_cmd(["git", "remote", "add", "output", out_url], cwd=str(repo_dir)) + + except Exception as e: + logger.error(f"Failed to prepare multi-repo workspace: {e}") + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _validate_prerequisites(self): """Validate prerequisite files exist for phase-based slash commands.""" @@ -1201,7 +1455,65 @@ async def _initialize_workflow_if_set(self) -> AsyncIterator[BaseEvent]: ) except Exception as e: - logger.error(f"Failed to validate workflow: {e}") + logger.error(f"Failed to initialize workflow on startup: {e}") + + async def _clone_workflow_repository( + self, git_url: str, branch: str, path: str, workflow_name: str + ) -> AsyncIterator[BaseEvent]: + """Clone workflow repository.""" + workspace = Path(self.context.workspace_path) + workflow_dir = workspace / "workflows" / workflow_name + temp_clone_dir = workspace / "workflows" / f"{workflow_name}-clone-temp" + + if workflow_dir.exists(): + async for event in self._emit_developer_message( + f"✓ Workflow {workflow_name} already loaded", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + return + + token = await self._fetch_token_for_url(git_url) + + async for event in self._emit_developer_message( + f"📥 Cloning workflow {workflow_name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + + clone_url = self._url_with_token(git_url, token) if token else git_url + await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(temp_clone_dir)], cwd=str(workspace)) + + if path and path.strip(): + subdir_path = temp_clone_dir / path.strip() + if subdir_path.exists() and subdir_path.is_dir(): + shutil.copytree(subdir_path, workflow_dir) + shutil.rmtree(temp_clone_dir) + async for event in self._emit_developer_message( + f"✓ Extracted workflow from: {path}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + else: + temp_clone_dir.rename(workflow_dir) + async for event in self._emit_developer_message( + f"⚠️ Path '{path}' not found, using full repository", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event + else: + temp_clone_dir.rename(workflow_dir) + + async for event in self._emit_developer_message( + f"✅ Workflow {workflow_name} ready", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _run_cmd(self, cmd, cwd=None, capture_stdout=False, ignore_errors=False): """Run a subprocess command asynchronously."""