Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/workflow-executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ are force-killed and the process exits with code `1`.
| `0` | Graceful shutdown |
| `1` | Startup error (missing env, invalid config) or forced shutdown |

### Migrations

By default the executor applies its database migrations on boot, then runs. That's
fine for a single instance (demos, staging). It does **not** coordinate concurrent
boots — if you run several instances that all migrate at once, they race.

For multi-instance deployments, decouple migrations from boot:

```bash
# 1. Once, in your release/CI pipeline (applies migrations, then exits):
forest-workflow-executor migrate

# 2. Then start any number of instances that skip migrating:
forest-workflow-executor --skip-migrations
```

`--skip-migrations` assumes the schema is already migrated; queries fail otherwise.

### In-memory mode (dev only)

```bash
Expand Down
9 changes: 9 additions & 0 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const FORCE_EXIT_DELAY_S = 5;
export interface WorkflowExecutor {
start(): Promise<void>;
stop(): Promise<void>;
// Apply migrations then exit (the `migrate` CLI command). Does not start the runner or server.
migrate(): Promise<void>;
readonly state: RunnerState;
}

Expand All @@ -54,6 +56,8 @@ export interface ExecutorOptions {
schemaCacheTtlS?: number;
// Dev only: makes every AI call fail immediately so error paths can be exercised locally.
forceAiError?: boolean;
// Boot without running migrations (applied out-of-band via the `migrate` command).
skipMigrations?: boolean;
}

export type DatabaseExecutorOptions = ExecutorOptions &
Expand Down Expand Up @@ -130,6 +134,7 @@ function buildCommonDependencies(options: ExecutorOptions) {
stepTimeoutS: positiveOrDefault(options.stepTimeoutS, DEFAULT_STEP_TIMEOUT_S),
aiInvokeTimeoutS: positiveOrDefault(options.aiInvokeTimeoutS, DEFAULT_AI_INVOKE_TIMEOUT_S),
maxChainDepth: options.maxChainDepth,
skipMigrations: options.skipMigrations,
};
}

Expand Down Expand Up @@ -179,6 +184,10 @@ function createWorkflowExecutor(
return runner.state;
},

migrate() {
return runner.migrate();
},

async start() {
await runner.start();
await server.start();
Expand Down
27 changes: 26 additions & 1 deletion packages/workflow-executor/src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ export interface CliArgs {
inMemory: boolean;
pretty: boolean;
json: boolean;
// `migrate` subcommand: apply migrations then exit (for CI/release pipelines).
migrate: boolean;
// Boot without migrating (migrations applied out-of-band via `migrate`).
skipMigrations: boolean;
}

export interface CliConfig {
Expand All @@ -85,10 +89,18 @@ export function parseArgs(argv: string[]): CliArgs {
inMemory: false,
pretty: false,
json: false,
migrate: false,
skipMigrations: false,
};

for (const arg of argv) {
switch (arg) {
case 'migrate':
result.migrate = true;
break;
case '--skip-migrations':
result.skipMigrations = true;
break;
case '--help':
case '-h':
result.help = true;
Expand Down Expand Up @@ -184,6 +196,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
schemaCacheTtlS: parsePositiveIntEnv('SCHEMA_CACHE_TTL_S', env.SCHEMA_CACHE_TTL_S),
loggerLevel: parseLoggerLevelEnv(env.LOG_LEVEL) ?? DEFAULT_LOGGER_LEVEL,
skipMigrations: args.skipMigrations,
...(aiConfigurations && { aiConfigurations }),
...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
};
Expand All @@ -196,11 +209,16 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
}

export function printHelp(): void {
console.log(`Usage: ${BINARY_NAME} [options]
console.log(`Usage: ${BINARY_NAME} [command] [options]

Run the Forest Admin workflow executor.

Commands:
(default) Apply migrations, then run the executor (HTTP server + polling)
migrate Apply migrations, then exit (run once in CI/release; pair with --skip-migrations)

Options:
--skip-migrations Boot without migrating (migrations applied out-of-band via \`migrate\`)
--in-memory Use an in-memory run store (no DB needed, not for prod)
--pretty Force colorized human-readable logs (default when stdout is a TTY)
--json Force structured JSON logs (default when stdout is not a TTY)
Expand Down Expand Up @@ -300,6 +318,13 @@ export async function runCli(
executor = factories.buildDatabase(databaseOptions);
}

if (args.migrate) {
await executor.migrate();
logger('Info', 'Migrations applied');

return executor;
}

await executor.start();
logger('Info', 'Workflow executor ready', {
url: `http://localhost:${config.executorOptions.httpPort}`,
Expand Down
13 changes: 12 additions & 1 deletion packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ export interface RunnerConfig {
// Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the
// next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50.
maxChainDepth?: number;
// Skip running migrations on start (applied out-of-band, e.g. via the `migrate` command). The
// store must already be migrated or queries will fail.
skipMigrations?: boolean;
}

// eslint-disable-next-line @typescript-eslint/no-var-requires, import/no-dynamic-require, global-require
Expand Down Expand Up @@ -89,14 +92,22 @@ export default class Runner {
// Probe the agent first so we fail fast without opening DB connections when unreachable.
await this.config.agentPort.probe();
this.logger('Info', 'Agent probe passed', {});
await this.config.runStore.init(this.logger);

if (!this.config.skipMigrations) await this.config.runStore.init(this.logger);

this._state = 'running';

this.pushMetadataToOrchestrator();
this.schedulePoll();
}

// One-shot migration entry point for the `migrate` CLI command: applies migrations and closes the
// connection so the process can exit. No agent probe, no polling, no HTTP server.
async migrate(): Promise<void> {
await this.config.runStore.init(this.logger);
await this.config.runStore.close(this.logger);
}

async stop(): Promise<void> {
if (this._state === 'idle' || this._state === 'stopped' || this._state === 'draining') return;

Expand Down
52 changes: 50 additions & 2 deletions packages/workflow-executor/test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function makeFakeExecutor(): WorkflowExecutor {
return {
start: jest.fn().mockResolvedValue(undefined),
stop: jest.fn().mockResolvedValue(undefined),
migrate: jest.fn().mockResolvedValue(undefined),
state: 'running',
} as unknown as WorkflowExecutor;
}
Expand All @@ -66,9 +67,19 @@ describe('parseArgs', () => {
inMemory: false,
pretty: false,
json: false,
migrate: false,
skipMigrations: false,
});
});

it('parses the migrate command', () => {
expect(parseArgs(['migrate']).migrate).toBe(true);
});

it('parses --skip-migrations', () => {
expect(parseArgs(['--skip-migrations']).skipMigrations).toBe(true);
});

it('parses --help and -h', () => {
expect(parseArgs(['--help']).help).toBe(true);
expect(parseArgs(['-h']).help).toBe(true);
Expand Down Expand Up @@ -97,7 +108,15 @@ describe('parseArgs', () => {
});

describe('pickLogger', () => {
const baseArgs = { help: false, version: false, inMemory: false, pretty: false, json: false };
const baseArgs = {
help: false,
version: false,
inMemory: false,
pretty: false,
json: false,
migrate: false,
skipMigrations: false,
};
let infoSpy: jest.SpyInstance;

beforeEach(() => {
Expand Down Expand Up @@ -147,7 +166,15 @@ describe('pickLogger', () => {
});

describe('readEnvConfig', () => {
const args = { help: false, version: false, inMemory: false, pretty: false, json: false };
const args = {
help: false,
version: false,
inMemory: false,
pretty: false,
json: false,
migrate: false,
skipMigrations: false,
};

it('returns a full config when all required vars are present', () => {
const config = readEnvConfig(baseEnv, args);
Expand Down Expand Up @@ -512,6 +539,27 @@ describe('runCli', () => {
expect(executor.start).toHaveBeenCalled();
});

it('runs migrations and exits without starting on the migrate command', async () => {
const { factories, executor } = makeFactories();

await runCli(['migrate'], baseEnv, factories);

expect(executor.migrate).toHaveBeenCalled();
expect(executor.start).not.toHaveBeenCalled();
});

it('threads skipMigrations into the build options and still starts', async () => {
const { factories, executor } = makeFactories();

await runCli(['--skip-migrations'], baseEnv, factories);

expect(factories.buildDatabase).toHaveBeenCalledWith(
expect.objectContaining({ skipMigrations: true }),
);
expect(executor.start).toHaveBeenCalled();
expect(executor.migrate).not.toHaveBeenCalled();
});

it('does not log any secret during startup', async () => {
const { factories } = makeFactories();
await runCli([], baseEnv, factories);
Expand Down
21 changes: 21 additions & 0 deletions packages/workflow-executor/test/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ function createRunnerConfig(
schemaCache: SchemaCache;
stopTimeoutS: number;
maxChainDepth: number;
skipMigrations: boolean;
}> = {},
) {
return {
Expand Down Expand Up @@ -244,6 +245,26 @@ describe('start', () => {
expect(config.runStore.init).toHaveBeenCalledTimes(1);
});

it('should not call runStore.init() on start when skipMigrations is set', async () => {
const config = createRunnerConfig({ skipMigrations: true });
runner = new Runner(config);

await runner.start();

expect(config.runStore.init).not.toHaveBeenCalled();
});

it('migrate() initialises then closes the run store and does not probe the agent', async () => {
const config = createRunnerConfig();
runner = new Runner(config);

await runner.migrate();

expect(config.runStore.init).toHaveBeenCalledTimes(1);
expect(config.runStore.close).toHaveBeenCalledTimes(1);
expect(config.agentPort.probe).not.toHaveBeenCalled();
});

it('should throw ConfigurationError when envSecret is invalid', async () => {
runner = new Runner(createRunnerConfig({ envSecret: 'bad' }));

Expand Down