diff --git a/packages/workflow-executor/README.md b/packages/workflow-executor/README.md index be223ab768..471f8e9c1f 100644 --- a/packages/workflow-executor/README.md +++ b/packages/workflow-executor/README.md @@ -96,3 +96,7 @@ FOREST_AUTH_SECRET="your-auth-secret" \ AGENT_URL="https://your-agent-url" \ npx @forestadmin/workflow-executor --in-memory ``` + +### OAuth2-protected MCP + +To exercise the executor's OAuth2-protected MCP path end-to-end, point it at [mcp-oauth-test-server](https://github.com/hercemer42/mcp-oauth-test-server) — a standalone OAuth2 + MCP server that can simulate refresh-token revocation/rotation, consent denial, and upstream 403s. diff --git a/packages/workflow-executor/src/executors/mcp-step-executor.ts b/packages/workflow-executor/src/executors/mcp-step-executor.ts index 73727ba862..db688c34a8 100644 --- a/packages/workflow-executor/src/executors/mcp-step-executor.ts +++ b/packages/workflow-executor/src/executors/mcp-step-executor.ts @@ -88,8 +88,11 @@ export default class McpStepExecutor extends BaseStepExecutor try { return await this.runStep(); } catch (error) { - // An unrefreshable OAuth credential pauses the step for re-authentication rather than failing it. + // An unrefreshable OAuth credential pauses the step for re-authentication rather than failing + // it. Clear the write-ahead marker so the resumed step is not rejected as interrupted. if (error instanceof OAuthReauthRequiredError) { + await this.clearReauthPauseState(); + return this.buildOutcomeResult({ status: 'awaiting-input', awaitingInputReason: error.awaitingInputReason, @@ -100,6 +103,22 @@ export default class McpStepExecutor extends BaseStepExecutor } } + // Keep a confirmation-flow record's approved pendingData (clear only the marker) so resume replays + // it; delete a pendingData-less record, which would otherwise mis-route resume into confirmation. + private async clearReauthPauseState(): Promise { + const existing = await this.findPendingExecution('mcp'); + if (!existing) return; + + if (existing.pendingData) { + await this.context.runStore.saveStepExecution(this.context.runId, { + ...existing, + idempotencyPhase: undefined, + }); + } else { + await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex); + } + } + private async runStep(): Promise { const pending = await this.patchAndReloadPendingData( this.context.incomingPendingData, diff --git a/packages/workflow-executor/src/oauth/token-service.ts b/packages/workflow-executor/src/oauth/token-service.ts index 5c06e9104c..51f129f36d 100644 --- a/packages/workflow-executor/src/oauth/token-service.ts +++ b/packages/workflow-executor/src/oauth/token-service.ts @@ -186,7 +186,9 @@ export default class OAuthTokenService { try { const encrypted = this.encryption.encrypt(refreshToken); - await this.store.upsert({ + // Id-scoped so a disconnect (or disconnect + re-authorize) after the grant read leaves an + // absent or different row — the rotated-token write-back then can't resurrect or clobber it. + await this.store.updateIfPresent(credential.id, { userId: credential.userId, mcpServerId: credential.mcpServerId, refreshTokenEnc: encrypted.ciphertext, diff --git a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts index e0b247c568..ef745b9c1c 100644 --- a/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/ports/mcp-oauth-credentials-store.ts @@ -24,5 +24,8 @@ export interface McpOAuthCredentialsStore { close(logger?: Logger): Promise; get(userId: number, mcpServerId: string): Promise; upsert(credential: McpOAuthCredentialInput): Promise; + // Update-only by the row id the caller read: never inserts, and no-ops if that row is gone or was + // re-created with a new id — so a stale write-back can't resurrect or clobber a credential. + updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise; delete(userId: number, mcpServerId: string): Promise; } diff --git a/packages/workflow-executor/src/ports/run-store.ts b/packages/workflow-executor/src/ports/run-store.ts index de5a2da1ab..7e30facbfb 100644 --- a/packages/workflow-executor/src/ports/run-store.ts +++ b/packages/workflow-executor/src/ports/run-store.ts @@ -6,4 +6,5 @@ export interface RunStore { close(logger?: Logger): Promise; getStepExecutions(runId: string): Promise; saveStepExecution(runId: string, stepExecution: StepExecutionData): Promise; + deleteStepExecution(runId: string, stepIndex: number): Promise; } diff --git a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts index 8a86774894..613aa2be52 100644 --- a/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/database-mcp-oauth-credentials-store.ts @@ -196,6 +196,31 @@ export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredent }); } + async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise { + // Single atomic UPDATE … WHERE id — affects zero rows if that row was deleted or re-created. + await this.sequelize.query( + `UPDATE ${this.tableReference} SET ` + + 'refresh_token_enc = :refreshTokenEnc, client_id = :clientId, ' + + 'client_secret_enc = :clientSecretEnc, client_secret_expires_at = :clientSecretExpiresAt, ' + + 'token_endpoint = :tokenEndpoint, token_endpoint_auth_method = :tokenEndpointAuthMethod, ' + + 'scopes = :scopes, updated_at = :now ' + + 'WHERE id = :id', + { + replacements: { + id, + refreshTokenEnc: credential.refreshTokenEnc, + clientId: credential.clientId ?? null, + clientSecretEnc: credential.clientSecretEnc ?? null, + clientSecretExpiresAt: credential.clientSecretExpiresAt ?? null, + tokenEndpoint: credential.tokenEndpoint, + tokenEndpointAuthMethod: credential.tokenEndpointAuthMethod ?? null, + scopes: credential.scopes ?? null, + now: new Date(), + }, + }, + ); + } + async delete(userId: number, mcpServerId: string): Promise { await this.sequelize.query( `DELETE FROM ${this.tableReference} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`, diff --git a/packages/workflow-executor/src/stores/database-store.ts b/packages/workflow-executor/src/stores/database-store.ts index 1493095d80..8a90ef234c 100644 --- a/packages/workflow-executor/src/stores/database-store.ts +++ b/packages/workflow-executor/src/stores/database-store.ts @@ -159,6 +159,15 @@ export default class DatabaseStore implements RunStore { }); } + async deleteStepExecution(runId: string, stepIndex: number): Promise { + return this.callPort('deleteStepExecution', async () => { + await this.sequelize.query( + `DELETE FROM ${this.tableReference} WHERE run_id = :runId AND step_index = :stepIndex`, + { replacements: { runId, stepIndex } }, + ); + }); + } + async close(logger?: Logger): Promise { return this.callPort('close', async () => { try { diff --git a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts index 29b7f9de59..f1e8ff9607 100644 --- a/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts +++ b/packages/workflow-executor/src/stores/in-memory-mcp-oauth-credentials-store.ts @@ -32,6 +32,15 @@ export default class InMemoryMcpOAuthCredentialsStore implements McpOAuthCredent this.nextId += 1; } + async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise { + const key = InMemoryMcpOAuthCredentialsStore.key(credential.userId, credential.mcpServerId); + const existing = this.data.get(key); + // Only update the exact row the caller read; a re-created row has a new id, so skip it. + if (!existing || existing.id !== id) return; + + this.data.set(key, { ...credential, id }); + } + async delete(userId: number, mcpServerId: string): Promise { this.data.delete(InMemoryMcpOAuthCredentialsStore.key(userId, mcpServerId)); } diff --git a/packages/workflow-executor/src/stores/in-memory-store.ts b/packages/workflow-executor/src/stores/in-memory-store.ts index 90117d70aa..6a20242155 100644 --- a/packages/workflow-executor/src/stores/in-memory-store.ts +++ b/packages/workflow-executor/src/stores/in-memory-store.ts @@ -41,6 +41,12 @@ export default class InMemoryStore implements RunStore { }); } + async deleteStepExecution(runId: string, stepIndex: number): Promise { + return this.callPort('deleteStepExecution', async () => { + this.data.get(runId)?.delete(stepIndex); + }); + } + private async callPort(operation: string, fn: () => Promise): Promise { try { return await fn(); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 63085ca5f1..b2b17fd97e 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -98,6 +98,7 @@ function makeMockRunStore(stepExecutions: StepExecutionData[] = []): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue(stepExecutions), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), }; } diff --git a/packages/workflow-executor/test/executors/condition-step-executor.test.ts b/packages/workflow-executor/test/executors/condition-step-executor.test.ts index d0c422596e..0242da473b 100644 --- a/packages/workflow-executor/test/executors/condition-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/condition-step-executor.test.ts @@ -31,6 +31,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts index 7abc8cebca..308be4058e 100644 --- a/packages/workflow-executor/test/executors/guidance-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/guidance-step-executor.test.ts @@ -20,6 +20,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 229242d07b..9e24e9b5bd 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -130,6 +130,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts index dc54c63566..b57898df1a 100644 --- a/packages/workflow-executor/test/executors/mcp-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/mcp-step-executor.test.ts @@ -14,6 +14,7 @@ import AgentWithLog from '../../src/executors/agent-with-log'; import McpStepExecutor from '../../src/executors/mcp-step-executor'; import SchemaCache from '../../src/schema-cache'; import SchemaResolver from '../../src/schema-resolver'; +import InMemoryStore from '../../src/stores/in-memory-store'; import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition'; // --------------------------------------------------------------------------- @@ -58,6 +59,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } @@ -1247,3 +1249,163 @@ describe('McpStepExecutor — OAuth2 tool-call re-authentication', () => { expect(result.stepOutcome.status).toBe('error'); }); }); + +// On a re-auth pause the executor clears the 'executing' write-ahead marker so the resumed step is +// not rejected as interrupted; the failed tool call still surfaces as a failed audit-log entry. +describe('McpStepExecutor — re-auth pause hardening', () => { + const authError = () => new Error('Request failed with status 401'); + + // A FullyAutomated step whose tool call 401s and whose refresh cannot recover → re-auth pause. + function pauseFor(runStore: ReturnType | InMemoryStore) { + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ + runStore: runStore as RunStore, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + return new McpStepExecutor(context, [tool], 'srv', reloadWithFreshAuth); + } + + describe('idempotency phase cleared on the re-auth pause path', () => { + it('does not leave the step execution marked executing after pausing for re-auth', async () => { + // GIVEN a real store so the persisted write-ahead marker is observable. + const store = new InMemoryStore(); + + // WHEN the step pauses for re-authentication. + const result = await pauseFor(store).execute(); + + // THEN it pauses, and the 'executing' marker written by beforeCall must not survive — else + // a resume would read a stale marker. + expect(result.stepOutcome.status).toBe('awaiting-input'); + const persisted = await store.getStepExecutions('run-1'); + const mcpExecution = persisted.find(execution => execution.stepIndex === 0) as + | McpStepExecutionData + | undefined; + expect(mcpExecution?.idempotencyPhase).not.toBe('executing'); + }); + + it('resumes to success after a re-auth pause instead of failing as interrupted', async () => { + // GIVEN a step that paused for re-auth, persisting its state in a shared store. + const store = new InMemoryStore(); + const pause = await pauseFor(store).execute(); + expect(pause.stepOutcome.status).toBe('awaiting-input'); + + // WHEN the user reconnects and the step is re-dispatched against the same store, with the + // tool now succeeding. + const reconnectedTool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockResolvedValue('ok-after-reconnect'), + }); + const resumeContext = makeContext({ + runStore: store, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + const resumed = await new McpStepExecutor(resumeContext, [reconnectedTool], 'srv').execute(); + + // THEN checkIdempotency must not throw StepStateError on the stale 'executing' marker; the + // step runs to success. + expect(resumed.stepOutcome.status).toBe('success'); + }); + + it('preserves the approved pendingData on a confirmation-flow re-auth pause so resume replays it', async () => { + // GIVEN a confirmation-flow step with a user-approved tool call already persisted. + const store = new InMemoryStore(); + await store.saveStepExecution('run-1', { + type: 'mcp', + stepIndex: 0, + pendingData: { + name: 'send_notification', + sourceId: 'mcp-server-1', + input: { message: 'Hello' }, + }, + userConfirmation: { userConfirmed: true }, + } as McpStepExecutionData); + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ runStore: store }); + + // WHEN the approved call 401s and cannot re-auth. + const result = await new McpStepExecutor( + context, + [tool], + 'srv', + reloadWithFreshAuth, + ).execute(); + + // THEN the marker is cleared but the approved call is kept, so a resume replays it rather than + // re-selecting a tool. + expect(result.stepOutcome.status).toBe('awaiting-input'); + const persisted = (await store.getStepExecutions('run-1')).find(e => e.stepIndex === 0) as + | McpStepExecutionData + | undefined; + expect(persisted?.idempotencyPhase).not.toBe('executing'); + expect(persisted?.pendingData).toEqual({ + name: 'send_notification', + sourceId: 'mcp-server-1', + input: { message: 'Hello' }, + }); + }); + + it('surfaces a store error from the re-auth cleanup as a step error, not a stuck pause', async () => { + // GIVEN cleanup that fails — a left-behind 'executing' marker would make the pause + // non-resumable, so the failure must surface as an error rather than a stuck awaiting-input. + const store = new InMemoryStore(); + jest + .spyOn(store, 'deleteStepExecution') + .mockRejectedValue(new RunStorePortError('deleteStepExecution', new Error('store down'))); + + // WHEN a FullyAutomated step pauses for re-auth (no pendingData → cleanup goes via delete). + const result = await pauseFor(store).execute(); + + // THEN the store failure propagates to a step error instead of a non-resumable pause. + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe('The step state could not be accessed. Please retry.'); + }); + }); + + describe('re-auth pause surfaces the tool-call failure in the audit log', () => { + it('marks the activity-log entry failed while the step pauses for re-auth', async () => { + // GIVEN an activity-log port we can inspect. + const activityLogPort = { + createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }), + markSucceeded: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + }; + const tool = new MockRemoteTool({ + name: 'send_notification', + sourceId: 'mcp-server-1', + invoke: jest.fn().mockRejectedValue(authError()), + }); + const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv')); + const context = makeContext({ + activityLogPort, + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + + // WHEN the step pauses for re-authentication. + const result = await new McpStepExecutor( + context, + [tool], + 'srv', + reloadWithFreshAuth, + ).execute(); + + // THEN the failed tool call is audited as failed, while the step itself pauses (not errors). + expect(result.stepOutcome.status).toBe('awaiting-input'); + expect(activityLogPort.createPending).toHaveBeenCalledTimes(1); + expect(activityLogPort.markFailed).toHaveBeenCalledTimes(1); + expect(activityLogPort.markSucceeded).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index 62da87bfa4..743bbab67e 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -72,6 +72,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index f6037a9fa1..846d7a21a9 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -84,6 +84,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index e8b358c8c1..701eb71f92 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -77,6 +77,7 @@ function makeMockRunStore(overrides: Partial = {}): RunStore { close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), ...overrides, }; } diff --git a/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts index d88fc08024..d533cd0e83 100644 --- a/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts +++ b/packages/workflow-executor/test/http/mcp-oauth-credentials-route.test.ts @@ -55,6 +55,7 @@ function createMockStore() { return { init: jest.fn().mockResolvedValue(undefined), upsert: jest.fn().mockResolvedValue(undefined), + updateIfPresent: jest.fn().mockResolvedValue(undefined), get: jest.fn().mockResolvedValue(null), delete: jest.fn().mockResolvedValue(undefined), close: jest.fn().mockResolvedValue(undefined), diff --git a/packages/workflow-executor/test/oauth/token-service.test.ts b/packages/workflow-executor/test/oauth/token-service.test.ts index 6f68d848a5..e72be8a798 100644 --- a/packages/workflow-executor/test/oauth/token-service.test.ts +++ b/packages/workflow-executor/test/oauth/token-service.test.ts @@ -1,6 +1,7 @@ import type CredentialEncryption from '../../src/crypto/credential-encryption'; import type { RefreshGrantParams, RefreshGrantResult } from '../../src/oauth/refresh-grant'; import type { + McpOAuthCredentialInput, McpOAuthCredentialsStore, StoredMcpOAuthCredential, } from '../../src/ports/mcp-oauth-credentials-store'; @@ -42,7 +43,8 @@ function setup(options?: { const credential = options?.credential === undefined ? makeCredential() : options.credential; const get = jest.fn().mockResolvedValue(credential); const upsert = jest.fn().mockResolvedValue(undefined); - const store = { get, upsert } as unknown as McpOAuthCredentialsStore; + const updateIfPresent = jest.fn().mockResolvedValue(undefined); + const store = { get, upsert, updateIfPresent } as unknown as McpOAuthCredentialsStore; const decrypt = jest.fn((buf: Buffer) => `decrypted:${buf.toString()}`); const encrypt = jest.fn((plain: string) => ({ @@ -68,6 +70,7 @@ function setup(options?: { service, get, upsert, + updateIfPresent, decrypt, encrypt, refresh, @@ -200,12 +203,13 @@ describe('OAuthTokenService.getAccessToken', () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, upsert, encrypt } = setup({ refresh }); + const { service, updateIfPresent, encrypt } = setup({ refresh }); await service.getAccessToken(USER_ID, SERVER_ID); expect(encrypt).toHaveBeenCalledWith('rt-2'); - expect(upsert).toHaveBeenCalledWith( + expect(updateIfPresent).toHaveBeenCalledWith( + 1, expect.objectContaining({ userId: USER_ID, mcpServerId: SERVER_ID, @@ -215,19 +219,19 @@ describe('OAuthTokenService.getAccessToken', () => { }); it('does not write back when the grant returns no new refresh token', async () => { - const { service, upsert } = setup(); + const { service, updateIfPresent } = setup(); await service.getAccessToken(USER_ID, SERVER_ID); - expect(upsert).not.toHaveBeenCalled(); + expect(updateIfPresent).not.toHaveBeenCalled(); }); it('still returns the token when the rotation write-back fails', async () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, upsert } = setup({ refresh }); - (upsert as jest.Mock).mockRejectedValue(new Error('db down')); + const { service, updateIfPresent } = setup({ refresh }); + (updateIfPresent as jest.Mock).mockRejectedValue(new Error('db down')); await expect(service.getAccessToken(USER_ID, SERVER_ID)).resolves.toBe('at-1'); }); @@ -236,13 +240,13 @@ describe('OAuthTokenService.getAccessToken', () => { const refresh = jest .fn() .mockResolvedValue({ accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rt-2' }); - const { service, encrypt, upsert } = setup({ refresh }); + const { service, encrypt, updateIfPresent } = setup({ refresh }); (encrypt as jest.Mock).mockImplementation(() => { throw new Error('key unavailable'); }); await expect(service.getAccessToken(USER_ID, SERVER_ID)).resolves.toBe('at-1'); - expect(upsert).not.toHaveBeenCalled(); + expect(updateIfPresent).not.toHaveBeenCalled(); }); }); @@ -280,14 +284,14 @@ describe('OAuthTokenService.getAccessToken', () => { scopes: 'a b c', }); const get = jest.fn().mockResolvedValueOnce(original).mockResolvedValueOnce(latest); - const upsert = jest.fn().mockResolvedValue(undefined); + const updateIfPresent = jest.fn().mockResolvedValue(undefined); const refresh = jest .fn() .mockRejectedValueOnce(new OAuthInvalidGrantError()) .mockResolvedValueOnce({ accessToken: 'at', expiresInS: 3600, refreshToken: 'rt-3' }); const service = new OAuthTokenService({ - store: { get, upsert } as unknown as McpOAuthCredentialsStore, + store: { get, upsert: jest.fn(), updateIfPresent } as unknown as McpOAuthCredentialsStore, encryption: { decrypt: (buf: Buffer) => buf.toString(), encrypt: () => ({ ciphertext: Buffer.from('enc:rt-3') }), @@ -297,7 +301,7 @@ describe('OAuthTokenService.getAccessToken', () => { await service.getAccessToken(USER_ID, SERVER_ID); - expect(upsert).toHaveBeenCalledWith(expect.objectContaining({ scopes: 'a b c' })); + expect(updateIfPresent).toHaveBeenCalledWith(1, expect.objectContaining({ scopes: 'a b c' })); }); it('raises OAuthReauthRequiredError when the re-read shows the same (unrotated) token', async () => { @@ -396,3 +400,42 @@ describe('OAuthTokenService.getAccessToken', () => { }); }); }); + +// A disconnect (DELETE) landing between the grant read and the write-back must not re-create the +// row: the atomic update-only path leaves a deleted credential deleted, not silently resurrected. +describe('OAuthTokenService — concurrent disconnect during refresh write-back', () => { + it('does not resurrect a credential deleted between the snapshot read and the rotated-token write-back', async () => { + // GIVEN a stored credential and a refresh that rotates the token while a concurrent disconnect + // (the row is DELETED) lands before the write-back; updateIfPresent updates only an existing row. + let current: StoredMcpOAuthCredential | null = makeCredential(); + const store = { + get: jest.fn(async () => current), + updateIfPresent: jest.fn(async (id: number, credential: McpOAuthCredentialInput) => { + if (current && current.id === id) current = { ...credential, id }; + }), + delete: jest.fn(async () => { + current = null; + }), + } as unknown as McpOAuthCredentialsStore; + + const refresh = jest.fn(async () => { + // The disconnect lands after refreshAndCache's snapshot read, before persistRotatedRefreshToken. + await store.delete(USER_ID, SERVER_ID); + + return { accessToken: 'at-1', expiresInS: 3600, refreshToken: 'rotated-rt' }; + }); + + const encryption = { + decrypt: (buf: Buffer) => buf.toString(), + encrypt: (plain: string) => ({ ciphertext: Buffer.from(`enc:${plain}`) }), + } as unknown as CredentialEncryption; + + const service = new OAuthTokenService({ store, encryption, refreshAccessToken: refresh }); + + // WHEN a token is acquired, triggering the refresh and the rotated-token write-back. + await service.getAccessToken(USER_ID, SERVER_ID); + + // THEN the deleted credential must stay deleted — the write-back must not recreate it. + expect(await store.get(USER_ID, SERVER_ID)).toBeNull(); + }); +}); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index bca43c0321..d144b4aec4 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -80,6 +80,7 @@ function createMockRunStore(overrides: Partial = {}): jest.Mocked; } @@ -106,6 +107,7 @@ function createRunnerConfig( close: jest.fn().mockResolvedValue(undefined), getStepExecutions: jest.fn().mockResolvedValue([]), saveStepExecution: jest.fn().mockResolvedValue(undefined), + deleteStepExecution: jest.fn().mockResolvedValue(undefined), } as unknown as RunStore, pollingIntervalS: POLLING_INTERVAL_S, aiModelPort: createMockAiClient() as unknown as AiModelPort, diff --git a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts index dd3ae82e35..d428a01da8 100644 --- a/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/database-mcp-oauth-credentials-store.test.ts @@ -169,6 +169,36 @@ describe('DatabaseMcpOAuthCredentialsStore (SQLite)', () => { }); }); + describe('updateIfPresent', () => { + it('updates the row matching the given id in place', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const { id } = unwrap(await store.get(42, 'mcp-server-1')); + + await store.updateIfPresent(id, makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + expect(row.refreshTokenEnc.toString()).toBe('rotated'); + }); + + it('does not insert a row when none exists for the key', async () => { + await store.updateIfPresent(1, makeCredential()); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + + it('does not touch a row that was re-created with a different id', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const staleId = unwrap(await store.get(42, 'mcp-server-1')).id; + await store.delete(42, 'mcp-server-1'); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('reauthorized') })); + + await store.updateIfPresent(staleId, makeCredential({ refreshTokenEnc: Buffer.from('rot') })); + + const row = unwrap(await store.get(42, 'mcp-server-1')); + expect(row.refreshTokenEnc.toString()).toBe('reauthorized'); + }); + }); + describe('isolation', () => { it('keeps credentials for the same server but different users separate', async () => { await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index cd385b49a0..6e3ce0d4b3 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -27,6 +27,32 @@ describe('DatabaseStore (SQLite)', () => { await store.close(); }); + it('clears the step execution for a given (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([]); + }); + + it('leaves other steps in the same run untouched when clearing one step', async () => { + const kept = makeStepExecution({ stepIndex: 1, type: 'read-record' } as never); + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + await store.saveStepExecution('run-1', kept); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([kept]); + }); + + it('is a no-op when no execution exists for that (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 99); + + expect((await store.getStepExecutions('run-1')).map(s => s.stepIndex)).toEqual([0]); + }); + it('returns empty array for unknown runId', async () => { const result = await store.getStepExecutions('unknown'); expect(result).toEqual([]); diff --git a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts index 9b3c43f7cf..6f1a82f1d0 100644 --- a/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts +++ b/packages/workflow-executor/test/stores/in-memory-mcp-oauth-credentials-store.test.ts @@ -86,6 +86,38 @@ describe('InMemoryMcpOAuthCredentialsStore', () => { }); }); + describe('updateIfPresent', () => { + it('updates the row matching the given id in place', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const { id } = unwrap(await store.get(42, 'mcp-server-1')); + + await store.updateIfPresent(id, makeCredential({ refreshTokenEnc: Buffer.from('rotated') })); + + expect(unwrap(await store.get(42, 'mcp-server-1')).refreshTokenEnc.toString()).toBe( + 'rotated', + ); + }); + + it('does not insert when the row is absent', async () => { + await store.updateIfPresent(1, makeCredential()); + + expect(await store.get(42, 'mcp-server-1')).toBeNull(); + }); + + it('does not touch a row that was re-created with a different id', async () => { + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('old') })); + const staleId = unwrap(await store.get(42, 'mcp-server-1')).id; + await store.delete(42, 'mcp-server-1'); + await store.upsert(makeCredential({ refreshTokenEnc: Buffer.from('reauthorized') })); + + await store.updateIfPresent(staleId, makeCredential({ refreshTokenEnc: Buffer.from('rot') })); + + expect(unwrap(await store.get(42, 'mcp-server-1')).refreshTokenEnc.toString()).toBe( + 'reauthorized', + ); + }); + }); + describe('isolation', () => { it('keeps entries for different users and servers separate', async () => { await store.upsert(makeCredential({ userId: 1, refreshTokenEnc: Buffer.from('user-1') })); diff --git a/packages/workflow-executor/test/stores/in-memory-store.test.ts b/packages/workflow-executor/test/stores/in-memory-store.test.ts index adfe353da8..d9aeb2497d 100644 --- a/packages/workflow-executor/test/stores/in-memory-store.test.ts +++ b/packages/workflow-executor/test/stores/in-memory-store.test.ts @@ -19,6 +19,32 @@ describe('InMemoryStore', () => { store = new InMemoryStore(); }); + it('clears the step execution for a given (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([]); + }); + + it('leaves other steps in the same run untouched when clearing one step', async () => { + const kept = makeStepExecution({ stepIndex: 1, type: 'read-record' } as never); + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + await store.saveStepExecution('run-1', kept); + + await store.deleteStepExecution('run-1', 0); + + expect(await store.getStepExecutions('run-1')).toEqual([kept]); + }); + + it('is a no-op when no execution exists for that (runId, stepIndex)', async () => { + await store.saveStepExecution('run-1', makeStepExecution({ stepIndex: 0 })); + + await store.deleteStepExecution('run-1', 99); + + expect((await store.getStepExecutions('run-1')).map(s => s.stepIndex)).toEqual([0]); + }); + it('returns empty array for unknown runId', async () => { const result = await store.getStepExecutions('unknown'); expect(result).toEqual([]);