Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/main/agents/agent-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,20 @@ export class AgentCoordinator {
getInboxEmails: (accountId?: string) => db.getInboxEmails(accountId),
getAllEmails: (accountId?: string) => db.getAllEmails(accountId),
searchEmails: (query: string, options?: db.SearchOptions) => db.searchEmails(query, options),
saveAnalysis: (emailId: string, needsReply: boolean, reason: string, priority?: string) =>
db.saveAnalysis(emailId, needsReply, reason, priority),
saveAnalysis: (emailId: string, needsReply: boolean, reason: string, priority?: string) => {
const cleanup = db.saveAnalysis(emailId, needsReply, reason, priority);
// If reclassified as skip, fire-and-forget Gmail draft cleanup
if (cleanup) {
for (const c of cleanup) {
if (c.gmailDraftId && c.accountId) {
import("../services/gmail-draft-sync").then(({ deleteGmailDraftById }) =>
deleteGmailDraftById(c.accountId!, c.gmailDraftId!).catch(() => {}),
);
}
}
}
return cleanup;
},
saveDraft: (
emailId: string,
draftBody: string,
Expand Down
136 changes: 129 additions & 7 deletions src/main/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1468,18 +1468,94 @@ function rowToDashboardEmail(row: Record<string, unknown>): DashboardEmail {
}

// Analysis operations
export interface DraftCleanupInfo {
gmailDraftId: string | null;
agentTaskId: string | null;
accountId: string | null;
}

/**
* Save analysis for an email. When the priority is "skip" and the email had
* a draft, the local draft and its agent trace are deleted automatically.
* Returns cleanup info so the caller can cancel in-flight agents and delete
* the Gmail draft (which requires async network calls outside the DB layer).
*/
export function saveAnalysis(
emailId: string,
needsReply: boolean,
reason: string,
priority?: string,
): void {
const db = getDatabase();
const stmt = db.prepare(`
INSERT OR REPLACE INTO analyses (email_id, needs_reply, reason, priority, analyzed_at)
VALUES (?, ?, ?, ?, ?)
`);
stmt.run(emailId, needsReply ? 1 : 0, reason, priority || null, Date.now());
): DraftCleanupInfo[] | null {
const db = getDatabase();
const effectivePriority = priority || null;

// When reclassifying as "skip", clean up drafts for the ENTIRE thread, not just
// this email. The UI shows priority at the thread level, and the draft might be
// on a different email than the one being reclassified (e.g., draft is on email A
// but email B arrives and gets classified as skip).
const cleanupInfos: DraftCleanupInfo[] = [];
if (effectivePriority === "skip") {
Comment thread
ankitvgupta marked this conversation as resolved.
// Look up thread_id and account_id for this email
const emailRow = db
.prepare(`SELECT thread_id, account_id FROM emails WHERE id = ?`)
.get(emailId) as { thread_id: string; account_id: string } | undefined;

if (emailRow) {
// Find all drafts in this thread
const draftRows = db
.prepare(
`SELECT d.email_id, d.gmail_draft_id, d.agent_task_id
FROM drafts d JOIN emails e ON d.email_id = e.id
WHERE e.thread_id = ? AND e.account_id = ?`,
)
.all(emailRow.thread_id, emailRow.account_id) as Array<{
email_id: string;
gmail_draft_id: string | null;
agent_task_id: string | null;
}>;

for (const row of draftRows) {
cleanupInfos.push({
gmailDraftId: row.gmail_draft_id,
agentTaskId: row.agent_task_id,
accountId: emailRow.account_id,
});
}
}
}

// Wrap all DB writes in a transaction so draft deletion + analysis save are atomic
const doWrites = db.transaction(() => {
for (const info of cleanupInfos) {
if (info.agentTaskId) {
db.prepare(`DELETE FROM agent_conversation_mirror WHERE local_task_id = ?`).run(
info.agentTaskId,
);
}
}
if (cleanupInfos.length > 0) {
// Look up the thread again inside the transaction for the DELETE
const emailRow = db
.prepare(`SELECT thread_id, account_id FROM emails WHERE id = ?`)
.get(emailId) as { thread_id: string; account_id: string } | undefined;
if (emailRow) {
db.prepare(
`DELETE FROM drafts WHERE email_id IN (
SELECT d.email_id FROM drafts d JOIN emails e ON d.email_id = e.id
WHERE e.thread_id = ? AND e.account_id = ?
)`,
).run(emailRow.thread_id, emailRow.account_id);
}
}

db.prepare(
`INSERT OR REPLACE INTO analyses (email_id, needs_reply, reason, priority, analyzed_at)
VALUES (?, ?, ?, ?, ?)`,
).run(emailId, needsReply ? 1 : 0, reason, effectivePriority, Date.now());
});
doWrites();

return cleanupInfos.length > 0 ? cleanupInfos : null;
Comment thread
ankitvgupta marked this conversation as resolved.
}

// Draft operations
Expand Down Expand Up @@ -1571,6 +1647,52 @@ export function deleteDraft(emailId: string): void {
db.prepare("DELETE FROM drafts WHERE email_id = ?").run(emailId);
}

/**
* Delete all drafts for a thread. Removes local draft rows and agent traces.
* Returns cleanup info for each deleted draft so callers can handle Gmail
* draft deletion and agent cancellation (async operations outside the DB layer).
*/
export function deleteThreadDrafts(threadId: string, accountId: string): DraftCleanupInfo[] {
const db = getDatabase();
const rows = db
.prepare(
`SELECT d.email_id, d.gmail_draft_id, d.agent_task_id
FROM drafts d JOIN emails e ON d.email_id = e.id
WHERE e.thread_id = ? AND e.account_id = ?`,
)
.all(threadId, accountId) as Array<{
email_id: string;
gmail_draft_id: string | null;
agent_task_id: string | null;
}>;

if (rows.length === 0) return [];

const cleanupInfos: DraftCleanupInfo[] = [];
for (const row of rows) {
cleanupInfos.push({
gmailDraftId: row.gmail_draft_id,
agentTaskId: row.agent_task_id,
accountId,
});
}

// Batch all deletes in a transaction for atomicity
const doDeletes = db.transaction(() => {
for (const row of rows) {
if (row.agent_task_id) {
db.prepare(`DELETE FROM agent_conversation_mirror WHERE local_task_id = ?`).run(
row.agent_task_id,
);
}
db.prepare("DELETE FROM drafts WHERE email_id = ?").run(row.email_id);
}
});
doDeletes();

return cleanupInfos;
}

/** Get the RFC 5322 Message-ID header for an email (used for reply threading). */
export function getEmailMessageIdHeader(emailId: string): string | null {
const db = getDatabase();
Expand Down
64 changes: 58 additions & 6 deletions src/main/ipc/analysis.ipc.ts
Comment thread
ankitvgupta marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
learnFromPriorityOverrideInferred,
} from "../services/analysis-edit-learner";
import { stripQuotedContent } from "../services/strip-quoted-content";
import { deleteGmailDraftById } from "../services/gmail-draft-sync";
import { createLogger } from "../services/logger";

const log = createLogger("analysis-ipc");
Expand Down Expand Up @@ -110,8 +111,20 @@ export function registerAnalysisIpc(): void {

const result = await analyzerInstance.analyze(emailForAnalysis, userEmail, email.accountId);

// Save analysis to database
saveAnalysis(emailId, result.needs_reply, result.reason, result.priority);
// Save analysis to database (also cleans up thread drafts if reclassified as skip)
const draftCleanup = saveAnalysis(
emailId,
result.needs_reply,
result.reason,
result.priority,
);
if (draftCleanup) {
for (const cleanup of draftCleanup) {
if (cleanup.gmailDraftId && cleanup.accountId) {
deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {});
}
}
}

// Return updated email with analysis
const updatedEmail = getEmail(emailId);
Expand Down Expand Up @@ -188,7 +201,19 @@ export function registerAnalysisIpc(): void {
userEmail,
email.accountId,
);
saveAnalysis(emailId, result.needs_reply, result.reason, result.priority);
const batchCleanup = saveAnalysis(
emailId,
result.needs_reply,
result.reason,
result.priority,
);
if (batchCleanup) {
for (const cleanup of batchCleanup) {
if (cleanup.gmailDraftId && cleanup.accountId) {
deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {});
}
}
}

const updatedEmail = getEmail(emailId);
if (updatedEmail) {
Expand Down Expand Up @@ -239,18 +264,45 @@ export function registerAnalysisIpc(): void {
const originalNeedsReply = originalAnalysis?.needsReply ?? false;
const originalPriority = originalAnalysis?.priority ?? null;

// Update the analysis in DB
saveAnalysis(
// Normalize: the UI represents "skip" as priority=null + needsReply=false,
// but saveAnalysis checks for the literal string "skip" to trigger draft cleanup.
const normalizedPriority =
newPriority === null && !newNeedsReply ? "skip" : (newPriority ?? undefined);

// Update the analysis in DB (also deletes local draft + trace if reclassified as skip)
const draftCleanup = saveAnalysis(
Comment thread
ankitvgupta marked this conversation as resolved.
emailId,
newNeedsReply,
originalAnalysis?.reason ?? "User override",
newPriority ?? undefined,
normalizedPriority,
);

log.info(
`[Analysis] Priority overridden for ${emailId}: ${originalPriority ?? "skip"} → ${newPriority ?? "skip"}`,
);

// If reclassified as skip and thread had drafts, clean up Gmail drafts and cancel agents
if (draftCleanup) {
log.info(
`[Analysis] Cleaning up ${draftCleanup.length} thread draft(s) for ${emailId} after skip reclassification`,
);
const { agentCoordinator } = await import("../agents/agent-coordinator");
for (const cleanup of draftCleanup) {
if (cleanup.gmailDraftId && cleanup.accountId) {
deleteGmailDraftById(cleanup.accountId, cleanup.gmailDraftId).catch(() => {});
}
if (cleanup.agentTaskId) {
agentCoordinator.cancel(cleanup.agentTaskId);
}
}
// Cancel any in-flight auto-draft agent
const { prefetchService } = await import("../services/prefetch-service");
const activeTaskId = prefetchService.getActiveAgentTaskId(emailId);
if (activeTaskId) {
agentCoordinator.cancel(activeTaskId);
}
Comment on lines +298 to +303

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Agent cancellation on skip reclassification only checks a single emailId, missing in-flight agents on other thread emails

When the user reclassifies an email as "skip" via analysis:override-priority, the handler calls prefetchService.getActiveAgentTaskId(emailId) at line 300, which only checks for an in-flight agent task on the specific email being reclassified. However, the agent draft may be running on a different email in the same thread (the prefetch service deduplicates at the thread level via processedDraftThreads, and the draft target email may not be the one the user is overriding). If so, the in-flight agent continues running and eventually writes a new draft via saveDraft, leaving the thread in an inconsistent state: priority "skip" but with a newly-created draft.

Concrete scenario
  1. Thread has emails A and B. Email A gets analyzed → agent draft queued for A
  2. Email B arrives, becomes latestReceivedEmail
  3. User reclassifies email B as "skip"
  4. saveAnalysis deletes thread drafts — none yet (agent A still running)
  5. getActiveAgentTaskId(emailId_B) → no match (agent is keyed to email A)
  6. Agent for email A finishes, calls saveDraft(A, ...) → draft row re-created
  7. Thread now has priority="skip" AND a draft on email A
Prompt for agents
In analysis.ipc.ts, the analysis:override-priority handler (around line 298-303) uses prefetchService.getActiveAgentTaskId(emailId) to cancel in-flight agent tasks when reclassifying as skip. This only checks the single emailId being reclassified, but the in-flight agent draft might be keyed to a different email in the same thread.

To fix this, look up the threadId (email.threadId is already available from the getEmail call earlier) and iterate over all emails in the thread to cancel any active agent tasks. You could either:
1. Add a new method to PrefetchService like getActiveAgentTaskIdsForThread(threadId) that looks up all emailIds in the thread and checks activeAgentTaskIds for each.
2. Or iterate getEmailsByThread and check getActiveAgentTaskId for each email in the thread.

The same issue exists in prefetch-service.ts processAnalysis at line 778, where only the single emailId is checked.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}

// Learn from the override in the background (don't block the UI)
const accountId = email.accountId ?? "default";
const senderMatch = email.from.match(/<([^>]+)>/) ?? email.from.match(/([^\s<]+@[^\s>]+)/);
Expand Down
42 changes: 40 additions & 2 deletions src/main/ipc/compose.ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import {
getSendAsAliases,
getSendAsAliasFetchedAt,
upsertSendAsAliases,
deleteThreadDrafts,
getThreadDraftBody,
} from "../db";
import { networkMonitor } from "../services/network-monitor";
import { outboxService } from "../services/outbox-service";
import { prefetchService } from "../services/prefetch-service";
import { isNetworkError } from "../services/network-errors";
import { deleteGmailDraftById } from "../services/gmail-draft-sync";
import { learnFromDraftEdit } from "../services/draft-edit-learner";
import type {
IpcResponse,
Expand Down Expand Up @@ -302,11 +305,21 @@ export function registerComposeIpc(): void {
await new Promise((resolve) => setTimeout(resolve, 500));
// Still trigger draft-edit learning in demo mode so we can test it
if (options.threadId && !options.isForward) {
const draftSnapshot = getThreadDraftBody(options.threadId, options.accountId);
const demoCleanups = deleteThreadDrafts(options.threadId, options.accountId);
// Cancel in-flight agents (agent drafts now run in demo mode)
for (const cleanup of demoCleanups) {
if (cleanup.agentTaskId) {
const { agentCoordinator } = await import("../agents/agent-coordinator");
agentCoordinator.cancel(cleanup.agentTaskId);
}
}
learnFromDraftEdit({
threadId: options.threadId,
accountId: options.accountId,
sentBodyHtml: options.bodyHtml || "",
sentBodyText: options.bodyText,
draftSnapshot: draftSnapshot ?? undefined,
})
.then((result) => {
if (result && (result.promoted.length > 0 || result.draftMemoriesCreated > 0)) {
Expand Down Expand Up @@ -356,20 +369,45 @@ export function registerComposeIpc(): void {
return { success: true, data: result };
}

// Snapshot draft body BEFORE send so it's available for learning even if
// the draft is deleted (e.g. user archives the thread immediately after send).
const draftSnapshot =
options.threadId && !options.isForward
? getThreadDraftBody(options.threadId, options.accountId)
: null;

const result = await client.sendMessage(options);

// After sending a reply, mark the thread as read and re-queue analysis
// Skip for forwards — forwarding doesn't mean the user addressed the original conversation
// After sending a reply, clean up thread drafts and re-queue analysis.
// Skip for forwards — forwarding doesn't mean the user addressed the original conversation.
if (options.threadId && !options.isForward) {
triggerThreadReanalysis(options.threadId, options.accountId);
// Fire-and-forget: mark thread read so Gmail shows it as read
markThreadAsReadAfterSend(client, options.threadId, options.accountId);

// Clean up the AI draft now that the reply has been sent.
// Must happen after snapshotting the draft body (for learning) but before
// returning to the renderer — prevents a race where the user archives the
// thread immediately and deleteThreadDrafts tries to delete the Gmail draft
// that was already consumed or is no longer relevant.
const draftCleanups = deleteThreadDrafts(options.threadId, options.accountId);
for (const cleanup of draftCleanups) {
if (cleanup.gmailDraftId) {
deleteGmailDraftById(options.accountId, cleanup.gmailDraftId).catch(() => {});
}
if (cleanup.agentTaskId) {
const { agentCoordinator } = await import("../agents/agent-coordinator");
agentCoordinator.cancel(cleanup.agentTaskId);
}
}
Comment thread
ankitvgupta marked this conversation as resolved.
Comment on lines +388 to +402

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Inconsistent indentation in draft cleanup block makes code misleading

The draft cleanup code block at lines 388-402 is indented at a different level (10 spaces) compared to the surrounding code inside the same if block (12 spaces at lines 384-386). While JavaScript execution is unaffected by indentation, this makes the code appear to be outside the if (options.threadId && !options.isForward) block when reading the diff. This violates the CLAUDE.md rule: "Only write comments that explain why code is written the way it was, especially when it's unintuitive" — the indentation itself is unintuitive and makes the code harder to understand.

Suggested change
// Clean up the AI draft now that the reply has been sent.
// Must happen after snapshotting the draft body (for learning) but before
// returning to the renderer — prevents a race where the user archives the
// thread immediately and deleteThreadDrafts tries to delete the Gmail draft
// that was already consumed or is no longer relevant.
const draftCleanups = deleteThreadDrafts(options.threadId, options.accountId);
for (const cleanup of draftCleanups) {
if (cleanup.gmailDraftId) {
deleteGmailDraftById(options.accountId, cleanup.gmailDraftId).catch(() => {});
}
if (cleanup.agentTaskId) {
const { agentCoordinator } = await import("../agents/agent-coordinator");
agentCoordinator.cancel(cleanup.agentTaskId);
}
}
// Clean up the AI draft now that the reply has been sent.
// Must happen after snapshotting the draft body (for learning) but before
// returning to the renderer — prevents a race where the user archives the
// thread immediately and deleteThreadDrafts tries to delete the Gmail draft
// that was already consumed or is no longer relevant.
const draftCleanups = deleteThreadDrafts(options.threadId, options.accountId);
for (const cleanup of draftCleanups) {
if (cleanup.gmailDraftId) {
deleteGmailDraftById(options.accountId, cleanup.gmailDraftId).catch(() => {});
}
if (cleanup.agentTaskId) {
const { agentCoordinator } = await import("../agents/agent-coordinator");
agentCoordinator.cancel(cleanup.agentTaskId);
}
}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


// Fire-and-forget: learn from draft edits (compare AI draft vs what was sent)
learnFromDraftEdit({
threadId: options.threadId,
accountId: options.accountId,
sentBodyHtml: options.bodyHtml || "",
sentBodyText: options.bodyText,
draftSnapshot: draftSnapshot ?? undefined,
})
.then((result) => {
if (result && (result.promoted.length > 0 || result.draftMemoriesCreated > 0)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/ipc/settings.ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ export function registerSettingsIpc(): void {
const isDemoMode = process.env.EXO_DEMO_MODE === "true";

if (isTestMode || isDemoMode) {
// Return mock data in demo mode
// Return mock data in test/demo mode
const mockProfile: SenderProfile = {
email: emailAddr,
name: from.split("<")[0].trim() || "Demo Sender",
Expand Down
Loading
Loading