diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 902bda9a6..12fb916f4 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -11,6 +11,7 @@ export class SessionCache { private readonly lastBroadcastAtBySessionId: Map = new Map() private readonly todoBackfillAttemptedSessionIds: Set = new Set() private readonly deduplicateInProgress: Set = new Set() + private readonly deduplicatePending: Set = new Set() constructor( private readonly store: Store, @@ -406,6 +407,27 @@ export class SessionCache { } async mergeSessions(oldSessionId: string, newSessionId: string, namespace: string): Promise { + await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: true }) + } + + async mergeSessionHistory( + oldSessionId: string, + newSessionId: string, + namespace: string, + options: { mergeAgentState?: boolean } = {} + ): Promise { + await this.mergeSessionData(oldSessionId, newSessionId, namespace, { + deleteOldSession: false, + mergeAgentState: options.mergeAgentState ?? true + }) + } + + private async mergeSessionData( + oldSessionId: string, + newSessionId: string, + namespace: string, + options: { deleteOldSession: boolean; mergeAgentState?: boolean } + ): Promise { if (oldSessionId === newSessionId) { return } @@ -416,7 +438,10 @@ export class SessionCache { throw new Error('Session not found for merge') } - this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) + const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) + if (movedMessages.moved > 0) { + this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace }) + } const mergedMetadata = this.mergeSessionMetadata(oldStored.metadata, newStored.metadata) if (mergedMetadata !== null && mergedMetadata !== newStored.metadata) { @@ -476,10 +501,10 @@ export class SessionCache { } // Merge agentState: union requests/completedRequests from both sessions so pending - // approvals on the duplicate are not lost. Only inactive duplicates reach this point - // (active ones are skipped by deduplicateByAgentSessionId). + // approvals on inactive duplicates are not lost. Active duplicates keep their + // own agentState because permission approve/deny RPCs are routed by session id. // Read the latest target state right before writing to avoid overwriting live updates. - if (oldStored.agentState !== null) { + if ((options.mergeAgentState ?? true) && oldStored.agentState !== null) { for (let attempt = 0; attempt < 2; attempt += 1) { const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) if (!latest) break @@ -505,19 +530,26 @@ export class SessionCache { ) } - const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) - if (!deleted) { - throw new Error('Failed to delete old session during merge') - } + if (options.deleteOldSession) { + const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) + if (!deleted) { + throw new Error('Failed to delete old session during merge') + } - const existed = this.sessions.delete(oldSessionId) - if (existed) { - this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) + const existed = this.sessions.delete(oldSessionId) + if (existed) { + this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) + } + this.lastBroadcastAtBySessionId.delete(oldSessionId) + this.todoBackfillAttemptedSessionIds.delete(oldSessionId) + } else { + this.refreshSession(oldSessionId) } - this.lastBroadcastAtBySessionId.delete(oldSessionId) - this.todoBackfillAttemptedSessionIds.delete(oldSessionId) - this.refreshSession(newSessionId) + const refreshed = this.refreshSession(newSessionId) + if (refreshed) { + this.publisher.emit({ type: 'session-updated', sessionId: newSessionId, data: refreshed }) + } } private mergeSessionMetadata(oldMetadata: unknown | null, newMetadata: unknown | null): unknown | null { @@ -605,44 +637,74 @@ export class SessionCache { const agentId = this.extractAgentSessionId(session.metadata) if (!agentId) return - // Guard: skip if another dedup for this agent ID is already in progress. - // A skipped trigger is acceptable — the web-side display dedup hides any remaining duplicates. - if (this.deduplicateInProgress.has(agentId.value)) return + // Guard: if another dedup for this agent ID is already in progress, + // coalesce this trigger and run one more pass afterwards. This matters + // for active duplicates: a session can become inactive while the first + // pass is only allowed to move history, and the follow-up pass should + // then be allowed to delete the inactive duplicate record. + if (this.deduplicateInProgress.has(agentId.value)) { + this.deduplicatePending.add(agentId.value) + return + } this.deduplicateInProgress.add(agentId.value) try { - const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }] - for (const [existingId, existing] of this.sessions) { - if (existingId === sessionId) continue - if (existing.namespace !== session.namespace) continue - if (!existing.metadata) continue - if (existing.metadata[agentId.field] !== agentId.value) continue - // Only merge inactive duplicates. Active ones still have a live CLI socket - // whose keepalive/messages would fail if we deleted their session record. - // The web-side display dedup hides active duplicates from the UI. - if (existing.active) continue - candidates.push({ id: existingId, session: existing }) - } + do { + this.deduplicatePending.delete(agentId.value) - if (candidates.length <= 1) return + const currentSession = this.sessions.get(sessionId) + const candidates: { id: string; session: Session }[] = [] + if (currentSession?.metadata && currentSession.metadata[agentId.field] === agentId.value) { + candidates.push({ id: sessionId, session: currentSession }) + } + for (const [existingId, existing] of this.sessions) { + if (existingId === sessionId) continue + if (existing.namespace !== session.namespace) continue + if (!existing.metadata) continue + if (existing.metadata[agentId.field] !== agentId.value) continue + candidates.push({ id: existingId, session: existing }) + } - // Keep the most recent session as the merge target so newer state survives. - candidates.sort((a, b) => - (b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt) - ) - const targetId = candidates[0].id - const targetNamespace = candidates[0].session.namespace - - for (const { id } of candidates.slice(1)) { - if (id === targetId) continue - try { - await this.mergeSessions(id, targetId, targetNamespace) - } catch { - // best-effort: duplicate remains if merge fails + if (candidates.length <= 1) continue + + // Keep the same canonical session the sidebar is likely to show: + // active sessions win, then the most recently updated session wins. + // If timestamps tie, prefer the session that triggered this dedup run + // so callers can intentionally preserve the visible/resumed session. + candidates.sort((a, b) => { + if (a.session.active !== b.session.active) return a.session.active ? -1 : 1 + const updatedDelta = b.session.updatedAt - a.session.updatedAt + if (updatedDelta !== 0) return updatedDelta + if (a.id === sessionId) return -1 + if (b.id === sessionId) return 1 + return b.session.activeAt - a.session.activeAt + }) + const targetId = candidates[0].id + const targetNamespace = candidates[0].session.namespace + + for (const { id } of candidates.slice(1)) { + if (id === targetId) continue + try { + const candidate = this.sessions.get(id) + if (candidate?.active) { + // Keep the live session record/socket intact, but move its already + // persisted history into the visible dedup target. This preserves + // left-sidebar dedup while making resumed/restarted sessions show + // the full conversation history. + await this.mergeSessionHistory(id, targetId, targetNamespace, { + mergeAgentState: false + }) + } else { + await this.mergeSessions(id, targetId, targetNamespace) + } + } catch { + // best-effort: duplicate remains if merge fails + } } - } + } while (this.deduplicatePending.has(agentId.value)) } finally { this.deduplicateInProgress.delete(agentId.value) + this.deduplicatePending.delete(agentId.value) } } } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 8b68c560b..8741b1510 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -606,7 +606,7 @@ describe('session model', () => { expect(cache.getSession(s1.id)).toBeDefined() }) - it('does not merge active duplicates', async () => { + it('moves history from active duplicates without deleting their live session records', async () => { const store = new Store(':memory:') const events: SyncEvent[] = [] const cache = new SessionCache(store, createPublisher(events)) @@ -614,25 +614,50 @@ describe('session model', () => { const s1 = cache.getOrCreateSession( 'tag-1', { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, - null, + { + requests: { 'req-from-active-duplicate': { tool: 'Bash', arguments: {} } }, + completedRequests: {} + }, 'default' ) - // Mark s1 as active (simulating a live CLI connection) + store.messages.addMessage(s1.id, { type: 'text', text: 'history from s1' }, 'local-s1') cache.handleSessionAlive({ sid: s1.id, time: Date.now(), thinking: false }) const s2 = cache.getOrCreateSession( 'tag-2', { path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' }, - null, + { + requests: { 'req-from-target': { tool: 'Read', arguments: {} } }, + completedRequests: {} + }, 'default' ) + store.messages.addMessage(s2.id, { type: 'text', text: 'history from s2' }, 'local-s2') + cache.handleSessionAlive({ sid: s2.id, time: Date.now() + 1000, thinking: false }) await cache.deduplicateByAgentSessionId(s2.id) - // s1 is active, so it should NOT be merged/deleted + // Both live session records stay around so their sockets/keepalives + // remain valid, but the older active session's persisted history is + // moved into the visible dedup target. expect(cache.getSession(s1.id)).toBeDefined() expect(cache.getSession(s2.id)).toBeDefined() + expect(store.messages.getMessages(s1.id, 100)).toHaveLength(0) + const targetMessages = store.messages.getMessages(s2.id, 100) + expect(targetMessages.map((message) => (message.content as { text?: string }).text)).toEqual([ + 'history from s1', + 'history from s2' + ]) + expect(events).toContainEqual({ type: 'messages-invalidated', sessionId: s2.id, namespace: 'default' }) + + // Active duplicates keep their own pending permission requests because + // approve/deny RPCs still route by the originating HAPI session id. + const sourceRequests = cache.getSession(s1.id)?.agentState?.requests ?? {} + const targetRequests = cache.getSession(s2.id)?.agentState?.requests ?? {} + expect(sourceRequests['req-from-active-duplicate']).toBeDefined() + expect(targetRequests['req-from-active-duplicate']).toBeUndefined() + expect(targetRequests['req-from-target']).toBeDefined() }) it('merges duplicate after it becomes inactive via session-end', async () => { @@ -661,12 +686,11 @@ describe('session model', () => { // Mark s1 as active engine.handleSessionAlive({ sid: s1.id, time: Date.now() }) - // s1 is active, dedup from s2 should skip it + // s1 is active, so dedup keeps its live record around const events: SyncEvent[] = [] const cache = (engine as any).sessionCache as SessionCache await cache.deduplicateByAgentSessionId(s2.id) expect(cache.getSession(s1.id)).toBeDefined() - expect(cache.getSession(s2.id)).toBeDefined() // Now s1 ends — handleSessionEnd should trigger dedup retry engine.handleSessionEnd({ sid: s1.id, time: Date.now() }) @@ -701,16 +725,22 @@ describe('session model', () => { 'default' ) - // Mark s1 as active now - cache.handleSessionAlive({ sid: s1.id, time: Date.now() }) + // Mark both duplicates active. The older live record should keep + // existing while active, because its socket may still send keepalives. + const now = Date.now() + cache.handleSessionAlive({ sid: s1.id, time: now }) + cache.handleSessionAlive({ sid: s2.id, time: now }) - // s1 is active — dedup skips it + // s1 is active — dedup only moves history and keeps the record. await cache.deduplicateByAgentSessionId(s2.id) expect(cache.getSession(s1.id)).toBeDefined() + expect(cache.getSession(s2.id)).toBeDefined() - // Simulate time passing beyond the 30s timeout - const expired = cache.expireInactive(Date.now() + 60_000) + // Simulate only s1 passing beyond the 30s timeout. + cache.getSession(s1.id)!.activeAt = now - 31_000 + const expired = cache.expireInactive(now) expect(expired).toContain(s1.id) + expect(expired).not.toContain(s2.id) // Now s1 is inactive — dedup should merge it await cache.deduplicateByAgentSessionId(s2.id) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index b4d391005..04ac09831 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -202,6 +202,7 @@ export class SyncEngine { collaborationMode?: CodexCollaborationMode }): void { this.sessionCache.handleSessionAlive(payload) + this.triggerDedupIfNeeded(payload.sid) } handleSessionEnd(payload: { sid: string; time: number }): void { diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index c802efe51..fa9361081 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -213,6 +213,9 @@ export const SyncEventSchema = z.discriminatedUnion('type', [ type: z.literal('message-received'), message: DecryptedMessageSchema }), + SessionChangedSchema.extend({ + type: z.literal('messages-invalidated') + }), MachineChangedSchema.extend({ type: z.literal('machine-updated'), data: z.unknown().optional() diff --git a/web/src/App.tsx b/web/src/App.tsx index 7ab0798c2..c04638dfc 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -229,7 +229,15 @@ function AppInner() { } }, []) - const handleSseEvent = useCallback(() => {}, []) + const handleSseEvent = useCallback((event: SyncEvent) => { + if (event.type !== 'messages-invalidated') { + return + } + if (!api || event.sessionId !== selectedSessionId) { + return + } + void fetchLatestMessages(api, event.sessionId) + }, [api, selectedSessionId]) const handleToast = useCallback((event: ToastEvent) => { addToast({ title: event.data.title,