diff --git a/packages/agent/package.json b/packages/agent/package.json index 63324e8236..d3ac098360 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -48,6 +48,7 @@ "devDependencies": { "@fastify/express": "^1.1.0", "@forestadmin/datasource-sql": "1.17.10", + "@forestadmin/workflow-executor": "^1.9.1", "@nestjs/common": "^10.4.16", "@nestjs/core": "^10.4.16", "@nestjs/platform-express": "^10.4.16", @@ -72,11 +73,15 @@ "@paralleldrive/cuid2": "2.2.2" }, "peerDependencies": { - "@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0" + "@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0", + "@forestadmin/workflow-executor": "^1.9.1" }, "peerDependenciesMeta": { "@fastify/express": { "optional": true + }, + "@forestadmin/workflow-executor": { + "optional": true } } } diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index 3c88834f69..779d4329f8 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -1,6 +1,11 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { ForestAdminHttpDriverServices } from './services'; -import type { AgentOptions, AgentOptionsWithDefaults, HttpCallback } from './types'; +import type { + AgentOptions, + AgentOptionsWithDefaults, + HttpCallback, + WorkflowExecutorEmbedOptions, +} from './types'; import type { AiProviderDefinition } from '@forestadmin/agent-toolkit'; import type { CollectionCustomizer, @@ -21,6 +26,7 @@ import Router from '@koa/router'; import { readFile, writeFile } from 'fs/promises'; import stringify from 'json-stringify-pretty-compact'; +import EmbeddedWorkflowExecutor from './embedded-workflow-executor'; import FrameworkMounter from './framework-mounter'; import makeRoutes from './routes'; import makeServices from './services'; @@ -50,6 +56,9 @@ export default class Agent extends FrameworkMounter private mcpEnabled = false; private mcpEnabledTools?: ToolName[]; + /** In-process workflow executor, created only when addWorkflowExecutor() is called. */ + private embeddedExecutor: EmbeddedWorkflowExecutor | null = null; + private isRestarting = false; /** @@ -92,6 +101,10 @@ export default class Agent extends FrameworkMounter this.setMcpCallback(mcpHttpCallback ?? null); await this.mount(router); + + // Boot after mount(): the embedded executor reaches the agent over HTTP, and the + // standalone server's host/port (used to derive that URL) are only known once mounted. + await this.embeddedExecutor?.start(this.standaloneServerHost, this.standaloneServerPort); } catch (error) { const { message } = error as Error; this.options.logger('Error', `Forest Admin agent startup failure: ${message}`); @@ -103,6 +116,8 @@ export default class Agent extends FrameworkMounter * Stop the agent. */ override async stop(): Promise { + // Drain the embedded executor first, while the agent it depends on is still serving. + await this.embeddedExecutor?.stop(); // Close anything related to ForestAdmin client this.options.forestAdminClient.close(); // Stop at framework level @@ -286,6 +301,44 @@ export default class Agent extends FrameworkMounter return this; } + /** + * Run a workflow executor in-process, alongside the agent. The agent boots it on start(), + * stops it on stop(), and proxies `/_internal/executor/*` to it — no separate deployment. + * + * Requires the `@forestadmin/workflow-executor` package to be installed: + * ```bash + * npm install @forestadmin/workflow-executor + * ``` + * + * The executor persists run state in a database: provide `database`, or set the `DATABASE_URL` + * environment variable. Mutually exclusive with the `workflowExecutorUrl` option, which targets + * a separately-deployed executor instead. + * + * @param options embedded executor options + * @returns the agent instance for chaining + * @throws Error if called more than once, or if `workflowExecutorUrl` is also set + * + * @example + * createAgent(options) + * .addDataSource(...) + * .addWorkflowExecutor({ database: { uri: process.env.DATABASE_URL } }) + * .start(); + */ + addWorkflowExecutor(options: WorkflowExecutorEmbedOptions = {}): this { + if (this.embeddedExecutor) { + throw new Error('addWorkflowExecutor can only be called once.'); + } + + const executor = new EmbeddedWorkflowExecutor(this.options); + // Wire the existing proxy route to the loopback executor. Set here (before start() builds the + // routes) so the proxy is registered with the right target. Assign the field only after + // configure() succeeds, so a conflict error leaves no half-initialized executor. + (this.options as AgentOptions).workflowExecutorUrl = executor.configure(options); + this.embeddedExecutor = executor; + + return this; + } + protected getRoutes(dataSource: DataSource, services: ForestAdminHttpDriverServices) { // init() is called on every start/restart to recreate routing state with a fresh Router. const aiRouter = this.aiProvider?.init(this.options.logger) ?? null; diff --git a/packages/agent/src/embedded-workflow-executor.ts b/packages/agent/src/embedded-workflow-executor.ts new file mode 100644 index 0000000000..6a7f362a39 --- /dev/null +++ b/packages/agent/src/embedded-workflow-executor.ts @@ -0,0 +1,156 @@ +import type { AgentOptionsWithDefaults, WorkflowExecutorEmbedOptions } from './types'; +import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; + +import path from 'path'; + +/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ +const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; + +/** + * Serialize the executor's structured log context onto the message. The agent's logger only + * accepts an Error as its third argument, so the executor's rich context (runId, stepId, error, + * stack…) would be dropped otherwise. Never throws — logging must not crash the executor. + */ +function formatLog(message: string, context?: Record): string { + if (!context || Object.keys(context).length === 0) return `[workflow-executor] ${message}`; + + try { + return `[workflow-executor] ${message} ${JSON.stringify(context)}`; + } catch { + return `[workflow-executor] ${message} [unserializable context]`; + } +} + +/** + * Turn the standalone server's bind host into a host the embedded executor can actually connect + * to. Wildcard/unspecified binds (undefined, `0.0.0.0`, `::`) are not valid connect targets, so + * fall back to loopback. IPv6 literals must be bracketed to sit in a URL authority. + */ +function toConnectableHost(host?: string): string { + if (!host || host === '0.0.0.0' || host === '::') return '127.0.0.1'; + + return host.includes(':') ? `[${host}]` : host; +} + +/** + * Owns the lifecycle of a workflow executor embedded in the agent process: configuration, + * dynamic loading of the optional package, build, and start/stop. The agent only wires the proxy + * route to the loopback URL `configure()` returns and delegates start/stop. + */ +export default class EmbeddedWorkflowExecutor { + private config: (WorkflowExecutorEmbedOptions & { port: number }) | null = null; + private executor: WorkflowExecutor | null = null; + + constructor(private readonly options: AgentOptionsWithDefaults) {} + + /** + * Register the embedded executor and return the loopback URL the agent must proxy + * `/_internal/executor/*` to. Throws when the remote `workflowExecutorUrl` option is also set. + */ + configure(embedOptions: WorkflowExecutorEmbedOptions): string { + if (this.options.workflowExecutorUrl) { + throw new Error( + 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' + + 'embeds an executor in-process, the latter targets a remote one. Choose one.', + ); + } + + const port = + embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); + this.config = { ...embedOptions, port }; + + return `http://127.0.0.1:${port}`; + } + + /** + * Build and start the executor. Called from agent.start() after mount(), so the standalone + * server's host/port (used to derive the agent URL) are already known. + */ + async start( + standaloneServerHost: string | undefined, + standaloneServerPort: number | undefined, + ): Promise { + const { config } = this; + if (!config) return; + + const agentUrl = + config.agentUrl ?? this.deriveAgentUrl(standaloneServerHost, standaloneServerPort); + + if (!agentUrl) { + throw new Error( + 'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' + + 'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' + + 'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().', + ); + } + + const database = + config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); + + if (!database) { + throw new Error( + 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + + 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', + ); + } + + const { buildDatabaseExecutor } = await this.importPackage(); + + this.executor = buildDatabaseExecutor({ + envSecret: this.options.envSecret, + authSecret: this.options.authSecret, + forestServerUrl: this.options.forestServerUrl, + agentUrl, + httpPort: config.port, + pollingIntervalS: config.pollingIntervalS, + stepTimeoutS: config.stepTimeoutS, + logger: (level, message, context) => this.options.logger(level, formatLog(message, context)), + // Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop() + // drains the executor explicitly. + manageProcessSignals: false, + database: database as Parameters[0]['database'], + }); + + await this.executor.start(); + this.options.logger( + 'Info', + `Embedded workflow executor started (loopback port ${config.port})`, + ); + } + + async stop(): Promise { + await this.executor?.stop(); + } + + /** + * Derive the URL the embedded executor uses to reach this agent over HTTP. Only possible on a + * standalone server, where the agent owns the listening host/port. + */ + private deriveAgentUrl( + standaloneServerHost: string | undefined, + standaloneServerPort: number | undefined, + ): string | null { + if (!standaloneServerPort) return null; + + // agent-client appends `/forest`, so the URL stops at the configured prefix. + const prefixPath = path.posix.join('/', this.options.prefix); + const base = `http://${toConnectableHost(standaloneServerHost)}:${standaloneServerPort}`; + + return prefixPath === '/' ? base : `${base}${prefixPath}`; + } + + /** + * Dynamically load the optional @forestadmin/workflow-executor package. + * Deferred so agents that don't embed an executor never load its code at startup. + */ + private async importPackage() { + try { + return await import('@forestadmin/workflow-executor'); + } catch (error) { + throw new Error( + 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + + 'Install it with `npm install @forestadmin/workflow-executor`.', + ); + } + } +} diff --git a/packages/agent/src/framework-mounter.ts b/packages/agent/src/framework-mounter.ts index a9eafece2c..412e53b804 100644 --- a/packages/agent/src/framework-mounter.ts +++ b/packages/agent/src/framework-mounter.ts @@ -13,6 +13,7 @@ import McpMiddleware from './mcp-middleware'; export default class FrameworkMounter { public standaloneServerPort: number; + public standaloneServerHost?: string; private readonly onFirstStart: (() => Promise)[] = []; private readonly onEachStart: ((router: Router) => Promise)[] = []; @@ -67,6 +68,7 @@ export default class FrameworkMounter { mountOnStandaloneServer(port?: number, host?: string): this { const portFromEnv = Number(process.env.PORT) || undefined; const chosenPort = port ?? portFromEnv ?? 3351; + this.standaloneServerHost = host; const server = createServer(this.getConnectCallback(true)); this.onFirstStart.push(() => { diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 49ade7e5dd..ff8c25fdc8 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -8,7 +8,7 @@ export function createAgent(options: AgentOptions): } export { Agent }; -export { AgentOptions } from './types'; +export { AgentOptions, WorkflowExecutorEmbedOptions } from './types'; export * from '@forestadmin/datasource-customizer'; // export is necessary for the agent-generator package diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index c83eb39d4d..37faa57585 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -49,10 +49,42 @@ export type AgentOptions = { * Base URL of the workflow executor to proxy requests to. * When set, the agent forwards `/_internal/executor/*` to the executor verbatim, * benefiting from the agent's authentication layer. + * + * Mutually exclusive with `agent.addWorkflowExecutor()`: use this option to target a + * separately-deployed executor, or `addWorkflowExecutor()` to run one in-process. * @example 'http://localhost:4001' */ workflowExecutorUrl?: string | null; }; + +/** + * Options for an embedded workflow executor, started in the same process as the agent + * through `agent.addWorkflowExecutor()`. + */ +export type WorkflowExecutorEmbedOptions = { + /** + * Database connection used to persist workflow run state. Accepts a connection URI or a + * Sequelize options object. Falls back to the `DATABASE_URL` environment variable when omitted. + * The agent throws at startup if neither is provided. + */ + database?: { uri?: string; [option: string]: unknown }; + /** + * URL the executor uses to reach this agent's data layer. + * Auto-derived when the agent runs on its own server (`mountOnStandaloneServer`). + * Required when the agent is mounted on an external framework (Express/Fastify/NestJS), + * since the agent cannot know the host application's address. + */ + agentUrl?: string; + /** + * Loopback port the embedded executor listens on; the agent proxies to it internally. + * Defaults to the `HTTP_PORT` environment variable, or `3400`. + */ + port?: number; + /** Interval in seconds at which the executor polls the orchestrator for pending steps. */ + pollingIntervalS?: number; + /** Per-step execution timeout in seconds. */ + stepTimeoutS?: number; +}; export type AgentOptionsWithDefaults = Readonly>; export type HttpCallback = (req: IncomingMessage, res: ServerResponse, next?: () => void) => void; diff --git a/packages/agent/test/agent-workflow-executor-missing-dep.test.ts b/packages/agent/test/agent-workflow-executor-missing-dep.test.ts new file mode 100644 index 0000000000..748281ad13 --- /dev/null +++ b/packages/agent/test/agent-workflow-executor-missing-dep.test.ts @@ -0,0 +1,45 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/** + * Covers the optional-dependency failure path: when @forestadmin/workflow-executor is not + * installed, the dynamic import() rejects and the agent surfaces an actionable error. + */ +import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; + +import * as factories from './__factories__'; +import Agent from '../src/agent'; + +const mockMakeRoutes = jest.fn(); +jest.mock('../src/routes', () => ({ + __esModule: true, + default: (...args) => mockMakeRoutes(...args), +})); +jest.mock('@forestadmin/datasource-customizer'); + +// Simulate the package being absent: importing it throws, like a MODULE_NOT_FOUND at runtime. +jest.mock('@forestadmin/workflow-executor', () => { + throw new Error("Cannot find module '@forestadmin/workflow-executor'"); +}); + +beforeEach(() => { + jest.clearAllMocks(); + mockMakeRoutes.mockReturnValue([{ setupRoutes: jest.fn(), bootstrap: jest.fn() }]); + jest + .mocked(DataSourceCustomizer.prototype.getDataSource) + .mockResolvedValue(factories.dataSource.build()); +}); + +describe('Agent.addWorkflowExecutor (optional dependency missing)', () => { + test('throws an actionable error when @forestadmin/workflow-executor cannot be imported', async () => { + const agent = new Agent( + factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true }), + ); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await expect(agent.start()).rejects.toThrow( + 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package', + ); + }); +}); diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts new file mode 100644 index 0000000000..b660bafc04 --- /dev/null +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -0,0 +1,284 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; + +import * as factories from './__factories__'; +import Agent from '../src/agent'; + +// Mock routes so start() doesn't need a real datasource HTTP stack. +const mockSetupRoute = jest.fn(); +const mockBootstrap = jest.fn(); +const mockMakeRoutes = jest.fn(); + +jest.mock('../src/routes', () => ({ + __esModule: true, + default: (...args) => mockMakeRoutes(...args), +})); + +jest.mock('@forestadmin/datasource-customizer'); + +// Mock the optional executor package: start()/stop() are observable, no real DB or HTTP server. +const mockExecutorStart = jest.fn(); +const mockExecutorStop = jest.fn(); +const mockBuildDatabaseExecutor = jest.fn(); + +jest.mock('@forestadmin/workflow-executor', () => ({ + __esModule: true, + buildDatabaseExecutor: (options: unknown) => mockBuildDatabaseExecutor(options), +})); + +beforeEach(() => { + jest.clearAllMocks(); + delete process.env.DATABASE_URL; + delete process.env.HTTP_PORT; + + mockMakeRoutes.mockReturnValue([{ setupRoutes: mockSetupRoute, bootstrap: mockBootstrap }]); + mockBuildDatabaseExecutor.mockReturnValue({ + start: mockExecutorStart, + stop: mockExecutorStop, + state: 'idle', + }); + jest + .mocked(DataSourceCustomizer.prototype.getDataSource) + .mockResolvedValue(factories.dataSource.build()); +}); + +describe('Agent.addWorkflowExecutor', () => { + const buildOptions = (overrides: Record = {}) => + factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true, ...overrides }); + + describe('configuration (synchronous)', () => { + test('wires the proxy to the default loopback port 3400', () => { + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor(); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:3400'); + }); + + test('honors an explicit port', () => { + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor({ port: 5005 }); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:5005'); + }); + + test('falls back to the HTTP_PORT environment variable', () => { + process.env.HTTP_PORT = '4567'; + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor(); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:4567'); + }); + + test('returns the agent for chaining', () => { + const agent = new Agent(buildOptions()); + + expect(agent.addWorkflowExecutor()).toBe(agent); + }); + + test('throws when called more than once', () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor(); + + expect(() => agent.addWorkflowExecutor()).toThrow( + 'addWorkflowExecutor can only be called once.', + ); + }); + + test('throws when the workflowExecutorUrl option is already set (remote executor)', () => { + const agent = new Agent(buildOptions({ workflowExecutorUrl: 'http://remote:4001' })); + + expect(() => agent.addWorkflowExecutor()).toThrow( + 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option', + ); + }); + }); + + describe('boot on start()', () => { + test('builds the executor with the agent secrets, provided agentUrl, port and database', async () => { + const options = buildOptions(); + const agent = new Agent(options); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent/prefix', + database: { uri: 'postgres://localhost/db' }, + port: 4400, + }); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + envSecret: options.envSecret, + authSecret: options.authSecret, + forestServerUrl: options.forestServerUrl, + agentUrl: 'http://my-agent/prefix', + httpPort: 4400, + database: { uri: 'postgres://localhost/db' }, + // Embedded: the host process keeps control of SIGTERM/SIGINT and its own exit. + manageProcessSignals: false, + }), + ); + expect(mockExecutorStart).toHaveBeenCalledTimes(1); + }); + + test('fails the agent start when the embedded executor fails to start', async () => { + // No graceful degradation: a failing executor (e.g. failed agent probe / unreachable DB) + // must crash agent.start() rather than leave the agent serving a dead executor. + mockExecutorStart.mockRejectedValueOnce(new Error('agent probe failed')); + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await expect(agent.start()).rejects.toThrow('agent probe failed'); + }); + + test('forwards executor logs through the agent logger with a prefix', async () => { + const logger = jest.fn(); + mockBuildDatabaseExecutor.mockImplementation((opts: any) => { + opts.logger('Warn', 'something happened'); + + return { start: mockExecutorStart, stop: mockExecutorStop, state: 'idle' }; + }); + const agent = new Agent(buildOptions({ logger })); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await agent.start(); + + expect(logger).toHaveBeenCalledWith('Warn', '[workflow-executor] something happened'); + }); + + test('forwards the executor structured log context onto the message', async () => { + const logger = jest.fn(); + mockBuildDatabaseExecutor.mockImplementation((opts: any) => { + opts.logger('Error', 'step failed', { runId: 'run-1', error: 'boom' }); + + return { start: mockExecutorStart, stop: mockExecutorStop, state: 'idle' }; + }); + const agent = new Agent(buildOptions({ logger })); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await agent.start(); + + // The agent logger only takes an Error as 3rd arg, so the context must survive in the + // message rather than being dropped. + expect(logger).toHaveBeenCalledWith( + 'Error', + '[workflow-executor] step failed {"runId":"run-1","error":"boom"}', + ); + }); + + test('falls back to the DATABASE_URL environment variable when no database is provided', async () => { + process.env.DATABASE_URL = 'postgres://env-host/env-db'; + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ database: { uri: 'postgres://env-host/env-db' } }), + ); + }); + + test('derives the agentUrl from the standalone server port and prefix', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + agent.mountOnStandaloneServer(0); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://127.0.0.1:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + + test('derives the agentUrl using the standalone server host when one is provided', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + // A specific bind host must be used verbatim, not replaced by a hard-coded 127.0.0.1. + agent.mountOnStandaloneServer(0, 'localhost'); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://localhost:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + + test('maps a wildcard standalone host to loopback for the agentUrl', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + // 0.0.0.0 is a bind wildcard, not a connectable address → the executor must use loopback. + agent.mountOnStandaloneServer(0, '0.0.0.0'); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://127.0.0.1:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + + test('throws when no database is configured', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); + + await expect(agent.start()).rejects.toThrow( + 'Embedded workflow executor requires a database to persist run state', + ); + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + + test('throws when the agentUrl cannot be derived (not standalone) and is not provided', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + + await expect(agent.start()).rejects.toThrow('unable to derive the agent URL'); + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + + test('does not build an executor when addWorkflowExecutor was not called', async () => { + const agent = new Agent(buildOptions()); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + }); + + describe('stop()', () => { + test('stops the embedded executor', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + await agent.start(); + + await agent.stop(); + + expect(mockExecutorStop).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 1d2aa90163..dcf373d5e2 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -54,6 +54,11 @@ export interface ExecutorOptions { schemaCacheTtlS?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. forceAiError?: boolean; + // When true (default), the executor installs its own SIGTERM/SIGINT handlers that drain and + // call process.exit() — correct for the standalone CLI which owns the process. Set false when + // embedding in a host process (e.g. @forestadmin/agent): the executor must never touch the + // host's signals or exit it; the host drives shutdown by calling stop(). + manageProcessSignals?: boolean; } export type DatabaseExecutorOptions = ExecutorOptions & @@ -137,6 +142,7 @@ function createWorkflowExecutor( runner: Runner, server: ExecutorHttpServer, logger: Logger, + manageProcessSignals: boolean, ): WorkflowExecutor { let shutdownPromise: Promise | null = null; @@ -183,13 +189,19 @@ function createWorkflowExecutor( await runner.start(); await server.start(); - process.on('SIGTERM', onSignal); - process.on('SIGINT', onSignal); + // Only own the host's signals when explicitly allowed (the standalone CLI). When embedded, + // the host process must keep control of SIGTERM/SIGINT and its own exit. + if (manageProcessSignals) { + process.on('SIGTERM', onSignal); + process.on('SIGINT', onSignal); + } }, async stop() { - process.removeListener('SIGTERM', onSignal); - process.removeListener('SIGINT', onSignal); + if (manageProcessSignals) { + process.removeListener('SIGTERM', onSignal); + process.removeListener('SIGINT', onSignal); + } if (!shutdownPromise) shutdownPromise = shutdown(); await shutdownPromise; @@ -213,7 +225,7 @@ export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecuto logger: deps.logger, }); - return createWorkflowExecutor(runner, server, deps.logger); + return createWorkflowExecutor(runner, server, deps.logger, options.manageProcessSignals ?? true); } export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor { @@ -241,5 +253,5 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo logger: deps.logger, }); - return createWorkflowExecutor(runner, server, deps.logger); + return createWorkflowExecutor(runner, server, deps.logger, options.manageProcessSignals ?? true); } diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index c9e1131af1..8d95d92e83 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -389,6 +389,24 @@ describe('WorkflowExecutor lifecycle', () => { expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); }); + it('does not register signal handlers when manageProcessSignals is false', async () => { + const exec = buildInMemoryExecutor({ ...BASE_OPTIONS, manageProcessSignals: false }); + await exec.start(); + + // Embedded in a host process: the executor must never touch the host's signals. + expect(onSpy).not.toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(onSpy).not.toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('still drains the runner on stop() when manageProcessSignals is false', async () => { + const exec = buildInMemoryExecutor({ ...BASE_OPTIONS, manageProcessSignals: false }); + await exec.start(); + await exec.stop(); + + expect(removeSpy).not.toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + it('stop() calls runner.stop', async () => { await executor.start(); await executor.stop();