Skip to content
Open
4 changes: 4 additions & 0 deletions packages/workflow-executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ FOREST_AUTH_SECRET="your-auth-secret" \
AGENT_URL="https://your-agent-url" \
npx @forestadmin/workflow-executor --in-memory
```

### OAuth2-protected MCP

To exercise the executor's OAuth2-protected MCP path end-to-end, point it at [mcp-oauth-test-server](https://github.com/hercemer42/mcp-oauth-test-server) — a standalone OAuth2 + MCP server that can simulate refresh-token revocation/rotation, consent denial, and upstream 403s.
21 changes: 20 additions & 1 deletion packages/workflow-executor/src/executors/mcp-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
try {
return await this.runStep();
} catch (error) {
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
// An unrefreshable OAuth credential pauses the step for re-authentication rather than failing it.
// An unrefreshable OAuth credential pauses the step for re-authentication rather than failing
// it. Clear the write-ahead marker so the resumed step is not rejected as interrupted.
if (error instanceof OAuthReauthRequiredError) {
await this.clearReauthPauseState();

return this.buildOutcomeResult({
status: 'awaiting-input',
awaitingInputReason: error.awaitingInputReason,
Expand All @@ -100,6 +103,22 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
}
}

// Keep a confirmation-flow record's approved pendingData (clear only the marker) so resume replays
// it; delete a pendingData-less record, which would otherwise mis-route resume into confirmation.
private async clearReauthPauseState(): Promise<void> {
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
const existing = await this.findPendingExecution<McpStepExecutionData>('mcp');
if (!existing) return;

if (existing.pendingData) {
await this.context.runStore.saveStepExecution(this.context.runId, {
...existing,
idempotencyPhase: undefined,
});
} else {
await this.context.runStore.deleteStepExecution(this.context.runId, this.context.stepIndex);
}
}

private async runStep(): Promise<StepExecutionResult> {
const pending = await this.patchAndReloadPendingData<McpStepExecutionData>(
this.context.incomingPendingData,
Expand Down
4 changes: 3 additions & 1 deletion packages/workflow-executor/src/oauth/token-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ export default class OAuthTokenService {
try {
const encrypted = this.encryption.encrypt(refreshToken);

await this.store.upsert({
// Id-scoped so a disconnect (or disconnect + re-authorize) after the grant read leaves an
// absent or different row — the rotated-token write-back then can't resurrect or clobber it.
await this.store.updateIfPresent(credential.id, {
userId: credential.userId,
mcpServerId: credential.mcpServerId,
refreshTokenEnc: encrypted.ciphertext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ export interface McpOAuthCredentialsStore {
close(logger?: Logger): Promise<void>;
get(userId: number, mcpServerId: string): Promise<StoredMcpOAuthCredential | null>;
upsert(credential: McpOAuthCredentialInput): Promise<void>;
// Update-only by the row id the caller read: never inserts, and no-ops if that row is gone or was
// re-created with a new id — so a stale write-back can't resurrect or clobber a credential.
updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise<void>;
delete(userId: number, mcpServerId: string): Promise<void>;
}
1 change: 1 addition & 0 deletions packages/workflow-executor/src/ports/run-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export interface RunStore {
close(logger?: Logger): Promise<void>;
getStepExecutions(runId: string): Promise<StepExecutionData[]>;
saveStepExecution(runId: string, stepExecution: StepExecutionData): Promise<void>;
deleteStepExecution(runId: string, stepIndex: number): Promise<void>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,31 @@ export default class DatabaseMcpOAuthCredentialsStore implements McpOAuthCredent
});
}

async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise<void> {
// Single atomic UPDATE … WHERE id — affects zero rows if that row was deleted or re-created.
await this.sequelize.query(
`UPDATE ${this.tableReference} SET ` +
'refresh_token_enc = :refreshTokenEnc, client_id = :clientId, ' +
'client_secret_enc = :clientSecretEnc, client_secret_expires_at = :clientSecretExpiresAt, ' +
'token_endpoint = :tokenEndpoint, token_endpoint_auth_method = :tokenEndpointAuthMethod, ' +
'scopes = :scopes, updated_at = :now ' +
'WHERE id = :id',
{
replacements: {
id,
refreshTokenEnc: credential.refreshTokenEnc,
clientId: credential.clientId ?? null,
clientSecretEnc: credential.clientSecretEnc ?? null,
clientSecretExpiresAt: credential.clientSecretExpiresAt ?? null,
tokenEndpoint: credential.tokenEndpoint,
tokenEndpointAuthMethod: credential.tokenEndpointAuthMethod ?? null,
scopes: credential.scopes ?? null,
now: new Date(),
},
},
);
}

async delete(userId: number, mcpServerId: string): Promise<void> {
await this.sequelize.query(
`DELETE FROM ${this.tableReference} WHERE user_id = :userId AND mcp_server_id = :mcpServerId`,
Expand Down
9 changes: 9 additions & 0 deletions packages/workflow-executor/src/stores/database-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ export default class DatabaseStore implements RunStore {
});
}

async deleteStepExecution(runId: string, stepIndex: number): Promise<void> {
return this.callPort('deleteStepExecution', async () => {
await this.sequelize.query(
`DELETE FROM ${this.tableReference} WHERE run_id = :runId AND step_index = :stepIndex`,
{ replacements: { runId, stepIndex } },
);
});
}

async close(logger?: Logger): Promise<void> {
return this.callPort('close', async () => {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ export default class InMemoryMcpOAuthCredentialsStore implements McpOAuthCredent
this.nextId += 1;
}

async updateIfPresent(id: number, credential: McpOAuthCredentialInput): Promise<void> {
const key = InMemoryMcpOAuthCredentialsStore.key(credential.userId, credential.mcpServerId);
const existing = this.data.get(key);
// Only update the exact row the caller read; a re-created row has a new id, so skip it.
if (!existing || existing.id !== id) return;

this.data.set(key, { ...credential, id });
}

async delete(userId: number, mcpServerId: string): Promise<void> {
this.data.delete(InMemoryMcpOAuthCredentialsStore.key(userId, mcpServerId));
}
Expand Down
6 changes: 6 additions & 0 deletions packages/workflow-executor/src/stores/in-memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ export default class InMemoryStore implements RunStore {
});
}

async deleteStepExecution(runId: string, stepIndex: number): Promise<void> {
return this.callPort('deleteStepExecution', async () => {
this.data.get(runId)?.delete(stepIndex);
});
}

private async callPort<T>(operation: string, fn: () => Promise<T>): Promise<T> {
try {
return await fn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ function makeMockRunStore(stepExecutions: StepExecutionData[] = []): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue(stepExecutions),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down Expand Up @@ -2276,7 +2277,7 @@

await new LoadRelatedRecordStepExecutor(context).execute();

const firstRow = JSON.parse(selectRecordPrompt(invoke).match(/\[0\] (\{[^\n]*\})/)![1]);

Check warning on line 2280 in packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion
expect(Object.keys(firstRow)).toHaveLength(6);
});

Expand Down
162 changes: 162 additions & 0 deletions packages/workflow-executor/test/executors/mcp-step-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import AgentWithLog from '../../src/executors/agent-with-log';
import McpStepExecutor from '../../src/executors/mcp-step-executor';
import SchemaCache from '../../src/schema-cache';
import SchemaResolver from '../../src/schema-resolver';
import InMemoryStore from '../../src/stores/in-memory-store';
import { StepExecutionMode, StepType } from '../../src/types/validated/step-definition';

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -58,6 +59,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down Expand Up @@ -1247,3 +1249,163 @@ describe('McpStepExecutor — OAuth2 tool-call re-authentication', () => {
expect(result.stepOutcome.status).toBe('error');
});
});

// On a re-auth pause the executor clears the 'executing' write-ahead marker so the resumed step is
// not rejected as interrupted; the failed tool call still surfaces as a failed audit-log entry.
describe('McpStepExecutor — re-auth pause hardening', () => {
const authError = () => new Error('Request failed with status 401');

// A FullyAutomated step whose tool call 401s and whose refresh cannot recover → re-auth pause.
function pauseFor(runStore: ReturnType<typeof makeMockRunStore> | InMemoryStore) {
const tool = new MockRemoteTool({
name: 'send_notification',
sourceId: 'mcp-server-1',
invoke: jest.fn().mockRejectedValue(authError()),
});
const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv'));
const context = makeContext({
runStore: runStore as RunStore,
stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }),
});

return new McpStepExecutor(context, [tool], 'srv', reloadWithFreshAuth);
}

describe('idempotency phase cleared on the re-auth pause path', () => {
it('does not leave the step execution marked executing after pausing for re-auth', async () => {
// GIVEN a real store so the persisted write-ahead marker is observable.
const store = new InMemoryStore();

// WHEN the step pauses for re-authentication.
const result = await pauseFor(store).execute();

// THEN it pauses, and the 'executing' marker written by beforeCall must not survive — else
// a resume would read a stale marker.
expect(result.stepOutcome.status).toBe('awaiting-input');
const persisted = await store.getStepExecutions('run-1');
const mcpExecution = persisted.find(execution => execution.stepIndex === 0) as
| McpStepExecutionData
| undefined;
expect(mcpExecution?.idempotencyPhase).not.toBe('executing');
});

it('resumes to success after a re-auth pause instead of failing as interrupted', async () => {
// GIVEN a step that paused for re-auth, persisting its state in a shared store.
const store = new InMemoryStore();
const pause = await pauseFor(store).execute();
expect(pause.stepOutcome.status).toBe('awaiting-input');

// WHEN the user reconnects and the step is re-dispatched against the same store, with the
// tool now succeeding.
const reconnectedTool = new MockRemoteTool({
name: 'send_notification',
sourceId: 'mcp-server-1',
invoke: jest.fn().mockResolvedValue('ok-after-reconnect'),
});
const resumeContext = makeContext({
runStore: store,
stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }),
});

const resumed = await new McpStepExecutor(resumeContext, [reconnectedTool], 'srv').execute();

// THEN checkIdempotency must not throw StepStateError on the stale 'executing' marker; the
// step runs to success.
expect(resumed.stepOutcome.status).toBe('success');
});

it('preserves the approved pendingData on a confirmation-flow re-auth pause so resume replays it', async () => {
// GIVEN a confirmation-flow step with a user-approved tool call already persisted.
const store = new InMemoryStore();
await store.saveStepExecution('run-1', {
type: 'mcp',
stepIndex: 0,
pendingData: {
name: 'send_notification',
sourceId: 'mcp-server-1',
input: { message: 'Hello' },
},
userConfirmation: { userConfirmed: true },
} as McpStepExecutionData);
const tool = new MockRemoteTool({
name: 'send_notification',
sourceId: 'mcp-server-1',
invoke: jest.fn().mockRejectedValue(authError()),
});
const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv'));
const context = makeContext({ runStore: store });

// WHEN the approved call 401s and cannot re-auth.
const result = await new McpStepExecutor(
context,
[tool],
'srv',
reloadWithFreshAuth,
).execute();

// THEN the marker is cleared but the approved call is kept, so a resume replays it rather than
// re-selecting a tool.
expect(result.stepOutcome.status).toBe('awaiting-input');
const persisted = (await store.getStepExecutions('run-1')).find(e => e.stepIndex === 0) as
| McpStepExecutionData
| undefined;
expect(persisted?.idempotencyPhase).not.toBe('executing');
expect(persisted?.pendingData).toEqual({
name: 'send_notification',
sourceId: 'mcp-server-1',
input: { message: 'Hello' },
});
});

it('surfaces a store error from the re-auth cleanup as a step error, not a stuck pause', async () => {
// GIVEN cleanup that fails — a left-behind 'executing' marker would make the pause
// non-resumable, so the failure must surface as an error rather than a stuck awaiting-input.
const store = new InMemoryStore();
jest
.spyOn(store, 'deleteStepExecution')
.mockRejectedValue(new RunStorePortError('deleteStepExecution', new Error('store down')));

// WHEN a FullyAutomated step pauses for re-auth (no pendingData → cleanup goes via delete).
const result = await pauseFor(store).execute();

// THEN the store failure propagates to a step error instead of a non-resumable pause.
expect(result.stepOutcome.status).toBe('error');
expect(result.stepOutcome.error).toBe('The step state could not be accessed. Please retry.');
});
});

describe('re-auth pause surfaces the tool-call failure in the audit log', () => {
it('marks the activity-log entry failed while the step pauses for re-auth', async () => {
// GIVEN an activity-log port we can inspect.
const activityLogPort = {
createPending: jest.fn().mockResolvedValue({ id: 'log-1', index: '0' }),
markSucceeded: jest.fn().mockResolvedValue(undefined),
markFailed: jest.fn().mockResolvedValue(undefined),
};
const tool = new MockRemoteTool({
name: 'send_notification',
sourceId: 'mcp-server-1',
invoke: jest.fn().mockRejectedValue(authError()),
});
const reloadWithFreshAuth = jest.fn().mockRejectedValue(new OAuthReauthRequiredError('srv'));
const context = makeContext({
activityLogPort,
stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }),
});

// WHEN the step pauses for re-authentication.
const result = await new McpStepExecutor(
context,
[tool],
'srv',
reloadWithFreshAuth,
).execute();

// THEN the failed tool call is audited as failed, while the step itself pauses (not errors).
expect(result.stepOutcome.status).toBe('awaiting-input');
expect(activityLogPort.createPending).toHaveBeenCalledTimes(1);
expect(activityLogPort.markFailed).toHaveBeenCalledTimes(1);
expect(activityLogPort.markSucceeded).not.toHaveBeenCalled();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ function makeMockRunStore(overrides: Partial<RunStore> = {}): RunStore {
close: jest.fn().mockResolvedValue(undefined),
getStepExecutions: jest.fn().mockResolvedValue([]),
saveStepExecution: jest.fn().mockResolvedValue(undefined),
deleteStepExecution: jest.fn().mockResolvedValue(undefined),
...overrides,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ function createMockStore() {
return {
init: jest.fn().mockResolvedValue(undefined),
upsert: jest.fn().mockResolvedValue(undefined),
updateIfPresent: jest.fn().mockResolvedValue(undefined),
get: jest.fn().mockResolvedValue(null),
delete: jest.fn().mockResolvedValue(undefined),
close: jest.fn().mockResolvedValue(undefined),
Expand Down
Loading
Loading