diff --git a/cli/src/agent/runners/runAgentSession.ts b/cli/src/agent/runners/runAgentSession.ts index 3a98a2cfc..19219b7a8 100644 --- a/cli/src/agent/runners/runAgentSession.ts +++ b/cli/src/agent/runners/runAgentSession.ts @@ -50,9 +50,9 @@ export async function runAgentSession(opts: { const messageQueue = new MessageQueue2>(() => hashObject({})); - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, {}); + messageQueue.push(formattedText, {}, localId); }); let currentPermissionMode: SessionPermissionMode = opts.permissionMode ?? sessionInfo.permissionMode ?? 'default'; diff --git a/cli/src/agent/sessionBase.ts b/cli/src/agent/sessionBase.ts index 850f7adbe..cc43ee8b3 100644 --- a/cli/src/agent/sessionBase.ts +++ b/cli/src/agent/sessionBase.ts @@ -70,6 +70,8 @@ export class AgentSessionBase { this.effort = opts.effort; this.collaborationMode = opts.collaborationMode; + this.queue.onBatchConsumed = (localIds) => this.client.emitMessagesConsumed(localIds); + this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); this.keepAliveInterval = setInterval(() => { this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); diff --git a/cli/src/api/apiSession.ts b/cli/src/api/apiSession.ts index 174a2cdeb..5a7a8db33 100644 --- a/cli/src/api/apiSession.ts +++ b/cli/src/api/apiSession.ts @@ -78,8 +78,8 @@ export class ApiSessionClient extends EventEmitter { private agentState: AgentState | null private agentStateVersion: number private readonly socket: Socket - private pendingMessages: UserMessage[] = [] - private pendingMessageCallback: ((message: UserMessage) => void) | null = null + private pendingMessages: { message: UserMessage; localId?: string }[] = [] + private pendingMessageCallback: ((message: UserMessage, localId?: string) => void) | null = null private lastSeenMessageSeq: number | null = null private backfillInFlight: Promise | null = null private needsBackfill = false @@ -244,22 +244,23 @@ export class ApiSessionClient extends EventEmitter { this.socket.connect() } - onUserMessage(callback: (data: UserMessage) => void): void { + onUserMessage(callback: (data: UserMessage, localId?: string) => void): void { this.pendingMessageCallback = callback while (this.pendingMessages.length > 0) { - callback(this.pendingMessages.shift()!) + const pending = this.pendingMessages.shift()! + callback(pending.message, pending.localId) } } - private enqueueUserMessage(message: UserMessage): void { + private enqueueUserMessage(message: UserMessage, localId?: string): void { if (this.pendingMessageCallback) { - this.pendingMessageCallback(message) + this.pendingMessageCallback(message, localId) } else { - this.pendingMessages.push(message) + this.pendingMessages.push({ message, localId }) } } - private handleIncomingMessage(message: { seq?: number; content: unknown }): void { + private handleIncomingMessage(message: { seq?: number; localId?: string | null; content: unknown }): void { const seq = typeof message.seq === 'number' ? message.seq : null if (seq !== null) { if (this.lastSeenMessageSeq !== null && seq <= this.lastSeenMessageSeq) { @@ -270,7 +271,7 @@ export class ApiSessionClient extends EventEmitter { const userResult = UserMessageSchema.safeParse(message.content) if (userResult.success) { - this.enqueueUserMessage(userResult.data) + this.enqueueUserMessage(userResult.data, message.localId ?? undefined) return } @@ -493,6 +494,11 @@ export class ApiSessionClient extends EventEmitter { }) } + emitMessagesConsumed(localIds: string[]): void { + if (localIds.length === 0) return + this.socket.emit('messages-consumed', { sid: this.sessionId, localIds }) + } + sendSessionDeath(): void { void cleanupUploadDir(this.sessionId) this.socket.emit('session-end', { sid: this.sessionId, time: Date.now() }) diff --git a/cli/src/claude/runClaude.ts b/cli/src/claude/runClaude.ts index cdc62fe0b..c8b31395e 100644 --- a/cli/src/claude/runClaude.ts +++ b/cli/src/claude/runClaude.ts @@ -170,7 +170,7 @@ export async function runClaude(options: StartOptions = {}): Promise { sessionInstance.setEffort(currentEffort); logger.debug(`[loop] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${currentModel ?? 'auto'}, effort=${currentEffort ?? 'auto'}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const sessionPermissionMode = currentSessionRef.current?.getPermissionMode(); if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'claude')) { currentPermissionMode = sessionPermissionMode as PermissionMode; @@ -258,7 +258,7 @@ export async function runClaude(options: StartOptions = {}): Promise { }; // Use raw text only, ignore attachments for special commands const commandText = specialCommand.originalMessage || message.content.text; - messageQueue.pushIsolateAndClear(commandText, enhancedMode); + messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId); logger.debugLargeJson('[start] /compact command pushed to queue:', message); return; } @@ -277,7 +277,7 @@ export async function runClaude(options: StartOptions = {}): Promise { }; // Use raw text only, ignore attachments for special commands const commandText = specialCommand.originalMessage || message.content.text; - messageQueue.pushIsolateAndClear(commandText, enhancedMode); + messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId); logger.debugLargeJson('[start] /clear command pushed to queue:', message); return; } @@ -293,7 +293,7 @@ export async function runClaude(options: StartOptions = {}): Promise { allowedTools: messageAllowedTools, disallowedTools: messageDisallowedTools }; - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); logger.debugLargeJson('User message pushed to queue:', message) }); diff --git a/cli/src/codex/runCodex.ts b/cli/src/codex/runCodex.ts index 5f11f6428..d4f349651 100644 --- a/cli/src/codex/runCodex.ts +++ b/cli/src/codex/runCodex.ts @@ -95,7 +95,7 @@ export async function runCodex(opts: { ); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const sessionPermissionMode = sessionWrapperRef.current?.getPermissionMode(); if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'codex')) { currentPermissionMode = sessionPermissionMode as PermissionMode; @@ -127,7 +127,7 @@ export async function runCodex(opts: { collaborationMode: currentCollaborationMode }; const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); }); const formatFailureReason = (message: string): string => { diff --git a/cli/src/cursor/runCursor.ts b/cli/src/cursor/runCursor.ts index 86328d61f..3417cb83a 100644 --- a/cli/src/cursor/runCursor.ts +++ b/cli/src/cursor/runCursor.ts @@ -77,13 +77,13 @@ export async function runCursor(opts: { logger.debug(`[cursor] Synced session permission mode: ${currentPermissionMode}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const enhancedMode: EnhancedMode = { permissionMode: currentPermissionMode ?? 'default', model: currentModel }; const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/gemini/runGemini.ts b/cli/src/gemini/runGemini.ts index b290f7e6c..d0e7b1d9d 100644 --- a/cli/src/gemini/runGemini.ts +++ b/cli/src/gemini/runGemini.ts @@ -111,13 +111,13 @@ export async function runGemini(opts: { logger.debug(`[gemini] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${resolvedModel}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); const mode: GeminiMode = { permissionMode: currentPermissionMode, model: resolvedModel }; - messageQueue.push(formattedText, mode); + messageQueue.push(formattedText, mode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/opencode/runOpencode.ts b/cli/src/opencode/runOpencode.ts index 9498f9bf2..3f9e8d930 100644 --- a/cli/src/opencode/runOpencode.ts +++ b/cli/src/opencode/runOpencode.ts @@ -84,12 +84,12 @@ export async function runOpencode(opts: { logger.debug(`[opencode] Synced session permission mode for keepalive: ${currentPermissionMode}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); const mode: OpencodeMode = { permissionMode: currentPermissionMode }; - messageQueue.push(formattedText, mode); + messageQueue.push(formattedText, mode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/utils/MessageQueue2.test.ts b/cli/src/utils/MessageQueue2.test.ts index a9306325e..a4b63472d 100644 --- a/cli/src/utils/MessageQueue2.test.ts +++ b/cli/src/utils/MessageQueue2.test.ts @@ -426,6 +426,69 @@ describe('MessageQueue2', () => { expect(batch3?.mode.type).toBe('A'); }); + it('should call onBatchConsumed with collected localIds', async () => { + const queue = new MessageQueue2(mode => mode); + const received: string[][] = []; + queue.onBatchConsumed = (localIds) => { received.push(localIds); }; + + queue.push('message1', 'local', 'id1'); + queue.push('message2', 'local', 'id2'); + + await queue.waitForMessagesAndGetAsString(); + expect(received).toEqual([['id1', 'id2']]); + + // Push more with a different mode and consume again + queue.push('message3', 'remote', 'id3'); + await queue.waitForMessagesAndGetAsString(); + expect(received).toEqual([['id1', 'id2'], ['id3']]); + }); + + it('should report localIds batch-by-batch when modes differ', async () => { + const queue = new MessageQueue2(mode => mode); + const received: string[][] = []; + queue.onBatchConsumed = (localIds) => { received.push(localIds); }; + + // Two messages land in different batches because their mode hashes differ. + queue.push('first', 'A', 'id1'); + queue.push('second', 'B', 'id2'); + + const batch1 = await queue.waitForMessagesAndGetAsString(); + expect(batch1?.message).toBe('first'); + expect(received).toEqual([['id1']]); + // Second message still waiting in the queue. + expect(queue.size()).toBe(1); + + const batch2 = await queue.waitForMessagesAndGetAsString(); + expect(batch2?.message).toBe('second'); + expect(received).toEqual([['id1'], ['id2']]); + expect(queue.size()).toBe(0); + }); + + it('should skip onBatchConsumed when batch has no localIds', async () => { + const queue = new MessageQueue2(mode => mode); + let called = false; + queue.onBatchConsumed = () => { called = true; }; + + // Push without localIds (e.g., internal commands that do not need UI ack) + queue.push('internal', 'local'); + await queue.waitForMessagesAndGetAsString(); + expect(called).toBe(false); + }); + + it('should not call onBatchConsumed when collectBatch returns null', async () => { + const queue = new MessageQueue2(mode => mode); + let consumedCount = 0; + queue.onBatchConsumed = () => { consumedCount++; }; + + // Close queue while waiting — should return null + const waitPromise = queue.waitForMessagesAndGetAsString(); + queue.close(); + const result = await waitPromise; + + expect(result).toBeNull(); + expect(consumedCount).toBe(0); + }); + it('should differentiate between pushImmediate and pushIsolateAndClear behavior', async () => { const queue = new MessageQueue2<{ type: string }>((mode) => mode.type); diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index 6ba5fcdd1..ed4b51419 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -4,6 +4,7 @@ interface QueueItem { message: string; mode: T; modeHash: string; + localId?: string; isolate?: boolean; // If true, this message must be processed alone } @@ -16,6 +17,7 @@ export class MessageQueue2 { private waiter: ((hasMessages: boolean) => void) | null = null; private closed = false; private onMessageHandler: ((message: string, mode: T) => void) | null = null; + onBatchConsumed: ((localIds: string[]) => void) | null = null; modeHasher: (mode: T) => string; constructor( @@ -37,7 +39,7 @@ export class MessageQueue2 { /** * Push a message to the queue with a mode. */ - push(message: string, mode: T): void { + push(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -49,6 +51,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -72,7 +75,7 @@ export class MessageQueue2 { * Push a message immediately without batching delay. * Does not clear the queue or enforce isolation. */ - pushImmediate(message: string, mode: T): void { + pushImmediate(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -84,6 +87,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -108,7 +112,7 @@ export class MessageQueue2 { * Clears any pending messages and ensures this message is never batched with others. * Used for special commands that require dedicated processing. */ - pushIsolateAndClear(message: string, mode: T): void { + pushIsolateAndClear(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -123,6 +127,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: true }); @@ -145,7 +150,7 @@ export class MessageQueue2 { /** * Push a message to the beginning of the queue with a mode. */ - unshift(message: string, mode: T): void { + unshift(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot unshift to closed queue'); } @@ -157,6 +162,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -252,6 +258,7 @@ export class MessageQueue2 { const firstItem = this.queue[0]; const sameModeMessages: string[] = []; + const consumedLocalIds: string[] = []; let mode = firstItem.mode; let isolate = firstItem.isolate ?? false; const targetModeHash = firstItem.modeHash; @@ -260,6 +267,7 @@ export class MessageQueue2 { if (firstItem.isolate) { const item = this.queue.shift()!; sameModeMessages.push(item.message); + if (item.localId) consumedLocalIds.push(item.localId); logger.debug(`[MessageQueue2] Collected isolated message with mode hash: ${targetModeHash}`); } else { // Collect all messages with the same mode until we hit an isolated message @@ -268,6 +276,7 @@ export class MessageQueue2 { !this.queue[0].isolate) { const item = this.queue.shift()!; sameModeMessages.push(item.message); + if (item.localId) consumedLocalIds.push(item.localId); } logger.debug(`[MessageQueue2] Collected batch of ${sameModeMessages.length} messages with mode hash: ${targetModeHash}`); } @@ -275,6 +284,10 @@ export class MessageQueue2 { // Join all messages with newlines const combinedMessage = sameModeMessages.join('\n'); + if (consumedLocalIds.length > 0) { + this.onBatchConsumed?.(consumedLocalIds); + } + return { message: combinedMessage, mode, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index f7a76ec6b..ab14052da 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -254,6 +254,22 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session onSessionAlive?.(data) }) + socket.on('messages-consumed', (data: { sid: string; localIds: string[] }) => { + if (!data || typeof data.sid !== 'string' || !Array.isArray(data.localIds)) { + return + } + const localIds = data.localIds.filter((id): id is string => typeof id === 'string') + if (localIds.length === 0) { + return + } + const sessionAccess = resolveSessionAccess(data.sid) + if (!sessionAccess.ok) { + emitAccessError('session', data.sid, sessionAccess.reason) + return + } + onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid, localIds }) + }) + socket.on('session-end', (data: SessionEndPayload) => { if (!data || typeof data.sid !== 'string' || typeof data.time !== 'number') { return diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index c802efe51..c8581b90c 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -226,6 +226,10 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ url: z.string() }) }), + SessionChangedSchema.extend({ + type: z.literal('messages-consumed'), + localIds: z.array(z.string()) + }), SessionEventBaseSchema.extend({ type: z.literal('heartbeat'), data: z.object({ diff --git a/shared/src/socket.ts b/shared/src/socket.ts index 39d78d164..e4072f1ed 100644 --- a/shared/src/socket.ts +++ b/shared/src/socket.ts @@ -145,6 +145,7 @@ export interface ClientToServerEvents { collaborationMode?: CodexCollaborationMode }) => void 'session-end': (data: { sid: string; time: number }) => void + 'messages-consumed': (data: { sid: string; localIds: string[] }) => void 'update-metadata': (data: { sid: string; expectedVersion: number; metadata: unknown }, cb: (answer: { result: 'error' reason?: SocketErrorReason diff --git a/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx b/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx index 693ab9116..42a19876c 100644 --- a/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx +++ b/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx @@ -10,6 +10,15 @@ function ErrorIcon() { ) } +function QueuedIcon() { + return ( + + + + + ) +} + function SendingIcon() { return ( @@ -23,6 +32,14 @@ export function MessageStatusIndicator(props: { status?: MessageStatus onRetry?: () => void }) { + if (props.status === 'queued') { + return ( + + + + ) + } + if (props.status === 'sending') { return ( diff --git a/web/src/components/AssistantChat/messages/UserMessage.tsx b/web/src/components/AssistantChat/messages/UserMessage.tsx index 93aa8f269..0c37f4539 100644 --- a/web/src/components/AssistantChat/messages/UserMessage.tsx +++ b/web/src/components/AssistantChat/messages/UserMessage.tsx @@ -45,7 +45,7 @@ export function HappyUserMessage() { const canRetry = status === 'failed' && typeof localId === 'string' && Boolean(ctx.onRetryMessage) const onRetry = canRetry ? () => ctx.onRetryMessage!(localId) : undefined - const userBubbleClass = 'w-fit min-w-0 max-w-[92%] ml-auto rounded-xl bg-[var(--app-secondary-bg)] px-3 py-2 text-[var(--app-fg)] shadow-sm' + const userBubbleClass = `w-fit min-w-0 max-w-[92%] ml-auto rounded-xl bg-[var(--app-secondary-bg)] px-3 py-2 text-[var(--app-fg)] shadow-sm${status === 'queued' ? ' opacity-60' : ''}` if (isCliOutput) { return ( diff --git a/web/src/hooks/mutations/useSendMessage.ts b/web/src/hooks/mutations/useSendMessage.ts index 6d120d9f8..1d3c93bea 100644 --- a/web/src/hooks/mutations/useSendMessage.ts +++ b/web/src/hooks/mutations/useSendMessage.ts @@ -25,6 +25,28 @@ type UseSendMessageOptions = { onSessionResolved?: (sessionId: string) => void onBlocked?: (reason: BlockedReason) => void onSuccess?: (sessionId: string) => void + isSessionThinking?: boolean +} + +/** Create an optimistic message for display. Extracted as an extension point + * so a future floating-UI PR can route queued messages to a separate area. */ +function createOptimisticMessage(input: SendMessageInput, status: 'queued' | 'sending'): DecryptedMessage { + return { + id: input.localId, + seq: null, + localId: input.localId, + content: { + role: 'user', + content: { + type: 'text', + text: input.text, + attachments: input.attachments + } + }, + createdAt: input.createdAt, + status, + originalText: input.text, + } } function findMessageByLocalId( @@ -53,6 +75,8 @@ export function useSendMessage( const { haptic } = usePlatform() const [isResolving, setIsResolving] = useState(false) const resolveGuardRef = useRef(false) + const isSessionThinkingRef = useRef(options?.isSessionThinking ?? false) + isSessionThinkingRef.current = options?.isSessionThinking ?? false const mutation = useMutation({ mutationFn: async (input: SendMessageInput) => { @@ -62,27 +86,16 @@ export function useSendMessage( await api.sendMessage(input.sessionId, input.text, input.localId, input.attachments) }, onMutate: async (input) => { - const optimisticMessage: DecryptedMessage = { - id: input.localId, - seq: null, - localId: input.localId, - content: { - role: 'user', - content: { - type: 'text', - text: input.text, - attachments: input.attachments - } - }, - createdAt: input.createdAt, - status: 'sending', - originalText: input.text, - } - - appendOptimisticMessage(input.sessionId, optimisticMessage) + const status = isSessionThinkingRef.current ? 'queued' as const : 'sending' as const + appendOptimisticMessage(input.sessionId, createOptimisticMessage(input, status)) + return { status } }, - onSuccess: (_, input) => { - updateMessageStatus(input.sessionId, input.localId, 'sent') + onSuccess: (_, input, context) => { + updateMessageStatus( + input.sessionId, + input.localId, + context?.status === 'queued' ? 'queued' : 'sent' + ) haptic.notification('success') options?.onSuccess?.(input.sessionId) }, diff --git a/web/src/hooks/useSSE.ts b/web/src/hooks/useSSE.ts index 7b43260c0..fe22fc9b7 100644 --- a/web/src/hooks/useSSE.ts +++ b/web/src/hooks/useSSE.ts @@ -11,7 +11,7 @@ import type { SyncEvent } from '@/types/api' import { queryKeys } from '@/lib/query-keys' -import { clearMessageWindow, ingestIncomingMessages } from '@/lib/message-window-store' +import { clearMessageWindow, getMessageWindowState, ingestIncomingMessages, markMessagesConsumed, updateMessageStatus } from '@/lib/message-window-store' type SSESubscription = { all?: boolean @@ -497,6 +497,10 @@ export function useSSE(options: { return } + if (event.type === 'messages-consumed') { + markMessagesConsumed(event.sessionId, event.localIds) + } + if (event.type === 'message-received') { ingestIncomingMessages(event.sessionId, [event.message]) } diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index 0951fde28..a795c41b3 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -537,3 +537,28 @@ export function updateMessageStatus(sessionId: string, localId: string, status: return buildState(prev, { messages, pending }) }) } + +/** Transition the queued messages whose localIds match to 'sent'. Driven by the + * CLI ack (messages-consumed). Unmatched messages remain queued. */ +export function markMessagesConsumed(sessionId: string, localIds: string[]): void { + if (localIds.length === 0) return + const idSet = new Set(localIds) + updateState(sessionId, (prev) => { + let changed = false + const updateList = (list: DecryptedMessage[]) => { + return list.map((message) => { + if (message.status !== 'queued' || !message.localId || !idSet.has(message.localId)) { + return message + } + changed = true + return { ...message, status: 'sent' as MessageStatus } + }) + } + const messages = updateList(prev.messages) + const pending = updateList(prev.pending) + if (!changed) { + return prev + } + return buildState(prev, { messages, pending }) + }) +} diff --git a/web/src/lib/messages.ts b/web/src/lib/messages.ts index b1433851e..93e5bcbf4 100644 --- a/web/src/lib/messages.ts +++ b/web/src/lib/messages.ts @@ -60,13 +60,28 @@ export function mergeMessages(existing: DecryptedMessage[], incoming: DecryptedM } // If we received stored messages with a localId, drop any optimistic bubbles with the same localId. + // Preserve client-side status (e.g. 'queued') on the replacing server message. if (incomingStoredLocalIds.size > 0) { + const optimisticStatusByLocalId = new Map() + for (const msg of merged) { + if (msg.localId && isOptimisticMessage(msg) && incomingStoredLocalIds.has(msg.localId) && msg.status) { + optimisticStatusByLocalId.set(msg.localId, msg.status) + } + } merged = merged.filter((msg) => { if (!msg.localId || !incomingStoredLocalIds.has(msg.localId)) { return true } return !isOptimisticMessage(msg) }) + if (optimisticStatusByLocalId.size > 0) { + merged = merged.map((msg) => { + if (msg.localId && optimisticStatusByLocalId.has(msg.localId) && !msg.status) { + return { ...msg, status: optimisticStatusByLocalId.get(msg.localId) } + } + return msg + }) + } } // Fallback: if an optimistic message was marked as sent but we didn't get a localId echo, diff --git a/web/src/router.tsx b/web/src/router.tsx index 2680d2680..54b5ccfd6 100644 --- a/web/src/router.tsx +++ b/web/src/router.tsx @@ -238,6 +238,7 @@ function SessionPage() { retryMessage, isSending, } = useSendMessage(api, sessionId, { + isSessionThinking: session?.thinking ?? false, onSuccess: (sentSessionId) => { clearDraftsAfterSend(sentSessionId, sessionId) }, diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 0a2b01b14..ba03f262b 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -35,7 +35,7 @@ export type SessionMetadataSummary = { worktree?: WorktreeMetadata } -export type MessageStatus = 'sending' | 'sent' | 'failed' +export type MessageStatus = 'queued' | 'sending' | 'sent' | 'failed' export type DecryptedMessage = ProtocolDecryptedMessage & { status?: MessageStatus