-
-
Notifications
You must be signed in to change notification settings - Fork 391
Preserve history when deduplicating agent sessions #471
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ export class SessionCache { | |
| private readonly lastBroadcastAtBySessionId: Map<string, number> = new Map() | ||
| private readonly todoBackfillAttemptedSessionIds: Set<string> = new Set() | ||
| private readonly deduplicateInProgress: Set<string> = new Set() | ||
| private readonly deduplicatePending: Set<string> = new Set() | ||
|
|
||
| constructor( | ||
| private readonly store: Store, | ||
|
|
@@ -406,6 +407,27 @@ export class SessionCache { | |
| } | ||
|
|
||
| async mergeSessions(oldSessionId: string, newSessionId: string, namespace: string): Promise<void> { | ||
| await this.mergeSessionData(oldSessionId, newSessionId, namespace, { deleteOldSession: true }) | ||
| } | ||
|
|
||
| async mergeSessionHistory( | ||
| oldSessionId: string, | ||
| newSessionId: string, | ||
| namespace: string, | ||
| options: { mergeAgentState?: boolean } = {} | ||
| ): Promise<void> { | ||
| await this.mergeSessionData(oldSessionId, newSessionId, namespace, { | ||
| deleteOldSession: false, | ||
| mergeAgentState: options.mergeAgentState ?? true | ||
| }) | ||
| } | ||
|
|
||
| private async mergeSessionData( | ||
| oldSessionId: string, | ||
| newSessionId: string, | ||
| namespace: string, | ||
| options: { deleteOldSession: boolean; mergeAgentState?: boolean } | ||
| ): Promise<void> { | ||
| if (oldSessionId === newSessionId) { | ||
| return | ||
| } | ||
|
|
@@ -416,7 +438,10 @@ export class SessionCache { | |
| throw new Error('Session not found for merge') | ||
| } | ||
|
|
||
| this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) | ||
| const movedMessages = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId) | ||
| if (movedMessages.moved > 0) { | ||
| this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId, namespace }) | ||
| } | ||
|
|
||
| const mergedMetadata = this.mergeSessionMetadata(oldStored.metadata, newStored.metadata) | ||
| if (mergedMetadata !== null && mergedMetadata !== newStored.metadata) { | ||
|
|
@@ -476,10 +501,10 @@ export class SessionCache { | |
| } | ||
|
|
||
| // Merge agentState: union requests/completedRequests from both sessions so pending | ||
| // approvals on the duplicate are not lost. Only inactive duplicates reach this point | ||
| // (active ones are skipped by deduplicateByAgentSessionId). | ||
| // approvals on inactive duplicates are not lost. Active duplicates keep their | ||
| // own agentState because permission approve/deny RPCs are routed by session id. | ||
| // Read the latest target state right before writing to avoid overwriting live updates. | ||
| if (oldStored.agentState !== null) { | ||
| if ((options.mergeAgentState ?? true) && oldStored.agentState !== null) { | ||
| for (let attempt = 0; attempt < 2; attempt += 1) { | ||
| const latest = this.store.sessions.getSessionByNamespace(newSessionId, namespace) | ||
| if (!latest) break | ||
|
|
@@ -505,19 +530,26 @@ export class SessionCache { | |
| ) | ||
| } | ||
|
|
||
| const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) | ||
| if (!deleted) { | ||
| throw new Error('Failed to delete old session during merge') | ||
| } | ||
| if (options.deleteOldSession) { | ||
| const deleted = this.store.sessions.deleteSession(oldSessionId, namespace) | ||
| if (!deleted) { | ||
| throw new Error('Failed to delete old session during merge') | ||
| } | ||
|
|
||
| const existed = this.sessions.delete(oldSessionId) | ||
| if (existed) { | ||
| this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) | ||
| const existed = this.sessions.delete(oldSessionId) | ||
| if (existed) { | ||
| this.publisher.emit({ type: 'session-removed', sessionId: oldSessionId, namespace }) | ||
| } | ||
| this.lastBroadcastAtBySessionId.delete(oldSessionId) | ||
| this.todoBackfillAttemptedSessionIds.delete(oldSessionId) | ||
| } else { | ||
| this.refreshSession(oldSessionId) | ||
| } | ||
| this.lastBroadcastAtBySessionId.delete(oldSessionId) | ||
| this.todoBackfillAttemptedSessionIds.delete(oldSessionId) | ||
|
|
||
| this.refreshSession(newSessionId) | ||
| const refreshed = this.refreshSession(newSessionId) | ||
| if (refreshed) { | ||
| this.publisher.emit({ type: 'session-updated', sessionId: newSessionId, data: refreshed }) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MAJOR] This history-only merge rewrites message rows in SQLite, but it only publishes Suggested fix: const moved = this.store.messages.mergeSessionMessages(oldSessionId, newSessionId)
if (moved.moved > 0) {
this.publisher.emit({ type: 'messages-invalidated', sessionId: newSessionId })
} |
||
| } | ||
| } | ||
|
|
||
| private mergeSessionMetadata(oldMetadata: unknown | null, newMetadata: unknown | null): unknown | null { | ||
|
|
@@ -605,44 +637,74 @@ export class SessionCache { | |
| const agentId = this.extractAgentSessionId(session.metadata) | ||
| if (!agentId) return | ||
|
|
||
| // Guard: skip if another dedup for this agent ID is already in progress. | ||
| // A skipped trigger is acceptable — the web-side display dedup hides any remaining duplicates. | ||
| if (this.deduplicateInProgress.has(agentId.value)) return | ||
| // Guard: if another dedup for this agent ID is already in progress, | ||
| // coalesce this trigger and run one more pass afterwards. This matters | ||
| // for active duplicates: a session can become inactive while the first | ||
| // pass is only allowed to move history, and the follow-up pass should | ||
| // then be allowed to delete the inactive duplicate record. | ||
| if (this.deduplicateInProgress.has(agentId.value)) { | ||
| this.deduplicatePending.add(agentId.value) | ||
| return | ||
| } | ||
| this.deduplicateInProgress.add(agentId.value) | ||
|
|
||
| try { | ||
| const candidates: { id: string; session: Session }[] = [{ id: sessionId, session }] | ||
| for (const [existingId, existing] of this.sessions) { | ||
| if (existingId === sessionId) continue | ||
| if (existing.namespace !== session.namespace) continue | ||
| if (!existing.metadata) continue | ||
| if (existing.metadata[agentId.field] !== agentId.value) continue | ||
| // Only merge inactive duplicates. Active ones still have a live CLI socket | ||
| // whose keepalive/messages would fail if we deleted their session record. | ||
| // The web-side display dedup hides active duplicates from the UI. | ||
| if (existing.active) continue | ||
| candidates.push({ id: existingId, session: existing }) | ||
| } | ||
| do { | ||
| this.deduplicatePending.delete(agentId.value) | ||
|
|
||
| if (candidates.length <= 1) return | ||
| const currentSession = this.sessions.get(sessionId) | ||
| const candidates: { id: string; session: Session }[] = [] | ||
| if (currentSession?.metadata && currentSession.metadata[agentId.field] === agentId.value) { | ||
| candidates.push({ id: sessionId, session: currentSession }) | ||
| } | ||
| for (const [existingId, existing] of this.sessions) { | ||
| if (existingId === sessionId) continue | ||
| if (existing.namespace !== session.namespace) continue | ||
| if (!existing.metadata) continue | ||
| if (existing.metadata[agentId.field] !== agentId.value) continue | ||
| candidates.push({ id: existingId, session: existing }) | ||
| } | ||
|
|
||
| // Keep the most recent session as the merge target so newer state survives. | ||
| candidates.sort((a, b) => | ||
| (b.session.activeAt - a.session.activeAt) || (b.session.updatedAt - a.session.updatedAt) | ||
| ) | ||
| const targetId = candidates[0].id | ||
| const targetNamespace = candidates[0].session.namespace | ||
|
|
||
| for (const { id } of candidates.slice(1)) { | ||
| if (id === targetId) continue | ||
| try { | ||
| await this.mergeSessions(id, targetId, targetNamespace) | ||
| } catch { | ||
| // best-effort: duplicate remains if merge fails | ||
| if (candidates.length <= 1) continue | ||
|
|
||
| // Keep the same canonical session the sidebar is likely to show: | ||
| // active sessions win, then the most recently updated session wins. | ||
| // If timestamps tie, prefer the session that triggered this dedup run | ||
| // so callers can intentionally preserve the visible/resumed session. | ||
| candidates.sort((a, b) => { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MAJOR] Active duplicate target selection can still move history off the session the web keeps visible. The hub now chooses the newest active duplicate here, but the web preserves the currently selected duplicate in Suggested fix: const activeCandidates = candidates.filter(({ session }) => session.active)
if (activeCandidates.length > 1) {
// Don't move history between two live session ids until the UI can follow the canonical target.
continue
} |
||
| if (a.session.active !== b.session.active) return a.session.active ? -1 : 1 | ||
| const updatedDelta = b.session.updatedAt - a.session.updatedAt | ||
| if (updatedDelta !== 0) return updatedDelta | ||
| if (a.id === sessionId) return -1 | ||
| if (b.id === sessionId) return 1 | ||
| return b.session.activeAt - a.session.activeAt | ||
| }) | ||
| const targetId = candidates[0].id | ||
| const targetNamespace = candidates[0].session.namespace | ||
|
|
||
| for (const { id } of candidates.slice(1)) { | ||
| if (id === targetId) continue | ||
| try { | ||
| const candidate = this.sessions.get(id) | ||
| if (candidate?.active) { | ||
| // Keep the live session record/socket intact, but move its already | ||
| // persisted history into the visible dedup target. This preserves | ||
| // left-sidebar dedup while making resumed/restarted sessions show | ||
| // the full conversation history. | ||
| await this.mergeSessionHistory(id, targetId, targetNamespace, { | ||
| mergeAgentState: false | ||
| }) | ||
| } else { | ||
| await this.mergeSessions(id, targetId, targetNamespace) | ||
| } | ||
| } catch { | ||
| // best-effort: duplicate remains if merge fails | ||
| } | ||
| } | ||
| } | ||
| } while (this.deduplicatePending.has(agentId.value)) | ||
| } finally { | ||
| this.deduplicateInProgress.delete(agentId.value) | ||
| this.deduplicatePending.delete(agentId.value) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[MAJOR] This history-only merge mutates both sessions' message sets, but only the target session gets
messages-invalidated. An already-open source duplicate will never refetch because the app only reloads whenevent.sessionIdmatches the current route (web/src/App.tsx:232-240), and the new test currently asserts onlys2invalidation (hub/src/sync/sessionModel.test.ts:652).Suggested fix: