Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/src/agent/runners/runAgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ export async function runAgentSession(opts: {

const messageQueue = new MessageQueue2<Record<string, never>>(() => hashObject({}));

session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments);
messageQueue.push(formattedText, {});
messageQueue.push(formattedText, {}, localId);
});

let currentPermissionMode: SessionPermissionMode = opts.permissionMode ?? sessionInfo.permissionMode ?? 'default';
Expand Down
2 changes: 2 additions & 0 deletions cli/src/agent/sessionBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export class AgentSessionBase<Mode> {
this.effort = opts.effort;
this.collaborationMode = opts.collaborationMode;

this.queue.onBatchConsumed = (localIds) => this.client.emitMessagesConsumed(localIds);

this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime());
this.keepAliveInterval = setInterval(() => {
this.client.keepAlive(this.thinking, this.mode, this.getKeepAliveRuntime());
Expand Down
24 changes: 15 additions & 9 deletions cli/src/api/apiSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ export class ApiSessionClient extends EventEmitter {
private agentState: AgentState | null
private agentStateVersion: number
private readonly socket: Socket<ServerToClientEvents, ClientToServerEvents>
private pendingMessages: UserMessage[] = []
private pendingMessageCallback: ((message: UserMessage) => void) | null = null
private pendingMessages: { message: UserMessage; localId?: string }[] = []
private pendingMessageCallback: ((message: UserMessage, localId?: string) => void) | null = null
private lastSeenMessageSeq: number | null = null
private backfillInFlight: Promise<void> | null = null
private needsBackfill = false
Expand Down Expand Up @@ -244,22 +244,23 @@ export class ApiSessionClient extends EventEmitter {
this.socket.connect()
}

onUserMessage(callback: (data: UserMessage) => void): void {
onUserMessage(callback: (data: UserMessage, localId?: string) => void): void {
this.pendingMessageCallback = callback
while (this.pendingMessages.length > 0) {
callback(this.pendingMessages.shift()!)
const pending = this.pendingMessages.shift()!
callback(pending.message, pending.localId)
}
}

private enqueueUserMessage(message: UserMessage): void {
private enqueueUserMessage(message: UserMessage, localId?: string): void {
if (this.pendingMessageCallback) {
this.pendingMessageCallback(message)
this.pendingMessageCallback(message, localId)
} else {
this.pendingMessages.push(message)
this.pendingMessages.push({ message, localId })
}
}

private handleIncomingMessage(message: { seq?: number; content: unknown }): void {
private handleIncomingMessage(message: { seq?: number; localId?: string | null; content: unknown }): void {
const seq = typeof message.seq === 'number' ? message.seq : null
if (seq !== null) {
if (this.lastSeenMessageSeq !== null && seq <= this.lastSeenMessageSeq) {
Expand All @@ -270,7 +271,7 @@ export class ApiSessionClient extends EventEmitter {

const userResult = UserMessageSchema.safeParse(message.content)
if (userResult.success) {
this.enqueueUserMessage(userResult.data)
this.enqueueUserMessage(userResult.data, message.localId ?? undefined)
return
}

Expand Down Expand Up @@ -493,6 +494,11 @@ export class ApiSessionClient extends EventEmitter {
})
}

emitMessagesConsumed(localIds: string[]): void {
if (localIds.length === 0) return
this.socket.emit('messages-consumed', { sid: this.sessionId, localIds })
}

sendSessionDeath(): void {
void cleanupUploadDir(this.sessionId)
this.socket.emit('session-end', { sid: this.sessionId, time: Date.now() })
Expand Down
8 changes: 4 additions & 4 deletions cli/src/claude/runClaude.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export async function runClaude(options: StartOptions = {}): Promise<void> {
sessionInstance.setEffort(currentEffort);
logger.debug(`[loop] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${currentModel ?? 'auto'}, effort=${currentEffort ?? 'auto'}`);
};
session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const sessionPermissionMode = currentSessionRef.current?.getPermissionMode();
if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'claude')) {
currentPermissionMode = sessionPermissionMode as PermissionMode;
Expand Down Expand Up @@ -258,7 +258,7 @@ export async function runClaude(options: StartOptions = {}): Promise<void> {
};
// Use raw text only, ignore attachments for special commands
const commandText = specialCommand.originalMessage || message.content.text;
messageQueue.pushIsolateAndClear(commandText, enhancedMode);
messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId);
logger.debugLargeJson('[start] /compact command pushed to queue:', message);
return;
}
Expand All @@ -277,7 +277,7 @@ export async function runClaude(options: StartOptions = {}): Promise<void> {
};
// Use raw text only, ignore attachments for special commands
const commandText = specialCommand.originalMessage || message.content.text;
messageQueue.pushIsolateAndClear(commandText, enhancedMode);
messageQueue.pushIsolateAndClear(commandText, enhancedMode, localId);
logger.debugLargeJson('[start] /clear command pushed to queue:', message);
return;
}
Expand All @@ -293,7 +293,7 @@ export async function runClaude(options: StartOptions = {}): Promise<void> {
allowedTools: messageAllowedTools,
disallowedTools: messageDisallowedTools
};
messageQueue.push(formattedText, enhancedMode);
messageQueue.push(formattedText, enhancedMode, localId);
logger.debugLargeJson('User message pushed to queue:', message)
});

Expand Down
4 changes: 2 additions & 2 deletions cli/src/codex/runCodex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export async function runCodex(opts: {
);
};

session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const sessionPermissionMode = sessionWrapperRef.current?.getPermissionMode();
if (sessionPermissionMode && isPermissionModeAllowedForFlavor(sessionPermissionMode, 'codex')) {
currentPermissionMode = sessionPermissionMode as PermissionMode;
Expand Down Expand Up @@ -127,7 +127,7 @@ export async function runCodex(opts: {
collaborationMode: currentCollaborationMode
};
const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments);
messageQueue.push(formattedText, enhancedMode);
messageQueue.push(formattedText, enhancedMode, localId);
});

const formatFailureReason = (message: string): string => {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/cursor/runCursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ export async function runCursor(opts: {
logger.debug(`[cursor] Synced session permission mode: ${currentPermissionMode}`);
};

session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const enhancedMode: EnhancedMode = {
permissionMode: currentPermissionMode ?? 'default',
model: currentModel
};
const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments);
messageQueue.push(formattedText, enhancedMode);
messageQueue.push(formattedText, enhancedMode, localId);
});

const resolvePermissionMode = (value: unknown): PermissionMode => {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/gemini/runGemini.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ export async function runGemini(opts: {
logger.debug(`[gemini] Synced session config for keepalive: permissionMode=${currentPermissionMode}, model=${resolvedModel}`);
};

session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments);
const mode: GeminiMode = {
permissionMode: currentPermissionMode,
model: resolvedModel
};
messageQueue.push(formattedText, mode);
messageQueue.push(formattedText, mode, localId);
});

const resolvePermissionMode = (value: unknown): PermissionMode => {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/opencode/runOpencode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ export async function runOpencode(opts: {
logger.debug(`[opencode] Synced session permission mode for keepalive: ${currentPermissionMode}`);
};

session.onUserMessage((message) => {
session.onUserMessage((message, localId) => {
const formattedText = formatMessageWithAttachments(message.content.text, message.content.attachments);
const mode: OpencodeMode = {
permissionMode: currentPermissionMode
};
messageQueue.push(formattedText, mode);
messageQueue.push(formattedText, mode, localId);
});

const resolvePermissionMode = (value: unknown): PermissionMode => {
Expand Down
63 changes: 63 additions & 0 deletions cli/src/utils/MessageQueue2.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,69 @@ describe('MessageQueue2', () => {
expect(batch3?.mode.type).toBe('A');
});

it('should call onBatchConsumed with collected localIds', async () => {
const queue = new MessageQueue2<string>(mode => mode);
const received: string[][] = [];
queue.onBatchConsumed = (localIds) => { received.push(localIds); };

queue.push('message1', 'local', 'id1');
queue.push('message2', 'local', 'id2');

await queue.waitForMessagesAndGetAsString();
expect(received).toEqual([['id1', 'id2']]);

// Push more with a different mode and consume again
queue.push('message3', 'remote', 'id3');
await queue.waitForMessagesAndGetAsString();
expect(received).toEqual([['id1', 'id2'], ['id3']]);
});

it('should report localIds batch-by-batch when modes differ', async () => {
const queue = new MessageQueue2<string>(mode => mode);
const received: string[][] = [];
queue.onBatchConsumed = (localIds) => { received.push(localIds); };

// Two messages land in different batches because their mode hashes differ.
queue.push('first', 'A', 'id1');
queue.push('second', 'B', 'id2');

const batch1 = await queue.waitForMessagesAndGetAsString();
expect(batch1?.message).toBe('first');
expect(received).toEqual([['id1']]);
// Second message still waiting in the queue.
expect(queue.size()).toBe(1);

const batch2 = await queue.waitForMessagesAndGetAsString();
expect(batch2?.message).toBe('second');
expect(received).toEqual([['id1'], ['id2']]);
expect(queue.size()).toBe(0);
});

it('should skip onBatchConsumed when batch has no localIds', async () => {
const queue = new MessageQueue2<string>(mode => mode);
let called = false;
queue.onBatchConsumed = () => { called = true; };

// Push without localIds (e.g., internal commands that do not need UI ack)
queue.push('internal', 'local');
await queue.waitForMessagesAndGetAsString();
expect(called).toBe(false);
});

it('should not call onBatchConsumed when collectBatch returns null', async () => {
const queue = new MessageQueue2<string>(mode => mode);
let consumedCount = 0;
queue.onBatchConsumed = () => { consumedCount++; };

// Close queue while waiting — should return null
const waitPromise = queue.waitForMessagesAndGetAsString();
queue.close();
const result = await waitPromise;

expect(result).toBeNull();
expect(consumedCount).toBe(0);
});

it('should differentiate between pushImmediate and pushIsolateAndClear behavior', async () => {
const queue = new MessageQueue2<{ type: string }>((mode) => mode.type);

Expand Down
21 changes: 17 additions & 4 deletions cli/src/utils/MessageQueue2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ interface QueueItem<T> {
message: string;
mode: T;
modeHash: string;
localId?: string;
isolate?: boolean; // If true, this message must be processed alone
}

Expand All @@ -16,6 +17,7 @@ export class MessageQueue2<T> {
private waiter: ((hasMessages: boolean) => void) | null = null;
private closed = false;
private onMessageHandler: ((message: string, mode: T) => void) | null = null;
onBatchConsumed: ((localIds: string[]) => void) | null = null;
modeHasher: (mode: T) => string;

constructor(
Expand All @@ -37,7 +39,7 @@ export class MessageQueue2<T> {
/**
* Push a message to the queue with a mode.
*/
push(message: string, mode: T): void {
push(message: string, mode: T, localId?: string): void {
if (this.closed) {
throw new Error('Cannot push to closed queue');
}
Expand All @@ -49,6 +51,7 @@ export class MessageQueue2<T> {
message,
mode,
modeHash,
localId,
isolate: false
});

Expand All @@ -72,7 +75,7 @@ export class MessageQueue2<T> {
* Push a message immediately without batching delay.
* Does not clear the queue or enforce isolation.
*/
pushImmediate(message: string, mode: T): void {
pushImmediate(message: string, mode: T, localId?: string): void {
if (this.closed) {
throw new Error('Cannot push to closed queue');
}
Expand All @@ -84,6 +87,7 @@ export class MessageQueue2<T> {
message,
mode,
modeHash,
localId,
isolate: false
});

Expand All @@ -108,7 +112,7 @@ export class MessageQueue2<T> {
* Clears any pending messages and ensures this message is never batched with others.
* Used for special commands that require dedicated processing.
*/
pushIsolateAndClear(message: string, mode: T): void {
pushIsolateAndClear(message: string, mode: T, localId?: string): void {
if (this.closed) {
throw new Error('Cannot push to closed queue');
}
Expand All @@ -123,6 +127,7 @@ export class MessageQueue2<T> {
message,
mode,
modeHash,
localId,
isolate: true
});

Expand All @@ -145,7 +150,7 @@ export class MessageQueue2<T> {
/**
* Push a message to the beginning of the queue with a mode.
*/
unshift(message: string, mode: T): void {
unshift(message: string, mode: T, localId?: string): void {
if (this.closed) {
throw new Error('Cannot unshift to closed queue');
}
Expand All @@ -157,6 +162,7 @@ export class MessageQueue2<T> {
message,
mode,
modeHash,
localId,
isolate: false
});

Expand Down Expand Up @@ -252,6 +258,7 @@ export class MessageQueue2<T> {

const firstItem = this.queue[0];
const sameModeMessages: string[] = [];
const consumedLocalIds: string[] = [];
let mode = firstItem.mode;
let isolate = firstItem.isolate ?? false;
const targetModeHash = firstItem.modeHash;
Expand All @@ -260,6 +267,7 @@ export class MessageQueue2<T> {
if (firstItem.isolate) {
const item = this.queue.shift()!;
sameModeMessages.push(item.message);
if (item.localId) consumedLocalIds.push(item.localId);
logger.debug(`[MessageQueue2] Collected isolated message with mode hash: ${targetModeHash}`);
} else {
// Collect all messages with the same mode until we hit an isolated message
Expand All @@ -268,13 +276,18 @@ export class MessageQueue2<T> {
!this.queue[0].isolate) {
const item = this.queue.shift()!;
sameModeMessages.push(item.message);
if (item.localId) consumedLocalIds.push(item.localId);
}
logger.debug(`[MessageQueue2] Collected batch of ${sameModeMessages.length} messages with mode hash: ${targetModeHash}`);
}

// Join all messages with newlines
const combinedMessage = sameModeMessages.join('\n');

if (consumedLocalIds.length > 0) {
this.onBatchConsumed?.(consumedLocalIds);
}

return {
message: combinedMessage,
mode,
Expand Down
16 changes: 16 additions & 0 deletions hub/src/socket/handlers/cli/sessionHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,22 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session
onSessionAlive?.(data)
})

socket.on('messages-consumed', (data: { sid: string; localIds: string[] }) => {
if (!data || typeof data.sid !== 'string' || !Array.isArray(data.localIds)) {
return
}
const localIds = data.localIds.filter((id): id is string => typeof id === 'string')
if (localIds.length === 0) {
return
}
const sessionAccess = resolveSessionAccess(data.sid)
if (!sessionAccess.ok) {
emitAccessError('session', data.sid, sessionAccess.reason)
return
}
onWebappEvent?.({ type: 'messages-consumed', sessionId: data.sid, localIds })
})

socket.on('session-end', (data: SessionEndPayload) => {
if (!data || typeof data.sid !== 'string' || typeof data.time !== 'number') {
return
Expand Down
Loading
Loading