Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"devDependencies": {
"@fastify/express": "^1.1.0",
"@forestadmin/datasource-sql": "1.17.10",
"@forestadmin/workflow-executor": "^1.9.1",
"@nestjs/common": "^10.4.16",
"@nestjs/core": "^10.4.16",
"@nestjs/platform-express": "^10.4.16",
Expand All @@ -72,11 +73,15 @@
"@paralleldrive/cuid2": "2.2.2"
},
"peerDependencies": {
"@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0"
"@fastify/express": "^1.1.0 || ^2.0.0 || ^3.0.0 || ^4.0.0",
"@forestadmin/workflow-executor": "^1.9.1"
},
"peerDependenciesMeta": {
"@fastify/express": {
"optional": true
},
"@forestadmin/workflow-executor": {
"optional": true
}
}
}
55 changes: 54 additions & 1 deletion packages/agent/src/agent.ts

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium

If this.embeddedExecutor.start() throws after this.mount(router) succeeds, the catch block only rethrows without unmounting or closing the server. The agent is left mounted and serving routes with no running executor, and a subsequent start() retry fails with EADDRINUSE because the listener was never cleaned up. Consider stopping/unmounting the agent in the catch block before rethrowing so startup failures don't leave a half-started server bound to the port.

Also found in 1 other location(s)

packages/agent/src/embedded-workflow-executor.ts:99

start() does not clean up a partially started executor when this.executor.start() rejects. In @forestadmin/workflow-executor, WorkflowExecutor.start() starts the runner before binding the HTTP port, so a port conflict or listen error leaves the polling timer and database connections running. Here that rejection is just propagated from line 99, which means agent.start() fails but the embedded executor keeps background resources alive and can hang the process until it is killed.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/agent.ts around line 96:

If `this.embeddedExecutor.start()` throws after `this.mount(router)` succeeds, the `catch` block only rethrows without unmounting or closing the server. The agent is left mounted and serving routes with no running executor, and a subsequent `start()` retry fails with `EADDRINUSE` because the listener was never cleaned up. Consider stopping/unmounting the agent in the `catch` block before rethrowing so startup failures don't leave a half-started server bound to the port.

Also found in 1 other location(s):
- packages/agent/src/embedded-workflow-executor.ts:99 -- `start()` does not clean up a partially started executor when `this.executor.start()` rejects. In `@forestadmin/workflow-executor`, `WorkflowExecutor.start()` starts the runner before binding the HTTP port, so a port conflict or listen error leaves the polling timer and database connections running. Here that rejection is just propagated from line 99, which means `agent.start()` fails but the embedded executor keeps background resources alive and can hang the process until it is killed.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium src/agent.ts:118

stop() awaits this.embeddedExecutor?.stop() without a finally, so if that promise rejects, forestAdminClient.close() and super.stop() are skipped. The agent's server socket stays open and the Forest Admin client remains subscribed after a failed executor shutdown. Wrap the executor drain in try/finally (or .catch()) so the remaining cleanup always runs.

  override async stop(): Promise<void> {
-    // Drain the embedded executor first, while the agent it depends on is still serving.
-    await this.embeddedExecutor?.stop();
-    // Close anything related to ForestAdmin client
-    this.options.forestAdminClient.close();
-    // Stop at framework level
-    await super.stop();
+    try {
+      // Drain the embedded executor first, while the agent it depends on is still serving.
+      await this.embeddedExecutor?.stop();
+    } finally {
+      // Close anything related to ForestAdmin client
+      this.options.forestAdminClient.close();
+      // Stop at framework level
+      await super.stop();
+    }
  }
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/agent.ts around lines 118-125:

`stop()` awaits `this.embeddedExecutor?.stop()` without a `finally`, so if that promise rejects, `forestAdminClient.close()` and `super.stop()` are skipped. The agent's server socket stays open and the Forest Admin client remains subscribed after a failed executor shutdown. Wrap the executor drain in `try/finally` (or `.catch()`) so the remaining cleanup always runs.

Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { ForestAdminHttpDriverServices } from './services';
import type { AgentOptions, AgentOptionsWithDefaults, HttpCallback } from './types';
import type {
AgentOptions,
AgentOptionsWithDefaults,
HttpCallback,
WorkflowExecutorEmbedOptions,
} from './types';
import type { AiProviderDefinition } from '@forestadmin/agent-toolkit';
import type {
CollectionCustomizer,
Expand All @@ -21,6 +26,7 @@ import Router from '@koa/router';
import { readFile, writeFile } from 'fs/promises';
import stringify from 'json-stringify-pretty-compact';

import EmbeddedWorkflowExecutor from './embedded-workflow-executor';
import FrameworkMounter from './framework-mounter';
import makeRoutes from './routes';
import makeServices from './services';
Expand Down Expand Up @@ -50,6 +56,9 @@ export default class Agent<S extends TSchema = TSchema> extends FrameworkMounter
private mcpEnabled = false;
private mcpEnabledTools?: ToolName[];

/** In-process workflow executor, created only when addWorkflowExecutor() is called. */
private embeddedExecutor: EmbeddedWorkflowExecutor | null = null;

private isRestarting = false;

/**
Expand Down Expand Up @@ -92,6 +101,10 @@ export default class Agent<S extends TSchema = TSchema> extends FrameworkMounter

this.setMcpCallback(mcpHttpCallback ?? null);
await this.mount(router);

// Boot after mount(): the embedded executor reaches the agent over HTTP, and the
// standalone server's host/port (used to derive that URL) are only known once mounted.
await this.embeddedExecutor?.start(this.standaloneServerHost, this.standaloneServerPort);
} catch (error) {
const { message } = error as Error;
this.options.logger('Error', `Forest Admin agent startup failure: ${message}`);
Expand All @@ -103,6 +116,8 @@ export default class Agent<S extends TSchema = TSchema> extends FrameworkMounter
* Stop the agent.
*/
override async stop(): Promise<void> {
// Drain the embedded executor first, while the agent it depends on is still serving.
await this.embeddedExecutor?.stop();
// Close anything related to ForestAdmin client
this.options.forestAdminClient.close();
// Stop at framework level
Expand Down Expand Up @@ -286,6 +301,44 @@ export default class Agent<S extends TSchema = TSchema> extends FrameworkMounter
return this;
}

/**
* Run a workflow executor in-process, alongside the agent. The agent boots it on start(),
* stops it on stop(), and proxies `/_internal/executor/*` to it — no separate deployment.
*
* Requires the `@forestadmin/workflow-executor` package to be installed:
* ```bash
* npm install @forestadmin/workflow-executor
* ```
*
* The executor persists run state in a database: provide `database`, or set the `DATABASE_URL`
* environment variable. Mutually exclusive with the `workflowExecutorUrl` option, which targets
* a separately-deployed executor instead.
*
* @param options embedded executor options
* @returns the agent instance for chaining
* @throws Error if called more than once, or if `workflowExecutorUrl` is also set
*
* @example
* createAgent(options)
* .addDataSource(...)
* .addWorkflowExecutor({ database: { uri: process.env.DATABASE_URL } })
* .start();
*/
addWorkflowExecutor(options: WorkflowExecutorEmbedOptions = {}): this {
if (this.embeddedExecutor) {
throw new Error('addWorkflowExecutor can only be called once.');
}

const executor = new EmbeddedWorkflowExecutor(this.options);
// Wire the existing proxy route to the loopback executor. Set here (before start() builds the
// routes) so the proxy is registered with the right target. Assign the field only after
// configure() succeeds, so a conflict error leaves no half-initialized executor.
(this.options as AgentOptions).workflowExecutorUrl = executor.configure(options);
this.embeddedExecutor = executor;

return this;
}

protected getRoutes(dataSource: DataSource, services: ForestAdminHttpDriverServices) {
// init() is called on every start/restart to recreate routing state with a fresh Router.
const aiRouter = this.aiProvider?.init(this.options.logger) ?? null;
Expand Down
156 changes: 156 additions & 0 deletions packages/agent/src/embedded-workflow-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import type { AgentOptionsWithDefaults, WorkflowExecutorEmbedOptions } from './types';
import type { WorkflowExecutor } from '@forestadmin/workflow-executor';

import path from 'path';

/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */
const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400;

/**
* Serialize the executor's structured log context onto the message. The agent's logger only
* accepts an Error as its third argument, so the executor's rich context (runId, stepId, error,
* stack…) would be dropped otherwise. Never throws — logging must not crash the executor.
*/
function formatLog(message: string, context?: Record<string, unknown>): string {
if (!context || Object.keys(context).length === 0) return `[workflow-executor] ${message}`;

try {
return `[workflow-executor] ${message} ${JSON.stringify(context)}`;
} catch {
return `[workflow-executor] ${message} [unserializable context]`;
}
}

/**
* Turn the standalone server's bind host into a host the embedded executor can actually connect
* to. Wildcard/unspecified binds (undefined, `0.0.0.0`, `::`) are not valid connect targets, so
* fall back to loopback. IPv6 literals must be bracketed to sit in a URL authority.
*/
function toConnectableHost(host?: string): string {
if (!host || host === '0.0.0.0' || host === '::') return '127.0.0.1';

return host.includes(':') ? `[${host}]` : host;
}

/**
* Owns the lifecycle of a workflow executor embedded in the agent process: configuration,
* dynamic loading of the optional package, build, and start/stop. The agent only wires the proxy
* route to the loopback URL `configure()` returns and delegates start/stop.
*/
export default class EmbeddedWorkflowExecutor {
private config: (WorkflowExecutorEmbedOptions & { port: number }) | null = null;
private executor: WorkflowExecutor | null = null;

constructor(private readonly options: AgentOptionsWithDefaults) {}

/**
* Register the embedded executor and return the loopback URL the agent must proxy
* `/_internal/executor/*` to. Throws when the remote `workflowExecutorUrl` option is also set.
*/
configure(embedOptions: WorkflowExecutorEmbedOptions): string {
if (this.options.workflowExecutorUrl) {
throw new Error(
'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' +
'embeds an executor in-process, the latter targets a remote one. Choose one.',
);
}

const port =
embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT);
this.config = { ...embedOptions, port };

return `http://127.0.0.1:${port}`;
}

/**
* Build and start the executor. Called from agent.start() after mount(), so the standalone
* server's host/port (used to derive the agent URL) are already known.
*/
async start(
standaloneServerHost: string | undefined,
standaloneServerPort: number | undefined,
): Promise<void> {
const { config } = this;
if (!config) return;

const agentUrl =
config.agentUrl ?? this.deriveAgentUrl(standaloneServerHost, standaloneServerPort);

if (!agentUrl) {
throw new Error(
'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' +
'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' +
'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().',
);
}

const database =
config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined);

if (!database) {
throw new Error(
'Embedded workflow executor requires a database to persist run state. Pass `database` to ' +
'addWorkflowExecutor() or set the DATABASE_URL environment variable.',
);
}

const { buildDatabaseExecutor } = await this.importPackage();

this.executor = buildDatabaseExecutor({
envSecret: this.options.envSecret,
authSecret: this.options.authSecret,
forestServerUrl: this.options.forestServerUrl,
agentUrl,
httpPort: config.port,
pollingIntervalS: config.pollingIntervalS,
stepTimeoutS: config.stepTimeoutS,
logger: (level, message, context) => this.options.logger(level, formatLog(message, context)),
// Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop()
// drains the executor explicitly.
manageProcessSignals: false,
database: database as Parameters<typeof buildDatabaseExecutor>[0]['database'],
});

await this.executor.start();
this.options.logger(
'Info',
`Embedded workflow executor started (loopback port ${config.port})`,
);
}

async stop(): Promise<void> {
await this.executor?.stop();
}

/**
* Derive the URL the embedded executor uses to reach this agent over HTTP. Only possible on a
* standalone server, where the agent owns the listening host/port.
*/
private deriveAgentUrl(
standaloneServerHost: string | undefined,
standaloneServerPort: number | undefined,
): string | null {
if (!standaloneServerPort) return null;

// agent-client appends `/forest`, so the URL stops at the configured prefix.
const prefixPath = path.posix.join('/', this.options.prefix);
const base = `http://${toConnectableHost(standaloneServerHost)}:${standaloneServerPort}`;

return prefixPath === '/' ? base : `${base}${prefixPath}`;
}

/**
* Dynamically load the optional @forestadmin/workflow-executor package.
* Deferred so agents that don't embed an executor never load its code at startup.
*/
private async importPackage() {
try {
return await import('@forestadmin/workflow-executor');
} catch (error) {
throw new Error(
'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
'Install it with `npm install @forestadmin/workflow-executor`.',
);
}
}
Comment on lines +146 to +155

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium src/embedded-workflow-executor.ts:146

importPackage() catches every rejection from import('@forestadmin/workflow-executor') and replaces it with a generic "package not installed" message. When the package is installed but throws during module evaluation (e.g. a broken transitive dependency or a runtime error), agent.start() surfaces the false missing-dependency error, hiding the real cause. Consider detecting MODULE_NOT_FOUND specifically and rethrowing all other errors unchanged.

  private async importPackage() {
    try {
      return await import('@forestadmin/workflow-executor');
-    } catch (error) {
-      throw new Error(
-        'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
-          'Install it with `npm install @forestadmin/workflow-executor`.',
-      );
+    } catch (error: any) {
+      if (error?.code === 'MODULE_NOT_FOUND') {
+        throw new Error(
+          'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
+            'Install it with `npm install @forestadmin/workflow-executor`.',
+        );
+      }
+
+      throw error;
    }
  }
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/embedded-workflow-executor.ts around lines 146-155:

`importPackage()` catches every rejection from `import('@forestadmin/workflow-executor')` and replaces it with a generic "package not installed" message. When the package is installed but throws during module evaluation (e.g. a broken transitive dependency or a runtime error), `agent.start()` surfaces the false missing-dependency error, hiding the real cause. Consider detecting `MODULE_NOT_FOUND` specifically and rethrowing all other errors unchanged.

}
2 changes: 2 additions & 0 deletions packages/agent/src/framework-mounter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import McpMiddleware from './mcp-middleware';

export default class FrameworkMounter {
public standaloneServerPort: number;
public standaloneServerHost?: string;

private readonly onFirstStart: (() => Promise<void>)[] = [];
private readonly onEachStart: ((router: Router) => Promise<void>)[] = [];
Expand Down Expand Up @@ -67,6 +68,7 @@ export default class FrameworkMounter {
mountOnStandaloneServer(port?: number, host?: string): this {
const portFromEnv = Number(process.env.PORT) || undefined;
const chosenPort = port ?? portFromEnv ?? 3351;
this.standaloneServerHost = host;
const server = createServer(this.getConnectCallback(true));

this.onFirstStart.push(() => {
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export function createAgent<S extends TSchema = TSchema>(options: AgentOptions):
}

export { Agent };
export { AgentOptions } from './types';
export { AgentOptions, WorkflowExecutorEmbedOptions } from './types';
export * from '@forestadmin/datasource-customizer';

// export is necessary for the agent-generator package
Expand Down
32 changes: 32 additions & 0 deletions packages/agent/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,42 @@ export type AgentOptions = {
* Base URL of the workflow executor to proxy requests to.
* When set, the agent forwards `/_internal/executor/*` to the executor verbatim,
* benefiting from the agent's authentication layer.
*
* Mutually exclusive with `agent.addWorkflowExecutor()`: use this option to target a
* separately-deployed executor, or `addWorkflowExecutor()` to run one in-process.
* @example 'http://localhost:4001'
*/
workflowExecutorUrl?: string | null;
};

/**
* Options for an embedded workflow executor, started in the same process as the agent
* through `agent.addWorkflowExecutor()`.
*/
export type WorkflowExecutorEmbedOptions = {
/**
* Database connection used to persist workflow run state. Accepts a connection URI or a
* Sequelize options object. Falls back to the `DATABASE_URL` environment variable when omitted.
* The agent throws at startup if neither is provided.
*/
database?: { uri?: string; [option: string]: unknown };
/**
* URL the executor uses to reach this agent's data layer.
* Auto-derived when the agent runs on its own server (`mountOnStandaloneServer`).
* Required when the agent is mounted on an external framework (Express/Fastify/NestJS),
* since the agent cannot know the host application's address.
*/
agentUrl?: string;
/**
* Loopback port the embedded executor listens on; the agent proxies to it internally.
* Defaults to the `HTTP_PORT` environment variable, or `3400`.
*/
port?: number;
/** Interval in seconds at which the executor polls the orchestrator for pending steps. */
pollingIntervalS?: number;
/** Per-step execution timeout in seconds. */
stepTimeoutS?: number;
};
export type AgentOptionsWithDefaults = Readonly<Required<AgentOptions>>;

export type HttpCallback = (req: IncomingMessage, res: ServerResponse, next?: () => void) => void;
Expand Down
45 changes: 45 additions & 0 deletions packages/agent/test/agent-workflow-executor-missing-dep.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* Covers the optional-dependency failure path: when @forestadmin/workflow-executor is not
* installed, the dynamic import() rejects and the agent surfaces an actionable error.
*/
import { DataSourceCustomizer } from '@forestadmin/datasource-customizer';

import * as factories from './__factories__';
import Agent from '../src/agent';

const mockMakeRoutes = jest.fn();
jest.mock('../src/routes', () => ({
__esModule: true,
default: (...args) => mockMakeRoutes(...args),
}));
jest.mock('@forestadmin/datasource-customizer');

// Simulate the package being absent: importing it throws, like a MODULE_NOT_FOUND at runtime.
jest.mock('@forestadmin/workflow-executor', () => {
throw new Error("Cannot find module '@forestadmin/workflow-executor'");
});

beforeEach(() => {
jest.clearAllMocks();
mockMakeRoutes.mockReturnValue([{ setupRoutes: jest.fn(), bootstrap: jest.fn() }]);
jest
.mocked(DataSourceCustomizer.prototype.getDataSource)
.mockResolvedValue(factories.dataSource.build());
});

describe('Agent.addWorkflowExecutor (optional dependency missing)', () => {
test('throws an actionable error when @forestadmin/workflow-executor cannot be imported', async () => {
const agent = new Agent(
factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true }),
);
agent.addWorkflowExecutor({
agentUrl: 'http://my-agent',
database: { uri: 'postgres://localhost/db' },
});

await expect(agent.start()).rejects.toThrow(
'The embedded workflow executor requires the `@forestadmin/workflow-executor` package',
);
});
});
Loading
Loading