From 8af188b26ec539af311a3a5c9f49a91761e2c30b Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 10:28:40 +0900 Subject: [PATCH 1/6] refactor: emit messages-consumed event when CLI drains queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add onBatchConsumed callback to MessageQueue2 that fires after collectBatch() returns messages. Wire it through sessionBase → apiSession → Hub socket handler → SSE so the web client can distinguish queued vs consumed messages. New socket event 'messages-consumed' follows the session-alive pattern (sid validation, access check, onWebappEvent relay). Fully backward-compatible: old hubs ignore unknown events, old CLIs never set the optional callback. --- cli/src/agent/sessionBase.ts | 2 ++ cli/src/api/apiSession.ts | 4 +++ cli/src/utils/MessageQueue2.test.ts | 31 +++++++++++++++++++ cli/src/utils/MessageQueue2.ts | 3 ++ .../socket/handlers/cli/sessionHandlers.ts | 12 +++++++ shared/src/schemas.ts | 3 ++ shared/src/socket.ts | 1 + 7 files changed, 56 insertions(+) diff --git a/cli/src/agent/sessionBase.ts b/cli/src/agent/sessionBase.ts index 850f7adbe..b58257d30 100644 --- a/cli/src/agent/sessionBase.ts +++ b/cli/src/agent/sessionBase.ts @@ -70,6 +70,8 @@ export class AgentSessionBase { this.effort = opts.effort; this.collaborationMode = opts.collaborationMode; + this.queue.onBatchConsumed = () => this.client.emitMessagesConsumed(); + this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); this.keepAliveInterval = setInterval(() => { this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); diff --git a/cli/src/api/apiSession.ts b/cli/src/api/apiSession.ts index 174a2cdeb..ea48420b0 100644 --- a/cli/src/api/apiSession.ts +++ b/cli/src/api/apiSession.ts @@ -493,6 +493,10 @@ export class ApiSessionClient extends EventEmitter { }) } + emitMessagesConsumed(): void { + this.socket.emit('messages-consumed', { sid: this.sessionId }) + } + sendSessionDeath(): void { void cleanupUploadDir(this.sessionId) this.socket.emit('session-end', { sid: this.sessionId, time: Date.now() }) diff --git a/cli/src/utils/MessageQueue2.test.ts b/cli/src/utils/MessageQueue2.test.ts index a9306325e..5690bbe94 100644 --- a/cli/src/utils/MessageQueue2.test.ts +++ b/cli/src/utils/MessageQueue2.test.ts @@ -426,6 +426,37 @@ describe('MessageQueue2', () => { expect(batch3?.mode.type).toBe('A'); }); + it('should call onBatchConsumed when collectBatch returns messages', async () => { + const queue = new MessageQueue2(mode => mode); + let consumedCount = 0; + queue.onBatchConsumed = () => { consumedCount++; }; + + queue.push('message1', 'local'); + queue.push('message2', 'local'); + + await queue.waitForMessagesAndGetAsString(); + expect(consumedCount).toBe(1); + + // Push more and consume again + queue.push('message3', 'remote'); + await queue.waitForMessagesAndGetAsString(); + expect(consumedCount).toBe(2); + }); + + it('should not call onBatchConsumed when collectBatch returns null', async () => { + const queue = new MessageQueue2(mode => mode); + let consumedCount = 0; + queue.onBatchConsumed = () => { consumedCount++; }; + + // Close queue while waiting — should return null + const waitPromise = queue.waitForMessagesAndGetAsString(); + queue.close(); + const result = await waitPromise; + + expect(result).toBeNull(); + expect(consumedCount).toBe(0); + }); + it('should differentiate between pushImmediate and pushIsolateAndClear behavior', async () => { const queue = new MessageQueue2<{ type: string }>((mode) => mode.type); diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index 6ba5fcdd1..67623c3ee 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -16,6 +16,7 @@ export class MessageQueue2 { private waiter: ((hasMessages: boolean) => void) | null = null; private closed = false; private onMessageHandler: ((message: string, mode: T) => void) | null = null; + onBatchConsumed: (() => void) | null = null; modeHasher: (mode: T) => string; constructor( @@ -275,6 +276,8 @@ export class MessageQueue2 { // Join all messages with newlines const combinedMessage = sameModeMessages.join('\n'); + this.onBatchConsumed?.(); + return { message: combinedMessage, mode, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index f7a76ec6b..a35801509 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -254,6 +254,18 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session onSessionAlive?.(data) }) + socket.on('messages-consumed', (data: { sid: string }) => { + if (!data || typeof data.sid !== 'string') { + return + } + const sessionAccess = resolveSessionAccess(data.sid) + if (!sessionAccess.ok) { + emitAccessError('session', data.sid, sessionAccess.reason) + return + } + onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid }) + }) + socket.on('session-end', (data: SessionEndPayload) => { if (!data || typeof data.sid !== 'string' || typeof data.time !== 'number') { return diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index c802efe51..07451db65 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -226,6 +226,9 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ url: z.string() }) }), + SessionChangedSchema.extend({ + type: z.literal('messages-consumed') + }), SessionEventBaseSchema.extend({ type: z.literal('heartbeat'), data: z.object({ diff --git a/shared/src/socket.ts b/shared/src/socket.ts index 39d78d164..b29939e02 100644 --- a/shared/src/socket.ts +++ b/shared/src/socket.ts @@ -145,6 +145,7 @@ export interface ClientToServerEvents { collaborationMode?: CodexCollaborationMode }) => void 'session-end': (data: { sid: string; time: number }) => void + 'messages-consumed': (data: { sid: string }) => void 'update-metadata': (data: { sid: string; expectedVersion: number; metadata: unknown }, cb: (answer: { result: 'error' reason?: SocketErrorReason From 19d2036f5c80e51c6d099c11a399ae092e00d845 Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 10:31:33 +0900 Subject: [PATCH 2/6] feat(web): show queued status for messages pending inference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the session is thinking (agent responding), new messages are now displayed with a clock icon and 'queued' status instead of 'sending'. Once the CLI drains its queue and emits the messages-consumed event (added in the previous commit), all queued messages transition to 'sent'. - Add 'queued' to MessageStatus type - useSendMessage: accept isSessionThinking option, set initial optimistic status accordingly via createQueuedMessage() - useSSE: handle messages-consumed sync event via flushQueuedMessages() — both functions are extension points for a future floating-UI PR - MessageStatusIndicator: add QueuedIcon (clock SVG) --- .../messages/MessageStatusIndicator.tsx | 17 +++++++ .../AssistantChat/messages/UserMessage.tsx | 2 +- web/src/hooks/mutations/useSendMessage.ts | 50 ++++++++++++------- web/src/hooks/useSSE.ts | 19 ++++++- web/src/lib/message-window-store.ts | 24 +++++++++ web/src/lib/messages.ts | 15 ++++++ web/src/router.tsx | 1 + web/src/types/api.ts | 2 +- 8 files changed, 108 insertions(+), 22 deletions(-) diff --git a/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx b/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx index 693ab9116..42a19876c 100644 --- a/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx +++ b/web/src/components/AssistantChat/messages/MessageStatusIndicator.tsx @@ -10,6 +10,15 @@ function ErrorIcon() { ) } +function QueuedIcon() { + return ( + + + + + ) +} + function SendingIcon() { return ( @@ -23,6 +32,14 @@ export function MessageStatusIndicator(props: { status?: MessageStatus onRetry?: () => void }) { + if (props.status === 'queued') { + return ( + + + + ) + } + if (props.status === 'sending') { return ( diff --git a/web/src/components/AssistantChat/messages/UserMessage.tsx b/web/src/components/AssistantChat/messages/UserMessage.tsx index 93aa8f269..0c37f4539 100644 --- a/web/src/components/AssistantChat/messages/UserMessage.tsx +++ b/web/src/components/AssistantChat/messages/UserMessage.tsx @@ -45,7 +45,7 @@ export function HappyUserMessage() { const canRetry = status === 'failed' && typeof localId === 'string' && Boolean(ctx.onRetryMessage) const onRetry = canRetry ? () => ctx.onRetryMessage!(localId) : undefined - const userBubbleClass = 'w-fit min-w-0 max-w-[92%] ml-auto rounded-xl bg-[var(--app-secondary-bg)] px-3 py-2 text-[var(--app-fg)] shadow-sm' + const userBubbleClass = `w-fit min-w-0 max-w-[92%] ml-auto rounded-xl bg-[var(--app-secondary-bg)] px-3 py-2 text-[var(--app-fg)] shadow-sm${status === 'queued' ? ' opacity-60' : ''}` if (isCliOutput) { return ( diff --git a/web/src/hooks/mutations/useSendMessage.ts b/web/src/hooks/mutations/useSendMessage.ts index 6d120d9f8..1717b5906 100644 --- a/web/src/hooks/mutations/useSendMessage.ts +++ b/web/src/hooks/mutations/useSendMessage.ts @@ -25,6 +25,28 @@ type UseSendMessageOptions = { onSessionResolved?: (sessionId: string) => void onBlocked?: (reason: BlockedReason) => void onSuccess?: (sessionId: string) => void + isSessionThinking?: boolean +} + +/** Create an optimistic message for display. Extracted as an extension point + * so a future floating-UI PR can route queued messages to a separate area. */ +function createOptimisticMessage(input: SendMessageInput, status: 'queued' | 'sending'): DecryptedMessage { + return { + id: input.localId, + seq: null, + localId: input.localId, + content: { + role: 'user', + content: { + type: 'text', + text: input.text, + attachments: input.attachments + } + }, + createdAt: input.createdAt, + status, + originalText: input.text, + } } function findMessageByLocalId( @@ -53,6 +75,8 @@ export function useSendMessage( const { haptic } = usePlatform() const [isResolving, setIsResolving] = useState(false) const resolveGuardRef = useRef(false) + const isSessionThinkingRef = useRef(options?.isSessionThinking ?? false) + isSessionThinkingRef.current = options?.isSessionThinking ?? false const mutation = useMutation({ mutationFn: async (input: SendMessageInput) => { @@ -62,27 +86,15 @@ export function useSendMessage( await api.sendMessage(input.sessionId, input.text, input.localId, input.attachments) }, onMutate: async (input) => { - const optimisticMessage: DecryptedMessage = { - id: input.localId, - seq: null, - localId: input.localId, - content: { - role: 'user', - content: { - type: 'text', - text: input.text, - attachments: input.attachments - } - }, - createdAt: input.createdAt, - status: 'sending', - originalText: input.text, - } - - appendOptimisticMessage(input.sessionId, optimisticMessage) + const status = isSessionThinkingRef.current ? 'queued' as const : 'sending' as const + appendOptimisticMessage(input.sessionId, createOptimisticMessage(input, status)) }, onSuccess: (_, input) => { - updateMessageStatus(input.sessionId, input.localId, 'sent') + updateMessageStatus( + input.sessionId, + input.localId, + isSessionThinkingRef.current ? 'queued' : 'sent' + ) haptic.notification('success') options?.onSuccess?.(input.sessionId) }, diff --git a/web/src/hooks/useSSE.ts b/web/src/hooks/useSSE.ts index 7b43260c0..f7fa7cd60 100644 --- a/web/src/hooks/useSSE.ts +++ b/web/src/hooks/useSSE.ts @@ -11,7 +11,7 @@ import type { SyncEvent } from '@/types/api' import { queryKeys } from '@/lib/query-keys' -import { clearMessageWindow, ingestIncomingMessages } from '@/lib/message-window-store' +import { clearMessageWindow, flushQueuedStatuses, getMessageWindowState, ingestIncomingMessages, updateMessageStatus } from '@/lib/message-window-store' type SSESubscription = { all?: boolean @@ -475,6 +475,13 @@ export function useSSE(options: { }) } + /** Transition all queued messages to 'sent' for a session. + * Uses flushQueuedStatuses which matches by status rather than localId, + * so it works even after server echo replaces the optimistic message. */ + const flushQueuedMessages = (sessionId: string) => { + flushQueuedStatuses(sessionId) + } + const handleSyncEvent = (event: SyncEvent) => { lastActivityAtRef.current = Date.now() @@ -497,6 +504,10 @@ export function useSSE(options: { return } + if (event.type === 'messages-consumed') { + flushQueuedMessages(event.sessionId) + } + if (event.type === 'message-received') { ingestIncomingMessages(event.sessionId, [event.message]) } @@ -507,11 +518,17 @@ export function useSSE(options: { void queryClient.removeQueries({ queryKey: queryKeys.session(event.sessionId) }) clearMessageWindow(event.sessionId) } else if (isSessionRecord(event.data) && event.data.id === event.sessionId) { + if (!event.data.thinking) { + flushQueuedMessages(event.sessionId) + } queryClient.setQueryData(queryKeys.session(event.sessionId), { session: event.data }) upsertSessionSummary(event.data) } else { const patch = getSessionPatch(event.data) if (patch) { + if (patch.thinking === false) { + flushQueuedMessages(event.sessionId) + } const detailPatched = patchSessionDetail(event.sessionId, patch) const summaryPatched = patchSessionSummary(event.sessionId, patch) diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index 0951fde28..1967ce393 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -537,3 +537,27 @@ export function updateMessageStatus(sessionId: string, localId: string, status: return buildState(prev, { messages, pending }) }) } + +/** Transition all messages with status 'queued' to 'sent' for a session. + * Unlike updateMessageStatus, this matches by status rather than localId, + * so it works even after server echo replaces the optimistic message. */ +export function flushQueuedStatuses(sessionId: string): void { + updateState(sessionId, (prev) => { + let changed = false + const updateList = (list: DecryptedMessage[]) => { + return list.map((message) => { + if (message.status !== 'queued') { + return message + } + changed = true + return { ...message, status: 'sent' as MessageStatus } + }) + } + const messages = updateList(prev.messages) + const pending = updateList(prev.pending) + if (!changed) { + return prev + } + return buildState(prev, { messages, pending }) + }) +} diff --git a/web/src/lib/messages.ts b/web/src/lib/messages.ts index b1433851e..93e5bcbf4 100644 --- a/web/src/lib/messages.ts +++ b/web/src/lib/messages.ts @@ -60,13 +60,28 @@ export function mergeMessages(existing: DecryptedMessage[], incoming: DecryptedM } // If we received stored messages with a localId, drop any optimistic bubbles with the same localId. + // Preserve client-side status (e.g. 'queued') on the replacing server message. if (incomingStoredLocalIds.size > 0) { + const optimisticStatusByLocalId = new Map() + for (const msg of merged) { + if (msg.localId && isOptimisticMessage(msg) && incomingStoredLocalIds.has(msg.localId) && msg.status) { + optimisticStatusByLocalId.set(msg.localId, msg.status) + } + } merged = merged.filter((msg) => { if (!msg.localId || !incomingStoredLocalIds.has(msg.localId)) { return true } return !isOptimisticMessage(msg) }) + if (optimisticStatusByLocalId.size > 0) { + merged = merged.map((msg) => { + if (msg.localId && optimisticStatusByLocalId.has(msg.localId) && !msg.status) { + return { ...msg, status: optimisticStatusByLocalId.get(msg.localId) } + } + return msg + }) + } } // Fallback: if an optimistic message was marked as sent but we didn't get a localId echo, diff --git a/web/src/router.tsx b/web/src/router.tsx index 2680d2680..54b5ccfd6 100644 --- a/web/src/router.tsx +++ b/web/src/router.tsx @@ -238,6 +238,7 @@ function SessionPage() { retryMessage, isSending, } = useSendMessage(api, sessionId, { + isSessionThinking: session?.thinking ?? false, onSuccess: (sentSessionId) => { clearDraftsAfterSend(sentSessionId, sessionId) }, diff --git a/web/src/types/api.ts b/web/src/types/api.ts index 0a2b01b14..ba03f262b 100644 --- a/web/src/types/api.ts +++ b/web/src/types/api.ts @@ -35,7 +35,7 @@ export type SessionMetadataSummary = { worktree?: WorktreeMetadata } -export type MessageStatus = 'sending' | 'sent' | 'failed' +export type MessageStatus = 'queued' | 'sending' | 'sent' | 'failed' export type DecryptedMessage = ProtocolDecryptedMessage & { status?: MessageStatus From 83da5a337b860f3b9aa5f6498f4bfb7f77a97fcc Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 17:38:05 +0900 Subject: [PATCH 3/6] =?UTF-8?q?fix:=20use=20consumedAt=20timestamp=20to=20?= =?UTF-8?q?prevent=20premature=20queued=E2=86=92sent=20transition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback: messages-consumed event now carries a consumedAt timestamp so the web only flushes messages created before that time. Prevents a race where a newly queued message could be marked sent before the CLI actually processes it. Also captures optimistic status in mutation context so onSuccess uses the send-time thinking state instead of re-reading the ref. --- cli/src/agent/sessionBase.ts | 2 +- cli/src/api/apiSession.ts | 4 ++-- cli/src/utils/MessageQueue2.ts | 4 ++-- hub/src/socket/handlers/cli/sessionHandlers.ts | 6 +++--- shared/src/schemas.ts | 3 ++- shared/src/socket.ts | 2 +- web/src/hooks/mutations/useSendMessage.ts | 5 +++-- web/src/hooks/useSSE.ts | 11 +++++------ web/src/lib/message-window-store.ts | 13 +++++++++---- 9 files changed, 28 insertions(+), 22 deletions(-) diff --git a/cli/src/agent/sessionBase.ts b/cli/src/agent/sessionBase.ts index b58257d30..aedef2195 100644 --- a/cli/src/agent/sessionBase.ts +++ b/cli/src/agent/sessionBase.ts @@ -70,7 +70,7 @@ export class AgentSessionBase { this.effort = opts.effort; this.collaborationMode = opts.collaborationMode; - this.queue.onBatchConsumed = () => this.client.emitMessagesConsumed(); + this.queue.onBatchConsumed = (consumedAt) => this.client.emitMessagesConsumed(consumedAt); this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); this.keepAliveInterval = setInterval(() => { diff --git a/cli/src/api/apiSession.ts b/cli/src/api/apiSession.ts index ea48420b0..bbdb78e0c 100644 --- a/cli/src/api/apiSession.ts +++ b/cli/src/api/apiSession.ts @@ -493,8 +493,8 @@ export class ApiSessionClient extends EventEmitter { }) } - emitMessagesConsumed(): void { - this.socket.emit('messages-consumed', { sid: this.sessionId }) + emitMessagesConsumed(consumedAt: number): void { + this.socket.emit('messages-consumed', { sid: this.sessionId, consumedAt }) } sendSessionDeath(): void { diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index 67623c3ee..656231cc6 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -16,7 +16,7 @@ export class MessageQueue2 { private waiter: ((hasMessages: boolean) => void) | null = null; private closed = false; private onMessageHandler: ((message: string, mode: T) => void) | null = null; - onBatchConsumed: (() => void) | null = null; + onBatchConsumed: ((consumedAt: number) => void) | null = null; modeHasher: (mode: T) => string; constructor( @@ -276,7 +276,7 @@ export class MessageQueue2 { // Join all messages with newlines const combinedMessage = sameModeMessages.join('\n'); - this.onBatchConsumed?.(); + this.onBatchConsumed?.(Date.now()); return { message: combinedMessage, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index a35801509..8cead4f5a 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -254,8 +254,8 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session onSessionAlive?.(data) }) - socket.on('messages-consumed', (data: { sid: string }) => { - if (!data || typeof data.sid !== 'string') { + socket.on('messages-consumed', (data: { sid: string; consumedAt: number }) => { + if (!data || typeof data.sid !== 'string' || typeof data.consumedAt !== 'number') { return } const sessionAccess = resolveSessionAccess(data.sid) @@ -263,7 +263,7 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session emitAccessError('session', data.sid, sessionAccess.reason) return } - onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid }) + onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid, consumedAt: data.consumedAt }) }) socket.on('session-end', (data: SessionEndPayload) => { diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index 07451db65..997655eaa 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -227,7 +227,8 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ }) }), SessionChangedSchema.extend({ - type: z.literal('messages-consumed') + type: z.literal('messages-consumed'), + consumedAt: z.number() }), SessionEventBaseSchema.extend({ type: z.literal('heartbeat'), diff --git a/shared/src/socket.ts b/shared/src/socket.ts index b29939e02..ef6bb16c6 100644 --- a/shared/src/socket.ts +++ b/shared/src/socket.ts @@ -145,7 +145,7 @@ export interface ClientToServerEvents { collaborationMode?: CodexCollaborationMode }) => void 'session-end': (data: { sid: string; time: number }) => void - 'messages-consumed': (data: { sid: string }) => void + 'messages-consumed': (data: { sid: string; consumedAt: number }) => void 'update-metadata': (data: { sid: string; expectedVersion: number; metadata: unknown }, cb: (answer: { result: 'error' reason?: SocketErrorReason diff --git a/web/src/hooks/mutations/useSendMessage.ts b/web/src/hooks/mutations/useSendMessage.ts index 1717b5906..1d3c93bea 100644 --- a/web/src/hooks/mutations/useSendMessage.ts +++ b/web/src/hooks/mutations/useSendMessage.ts @@ -88,12 +88,13 @@ export function useSendMessage( onMutate: async (input) => { const status = isSessionThinkingRef.current ? 'queued' as const : 'sending' as const appendOptimisticMessage(input.sessionId, createOptimisticMessage(input, status)) + return { status } }, - onSuccess: (_, input) => { + onSuccess: (_, input, context) => { updateMessageStatus( input.sessionId, input.localId, - isSessionThinkingRef.current ? 'queued' : 'sent' + context?.status === 'queued' ? 'queued' : 'sent' ) haptic.notification('success') options?.onSuccess?.(input.sessionId) diff --git a/web/src/hooks/useSSE.ts b/web/src/hooks/useSSE.ts index f7fa7cd60..4bc49ae30 100644 --- a/web/src/hooks/useSSE.ts +++ b/web/src/hooks/useSSE.ts @@ -475,11 +475,10 @@ export function useSSE(options: { }) } - /** Transition all queued messages to 'sent' for a session. - * Uses flushQueuedStatuses which matches by status rather than localId, - * so it works even after server echo replaces the optimistic message. */ - const flushQueuedMessages = (sessionId: string) => { - flushQueuedStatuses(sessionId) + /** Transition queued messages to 'sent' for a session. + * consumedAt limits the flush to messages created before that time. */ + const flushQueuedMessages = (sessionId: string, consumedAt?: number) => { + flushQueuedStatuses(sessionId, consumedAt) } const handleSyncEvent = (event: SyncEvent) => { @@ -505,7 +504,7 @@ export function useSSE(options: { } if (event.type === 'messages-consumed') { - flushQueuedMessages(event.sessionId) + flushQueuedMessages(event.sessionId, event.consumedAt) } if (event.type === 'message-received') { diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index 1967ce393..f6e2c8b4a 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -538,10 +538,12 @@ export function updateMessageStatus(sessionId: string, localId: string, status: }) } -/** Transition all messages with status 'queued' to 'sent' for a session. - * Unlike updateMessageStatus, this matches by status rather than localId, - * so it works even after server echo replaces the optimistic message. */ -export function flushQueuedStatuses(sessionId: string): void { +/** Transition queued messages to 'sent' for a session. + * When consumedAt is provided, only messages created before that timestamp + * are flushed — this avoids marking messages as sent when they are still + * waiting in the CLI queue (race between new enqueue and SSE delivery). + * When omitted (e.g. thinking→false fallback), all queued messages are flushed. */ +export function flushQueuedStatuses(sessionId: string, consumedAt?: number): void { updateState(sessionId, (prev) => { let changed = false const updateList = (list: DecryptedMessage[]) => { @@ -549,6 +551,9 @@ export function flushQueuedStatuses(sessionId: string): void { if (message.status !== 'queued') { return message } + if (consumedAt !== undefined && message.createdAt > consumedAt) { + return message + } changed = true return { ...message, status: 'sent' as MessageStatus } }) From 4022229729e9a7cd25e950320bb01114049c30b7 Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 19:10:06 +0900 Subject: [PATCH 4/6] fix: track localIds through queue to avoid browser/CLI clock skew MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous consumedAt timestamp approach compared browser-set createdAt against a CLI-host timestamp. With remote/PWA clients these clocks drift independently, so queued messages could be marked sent too early or stay queued after being consumed. Instead, carry the localId from the incoming hub 'update' event down through the message queue, and return the consumed localIds from collectBatch. The 'messages-consumed' socket event now ships those exact localIds to the hub/web, and the web store transitions only matching queued messages to sent. The thinking→idle fallback still performs a broad flush as a last-resort safety net. --- cli/src/agent/runners/runAgentSession.ts | 4 +-- cli/src/agent/sessionBase.ts | 2 +- cli/src/api/apiSession.ts | 24 +++++++------ cli/src/claude/runClaude.ts | 8 ++--- cli/src/codex/runCodex.ts | 4 +-- cli/src/cursor/runCursor.ts | 4 +-- cli/src/gemini/runGemini.ts | 4 +-- cli/src/opencode/runOpencode.ts | 4 +-- cli/src/utils/MessageQueue2.test.ts | 29 ++++++++++----- cli/src/utils/MessageQueue2.ts | 22 ++++++++---- .../socket/handlers/cli/sessionHandlers.ts | 10 ++++-- shared/src/schemas.ts | 2 +- shared/src/socket.ts | 2 +- web/src/hooks/useSSE.ts | 10 +++--- web/src/lib/message-window-store.ts | 35 ++++++++++++++----- 15 files changed, 104 insertions(+), 60 deletions(-) diff --git a/cli/src/agent/runners/runAgentSession.ts b/cli/src/agent/runners/runAgentSession.ts index 3a98a2cfc..19219b7a8 100644 --- a/cli/src/agent/runners/runAgentSession.ts +++ b/cli/src/agent/runners/runAgentSession.ts @@ -50,9 +50,9 @@ export async function runAgentSession(opts: { const messageQueue = new MessageQueue2>(() => hashObject({})); - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, {}); + messageQueue.push(formattedText, {}, localId); }); let currentPermissionMode: SessionPermissionMode = opts.permissionMode ?? sessionInfo.permissionMode ?? 'default'; diff --git a/cli/src/agent/sessionBase.ts b/cli/src/agent/sessionBase.ts index aedef2195..cc43ee8b3 100644 --- a/cli/src/agent/sessionBase.ts +++ b/cli/src/agent/sessionBase.ts @@ -70,7 +70,7 @@ export class AgentSessionBase { this.effort = opts.effort; this.collaborationMode = opts.collaborationMode; - this.queue.onBatchConsumed = (consumedAt) => this.client.emitMessagesConsumed(consumedAt); + this.queue.onBatchConsumed = (localIds) => this.client.emitMessagesConsumed(localIds); this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime()); this.keepAliveInterval = setInterval(() => { diff --git a/cli/src/api/apiSession.ts b/cli/src/api/apiSession.ts index bbdb78e0c..5a7a8db33 100644 --- a/cli/src/api/apiSession.ts +++ b/cli/src/api/apiSession.ts @@ -78,8 +78,8 @@ export class ApiSessionClient extends EventEmitter { private agentState: AgentState | null private agentStateVersion: number private readonly socket: Socket - private pendingMessages: UserMessage[] = [] - private pendingMessageCallback: ((message: UserMessage) => void) | null = null + private pendingMessages: { message: UserMessage; localId?: string }[] = [] + private pendingMessageCallback: ((message: UserMessage, localId?: string) => void) | null = null private lastSeenMessageSeq: number | null = null private backfillInFlight: Promise | null = null private needsBackfill = false @@ -244,22 +244,23 @@ export class ApiSessionClient extends EventEmitter { this.socket.connect() } - onUserMessage(callback: (data: UserMessage) => void): void { + onUserMessage(callback: (data: UserMessage, localId?: string) => void): void { this.pendingMessageCallback = callback while (this.pendingMessages.length > 0) { - callback(this.pendingMessages.shift()!) + const pending = this.pendingMessages.shift()! + callback(pending.message, pending.localId) } } - private enqueueUserMessage(message: UserMessage): void { + private enqueueUserMessage(message: UserMessage, localId?: string): void { if (this.pendingMessageCallback) { - this.pendingMessageCallback(message) + this.pendingMessageCallback(message, localId) } else { - this.pendingMessages.push(message) + this.pendingMessages.push({ message, localId }) } } - private handleIncomingMessage(message: { seq?: number; content: unknown }): void { + private handleIncomingMessage(message: { seq?: number; localId?: string | null; content: unknown }): void { const seq = typeof message.seq === 'number' ? message.seq : null if (seq !== null) { if (this.lastSeenMessageSeq !== null && seq <= this.lastSeenMessageSeq) { @@ -270,7 +271,7 @@ export class ApiSessionClient extends EventEmitter { const userResult = UserMessageSchema.safeParse(message.content) if (userResult.success) { - this.enqueueUserMessage(userResult.data) + this.enqueueUserMessage(userResult.data, message.localId ?? undefined) return } @@ -493,8 +494,9 @@ export class ApiSessionClient extends EventEmitter { }) } - emitMessagesConsumed(consumedAt: number): void { - this.socket.emit('messages-consumed', { sid: this.sessionId, consumedAt }) + emitMessagesConsumed(localIds: string[]): void { + if (localIds.length === 0) return + this.socket.emit('messages-consumed', { sid: this.sessionId, localIds }) } sendSessionDeath(): void { diff --git a/cli/src/claude/runClaude.ts b/cli/src/claude/runClaude.ts index cdc62fe0b..c8b31395e 100644 --- a/cli/src/claude/runClaude.ts +++ b/cli/src/claude/runClaude.ts @@ -170,7 +170,7 @@ export async function runClaude(options: StartOptions = {}): Promise { sessionInstance.setEffort(currentEffort); logger.debug(`[loop] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${currentModel ?? 'auto'}, effort=${currentEffort ?? 'auto'}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const sessionPermissionMode = currentSessionRef.current?.getPermissionMode(); if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'claude')) { currentPermissionMode = sessionPermissionMode as PermissionMode; @@ -258,7 +258,7 @@ export async function runClaude(options: StartOptions = {}): Promise { }; // Use raw text only, ignore attachments for special commands const commandText = specialCommand.originalMessage || message.content.text; - messageQueue.pushIsolateAndClear(commandText, enhancedMode); + messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId); logger.debugLargeJson('[start] /compact command pushed to queue:', message); return; } @@ -277,7 +277,7 @@ export async function runClaude(options: StartOptions = {}): Promise { }; // Use raw text only, ignore attachments for special commands const commandText = specialCommand.originalMessage || message.content.text; - messageQueue.pushIsolateAndClear(commandText, enhancedMode); + messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId); logger.debugLargeJson('[start] /clear command pushed to queue:', message); return; } @@ -293,7 +293,7 @@ export async function runClaude(options: StartOptions = {}): Promise { allowedTools: messageAllowedTools, disallowedTools: messageDisallowedTools }; - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); logger.debugLargeJson('User message pushed to queue:', message) }); diff --git a/cli/src/codex/runCodex.ts b/cli/src/codex/runCodex.ts index 5f11f6428..d4f349651 100644 --- a/cli/src/codex/runCodex.ts +++ b/cli/src/codex/runCodex.ts @@ -95,7 +95,7 @@ export async function runCodex(opts: { ); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const sessionPermissionMode = sessionWrapperRef.current?.getPermissionMode(); if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'codex')) { currentPermissionMode = sessionPermissionMode as PermissionMode; @@ -127,7 +127,7 @@ export async function runCodex(opts: { collaborationMode: currentCollaborationMode }; const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); }); const formatFailureReason = (message: string): string => { diff --git a/cli/src/cursor/runCursor.ts b/cli/src/cursor/runCursor.ts index 86328d61f..3417cb83a 100644 --- a/cli/src/cursor/runCursor.ts +++ b/cli/src/cursor/runCursor.ts @@ -77,13 +77,13 @@ export async function runCursor(opts: { logger.debug(`[cursor] Synced session permission mode: ${currentPermissionMode}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const enhancedMode: EnhancedMode = { permissionMode: currentPermissionMode ?? 'default', model: currentModel }; const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); - messageQueue.push(formattedText, enhancedMode); + messageQueue.push(formattedText, enhancedMode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/gemini/runGemini.ts b/cli/src/gemini/runGemini.ts index b290f7e6c..d0e7b1d9d 100644 --- a/cli/src/gemini/runGemini.ts +++ b/cli/src/gemini/runGemini.ts @@ -111,13 +111,13 @@ export async function runGemini(opts: { logger.debug(`[gemini] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${resolvedModel}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); const mode: GeminiMode = { permissionMode: currentPermissionMode, model: resolvedModel }; - messageQueue.push(formattedText, mode); + messageQueue.push(formattedText, mode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/opencode/runOpencode.ts b/cli/src/opencode/runOpencode.ts index 9498f9bf2..3f9e8d930 100644 --- a/cli/src/opencode/runOpencode.ts +++ b/cli/src/opencode/runOpencode.ts @@ -84,12 +84,12 @@ export async function runOpencode(opts: { logger.debug(`[opencode] Synced session permission mode for keepalive: ${currentPermissionMode}`); }; - session.onUserMessage((message) => { + session.onUserMessage((message, localId) => { const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments); const mode: OpencodeMode = { permissionMode: currentPermissionMode }; - messageQueue.push(formattedText, mode); + messageQueue.push(formattedText, mode, localId); }); const resolvePermissionMode = (value: unknown): PermissionMode => { diff --git a/cli/src/utils/MessageQueue2.test.ts b/cli/src/utils/MessageQueue2.test.ts index 5690bbe94..3c02ba57c 100644 --- a/cli/src/utils/MessageQueue2.test.ts +++ b/cli/src/utils/MessageQueue2.test.ts @@ -426,21 +426,32 @@ describe('MessageQueue2', () => { expect(batch3?.mode.type).toBe('A'); }); - it('should call onBatchConsumed when collectBatch returns messages', async () => { + it('should call onBatchConsumed with collected localIds', async () => { const queue = new MessageQueue2(mode => mode); - let consumedCount = 0; - queue.onBatchConsumed = () => { consumedCount++; }; + const received: string[][] = []; + queue.onBatchConsumed = (localIds) => { received.push(localIds); }; - queue.push('message1', 'local'); - queue.push('message2', 'local'); + queue.push('message1', 'local', 'id1'); + queue.push('message2', 'local', 'id2'); await queue.waitForMessagesAndGetAsString(); - expect(consumedCount).toBe(1); + expect(received).toEqual([['id1', 'id2']]); + + // Push more with a different mode and consume again + queue.push('message3', 'remote', 'id3'); + await queue.waitForMessagesAndGetAsString(); + expect(received).toEqual([['id1', 'id2'], ['id3']]); + }); + + it('should skip onBatchConsumed when batch has no localIds', async () => { + const queue = new MessageQueue2(mode => mode); + let called = false; + queue.onBatchConsumed = () => { called = true; }; - // Push more and consume again - queue.push('message3', 'remote'); + // Push without localIds (e.g., internal commands that do not need UI ack) + queue.push('internal', 'local'); await queue.waitForMessagesAndGetAsString(); - expect(consumedCount).toBe(2); + expect(called).toBe(false); }); it('should not call onBatchConsumed when collectBatch returns null', async () => { diff --git a/cli/src/utils/MessageQueue2.ts b/cli/src/utils/MessageQueue2.ts index 656231cc6..ed4b51419 100644 --- a/cli/src/utils/MessageQueue2.ts +++ b/cli/src/utils/MessageQueue2.ts @@ -4,6 +4,7 @@ interface QueueItem { message: string; mode: T; modeHash: string; + localId?: string; isolate?: boolean; // If true, this message must be processed alone } @@ -16,7 +17,7 @@ export class MessageQueue2 { private waiter: ((hasMessages: boolean) => void) | null = null; private closed = false; private onMessageHandler: ((message: string, mode: T) => void) | null = null; - onBatchConsumed: ((consumedAt: number) => void) | null = null; + onBatchConsumed: ((localIds: string[]) => void) | null = null; modeHasher: (mode: T) => string; constructor( @@ -38,7 +39,7 @@ export class MessageQueue2 { /** * Push a message to the queue with a mode. */ - push(message: string, mode: T): void { + push(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -50,6 +51,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -73,7 +75,7 @@ export class MessageQueue2 { * Push a message immediately without batching delay. * Does not clear the queue or enforce isolation. */ - pushImmediate(message: string, mode: T): void { + pushImmediate(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -85,6 +87,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -109,7 +112,7 @@ export class MessageQueue2 { * Clears any pending messages and ensures this message is never batched with others. * Used for special commands that require dedicated processing. */ - pushIsolateAndClear(message: string, mode: T): void { + pushIsolateAndClear(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot push to closed queue'); } @@ -124,6 +127,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: true }); @@ -146,7 +150,7 @@ export class MessageQueue2 { /** * Push a message to the beginning of the queue with a mode. */ - unshift(message: string, mode: T): void { + unshift(message: string, mode: T, localId?: string): void { if (this.closed) { throw new Error('Cannot unshift to closed queue'); } @@ -158,6 +162,7 @@ export class MessageQueue2 { message, mode, modeHash, + localId, isolate: false }); @@ -253,6 +258,7 @@ export class MessageQueue2 { const firstItem = this.queue[0]; const sameModeMessages: string[] = []; + const consumedLocalIds: string[] = []; let mode = firstItem.mode; let isolate = firstItem.isolate ?? false; const targetModeHash = firstItem.modeHash; @@ -261,6 +267,7 @@ export class MessageQueue2 { if (firstItem.isolate) { const item = this.queue.shift()!; sameModeMessages.push(item.message); + if (item.localId) consumedLocalIds.push(item.localId); logger.debug(`[MessageQueue2] Collected isolated message with mode hash: ${targetModeHash}`); } else { // Collect all messages with the same mode until we hit an isolated message @@ -269,6 +276,7 @@ export class MessageQueue2 { !this.queue[0].isolate) { const item = this.queue.shift()!; sameModeMessages.push(item.message); + if (item.localId) consumedLocalIds.push(item.localId); } logger.debug(`[MessageQueue2] Collected batch of ${sameModeMessages.length} messages with mode hash: ${targetModeHash}`); } @@ -276,7 +284,9 @@ export class MessageQueue2 { // Join all messages with newlines const combinedMessage = sameModeMessages.join('\n'); - this.onBatchConsumed?.(Date.now()); + if (consumedLocalIds.length > 0) { + this.onBatchConsumed?.(consumedLocalIds); + } return { message: combinedMessage, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index 8cead4f5a..ab14052da 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -254,8 +254,12 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session onSessionAlive?.(data) }) - socket.on('messages-consumed', (data: { sid: string; consumedAt: number }) => { - if (!data || typeof data.sid !== 'string' || typeof data.consumedAt !== 'number') { + socket.on('messages-consumed', (data: { sid: string; localIds: string[] }) => { + if (!data || typeof data.sid !== 'string' || !Array.isArray(data.localIds)) { + return + } + const localIds = data.localIds.filter((id): id is string => typeof id === 'string') + if (localIds.length === 0) { return } const sessionAccess = resolveSessionAccess(data.sid) @@ -263,7 +267,7 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session emitAccessError('session', data.sid, sessionAccess.reason) return } - onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid, consumedAt: data.consumedAt }) + onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid, localIds }) }) socket.on('session-end', (data: SessionEndPayload) => { diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index 997655eaa..c8581b90c 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -228,7 +228,7 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ }), SessionChangedSchema.extend({ type: z.literal('messages-consumed'), - consumedAt: z.number() + localIds: z.array(z.string()) }), SessionEventBaseSchema.extend({ type: z.literal('heartbeat'), diff --git a/shared/src/socket.ts b/shared/src/socket.ts index ef6bb16c6..e4072f1ed 100644 --- a/shared/src/socket.ts +++ b/shared/src/socket.ts @@ -145,7 +145,7 @@ export interface ClientToServerEvents { collaborationMode?: CodexCollaborationMode }) => void 'session-end': (data: { sid: string; time: number }) => void - 'messages-consumed': (data: { sid: string; consumedAt: number }) => void + 'messages-consumed': (data: { sid: string; localIds: string[] }) => void 'update-metadata': (data: { sid: string; expectedVersion: number; metadata: unknown }, cb: (answer: { result: 'error' reason?: SocketErrorReason diff --git a/web/src/hooks/useSSE.ts b/web/src/hooks/useSSE.ts index 4bc49ae30..4a64d8944 100644 --- a/web/src/hooks/useSSE.ts +++ b/web/src/hooks/useSSE.ts @@ -11,7 +11,7 @@ import type { SyncEvent } from '@/types/api' import { queryKeys } from '@/lib/query-keys' -import { clearMessageWindow, flushQueuedStatuses, getMessageWindowState, ingestIncomingMessages, updateMessageStatus } from '@/lib/message-window-store' +import { clearMessageWindow, flushQueuedStatuses, getMessageWindowState, ingestIncomingMessages, markMessagesConsumed, updateMessageStatus } from '@/lib/message-window-store' type SSESubscription = { all?: boolean @@ -475,10 +475,8 @@ export function useSSE(options: { }) } - /** Transition queued messages to 'sent' for a session. - * consumedAt limits the flush to messages created before that time. */ - const flushQueuedMessages = (sessionId: string, consumedAt?: number) => { - flushQueuedStatuses(sessionId, consumedAt) + const flushQueuedMessages = (sessionId: string) => { + flushQueuedStatuses(sessionId) } const handleSyncEvent = (event: SyncEvent) => { @@ -504,7 +502,7 @@ export function useSSE(options: { } if (event.type === 'messages-consumed') { - flushQueuedMessages(event.sessionId, event.consumedAt) + markMessagesConsumed(event.sessionId, event.localIds) } if (event.type === 'message-received') { diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index f6e2c8b4a..7790c0d1e 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -538,20 +538,39 @@ export function updateMessageStatus(sessionId: string, localId: string, status: }) } -/** Transition queued messages to 'sent' for a session. - * When consumedAt is provided, only messages created before that timestamp - * are flushed — this avoids marking messages as sent when they are still - * waiting in the CLI queue (race between new enqueue and SSE delivery). - * When omitted (e.g. thinking→false fallback), all queued messages are flushed. */ -export function flushQueuedStatuses(sessionId: string, consumedAt?: number): void { +/** Transition the queued messages whose localIds match to 'sent'. Driven by the + * CLI ack (messages-consumed). Unmatched messages remain queued. */ +export function markMessagesConsumed(sessionId: string, localIds: string[]): void { + if (localIds.length === 0) return + const idSet = new Set(localIds) updateState(sessionId, (prev) => { let changed = false const updateList = (list: DecryptedMessage[]) => { return list.map((message) => { - if (message.status !== 'queued') { + if (message.status !== 'queued' || !message.localId || !idSet.has(message.localId)) { return message } - if (consumedAt !== undefined && message.createdAt > consumedAt) { + changed = true + return { ...message, status: 'sent' as MessageStatus } + }) + } + const messages = updateList(prev.messages) + const pending = updateList(prev.pending) + if (!changed) { + return prev + } + return buildState(prev, { messages, pending }) + }) +} + +/** Flush all queued messages for a session. Used as a fallback when the agent + * transitions from thinking → idle (e.g. on CLI restart where acks were lost). */ +export function flushQueuedStatuses(sessionId: string): void { + updateState(sessionId, (prev) => { + let changed = false + const updateList = (list: DecryptedMessage[]) => { + return list.map((message) => { + if (message.status !== 'queued') { return message } changed = true From 65dbf694db6c7dd2c7f14ff9e2c2c9a31a095c23 Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 19:39:11 +0900 Subject: [PATCH 5/6] =?UTF-8?q?fix(web):=20remove=20over-broad=20thinking?= =?UTF-8?q?=E2=86=92idle=20queued=20flush?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The thinking:false fallback converted every queued message in a session to sent, which overshot when a session had multiple batches (different mode hashes, isolated commands) and the agent briefly went idle between turns. Later messages that were still waiting in the CLI queue ended up marked as sent prematurely. With the localId-based messages-consumed ack, the fallback is no longer necessary for the happy path. Drop it and rely on ack as the sole transition path. If an ack is lost (e.g. CLI crash) the message stays queued, which accurately reflects that it was never processed. Merge logic already preserves the optimistic status on backfill, so a page refresh keeps the queued affordance instead of hiding it. --- web/src/hooks/useSSE.ts | 12 +----------- web/src/lib/message-window-store.ts | 23 ----------------------- 2 files changed, 1 insertion(+), 34 deletions(-) diff --git a/web/src/hooks/useSSE.ts b/web/src/hooks/useSSE.ts index 4a64d8944..fe22fc9b7 100644 --- a/web/src/hooks/useSSE.ts +++ b/web/src/hooks/useSSE.ts @@ -11,7 +11,7 @@ import type { SyncEvent } from '@/types/api' import { queryKeys } from '@/lib/query-keys' -import { clearMessageWindow, flushQueuedStatuses, getMessageWindowState, ingestIncomingMessages, markMessagesConsumed, updateMessageStatus } from '@/lib/message-window-store' +import { clearMessageWindow, getMessageWindowState, ingestIncomingMessages, markMessagesConsumed, updateMessageStatus } from '@/lib/message-window-store' type SSESubscription = { all?: boolean @@ -475,10 +475,6 @@ export function useSSE(options: { }) } - const flushQueuedMessages = (sessionId: string) => { - flushQueuedStatuses(sessionId) - } - const handleSyncEvent = (event: SyncEvent) => { lastActivityAtRef.current = Date.now() @@ -515,17 +511,11 @@ export function useSSE(options: { void queryClient.removeQueries({ queryKey: queryKeys.session(event.sessionId) }) clearMessageWindow(event.sessionId) } else if (isSessionRecord(event.data) && event.data.id === event.sessionId) { - if (!event.data.thinking) { - flushQueuedMessages(event.sessionId) - } queryClient.setQueryData(queryKeys.session(event.sessionId), { session: event.data }) upsertSessionSummary(event.data) } else { const patch = getSessionPatch(event.data) if (patch) { - if (patch.thinking === false) { - flushQueuedMessages(event.sessionId) - } const detailPatched = patchSessionDetail(event.sessionId, patch) const summaryPatched = patchSessionSummary(event.sessionId, patch) diff --git a/web/src/lib/message-window-store.ts b/web/src/lib/message-window-store.ts index 7790c0d1e..a795c41b3 100644 --- a/web/src/lib/message-window-store.ts +++ b/web/src/lib/message-window-store.ts @@ -562,26 +562,3 @@ export function markMessagesConsumed(sessionId: string, localIds: string[]): voi return buildState(prev, { messages, pending }) }) } - -/** Flush all queued messages for a session. Used as a fallback when the agent - * transitions from thinking → idle (e.g. on CLI restart where acks were lost). */ -export function flushQueuedStatuses(sessionId: string): void { - updateState(sessionId, (prev) => { - let changed = false - const updateList = (list: DecryptedMessage[]) => { - return list.map((message) => { - if (message.status !== 'queued') { - return message - } - changed = true - return { ...message, status: 'sent' as MessageStatus } - }) - } - const messages = updateList(prev.messages) - const pending = updateList(prev.pending) - if (!changed) { - return prev - } - return buildState(prev, { messages, pending }) - }) -} From aed452ff900fef0136cf54ed532dfc50491114cb Mon Sep 17 00:00:00 2001 From: Junmo Kim Date: Sun, 19 Apr 2026 21:19:34 +0900 Subject: [PATCH 6/6] test: cover per-batch localId ack across differing modes Verifies that when two queued messages have different mode hashes, only the first batch's localId is reported on ack and the second message remains in the queue until its own batch is consumed. --- cli/src/utils/MessageQueue2.test.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/cli/src/utils/MessageQueue2.test.ts b/cli/src/utils/MessageQueue2.test.ts index 3c02ba57c..a4b63472d 100644 --- a/cli/src/utils/MessageQueue2.test.ts +++ b/cli/src/utils/MessageQueue2.test.ts @@ -443,6 +443,27 @@ describe('MessageQueue2', () => { expect(received).toEqual([['id1', 'id2'], ['id3']]); }); + it('should report localIds batch-by-batch when modes differ', async () => { + const queue = new MessageQueue2(mode => mode); + const received: string[][] = []; + queue.onBatchConsumed = (localIds) => { received.push(localIds); }; + + // Two messages land in different batches because their mode hashes differ. + queue.push('first', 'A', 'id1'); + queue.push('second', 'B', 'id2'); + + const batch1 = await queue.waitForMessagesAndGetAsString(); + expect(batch1?.message).toBe('first'); + expect(received).toEqual([['id1']]); + // Second message still waiting in the queue. + expect(queue.size()).toBe(1); + + const batch2 = await queue.waitForMessagesAndGetAsString(); + expect(batch2?.message).toBe('second'); + expect(received).toEqual([['id1'], ['id2']]); + expect(queue.size()).toBe(0); + }); + it('should skip onBatchConsumed when batch has no localIds', async () => { const queue = new MessageQueue2(mode => mode); let called = false;