-
Notifications
You must be signed in to change notification settings - Fork 12
feat(agent): embed workflow executor in-process via addWorkflowExecutor() #1717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
317e262
ab4e36f
8b98527
ee85683
ba4a95f
1d0bfc9
0f23209
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Medium
override async stop(): Promise<void> {
- // Drain the embedded executor first, while the agent it depends on is still serving.
- await this.embeddedExecutor?.stop();
- // Close anything related to ForestAdmin client
- this.options.forestAdminClient.close();
- // Stop at framework level
- await super.stop();
+ try {
+ // Drain the embedded executor first, while the agent it depends on is still serving.
+ await this.embeddedExecutor?.stop();
+ } finally {
+ // Close anything related to ForestAdmin client
+ this.options.forestAdminClient.close();
+ // Stop at framework level
+ await super.stop();
+ }
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent: |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| import type { AgentOptionsWithDefaults, WorkflowExecutorEmbedOptions } from './types'; | ||
| import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; | ||
|
|
||
| import path from 'path'; | ||
|
|
||
| /** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ | ||
| const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; | ||
|
|
||
| /** | ||
| * Serialize the executor's structured log context onto the message. The agent's logger only | ||
| * accepts an Error as its third argument, so the executor's rich context (runId, stepId, error, | ||
| * stack…) would be dropped otherwise. Never throws — logging must not crash the executor. | ||
| */ | ||
| function formatLog(message: string, context?: Record<string, unknown>): string { | ||
| if (!context || Object.keys(context).length === 0) return `[workflow-executor] ${message}`; | ||
|
|
||
| try { | ||
| return `[workflow-executor] ${message} ${JSON.stringify(context)}`; | ||
| } catch { | ||
| return `[workflow-executor] ${message} [unserializable context]`; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Turn the standalone server's bind host into a host the embedded executor can actually connect | ||
| * to. Wildcard/unspecified binds (undefined, `0.0.0.0`, `::`) are not valid connect targets, so | ||
| * fall back to loopback. IPv6 literals must be bracketed to sit in a URL authority. | ||
| */ | ||
| function toConnectableHost(host?: string): string { | ||
| if (!host || host === '0.0.0.0' || host === '::') return '127.0.0.1'; | ||
|
|
||
| return host.includes(':') ? `[${host}]` : host; | ||
| } | ||
|
|
||
| /** | ||
| * Owns the lifecycle of a workflow executor embedded in the agent process: configuration, | ||
| * dynamic loading of the optional package, build, and start/stop. The agent only wires the proxy | ||
| * route to the loopback URL `configure()` returns and delegates start/stop. | ||
| */ | ||
| export default class EmbeddedWorkflowExecutor { | ||
| private config: (WorkflowExecutorEmbedOptions & { port: number }) | null = null; | ||
| private executor: WorkflowExecutor | null = null; | ||
|
|
||
| constructor(private readonly options: AgentOptionsWithDefaults) {} | ||
|
|
||
| /** | ||
| * Register the embedded executor and return the loopback URL the agent must proxy | ||
| * `/_internal/executor/*` to. Throws when the remote `workflowExecutorUrl` option is also set. | ||
| */ | ||
| configure(embedOptions: WorkflowExecutorEmbedOptions): string { | ||
| if (this.options.workflowExecutorUrl) { | ||
| throw new Error( | ||
| 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option: the former ' + | ||
| 'embeds an executor in-process, the latter targets a remote one. Choose one.', | ||
| ); | ||
| } | ||
|
|
||
| const port = | ||
| embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); | ||
| this.config = { ...embedOptions, port }; | ||
|
|
||
| return `http://127.0.0.1:${port}`; | ||
| } | ||
|
|
||
| /** | ||
| * Build and start the executor. Called from agent.start() after mount(), so the standalone | ||
| * server's host/port (used to derive the agent URL) are already known. | ||
| */ | ||
| async start( | ||
| standaloneServerHost: string | undefined, | ||
| standaloneServerPort: number | undefined, | ||
| ): Promise<void> { | ||
| const { config } = this; | ||
| if (!config) return; | ||
|
|
||
| const agentUrl = | ||
| config.agentUrl ?? this.deriveAgentUrl(standaloneServerHost, standaloneServerPort); | ||
|
|
||
| if (!agentUrl) { | ||
| throw new Error( | ||
| 'Embedded workflow executor: unable to derive the agent URL. It is only auto-derived when ' + | ||
| 'the agent runs on its own server (mountOnStandaloneServer). When mounting on Express, ' + | ||
| 'Fastify or NestJS, pass `agentUrl` to addWorkflowExecutor().', | ||
| ); | ||
| } | ||
|
|
||
| const database = | ||
| config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); | ||
|
|
||
| if (!database) { | ||
| throw new Error( | ||
| 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + | ||
| 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', | ||
| ); | ||
| } | ||
|
|
||
| const { buildDatabaseExecutor } = await this.importPackage(); | ||
|
|
||
| this.executor = buildDatabaseExecutor({ | ||
| envSecret: this.options.envSecret, | ||
| authSecret: this.options.authSecret, | ||
| forestServerUrl: this.options.forestServerUrl, | ||
| agentUrl, | ||
| httpPort: config.port, | ||
| pollingIntervalS: config.pollingIntervalS, | ||
| stepTimeoutS: config.stepTimeoutS, | ||
| logger: (level, message, context) => this.options.logger(level, formatLog(message, context)), | ||
| // Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop() | ||
| // drains the executor explicitly. | ||
| manageProcessSignals: false, | ||
| database: database as Parameters<typeof buildDatabaseExecutor>[0]['database'], | ||
| }); | ||
|
|
||
| await this.executor.start(); | ||
| this.options.logger( | ||
| 'Info', | ||
| `Embedded workflow executor started (loopback port ${config.port})`, | ||
| ); | ||
| } | ||
|
|
||
| async stop(): Promise<void> { | ||
| await this.executor?.stop(); | ||
| } | ||
|
|
||
| /** | ||
| * Derive the URL the embedded executor uses to reach this agent over HTTP. Only possible on a | ||
| * standalone server, where the agent owns the listening host/port. | ||
| */ | ||
| private deriveAgentUrl( | ||
| standaloneServerHost: string | undefined, | ||
| standaloneServerPort: number | undefined, | ||
| ): string | null { | ||
| if (!standaloneServerPort) return null; | ||
|
|
||
| // agent-client appends `/forest`, so the URL stops at the configured prefix. | ||
| const prefixPath = path.posix.join('/', this.options.prefix); | ||
| const base = `http://${toConnectableHost(standaloneServerHost)}:${standaloneServerPort}`; | ||
|
|
||
| return prefixPath === '/' ? base : `${base}${prefixPath}`; | ||
| } | ||
|
|
||
| /** | ||
| * Dynamically load the optional @forestadmin/workflow-executor package. | ||
| * Deferred so agents that don't embed an executor never load its code at startup. | ||
| */ | ||
| private async importPackage() { | ||
| try { | ||
| return await import('@forestadmin/workflow-executor'); | ||
| } catch (error) { | ||
| throw new Error( | ||
| 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + | ||
| 'Install it with `npm install @forestadmin/workflow-executor`.', | ||
| ); | ||
| } | ||
| } | ||
|
Comment on lines
+146
to
+155
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Medium
private async importPackage() {
try {
return await import('@forestadmin/workflow-executor');
- } catch (error) {
- throw new Error(
- 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
- 'Install it with `npm install @forestadmin/workflow-executor`.',
- );
+ } catch (error: any) {
+ if (error?.code === 'MODULE_NOT_FOUND') {
+ throw new Error(
+ 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
+ 'Install it with `npm install @forestadmin/workflow-executor`.',
+ );
+ }
+
+ throw error;
}
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent: |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* eslint-disable @typescript-eslint/no-explicit-any */ | ||
| /** | ||
| * Covers the optional-dependency failure path: when @forestadmin/workflow-executor is not | ||
| * installed, the dynamic import() rejects and the agent surfaces an actionable error. | ||
| */ | ||
| import { DataSourceCustomizer } from '@forestadmin/datasource-customizer'; | ||
|
|
||
| import * as factories from './__factories__'; | ||
| import Agent from '../src/agent'; | ||
|
|
||
| const mockMakeRoutes = jest.fn(); | ||
| jest.mock('../src/routes', () => ({ | ||
| __esModule: true, | ||
| default: (...args) => mockMakeRoutes(...args), | ||
| })); | ||
| jest.mock('@forestadmin/datasource-customizer'); | ||
|
|
||
| // Simulate the package being absent: importing it throws, like a MODULE_NOT_FOUND at runtime. | ||
| jest.mock('@forestadmin/workflow-executor', () => { | ||
| throw new Error("Cannot find module '@forestadmin/workflow-executor'"); | ||
| }); | ||
|
|
||
| beforeEach(() => { | ||
| jest.clearAllMocks(); | ||
| mockMakeRoutes.mockReturnValue([{ setupRoutes: jest.fn(), bootstrap: jest.fn() }]); | ||
| jest | ||
| .mocked(DataSourceCustomizer.prototype.getDataSource) | ||
| .mockResolvedValue(factories.dataSource.build()); | ||
| }); | ||
|
|
||
| describe('Agent.addWorkflowExecutor (optional dependency missing)', () => { | ||
| test('throws an actionable error when @forestadmin/workflow-executor cannot be imported', async () => { | ||
| const agent = new Agent( | ||
| factories.forestAdminHttpDriverOptions.build({ skipSchemaUpdate: true }), | ||
| ); | ||
| agent.addWorkflowExecutor({ | ||
| agentUrl: 'http://my-agent', | ||
| database: { uri: 'postgres://localhost/db' }, | ||
| }); | ||
|
|
||
| await expect(agent.start()).rejects.toThrow( | ||
| 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package', | ||
| ); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Medium
agent-nodejs/packages/agent/src/agent.ts
Line 96 in 8b98527
If
this.embeddedExecutor.start()throws afterthis.mount(router)succeeds, thecatchblock only rethrows without unmounting or closing the server. The agent is left mounted and serving routes with no running executor, and a subsequentstart()retry fails withEADDRINUSEbecause the listener was never cleaned up. Consider stopping/unmounting the agent in thecatchblock before rethrowing so startup failures don't leave a half-started server bound to the port.Also found in 1 other location(s)
packages/agent/src/embedded-workflow-executor.ts:99🚀 Reply "fix it for me" or copy this AI Prompt for your agent: