diff --git a/src/main/agents/agent-coordinator.ts b/src/main/agents/agent-coordinator.ts index 8679d6a2..d0b4c8a2 100644 --- a/src/main/agents/agent-coordinator.ts +++ b/src/main/agents/agent-coordinator.ts @@ -64,8 +64,20 @@ export class AgentCoordinator { getInboxEmails: (accountId?: string) => db.getInboxEmails(accountId), getAllEmails: (accountId?: string) => db.getAllEmails(accountId), searchEmails: (query: string, options?: db.SearchOptions) => db.searchEmails(query, options), - saveAnalysis: (emailId: string, needsReply: boolean, reason: string, priority?: string) => - db.saveAnalysis(emailId, needsReply, reason, priority), + saveAnalysis: (emailId: string, needsReply: boolean, reason: string, priority?: string) => { + const cleanup = db.saveAnalysis(emailId, needsReply, reason, priority); + // If reclassified as skip, fire-and-forget Gmail draft cleanup + if (cleanup) { + for (const c of cleanup) { + if (c.gmailDraftId && c.accountId) { + import("../services/gmail-draft-sync").then(({ deleteGmailDraftById }) => + deleteGmailDraftById(c.accountId!, c.gmailDraftId!).catch(() => {}), + ); + } + } + } + return cleanup; + }, saveDraft: ( emailId: string, draftBody: string, diff --git a/src/main/db/index.ts b/src/main/db/index.ts index 85772919..b01559f7 100644 --- a/src/main/db/index.ts +++ b/src/main/db/index.ts @@ -1468,18 +1468,94 @@ function rowToDashboardEmail(row: Record): DashboardEmail { } // Analysis operations +export interface DraftCleanupInfo { + gmailDraftId: string | null; + agentTaskId: string | null; + accountId: string | null; +} + +/** + * Save analysis for an email. When the priority is "skip" and the email had + * a draft, the local draft and its agent trace are deleted automatically. + * Returns cleanup info so the caller can cancel in-flight agents and delete + * the Gmail draft (which requires async network calls outside the DB layer). + */ export function saveAnalysis( emailId: string, needsReply: boolean, reason: string, priority?: string, -): void { - const db = getDatabase(); - const stmt = db.prepare(` - INSERT OR REPLACE INTO analyses (email_id, needs_reply, reason, priority, analyzed_at) - VALUES (?, ?, ?, ?, ?) - `); - stmt.run(emailId, needsReply ? 1 : 0, reason, priority || null, Date.now()); +): DraftCleanupInfo[] | null { + const db = getDatabase(); + const effectivePriority = priority || null; + + // When reclassifying as "skip", clean up drafts for the ENTIRE thread, not just + // this email. The UI shows priority at the thread level, and the draft might be + // on a different email than the one being reclassified (e.g., draft is on email A + // but email B arrives and gets classified as skip). + const cleanupInfos: DraftCleanupInfo[] = []; + if (effectivePriority === "skip") { + // Look up thread_id and account_id for this email + const emailRow = db + .prepare(`SELECT thread_id, account_id FROM emails WHERE id = ?`) + .get(emailId) as { thread_id: string; account_id: string } | undefined; + + if (emailRow) { + // Find all drafts in this thread + const draftRows = db + .prepare( + `SELECT d.email_id, d.gmail_draft_id, d.agent_task_id + FROM drafts d JOIN emails e ON d.email_id = e.id + WHERE e.thread_id = ? AND e.account_id = ?`, + ) + .all(emailRow.thread_id, emailRow.account_id) as Array<{ + email_id: string; + gmail_draft_id: string | null; + agent_task_id: string | null; + }>; + + for (const row of draftRows) { + cleanupInfos.push({ + gmailDraftId: row.gmail_draft_id, + agentTaskId: row.agent_task_id, + accountId: emailRow.account_id, + }); + } + } + } + + // Wrap all DB writes in a transaction so draft deletion + analysis save are atomic + const doWrites = db.transaction(() => { + for (const info of cleanupInfos) { + if (info.agentTaskId) { + db.prepare(`DELETE FROM agent_conversation_mirror WHERE local_task_id = ?`).run( + info.agentTaskId, + ); + } + } + if (cleanupInfos.length > 0) { + // Look up the thread again inside the transaction for the DELETE + const emailRow = db + .prepare(`SELECT thread_id, account_id FROM emails WHERE id = ?`) + .get(emailId) as { thread_id: string; account_id: string } | undefined; + if (emailRow) { + db.prepare( + `DELETE FROM drafts WHERE email_id IN ( + SELECT d.email_id FROM drafts d JOIN emails e ON d.email_id = e.id + WHERE e.thread_id = ? AND e.account_id = ? + )`, + ).run(emailRow.thread_id, emailRow.account_id); + } + } + + db.prepare( + `INSERT OR REPLACE INTO analyses (email_id, needs_reply, reason, priority, analyzed_at) + VALUES (?, ?, ?, ?, ?)`, + ).run(emailId, needsReply ? 1 : 0, reason, effectivePriority, Date.now()); + }); + doWrites(); + + return cleanupInfos.length > 0 ? cleanupInfos : null; } // Draft operations @@ -1571,6 +1647,52 @@ export function deleteDraft(emailId: string): void { db.prepare("DELETE FROM drafts WHERE email_id = ?").run(emailId); } +/** + * Delete all drafts for a thread. Removes local draft rows and agent traces. + * Returns cleanup info for each deleted draft so callers can handle Gmail + * draft deletion and agent cancellation (async operations outside the DB layer). + */ +export function deleteThreadDrafts(threadId: string, accountId: string): DraftCleanupInfo[] { + const db = getDatabase(); + const rows = db + .prepare( + `SELECT d.email_id, d.gmail_draft_id, d.agent_task_id + FROM drafts d JOIN emails e ON d.email_id = e.id + WHERE e.thread_id = ? AND e.account_id = ?`, + ) + .all(threadId, accountId) as Array<{ + email_id: string; + gmail_draft_id: string | null; + agent_task_id: string | null; + }>; + + if (rows.length === 0) return []; + + const cleanupInfos: DraftCleanupInfo[] = []; + for (const row of rows) { + cleanupInfos.push({ + gmailDraftId: row.gmail_draft_id, + agentTaskId: row.agent_task_id, + accountId, + }); + } + + // Batch all deletes in a transaction for atomicity + const doDeletes = db.transaction(() => { + for (const row of rows) { + if (row.agent_task_id) { + db.prepare(`DELETE FROM agent_conversation_mirror WHERE local_task_id = ?`).run( + row.agent_task_id, + ); + } + db.prepare("DELETE FROM drafts WHERE email_id = ?").run(row.email_id); + } + }); + doDeletes(); + + return cleanupInfos; +} + /** Get the RFC 5322 Message-ID header for an email (used for reply threading). */ export function getEmailMessageIdHeader(emailId: string): string | null { const db = getDatabase(); diff --git a/src/main/ipc/analysis.ipc.ts b/src/main/ipc/analysis.ipc.ts index 45376c92..49f3d6f2 100644 --- a/src/main/ipc/analysis.ipc.ts +++ b/src/main/ipc/analysis.ipc.ts @@ -9,6 +9,7 @@ import { learnFromPriorityOverrideInferred, } from "../services/analysis-edit-learner"; import { stripQuotedContent } from "../services/strip-quoted-content"; +import { deleteGmailDraftById } from "../services/gmail-draft-sync"; import { createLogger } from "../services/logger"; const log = createLogger("analysis-ipc"); @@ -110,8 +111,20 @@ export function registerAnalysisIpc(): void { const result = await analyzerInstance.analyze(emailForAnalysis, userEmail, email.accountId); - // Save analysis to database - saveAnalysis(emailId, result.needs_reply, result.reason, result.priority); + // Save analysis to database (also cleans up thread drafts if reclassified as skip) + const draftCleanup = saveAnalysis( + emailId, + result.needs_reply, + result.reason, + result.priority, + ); + if (draftCleanup) { + for (const cleanup of draftCleanup) { + if (cleanup.gmailDraftId && cleanup.accountId) { + deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {}); + } + } + } // Return updated email with analysis const updatedEmail = getEmail(emailId); @@ -188,7 +201,19 @@ export function registerAnalysisIpc(): void { userEmail, email.accountId, ); - saveAnalysis(emailId, result.needs_reply, result.reason, result.priority); + const batchCleanup = saveAnalysis( + emailId, + result.needs_reply, + result.reason, + result.priority, + ); + if (batchCleanup) { + for (const cleanup of batchCleanup) { + if (cleanup.gmailDraftId && cleanup.accountId) { + deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {}); + } + } + } const updatedEmail = getEmail(emailId); if (updatedEmail) { @@ -239,18 +264,45 @@ export function registerAnalysisIpc(): void { const originalNeedsReply = originalAnalysis?.needsReply ?? false; const originalPriority = originalAnalysis?.priority ?? null; - // Update the analysis in DB - saveAnalysis( + // Normalize: the UI represents "skip" as priority=null + needsReply=false, + // but saveAnalysis checks for the literal string "skip" to trigger draft cleanup. + const normalizedPriority = + newPriority === null && !newNeedsReply ? "skip" : (newPriority ?? undefined); + + // Update the analysis in DB (also deletes local draft + trace if reclassified as skip) + const draftCleanup = saveAnalysis( emailId, newNeedsReply, originalAnalysis?.reason ?? "User override", - newPriority ?? undefined, + normalizedPriority, ); log.info( `[Analysis] Priority overridden for ${emailId}: ${originalPriority ?? "skip"} → ${newPriority ?? "skip"}`, ); + // If reclassified as skip and thread had drafts, clean up Gmail drafts and cancel agents + if (draftCleanup) { + log.info( + `[Analysis] Cleaning up ${draftCleanup.length} thread draft(s) for ${emailId} after skip reclassification`, + ); + const { agentCoordinator } = await import("../agents/agent-coordinator"); + for (const cleanup of draftCleanup) { + if (cleanup.gmailDraftId && cleanup.accountId) { + deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {}); + } + if (cleanup.agentTaskId) { + agentCoordinator.cancel(cleanup.agentTaskId); + } + } + // Cancel any in-flight auto-draft agent + const { prefetchService } = await import("../services/prefetch-service"); + const activeTaskId = prefetchService.getActiveAgentTaskId(emailId); + if (activeTaskId) { + agentCoordinator.cancel(activeTaskId); + } + } + // Learn from the override in the background (don't block the UI) const accountId = email.accountId ?? "default"; const senderMatch = email.from.match(/<([^>]+)>/) ?? email.from.match(/([^\s<]+@[^\s>]+)/); diff --git a/src/main/ipc/compose.ipc.ts b/src/main/ipc/compose.ipc.ts index 4a27a864..49611b9c 100644 --- a/src/main/ipc/compose.ipc.ts +++ b/src/main/ipc/compose.ipc.ts @@ -16,11 +16,14 @@ import { getSendAsAliases, getSendAsAliasFetchedAt, upsertSendAsAliases, + deleteThreadDrafts, + getThreadDraftBody, } from "../db"; import { networkMonitor } from "../services/network-monitor"; import { outboxService } from "../services/outbox-service"; import { prefetchService } from "../services/prefetch-service"; import { isNetworkError } from "../services/network-errors"; +import { deleteGmailDraftById } from "../services/gmail-draft-sync"; import { learnFromDraftEdit } from "../services/draft-edit-learner"; import type { IpcResponse, @@ -302,11 +305,21 @@ export function registerComposeIpc(): void { await new Promise((resolve) => setTimeout(resolve, 500)); // Still trigger draft-edit learning in demo mode so we can test it if (options.threadId && !options.isForward) { + const draftSnapshot = getThreadDraftBody(options.threadId, options.accountId); + const demoCleanups = deleteThreadDrafts(options.threadId, options.accountId); + // Cancel in-flight agents (agent drafts now run in demo mode) + for (const cleanup of demoCleanups) { + if (cleanup.agentTaskId) { + const { agentCoordinator } = await import("../agents/agent-coordinator"); + agentCoordinator.cancel(cleanup.agentTaskId); + } + } learnFromDraftEdit({ threadId: options.threadId, accountId: options.accountId, sentBodyHtml: options.bodyHtml || "", sentBodyText: options.bodyText, + draftSnapshot: draftSnapshot ?? undefined, }) .then((result) => { if (result && (result.promoted.length > 0 || result.draftMemoriesCreated > 0)) { @@ -356,20 +369,45 @@ export function registerComposeIpc(): void { return { success: true, data: result }; } + // Snapshot draft body BEFORE send so it's available for learning even if + // the draft is deleted (e.g. user archives the thread immediately after send). + const draftSnapshot = + options.threadId && !options.isForward + ? getThreadDraftBody(options.threadId, options.accountId) + : null; + const result = await client.sendMessage(options); - // After sending a reply, mark the thread as read and re-queue analysis - // Skip for forwards — forwarding doesn't mean the user addressed the original conversation + // After sending a reply, clean up thread drafts and re-queue analysis. + // Skip for forwards — forwarding doesn't mean the user addressed the original conversation. if (options.threadId && !options.isForward) { triggerThreadReanalysis(options.threadId, options.accountId); // Fire-and-forget: mark thread read so Gmail shows it as read markThreadAsReadAfterSend(client, options.threadId, options.accountId); + + // Clean up the AI draft now that the reply has been sent. + // Must happen after snapshotting the draft body (for learning) but before + // returning to the renderer — prevents a race where the user archives the + // thread immediately and deleteThreadDrafts tries to delete the Gmail draft + // that was already consumed or is no longer relevant. + const draftCleanups = deleteThreadDrafts(options.threadId, options.accountId); + for (const cleanup of draftCleanups) { + if (cleanup.gmailDraftId) { + deleteGmailDraftById(options.accountId, cleanup.gmailDraftId).catch(() => {}); + } + if (cleanup.agentTaskId) { + const { agentCoordinator } = await import("../agents/agent-coordinator"); + agentCoordinator.cancel(cleanup.agentTaskId); + } + } + // Fire-and-forget: learn from draft edits (compare AI draft vs what was sent) learnFromDraftEdit({ threadId: options.threadId, accountId: options.accountId, sentBodyHtml: options.bodyHtml || "", sentBodyText: options.bodyText, + draftSnapshot: draftSnapshot ?? undefined, }) .then((result) => { if (result && (result.promoted.length > 0 || result.draftMemoriesCreated > 0)) { diff --git a/src/main/ipc/settings.ipc.ts b/src/main/ipc/settings.ipc.ts index 4a264da0..fe0d7ec9 100644 --- a/src/main/ipc/settings.ipc.ts +++ b/src/main/ipc/settings.ipc.ts @@ -725,7 +725,7 @@ export function registerSettingsIpc(): void { const isDemoMode = process.env.EXO_DEMO_MODE === "true"; if (isTestMode || isDemoMode) { - // Return mock data in demo mode + // Return mock data in test/demo mode const mockProfile: SenderProfile = { email: emailAddr, name: from.split("<")[0].trim() || "Demo Sender", diff --git a/src/main/ipc/sync.ipc.ts b/src/main/ipc/sync.ipc.ts index 7cd50294..8166a3cb 100644 --- a/src/main/ipc/sync.ipc.ts +++ b/src/main/ipc/sync.ipc.ts @@ -1221,6 +1221,14 @@ export function registerSyncIpc(): void { ); } + // NOTE: Draft cleanup is intentionally NOT done here. Archiving can race + // with undo-send (the user sends, then immediately archives within the undo + // window). If we delete drafts now, the user can't undo the send. Instead: + // - compose:send cleans up drafts when the send actually fires + // - saveAnalysis cleans up drafts when emails are reclassified as "skip" + // - Orphaned drafts for archived threads are harmless (invisible in inbox) + // and get cleaned up by the 30-day data retention sweep. + if (useFakeData) { return { success: true, data: undefined }; } @@ -1268,17 +1276,6 @@ export function registerSyncIpc(): void { } } - // Clean up agent traces for archived thread emails - for (const email of threadEmails) { - if (email.draft?.agentTaskId) { - try { - deleteAgentTrace(email.draft.agentTaskId); - } catch { - /* non-critical */ - } - } - } - // Notify renderer: remove entire thread (including SENT) so no ghost threads remain if (archivedIds.length > 0) { const allThreadEmailIds = threadEmails.map((e) => e.id); diff --git a/src/main/services/draft-edit-learner.ts b/src/main/services/draft-edit-learner.ts index d6052683..68924ae8 100644 --- a/src/main/services/draft-edit-learner.ts +++ b/src/main/services/draft-edit-learner.ts @@ -399,7 +399,7 @@ export async function filterAgainstPromotedMemories( return observations; } - // Skip API call in demo/test mode — return all observations unfiltered + // Skip API call in test/demo mode — return all observations unfiltered if (process.env.EXO_TEST_MODE === "true" || process.env.EXO_DEMO_MODE === "true") { return observations; } @@ -497,7 +497,7 @@ export async function consolidateMemoryScopes( return { action: "save", deletedIds: [], createdGlobal: null, coveringMemoryId: null }; } - // Skip API call in demo/test mode — treat all candidates as new + // Skip API call in test/demo mode — treat all candidates as new if (process.env.EXO_TEST_MODE === "true" || process.env.EXO_DEMO_MODE === "true") { return { action: "save", deletedIds: [], createdGlobal: null, coveringMemoryId: null }; } @@ -660,12 +660,14 @@ export async function learnFromDraftEdit(params: { accountId: string; sentBodyHtml: string; sentBodyText?: string; + /** Pre-read draft snapshot — use when the draft row may be deleted before this runs. */ + draftSnapshot?: { draftBody: string; fromAddress: string; subject: string } | null; }): Promise { const { threadId, accountId, sentBodyHtml } = params; log.info(`[DraftEditLearner] Called for thread ${threadId}`); - // 1. Find the original AI draft for this thread - const draftInfo = getThreadDraftBody(threadId, accountId); + // 1. Find the original AI draft for this thread (use snapshot if provided) + const draftInfo = params.draftSnapshot ?? getThreadDraftBody(threadId, accountId); if (!draftInfo) { log.info(`[DraftEditLearner] No AI draft found for thread ${threadId} — skipping`); return null; diff --git a/src/main/services/draft-pipeline.ts b/src/main/services/draft-pipeline.ts index cc73ea7b..df80f864 100644 --- a/src/main/services/draft-pipeline.ts +++ b/src/main/services/draft-pipeline.ts @@ -6,7 +6,7 @@ * assembly → DraftGenerator call → DB save. */ import { getEmail, saveAnalysis } from "../db"; -import { saveDraftAndSync } from "./gmail-draft-sync"; +import { saveDraftAndSync, deleteGmailDraftById } from "./gmail-draft-sync"; import { getConfig, getModelIdForFeature } from "../ipc/settings.ipc"; import { getEmailSyncService } from "../ipc/sync.ipc"; import { buildStyleContext } from "./style-profiler"; @@ -139,12 +139,19 @@ export async function generateDraftForEmail( config.analysisPrompt ?? undefined, ); const analysisResult = await analyzer.analyze(emailForDraft); - saveAnalysis( + const draftCleanup = saveAnalysis( emailId, analysisResult.needs_reply, analysisResult.reason, analysisResult.priority, ); + if (draftCleanup) { + for (const cleanup of draftCleanup) { + if (cleanup.gmailDraftId && cleanup.accountId) { + deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {}); + } + } + } email.analysis = { needsReply: analysisResult.needs_reply, reason: analysisResult.reason, diff --git a/src/main/services/prefetch-service.ts b/src/main/services/prefetch-service.ts index e97e6d5a..040c1268 100644 --- a/src/main/services/prefetch-service.ts +++ b/src/main/services/prefetch-service.ts @@ -18,6 +18,7 @@ import { agentCoordinator } from "../agents/agent-coordinator"; import type { AgentContext } from "../agents/types"; import { DEFAULT_AGENT_DRAFTER_PROMPT } from "../../shared/types"; import type { Email, DashboardEmail } from "../../shared/types"; +import { deleteGmailDraftById } from "./gmail-draft-sync"; import { createLogger } from "./logger"; const log = createLogger("prefetch"); @@ -751,10 +752,40 @@ When you see emails in a thread where ${eaName} is coordinating scheduling with const userEmail = account?.email; const result = await analyzer.analyze(emailForAnalysis, userEmail, email.accountId); - saveAnalysis(emailId, result.needs_reply, result.reason, result.priority); + const draftCleanup = saveAnalysis( + emailId, + result.needs_reply, + result.reason, + result.priority, + ); this.processedAnalysis.add(emailId); this.processedCounts.analysis++; + // If reclassified as "skip" and thread had drafts, clean up Gmail drafts and cancel agents + if (draftCleanup) { + log.info( + `[Prefetch] Email ${emailId} reclassified as skip — deleted ${draftCleanup.length} thread draft(s)`, + ); + for (const cleanup of draftCleanup) { + if (cleanup.gmailDraftId && cleanup.accountId) { + deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {}); + } + if (cleanup.agentTaskId) { + agentCoordinator.cancel(cleanup.agentTaskId); + } + } + // Cancel any in-flight agent draft for this email + const activeTaskId = this.activeAgentTaskIds.get(emailId); + if (activeTaskId) { + agentCoordinator.cancel(activeTaskId); + } + this.processedDrafts.delete(emailId); + // Also clear thread-level dedup so new emails in the thread can get drafts + if (email.threadId) { + this.processedDraftThreads.delete(email.threadId); + } + } + log.info( `[Prefetch] Analyzed ${emailId}: ${result.priority} priority, needs_reply=${result.needs_reply}`, ); diff --git a/src/renderer/components/EmailDetail.tsx b/src/renderer/components/EmailDetail.tsx index 8975e65f..75ffa7bc 100644 --- a/src/renderer/components/EmailDetail.tsx +++ b/src/renderer/components/EmailDetail.tsx @@ -3570,13 +3570,24 @@ export function EmailDetail({ isFullView = false }: EmailDetailProps) { { - updateEmail(latestReceivedEmail.id, { + const updates: Partial = { analysis: { ...latestReceivedEmail.analysis!, needsReply: newNeedsReply, priority: (newPriority as "high" | "medium" | "low" | "skip" | null) ?? undefined, }, - }); + }; + updateEmail(latestReceivedEmail.id, updates); + // When reclassified as skip, clear drafts from ALL thread emails in UI state + // (main process deletes all thread drafts automatically via saveAnalysis). + // The draft may be on a different email than latestReceivedEmail. + if (newPriority === "skip" || newPriority === null) { + for (const email of threadEmails) { + if (email.draft) { + updateEmail(email.id, { draft: undefined }); + } + } + } }} /> )}