diff --git a/packages/workflow-executor/README.md b/packages/workflow-executor/README.md index 56635d6806..fbb9653af2 100644 --- a/packages/workflow-executor/README.md +++ b/packages/workflow-executor/README.md @@ -87,6 +87,24 @@ are force-killed and the process exits with code `1`. | `0` | Graceful shutdown | | `1` | Startup error (missing env, invalid config) or forced shutdown | +### Migrations + +By default the executor applies its database migrations on boot, then runs. That's +fine for a single instance (demos, staging). It does **not** coordinate concurrent +boots — if you run several instances that all migrate at once, they race. + +For multi-instance deployments, decouple migrations from boot: + +```bash +# 1. Once, in your release/CI pipeline (applies migrations, then exits): +forest-workflow-executor migrate + +# 2. Then start any number of instances that skip migrating: +forest-workflow-executor --skip-migrations +``` + +`--skip-migrations` assumes the schema is already migrated; queries fail otherwise. + ### In-memory mode (dev only) ```bash diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 3c0dc0ddd7..b0d41c896d 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -32,6 +32,8 @@ const FORCE_EXIT_DELAY_S = 5; export interface WorkflowExecutor { start(): Promise; stop(): Promise; + // Apply migrations then exit (the `migrate` CLI command). Does not start the runner or server. + migrate(): Promise; readonly state: RunnerState; } @@ -54,6 +56,8 @@ export interface ExecutorOptions { schemaCacheTtlS?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. forceAiError?: boolean; + // Boot without running migrations (applied out-of-band via the `migrate` command). + skipMigrations?: boolean; } export type DatabaseExecutorOptions = ExecutorOptions & @@ -130,6 +134,7 @@ function buildCommonDependencies(options: ExecutorOptions) { stepTimeoutS: positiveOrDefault(options.stepTimeoutS, DEFAULT_STEP_TIMEOUT_S), aiInvokeTimeoutS: positiveOrDefault(options.aiInvokeTimeoutS, DEFAULT_AI_INVOKE_TIMEOUT_S), maxChainDepth: options.maxChainDepth, + skipMigrations: options.skipMigrations, }; } @@ -179,6 +184,10 @@ function createWorkflowExecutor( return runner.state; }, + migrate() { + return runner.migrate(); + }, + async start() { await runner.start(); await server.start(); diff --git a/packages/workflow-executor/src/cli-core.ts b/packages/workflow-executor/src/cli-core.ts index aabde91bf1..2dd9d1c8e3 100644 --- a/packages/workflow-executor/src/cli-core.ts +++ b/packages/workflow-executor/src/cli-core.ts @@ -65,6 +65,10 @@ export interface CliArgs { inMemory: boolean; pretty: boolean; json: boolean; + // `migrate` subcommand: apply migrations then exit (for CI/release pipelines). + migrate: boolean; + // Boot without migrating (migrations applied out-of-band via `migrate`). + skipMigrations: boolean; } export interface CliConfig { @@ -85,10 +89,18 @@ export function parseArgs(argv: string[]): CliArgs { inMemory: false, pretty: false, json: false, + migrate: false, + skipMigrations: false, }; for (const arg of argv) { switch (arg) { + case 'migrate': + result.migrate = true; + break; + case '--skip-migrations': + result.skipMigrations = true; + break; case '--help': case '-h': result.help = true; @@ -184,6 +196,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH), schemaCacheTtlS: parsePositiveIntEnv('SCHEMA_CACHE_TTL_S', env.SCHEMA_CACHE_TTL_S), loggerLevel: parseLoggerLevelEnv(env.LOG_LEVEL) ?? DEFAULT_LOGGER_LEVEL, + skipMigrations: args.skipMigrations, ...(aiConfigurations && { aiConfigurations }), ...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }), }; @@ -196,11 +209,16 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig } export function printHelp(): void { - console.log(`Usage: ${BINARY_NAME} [options] + console.log(`Usage: ${BINARY_NAME} [command] [options] Run the Forest Admin workflow executor. +Commands: + (default) Apply migrations, then run the executor (HTTP server + polling) + migrate Apply migrations, then exit (run once in CI/release; pair with --skip-migrations) + Options: + --skip-migrations Boot without migrating (migrations applied out-of-band via \`migrate\`) --in-memory Use an in-memory run store (no DB needed, not for prod) --pretty Force colorized human-readable logs (default when stdout is a TTY) --json Force structured JSON logs (default when stdout is not a TTY) @@ -300,6 +318,13 @@ export async function runCli( executor = factories.buildDatabase(databaseOptions); } + if (args.migrate) { + await executor.migrate(); + logger('Info', 'Migrations applied'); + + return executor; + } + await executor.start(); logger('Info', 'Workflow executor ready', { url: `http://localhost:${config.executorOptions.httpPort}`, diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index ae07bb9e04..70c60dffb0 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -50,6 +50,9 @@ export interface RunnerConfig { // Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the // next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50. maxChainDepth?: number; + // Skip running migrations on start (applied out-of-band, e.g. via the `migrate` command). The + // store must already be migrated or queries will fail. + skipMigrations?: boolean; } // eslint-disable-next-line @typescript-eslint/no-var-requires, import/no-dynamic-require, global-require @@ -89,7 +92,8 @@ export default class Runner { // Probe the agent first so we fail fast without opening DB connections when unreachable. await this.config.agentPort.probe(); this.logger('Info', 'Agent probe passed', {}); - await this.config.runStore.init(this.logger); + + if (!this.config.skipMigrations) await this.config.runStore.init(this.logger); this._state = 'running'; @@ -97,6 +101,13 @@ export default class Runner { this.schedulePoll(); } + // One-shot migration entry point for the `migrate` CLI command: applies migrations and closes the + // connection so the process can exit. No agent probe, no polling, no HTTP server. + async migrate(): Promise { + await this.config.runStore.init(this.logger); + await this.config.runStore.close(this.logger); + } + async stop(): Promise { if (this._state === 'idle' || this._state === 'stopped' || this._state === 'draining') return; diff --git a/packages/workflow-executor/test/cli.test.ts b/packages/workflow-executor/test/cli.test.ts index 76399e5d05..98079da514 100644 --- a/packages/workflow-executor/test/cli.test.ts +++ b/packages/workflow-executor/test/cli.test.ts @@ -42,6 +42,7 @@ function makeFakeExecutor(): WorkflowExecutor { return { start: jest.fn().mockResolvedValue(undefined), stop: jest.fn().mockResolvedValue(undefined), + migrate: jest.fn().mockResolvedValue(undefined), state: 'running', } as unknown as WorkflowExecutor; } @@ -66,9 +67,19 @@ describe('parseArgs', () => { inMemory: false, pretty: false, json: false, + migrate: false, + skipMigrations: false, }); }); + it('parses the migrate command', () => { + expect(parseArgs(['migrate']).migrate).toBe(true); + }); + + it('parses --skip-migrations', () => { + expect(parseArgs(['--skip-migrations']).skipMigrations).toBe(true); + }); + it('parses --help and -h', () => { expect(parseArgs(['--help']).help).toBe(true); expect(parseArgs(['-h']).help).toBe(true); @@ -97,7 +108,15 @@ describe('parseArgs', () => { }); describe('pickLogger', () => { - const baseArgs = { help: false, version: false, inMemory: false, pretty: false, json: false }; + const baseArgs = { + help: false, + version: false, + inMemory: false, + pretty: false, + json: false, + migrate: false, + skipMigrations: false, + }; let infoSpy: jest.SpyInstance; beforeEach(() => { @@ -147,7 +166,15 @@ describe('pickLogger', () => { }); describe('readEnvConfig', () => { - const args = { help: false, version: false, inMemory: false, pretty: false, json: false }; + const args = { + help: false, + version: false, + inMemory: false, + pretty: false, + json: false, + migrate: false, + skipMigrations: false, + }; it('returns a full config when all required vars are present', () => { const config = readEnvConfig(baseEnv, args); @@ -512,6 +539,27 @@ describe('runCli', () => { expect(executor.start).toHaveBeenCalled(); }); + it('runs migrations and exits without starting on the migrate command', async () => { + const { factories, executor } = makeFactories(); + + await runCli(['migrate'], baseEnv, factories); + + expect(executor.migrate).toHaveBeenCalled(); + expect(executor.start).not.toHaveBeenCalled(); + }); + + it('threads skipMigrations into the build options and still starts', async () => { + const { factories, executor } = makeFactories(); + + await runCli(['--skip-migrations'], baseEnv, factories); + + expect(factories.buildDatabase).toHaveBeenCalledWith( + expect.objectContaining({ skipMigrations: true }), + ); + expect(executor.start).toHaveBeenCalled(); + expect(executor.migrate).not.toHaveBeenCalled(); + }); + it('does not log any secret during startup', async () => { const { factories } = makeFactories(); await runCli([], baseEnv, factories); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 29f24abce5..4e22559b12 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -95,6 +95,7 @@ function createRunnerConfig( schemaCache: SchemaCache; stopTimeoutS: number; maxChainDepth: number; + skipMigrations: boolean; }> = {}, ) { return { @@ -244,6 +245,26 @@ describe('start', () => { expect(config.runStore.init).toHaveBeenCalledTimes(1); }); + it('should not call runStore.init() on start when skipMigrations is set', async () => { + const config = createRunnerConfig({ skipMigrations: true }); + runner = new Runner(config); + + await runner.start(); + + expect(config.runStore.init).not.toHaveBeenCalled(); + }); + + it('migrate() initialises then closes the run store and does not probe the agent', async () => { + const config = createRunnerConfig(); + runner = new Runner(config); + + await runner.migrate(); + + expect(config.runStore.init).toHaveBeenCalledTimes(1); + expect(config.runStore.close).toHaveBeenCalledTimes(1); + expect(config.agentPort.probe).not.toHaveBeenCalled(); + }); + it('should throw ConfigurationError when envSecret is invalid', async () => { runner = new Runner(createRunnerConfig({ envSecret: 'bad' }));