From 48ec00449c886735386720ee449000891a34d6f3 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Mon, 29 Jun 2026 22:33:04 +0200 Subject: [PATCH 1/8] fix(workflow-executor): harden OAuth reauth and token write-back Three correctness fixes that gate the OAuth2 MCP executor flag: - clear the step idempotency marker on a re-auth pause so a resumed step is no longer rejected as interrupted - record a re-auth pause as a non-failure in the activity log instead of a spurious failure - re-check credential existence before the rotated-token write-back so a concurrent disconnect is not silently resurrected Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/executors/activity-log.ts | 11 +- .../src/executors/mcp-step-executor.ts | 6 +- .../src/oauth/token-service.ts | 5 + .../workflow-executor/src/ports/run-store.ts | 1 + .../src/stores/database-store.ts | 9 ++ .../src/stores/in-memory-store.ts | 6 + .../test/executors/activity-log.test.ts | 18 +++ .../test/executors/base-step-executor.test.ts | 1 + .../executors/condition-step-executor.test.ts | 1 + .../executors/guidance-step-executor.test.ts | 1 + .../load-related-record-step-executor.test.ts | 1 + .../test/executors/mcp-step-executor.test.ts | 107 ++++++++++++++++++ .../read-record-step-executor.test.ts | 1 + ...rigger-record-action-step-executor.test.ts | 1 + .../update-record-step-executor.test.ts | 1 + .../test/oauth/token-service.test.ts | 45 +++++++- .../workflow-executor/test/runner.test.ts | 2 + .../test/stores/database-store.test.ts | 26 +++++ .../test/stores/in-memory-store.test.ts | 26 +++++ 19 files changed, 265 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/executors/activity-log.ts b/packages/workflow-executor/src/executors/activity-log.ts index 4749fb2bd5..a936b62bb1 100644 --- a/packages/workflow-executor/src/executors/activity-log.ts +++ b/packages/workflow-executor/src/executors/activity-log.ts @@ -9,6 +9,9 @@ export type TrackOptions = { // Runs between createPending and the operation — the executor's write-ahead marker. Optional: // read operations have no marker to persist. beforeCall?: () => Promise; + // Errors that represent a controlled interruption (e.g. pausing for re-authentication) rather + // than a failed action: the entry is closed as succeeded, not failed, and the error still throws. + isNonFailure?: (error: unknown) => boolean; }; // Runs an operation while recording an activity-log entry around it (pending → success/failed). @@ -25,7 +28,10 @@ export default class ActivityLog { this.user = user; } - async track(target: AuditTarget, { operation, beforeCall }: TrackOptions): Promise { + async track( + target: AuditTarget, + { operation, beforeCall, isNonFailure }: TrackOptions, + ): Promise { const handle = await this.activityLogPort.createPending({ renderingId: this.user.renderingId, ...target, @@ -40,7 +46,8 @@ export default class ActivityLog { } catch (err) { // The step error is logged/surfaced by base-step-executor when rethrown, so the audit // transition only needs the handle. - void this.activityLogPort.markFailed(handle); + if (isNonFailure?.(err)) void this.activityLogPort.markSucceeded(handle); + else void this.activityLogPort.markFailed(handle); throw err; } } diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 73727ba862..71a87eeb92 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -88,8 +88,11 @@ export default class McpStepExecutor extends BaseStepExecutor try { return await this.runStep(); } catch (error) { - // An unrefreshable OAuth credential pauses the step for re-authentication rather than failing it. + // An unrefreshable OAuth credential pauses the step for re-authentication rather than failing + // it. Clear the write-ahead marker so the resumed step is not rejected as interrupted. if (error instanceof OAuthReauthRequiredError) { + await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); + return this.buildOutcomeResult({ status: 'awaiting-input', awaitingInputReason: error.awaitingInputReason, @@ -155,6 +158,7 @@ export default class McpStepExecutor extends BaseStepExecutor stepIndex: this.context.stepIndex, idempotencyPhase: 'executing', }), + isNonFailure: error => error instanceof OAuthReauthRequiredError, }, ); diff --git a/packages/workflow-executor/src/oauth/token-service.ts b/packages/workflow-executor/src/oauth/token-service.ts index 5c06e9104c..c7af7316fe 100644 --- a/packages/workflow-executor/src/oauth/token-service.ts +++ b/packages/workflow-executor/src/oauth/token-service.ts @@ -186,6 +186,11 @@ export default class OAuthTokenService { try { const encrypted = this.encryption.encrypt(refreshToken); + // A disconnect (DELETE) may have landed after the grant read; re-check before writing so the + // rotated token does not resurrect a row the user just removed. + const current = await this.store.get(credential.userId, credential.mcpServerId); + if (!current) return; + await this.store.upsert({ userId: credential.userId, mcpServerId: credential.mcpServerId, diff --git a/packages/workflow-executor/src/ports/run-store.ts b/packages/workflow-executor/src/ports/run-store.ts index de5a2da1ab..7e30facbfb 100644 --- a/packages/workflow-executor/src/ports/run-store.ts +++ b/packages/workflow-executor/src/ports/run-store.ts @@ -6,4 +6,5 @@ export interface RunStore { close(logger?: Logger): Promise; getStepExecutions(runId: string): Promise; saveStepExecution(runId: string, stepExecution: StepExecutionData): Promise; + deleteStepExecution(runId: string, stepIndex: number): Promise; } diff --git a/packages/workflow-executor/src/stores/database-store.ts b/packages/workflow-executor/src/stores/database-store.ts index 1493095d80..8a90ef234c 100644 --- a/packages/workflow-executor/src/stores/database-store.ts +++ b/packages/workflow-executor/src/stores/database-store.ts @@ -159,6 +159,15 @@ export default class DatabaseStore implements RunStore { }); } + async deleteStepExecution(runId: string, stepIndex: number): Promise { + return this.callPort('deleteStepExecution', async () => { + await this.sequelize.query( + `DELETE FROM ${this.tableReference} WHERE run_id = :runId AND step_index = :stepIndex`, + { replacements: { runId, stepIndex } }, + ); + }); + } + async close(logger?: Logger): Promise { return this.callPort('close', async () => { try { diff --git a/packages/workflow-executor/src/stores/in-memory-store.ts b/packages/workflow-executor/src/stores/in-memory-store.ts index 90117d70aa..6a20242155 100644 --- a/packages/workflow-executor/src/stores/in-memory-store.ts +++ b/packages/workflow-executor/src/stores/in-memory-store.ts @@ -41,6 +41,12 @@ export default class InMemoryStore implements RunStore { }); } + async deleteStepExecution(runId: string, stepIndex: number): Promise { + return this.callPort('deleteStepExecution', async () => { + this.data.get(runId)?.delete(stepIndex); + }); + } + private async callPort(operation: string, fn: () => Promise): Promise { try { return await fn(); diff --git a/packages/workflow-executor/test/executors/activity-log.test.ts b/packages/workflow-executor/test/executors/activity-log.test.ts index ae40901dfb..e91d69db2d 100644 --- a/packages/workflow-executor/test/executors/activity-log.test.ts +++ b/packages/workflow-executor/test/executors/activity-log.test.ts @@ -127,5 +127,23 @@ describe('ActivityLog', () => { expect(port.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); expect(port.markSucceeded).not.toHaveBeenCalled(); }); + + it('closes the entry as succeeded (not failed) when the error is classified as a non-failure', async () => { + const port = makeActivityLogPort(); + const activityLog = new ActivityLog(port, makeUser()); + const interruption = new NoRecordsError(); + + await expect( + activityLog.track(TARGET, { + operation: async () => { + throw interruption; + }, + isNonFailure: error => error === interruption, + }), + ).rejects.toBe(interruption); + + expect(port.markSucceeded).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); + expect(port.markFailed).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 63085ca5f1..b2b17fd97e 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -98,6 +98,7 @@ function makeMockRunStore(stepExecutions: StepExecutionData[] = []): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue(stepExecutions), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), }; } diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index d0c422596e..0242da473b 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -31,6 +31,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts index 7abc8cebca..308be4058e 100644 --- a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts @@ -20,6 +20,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 229242d07b..9e24e9b5bd 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -130,6 +130,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index dc54c63566..20d78703a8 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -14,6 +14,7 @@ import AgentWithLog from '../../src/executors/agent-with-log'; import McpStepExecutor from '../../src/executors/mcp-step-executor'; import SchemaCache from '../../src/schema-cache'; import SchemaResolver from '../../src/schema-resolver'; +import InMemoryStore from '../../src/stores/in-memory-store'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; // --------------------------------------------------------------------------- @@ -58,6 +59,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } @@ -1247,3 +1249,108 @@ describe('McpStepExecutor — OAuth2 tool-call re-authentication', () => { expect(result.stepOutcome.status).toBe('error'); }); }); + +// PRD-692 hardening — three correctness findings on the executor OAuth runtime that are NOT yet +// fixed (this is the TDD step). Each test below is RED against the current code and must go green +// once the fix lands. +// #1 (HIGH): a re-auth pause leaves idempotencyPhase 'executing', so the resumed step throws +// StepStateError ("interrupted") instead of running. +// #2 (MED) : the OAuthReauthRequiredError thrown inside activityLog.track marks the activity as +// failed, so the audit trail shows a spurious failure for a normal re-auth pause. +describe('McpStepExecutor — PRD-692 re-auth pause hardening', () => { + const authError = () => new Error('Request failed with status 401'); + + // A FullyAutomated step whose tool call 401s and whose refresh cannot recover → re-auth pause. + function pauseFor(runStore: ReturnType | InMemoryStore) { + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ + runStore: runStore as RunStore, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + return new McpStepExecutor(context, [tool], 'srv', reloadWithFreshAuth); + } + + describe('finding #1 — idempotency phase cleared on the re-auth pause path', () => { + it('does not leave the step execution marked executing after pausing for re-auth', async () => { + // GIVEN a real store so the persisted write-ahead marker is observable. + const store = new InMemoryStore(); + + // WHEN the step pauses for re-authentication. + const result = await pauseFor(store).execute(); + + // THEN it pauses, and the 'executing' marker written by beforeCall must not survive — else + // a resume would read a stale marker. + expect(result.stepOutcome.status).toBe('awaiting-input'); + const persisted = await store.getStepExecutions('run-1'); + const mcpExecution = persisted.find(execution => execution.stepIndex === 0) as + | McpStepExecutionData + | undefined; + expect(mcpExecution?.idempotencyPhase).not.toBe('executing'); + }); + + it('resumes to success after a re-auth pause instead of failing as interrupted', async () => { + // GIVEN a step that paused for re-auth, persisting its state in a shared store. + const store = new InMemoryStore(); + const pause = await pauseFor(store).execute(); + expect(pause.stepOutcome.status).toBe('awaiting-input'); + + // WHEN the user reconnects and the step is re-dispatched against the same store, with the + // tool now succeeding. + const reconnectedTool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockResolvedValue('ok-after-reconnect'), + }); + const resumeContext = makeContext({ + runStore: store, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + const resumed = await new McpStepExecutor(resumeContext, [reconnectedTool], 'srv').execute(); + + // THEN checkIdempotency must not throw StepStateError on the stale 'executing' marker; the + // step runs to success. + expect(resumed.stepOutcome.status).toBe('success'); + }); + }); + + describe('finding #2 — re-auth pause is not a logged failure', () => { + it('opens an activity-log entry but does not mark it failed when the step pauses for re-auth', async () => { + // GIVEN an activity-log port we can inspect. + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + // WHEN the step pauses for re-authentication. + const result = await new McpStepExecutor( + context, + [tool], + 'srv', + reloadWithFreshAuth, + ).execute(); + + // THEN the activity entry is opened but a normal re-auth pause is not recorded as a failure. + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); + expect(activityLogPort.markFailed).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index 62da87bfa4..743bbab67e 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -72,6 +72,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index f6037a9fa1..846d7a21a9 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -84,6 +84,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index e8b358c8c1..701eb71f92 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -77,6 +77,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/oauth/token-service.test.ts b/packages/workflow-executor/test/oauth/token-service.test.ts index 6f68d848a5..bbd6baf6eb 100644 --- a/packages/workflow-executor/test/oauth/token-service.test.ts +++ b/packages/workflow-executor/test/oauth/token-service.test.ts @@ -1,6 +1,7 @@ import type CredentialEncryption from '../../src/crypto/credential-encryption'; import type { RefreshGrantParams, RefreshGrantResult } from '../../src/oauth/refresh-grant'; import type { + McpOAuthCredentialInput, McpOAuthCredentialsStore, StoredMcpOAuthCredential, } from '../../src/ports/mcp-oauth-credentials-store'; @@ -279,7 +280,7 @@ describe('OAuthTokenService.getAccessToken', () => { refreshTokenEnc: Buffer.from('enc-rt-rotated'), scopes: 'a b c', }); - const get = jest.fn().mockResolvedValueOnce(original).mockResolvedValueOnce(latest); + const get = jest.fn().mockResolvedValueOnce(original).mockResolvedValue(latest); const upsert = jest.fn().mockResolvedValue(undefined); const refresh = jest .fn() @@ -396,3 +397,45 @@ describe('OAuthTokenService.getAccessToken', () => { }); }); }); + +// PRD-692 finding #3 (TDD, RED until fix): refreshAndCache reads the credential, runs the grant, +// then writes the rotated refresh token back via store.upsert. If the user disconnects (row DELETE) +// while the grant is in flight, the unconditional upsert re-creates the just-deleted row with a +// fresh, valid refresh token — so a disconnected user stays silently connected. The write-back must +// re-check existence (update-only-if-present); an in-process mutex alone does not close this window. +describe('OAuthTokenService — concurrent disconnect during refresh write-back (PRD-692)', () => { + it('does not resurrect a credential deleted between the snapshot read and the rotated-token write-back', async () => { + // GIVEN a stored credential, and a refresh that both rotates the refresh token and observes a + // concurrent disconnect (the row is DELETED) before the write-back runs. + let current: StoredMcpOAuthCredential | null = makeCredential(); + const store = { + get: jest.fn(async () => current), + upsert: jest.fn(async (credential: McpOAuthCredentialInput) => { + current = { id: 99, ...credential }; + }), + delete: jest.fn(async () => { + current = null; + }), + } as unknown as McpOAuthCredentialsStore; + + const refresh = jest.fn(async () => { + // The disconnect lands after refreshAndCache's snapshot read, before persistRotatedRefreshToken. + await store.delete(USER_ID, SERVER_ID); + + return { accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rotated-rt' }; + }); + + const encryption = { + decrypt: (buf: Buffer) => buf.toString(), + encrypt: (plain: string) => ({ ciphertext: Buffer.from(`enc:${plain}`) }), + } as unknown as CredentialEncryption; + + const service = new OAuthTokenService({ store, encryption, refreshAccessToken: refresh }); + + // WHEN a token is acquired, triggering the refresh and the rotated-token write-back. + await service.getAccessToken(USER_ID, SERVER_ID); + + // THEN the deleted credential must stay deleted — the write-back must not recreate it. + expect(await store.get(USER_ID, SERVER_ID)).toBeNull(); + }); +}); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index bca43c0321..d144b4aec4 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -80,6 +80,7 @@ function createMockRunStore(overrides: Partial = {}): jest.Mocked; } @@ -106,6 +107,7 @@ function createRunnerConfig( close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), } as unknown as RunStore, pollingIntervalS: POLLING_INTERVAL_S, aiModelPort: createMockAiClient() as unknown as AiModelPort, diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index cd385b49a0..6e3ce0d4b3 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -27,6 +27,32 @@ describe('DatabaseStore (SQLite)', () => { await store.close(); }); + it('clears the step execution for a given (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([]); + }); + + it('leaves other steps in the same run untouched when clearing one step', async () => { + const kept = makeStepExecution({ stepIndex: 1, type: 'read-record' } as never); + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + await store.saveStepExecution('run-1', kept); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([kept]); + }); + + it('is a no-op when no execution exists for that (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 99); + + expect((await store.getStepExecutions('run-1')).map(s => s.stepIndex)).toEqual([0]); + }); + it('returns empty array for unknown runId', async () => { const result = await store.getStepExecutions('unknown'); expect(result).toEqual([]); diff --git a/packages/workflow-executor/test/stores/in-memory-store.test.ts b/packages/workflow-executor/test/stores/in-memory-store.test.ts index adfe353da8..d9aeb2497d 100644 --- a/packages/workflow-executor/test/stores/in-memory-store.test.ts +++ b/packages/workflow-executor/test/stores/in-memory-store.test.ts @@ -19,6 +19,32 @@ describe('InMemoryStore', () => { store = new InMemoryStore(); }); + it('clears the step execution for a given (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([]); + }); + + it('leaves other steps in the same run untouched when clearing one step', async () => { + const kept = makeStepExecution({ stepIndex: 1, type: 'read-record' } as never); + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + await store.saveStepExecution('run-1', kept); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([kept]); + }); + + it('is a no-op when no execution exists for that (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 99); + + expect((await store.getStepExecutions('run-1')).map(s => s.stepIndex)).toEqual([0]); + }); + it('returns empty array for unknown runId', async () => { const result = await store.getStepExecutions('unknown'); expect(result).toEqual([]); From b5039db2162d6bf910967c3a485310c8182f08eb Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Mon, 29 Jun 2026 22:45:38 +0200 Subject: [PATCH 2/8] test(workflow-executor): strengthen reauth-pause audit assertion Assert the activity entry is closed as succeeded on a re-auth pause, not just that it was never failed. Drop the stale TDD/iteration framing and ticket references from the re-auth pause test block. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../test/executors/mcp-step-executor.test.ts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index 20d78703a8..b94149d44d 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -1250,14 +1250,9 @@ describe('McpStepExecutor — OAuth2 tool-call re-authentication', () => { }); }); -// PRD-692 hardening — three correctness findings on the executor OAuth runtime that are NOT yet -// fixed (this is the TDD step). Each test below is RED against the current code and must go green -// once the fix lands. -// #1 (HIGH): a re-auth pause leaves idempotencyPhase 'executing', so the resumed step throws -// StepStateError ("interrupted") instead of running. -// #2 (MED) : the OAuthReauthRequiredError thrown inside activityLog.track marks the activity as -// failed, so the audit trail shows a spurious failure for a normal re-auth pause. -describe('McpStepExecutor — PRD-692 re-auth pause hardening', () => { +// On a re-auth pause the executor must clear the 'executing' write-ahead marker (so the resumed +// step is not rejected as interrupted) and record the pause as a non-failure in the activity log. +describe('McpStepExecutor — re-auth pause hardening', () => { const authError = () => new Error('Request failed with status 401'); // A FullyAutomated step whose tool call 401s and whose refresh cannot recover → re-auth pause. @@ -1276,7 +1271,7 @@ describe('McpStepExecutor — PRD-692 re-auth pause hardening', () => { return new McpStepExecutor(context, [tool], 'srv', reloadWithFreshAuth); } - describe('finding #1 — idempotency phase cleared on the re-auth pause path', () => { + describe('idempotency phase cleared on the re-auth pause path', () => { it('does not leave the step execution marked executing after pausing for re-auth', async () => { // GIVEN a real store so the persisted write-ahead marker is observable. const store = new InMemoryStore(); @@ -1320,7 +1315,7 @@ describe('McpStepExecutor — PRD-692 re-auth pause hardening', () => { }); }); - describe('finding #2 — re-auth pause is not a logged failure', () => { + describe('re-auth pause is not a logged failure', () => { it('opens an activity-log entry but does not mark it failed when the step pauses for re-auth', async () => { // GIVEN an activity-log port we can inspect. const activityLogPort = { @@ -1347,10 +1342,11 @@ describe('McpStepExecutor — PRD-692 re-auth pause hardening', () => { reloadWithFreshAuth, ).execute(); - // THEN the activity entry is opened but a normal re-auth pause is not recorded as a failure. + // THEN the activity entry is opened and closed as succeeded, never failed. expect(result.stepOutcome.status).toBe('awaiting-input'); expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); expect(activityLogPort.markFailed).not.toHaveBeenCalled(); + expect(activityLogPort.markSucceeded).toHaveBeenCalledTimes(1); }); }); }); From 1612e0ecf19da6c4bf3eff8ab3306b0bf5fafcde Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 09:36:56 +0200 Subject: [PATCH 3/8] fix(workflow-executor): make OAuth refresh write-back atomic Replace the get-then-upsert existence re-check with an atomic updateIfPresent (UPDATE ... WHERE) on the credentials store, closing the read-to-write window where a concurrent disconnect could be resurrected. upsert stays for the consent deposit path. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/oauth/token-service.ts | 9 ++-- .../src/ports/mcp-oauth-credentials-store.ts | 3 ++ .../database-mcp-oauth-credentials-store.ts | 27 +++++++++++ .../in-memory-mcp-oauth-credentials-store.ts | 8 ++++ .../http/mcp-oauth-credentials-route.test.ts | 1 + .../test/oauth/token-service.test.ts | 46 +++++++++---------- ...tabase-mcp-oauth-credentials-store.test.ts | 17 +++++++ ...memory-mcp-oauth-credentials-store.test.ts | 18 ++++++++ 8 files changed, 100 insertions(+), 29 deletions(-) diff --git a/packages/workflow-executor/src/oauth/token-service.ts b/packages/workflow-executor/src/oauth/token-service.ts index c7af7316fe..1f0f5fb8b2 100644 --- a/packages/workflow-executor/src/oauth/token-service.ts +++ b/packages/workflow-executor/src/oauth/token-service.ts @@ -186,12 +186,9 @@ export default class OAuthTokenService { try { const encrypted = this.encryption.encrypt(refreshToken); - // A disconnect (DELETE) may have landed after the grant read; re-check before writing so the - // rotated token does not resurrect a row the user just removed. - const current = await this.store.get(credential.userId, credential.mcpServerId); - if (!current) return; - - await this.store.upsert({ + // Update-only: if a disconnect (DELETE) landed after the grant read, the write-back must not + // re-create the row. updateIfPresent is atomic, so it closes the read→write race entirely. + await this.store.updateIfPresent({ userId: credential.userId, mcpServerId: credential.mcpServerId, refreshTokenEnc: encrypted.ciphertext, diff --git a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts index e0b247c568..c7060a9e77 100644 --- a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts @@ -24,5 +24,8 @@ export interface McpOAuthCredentialsStore { close(logger?: Logger): Promise; get(userId: number, mcpServerId: string): Promise; upsert(credential: McpOAuthCredentialInput): Promise; + // Atomic update-only: writes the fields to an existing (userId, mcpServerId) row, never inserts. + // A no-op if the row is gone, so a refresh write-back cannot resurrect a concurrently-deleted row. + updateIfPresent(credential: McpOAuthCredentialInput): Promise; delete(userId: number, mcpServerId: string): Promise; } diff --git a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts index 8a86774894..484513a0fb 100644 --- a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts @@ -196,6 +196,33 @@ export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredent }); } + async updateIfPresent(credential: McpOAuthCredentialInput): Promise { + // Single UPDATE … WHERE — atomic against a concurrent DELETE: it updates the row if it still + // exists and affects zero rows otherwise, so it never re-creates a disconnected credential. + await this.sequelize.query( + `UPDATE ${this.tableReference} SET ` + + 'refresh_token_enc = :refreshTokenEnc, client_id = :clientId, ' + + 'client_secret_enc = :clientSecretEnc, client_secret_expires_at = :clientSecretExpiresAt, ' + + 'token_endpoint = :tokenEndpoint, token_endpoint_auth_method = :tokenEndpointAuthMethod, ' + + 'scopes = :scopes, updated_at = :now ' + + 'WHERE user_id = :userId AND mcp_server_id = :mcpServerId', + { + replacements: { + userId: credential.userId, + mcpServerId: credential.mcpServerId, + refreshTokenEnc: credential.refreshTokenEnc, + clientId: credential.clientId ?? null, + clientSecretEnc: credential.clientSecretEnc ?? null, + clientSecretExpiresAt: credential.clientSecretExpiresAt ?? null, + tokenEndpoint: credential.tokenEndpoint, + tokenEndpointAuthMethod: credential.tokenEndpointAuthMethod ?? null, + scopes: credential.scopes ?? null, + now: new Date(), + }, + }, + ); + } + async delete(userId: number, mcpServerId: string): Promise { await this.sequelize.query( `DELETE FROM ${this.tableReference} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`, diff --git a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts index 29b7f9de59..05c9d1b3ff 100644 --- a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts @@ -32,6 +32,14 @@ export default class InMemoryMcpOAuthCredentialsStore implements McpOAuthCredent this.nextId += 1; } + async updateIfPresent(credential: McpOAuthCredentialInput): Promise { + const key = InMemoryMcpOAuthCredentialsStore.key(credential.userId, credential.mcpServerId); + const existing = this.data.get(key); + if (!existing) return; + + this.data.set(key, { ...credential, id: existing.id }); + } + async delete(userId: number, mcpServerId: string): Promise { this.data.delete(InMemoryMcpOAuthCredentialsStore.key(userId, mcpServerId)); } diff --git a/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts index d88fc08024..d533cd0e83 100644 --- a/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts +++ b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts @@ -55,6 +55,7 @@ function createMockStore() { return { init: jest.fn().mockResolvedValue(undefined), upsert: jest.fn().mockResolvedValue(undefined), + updateIfPresent: jest.fn().mockResolvedValue(undefined), get: jest.fn().mockResolvedValue(null), delete: jest.fn().mockResolvedValue(undefined), close: jest.fn().mockResolvedValue(undefined), diff --git a/packages/workflow-executor/test/oauth/token-service.test.ts b/packages/workflow-executor/test/oauth/token-service.test.ts index bbd6baf6eb..24a3c09a0a 100644 --- a/packages/workflow-executor/test/oauth/token-service.test.ts +++ b/packages/workflow-executor/test/oauth/token-service.test.ts @@ -43,7 +43,8 @@ function setup(options?: { const credential = options?.credential === undefined ? makeCredential() : options.credential; const get = jest.fn().mockResolvedValue(credential); const upsert = jest.fn().mockResolvedValue(undefined); - const store = { get, upsert } as unknown as McpOAuthCredentialsStore; + const updateIfPresent = jest.fn().mockResolvedValue(undefined); + const store = { get, upsert, updateIfPresent } as unknown as McpOAuthCredentialsStore; const decrypt = jest.fn((buf: Buffer) => `decrypted:${buf.toString()}`); const encrypt = jest.fn((plain: string) => ({ @@ -69,6 +70,7 @@ function setup(options?: { service, get, upsert, + updateIfPresent, decrypt, encrypt, refresh, @@ -201,12 +203,12 @@ describe('OAuthTokenService.getAccessToken', () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, upsert, encrypt } = setup({ refresh }); + const { service, updateIfPresent, encrypt } = setup({ refresh }); await service.getAccessToken(USER_ID, SERVER_ID); expect(encrypt).toHaveBeenCalledWith('rt-2'); - expect(upsert).toHaveBeenCalledWith( + expect(updateIfPresent).toHaveBeenCalledWith( expect.objectContaining({ userId: USER_ID, mcpServerId: SERVER_ID, @@ -216,19 +218,19 @@ describe('OAuthTokenService.getAccessToken', () => { }); it('does not write back when the grant returns no new refresh token', async () => { - const { service, upsert } = setup(); + const { service, updateIfPresent } = setup(); await service.getAccessToken(USER_ID, SERVER_ID); - expect(upsert).not.toHaveBeenCalled(); + expect(updateIfPresent).not.toHaveBeenCalled(); }); it('still returns the token when the rotation write-back fails', async () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, upsert } = setup({ refresh }); - (upsert as jest.Mock).mockRejectedValue(new Error('db down')); + const { service, updateIfPresent } = setup({ refresh }); + (updateIfPresent as jest.Mock).mockRejectedValue(new Error('db down')); await expect(service.getAccessToken(USER_ID, SERVER_ID)).resolves.toBe('at-1'); }); @@ -237,13 +239,13 @@ describe('OAuthTokenService.getAccessToken', () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, encrypt, upsert } = setup({ refresh }); + const { service, encrypt, updateIfPresent } = setup({ refresh }); (encrypt as jest.Mock).mockImplementation(() => { throw new Error('key unavailable'); }); await expect(service.getAccessToken(USER_ID, SERVER_ID)).resolves.toBe('at-1'); - expect(upsert).not.toHaveBeenCalled(); + expect(updateIfPresent).not.toHaveBeenCalled(); }); }); @@ -280,15 +282,15 @@ describe('OAuthTokenService.getAccessToken', () => { refreshTokenEnc: Buffer.from('enc-rt-rotated'), scopes: 'a b c', }); - const get = jest.fn().mockResolvedValueOnce(original).mockResolvedValue(latest); - const upsert = jest.fn().mockResolvedValue(undefined); + const get = jest.fn().mockResolvedValueOnce(original).mockResolvedValueOnce(latest); + const updateIfPresent = jest.fn().mockResolvedValue(undefined); const refresh = jest .fn() .mockRejectedValueOnce(new OAuthInvalidGrantError()) .mockResolvedValueOnce({ accessToken: 'at', expiresInS: 3600, refreshToken: 'rt-3' }); const service = new OAuthTokenService({ - store: { get, upsert } as unknown as McpOAuthCredentialsStore, + store: { get, upsert: jest.fn(), updateIfPresent } as unknown as McpOAuthCredentialsStore, encryption: { decrypt: (buf: Buffer) => buf.toString(), encrypt: () => ({ ciphertext: Buffer.from('enc:rt-3') }), @@ -298,7 +300,7 @@ describe('OAuthTokenService.getAccessToken', () => { await service.getAccessToken(USER_ID, SERVER_ID); - expect(upsert).toHaveBeenCalledWith(expect.objectContaining({ scopes: 'a b c' })); + expect(updateIfPresent).toHaveBeenCalledWith(expect.objectContaining({ scopes: 'a b c' })); }); it('raises OAuthReauthRequiredError when the re-read shows the same (unrotated) token', async () => { @@ -398,20 +400,18 @@ describe('OAuthTokenService.getAccessToken', () => { }); }); -// PRD-692 finding #3 (TDD, RED until fix): refreshAndCache reads the credential, runs the grant, -// then writes the rotated refresh token back via store.upsert. If the user disconnects (row DELETE) -// while the grant is in flight, the unconditional upsert re-creates the just-deleted row with a -// fresh, valid refresh token — so a disconnected user stays silently connected. The write-back must -// re-check existence (update-only-if-present); an in-process mutex alone does not close this window. -describe('OAuthTokenService — concurrent disconnect during refresh write-back (PRD-692)', () => { +// A disconnect (DELETE) landing between the grant read and the rotated-token write-back must not +// re-create the row. The write-back uses the atomic update-only path, so a deleted credential stays +// deleted instead of being silently resurrected with a fresh, valid refresh token. +describe('OAuthTokenService — concurrent disconnect during refresh write-back', () => { it('does not resurrect a credential deleted between the snapshot read and the rotated-token write-back', async () => { - // GIVEN a stored credential, and a refresh that both rotates the refresh token and observes a - // concurrent disconnect (the row is DELETED) before the write-back runs. + // GIVEN a stored credential and a refresh that rotates the token while a concurrent disconnect + // (the row is DELETED) lands before the write-back; updateIfPresent updates only an existing row. let current: StoredMcpOAuthCredential | null = makeCredential(); const store = { get: jest.fn(async () => current), - upsert: jest.fn(async (credential: McpOAuthCredentialInput) => { - current = { id: 99, ...credential }; + updateIfPresent: jest.fn(async (credential: McpOAuthCredentialInput) => { + if (current) current = { ...credential, id: current.id }; }), delete: jest.fn(async () => { current = null; diff --git a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts index dd3ae82e35..6f15ccb8fb 100644 --- a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts @@ -169,6 +169,23 @@ describe('DatabaseMcpOAuthCredentialsStore (SQLite)', () => { }); }); + describe('updateIfPresent', () => { + it('updates an existing row in place', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + + await store.updateIfPresent(makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + expect(row.refreshTokenEnc.toString()).toBe('rotated'); + }); + + it('does not insert a row when none exists for the key', async () => { + await store.updateIfPresent(makeCredential()); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + }); + describe('isolation', () => { it('keeps credentials for the same server but different users separate', async () => { await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); diff --git a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts index 9b3c43f7cf..ada193c85d 100644 --- a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts @@ -86,6 +86,24 @@ describe('InMemoryMcpOAuthCredentialsStore', () => { }); }); + describe('updateIfPresent', () => { + it('updates an existing row in place', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + + await store.updateIfPresent(makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + + expect(unwrap(await store.get(42, 'mcp-server-1')).refreshTokenEnc.toString()).toBe( + 'rotated', + ); + }); + + it('does not insert when the row is absent', async () => { + await store.updateIfPresent(makeCredential()); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + }); + describe('isolation', () => { it('keeps entries for different users and servers separate', async () => { await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); From 3afe056fd413077199d318f3972eef564e7ac0be Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 10:27:46 +0200 Subject: [PATCH 4/8] fix(workflow-executor): audit a re-auth-paused MCP call as failed Revert the isNonFailure audit special-casing on the re-auth pause path. An MCP tool call that 401s has genuinely failed, so it should surface as a failed audit entry (useful for observability) rather than be recorded as completed. The step still pauses (awaiting-input) and the resumed run logs its own entry. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/executors/activity-log.ts | 11 ++--------- .../src/executors/mcp-step-executor.ts | 1 - .../test/executors/activity-log.test.ts | 18 ------------------ .../test/executors/mcp-step-executor.test.ts | 14 +++++++------- 4 files changed, 9 insertions(+), 35 deletions(-) diff --git a/packages/workflow-executor/src/executors/activity-log.ts b/packages/workflow-executor/src/executors/activity-log.ts index a936b62bb1..4749fb2bd5 100644 --- a/packages/workflow-executor/src/executors/activity-log.ts +++ b/packages/workflow-executor/src/executors/activity-log.ts @@ -9,9 +9,6 @@ export type TrackOptions = { // Runs between createPending and the operation — the executor's write-ahead marker. Optional: // read operations have no marker to persist. beforeCall?: () => Promise; - // Errors that represent a controlled interruption (e.g. pausing for re-authentication) rather - // than a failed action: the entry is closed as succeeded, not failed, and the error still throws. - isNonFailure?: (error: unknown) => boolean; }; // Runs an operation while recording an activity-log entry around it (pending → success/failed). @@ -28,10 +25,7 @@ export default class ActivityLog { this.user = user; } - async track( - target: AuditTarget, - { operation, beforeCall, isNonFailure }: TrackOptions, - ): Promise { + async track(target: AuditTarget, { operation, beforeCall }: TrackOptions): Promise { const handle = await this.activityLogPort.createPending({ renderingId: this.user.renderingId, ...target, @@ -46,8 +40,7 @@ export default class ActivityLog { } catch (err) { // The step error is logged/surfaced by base-step-executor when rethrown, so the audit // transition only needs the handle. - if (isNonFailure?.(err)) void this.activityLogPort.markSucceeded(handle); - else void this.activityLogPort.markFailed(handle); + void this.activityLogPort.markFailed(handle); throw err; } } diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 71a87eeb92..8725f33c98 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -158,7 +158,6 @@ export default class McpStepExecutor extends BaseStepExecutor stepIndex: this.context.stepIndex, idempotencyPhase: 'executing', }), - isNonFailure: error => error instanceof OAuthReauthRequiredError, }, ); diff --git a/packages/workflow-executor/test/executors/activity-log.test.ts b/packages/workflow-executor/test/executors/activity-log.test.ts index e91d69db2d..ae40901dfb 100644 --- a/packages/workflow-executor/test/executors/activity-log.test.ts +++ b/packages/workflow-executor/test/executors/activity-log.test.ts @@ -127,23 +127,5 @@ describe('ActivityLog', () => { expect(port.markFailed).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); expect(port.markSucceeded).not.toHaveBeenCalled(); }); - - it('closes the entry as succeeded (not failed) when the error is classified as a non-failure', async () => { - const port = makeActivityLogPort(); - const activityLog = new ActivityLog(port, makeUser()); - const interruption = new NoRecordsError(); - - await expect( - activityLog.track(TARGET, { - operation: async () => { - throw interruption; - }, - isNonFailure: error => error === interruption, - }), - ).rejects.toBe(interruption); - - expect(port.markSucceeded).toHaveBeenCalledWith({ id: 'log-1', index: '0' }); - expect(port.markFailed).not.toHaveBeenCalled(); - }); }); }); diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index b94149d44d..5c539dcec8 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -1250,8 +1250,8 @@ describe('McpStepExecutor — OAuth2 tool-call re-authentication', () => { }); }); -// On a re-auth pause the executor must clear the 'executing' write-ahead marker (so the resumed -// step is not rejected as interrupted) and record the pause as a non-failure in the activity log. +// On a re-auth pause the executor clears the 'executing' write-ahead marker so the resumed step is +// not rejected as interrupted; the failed tool call still surfaces as a failed audit-log entry. describe('McpStepExecutor — re-auth pause hardening', () => { const authError = () => new Error('Request failed with status 401'); @@ -1315,8 +1315,8 @@ describe('McpStepExecutor — re-auth pause hardening', () => { }); }); - describe('re-auth pause is not a logged failure', () => { - it('opens an activity-log entry but does not mark it failed when the step pauses for re-auth', async () => { + describe('re-auth pause surfaces the tool-call failure in the audit log', () => { + it('marks the activity-log entry failed while the step pauses for re-auth', async () => { // GIVEN an activity-log port we can inspect. const activityLogPort = { createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), @@ -1342,11 +1342,11 @@ describe('McpStepExecutor — re-auth pause hardening', () => { reloadWithFreshAuth, ).execute(); - // THEN the activity entry is opened and closed as succeeded, never failed. + // THEN the failed tool call is audited as failed, while the step itself pauses (not errors). expect(result.stepOutcome.status).toBe('awaiting-input'); expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); - expect(activityLogPort.markFailed).not.toHaveBeenCalled(); - expect(activityLogPort.markSucceeded).toHaveBeenCalledTimes(1); + expect(activityLogPort.markFailed).toHaveBeenCalledTimes(1); + expect(activityLogPort.markSucceeded).not.toHaveBeenCalled(); }); }); }); From c7ac43a97dc2c394f8c20961459435d2a7ac4abf Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 11:03:00 +0200 Subject: [PATCH 5/8] fix(workflow-executor): correct reauth-pause cleanup and write-back scope Addresses review findings on the re-auth pause and refresh write-back: - preserve a confirmation-flow step's approved pendingData on a re-auth pause (clear only the marker) so resume replays that exact call; delete only when there is no pendingData (FullyAutomated), which would otherwise mis-route the resumed step into the confirmation flow - make the pause cleanup best-effort so a store error still returns awaiting-input instead of a hard failure - key updateIfPresent on the row id so a disconnect + re-authorize is not clobbered by a stale in-flight write-back (a re-created row has a new id) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/executors/mcp-step-executor.ts | 29 +++++++++- .../src/oauth/token-service.ts | 7 ++- .../src/ports/mcp-oauth-credentials-store.ts | 7 ++- .../database-mcp-oauth-credentials-store.ts | 12 ++-- .../in-memory-mcp-oauth-credentials-store.ts | 7 ++- .../test/executors/mcp-step-executor.test.ts | 55 +++++++++++++++++++ .../test/oauth/token-service.test.ts | 7 ++- ...tabase-mcp-oauth-credentials-store.test.ts | 19 ++++++- ...memory-mcp-oauth-credentials-store.test.ts | 20 ++++++- 9 files changed, 138 insertions(+), 25 deletions(-) diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 8725f33c98..82579aeb2e 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -91,7 +91,7 @@ export default class McpStepExecutor extends BaseStepExecutor // An unrefreshable OAuth credential pauses the step for re-authentication rather than failing // it. Clear the write-ahead marker so the resumed step is not rejected as interrupted. if (error instanceof OAuthReauthRequiredError) { - await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); + await this.clearReauthPauseState(); return this.buildOutcomeResult({ status: 'awaiting-input', @@ -103,6 +103,33 @@ export default class McpStepExecutor extends BaseStepExecutor } } + // Drop the 'executing' write-ahead marker left by beforeCall so the resumed step is not rejected + // as interrupted. A confirmation-flow record carries the user-approved pendingData — preserve it + // (clear only the phase) so resume replays that exact call instead of re-selecting a tool; a + // record with no pendingData would mis-route the resumed step into the confirmation flow, so + // delete it. Best-effort: a store error here must not turn the pause into a hard failure. + private async clearReauthPauseState(): Promise { + try { + const existing = await this.findPendingExecution('mcp'); + if (!existing) return; + + if (existing.pendingData) { + await this.context.runStore.saveStepExecution(this.context.runId, { + ...existing, + idempotencyPhase: undefined, + }); + } else { + await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); + } + } catch (cause) { + this.context.logger('Error', 'Failed to clear re-auth pause state; resume may need a retry', { + runId: this.context.runId, + stepIndex: this.context.stepIndex, + cause: cause instanceof Error ? cause.message : String(cause), + }); + } + } + private async runStep(): Promise { const pending = await this.patchAndReloadPendingData( this.context.incomingPendingData, diff --git a/packages/workflow-executor/src/oauth/token-service.ts b/packages/workflow-executor/src/oauth/token-service.ts index 1f0f5fb8b2..d20a92ceee 100644 --- a/packages/workflow-executor/src/oauth/token-service.ts +++ b/packages/workflow-executor/src/oauth/token-service.ts @@ -186,9 +186,10 @@ export default class OAuthTokenService { try { const encrypted = this.encryption.encrypt(refreshToken); - // Update-only: if a disconnect (DELETE) landed after the grant read, the write-back must not - // re-create the row. updateIfPresent is atomic, so it closes the read→write race entirely. - await this.store.updateIfPresent({ + // Update-only, keyed on the id we read: if a disconnect (DELETE) — or a disconnect plus a + // re-authorize that inserts a new row — landed after the grant read, the write-back must not + // resurrect or clobber it. updateIfPresent is atomic and id-scoped, closing the race entirely. + await this.store.updateIfPresent(credential.id, { userId: credential.userId, mcpServerId: credential.mcpServerId, refreshTokenEnc: encrypted.ciphertext, diff --git a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts index c7060a9e77..6a79e32573 100644 --- a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts @@ -24,8 +24,9 @@ export interface McpOAuthCredentialsStore { close(logger?: Logger): Promise; get(userId: number, mcpServerId: string): Promise; upsert(credential: McpOAuthCredentialInput): Promise; - // Atomic update-only: writes the fields to an existing (userId, mcpServerId) row, never inserts. - // A no-op if the row is gone, so a refresh write-back cannot resurrect a concurrently-deleted row. - updateIfPresent(credential: McpOAuthCredentialInput): Promise; + // Atomic update-only, keyed on the row `id` the caller read: writes the fields to that exact row, + // never inserts. A no-op if the row is gone — or was deleted and re-created with a new id — so a + // stale refresh write-back cannot resurrect a deleted credential nor clobber a re-authorized one. + updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise; delete(userId: number, mcpServerId: string): Promise; } diff --git a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts index 484513a0fb..6ad1c18073 100644 --- a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts @@ -196,20 +196,20 @@ export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredent }); } - async updateIfPresent(credential: McpOAuthCredentialInput): Promise { - // Single UPDATE … WHERE — atomic against a concurrent DELETE: it updates the row if it still - // exists and affects zero rows otherwise, so it never re-creates a disconnected credential. + async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise { + // Single UPDATE … WHERE id — atomic against a concurrent DELETE and keyed on the exact row the + // caller read: it affects zero rows when that row was deleted (or deleted and re-created with a + // new id), so it never resurrects a disconnected credential nor clobbers a re-authorized one. await this.sequelize.query( `UPDATE ${this.tableReference} SET ` + 'refresh_token_enc = :refreshTokenEnc, client_id = :clientId, ' + 'client_secret_enc = :clientSecretEnc, client_secret_expires_at = :clientSecretExpiresAt, ' + 'token_endpoint = :tokenEndpoint, token_endpoint_auth_method = :tokenEndpointAuthMethod, ' + 'scopes = :scopes, updated_at = :now ' + - 'WHERE user_id = :userId AND mcp_server_id = :mcpServerId', + 'WHERE id = :id', { replacements: { - userId: credential.userId, - mcpServerId: credential.mcpServerId, + id, refreshTokenEnc: credential.refreshTokenEnc, clientId: credential.clientId ?? null, clientSecretEnc: credential.clientSecretEnc ?? null, diff --git a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts index 05c9d1b3ff..f1e8ff9607 100644 --- a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts @@ -32,12 +32,13 @@ export default class InMemoryMcpOAuthCredentialsStore implements McpOAuthCredent this.nextId += 1; } - async updateIfPresent(credential: McpOAuthCredentialInput): Promise { + async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise { const key = InMemoryMcpOAuthCredentialsStore.key(credential.userId, credential.mcpServerId); const existing = this.data.get(key); - if (!existing) return; + // Only update the exact row the caller read; a re-created row has a new id, so skip it. + if (!existing || existing.id !== id) return; - this.data.set(key, { ...credential, id: existing.id }); + this.data.set(key, { ...credential, id }); } async delete(userId: number, mcpServerId: string): Promise { diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index 5c539dcec8..dc66a87407 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -1313,6 +1313,61 @@ describe('McpStepExecutor — re-auth pause hardening', () => { // step runs to success. expect(resumed.stepOutcome.status).toBe('success'); }); + + it('preserves the approved pendingData on a confirmation-flow re-auth pause so resume replays it', async () => { + // GIVEN a confirmation-flow step with a user-approved tool call already persisted. + const store = new InMemoryStore(); + await store.saveStepExecution('run-1', { + type: 'mcp', + stepIndex: 0, + pendingData: { + name: 'send_notification', + sourceId: 'mcp-server-1', + input: { message: 'Hello' }, + }, + userConfirmation: { userConfirmed: true }, + } as McpStepExecutionData); + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ runStore: store }); + + // WHEN the approved call 401s and cannot re-auth. + const result = await new McpStepExecutor( + context, + [tool], + 'srv', + reloadWithFreshAuth, + ).execute(); + + // THEN the marker is cleared but the approved call is kept, so a resume replays it rather than + // re-selecting a tool. + expect(result.stepOutcome.status).toBe('awaiting-input'); + const persisted = (await store.getStepExecutions('run-1')).find(e => e.stepIndex === 0) as + | McpStepExecutionData + | undefined; + expect(persisted?.idempotencyPhase).not.toBe('executing'); + expect(persisted?.pendingData).toEqual({ + name: 'send_notification', + sourceId: 'mcp-server-1', + input: { message: 'Hello' }, + }); + }); + + it('still pauses when clearing the re-auth state fails (best-effort cleanup)', async () => { + // GIVEN a store whose cleanup delete rejects. + const store = new InMemoryStore(); + jest.spyOn(store, 'deleteStepExecution').mockRejectedValue(new Error('store down')); + + // WHEN a FullyAutomated step pauses for re-auth (no pendingData → cleanup goes via delete). + const result = await pauseFor(store).execute(); + + // THEN the cleanup failure is swallowed and the step still pauses. + expect(result.stepOutcome.status).toBe('awaiting-input'); + }); }); describe('re-auth pause surfaces the tool-call failure in the audit log', () => { diff --git a/packages/workflow-executor/test/oauth/token-service.test.ts b/packages/workflow-executor/test/oauth/token-service.test.ts index 24a3c09a0a..3a09af695b 100644 --- a/packages/workflow-executor/test/oauth/token-service.test.ts +++ b/packages/workflow-executor/test/oauth/token-service.test.ts @@ -209,6 +209,7 @@ describe('OAuthTokenService.getAccessToken', () => { expect(encrypt).toHaveBeenCalledWith('rt-2'); expect(updateIfPresent).toHaveBeenCalledWith( + 1, expect.objectContaining({ userId: USER_ID, mcpServerId: SERVER_ID, @@ -300,7 +301,7 @@ describe('OAuthTokenService.getAccessToken', () => { await service.getAccessToken(USER_ID, SERVER_ID); - expect(updateIfPresent).toHaveBeenCalledWith(expect.objectContaining({ scopes: 'a b c' })); + expect(updateIfPresent).toHaveBeenCalledWith(1, expect.objectContaining({ scopes: 'a b c' })); }); it('raises OAuthReauthRequiredError when the re-read shows the same (unrotated) token', async () => { @@ -410,8 +411,8 @@ describe('OAuthTokenService — concurrent disconnect during refresh write-back' let current: StoredMcpOAuthCredential | null = makeCredential(); const store = { get: jest.fn(async () => current), - updateIfPresent: jest.fn(async (credential: McpOAuthCredentialInput) => { - if (current) current = { ...credential, id: current.id }; + updateIfPresent: jest.fn(async (id: number, credential: McpOAuthCredentialInput) => { + if (current && current.id === id) current = { ...credential, id }; }), delete: jest.fn(async () => { current = null; diff --git a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts index 6f15ccb8fb..d428a01da8 100644 --- a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts @@ -170,20 +170,33 @@ describe('DatabaseMcpOAuthCredentialsStore (SQLite)', () => { }); describe('updateIfPresent', () => { - it('updates an existing row in place', async () => { + it('updates the row matching the given id in place', async () => { await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const { id } = unwrap(await store.get(42, 'mcp-server-1')); - await store.updateIfPresent(makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + await store.updateIfPresent(id, makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); const row = unwrap(await store.get(42, 'mcp-server-1')); expect(row.refreshTokenEnc.toString()).toBe('rotated'); }); it('does not insert a row when none exists for the key', async () => { - await store.updateIfPresent(makeCredential()); + await store.updateIfPresent(1, makeCredential()); expect(await store.get(42, 'mcp-server-1')).toBeNull(); }); + + it('does not touch a row that was re-created with a different id', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const staleId = unwrap(await store.get(42, 'mcp-server-1')).id; + await store.delete(42, 'mcp-server-1'); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('reauthorized') })); + + await store.updateIfPresent(staleId, makeCredential({ refreshTokenEnc: Buffer.from('rot') })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + expect(row.refreshTokenEnc.toString()).toBe('reauthorized'); + }); }); describe('isolation', () => { diff --git a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts index ada193c85d..6f1a82f1d0 100644 --- a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts @@ -87,10 +87,11 @@ describe('InMemoryMcpOAuthCredentialsStore', () => { }); describe('updateIfPresent', () => { - it('updates an existing row in place', async () => { + it('updates the row matching the given id in place', async () => { await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const { id } = unwrap(await store.get(42, 'mcp-server-1')); - await store.updateIfPresent(makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + await store.updateIfPresent(id, makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); expect(unwrap(await store.get(42, 'mcp-server-1')).refreshTokenEnc.toString()).toBe( 'rotated', @@ -98,10 +99,23 @@ describe('InMemoryMcpOAuthCredentialsStore', () => { }); it('does not insert when the row is absent', async () => { - await store.updateIfPresent(makeCredential()); + await store.updateIfPresent(1, makeCredential()); expect(await store.get(42, 'mcp-server-1')).toBeNull(); }); + + it('does not touch a row that was re-created with a different id', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const staleId = unwrap(await store.get(42, 'mcp-server-1')).id; + await store.delete(42, 'mcp-server-1'); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('reauthorized') })); + + await store.updateIfPresent(staleId, makeCredential({ refreshTokenEnc: Buffer.from('rot') })); + + expect(unwrap(await store.get(42, 'mcp-server-1')).refreshTokenEnc.toString()).toBe( + 'reauthorized', + ); + }); }); describe('isolation', () => { From eec843fb1aaeb6300c05678e2e37564193e7c1a6 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 11:10:42 +0200 Subject: [PATCH 6/8] style(workflow-executor): trim added comments to the 2-line convention Keep the non-obvious why (preserve-vs-delete on re-auth pause, id-scoped write-back); drop what the method names and code already state. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../workflow-executor/src/executors/mcp-step-executor.ts | 7 ++----- packages/workflow-executor/src/oauth/token-service.ts | 5 ++--- .../src/ports/mcp-oauth-credentials-store.ts | 5 ++--- .../src/stores/database-mcp-oauth-credentials-store.ts | 4 +--- .../workflow-executor/test/oauth/token-service.test.ts | 5 ++--- 5 files changed, 9 insertions(+), 17 deletions(-) diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 82579aeb2e..2bc2512e70 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -103,11 +103,8 @@ export default class McpStepExecutor extends BaseStepExecutor } } - // Drop the 'executing' write-ahead marker left by beforeCall so the resumed step is not rejected - // as interrupted. A confirmation-flow record carries the user-approved pendingData — preserve it - // (clear only the phase) so resume replays that exact call instead of re-selecting a tool; a - // record with no pendingData would mis-route the resumed step into the confirmation flow, so - // delete it. Best-effort: a store error here must not turn the pause into a hard failure. + // Keep a confirmation-flow record's approved pendingData (clear only the marker) so resume replays + // it; delete a pendingData-less record, which would otherwise mis-route resume into confirmation. private async clearReauthPauseState(): Promise { try { const existing = await this.findPendingExecution('mcp'); diff --git a/packages/workflow-executor/src/oauth/token-service.ts b/packages/workflow-executor/src/oauth/token-service.ts index d20a92ceee..51f129f36d 100644 --- a/packages/workflow-executor/src/oauth/token-service.ts +++ b/packages/workflow-executor/src/oauth/token-service.ts @@ -186,9 +186,8 @@ export default class OAuthTokenService { try { const encrypted = this.encryption.encrypt(refreshToken); - // Update-only, keyed on the id we read: if a disconnect (DELETE) — or a disconnect plus a - // re-authorize that inserts a new row — landed after the grant read, the write-back must not - // resurrect or clobber it. updateIfPresent is atomic and id-scoped, closing the race entirely. + // Id-scoped so a disconnect (or disconnect + re-authorize) after the grant read leaves an + // absent or different row — the rotated-token write-back then can't resurrect or clobber it. await this.store.updateIfPresent(credential.id, { userId: credential.userId, mcpServerId: credential.mcpServerId, diff --git a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts index 6a79e32573..ef745b9c1c 100644 --- a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts @@ -24,9 +24,8 @@ export interface McpOAuthCredentialsStore { close(logger?: Logger): Promise; get(userId: number, mcpServerId: string): Promise; upsert(credential: McpOAuthCredentialInput): Promise; - // Atomic update-only, keyed on the row `id` the caller read: writes the fields to that exact row, - // never inserts. A no-op if the row is gone — or was deleted and re-created with a new id — so a - // stale refresh write-back cannot resurrect a deleted credential nor clobber a re-authorized one. + // Update-only by the row id the caller read: never inserts, and no-ops if that row is gone or was + // re-created with a new id — so a stale write-back can't resurrect or clobber a credential. updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise; delete(userId: number, mcpServerId: string): Promise; } diff --git a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts index 6ad1c18073..613aa2be52 100644 --- a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts @@ -197,9 +197,7 @@ export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredent } async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise { - // Single UPDATE … WHERE id — atomic against a concurrent DELETE and keyed on the exact row the - // caller read: it affects zero rows when that row was deleted (or deleted and re-created with a - // new id), so it never resurrects a disconnected credential nor clobbers a re-authorized one. + // Single atomic UPDATE … WHERE id — affects zero rows if that row was deleted or re-created. await this.sequelize.query( `UPDATE ${this.tableReference} SET ` + 'refresh_token_enc = :refreshTokenEnc, client_id = :clientId, ' + diff --git a/packages/workflow-executor/test/oauth/token-service.test.ts b/packages/workflow-executor/test/oauth/token-service.test.ts index 3a09af695b..e72be8a798 100644 --- a/packages/workflow-executor/test/oauth/token-service.test.ts +++ b/packages/workflow-executor/test/oauth/token-service.test.ts @@ -401,9 +401,8 @@ describe('OAuthTokenService.getAccessToken', () => { }); }); -// A disconnect (DELETE) landing between the grant read and the rotated-token write-back must not -// re-create the row. The write-back uses the atomic update-only path, so a deleted credential stays -// deleted instead of being silently resurrected with a fresh, valid refresh token. +// A disconnect (DELETE) landing between the grant read and the write-back must not re-create the +// row: the atomic update-only path leaves a deleted credential deleted, not silently resurrected. describe('OAuthTokenService — concurrent disconnect during refresh write-back', () => { it('does not resurrect a credential deleted between the snapshot read and the rotated-token write-back', async () => { // GIVEN a stored credential and a refresh that rotates the token while a concurrent disconnect From f0aefab96dc5f81062afca112055352924f2396b Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 11:22:04 +0200 Subject: [PATCH 7/8] fix(workflow-executor): propagate reauth-cleanup store errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Swallowing a cleanup failure left the 'executing' marker in place, so the pause returned awaiting-input but could never resume (checkIdempotency rejects the stale marker). Let the store error propagate to an ordinary step error instead — consistent with the executor's other store-failure paths. Supersedes the earlier best-effort try/catch. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/executors/mcp-step-executor.ts | 24 +++++++------------ .../test/executors/mcp-step-executor.test.ts | 14 +++++++---- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 2bc2512e70..db688c34a8 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -106,24 +106,16 @@ export default class McpStepExecutor extends BaseStepExecutor // Keep a confirmation-flow record's approved pendingData (clear only the marker) so resume replays // it; delete a pendingData-less record, which would otherwise mis-route resume into confirmation. private async clearReauthPauseState(): Promise { - try { - const existing = await this.findPendingExecution('mcp'); - if (!existing) return; + const existing = await this.findPendingExecution('mcp'); + if (!existing) return; - if (existing.pendingData) { - await this.context.runStore.saveStepExecution(this.context.runId, { - ...existing, - idempotencyPhase: undefined, - }); - } else { - await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); - } - } catch (cause) { - this.context.logger('Error', 'Failed to clear re-auth pause state; resume may need a retry', { - runId: this.context.runId, - stepIndex: this.context.stepIndex, - cause: cause instanceof Error ? cause.message : String(cause), + if (existing.pendingData) { + await this.context.runStore.saveStepExecution(this.context.runId, { + ...existing, + idempotencyPhase: undefined, }); + } else { + await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); } } diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index dc66a87407..b57898df1a 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -1357,16 +1357,20 @@ describe('McpStepExecutor — re-auth pause hardening', () => { }); }); - it('still pauses when clearing the re-auth state fails (best-effort cleanup)', async () => { - // GIVEN a store whose cleanup delete rejects. + it('surfaces a store error from the re-auth cleanup as a step error, not a stuck pause', async () => { + // GIVEN cleanup that fails — a left-behind 'executing' marker would make the pause + // non-resumable, so the failure must surface as an error rather than a stuck awaiting-input. const store = new InMemoryStore(); - jest.spyOn(store, 'deleteStepExecution').mockRejectedValue(new Error('store down')); + jest + .spyOn(store, 'deleteStepExecution') + .mockRejectedValue(new RunStorePortError('deleteStepExecution', new Error('store down'))); // WHEN a FullyAutomated step pauses for re-auth (no pendingData → cleanup goes via delete). const result = await pauseFor(store).execute(); - // THEN the cleanup failure is swallowed and the step still pauses. - expect(result.stepOutcome.status).toBe('awaiting-input'); + // THEN the store failure propagates to a step error instead of a non-resumable pause. + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('The step state could not be accessed. Please retry.'); }); }); From 50b58c540c89949bb2c4b7210339f301041ed19b Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Tue, 30 Jun 2026 17:03:05 +0200 Subject: [PATCH 8/8] docs(workflow-executor): link the OAuth2-MCP test server Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/workflow-executor/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/workflow-executor/README.md b/packages/workflow-executor/README.md index be223ab768..471f8e9c1f 100644 --- a/packages/workflow-executor/README.md +++ b/packages/workflow-executor/README.md @@ -96,3 +96,7 @@ FOREST_AUTH_SECRET="your-auth-secret" \ AGENT_URL="https://your-agent-url" \ npx @forestadmin/workflow-executor --in-memory ``` + +### OAuth2-protected MCP + +To exercise the executor's OAuth2-protected MCP path end-to-end, point it at [mcp-oauth-test-server](https://github.com/hercemer42/mcp-oauth-test-server) — a standalone OAuth2 + MCP server that can simulate refresh-token revocation/rotation, consent denial, and upstream 403s.