Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/browser/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ function AppInner() {
_messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean },
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null
) => {
// Only notify on final message (when assistant is done with all work)
Expand All @@ -882,6 +882,9 @@ function AppInner() {
updatePersistedState(getWorkspaceLastReadKey(workspaceId), completedAt);
}

// Skip notification for idle compaction (background maintenance, not user-initiated).
if (compaction?.isIdle) return;

// Skip notification if compaction completed with a continue message.
// We use the compaction metadata instead of queued state since the queue
// can be drained before compaction finishes.
Expand Down
115 changes: 112 additions & 3 deletions src/browser/stores/WorkspaceStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ describe("WorkspaceStore", () => {
_messageId: string,
_isFinal: boolean,
_finalText: string,
_compaction?: { hasContinueMessage: boolean },
_compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
_completedAt?: number | null
) => undefined
);
Expand Down Expand Up @@ -1297,7 +1297,7 @@ describe("WorkspaceStore", () => {
_messageId: string,
_isFinal: boolean,
_finalText: string,
_compaction?: { hasContinueMessage: boolean },
_compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
_completedAt?: number | null
) => undefined
);
Expand Down Expand Up @@ -1344,6 +1344,115 @@ describe("WorkspaceStore", () => {
);
});

it("marks compaction completions with queued follow-up as continue for active callbacks", async () => {
const workspaceId = "active-workspace-queued-follow-up";

mockOnChat.mockImplementation(async function* (
input?: { workspaceId: string; mode?: unknown },
options?: { signal?: AbortSignal }
): AsyncGenerator<WorkspaceChatMessage, void, unknown> {
if (input?.workspaceId !== workspaceId) {
await waitForAbortSignal(options?.signal);
return;
}

const timestamp = Date.now();

yield { type: "caught-up", hasOlderHistory: false };

yield {
type: "message",
id: "compaction-request-msg",
role: "user",
parts: [{ type: "text", text: "/compact" }],
metadata: {
historySequence: 1,
timestamp,
muxMetadata: {
type: "compaction-request",
rawCommand: "/compact",
parsed: {
model: "claude-sonnet-4",
},
},
},
};

yield {
type: "stream-start",
workspaceId,
messageId: "compaction-stream",
historySequence: 2,
model: "claude-sonnet-4",
startTime: timestamp + 1,
mode: "compact",
};

// A queued message will be auto-sent by the backend when compaction stream ends.
yield {
type: "queued-message-changed",
workspaceId,
queuedMessages: ["follow-up after compaction"],
displayText: "follow-up after compaction",
};

yield {
type: "stream-end",
workspaceId,
messageId: "compaction-stream",
metadata: {
model: "claude-sonnet-4",
},
parts: [],
};

await waitForAbortSignal(options?.signal);
});

const onResponseComplete = mock(
(
_workspaceId: string,
_messageId: string,
_isFinal: boolean,
_finalText: string,
_compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
_completedAt?: number | null
) => undefined
);

store.dispose();
store = new WorkspaceStore(mockOnModelUsed);
store.setOnResponseComplete(onResponseComplete);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-explicit-any
store.setClient(mockClient as any);

createAndAddWorkspace(store, workspaceId);

const waitUntil = async (condition: () => boolean, timeoutMs = 2000): Promise<boolean> => {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (condition()) {
return true;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
return false;
};

const sawResponseComplete = await waitUntil(() => onResponseComplete.mock.calls.length > 0);
expect(sawResponseComplete).toBe(true);

expect(onResponseComplete).toHaveBeenCalledTimes(1);
expect(onResponseComplete).toHaveBeenCalledWith(
workspaceId,
"compaction-stream",
true,
"",
{ hasContinueMessage: true },
expect.any(Number)
);
});

it("does not fire response-complete callback when background streaming stops without recency advance", async () => {
const activeWorkspaceId = "active-workspace-no-replay";
const backgroundWorkspaceId = "background-workspace-no-replay";
Expand Down Expand Up @@ -1397,7 +1506,7 @@ describe("WorkspaceStore", () => {
_messageId: string,
_isFinal: boolean,
_finalText: string,
_compaction?: { hasContinueMessage: boolean },
_compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
_completedAt?: number | null
) => undefined
);
Expand Down
147 changes: 96 additions & 51 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,6 @@ export class WorkspaceStore {
private sessionUsage = new Map<string, z.infer<typeof SessionUsageFileSchema>>();
private sessionUsageRequestVersion = new Map<string, number>();

// Idle compaction notification callbacks (called when backend signals idle compaction started)
private idleCompactionCallbacks = new Set<(workspaceId: string) => void>();

// Global callback for navigating to a workspace (set by App, used for notification clicks)
private navigateToWorkspaceCallback: ((workspaceId: string) => void) | null = null;

Expand All @@ -585,7 +582,7 @@ export class WorkspaceStore {
messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean },
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null
) => void)
| null = null;
Expand Down Expand Up @@ -1290,15 +1287,84 @@ export class WorkspaceStore {
messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean },
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null
) => void
): void {
this.responseCompleteCallback = callback;
// Update existing aggregators with the callback
for (const aggregator of this.aggregators.values()) {
aggregator.onResponseComplete = callback;
this.bindAggregatorResponseCompleteCallback(aggregator);
}
}

private maybeMarkCompactionContinueFromQueuedFollowUp(
workspaceId: string,
compaction: { hasContinueMessage: boolean; isIdle?: boolean } | undefined,
includeQueuedFollowUpSignal: boolean
): { hasContinueMessage: boolean; isIdle?: boolean } | undefined {
if (!compaction || compaction.hasContinueMessage || !includeQueuedFollowUpSignal) {
return compaction;
}

const queuedMessage = this.chatTransientState.get(workspaceId)?.queuedMessage;
if (!queuedMessage) {
return compaction;
}

// A queued message will be auto-sent after stream-end. Suppress the intermediate
// "Compaction complete" notification and only notify for the follow-up response.
return {
...compaction,
hasContinueMessage: true,
};
}

private emitResponseComplete(
workspaceId: string,
messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null,
includeQueuedFollowUpSignal = true
): void {
if (!this.responseCompleteCallback) {
return;
}

this.responseCompleteCallback(
workspaceId,
messageId,
isFinal,
finalText,
this.maybeMarkCompactionContinueFromQueuedFollowUp(
workspaceId,
compaction,
includeQueuedFollowUpSignal
),
completedAt
);
}

private bindAggregatorResponseCompleteCallback(aggregator: StreamingMessageAggregator): void {
aggregator.onResponseComplete = (
workspaceId: string,
messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null
) => {
this.emitResponseComplete(
workspaceId,
messageId,
isFinal,
finalText,
compaction,
completedAt
);
};
}

/**
Expand Down Expand Up @@ -2369,25 +2435,35 @@ export class WorkspaceStore {
const backgroundCompaction = isBackgroundStreamingStop
? this.getBackgroundCompletionCompaction(workspaceId)
: undefined;
// The backend tags the streaming=false (stop) snapshot with isIdleCompaction.
// The idle marker is added after sendMessage returns (to avoid races with
// concurrent user streams), so only the stop snapshot carries the flag.
// Check both previous and current as defense-in-depth.
const wasIdleCompaction =
previous?.isIdleCompaction === true || snapshot?.isIdleCompaction === true;

// Trigger response completion notifications for background workspaces only when
// activity indicates a true completion (streaming true -> false WITH recency advance).
// stream-abort/error transitions also flip streaming to false, but recency stays
// unchanged there, so suppress completion notifications in those cases.
if (stoppedStreamingSnapshot && recencyAdvancedSinceStreamStart && isBackgroundStreamingStop) {
if (this.responseCompleteCallback) {
// Activity snapshots don't include message/content metadata. Reuse any
// still-active stream context captured before this workspace was backgrounded
// so compaction continue turns remain suppressible in App notifications.
this.responseCompleteCallback(
workspaceId,
"",
true,
"",
backgroundCompaction,
stoppedStreamingSnapshot.recency
);
}
// Activity snapshots don't include message/content metadata. Reuse any
// still-active stream context captured before this workspace was backgrounded
// so compaction continue turns remain suppressible in App notifications.
this.emitResponseComplete(
workspaceId,
"",
true,
"",
wasIdleCompaction
? {
hasContinueMessage: backgroundCompaction?.hasContinueMessage ?? false,
isIdle: true,
}
: backgroundCompaction,
stoppedStreamingSnapshot.recency,
false
);
}

if (isBackgroundStreamingStop) {
Expand Down Expand Up @@ -3118,29 +3194,6 @@ export class WorkspaceStore {
this.workspaceCreatedAt.clear();
}

/**
* Subscribe to idle compaction events.
* Callback is called when backend signals a workspace started idle compaction.
* Returns unsubscribe function.
*/
onIdleCompactionStarted(callback: (workspaceId: string) => void): () => void {
this.idleCompactionCallbacks.add(callback);
return () => this.idleCompactionCallbacks.delete(callback);
}

/**
* Notify all listeners that a workspace started idle compaction.
*/
private notifyIdleCompactionStarted(workspaceId: string): void {
for (const callback of this.idleCompactionCallbacks) {
try {
callback(workspaceId);
} catch (error) {
console.error("Error in idle compaction callback:", error);
}
}
}

/**
* Subscribe to file-modifying tool completions.
* @param listener Called with workspaceId when a file-modifying tool completes
Expand Down Expand Up @@ -3211,7 +3264,7 @@ export class WorkspaceStore {
}
// Wire up response complete callback for "notify on response" feature
if (this.responseCompleteCallback) {
aggregator.onResponseComplete = this.responseCompleteCallback;
this.bindAggregatorResponseCompleteCallback(aggregator);
}
this.aggregators.set(workspaceId, aggregator);
this.workspaceCreatedAt.set(workspaceId, createdAt);
Expand Down Expand Up @@ -3362,12 +3415,6 @@ export class WorkspaceStore {
return;
}

// Handle idle-compaction-started event from backend execution.
if ("type" in data && data.type === "idle-compaction-started") {
this.notifyIdleCompactionStarted(workspaceId);
return;
}

// Heartbeat events are no-ops for UI state - they exist only for connection liveness detection
if ("type" in data && data.type === "heartbeat") {
return;
Expand Down Expand Up @@ -3535,8 +3582,6 @@ function getStoreInstance(): WorkspaceStore {
* Use this for non-hook subscriptions (e.g., in useEffect callbacks).
*/
export const workspaceStore = {
onIdleCompactionStarted: (callback: (workspaceId: string) => void) =>
getStoreInstance().onIdleCompactionStarted(callback),
subscribeFileModifyingTool: (listener: (workspaceId: string) => void, workspaceId?: string) =>
getStoreInstance().subscribeFileModifyingTool(listener, workspaceId),
getFileModifyingToolMs: (workspaceId: string) =>
Expand Down
2 changes: 1 addition & 1 deletion src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ export class StreamingMessageAggregator {
messageId: string,
isFinal: boolean,
finalText: string,
compaction?: { hasContinueMessage: boolean },
compaction?: { hasContinueMessage: boolean; isIdle?: boolean },
completedAt?: number | null
) => void;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ export function applyWorkspaceChatEventToAggregator(
isBashOutputEvent(event) ||
("type" in event && event.type === "session-usage-delta") ||
("type" in event && event.type === "auto-compaction-triggered") ||
("type" in event && event.type === "auto-compaction-completed") ||
("type" in event && event.type === "idle-compaction-started")
("type" in event && event.type === "auto-compaction-completed")
) {
return "ignored";
}
Expand Down
Loading
Loading