Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 106 additions & 44 deletions hub/src/sync/sessionCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class SessionCache {
private readonly lastBroadcastAtBySessionId: Map<string, number> = new Map()
private readonly todoBackfillAttemptedSessionIds: Set<string> = new Set()
private readonly deduplicateInProgress: Set<string> = new Set()
private readonly deduplicatePending: Set<string> = new Set()

constructor(
private readonly store: Store,
Expand Down Expand Up @@ -406,6 +407,27 @@ export class SessionCache {
}

async mergeSessions(oldSessionId: string, newSessionId: string, namespace: string): Promise<void> {
await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: true })
}

async mergeSessionHistory(
oldSessionId: string,
newSessionId: string,
namespace: string,
options: { mergeAgentState?: boolean } = {}
): Promise<void> {
await this.mergeSessionData(oldSessionId, newSessionId, namespace, {
deleteOldSession: false,
mergeAgentState: options.mergeAgentState ?? true
})
}

private async mergeSessionData(
oldSessionId: string,
newSessionId: string,
namespace: string,
options: { deleteOldSession: boolean; mergeAgentState?: boolean }
): Promise<void> {
if (oldSessionId === newSessionId) {
return
}
Expand All @@ -416,7 +438,10 @@ export class SessionCache {
throw new Error('Session not found for merge')
}

this.store.messages.mergeSessionMessages(oldSessionId, newSessionId)
const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] This history-only merge mutates both sessions' message sets, but only the target session gets messages-invalidated. An already-open source duplicate will never refetch because the app only reloads when event.sessionId matches the current route (web/src/App.tsx:232-240), and the new test currently asserts only s2 invalidation (hub/src/sync/sessionModel.test.ts:652).

Suggested fix:

const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId)
if (movedMessages.moved > 0) {
    if (!options.deleteOldSession) {
        this.publisher.emit({ type: 'messages-invalidated', sessionId: oldSessionId, namespace })
    }
    this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace })
}

if (movedMessages.moved > 0) {
this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace })
}

const mergedMetadata = this.mergeSessionMetadata(oldStored.metadata, newStored.metadata)
if (mergedMetadata !== null && mergedMetadata !== newStored.metadata) {
Expand Down Expand Up @@ -476,10 +501,10 @@ export class SessionCache {
}

// Merge agentState: union requests/completedRequests from both sessions so pending
// approvals on the duplicate are not lost. Only inactive duplicates reach this point
// (active ones are skipped by deduplicateByAgentSessionId).
// approvals on inactive duplicates are not lost. Active duplicates keep their
// own agentState because permission approve/deny RPCs are routed by session id.
// Read the latest target state right before writing to avoid overwriting live updates.
if (oldStored.agentState !== null) {
if ((options.mergeAgentState ?? true) && oldStored.agentState !== null) {
for (let attempt = 0; attempt < 2; attempt += 1) {
const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace)
if (!latest) break
Expand All @@ -505,19 +530,26 @@ export class SessionCache {
)
}

const deleted = this.store.sessions.deleteSession(oldSessionId, namespace)
if (!deleted) {
throw new Error('Failed to delete old session during merge')
}
if (options.deleteOldSession) {
const deleted = this.store.sessions.deleteSession(oldSessionId, namespace)
if (!deleted) {
throw new Error('Failed to delete old session during merge')
}

const existed = this.sessions.delete(oldSessionId)
if (existed) {
this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace })
const existed = this.sessions.delete(oldSessionId)
if (existed) {
this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace })
}
this.lastBroadcastAtBySessionId.delete(oldSessionId)
this.todoBackfillAttemptedSessionIds.delete(oldSessionId)
} else {
this.refreshSession(oldSessionId)
}
this.lastBroadcastAtBySessionId.delete(oldSessionId)
this.todoBackfillAttemptedSessionIds.delete(oldSessionId)

this.refreshSession(newSessionId)
const refreshed = this.refreshSession(newSessionId)
if (refreshed) {
this.publisher.emit({ type: 'session-updated', sessionId: newSessionId, data: refreshed })
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] This history-only merge rewrites message rows in SQLite, but it only publishes session-updated afterwards. The web cache appends on message-received and otherwise reloads messages only on mount/refetch, so an already-open canonical chat keeps showing the pre-merge transcript until the user manually refreshes.

Suggested fix:

const moved = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId)
if (moved.moved > 0) {
    this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId })
}

}
}

private mergeSessionMetadata(oldMetadata: unknown | null, newMetadata: unknown | null): unknown | null {
Expand Down Expand Up @@ -605,44 +637,74 @@ export class SessionCache {
const agentId = this.extractAgentSessionId(session.metadata)
if (!agentId) return

// Guard: skip if another dedup for this agent ID is already in progress.
// A skipped trigger is acceptable — the web-side display dedup hides any remaining duplicates.
if (this.deduplicateInProgress.has(agentId.value)) return
// Guard: if another dedup for this agent ID is already in progress,
// coalesce this trigger and run one more pass afterwards. This matters
// for active duplicates: a session can become inactive while the first
// pass is only allowed to move history, and the follow-up pass should
// then be allowed to delete the inactive duplicate record.
if (this.deduplicateInProgress.has(agentId.value)) {
this.deduplicatePending.add(agentId.value)
return
}
this.deduplicateInProgress.add(agentId.value)

try {
const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }]
for (const [existingId, existing] of this.sessions) {
if (existingId === sessionId) continue
if (existing.namespace !== session.namespace) continue
if (!existing.metadata) continue
if (existing.metadata[agentId.field] !== agentId.value) continue
// Only merge inactive duplicates. Active ones still have a live CLI socket
// whose keepalive/messages would fail if we deleted their session record.
// The web-side display dedup hides active duplicates from the UI.
if (existing.active) continue
candidates.push({ id: existingId, session: existing })
}
do {
this.deduplicatePending.delete(agentId.value)

if (candidates.length <= 1) return
const currentSession = this.sessions.get(sessionId)
const candidates: { id: string; session: Session }[] = []
if (currentSession?.metadata && currentSession.metadata[agentId.field] === agentId.value) {
candidates.push({ id: sessionId, session: currentSession })
}
for (const [existingId, existing] of this.sessions) {
if (existingId === sessionId) continue
if (existing.namespace !== session.namespace) continue
if (!existing.metadata) continue
if (existing.metadata[agentId.field] !== agentId.value) continue
candidates.push({ id: existingId, session: existing })
}

// Keep the most recent session as the merge target so newer state survives.
candidates.sort((a, b) =>
(b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt)
)
const targetId = candidates[0].id
const targetNamespace = candidates[0].session.namespace

for (const { id } of candidates.slice(1)) {
if (id === targetId) continue
try {
await this.mergeSessions(id, targetId, targetNamespace)
} catch {
// best-effort: duplicate remains if merge fails
if (candidates.length <= 1) continue

// Keep the same canonical session the sidebar is likely to show:
// active sessions win, then the most recently updated session wins.
// If timestamps tie, prefer the session that triggered this dedup run
// so callers can intentionally preserve the visible/resumed session.
candidates.sort((a, b) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] Active duplicate target selection can still move history off the session the web keeps visible. The hub now chooses the newest active duplicate here, but the web preserves the currently selected duplicate in deduplicateSessionsByAgentId() (web/src/components/SessionList.tsx:61-67, web/src/App.tsx:120-121). In the common resume case, that means s1 can stay selected while this branch moves its history into s2.

Suggested fix:

const activeCandidates = candidates.filter(({ session }) => session.active)
if (activeCandidates.length > 1) {
    // Don't move history between two live session ids until the UI can follow the canonical target.
    continue
}

if (a.session.active !== b.session.active) return a.session.active ? -1 : 1
const updatedDelta = b.session.updatedAt - a.session.updatedAt
if (updatedDelta !== 0) return updatedDelta
if (a.id === sessionId) return -1
if (b.id === sessionId) return 1
return b.session.activeAt - a.session.activeAt
})
const targetId = candidates[0].id
const targetNamespace = candidates[0].session.namespace

for (const { id } of candidates.slice(1)) {
if (id === targetId) continue
try {
const candidate = this.sessions.get(id)
if (candidate?.active) {
// Keep the live session record/socket intact, but move its already
// persisted history into the visible dedup target. This preserves
// left-sidebar dedup while making resumed/restarted sessions show
// the full conversation history.
await this.mergeSessionHistory(id, targetId, targetNamespace, {
mergeAgentState: false
})
} else {
await this.mergeSessions(id, targetId, targetNamespace)
}
} catch {
// best-effort: duplicate remains if merge fails
}
}
}
} while (this.deduplicatePending.has(agentId.value))
} finally {
this.deduplicateInProgress.delete(agentId.value)
this.deduplicatePending.delete(agentId.value)
}
}
}
54 changes: 42 additions & 12 deletions hub/src/sync/sessionModel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -606,33 +606,58 @@ describe('session model', () => {
expect(cache.getSession(s1.id)).toBeDefined()
})

it('does not merge active duplicates', async () => {
it('moves history from active duplicates without deleting their live session records', async () => {
const store = new Store(':memory:')
const events: SyncEvent[] = []
const cache = new SessionCache(store, createPublisher(events))

const s1 = cache.getOrCreateSession(
'tag-1',
{ path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' },
null,
{
requests: { 'req-from-active-duplicate': { tool: 'Bash', arguments: {} } },
completedRequests: {}
},
'default'
)

// Mark s1 as active (simulating a live CLI connection)
store.messages.addMessage(s1.id, { type: 'text', text: 'history from s1' }, 'local-s1')
cache.handleSessionAlive({ sid: s1.id, time: Date.now(), thinking: false })

const s2 = cache.getOrCreateSession(
'tag-2',
{ path: '/tmp/project', host: 'localhost', flavor: 'codex', codexSessionId: 'thread-X' },
null,
{
requests: { 'req-from-target': { tool: 'Read', arguments: {} } },
completedRequests: {}
},
'default'
)
store.messages.addMessage(s2.id, { type: 'text', text: 'history from s2' }, 'local-s2')
cache.handleSessionAlive({ sid: s2.id, time: Date.now() + 1000, thinking: false })

await cache.deduplicateByAgentSessionId(s2.id)

// s1 is active, so it should NOT be merged/deleted
// Both live session records stay around so their sockets/keepalives
// remain valid, but the older active session's persisted history is
// moved into the visible dedup target.
expect(cache.getSession(s1.id)).toBeDefined()
expect(cache.getSession(s2.id)).toBeDefined()
expect(store.messages.getMessages(s1.id, 100)).toHaveLength(0)
const targetMessages = store.messages.getMessages(s2.id, 100)
expect(targetMessages.map((message) => (message.content as { text?: string }).text)).toEqual([
'history from s1',
'history from s2'
])
expect(events).toContainEqual({ type: 'messages-invalidated', sessionId: s2.id, namespace: 'default' })

// Active duplicates keep their own pending permission requests because
// approve/deny RPCs still route by the originating HAPI session id.
const sourceRequests = cache.getSession(s1.id)?.agentState?.requests ?? {}
const targetRequests = cache.getSession(s2.id)?.agentState?.requests ?? {}
expect(sourceRequests['req-from-active-duplicate']).toBeDefined()
expect(targetRequests['req-from-active-duplicate']).toBeUndefined()
expect(targetRequests['req-from-target']).toBeDefined()
})

it('merges duplicate after it becomes inactive via session-end', async () => {
Expand Down Expand Up @@ -661,12 +686,11 @@ describe('session model', () => {
// Mark s1 as active
engine.handleSessionAlive({ sid: s1.id, time: Date.now() })

// s1 is active, dedup from s2 should skip it
// s1 is active, so dedup keeps its live record around
const events: SyncEvent[] = []
const cache = (engine as any).sessionCache as SessionCache
await cache.deduplicateByAgentSessionId(s2.id)
expect(cache.getSession(s1.id)).toBeDefined()
expect(cache.getSession(s2.id)).toBeDefined()

// Now s1 ends — handleSessionEnd should trigger dedup retry
engine.handleSessionEnd({ sid: s1.id, time: Date.now() })
Expand Down Expand Up @@ -701,16 +725,22 @@ describe('session model', () => {
'default'
)

// Mark s1 as active now
cache.handleSessionAlive({ sid: s1.id, time: Date.now() })
// Mark both duplicates active. The older live record should keep
// existing while active, because its socket may still send keepalives.
const now = Date.now()
cache.handleSessionAlive({ sid: s1.id, time: now })
cache.handleSessionAlive({ sid: s2.id, time: now })

// s1 is active — dedup skips it
// s1 is active — dedup only moves history and keeps the record.
await cache.deduplicateByAgentSessionId(s2.id)
expect(cache.getSession(s1.id)).toBeDefined()
expect(cache.getSession(s2.id)).toBeDefined()

// Simulate time passing beyond the 30s timeout
const expired = cache.expireInactive(Date.now() + 60_000)
// Simulate only s1 passing beyond the 30s timeout.
cache.getSession(s1.id)!.activeAt = now - 31_000
const expired = cache.expireInactive(now)
expect(expired).toContain(s1.id)
expect(expired).not.toContain(s2.id)

// Now s1 is inactive — dedup should merge it
await cache.deduplicateByAgentSessionId(s2.id)
Expand Down
1 change: 1 addition & 0 deletions hub/src/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ export class SyncEngine {
collaborationMode?: CodexCollaborationMode
}): void {
this.sessionCache.handleSessionAlive(payload)
this.triggerDedupIfNeeded(payload.sid)
}

handleSessionEnd(payload: { sid: string; time: number }): void {
Expand Down
3 changes: 3 additions & 0 deletions shared/src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ export const SyncEventSchema = z.discriminatedUnion('type', [
type: z.literal('message-received'),
message: DecryptedMessageSchema
}),
SessionChangedSchema.extend({
type: z.literal('messages-invalidated')
}),
MachineChangedSchema.extend({
type: z.literal('machine-updated'),
data: z.unknown().optional()
Expand Down
10 changes: 9 additions & 1 deletion web/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,15 @@ function AppInner() {
}
}, [])

const handleSseEvent = useCallback(() => {}, [])
const handleSseEvent = useCallback((event: SyncEvent) => {
if (event.type !== 'messages-invalidated') {
return
}
if (!api || event.sessionId !== selectedSessionId) {
return
}
void fetchLatestMessages(api, event.sessionId)
}, [api, selectedSessionId])
const handleToast = useCallback((event: ToastEvent) => {
addToast({
title: event.data.title,
Expand Down
Loading