feat(agent): embed workflow executor in-process via addWorkflowExecutor()#1717
feat(agent): embed workflow executor in-process via addWorkflowExecutor()#1717PMerlet wants to merge 7 commits into
Conversation
…or()
Today the agent and the workflow executor ship as two separate packages so
the executor stays compatible with any agent version. For Node agents that
want a turnkey setup, this adds an opt-in way to run the executor in the same
process, with no separate deployment.
`createAgent(options).addWorkflowExecutor({ database })` boots the executor on
start() and drains it on stop(). It reuses the existing
`/_internal/executor/*` proxy by wiring `workflowExecutorUrl` to a loopback
port, so the agent's auth layer and the raw-bytes passthrough apply unchanged.
- The executor port is known up-front (option, HTTP_PORT env, or 3400) since
the proxy route is built before the listener exists.
- `agentUrl` (used by the executor to reach the agent over HTTP) is derived
from the standalone server port + prefix; an explicit `agentUrl` is required
when the agent is mounted on Express/Fastify/NestJS.
- The run store is database-backed: `database` option, else DATABASE_URL, else
a clear startup error (no silent in-memory fallback that would lose runs).
- Mutually exclusive with the `workflowExecutorUrl` option (remote executor).
- `@forestadmin/workflow-executor` is an optionalDependency, loaded via a
guarded dynamic import; agents that don't embed it pull none of its deps.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Coverage Impact ⬆️ Merging this pull request will increase total coverage on Modified Files with Diff Coverage (4)
🤖 Increase coverage with AI coding...🚦 See full report on Qlty Cloud » 🛟 Help
|
…ckage path Adds coverage for the two diff lines qlty flagged on the embed feature: - the logger callback forwarded to buildDatabaseExecutor (prefix wrapping) - the dynamic import() failure path when @forestadmin/workflow-executor is not installed (new dedicated test file mocking the import to throw) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 new issue
|
There was a problem hiding this comment.
🟡 Medium
agent-nodejs/packages/agent/src/agent.ts
Line 96 in 8b98527
If this.embeddedExecutor.start() throws after this.mount(router) succeeds, the catch block only rethrows without unmounting or closing the server. The agent is left mounted and serving routes with no running executor, and a subsequent start() retry fails with EADDRINUSE because the listener was never cleaned up. Consider stopping/unmounting the agent in the catch block before rethrowing so startup failures don't leave a half-started server bound to the port.
Also found in 1 other location(s)
packages/agent/src/embedded-workflow-executor.ts:99
start()does not clean up a partially started executor whenthis.executor.start()rejects. In@forestadmin/workflow-executor,WorkflowExecutor.start()starts the runner before binding the HTTP port, so a port conflict or listen error leaves the polling timer and database connections running. Here that rejection is just propagated from line 99, which meansagent.start()fails but the embedded executor keeps background resources alive and can hang the process until it is killed.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/agent.ts around line 96:
If `this.embeddedExecutor.start()` throws after `this.mount(router)` succeeds, the `catch` block only rethrows without unmounting or closing the server. The agent is left mounted and serving routes with no running executor, and a subsequent `start()` retry fails with `EADDRINUSE` because the listener was never cleaned up. Consider stopping/unmounting the agent in the `catch` block before rethrowing so startup failures don't leave a half-started server bound to the port.
Also found in 1 other location(s):
- packages/agent/src/embedded-workflow-executor.ts:99 -- `start()` does not clean up a partially started executor when `this.executor.start()` rejects. In `@forestadmin/workflow-executor`, `WorkflowExecutor.start()` starts the runner before binding the HTTP port, so a port conflict or listen error leaves the polling timer and database connections running. Here that rejection is just propagated from line 99, which means `agent.start()` fails but the embedded executor keeps background resources alive and can hang the process until it is killed.
| private async importPackage() { | ||
| try { | ||
| return await import('@forestadmin/workflow-executor'); | ||
| } catch (error) { | ||
| throw new Error( | ||
| 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' + | ||
| 'Install it with `npm install @forestadmin/workflow-executor`.', | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
🟡 Medium src/embedded-workflow-executor.ts:146
importPackage() catches every rejection from import('@forestadmin/workflow-executor') and replaces it with a generic "package not installed" message. When the package is installed but throws during module evaluation (e.g. a broken transitive dependency or a runtime error), agent.start() surfaces the false missing-dependency error, hiding the real cause. Consider detecting MODULE_NOT_FOUND specifically and rethrowing all other errors unchanged.
private async importPackage() {
try {
return await import('@forestadmin/workflow-executor');
- } catch (error) {
- throw new Error(
- 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
- 'Install it with `npm install @forestadmin/workflow-executor`.',
- );
+ } catch (error: any) {
+ if (error?.code === 'MODULE_NOT_FOUND') {
+ throw new Error(
+ 'The embedded workflow executor requires the `@forestadmin/workflow-executor` package. ' +
+ 'Install it with `npm install @forestadmin/workflow-executor`.',
+ );
+ }
+
+ throw error;
}
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/embedded-workflow-executor.ts around lines 146-155:
`importPackage()` catches every rejection from `import('@forestadmin/workflow-executor')` and replaces it with a generic "package not installed" message. When the package is installed but throws during module evaluation (e.g. a broken transitive dependency or a runtime error), `agent.start()` surfaces the false missing-dependency error, hiding the real cause. Consider detecting `MODULE_NOT_FOUND` specifically and rethrowing all other errors unchanged.
There was a problem hiding this comment.
🟡 Medium src/agent.ts:118
stop() awaits this.embeddedExecutor?.stop() without a finally, so if that promise rejects, forestAdminClient.close() and super.stop() are skipped. The agent's server socket stays open and the Forest Admin client remains subscribed after a failed executor shutdown. Wrap the executor drain in try/finally (or .catch()) so the remaining cleanup always runs.
override async stop(): Promise<void> {
- // Drain the embedded executor first, while the agent it depends on is still serving.
- await this.embeddedExecutor?.stop();
- // Close anything related to ForestAdmin client
- this.options.forestAdminClient.close();
- // Stop at framework level
- await super.stop();
+ try {
+ // Drain the embedded executor first, while the agent it depends on is still serving.
+ await this.embeddedExecutor?.stop();
+ } finally {
+ // Close anything related to ForestAdmin client
+ this.options.forestAdminClient.close();
+ // Stop at framework level
+ await super.stop();
+ }
}🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/agent/src/agent.ts around lines 118-125:
`stop()` awaits `this.embeddedExecutor?.stop()` without a `finally`, so if that promise rejects, `forestAdminClient.close()` and `super.stop()` are skipped. The agent's server socket stays open and the Forest Admin client remains subscribed after a failed executor shutdown. Wrap the executor drain in `try/finally` (or `.catch()`) so the remaining cleanup always runs.

Why
The agent and the workflow executor ship as two separate packages so the executor stays compatible with any agent version. For Node agents that want a turnkey setup, this adds an opt-in way to run the executor in the same process — no separate deployment, no second service to operate.
What
addWorkflowExecutor()boots the executor onstart()and drains it onstop(). It reuses the existing/_internal/executor/*proxy by wiringworkflowExecutorUrlto a loopback port, so the agent's auth layer and the raw-bytes passthrough apply unchanged.Design decisions
HTTP_PORTenv →3400): the proxy route is built before the listener exists, so a random:0port wasn't viable.agentUrlderived from the standalone server port + prefix (the executor reaches the agent over HTTP viaagent-client). An explicitagentUrlis required when the agent is mounted on Express/Fastify/NestJS, since the agent can't know the host app's address.databaseoption →DATABASE_URLenv → clear startup error. No silent in-memory fallback (it would lose runs on restart).workflowExecutorUrloption (remote executor) — throws if both are set.@forestadmin/workflow-executoris anoptionalDependency, loaded via a guarded dynamicimport(). Agents that don't embed it pull none of its (heavy) deps; a missing package yields an actionable error.OpenTelemetry
No incompatibility. The executor's OTel setup is an entrypoint/Docker concern (
--require tracing.js), fully decoupled from the library consumed here. In embedded mode, instrumentation is governed by the agent process — if the customer instruments it, the embedded executor is auto-traced in the same process for free.Tests
packages/agent/test/agent-workflow-executor.test.ts): port wiring,HTTP_PORTfallback, chaining, double-call guard, mutual exclusion, agentUrl derivation, DATABASE_URL fallback, missing-database error, lifecycle (start/stop).buildDatabaseExecutor, not mocked): executor boots, migrations create theforestschema +workflow_step_executionstable,/healthreturns 200, agent probe passes over loopback, clean shutdown. (Not committed as a CI test — jest's resolver can't load the executor's@langchain/anthropicdep tree; Node resolves it fine, so the unit tests mock the package.)🤖 Generated with Claude Code
Note
Add
addWorkflowExecutor()to run a workflow executor in-process within the agentaddWorkflowExecutor(options?)to theAgentclass in agent.ts, which configures and starts an embedded@forestadmin/workflow-executortied to the agent lifecycle (started after router mount, stopped before shutdown).agentUrlfrom the standalone server's host/port when not explicitly provided.manageProcessSignalsflag tobuildDatabaseExecutorandbuildInMemoryExecutorin theworkflow-executorpackage; whenfalse, the executor skips SIGTERM/SIGINT handler registration so the host process manages signals.addWorkflowExecutor()andworkflowExecutorUrlin agent options are mutually exclusive; calling both throws an error.@forestadmin/workflow-executoris an optional peer dependency; if missing,agent.start()throws with a user-facing error.Macroscope summarized 0f23209.