From 317e26209468fd3ab5fc25ed98aa3b642c4130d0 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Fri, 26 Jun 2026 20:22:26 +0200 Subject: [PATCH 1/7] feat(agent): embed workflow executor in-process via addWorkflowExecutor() Today the agent and the workflow executor ship as two separate packages so the executor stays compatible with any agent version. For Node agents that want a turnkey setup, this adds an opt-in way to run the executor in the same process, with no separate deployment. `createAgent(options).addWorkflowExecutor({ database })` boots the executor on start() and drains it on stop(). It reuses the existing `/_internal/executor/*` proxy by wiring `workflowExecutorUrl` to a loopback port, so the agent's auth layer and the raw-bytes passthrough apply unchanged. - The executor port is known up-front (option, HTTP_PORT env, or 3400) since the proxy route is built before the listener exists. - `agentUrl` (used by the executor to reach the agent over HTTP) is derived from the standalone server port + prefix; an explicit `agentUrl` is required when the agent is mounted on Express/Fastify/NestJS. - The run store is database-backed: `database` option, else DATABASE_URL, else a clear startup error (no silent in-memory fallback that would lose runs). - Mutually exclusive with the `workflowExecutorUrl` option (remote executor). - `@forestadmin/workflow-executor` is an optionalDependency, loaded via a guarded dynamic import; agents that don't embed it pull none of its deps. Co-Authored-By: Claude Opus 4.8 --- packages/agent/package.json | 3 + packages/agent/src/agent.ts | 145 ++++++++++++- packages/agent/src/index.ts | 2 +- packages/agent/src/types.ts | 32 +++ .../test/agent-workflow-executor.test.ts | 194 ++++++++++++++++++ 5 files changed, 374 insertions(+), 2 deletions(-) create mode 100644 packages/agent/test/agent-workflow-executor.test.ts diff --git a/packages/agent/package.json b/packages/agent/package.json index 63324e8236..ac721711b6 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -34,6 +34,9 @@ "superagent": "^10.3.0", "uuid": "11.1.1" }, + "optionalDependencies": { + "@forestadmin/workflow-executor": "1.9.1" + }, "files": [ "dist/**/*.js", "dist/**/*.d.ts" diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index 3c88834f69..825c781f68 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -1,6 +1,11 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { ForestAdminHttpDriverServices } from './services'; -import type { AgentOptions, AgentOptionsWithDefaults, HttpCallback } from './types'; +import type { + AgentOptions, + AgentOptionsWithDefaults, + HttpCallback, + WorkflowExecutorEmbedOptions, +} from './types'; import type { AiProviderDefinition } from '@forestadmin/agent-toolkit'; import type { CollectionCustomizer, @@ -13,6 +18,7 @@ import type { import type { DataSource, DataSourceFactory } from '@forestadmin/datasource-toolkit'; import type { ForestSchema } from '@forestadmin/forestadmin-client'; import type { ToolName } from '@forestadmin/mcp-server'; +import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; import bodyParser from '@koa/bodyparser'; @@ -20,6 +26,7 @@ import cors from '@koa/cors'; import Router from '@koa/router'; import { readFile, writeFile } from 'fs/promises'; import stringify from 'json-stringify-pretty-compact'; +import path from 'path'; import FrameworkMounter from './framework-mounter'; import makeRoutes from './routes'; @@ -28,6 +35,9 @@ import CustomizationService from './services/model-customizations/customization' import SchemaGenerator from './utils/forest-schema/generator'; import OptionsValidator from './utils/options-validator'; +/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ +const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; + /** * Allow to create a new Forest Admin agent from scratch. * Builds the application by composing and configuring all the collection decorators. @@ -50,6 +60,11 @@ export default class Agent extends FrameworkMounter private mcpEnabled = false; private mcpEnabledTools?: ToolName[]; + /** Embedded workflow executor configuration, set by addWorkflowExecutor() (null when disabled). */ + private embeddedExecutorConfig: (WorkflowExecutorEmbedOptions & { port: number }) | null = null; + /** Running embedded workflow executor instance, created on start(). */ + private embeddedExecutor: WorkflowExecutor | null = null; + private isRestarting = false; /** @@ -92,6 +107,10 @@ export default class Agent extends FrameworkMounter this.setMcpCallback(mcpHttpCallback ?? null); await this.mount(router); + + // Boot after mount(): the embedded executor reaches the agent over HTTP, and the + // standalone server's port (used to derive that URL) is only known once mounted. + if (this.embeddedExecutorConfig) await this.startEmbeddedExecutor(); } catch (error) { const { message } = error as Error; this.options.logger('Error', `Forest Admin agent startup failure: ${message}`); @@ -103,6 +122,8 @@ export default class Agent extends FrameworkMounter * Stop the agent. */ override async stop(): Promise { + // Drain the embedded executor first, while the agent it depends on is still serving. + await this.embeddedExecutor?.stop(); // Close anything related to ForestAdmin client this.options.forestAdminClient.close(); // Stop at framework level @@ -134,6 +155,83 @@ export default class Agent extends FrameworkMounter } } + /** + * Build and start the embedded workflow executor. Called from start() after mount(). + */ + private async startEmbeddedExecutor(): Promise { + const config = this.embeddedExecutorConfig; + if (!config) return; + + const agentUrl = config.agentUrl ?? this.deriveEmbeddedExecutorAgentUrl(); + + if (!agentUrl) { + throw new Error( + 'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' + + 'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' + + 'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().', + ); + } + + const database = + config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); + + if (!database) { + throw new Error( + 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + + 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', + ); + } + + const { buildDatabaseExecutor } = await this.importWorkflowExecutor(); + + this.embeddedExecutor = buildDatabaseExecutor({ + envSecret: this.options.envSecret, + authSecret: this.options.authSecret, + forestServerUrl: this.options.forestServerUrl, + agentUrl, + httpPort: config.port, + pollingIntervalS: config.pollingIntervalS, + stepTimeoutS: config.stepTimeoutS, + logger: (level, message) => this.options.logger(level, `[workflow-executor] ${message}`), + database: database as Parameters[0]['database'], + }); + + await this.embeddedExecutor.start(); + this.options.logger( + 'Info', + `Embedded workflow executor started (loopback port ${config.port})`, + ); + } + + /** + * Derive the URL the embedded executor uses to reach this agent over HTTP. + * Only possible on a standalone server, where the agent owns the listening port. + */ + private deriveEmbeddedExecutorAgentUrl(): string | null { + if (!this.standaloneServerPort) return null; + + // agent-client appends `/forest`, so the URL stops at the configured prefix. + const prefixPath = path.posix.join('/', this.options.prefix); + const base = `http://127.0.0.1:${this.standaloneServerPort}`; + + return prefixPath === '/' ? base : `${base}${prefixPath}`; + } + + /** + * Dynamically load the optional @forestadmin/workflow-executor package. + * Deferred so agents that don't embed an executor never pull its dependencies. + */ + private async importWorkflowExecutor() { + try { + return await import('@forestadmin/workflow-executor'); + } catch (error) { + throw new Error( + 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + + 'Install it with `npm install @forestadmin/workflow-executor`.', + ); + } + } + /** * Add a datasource * @param factory the datasource to add @@ -286,6 +384,51 @@ export default class Agent extends FrameworkMounter return this; } + /** + * Run a workflow executor in-process, alongside the agent. The agent boots it on start(), + * stops it on stop(), and proxies `/_internal/executor/*` to it — no separate deployment. + * + * Requires the `@forestadmin/workflow-executor` package to be installed: + * ```bash + * npm install @forestadmin/workflow-executor + * ``` + * + * The executor persists run state in a database: provide `database`, or set the `DATABASE_URL` + * environment variable. Mutually exclusive with the `workflowExecutorUrl` option, which targets + * a separately-deployed executor instead. + * + * @param options embedded executor options + * @returns the agent instance for chaining + * @throws Error if called more than once, or if `workflowExecutorUrl` is also set + * + * @example + * createAgent(options) + * .addDataSource(...) + * .addWorkflowExecutor({ database: { uri: process.env.DATABASE_URL } }) + * .start(); + */ + addWorkflowExecutor(options: WorkflowExecutorEmbedOptions = {}): this { + if (this.embeddedExecutorConfig) { + throw new Error('addWorkflowExecutor can only be called once.'); + } + + if (this.options.workflowExecutorUrl) { + throw new Error( + 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' + + 'embeds an executor in-process, the latter targets a remote one. Choose one.', + ); + } + + const port = options.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); + this.embeddedExecutorConfig = { ...options, port }; + + // Wire the existing proxy route to the loopback executor. Set here (before start() builds the + // routes) so the proxy is registered with the right target. + (this.options as AgentOptions).workflowExecutorUrl = `http://127.0.0.1:${port}`; + + return this; + } + protected getRoutes(dataSource: DataSource, services: ForestAdminHttpDriverServices) { // init() is called on every start/restart to recreate routing state with a fresh Router. const aiRouter = this.aiProvider?.init(this.options.logger) ?? null; diff --git a/packages/agent/src/index.ts b/packages/agent/src/index.ts index 49ade7e5dd..ff8c25fdc8 100644 --- a/packages/agent/src/index.ts +++ b/packages/agent/src/index.ts @@ -8,7 +8,7 @@ export function createAgent(options: AgentOptions): } export { Agent }; -export { AgentOptions } from './types'; +export { AgentOptions, WorkflowExecutorEmbedOptions } from './types'; export * from '@forestadmin/datasource-customizer'; // export is necessary for the agent-generator package diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index c83eb39d4d..37faa57585 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -49,10 +49,42 @@ export type AgentOptions = { * Base URL of the workflow executor to proxy requests to. * When set, the agent forwards `/_internal/executor/*` to the executor verbatim, * benefiting from the agent's authentication layer. + * + * Mutually exclusive with `agent.addWorkflowExecutor()`: use this option to target a + * separately-deployed executor, or `addWorkflowExecutor()` to run one in-process. * @example 'http://localhost:4001' */ workflowExecutorUrl?: string | null; }; + +/** + * Options for an embedded workflow executor, started in the same process as the agent + * through `agent.addWorkflowExecutor()`. + */ +export type WorkflowExecutorEmbedOptions = { + /** + * Database connection used to persist workflow run state. Accepts a connection URI or a + * Sequelize options object. Falls back to the `DATABASE_URL` environment variable when omitted. + * The agent throws at startup if neither is provided. + */ + database?: { uri?: string; [option: string]: unknown }; + /** + * URL the executor uses to reach this agent's data layer. + * Auto-derived when the agent runs on its own server (`mountOnStandaloneServer`). + * Required when the agent is mounted on an external framework (Express/Fastify/NestJS), + * since the agent cannot know the host application's address. + */ + agentUrl?: string; + /** + * Loopback port the embedded executor listens on; the agent proxies to it internally. + * Defaults to the `HTTP_PORT` environment variable, or `3400`. + */ + port?: number; + /** Interval in seconds at which the executor polls the orchestrator for pending steps. */ + pollingIntervalS?: number; + /** Per-step execution timeout in seconds. */ + stepTimeoutS?: number; +}; export type AgentOptionsWithDefaults = Readonly>; export type HttpCallback = (req: IncomingMessage, res: ServerResponse, next?: () => void) => void; diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts new file mode 100644 index 0000000000..f4a1eafb71 --- /dev/null +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -0,0 +1,194 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; + +import * as factories from './__factories__'; +import Agent from '../src/agent'; + +// Mock routes so start() doesn't need a real datasource HTTP stack. +const mockSetupRoute = jest.fn(); +const mockBootstrap = jest.fn(); +const mockMakeRoutes = jest.fn(); + +jest.mock('../src/routes', () => ({ + __esModule: true, + default: (...args) => mockMakeRoutes(...args), +})); + +jest.mock('@forestadmin/datasource-customizer'); + +// Mock the optional executor package: start()/stop() are observable, no real DB or HTTP server. +const mockExecutorStart = jest.fn(); +const mockExecutorStop = jest.fn(); +const mockBuildDatabaseExecutor = jest.fn(); + +jest.mock('@forestadmin/workflow-executor', () => ({ + __esModule: true, + buildDatabaseExecutor: (options: unknown) => mockBuildDatabaseExecutor(options), +})); + +beforeEach(() => { + jest.clearAllMocks(); + delete process.env.DATABASE_URL; + delete process.env.HTTP_PORT; + + mockMakeRoutes.mockReturnValue([{ setupRoutes: mockSetupRoute, bootstrap: mockBootstrap }]); + mockBuildDatabaseExecutor.mockReturnValue({ + start: mockExecutorStart, + stop: mockExecutorStop, + state: 'idle', + }); + jest + .mocked(DataSourceCustomizer.prototype.getDataSource) + .mockResolvedValue(factories.dataSource.build()); +}); + +describe('Agent.addWorkflowExecutor', () => { + const buildOptions = (overrides: Record = {}) => + factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true, ...overrides }); + + describe('configuration (synchronous)', () => { + test('wires the proxy to the default loopback port 3400', () => { + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor(); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:3400'); + }); + + test('honors an explicit port', () => { + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor({ port: 5005 }); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:5005'); + }); + + test('falls back to the HTTP_PORT environment variable', () => { + process.env.HTTP_PORT = '4567'; + const agent = new Agent(buildOptions()); + + agent.addWorkflowExecutor(); + + expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:4567'); + }); + + test('returns the agent for chaining', () => { + const agent = new Agent(buildOptions()); + + expect(agent.addWorkflowExecutor()).toBe(agent); + }); + + test('throws when called more than once', () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor(); + + expect(() => agent.addWorkflowExecutor()).toThrow( + 'addWorkflowExecutor can only be called once.', + ); + }); + + test('throws when the workflowExecutorUrl option is already set (remote executor)', () => { + const agent = new Agent(buildOptions({ workflowExecutorUrl: 'http://remote:4001' })); + + expect(() => agent.addWorkflowExecutor()).toThrow( + 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option', + ); + }); + }); + + describe('boot on start()', () => { + test('builds the executor with the agent secrets, provided agentUrl, port and database', async () => { + const options = buildOptions(); + const agent = new Agent(options); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent/prefix', + database: { uri: 'postgres://localhost/db' }, + port: 4400, + }); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + envSecret: options.envSecret, + authSecret: options.authSecret, + forestServerUrl: options.forestServerUrl, + agentUrl: 'http://my-agent/prefix', + httpPort: 4400, + database: { uri: 'postgres://localhost/db' }, + }), + ); + expect(mockExecutorStart).toHaveBeenCalledTimes(1); + }); + + test('falls back to the DATABASE_URL environment variable when no database is provided', async () => { + process.env.DATABASE_URL = 'postgres://env-host/env-db'; + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ database: { uri: 'postgres://env-host/env-db' } }), + ); + }); + + test('derives the agentUrl from the standalone server port and prefix', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + agent.mountOnStandaloneServer(0); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://127.0.0.1:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + + test('throws when no database is configured', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); + + await expect(agent.start()).rejects.toThrow( + 'Embedded workflow executor requires a database to persist run state', + ); + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + + test('throws when the agentUrl cannot be derived (not standalone) and is not provided', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + + await expect(agent.start()).rejects.toThrow('unable to derive the agent URL'); + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + + test('does not build an executor when addWorkflowExecutor was not called', async () => { + const agent = new Agent(buildOptions()); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + }); + }); + + describe('stop()', () => { + test('stops the embedded executor', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + await agent.start(); + + await agent.stop(); + + expect(mockExecutorStop).toHaveBeenCalledTimes(1); + }); + }); +}); From ab4e36f6600db9ba68fd5d81034da545ed6e5329 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Mon, 29 Jun 2026 10:05:21 +0200 Subject: [PATCH 2/7] test(agent): cover embedded executor logger forwarding and missing-package path Adds coverage for the two diff lines qlty flagged on the embed feature: - the logger callback forwarded to buildDatabaseExecutor (prefix wrapping) - the dynamic import() failure path when @forestadmin/workflow-executor is not installed (new dedicated test file mocking the import to throw) Co-Authored-By: Claude Opus 4.8 --- ...gent-workflow-executor-missing-dep.test.ts | 45 +++++++++++++++++++ .../test/agent-workflow-executor.test.ts | 18 ++++++++ 2 files changed, 63 insertions(+) create mode 100644 packages/agent/test/agent-workflow-executor-missing-dep.test.ts diff --git a/packages/agent/test/agent-workflow-executor-missing-dep.test.ts b/packages/agent/test/agent-workflow-executor-missing-dep.test.ts new file mode 100644 index 0000000000..748281ad13 --- /dev/null +++ b/packages/agent/test/agent-workflow-executor-missing-dep.test.ts @@ -0,0 +1,45 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/** + * Covers the optional-dependency failure path: when @forestadmin/workflow-executor is not + * installed, the dynamic import() rejects and the agent surfaces an actionable error. + */ +import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; + +import * as factories from './__factories__'; +import Agent from '../src/agent'; + +const mockMakeRoutes = jest.fn(); +jest.mock('../src/routes', () => ({ + __esModule: true, + default: (...args) => mockMakeRoutes(...args), +})); +jest.mock('@forestadmin/datasource-customizer'); + +// Simulate the package being absent: importing it throws, like a MODULE_NOT_FOUND at runtime. +jest.mock('@forestadmin/workflow-executor', () => { + throw new Error("Cannot find module '@forestadmin/workflow-executor'"); +}); + +beforeEach(() => { + jest.clearAllMocks(); + mockMakeRoutes.mockReturnValue([{ setupRoutes: jest.fn(), bootstrap: jest.fn() }]); + jest + .mocked(DataSourceCustomizer.prototype.getDataSource) + .mockResolvedValue(factories.dataSource.build()); +}); + +describe('Agent.addWorkflowExecutor (optional dependency missing)', () => { + test('throws an actionable error when @forestadmin/workflow-executor cannot be imported', async () => { + const agent = new Agent( + factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true }), + ); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await expect(agent.start()).rejects.toThrow( + 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package', + ); + }); +}); diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts index f4a1eafb71..d367ecabe7 100644 --- a/packages/agent/test/agent-workflow-executor.test.ts +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -122,6 +122,24 @@ describe('Agent.addWorkflowExecutor', () => { expect(mockExecutorStart).toHaveBeenCalledTimes(1); }); + test('forwards executor logs through the agent logger with a prefix', async () => { + const logger = jest.fn(); + mockBuildDatabaseExecutor.mockImplementation((opts: any) => { + opts.logger('Warn', 'something happened'); + + return { start: mockExecutorStart, stop: mockExecutorStop, state: 'idle' }; + }); + const agent = new Agent(buildOptions({ logger })); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await agent.start(); + + expect(logger).toHaveBeenCalledWith('Warn', '[workflow-executor] something happened'); + }); + test('falls back to the DATABASE_URL environment variable when no database is provided', async () => { process.env.DATABASE_URL = 'postgres://env-host/env-db'; const agent = new Agent(buildOptions()); From 8b9852724c3c087fbdce84eb8e08620337e70574 Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Wed, 1 Jul 2026 12:08:28 +0200 Subject: [PATCH 3/7] chore: update code to handle properly sigterm signal --- packages/agent/src/agent.ts | 110 ++------------ .../agent/src/embedded-workflow-executor.ts | 138 ++++++++++++++++++ .../test/agent-workflow-executor.test.ts | 38 +++++ .../src/build-workflow-executor.ts | 24 ++- .../test/build-workflow-executor.test.ts | 18 +++ 5 files changed, 222 insertions(+), 106 deletions(-) create mode 100644 packages/agent/src/embedded-workflow-executor.ts diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index 825c781f68..1ff6506caf 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -18,7 +18,6 @@ import type { import type { DataSource, DataSourceFactory } from '@forestadmin/datasource-toolkit'; import type { ForestSchema } from '@forestadmin/forestadmin-client'; import type { ToolName } from '@forestadmin/mcp-server'; -import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; import bodyParser from '@koa/bodyparser'; @@ -26,8 +25,8 @@ import cors from '@koa/cors'; import Router from '@koa/router'; import { readFile, writeFile } from 'fs/promises'; import stringify from 'json-stringify-pretty-compact'; -import path from 'path'; +import EmbeddedWorkflowExecutor from './embedded-workflow-executor'; import FrameworkMounter from './framework-mounter'; import makeRoutes from './routes'; import makeServices from './services'; @@ -35,9 +34,6 @@ import CustomizationService from './services/model-customizations/customization' import SchemaGenerator from './utils/forest-schema/generator'; import OptionsValidator from './utils/options-validator'; -/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ -const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; - /** * Allow to create a new Forest Admin agent from scratch. * Builds the application by composing and configuring all the collection decorators. @@ -60,10 +56,8 @@ export default class Agent extends FrameworkMounter private mcpEnabled = false; private mcpEnabledTools?: ToolName[]; - /** Embedded workflow executor configuration, set by addWorkflowExecutor() (null when disabled). */ - private embeddedExecutorConfig: (WorkflowExecutorEmbedOptions & { port: number }) | null = null; - /** Running embedded workflow executor instance, created on start(). */ - private embeddedExecutor: WorkflowExecutor | null = null; + /** In-process workflow executor, created only when addWorkflowExecutor() is called. */ + private embeddedExecutor: EmbeddedWorkflowExecutor | null = null; private isRestarting = false; @@ -110,7 +104,7 @@ export default class Agent extends FrameworkMounter // Boot after mount(): the embedded executor reaches the agent over HTTP, and the // standalone server's port (used to derive that URL) is only known once mounted. - if (this.embeddedExecutorConfig) await this.startEmbeddedExecutor(); + await this.embeddedExecutor?.start(this.standaloneServerPort); } catch (error) { const { message } = error as Error; this.options.logger('Error', `Forest Admin agent startup failure: ${message}`); @@ -155,83 +149,6 @@ export default class Agent extends FrameworkMounter } } - /** - * Build and start the embedded workflow executor. Called from start() after mount(). - */ - private async startEmbeddedExecutor(): Promise { - const config = this.embeddedExecutorConfig; - if (!config) return; - - const agentUrl = config.agentUrl ?? this.deriveEmbeddedExecutorAgentUrl(); - - if (!agentUrl) { - throw new Error( - 'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' + - 'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' + - 'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().', - ); - } - - const database = - config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); - - if (!database) { - throw new Error( - 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + - 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', - ); - } - - const { buildDatabaseExecutor } = await this.importWorkflowExecutor(); - - this.embeddedExecutor = buildDatabaseExecutor({ - envSecret: this.options.envSecret, - authSecret: this.options.authSecret, - forestServerUrl: this.options.forestServerUrl, - agentUrl, - httpPort: config.port, - pollingIntervalS: config.pollingIntervalS, - stepTimeoutS: config.stepTimeoutS, - logger: (level, message) => this.options.logger(level, `[workflow-executor] ${message}`), - database: database as Parameters[0]['database'], - }); - - await this.embeddedExecutor.start(); - this.options.logger( - 'Info', - `Embedded workflow executor started (loopback port ${config.port})`, - ); - } - - /** - * Derive the URL the embedded executor uses to reach this agent over HTTP. - * Only possible on a standalone server, where the agent owns the listening port. - */ - private deriveEmbeddedExecutorAgentUrl(): string | null { - if (!this.standaloneServerPort) return null; - - // agent-client appends `/forest`, so the URL stops at the configured prefix. - const prefixPath = path.posix.join('/', this.options.prefix); - const base = `http://127.0.0.1:${this.standaloneServerPort}`; - - return prefixPath === '/' ? base : `${base}${prefixPath}`; - } - - /** - * Dynamically load the optional @forestadmin/workflow-executor package. - * Deferred so agents that don't embed an executor never pull its dependencies. - */ - private async importWorkflowExecutor() { - try { - return await import('@forestadmin/workflow-executor'); - } catch (error) { - throw new Error( - 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + - 'Install it with `npm install @forestadmin/workflow-executor`.', - ); - } - } - /** * Add a datasource * @param factory the datasource to add @@ -408,23 +325,16 @@ export default class Agent extends FrameworkMounter * .start(); */ addWorkflowExecutor(options: WorkflowExecutorEmbedOptions = {}): this { - if (this.embeddedExecutorConfig) { + if (this.embeddedExecutor) { throw new Error('addWorkflowExecutor can only be called once.'); } - if (this.options.workflowExecutorUrl) { - throw new Error( - 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' + - 'embeds an executor in-process, the latter targets a remote one. Choose one.', - ); - } - - const port = options.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); - this.embeddedExecutorConfig = { ...options, port }; - + const executor = new EmbeddedWorkflowExecutor(this.options); // Wire the existing proxy route to the loopback executor. Set here (before start() builds the - // routes) so the proxy is registered with the right target. - (this.options as AgentOptions).workflowExecutorUrl = `http://127.0.0.1:${port}`; + // routes) so the proxy is registered with the right target. Assign the field only after + // configure() succeeds, so a conflict error leaves no half-initialized executor. + (this.options as AgentOptions).workflowExecutorUrl = executor.configure(options); + this.embeddedExecutor = executor; return this; } diff --git a/packages/agent/src/embedded-workflow-executor.ts b/packages/agent/src/embedded-workflow-executor.ts new file mode 100644 index 0000000000..ef304ea5c8 --- /dev/null +++ b/packages/agent/src/embedded-workflow-executor.ts @@ -0,0 +1,138 @@ +import type { AgentOptionsWithDefaults, WorkflowExecutorEmbedOptions } from './types'; +import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; + +import path from 'path'; + +/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ +const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; + +/** + * Serialize the executor's structured log context onto the message. The agent's logger only + * accepts an Error as its third argument, so the executor's rich context (runId, stepId, error, + * stack…) would be dropped otherwise. Never throws — logging must not crash the executor. + */ +function formatLog(message: string, context?: Record): string { + if (!context || Object.keys(context).length === 0) return `[workflow-executor] ${message}`; + + try { + return `[workflow-executor] ${message} ${JSON.stringify(context)}`; + } catch { + return `[workflow-executor] ${message} [unserializable context]`; + } +} + +/** + * Owns the lifecycle of a workflow executor embedded in the agent process: configuration, + * dynamic loading of the optional package, build, and start/stop. The agent only wires the proxy + * route to the loopback URL `configure()` returns and delegates start/stop. + */ +export default class EmbeddedWorkflowExecutor { + private config: (WorkflowExecutorEmbedOptions & { port: number }) | null = null; + private executor: WorkflowExecutor | null = null; + + constructor(private readonly options: AgentOptionsWithDefaults) {} + + /** + * Register the embedded executor and return the loopback URL the agent must proxy + * `/_internal/executor/*` to. Throws when the remote `workflowExecutorUrl` option is also set. + */ + configure(embedOptions: WorkflowExecutorEmbedOptions): string { + if (this.options.workflowExecutorUrl) { + throw new Error( + 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' + + 'embeds an executor in-process, the latter targets a remote one. Choose one.', + ); + } + + const port = + embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); + this.config = { ...embedOptions, port }; + + return `http://127.0.0.1:${port}`; + } + + /** + * Build and start the executor. Called from agent.start() after mount(), so the standalone + * server's port (used to derive the agent URL) is already known. + */ + async start(standaloneServerPort: number | undefined): Promise { + const { config } = this; + if (!config) return; + + const agentUrl = config.agentUrl ?? this.deriveAgentUrl(standaloneServerPort); + + if (!agentUrl) { + throw new Error( + 'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' + + 'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' + + 'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().', + ); + } + + const database = + config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); + + if (!database) { + throw new Error( + 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + + 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', + ); + } + + const { buildDatabaseExecutor } = await this.importPackage(); + + this.executor = buildDatabaseExecutor({ + envSecret: this.options.envSecret, + authSecret: this.options.authSecret, + forestServerUrl: this.options.forestServerUrl, + agentUrl, + httpPort: config.port, + pollingIntervalS: config.pollingIntervalS, + stepTimeoutS: config.stepTimeoutS, + logger: (level, message, context) => this.options.logger(level, formatLog(message, context)), + // Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop() + // drains the executor explicitly. + manageProcessSignals: false, + database: database as Parameters[0]['database'], + }); + + await this.executor.start(); + this.options.logger( + 'Info', + `Embedded workflow executor started (loopback port ${config.port})`, + ); + } + + async stop(): Promise { + await this.executor?.stop(); + } + + /** + * Derive the URL the embedded executor uses to reach this agent over HTTP. Only possible on a + * standalone server, where the agent owns the listening port. + */ + private deriveAgentUrl(standaloneServerPort: number | undefined): string | null { + if (!standaloneServerPort) return null; + + // agent-client appends `/forest`, so the URL stops at the configured prefix. + const prefixPath = path.posix.join('/', this.options.prefix); + const base = `http://127.0.0.1:${standaloneServerPort}`; + + return prefixPath === '/' ? base : `${base}${prefixPath}`; + } + + /** + * Dynamically load the optional @forestadmin/workflow-executor package. + * Deferred so agents that don't embed an executor never load its code at startup. + */ + private async importPackage() { + try { + return await import('@forestadmin/workflow-executor'); + } catch (error) { + throw new Error( + 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + + 'Install it with `npm install @forestadmin/workflow-executor`.', + ); + } + } +} diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts index d367ecabe7..d4f5c1fa01 100644 --- a/packages/agent/test/agent-workflow-executor.test.ts +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -117,11 +117,26 @@ describe('Agent.addWorkflowExecutor', () => { agentUrl: 'http://my-agent/prefix', httpPort: 4400, database: { uri: 'postgres://localhost/db' }, + // Embedded: the host process keeps control of SIGTERM/SIGINT and its own exit. + manageProcessSignals: false, }), ); expect(mockExecutorStart).toHaveBeenCalledTimes(1); }); + test('fails the agent start when the embedded executor fails to start', async () => { + // No graceful degradation: a failing executor (e.g. failed agent probe / unreachable DB) + // must crash agent.start() rather than leave the agent serving a dead executor. + mockExecutorStart.mockRejectedValueOnce(new Error('agent probe failed')); + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await expect(agent.start()).rejects.toThrow('agent probe failed'); + }); + test('forwards executor logs through the agent logger with a prefix', async () => { const logger = jest.fn(); mockBuildDatabaseExecutor.mockImplementation((opts: any) => { @@ -140,6 +155,29 @@ describe('Agent.addWorkflowExecutor', () => { expect(logger).toHaveBeenCalledWith('Warn', '[workflow-executor] something happened'); }); + test('forwards the executor structured log context onto the message', async () => { + const logger = jest.fn(); + mockBuildDatabaseExecutor.mockImplementation((opts: any) => { + opts.logger('Error', 'step failed', { runId: 'run-1', error: 'boom' }); + + return { start: mockExecutorStart, stop: mockExecutorStop, state: 'idle' }; + }); + const agent = new Agent(buildOptions({ logger })); + agent.addWorkflowExecutor({ + agentUrl: 'http://my-agent', + database: { uri: 'postgres://localhost/db' }, + }); + + await agent.start(); + + // The agent logger only takes an Error as 3rd arg, so the context must survive in the + // message rather than being dropped. + expect(logger).toHaveBeenCalledWith( + 'Error', + '[workflow-executor] step failed {"runId":"run-1","error":"boom"}', + ); + }); + test('falls back to the DATABASE_URL environment variable when no database is provided', async () => { process.env.DATABASE_URL = 'postgres://env-host/env-db'; const agent = new Agent(buildOptions()); diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 1d2aa90163..dcf373d5e2 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -54,6 +54,11 @@ export interface ExecutorOptions { schemaCacheTtlS?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. forceAiError?: boolean; + // When true (default), the executor installs its own SIGTERM/SIGINT handlers that drain and + // call process.exit() — correct for the standalone CLI which owns the process. Set false when + // embedding in a host process (e.g. @forestadmin/agent): the executor must never touch the + // host's signals or exit it; the host drives shutdown by calling stop(). + manageProcessSignals?: boolean; } export type DatabaseExecutorOptions = ExecutorOptions & @@ -137,6 +142,7 @@ function createWorkflowExecutor( runner: Runner, server: ExecutorHttpServer, logger: Logger, + manageProcessSignals: boolean, ): WorkflowExecutor { let shutdownPromise: Promise | null = null; @@ -183,13 +189,19 @@ function createWorkflowExecutor( await runner.start(); await server.start(); - process.on('SIGTERM', onSignal); - process.on('SIGINT', onSignal); + // Only own the host's signals when explicitly allowed (the standalone CLI). When embedded, + // the host process must keep control of SIGTERM/SIGINT and its own exit. + if (manageProcessSignals) { + process.on('SIGTERM', onSignal); + process.on('SIGINT', onSignal); + } }, async stop() { - process.removeListener('SIGTERM', onSignal); - process.removeListener('SIGINT', onSignal); + if (manageProcessSignals) { + process.removeListener('SIGTERM', onSignal); + process.removeListener('SIGINT', onSignal); + } if (!shutdownPromise) shutdownPromise = shutdown(); await shutdownPromise; @@ -213,7 +225,7 @@ export function buildInMemoryExecutor(options: ExecutorOptions): WorkflowExecuto logger: deps.logger, }); - return createWorkflowExecutor(runner, server, deps.logger); + return createWorkflowExecutor(runner, server, deps.logger, options.manageProcessSignals ?? true); } export function buildDatabaseExecutor(options: DatabaseExecutorOptions): WorkflowExecutor { @@ -241,5 +253,5 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo logger: deps.logger, }); - return createWorkflowExecutor(runner, server, deps.logger); + return createWorkflowExecutor(runner, server, deps.logger, options.manageProcessSignals ?? true); } diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index c9e1131af1..8d95d92e83 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -389,6 +389,24 @@ describe('WorkflowExecutor lifecycle', () => { expect(removeSpy).toHaveBeenCalledWith('SIGINT', expect.any(Function)); }); + it('does not register signal handlers when manageProcessSignals is false', async () => { + const exec = buildInMemoryExecutor({ ...BASE_OPTIONS, manageProcessSignals: false }); + await exec.start(); + + // Embedded in a host process: the executor must never touch the host's signals. + expect(onSpy).not.toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(onSpy).not.toHaveBeenCalledWith('SIGINT', expect.any(Function)); + }); + + it('still drains the runner on stop() when manageProcessSignals is false', async () => { + const exec = buildInMemoryExecutor({ ...BASE_OPTIONS, manageProcessSignals: false }); + await exec.start(); + await exec.stop(); + + expect(removeSpy).not.toHaveBeenCalledWith('SIGTERM', expect.any(Function)); + expect(MockedRunner.prototype.stop).toHaveBeenCalled(); + }); + it('stop() calls runner.stop', async () => { await executor.start(); await executor.stop(); From ee856831ee240b75eef7358d6758b4f442d1af51 Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Wed, 1 Jul 2026 14:50:54 +0200 Subject: [PATCH 4/7] chore: fix review --- packages/agent/src/agent.ts | 4 +-- .../agent/src/embedded-workflow-executor.ts | 30 ++++++++++++---- packages/agent/src/framework-mounter.ts | 2 ++ .../test/agent-workflow-executor.test.ts | 34 +++++++++++++++++++ 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index 1ff6506caf..779d4329f8 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -103,8 +103,8 @@ export default class Agent extends FrameworkMounter await this.mount(router); // Boot after mount(): the embedded executor reaches the agent over HTTP, and the - // standalone server's port (used to derive that URL) is only known once mounted. - await this.embeddedExecutor?.start(this.standaloneServerPort); + // standalone server's host/port (used to derive that URL) are only known once mounted. + await this.embeddedExecutor?.start(this.standaloneServerHost, this.standaloneServerPort); } catch (error) { const { message } = error as Error; this.options.logger('Error', `Forest Admin agent startup failure: ${message}`); diff --git a/packages/agent/src/embedded-workflow-executor.ts b/packages/agent/src/embedded-workflow-executor.ts index ef304ea5c8..6a7f362a39 100644 --- a/packages/agent/src/embedded-workflow-executor.ts +++ b/packages/agent/src/embedded-workflow-executor.ts @@ -21,6 +21,17 @@ function formatLog(message: string, context?: Record): string { } } +/** + * Turn the standalone server's bind host into a host the embedded executor can actually connect + * to. Wildcard/unspecified binds (undefined, `0.0.0.0`, `::`) are not valid connect targets, so + * fall back to loopback. IPv6 literals must be bracketed to sit in a URL authority. + */ +function toConnectableHost(host?: string): string { + if (!host || host === '0.0.0.0' || host === '::') return '127.0.0.1'; + + return host.includes(':') ? `[${host}]` : host; +} + /** * Owns the lifecycle of a workflow executor embedded in the agent process: configuration, * dynamic loading of the optional package, build, and start/stop. The agent only wires the proxy @@ -53,13 +64,17 @@ export default class EmbeddedWorkflowExecutor { /** * Build and start the executor. Called from agent.start() after mount(), so the standalone - * server's port (used to derive the agent URL) is already known. + * server's host/port (used to derive the agent URL) are already known. */ - async start(standaloneServerPort: number | undefined): Promise { + async start( + standaloneServerHost: string | undefined, + standaloneServerPort: number | undefined, + ): Promise { const { config } = this; if (!config) return; - const agentUrl = config.agentUrl ?? this.deriveAgentUrl(standaloneServerPort); + const agentUrl = + config.agentUrl ?? this.deriveAgentUrl(standaloneServerHost, standaloneServerPort); if (!agentUrl) { throw new Error( @@ -109,14 +124,17 @@ export default class EmbeddedWorkflowExecutor { /** * Derive the URL the embedded executor uses to reach this agent over HTTP. Only possible on a - * standalone server, where the agent owns the listening port. + * standalone server, where the agent owns the listening host/port. */ - private deriveAgentUrl(standaloneServerPort: number | undefined): string | null { + private deriveAgentUrl( + standaloneServerHost: string | undefined, + standaloneServerPort: number | undefined, + ): string | null { if (!standaloneServerPort) return null; // agent-client appends `/forest`, so the URL stops at the configured prefix. const prefixPath = path.posix.join('/', this.options.prefix); - const base = `http://127.0.0.1:${standaloneServerPort}`; + const base = `http://${toConnectableHost(standaloneServerHost)}:${standaloneServerPort}`; return prefixPath === '/' ? base : `${base}${prefixPath}`; } diff --git a/packages/agent/src/framework-mounter.ts b/packages/agent/src/framework-mounter.ts index a9eafece2c..412e53b804 100644 --- a/packages/agent/src/framework-mounter.ts +++ b/packages/agent/src/framework-mounter.ts @@ -13,6 +13,7 @@ import McpMiddleware from './mcp-middleware'; export default class FrameworkMounter { public standaloneServerPort: number; + public standaloneServerHost?: string; private readonly onFirstStart: (() => Promise)[] = []; private readonly onEachStart: ((router: Router) => Promise)[] = []; @@ -67,6 +68,7 @@ export default class FrameworkMounter { mountOnStandaloneServer(port?: number, host?: string): this { const portFromEnv = Number(process.env.PORT) || undefined; const chosenPort = port ?? portFromEnv ?? 3351; + this.standaloneServerHost = host; const server = createServer(this.getConnectCallback(true)); this.onFirstStart.push(() => { diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts index d4f5c1fa01..b660bafc04 100644 --- a/packages/agent/test/agent-workflow-executor.test.ts +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -206,6 +206,40 @@ describe('Agent.addWorkflowExecutor', () => { await agent.stop(); }); + test('derives the agentUrl using the standalone server host when one is provided', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + // A specific bind host must be used verbatim, not replaced by a hard-coded 127.0.0.1. + agent.mountOnStandaloneServer(0, 'localhost'); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://localhost:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + + test('maps a wildcard standalone host to loopback for the agentUrl', async () => { + const agent = new Agent(buildOptions()); + agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); + // 0.0.0.0 is a bind wildcard, not a connectable address → the executor must use loopback. + agent.mountOnStandaloneServer(0, '0.0.0.0'); + + await agent.start(); + + expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + agentUrl: `http://127.0.0.1:${agent.standaloneServerPort}/prefix`, + }), + ); + + await agent.stop(); + }); + test('throws when no database is configured', async () => { const agent = new Agent(buildOptions()); agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); From ba4a95fdd86e5346a171095889ca163c14608bf6 Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Wed, 1 Jul 2026 15:42:31 +0200 Subject: [PATCH 5/7] fix: build --- packages/agent/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/agent/package.json b/packages/agent/package.json index ac721711b6..79c6d2dff0 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -35,7 +35,7 @@ "uuid": "11.1.1" }, "optionalDependencies": { - "@forestadmin/workflow-executor": "1.9.1" + "@forestadmin/workflow-executor": "^1.9.1" }, "files": [ "dist/**/*.js", From 1d0bfc9f144d55b234ed7ce0dbb751c9777e41d6 Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Wed, 1 Jul 2026 17:22:38 +0200 Subject: [PATCH 6/7] chore: does not install executor systematically --- packages/_example/src/forest/agent.ts | 135 ++++++++++---------- packages/_example/src/forest/typings.ts | 156 ++++++++++++++++++++++++ packages/agent/package.json | 10 +- 3 files changed, 231 insertions(+), 70 deletions(-) diff --git a/packages/_example/src/forest/agent.ts b/packages/_example/src/forest/agent.ts index 8534595db3..5dabb2368f 100644 --- a/packages/_example/src/forest/agent.ts +++ b/packages/_example/src/forest/agent.ts @@ -31,78 +31,81 @@ export default function makeAgent() { envSecret: process.env.FOREST_ENV_SECRET, forestServerUrl: process.env.FOREST_SERVER_URL, forestAppUrl: process.env.FOREST_APP_URL, - workflowExecutorUrl: process.env.WORKFLOW_EXECUTOR_URL, + // workflowExecutorUrl: process.env.WORKFLOW_EXECUTOR_URL, isProduction: false, loggerLevel: 'Info', typingsPath: 'src/forest/typings.ts', }; - return createAgent(envOptions) - .addDataSource(createSqlDataSource({ dialect: 'sqlite', storage: './assets/db.sqlite' })) + return ( + createAgent(envOptions) + .addDataSource(createSqlDataSource({ dialect: 'sqlite', storage: './assets/db.sqlite' })) - .addDataSource( - // Using an URI - createSqlDataSource('mariadb://example:password@localhost:3808/example', { - liveQueryConnections: 'Main database', - }), - { include: ['customer'] }, - ) - .addDataSource( - // Using a connection object - createSqlDataSource({ - dialect: 'mariadb', - username: 'example', - password: 'password', - port: 3808, - database: 'example', - }), - { include: ['card', 'active_cards'] }, // active_cards is a view - ) - .addDataSource(createTypicode()) - .addDataSource( - createSequelizeDataSource(sequelizePostgres, { - liveQueryConnections: 'Business intel', - }), - ) - .addDataSource(createSequelizeDataSource(sequelizeMySql)) - .addDataSource(createSequelizeDataSource(sequelizeMsSql)) - .addDataSource( - createMongooseDataSource(mongoose, { asModels: { account: ['address', 'bills.items'] } }), - ) - .addDataSource( - createMongoDataSource({ - uri: connectionString, - dataSource: { flattenMode: 'auto' }, - }), - { - exclude: ['accounts', 'accounts_bills', 'accounts_bills_items'], - }, - ) - .addChart('numRentals', async (context, resultBuilder) => { - const rentals = context.dataSource.getCollection('rental'); - const rows = await rentals.aggregate({}, { operation: 'Count' }); + .addDataSource( + // Using an URI + createSqlDataSource('mariadb://example:password@localhost:3808/example', { + liveQueryConnections: 'Main database', + }), + { include: ['customer'] }, + ) + .addDataSource( + // Using a connection object + createSqlDataSource({ + dialect: 'mariadb', + username: 'example', + password: 'password', + port: 3808, + database: 'example', + }), + { include: ['card', 'active_cards'] }, // active_cards is a view + ) + .addDataSource(createTypicode()) + .addDataSource( + createSequelizeDataSource(sequelizePostgres, { + liveQueryConnections: 'Business intel', + }), + ) + .addDataSource(createSequelizeDataSource(sequelizeMySql)) + .addDataSource(createSequelizeDataSource(sequelizeMsSql)) + .addDataSource( + createMongooseDataSource(mongoose, { asModels: { account: ['address', 'bills.items'] } }), + ) + .addDataSource( + createMongoDataSource({ + uri: connectionString, + dataSource: { flattenMode: 'auto' }, + }), + { + exclude: ['accounts', 'accounts_bills', 'accounts_bills_items'], + }, + ) + .addChart('numRentals', async (context, resultBuilder) => { + const rentals = context.dataSource.getCollection('rental'); + const rows = await rentals.aggregate({}, { operation: 'Count' }); - return resultBuilder.value((rows?.[0]?.value as number) ?? 0); - }) - .mountAiMcpServer() + return resultBuilder.value((rows?.[0]?.value as number) ?? 0); + }) + .mountAiMcpServer() - .customizeCollection('card', customizeCard) - .customizeCollection('account', customizeAccount) - .customizeCollection('owner', customizeOwner) - .customizeCollection('store', customizeStore) - .customizeCollection('rental', customizeRental) - .customizeCollection('dvd', customizeDvd) - .customizeCollection('customer', customizeCustomer) - .customizeCollection('post', customizePost) - .customizeCollection('comment', customizeComment) - .customizeCollection('review', customizeReview) - .customizeCollection('sales', customizeSales) - .addAi( - createAiProvider({ - model: 'gpt-4o', - provider: 'openai', - name: 'test', - apiKey: process.env.OPENAI_API_KEY, - }), - ); + .customizeCollection('card', customizeCard) + .customizeCollection('account', customizeAccount) + .customizeCollection('owner', customizeOwner) + .customizeCollection('store', customizeStore) + .customizeCollection('rental', customizeRental) + .customizeCollection('dvd', customizeDvd) + .customizeCollection('customer', customizeCustomer) + .customizeCollection('post', customizePost) + .customizeCollection('comment', customizeComment) + .customizeCollection('review', customizeReview) + .customizeCollection('sales', customizeSales) + // .addAi( + // createAiProvider({ + // model: 'gpt-4o', + // provider: 'openai', + // name: 'test', + // apiKey: process.env.OPENAI_API_KEY, + // }), + // ) + .addWorkflowExecutor({ database: { uri: 'postgres://forest:secret@localhost:5435/exec' } }) + ); } diff --git a/packages/_example/src/forest/typings.ts b/packages/_example/src/forest/typings.ts index badbc94045..1088efe98c 100644 --- a/packages/_example/src/forest/typings.ts +++ b/packages/_example/src/forest/typings.ts @@ -120,6 +120,62 @@ export type AccountBillsItemsFilter = TPaginatedFilter; export type AccountBillsItemsAggregation = TAggregation; +export type ConversationsCustomizer = CollectionCustomizer; +export type ConversationsRecord = TPartialRow; +export type ConversationsConditionTree = TConditionTree; +export type ConversationsFilter = TPaginatedFilter; +export type ConversationsSortClause = TSortClause; +export type ConversationsAggregation = TAggregation; + +export type ConversationsTagValuesCustomizer = CollectionCustomizer; +export type ConversationsTagValuesRecord = TPartialRow; +export type ConversationsTagValuesConditionTree = TConditionTree; +export type ConversationsTagValuesFilter = TPaginatedFilter; +export type ConversationsTagValuesSortClause = TSortClause; +export type ConversationsTagValuesAggregation = TAggregation; + +export type PartisCustomizer = CollectionCustomizer; +export type PartisRecord = TPartialRow; +export type PartisConditionTree = TConditionTree; +export type PartisFilter = TPaginatedFilter; +export type PartisSortClause = TSortClause; +export type PartisAggregation = TAggregation; + +export type PartisPsCustomizer = CollectionCustomizer; +export type PartisPsRecord = TPartialRow; +export type PartisPsConditionTree = TConditionTree; +export type PartisPsFilter = TPaginatedFilter; +export type PartisPsSortClause = TSortClause; +export type PartisPsAggregation = TAggregation; + +export type ProgramsCustomizer = CollectionCustomizer; +export type ProgramsRecord = TPartialRow; +export type ProgramsConditionTree = TConditionTree; +export type ProgramsFilter = TPaginatedFilter; +export type ProgramsSortClause = TSortClause; +export type ProgramsAggregation = TAggregation; + +export type ProgramsAttributeFieldsCustomizer = CollectionCustomizer; +export type ProgramsAttributeFieldsRecord = TPartialRow; +export type ProgramsAttributeFieldsConditionTree = TConditionTree; +export type ProgramsAttributeFieldsFilter = TPaginatedFilter; +export type ProgramsAttributeFieldsSortClause = TSortClause; +export type ProgramsAttributeFieldsAggregation = TAggregation; + +export type ProgramsAttributeFieldsValuesCustomizer = CollectionCustomizer; +export type ProgramsAttributeFieldsValuesRecord = TPartialRow; +export type ProgramsAttributeFieldsValuesConditionTree = TConditionTree; +export type ProgramsAttributeFieldsValuesFilter = TPaginatedFilter; +export type ProgramsAttributeFieldsValuesSortClause = TSortClause; +export type ProgramsAttributeFieldsValuesAggregation = TAggregation; + +export type ProgramsCsCustomizer = CollectionCustomizer; +export type ProgramsCsRecord = TPartialRow; +export type ProgramsCsConditionTree = TConditionTree; +export type ProgramsCsFilter = TPaginatedFilter; +export type ProgramsCsSortClause = TSortClause; +export type ProgramsCsAggregation = TAggregation; + export type SalesCustomizer = CollectionCustomizer; export type SalesRecord = TPartialRow; export type SalesConditionTree = TConditionTree; @@ -317,6 +373,32 @@ export type Schema = { 'post:owner:lastName': string; }; }; + 'conversations': { + plain: { + '_id': string; + }; + nested: {}; + flat: {}; + }; + 'conversations_tagValues': { + plain: { + '_id': string; + 'parentId': string; + 'participantTagName@@@_id': string; + 'participantTagName@@@label': string; + 'participantTagName@@@order': number; + 'participantTagName@@@values': Array | null; + 'participantTagValue@@@_id': string; + 'participantTagValue@@@label': string; + 'participantTagValue@@@order': number; + }; + nested: { + 'parent': Schema['conversations']['plain'] & Schema['conversations']['nested']; + }; + flat: { + 'parent:_id': string; + }; + }; 'customer': { plain: { 'createdAt': string; @@ -405,6 +487,26 @@ export type Schema = { nested: {}; flat: {}; }; + 'partis': { + plain: { + '_id': string; + }; + nested: {}; + flat: {}; + }; + 'partis_ps': { + plain: { + '_id': string; + 'name': string; + 'parentId': string; + }; + nested: { + 'parent': Schema['partis']['plain'] & Schema['partis']['nested']; + }; + flat: { + 'parent:_id': string; + }; + }; 'post': { plain: { 'body': string | null; @@ -422,6 +524,60 @@ export type Schema = { 'owner:lastName': string; }; }; + 'programs': { + plain: { + '_id': string; + }; + nested: {}; + flat: {}; + }; + 'programs_attributeFields': { + plain: { + '_id': string; + 'label': string; + 'order': number; + 'parentId': string; + 'valuesEmbed': Array | null; + }; + nested: { + 'parent': Schema['programs']['plain'] & Schema['programs']['nested']; + }; + flat: { + 'parent:_id': string; + }; + }; + 'programs_attributeFields_values': { + plain: { + '_id': string; + 'label': string; + 'order': number; + 'parentId': string; + }; + nested: { + 'parent': Schema['programs_attributeFields']['plain'] & Schema['programs_attributeFields']['nested']; + }; + flat: { + 'parent:_id': string; + 'parent:label': string; + 'parent:order': number; + 'parent:parentId': string; + 'parent:valuesEmbed': Array | null; + 'parent:parent:_id': string; + }; + }; + 'programs_cs': { + plain: { + '_id': string; + 'p': string; + 'parentId': string; + }; + nested: { + 'parent': Schema['programs']['plain'] & Schema['programs']['nested']; + }; + flat: { + 'parent:_id': string; + }; + }; 'rental': { plain: { 'customerId': number | null; diff --git a/packages/agent/package.json b/packages/agent/package.json index 79c6d2dff0..d3ac098360 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -34,9 +34,6 @@ "superagent": "^10.3.0", "uuid": "11.1.1" }, - "optionalDependencies": { - "@forestadmin/workflow-executor": "^1.9.1" - }, "files": [ "dist/**/*.js", "dist/**/*.d.ts" @@ -51,6 +48,7 @@ "devDependencies": { "@fastify/express": "^1.1.0", "@forestadmin/datasource-sql": "1.17.10", + "@forestadmin/workflow-executor": "^1.9.1", "@nestjs/common": "^10.4.16", "@nestjs/core": "^10.4.16", "@nestjs/platform-express": "^10.4.16", @@ -75,11 +73,15 @@ "@paralleldrive/cuid2": "2.2.2" }, "peerDependencies": { - "@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0" + "@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0", + "@forestadmin/workflow-executor": "^1.9.1" }, "peerDependenciesMeta": { "@fastify/express": { "optional": true + }, + "@forestadmin/workflow-executor": { + "optional": true } } } From 0f23209716dafb4f969056b30231bcbf44281c90 Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Wed, 1 Jul 2026 17:35:11 +0200 Subject: [PATCH 7/7] chore: remove useless modification --- packages/_example/src/forest/agent.ts | 135 ++++++++++---------- packages/_example/src/forest/typings.ts | 156 ------------------------ 2 files changed, 66 insertions(+), 225 deletions(-) diff --git a/packages/_example/src/forest/agent.ts b/packages/_example/src/forest/agent.ts index 5dabb2368f..8534595db3 100644 --- a/packages/_example/src/forest/agent.ts +++ b/packages/_example/src/forest/agent.ts @@ -31,81 +31,78 @@ export default function makeAgent() { envSecret: process.env.FOREST_ENV_SECRET, forestServerUrl: process.env.FOREST_SERVER_URL, forestAppUrl: process.env.FOREST_APP_URL, - // workflowExecutorUrl: process.env.WORKFLOW_EXECUTOR_URL, + workflowExecutorUrl: process.env.WORKFLOW_EXECUTOR_URL, isProduction: false, loggerLevel: 'Info', typingsPath: 'src/forest/typings.ts', }; - return ( - createAgent(envOptions) - .addDataSource(createSqlDataSource({ dialect: 'sqlite', storage: './assets/db.sqlite' })) + return createAgent(envOptions) + .addDataSource(createSqlDataSource({ dialect: 'sqlite', storage: './assets/db.sqlite' })) - .addDataSource( - // Using an URI - createSqlDataSource('mariadb://example:password@localhost:3808/example', { - liveQueryConnections: 'Main database', - }), - { include: ['customer'] }, - ) - .addDataSource( - // Using a connection object - createSqlDataSource({ - dialect: 'mariadb', - username: 'example', - password: 'password', - port: 3808, - database: 'example', - }), - { include: ['card', 'active_cards'] }, // active_cards is a view - ) - .addDataSource(createTypicode()) - .addDataSource( - createSequelizeDataSource(sequelizePostgres, { - liveQueryConnections: 'Business intel', - }), - ) - .addDataSource(createSequelizeDataSource(sequelizeMySql)) - .addDataSource(createSequelizeDataSource(sequelizeMsSql)) - .addDataSource( - createMongooseDataSource(mongoose, { asModels: { account: ['address', 'bills.items'] } }), - ) - .addDataSource( - createMongoDataSource({ - uri: connectionString, - dataSource: { flattenMode: 'auto' }, - }), - { - exclude: ['accounts', 'accounts_bills', 'accounts_bills_items'], - }, - ) - .addChart('numRentals', async (context, resultBuilder) => { - const rentals = context.dataSource.getCollection('rental'); - const rows = await rentals.aggregate({}, { operation: 'Count' }); + .addDataSource( + // Using an URI + createSqlDataSource('mariadb://example:password@localhost:3808/example', { + liveQueryConnections: 'Main database', + }), + { include: ['customer'] }, + ) + .addDataSource( + // Using a connection object + createSqlDataSource({ + dialect: 'mariadb', + username: 'example', + password: 'password', + port: 3808, + database: 'example', + }), + { include: ['card', 'active_cards'] }, // active_cards is a view + ) + .addDataSource(createTypicode()) + .addDataSource( + createSequelizeDataSource(sequelizePostgres, { + liveQueryConnections: 'Business intel', + }), + ) + .addDataSource(createSequelizeDataSource(sequelizeMySql)) + .addDataSource(createSequelizeDataSource(sequelizeMsSql)) + .addDataSource( + createMongooseDataSource(mongoose, { asModels: { account: ['address', 'bills.items'] } }), + ) + .addDataSource( + createMongoDataSource({ + uri: connectionString, + dataSource: { flattenMode: 'auto' }, + }), + { + exclude: ['accounts', 'accounts_bills', 'accounts_bills_items'], + }, + ) + .addChart('numRentals', async (context, resultBuilder) => { + const rentals = context.dataSource.getCollection('rental'); + const rows = await rentals.aggregate({}, { operation: 'Count' }); - return resultBuilder.value((rows?.[0]?.value as number) ?? 0); - }) - .mountAiMcpServer() + return resultBuilder.value((rows?.[0]?.value as number) ?? 0); + }) + .mountAiMcpServer() - .customizeCollection('card', customizeCard) - .customizeCollection('account', customizeAccount) - .customizeCollection('owner', customizeOwner) - .customizeCollection('store', customizeStore) - .customizeCollection('rental', customizeRental) - .customizeCollection('dvd', customizeDvd) - .customizeCollection('customer', customizeCustomer) - .customizeCollection('post', customizePost) - .customizeCollection('comment', customizeComment) - .customizeCollection('review', customizeReview) - .customizeCollection('sales', customizeSales) - // .addAi( - // createAiProvider({ - // model: 'gpt-4o', - // provider: 'openai', - // name: 'test', - // apiKey: process.env.OPENAI_API_KEY, - // }), - // ) - .addWorkflowExecutor({ database: { uri: 'postgres://forest:secret@localhost:5435/exec' } }) - ); + .customizeCollection('card', customizeCard) + .customizeCollection('account', customizeAccount) + .customizeCollection('owner', customizeOwner) + .customizeCollection('store', customizeStore) + .customizeCollection('rental', customizeRental) + .customizeCollection('dvd', customizeDvd) + .customizeCollection('customer', customizeCustomer) + .customizeCollection('post', customizePost) + .customizeCollection('comment', customizeComment) + .customizeCollection('review', customizeReview) + .customizeCollection('sales', customizeSales) + .addAi( + createAiProvider({ + model: 'gpt-4o', + provider: 'openai', + name: 'test', + apiKey: process.env.OPENAI_API_KEY, + }), + ); } diff --git a/packages/_example/src/forest/typings.ts b/packages/_example/src/forest/typings.ts index 1088efe98c..badbc94045 100644 --- a/packages/_example/src/forest/typings.ts +++ b/packages/_example/src/forest/typings.ts @@ -120,62 +120,6 @@ export type AccountBillsItemsFilter = TPaginatedFilter; export type AccountBillsItemsAggregation = TAggregation; -export type ConversationsCustomizer = CollectionCustomizer; -export type ConversationsRecord = TPartialRow; -export type ConversationsConditionTree = TConditionTree; -export type ConversationsFilter = TPaginatedFilter; -export type ConversationsSortClause = TSortClause; -export type ConversationsAggregation = TAggregation; - -export type ConversationsTagValuesCustomizer = CollectionCustomizer; -export type ConversationsTagValuesRecord = TPartialRow; -export type ConversationsTagValuesConditionTree = TConditionTree; -export type ConversationsTagValuesFilter = TPaginatedFilter; -export type ConversationsTagValuesSortClause = TSortClause; -export type ConversationsTagValuesAggregation = TAggregation; - -export type PartisCustomizer = CollectionCustomizer; -export type PartisRecord = TPartialRow; -export type PartisConditionTree = TConditionTree; -export type PartisFilter = TPaginatedFilter; -export type PartisSortClause = TSortClause; -export type PartisAggregation = TAggregation; - -export type PartisPsCustomizer = CollectionCustomizer; -export type PartisPsRecord = TPartialRow; -export type PartisPsConditionTree = TConditionTree; -export type PartisPsFilter = TPaginatedFilter; -export type PartisPsSortClause = TSortClause; -export type PartisPsAggregation = TAggregation; - -export type ProgramsCustomizer = CollectionCustomizer; -export type ProgramsRecord = TPartialRow; -export type ProgramsConditionTree = TConditionTree; -export type ProgramsFilter = TPaginatedFilter; -export type ProgramsSortClause = TSortClause; -export type ProgramsAggregation = TAggregation; - -export type ProgramsAttributeFieldsCustomizer = CollectionCustomizer; -export type ProgramsAttributeFieldsRecord = TPartialRow; -export type ProgramsAttributeFieldsConditionTree = TConditionTree; -export type ProgramsAttributeFieldsFilter = TPaginatedFilter; -export type ProgramsAttributeFieldsSortClause = TSortClause; -export type ProgramsAttributeFieldsAggregation = TAggregation; - -export type ProgramsAttributeFieldsValuesCustomizer = CollectionCustomizer; -export type ProgramsAttributeFieldsValuesRecord = TPartialRow; -export type ProgramsAttributeFieldsValuesConditionTree = TConditionTree; -export type ProgramsAttributeFieldsValuesFilter = TPaginatedFilter; -export type ProgramsAttributeFieldsValuesSortClause = TSortClause; -export type ProgramsAttributeFieldsValuesAggregation = TAggregation; - -export type ProgramsCsCustomizer = CollectionCustomizer; -export type ProgramsCsRecord = TPartialRow; -export type ProgramsCsConditionTree = TConditionTree; -export type ProgramsCsFilter = TPaginatedFilter; -export type ProgramsCsSortClause = TSortClause; -export type ProgramsCsAggregation = TAggregation; - export type SalesCustomizer = CollectionCustomizer; export type SalesRecord = TPartialRow; export type SalesConditionTree = TConditionTree; @@ -373,32 +317,6 @@ export type Schema = { 'post:owner:lastName': string; }; }; - 'conversations': { - plain: { - '_id': string; - }; - nested: {}; - flat: {}; - }; - 'conversations_tagValues': { - plain: { - '_id': string; - 'parentId': string; - 'participantTagName@@@_id': string; - 'participantTagName@@@label': string; - 'participantTagName@@@order': number; - 'participantTagName@@@values': Array | null; - 'participantTagValue@@@_id': string; - 'participantTagValue@@@label': string; - 'participantTagValue@@@order': number; - }; - nested: { - 'parent': Schema['conversations']['plain'] & Schema['conversations']['nested']; - }; - flat: { - 'parent:_id': string; - }; - }; 'customer': { plain: { 'createdAt': string; @@ -487,26 +405,6 @@ export type Schema = { nested: {}; flat: {}; }; - 'partis': { - plain: { - '_id': string; - }; - nested: {}; - flat: {}; - }; - 'partis_ps': { - plain: { - '_id': string; - 'name': string; - 'parentId': string; - }; - nested: { - 'parent': Schema['partis']['plain'] & Schema['partis']['nested']; - }; - flat: { - 'parent:_id': string; - }; - }; 'post': { plain: { 'body': string | null; @@ -524,60 +422,6 @@ export type Schema = { 'owner:lastName': string; }; }; - 'programs': { - plain: { - '_id': string; - }; - nested: {}; - flat: {}; - }; - 'programs_attributeFields': { - plain: { - '_id': string; - 'label': string; - 'order': number; - 'parentId': string; - 'valuesEmbed': Array | null; - }; - nested: { - 'parent': Schema['programs']['plain'] & Schema['programs']['nested']; - }; - flat: { - 'parent:_id': string; - }; - }; - 'programs_attributeFields_values': { - plain: { - '_id': string; - 'label': string; - 'order': number; - 'parentId': string; - }; - nested: { - 'parent': Schema['programs_attributeFields']['plain'] & Schema['programs_attributeFields']['nested']; - }; - flat: { - 'parent:_id': string; - 'parent:label': string; - 'parent:order': number; - 'parent:parentId': string; - 'parent:valuesEmbed': Array | null; - 'parent:parent:_id': string; - }; - }; - 'programs_cs': { - plain: { - '_id': string; - 'p': string; - 'parentId': string; - }; - nested: { - 'parent': Schema['programs']['plain'] & Schema['programs']['nested']; - }; - flat: { - 'parent:_id': string; - }; - }; 'rental': { plain: { 'customerId': number | null;