diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 87a38b79..8671c550 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -105,6 +105,7 @@ of coordination. There is no separate orchestrator server.
| |
| - workflow_runs |
| - step_attempts |
+ | - workflow_signals |
+------------------------------+
```
@@ -122,9 +123,9 @@ of coordination. There is no separate orchestrator server.
`npx @openworkflow/cli worker start` with
auto-discovery of workflow files.
- **Backend**: The source of truth. It stores workflow runs and step attempts.
- The `workflow_runs` table serves as the job queue for the workers, while the
- `step_attempts` table serves as a record of started and completed work,
- enabling memoization.
+ The `workflow_runs` table serves as the job queue for the workers, the
+ `step_attempts` table serves as a record of started and completed work, and
+ the `workflow_signals` buffers signals until a waiting step consumes them.
### 2.3. Basic Execution Flow
@@ -149,7 +150,7 @@ of coordination. There is no separate orchestrator server.
`step_attempt` record with status `running`, executes the step function, and
then updates the `step_attempt` to `completed` upon completion. The Worker
continues executing inline until the workflow code completes or encounters a
- sleep.
+ durable wait such as sleep, child-workflow waiting, or signal waiting.
6. **State Update**: The Worker updates the Backend with each `step_attempt` as
it is created and completed, and updates the status of the `workflow_run`
(e.g., `completed`, `running` for parked waits).
@@ -234,10 +235,23 @@ target workflow name in `spec`) and `options.timeout` controls the wait timeout
(default 1y). When the timeout is reached, the parent step fails but the child
workflow continues running independently.
-All step APIs (`step.run`, `step.sleep`, and `step.runWorkflow`) share the same
-collision logic for durable keys. If duplicate base names are encountered in one
-execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`,
-and so on so each step call maps to a distinct step attempt.
+**`step.sendSignal(workflowRunId, signal, options?)`**: Sends signal to another
+workflow run as a durable step. The signal is stored in `workflow_signals` and
+retried safely using a deterministic idempotency key derived from the logical
+step name.
+
+**`step.waitForSignal(signal, options?)`**: Waits durably for the next buffered
+signal with that name for the current workflow run. The common-case API uses
+the signal string itself as the durable step name. When needed,
+`step.waitForSignal({ name, signal, timeout, schema })` lets callers override
+the durable step name explicitly. Waits return the parsed payload or `null` on
+timeout.
+
+All step APIs (`step.run`, `step.sleep`, `step.runWorkflow`,
+`step.sendSignal`, and `step.waitForSignal`) share the same collision logic for
+durable keys. If duplicate base names are encountered in one execution pass,
+OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, and so on so each
+step call maps to a distinct step attempt.
## 4. Error Handling & Retries
diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx
index 4952f4cd..f143c36f 100644
--- a/packages/dashboard/src/routes/runs/$runId.tsx
+++ b/packages/dashboard/src/routes/runs/$runId.tsx
@@ -352,8 +352,7 @@ function RunDetailsPage() {
const config = STEP_STATUS_CONFIG[step.status];
const StatusIcon = config.icon;
const iconColor = config.color;
- const stepTypeLabel =
- step.kind === "function" ? "function" : step.kind;
+ const stepTypeLabel = formatStepKindLabel(step.kind);
const stepDuration = computeDuration(
step.startedAt,
step.finishedAt,
@@ -740,6 +739,10 @@ function StepInspectorPanel({
value={attemptCount.toString()}
mono
/>
+
): unknown {
if (value instanceof Error) {
return {
diff --git a/packages/docs/docs.json b/packages/docs/docs.json
index 13eb3f90..4d34e3bf 100644
--- a/packages/docs/docs.json
+++ b/packages/docs/docs.json
@@ -42,6 +42,7 @@
"docs/parallel-steps",
"docs/dynamic-steps",
"docs/child-workflows",
+ "docs/signals",
"docs/retries",
"docs/type-safety",
"docs/versioning",
diff --git a/packages/docs/docs/openworkflow-vs-temporal.mdx b/packages/docs/docs/openworkflow-vs-temporal.mdx
index 06b0a2c8..5305b066 100644
--- a/packages/docs/docs/openworkflow-vs-temporal.mdx
+++ b/packages/docs/docs/openworkflow-vs-temporal.mdx
@@ -50,8 +50,9 @@ Temporal exposes multiple timeout classes and retry options per workflow and
activity. That flexibility is useful when you need it.
OpenWorkflow keeps the default surface smaller (`step.run`, `step.sleep`,
-retries with backoff, optional `deadlineAt`) and is often faster to adopt for
-TypeScript teams that want fewer moving parts.
+`step.runWorkflow`, `step.waitForSignal`, retries with backoff, optional
+`deadlineAt`) and is often faster to adopt for TypeScript teams that want fewer
+moving parts.
### Tradeoff: happy-path overhead vs resume-path replay CPU
@@ -67,8 +68,7 @@ resume.
- **Polyglot platform**: You need one orchestration platform across multiple
languages.
-- **Advanced runtime interaction**: You need Signals, Queries, Updates, or
- Activity heartbeats. These are coming to OpenWorkflow but are available right
- now in Temporal.
+- **Advanced runtime interaction**: You need the full Temporal surface such as
+ Queries, Updates, or Activity heartbeats.
- **Dedicated platform ownership**: Your team can operate Temporal Server as
shared infrastructure.
diff --git a/packages/docs/docs/overview.mdx b/packages/docs/docs/overview.mdx
index 75ac999c..a83e631f 100644
--- a/packages/docs/docs/overview.mdx
+++ b/packages/docs/docs/overview.mdx
@@ -27,7 +27,8 @@ input)`), creating a `pending` run in the database
3. The worker replays workflow code from the beginning
4. Completed steps return cached outputs; new steps execute and persist results
5. The run transitions to `completed`, `failed`, or `canceled` (or stays
- `running` while durably parked for sleep)
+ `running` while durably parked for sleep, child-workflow waiting, or
+ signal waiting)
Because replay is deterministic, crash recovery is reliable. A replacement
worker can continue from the last durable checkpoint without redoing completed
@@ -37,6 +38,8 @@ work.
- **Memoized steps** prevent duplicate side effects on retries
- **Durable sleep** (`step.sleep`) pauses runs without holding worker capacity
+- **Durable signals** (`step.waitForSignal`) buffer signals until a waiting step
+ consumes them
- **Heartbeats + leases** (`availableAt`) allow automatic crash recovery
- **Database as source of truth** avoids a separate orchestration service
diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx
index f97bc187..210ae320 100644
--- a/packages/docs/docs/roadmap.mdx
+++ b/packages/docs/docs/roadmap.mdx
@@ -20,10 +20,10 @@ description: What's coming next for OpenWorkflow
- ✅ Idempotency keys
- ✅ Prometheus `/metrics` endpoint
- ✅ Child workflows (`step.runWorkflow`)
+- ✅ Signals (`step.waitForSignal`, `step.sendSignal`, `ow.sendSignal`)
## Coming Soon
-- Signals
- Cron / scheduling
- Rollback / compensation functions
- Priority and concurrency controls
diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx
new file mode 100644
index 00000000..7e58d807
--- /dev/null
+++ b/packages/docs/docs/signals.mdx
@@ -0,0 +1,157 @@
+---
+title: Signals
+description: Pause a workflow until your app or another workflow sends a named signal
+---
+
+Use signals when a workflow needs to wait for an event that does not happen at
+a fixed time, like a human approval, a webhook callback, or a result from
+another workflow.
+
+A signal is addressed to one specific workflow run by run ID. OpenWorkflow
+buffers signals durably, so they can arrive before or after the workflow reaches
+`step.waitForSignal()`.
+
+## Basic Usage
+
+Wait for a signal inside a workflow:
+
+```ts
+import { defineWorkflow } from "openworkflow";
+
+export const reviewOrder = defineWorkflow(
+ { name: "review-order" },
+ async ({ input, step }) => {
+ await step.run({ name: "save-order" }, async () => {
+ await db.orders.insert(input);
+ });
+
+ const approval = await step.waitForSignal<{
+ approved: boolean;
+ reviewerId: string;
+ }>("approval", {
+ timeout: "7d",
+ });
+
+ if (!approval?.approved) {
+ throw new Error("Order was not approved");
+ }
+
+ await step.run({ name: "submit-order" }, async () => {
+ await orders.submit(input.orderId, approval.reviewerId);
+ });
+ },
+);
+```
+
+Send the signal from your application code:
+
+```ts
+const handle = await reviewOrder.run({ orderId: "order_123" });
+
+await ow.sendSignal(handle.workflowRun.id, "approval", {
+ data: {
+ approved: true,
+ reviewerId: "manager_42",
+ },
+});
+```
+
+## Send from Your App
+
+Use `ow.sendSignal()` when the event comes from outside workflow code, such as
+an HTTP route, webhook handler, or admin action:
+
+```ts
+await ow.sendSignal(runId, "approval", {
+ data: {
+ approved: true,
+ reviewerId: "manager_42",
+ },
+ idempotencyKey: `approval:${event.id}`,
+});
+```
+
+Use `idempotencyKey` when the sender might retry. If the same
+`workflowRunId` + `idempotencyKey` is sent again, OpenWorkflow stores it only
+once.
+
+## Send from Another Workflow
+
+Use `step.sendSignal()` when one workflow should unblock another and you want
+the send itself to be durable:
+
+```ts
+await step.sendSignal(targetRunId, "approval", {
+ data: { approved: true, reviewerId: "manager_42" },
+});
+```
+
+This is useful when workflows coordinate as peers rather than as
+parent-and-child runs.
+
+## Timeout and Validation
+
+`step.waitForSignal()` returns the payload when a matching signal arrives, or
+`null` if the timeout is reached first:
+
+```ts
+const approval = await step.waitForSignal("approval", {
+ timeout: "3d",
+ schema: z.object({
+ approved: z.boolean(),
+ reviewerId: z.string(),
+ }),
+});
+
+if (approval === null) {
+ await step.run({ name: "mark-expired" }, async () => {
+ await db.requests.update(requestId, { status: "expired" });
+ });
+}
+```
+
+If the signal payload fails schema validation, the wait step fails immediately.
+
+## Step Names
+
+By default, the signal string is also the durable step name:
+
+```ts
+await step.waitForSignal("approval");
+await step.sendSignal(targetRunId, "approval");
+```
+
+Step names share one namespace across all step types. If the same name appears
+again during one execution, OpenWorkflow applies its normal `:1`, `:2`, and so
+on disambiguation.
+
+If you want a more explicit durable step name, use the object form:
+
+```ts
+await step.waitForSignal({
+ name: "manager-approval",
+ signal: "approval",
+ timeout: "7d",
+});
+```
+
+## Important Behavior
+
+- Signals are buffered per `workflowRunId` and signal name until consumed.
+- If multiple buffered signals match, the oldest pending one is consumed first.
+- A run can have only one active waiter for a given signal name at a time.
+- Signals target a specific run ID, not every run of a workflow.
+
+## When to Use Signals
+
+Signals are a good fit for:
+
+- Human approvals and manual review steps
+- Webhook or callback-style resumptions
+- Cross-workflow coordination when you do not want a child workflow
+
+
+ If you need to wait for another workflow's final result, prefer
+ [`step.runWorkflow()`](/docs/child-workflows). If you need to wait until a
+ specific time, prefer [`step.sleep()`](/docs/sleeping).
+
diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx
index 5eaa2f80..349cb8a1 100644
--- a/packages/docs/docs/steps.mdx
+++ b/packages/docs/docs/steps.mdx
@@ -115,7 +115,7 @@ await step.run({ name: "log-event" }, async () => {
## Step Types
-OpenWorkflow provides three step types:
+OpenWorkflow provides five step types:
### `step.run()`
@@ -148,6 +148,42 @@ const childOutput = await step.runWorkflow(
);
```
+### `step.sendSignal()`
+
+Sends signal to another workflow run as a durable step:
+
+```ts
+await step.sendSignal(targetRunId, "approval", {
+ data: { approved: true },
+});
+```
+
+See [Signals](/docs/signals) for buffered delivery, external sends with
+`ow.sendSignal()`, and coordination patterns.
+
+### `step.waitForSignal()`
+
+Waits durably for the next matching signal for the current workflow run:
+
+```ts
+const approval = await step.waitForSignal("approval", {
+ timeout: "7d",
+});
+```
+
+If you need to override the durable step name explicitly, use the object form:
+
+```ts
+const approval = await step.waitForSignal({
+ name: "manager-approval",
+ signal: "approval",
+ timeout: "7d",
+});
+```
+
+See [Signals](/docs/signals) for end-to-end examples, timeout behavior, and
+delivery semantics.
+
## Retry Policy (Optional)
Control backoff and retry limits for an individual step:
diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx
index 5d8ae39e..7ef12e52 100644
--- a/packages/docs/docs/workflows.mdx
+++ b/packages/docs/docs/workflows.mdx
@@ -63,6 +63,15 @@ for the workflow to complete.
runnable workflow that supports `.run(...)` directly.
+To wake an existing run from outside workflow code, send a buffered signal to
+its run ID:
+
+```ts
+await ow.sendSignal(handle.workflowRun.id, "approval", {
+ data: { approved: true },
+});
+```
+
### Scheduling a Workflow Run
You can schedule a workflow run for a specific time by passing `availableAt`:
@@ -213,12 +222,12 @@ create a separate run.
The workflow function receives an object with four properties:
-| Parameter | Type | Description |
-| --------- | --------------------- | --------------------------------------------------------------------- |
-| `input` | Generic | The input data passed when starting the workflow |
-| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`) |
-| `version` | `string \| null` | The workflow version, if specified |
-| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) |
+| Parameter | Type | Description |
+| --------- | --------------------- | -------------------------------------------------------------------------------------------------------------- |
+| `input` | Generic | The input data passed when starting the workflow |
+| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, `step.waitForSignal`) |
+| `version` | `string \| null` | The workflow version, if specified |
+| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) |
```ts
defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => {
diff --git a/packages/openworkflow/client/client.test.ts b/packages/openworkflow/client/client.test.ts
index 910eb418..6d8e84f1 100644
--- a/packages/openworkflow/client/client.test.ts
+++ b/packages/openworkflow/client/client.test.ts
@@ -15,7 +15,7 @@ import { OpenWorkflow } from "./client.js";
import { type as arkType } from "arktype";
import { randomUUID } from "node:crypto";
import * as v from "valibot";
-import { describe, expect, test } from "vitest";
+import { describe, expect, test, vi } from "vitest";
import {
number as yupNumber,
object as yupObject,
@@ -506,6 +506,27 @@ describe("OpenWorkflow", () => {
expect(workflowRun?.finishedAt).not.toBeNull();
});
+ test("sends workflow signals via the backend", async () => {
+ const sendWorkflowSignal = vi.fn(() => Promise.resolve());
+ const client = new OpenWorkflow({
+ backend: {
+ sendWorkflowSignal,
+ } as unknown as Backend,
+ });
+
+ await client.sendSignal("run-123", "approval", {
+ data: { approved: true },
+ idempotencyKey: "signal-1",
+ });
+
+ expect(sendWorkflowSignal).toHaveBeenCalledWith({
+ workflowRunId: "run-123",
+ signal: "approval",
+ data: { approved: true },
+ idempotencyKey: "signal-1",
+ });
+ });
+
test("throws when canceling a non-existent workflow run", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts
index bb936dce..98fc67d3 100644
--- a/packages/openworkflow/client/client.ts
+++ b/packages/openworkflow/client/client.ts
@@ -1,5 +1,6 @@
import type { Backend } from "../core/backend.js";
import type { DurationString } from "../core/duration.js";
+import type { JsonValue } from "../core/json.js";
import type { StandardSchemaV1 } from "../core/standard-schema.js";
import { calculateDateFromDuration } from "../core/step-attempt.js";
import {
@@ -173,6 +174,28 @@ export class OpenWorkflow {
async cancelWorkflowRun(workflowRunId: string): Promise {
await this.backend.cancelWorkflowRun({ workflowRunId });
}
+
+ /**
+ * Sends a buffered signal to a workflow run.
+ * @param workflowRunId - Target workflow run id
+ * @param signal - Signal name
+ * @param options - Optional signal payload and client idempotency key
+ */
+ async sendSignal(
+ workflowRunId: string,
+ signal: string,
+ options?: Readonly<{
+ data?: JsonValue;
+ idempotencyKey?: string;
+ }>,
+ ): Promise {
+ await this.backend.sendWorkflowSignal({
+ workflowRunId,
+ signal,
+ data: options?.data ?? null,
+ idempotencyKey: options?.idempotencyKey ?? null,
+ });
+ }
}
/**
diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts
index d79dc5e6..2d2bf23e 100644
--- a/packages/openworkflow/core/backend.ts
+++ b/packages/openworkflow/core/backend.ts
@@ -68,6 +68,12 @@ export interface Backend {
params: Readonly,
): Promise;
+ // Signals
+ sendWorkflowSignal(params: Readonly): Promise;
+ consumeWorkflowSignal(
+ params: Readonly,
+ ): Promise;
+
// Lifecycle
stop(): Promise;
}
@@ -173,6 +179,20 @@ export interface SetStepAttemptChildWorkflowRunParams {
childWorkflowRunId: string;
}
+export interface SendWorkflowSignalParams {
+ workflowRunId: string;
+ signal: string;
+ data: JsonValue | null;
+ idempotencyKey: string | null;
+}
+
+export interface ConsumeWorkflowSignalParams {
+ workflowRunId: string;
+ signal: string;
+ stepAttemptId: string;
+ workerId: string;
+}
+
export interface PaginationOptions {
limit?: number;
after?: string;
diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts
index ce5f207f..c26a0e7a 100644
--- a/packages/openworkflow/core/step-attempt.test.ts
+++ b/packages/openworkflow/core/step-attempt.test.ts
@@ -6,6 +6,7 @@ import {
normalizeStepOutput,
calculateDateFromDuration,
createSleepContext,
+ createSignalWaitContext,
createWorkflowContext,
} from "./step-attempt.js";
import type { StepAttempt, StepAttemptCache } from "./step-attempt.js";
@@ -339,6 +340,19 @@ describe("createWorkflowContext", () => {
});
});
+describe("createSignalWaitContext", () => {
+ test("creates signal-wait context with signal and timeout", () => {
+ const timeoutAt = new Date("2025-06-15T10:30:00.000Z");
+ const context = createSignalWaitContext("approval", timeoutAt);
+
+ expect(context).toEqual({
+ kind: "signal-wait",
+ signal: "approval",
+ timeoutAt: "2025-06-15T10:30:00.000Z",
+ });
+ });
+});
+
function createMockStepAttempt(
overrides: Partial = {},
): StepAttempt {
diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts
index 888120ac..61d30970 100644
--- a/packages/openworkflow/core/step-attempt.ts
+++ b/packages/openworkflow/core/step-attempt.ts
@@ -7,7 +7,12 @@ import { err, ok } from "./result.js";
/**
* The kind of step in a workflow.
*/
-export type StepKind = "function" | "sleep" | "workflow";
+export type StepKind =
+ | "function"
+ | "sleep"
+ | "workflow"
+ | "signal-send"
+ | "signal-wait";
/**
* Status of a step attempt through its lifecycle.
@@ -34,12 +39,22 @@ export interface WorkflowStepAttemptContext {
timeoutAt: string | null;
}
+/**
+ * Context for a signal-wait step attempt.
+ */
+export interface SignalWaitStepAttemptContext {
+ kind: "signal-wait";
+ signal: string;
+ timeoutAt: string;
+}
+
/**
* Context for a step attempt.
*/
export type StepAttemptContext =
| SleepStepAttemptContext
- | WorkflowStepAttemptContext;
+ | WorkflowStepAttemptContext
+ | SignalWaitStepAttemptContext;
/**
* StepAttempt represents a single attempt of a step within a workflow.
@@ -171,3 +186,20 @@ export function createWorkflowContext(
timeoutAt: timeoutAt?.toISOString() ?? null,
};
}
+
+/**
+ * Create the context object for a signal-wait step attempt.
+ * @param signal - Signal name
+ * @param timeoutAt - Wait timeout deadline
+ * @returns The context object for the signal-wait step
+ */
+export function createSignalWaitContext(
+ signal: string,
+ timeoutAt: Readonly,
+): SignalWaitStepAttemptContext {
+ return {
+ kind: "signal-wait" as const,
+ signal,
+ timeoutAt: timeoutAt.toISOString(),
+ };
+}
diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts
index 01745679..068b19b2 100644
--- a/packages/openworkflow/core/workflow-function.ts
+++ b/packages/openworkflow/core/workflow-function.ts
@@ -1,7 +1,11 @@
import type { DurationString } from "./duration.js";
+import type { JsonValue } from "./json.js";
+import type { StandardSchemaV1 } from "./standard-schema.js";
import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js";
import type { WorkflowRun } from "./workflow-run.js";
+export type StepWaitTimeout = number | string | Date;
+
/**
* Config for an individual step defined with `step.run()`.
*/
@@ -37,13 +41,13 @@ export interface StepRunWorkflowOptions {
/**
* Maximum time to wait for the child workflow to complete.
*/
- timeout?: number | string | Date;
+ timeout?: StepWaitTimeout;
}
/**
* Represents the API for defining steps within a workflow. Used within a
* workflow handler to define steps by calling `step.run()`, `step.sleep()`,
- * and `step.runWorkflow()`.
+ * `step.runWorkflow()`, `step.sendSignal()`, and `step.waitForSignal()`.
*/
export interface StepApi {
run: (
@@ -56,6 +60,30 @@ export interface StepApi {
options?: Readonly,
) => Promise;
sleep: (name: string, duration: DurationString) => Promise;
+ sendSignal: (
+ workflowRunId: string,
+ signal: string,
+ options?: Readonly<{
+ data?: JsonValue;
+ }>,
+ ) => Promise;
+ waitForSignal: {
+ (
+ signal: string,
+ options?: Readonly<{
+ timeout?: StepWaitTimeout;
+ schema?: StandardSchemaV1;
+ }>,
+ ): Promise;
+ (
+ options: Readonly<{
+ name?: string;
+ signal: string;
+ timeout?: StepWaitTimeout;
+ schema?: StandardSchemaV1;
+ }>,
+ ): Promise;
+ };
}
/**
diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts
index 31fecd90..7fe6816e 100644
--- a/packages/openworkflow/postgres/backend.test.ts
+++ b/packages/openworkflow/postgres/backend.test.ts
@@ -8,7 +8,7 @@ import {
} from "./postgres.js";
import assert from "node:assert";
import { randomUUID } from "node:crypto";
-import { describe, expect, test } from "vitest";
+import { describe, expect, test, vi } from "vitest";
test("it is a test file (workaround for sonarjs/no-empty-test-file linter)", () => {
assert.ok(true);
@@ -632,3 +632,356 @@ describe("BackendPostgres workflow wake-up reconciliation", () => {
}
});
});
+
+describe("BackendPostgres.sendWorkflowSignal fallback branches", () => {
+ type BackendPostgresCtor = new (
+ pg: unknown,
+ namespaceId: string,
+ schema: string,
+ ) => BackendPostgres;
+
+ test("re-reads an existing signal after an idempotent insert conflict", async () => {
+ let selectCount = 0;
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith('SELECT "id"')) {
+ selectCount += 1;
+ return selectCount === 2 ? [{ id: "existing-signal" }] : [];
+ }
+
+ if (sql.startsWith("INSERT INTO")) {
+ return [];
+ }
+
+ return [];
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve());
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ workflowRunsTable: () => string;
+ wakeWorkflowRunForSignal: (workflowRunId: string) => Promise;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: { approved: true },
+ idempotencyKey: "signal-key",
+ });
+
+ expect(selectCount).toBe(2);
+ expect(wakeWorkflowRunForSignal).toHaveBeenCalledWith("workflow-run-id");
+ });
+
+ test("throws the generic error when no signal row is inserted for an active run", async () => {
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith("INSERT INTO")) {
+ return [];
+ }
+
+ return [];
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve());
+ const getWorkflowRun = vi.fn(() =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: "workflow-run-id",
+ status: "running",
+ }),
+ ),
+ );
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ workflowRunsTable: () => string;
+ wakeWorkflowRunForSignal: (workflowRunId: string) => Promise;
+ getWorkflowRun: typeof backend.getWorkflowRun;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+ internalBackend.getWorkflowRun =
+ getWorkflowRun as unknown as typeof backend.getWorkflowRun;
+
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ }),
+ ).rejects.toThrow("Failed to send workflow signal");
+
+ expect(getWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: "workflow-run-id",
+ });
+ expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
+ });
+
+ test("throws the generic error when the idempotent fallback re-read still finds nothing", async () => {
+ let selectCount = 0;
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith('SELECT "id"')) {
+ selectCount += 1;
+ return [];
+ }
+
+ if (sql.startsWith("INSERT INTO")) {
+ return [];
+ }
+
+ return [];
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve());
+ const getWorkflowRun = vi.fn(() =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: "workflow-run-id",
+ status: "running",
+ }),
+ ),
+ );
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ workflowRunsTable: () => string;
+ wakeWorkflowRunForSignal: (workflowRunId: string) => Promise;
+ getWorkflowRun: typeof backend.getWorkflowRun;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+ internalBackend.getWorkflowRun =
+ getWorkflowRun as unknown as typeof backend.getWorkflowRun;
+
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: null,
+ idempotencyKey: "signal-key",
+ }),
+ ).rejects.toThrow("Failed to send workflow signal");
+
+ expect(selectCount).toBe(2);
+ expect(getWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: "workflow-run-id",
+ });
+ expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
+ });
+});
+
+describe("BackendPostgres.consumeWorkflowSignal edge cases", () => {
+ type BackendPostgresCtor = new (
+ pg: unknown,
+ namespaceId: string,
+ schema: string,
+ ) => BackendPostgres;
+
+ test("returns null for previously consumed signals with null data", async () => {
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith('SELECT "data"')) {
+ return [{ data: null }];
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ stepAttemptsTable: () => string;
+ workflowRunsTable: () => string;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.stepAttemptsTable = () => '"step_attempts"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+
+ await expect(
+ backend.consumeWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ stepAttemptId: "step-attempt-id",
+ workerId: "worker-id",
+ }),
+ ).resolves.toBeNull();
+ });
+
+ test("returns undefined when no signal can be consumed", async () => {
+ let selectCount = 0;
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith('SELECT "data"')) {
+ selectCount += 1;
+ return [];
+ }
+
+ if (sql.startsWith("WITH candidate AS")) {
+ return [];
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ stepAttemptsTable: () => string;
+ workflowRunsTable: () => string;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.stepAttemptsTable = () => '"step_attempts"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+
+ await expect(
+ backend.consumeWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ stepAttemptId: "step-attempt-id",
+ workerId: "worker-id",
+ }),
+ ).resolves.toBeUndefined();
+
+ expect(selectCount).toBe(1);
+ });
+
+ test("returns null when a freshly consumed signal has null data", async () => {
+ const fakePg = createFakePostgresQueryHandler((sql) => {
+ if (sql.startsWith('SELECT "data"')) {
+ return [];
+ }
+
+ if (sql.startsWith("WITH candidate AS")) {
+ return [{ data: null }];
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendPostgres as unknown as BackendPostgresCtor)(
+ fakePg,
+ randomUUID(),
+ DEFAULT_SCHEMA,
+ );
+ const internalBackend = backend as unknown as {
+ workflowSignalsTable: () => string;
+ stepAttemptsTable: () => string;
+ workflowRunsTable: () => string;
+ };
+ internalBackend.workflowSignalsTable = () => '"workflow_signals"';
+ internalBackend.stepAttemptsTable = () => '"step_attempts"';
+ internalBackend.workflowRunsTable = () => '"workflow_runs"';
+
+ await expect(
+ backend.consumeWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ stepAttemptId: "step-attempt-id",
+ workerId: "worker-id",
+ }),
+ ).resolves.toBeNull();
+ });
+});
+
+function createFakePostgresQueryHandler(
+ handler: (sql: string) => readonly unknown[],
+): unknown {
+ const fakePg = ((input: TemplateStringsArray | string) => {
+ if (
+ Array.isArray(input) &&
+ Object.prototype.hasOwnProperty.call(input, "raw")
+ ) {
+ const sql = input.join(" ").replaceAll(/\s+/g, " ").trim();
+ return handler(sql);
+ }
+
+ return [];
+ }) as {
+ (input: TemplateStringsArray | string, ...values: unknown[]): unknown;
+ json: (value: unknown) => unknown;
+ end: () => Promise;
+ };
+ fakePg.json = (value) => value;
+ fakePg.end = async () => {
+ // no-op for tests
+ };
+
+ return fakePg;
+}
+
+function createMockWorkflowRun(
+ overrides: Partial<{
+ namespaceId: string;
+ id: string;
+ workflowName: string;
+ version: string | null;
+ status:
+ | "pending"
+ | "running"
+ | "sleeping"
+ | "completed"
+ | "succeeded"
+ | "failed"
+ | "canceled";
+ idempotencyKey: string | null;
+ config: Record;
+ context: Record | null;
+ input: Record | null;
+ output: Record | null;
+ error: Record | null;
+ attempts: number;
+ parentStepAttemptNamespaceId: string | null;
+ parentStepAttemptId: string | null;
+ workerId: string | null;
+ availableAt: Date | null;
+ deadlineAt: Date | null;
+ startedAt: Date | null;
+ finishedAt: Date | null;
+ createdAt: Date;
+ updatedAt: Date;
+ }> = {},
+) {
+ return {
+ namespaceId: "default",
+ id: "workflow-run-id",
+ workflowName: "workflow-name",
+ version: null,
+ status: "running",
+ idempotencyKey: null,
+ config: {},
+ context: null,
+ input: null,
+ output: null,
+ error: null,
+ attempts: 1,
+ parentStepAttemptNamespaceId: null,
+ parentStepAttemptId: null,
+ workerId: "worker-id",
+ availableAt: null,
+ deadlineAt: null,
+ startedAt: new Date("2026-01-01T00:00:00.000Z"),
+ finishedAt: null,
+ createdAt: new Date("2026-01-01T00:00:00.000Z"),
+ updatedAt: new Date("2026-01-01T00:00:00.000Z"),
+ ...overrides,
+ };
+}
diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts
index c2ac3187..fe7dfe66 100644
--- a/packages/openworkflow/postgres/backend.ts
+++ b/packages/openworkflow/postgres/backend.ts
@@ -17,6 +17,8 @@ import {
FailStepAttemptParams,
CompleteStepAttemptParams,
SetStepAttemptChildWorkflowRunParams,
+ SendWorkflowSignalParams,
+ ConsumeWorkflowSignalParams,
FailWorkflowRunParams,
RescheduleWorkflowRunAfterFailedStepAttemptParams,
CompleteWorkflowRunParams,
@@ -247,6 +249,168 @@ export class BackendPostgres implements Backend {
return workflowRun ?? null;
}
+ async sendWorkflowSignal(params: SendWorkflowSignalParams): Promise {
+ const workflowSignalsTable = this.workflowSignalsTable();
+ const workflowRunsTable = this.workflowRunsTable();
+
+ if (params.idempotencyKey !== null) {
+ const [existing] = await this.pg<{ id: string }[]>`
+ SELECT "id"
+ FROM ${workflowSignalsTable}
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "workflow_run_id" = ${params.workflowRunId}
+ AND "idempotency_key" = ${params.idempotencyKey}
+ LIMIT 1
+ `;
+
+ if (existing) {
+ await this.wakeWorkflowRunForSignal(params.workflowRunId);
+ return;
+ }
+ }
+
+ const conflictClause =
+ params.idempotencyKey === null
+ ? this.pg``
+ : this.pg`
+ ON CONFLICT ("namespace_id", "workflow_run_id", "idempotency_key")
+ WHERE "idempotency_key" IS NOT NULL
+ DO NOTHING
+ `;
+
+ const [inserted] = await this.pg<{ id: string }[]>`
+ INSERT INTO ${workflowSignalsTable} (
+ "namespace_id",
+ "id",
+ "workflow_run_id",
+ "signal",
+ "data",
+ "idempotency_key",
+ "consumed_step_attempt_namespace_id",
+ "consumed_step_attempt_id",
+ "consumed_at",
+ "created_at",
+ "updated_at"
+ )
+ SELECT
+ ${this.namespaceId},
+ gen_random_uuid(),
+ ${params.workflowRunId},
+ ${params.signal},
+ ${this.pg.json(params.data)},
+ ${params.idempotencyKey},
+ NULL,
+ NULL,
+ NULL,
+ date_trunc('milliseconds', NOW()),
+ NOW()
+ FROM ${workflowRunsTable}
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "id" = ${params.workflowRunId}
+ AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled')
+ ${conflictClause}
+ RETURNING "id"
+ `;
+
+ if (!inserted && params.idempotencyKey !== null) {
+ const [existing] = await this.pg<{ id: string }[]>`
+ SELECT "id"
+ FROM ${workflowSignalsTable}
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "workflow_run_id" = ${params.workflowRunId}
+ AND "idempotency_key" = ${params.idempotencyKey}
+ LIMIT 1
+ `;
+
+ if (existing) {
+ await this.wakeWorkflowRunForSignal(params.workflowRunId);
+ return;
+ }
+ }
+
+ if (!inserted) {
+ const workflowRun = await this.getWorkflowRun({
+ workflowRunId: params.workflowRunId,
+ });
+ if (!workflowRun) {
+ throw new Error(`Workflow run ${params.workflowRunId} does not exist`);
+ }
+
+ if (
+ ["succeeded", "completed", "failed", "canceled"].includes(
+ workflowRun.status,
+ )
+ ) {
+ throw new Error(
+ `Cannot send signal to workflow run ${params.workflowRunId} with status ${workflowRun.status}`,
+ );
+ }
+
+ throw new Error("Failed to send workflow signal");
+ }
+
+ await this.wakeWorkflowRunForSignal(params.workflowRunId);
+ }
+
+ async consumeWorkflowSignal(
+ params: ConsumeWorkflowSignalParams,
+ ): Promise {
+ const workflowSignalsTable = this.workflowSignalsTable();
+ const stepAttemptsTable = this.stepAttemptsTable();
+ const workflowRunsTable = this.workflowRunsTable();
+
+ const [existing] = await this.pg<{ data: JsonValue | null }[]>`
+ SELECT "data"
+ FROM ${workflowSignalsTable}
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "workflow_run_id" = ${params.workflowRunId}
+ AND "signal" = ${params.signal}
+ AND "consumed_step_attempt_namespace_id" = ${this.namespaceId}
+ AND "consumed_step_attempt_id" = ${params.stepAttemptId}
+ ORDER BY "created_at" ASC, "id" ASC
+ LIMIT 1
+ `;
+
+ if (existing) {
+ return existing.data ?? null;
+ }
+
+ const [consumed] = await this.pg<{ data: JsonValue | null }[]>`
+ WITH candidate AS (
+ SELECT "id", "data"
+ FROM ${workflowSignalsTable}
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "workflow_run_id" = ${params.workflowRunId}
+ AND "signal" = ${params.signal}
+ AND "consumed_at" IS NULL
+ ORDER BY "created_at" ASC, "id" ASC
+ LIMIT 1
+ FOR UPDATE SKIP LOCKED
+ )
+ UPDATE ${workflowSignalsTable} ws
+ SET
+ "consumed_step_attempt_namespace_id" = ${this.namespaceId},
+ "consumed_step_attempt_id" = ${params.stepAttemptId},
+ "consumed_at" = NOW(),
+ "updated_at" = NOW()
+ FROM candidate, ${stepAttemptsTable} sa, ${workflowRunsTable} wr
+ WHERE ws."namespace_id" = ${this.namespaceId}
+ AND ws."id" = candidate."id"
+ AND sa."namespace_id" = ${this.namespaceId}
+ AND sa."workflow_run_id" = ${params.workflowRunId}
+ AND sa."id" = ${params.stepAttemptId}
+ AND sa."kind" = 'signal-wait'
+ AND sa."status" = 'running'
+ AND wr."namespace_id" = sa."namespace_id"
+ AND wr."id" = sa."workflow_run_id"
+ AND wr."status" = 'running'
+ AND wr."worker_id" = ${params.workerId}
+ RETURNING candidate."data" AS "data"
+ `;
+
+ return consumed ? (consumed.data ?? null) : undefined;
+ }
+
async listWorkflowRuns(
params: ListWorkflowRunsParams,
): Promise> {
@@ -449,7 +613,8 @@ export class BackendPostgres implements Backend {
AND wr."id" = ${workflowRunId}
AND wr."status" = 'running'
AND wr."worker_id" IS NULL
- AND EXISTS (
+ AND (
+ EXISTS (
SELECT 1
FROM ${stepAttemptsTable} sa
JOIN ${workflowRunsTable} child
@@ -460,6 +625,19 @@ export class BackendPostgres implements Backend {
AND sa."kind" = 'workflow'
AND sa."status" = 'running'
AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled')
+ ) OR EXISTS (
+ SELECT 1
+ FROM ${stepAttemptsTable} sa
+ JOIN ${this.workflowSignalsTable()} ws
+ ON ws."namespace_id" = sa."namespace_id"
+ AND ws."workflow_run_id" = sa."workflow_run_id"
+ AND ws."signal" = sa."context"->>'signal'
+ WHERE sa."namespace_id" = wr."namespace_id"
+ AND sa."workflow_run_id" = wr."id"
+ AND sa."kind" = 'signal-wait'
+ AND sa."status" = 'running'
+ AND ws."consumed_at" IS NULL
+ )
)
RETURNING wr.*
`;
@@ -651,6 +829,25 @@ export class BackendPostgres implements Backend {
return updated;
}
+ private async wakeWorkflowRunForSignal(workflowRunId: string): Promise {
+ const workflowRunsTable = this.workflowRunsTable();
+
+ await this.pg`
+ UPDATE ${workflowRunsTable}
+ SET
+ "available_at" = CASE
+ WHEN "available_at" IS NULL OR "available_at" > NOW()
+ THEN NOW()
+ ELSE "available_at"
+ END,
+ "updated_at" = NOW()
+ WHERE "namespace_id" = ${this.namespaceId}
+ AND "id" = ${workflowRunId}
+ AND "status" IN ('pending', 'running', 'sleeping')
+ AND "worker_id" IS NULL
+ `;
+ }
+
private async wakeParentWorkflowRun(
childWorkflowRun: Readonly,
): Promise {
@@ -942,6 +1139,10 @@ export class BackendPostgres implements Backend {
private stepAttemptsTable(pg: Postgres = this.pg) {
return pg`${pg(this.schema)}.${pg("step_attempts")}`;
}
+
+ private workflowSignalsTable(pg: Postgres = this.pg) {
+ return pg`${pg(this.schema)}.${pg("workflow_signals")}`;
+ }
}
/**
diff --git a/packages/openworkflow/postgres/postgres.test.ts b/packages/openworkflow/postgres/postgres.test.ts
index 30e8c450..bd0656bd 100644
--- a/packages/openworkflow/postgres/postgres.test.ts
+++ b/packages/openworkflow/postgres/postgres.test.ts
@@ -46,6 +46,11 @@ describe("postgres", () => {
test("throws for schema names longer than 63 bytes", () => {
expect(() => migrations("a".repeat(64))).toThrow(/at most 63 bytes/i);
});
+
+ test("includes workflow_signals in the latest migration", () => {
+ const latestMigration = migrations(DEFAULT_SCHEMA).at(-1);
+ expect(latestMigration).toContain('"workflow_signals"');
+ });
});
describe("migrate()", () => {
diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts
index 7d641e44..5dfddad4 100644
--- a/packages/openworkflow/postgres/postgres.ts
+++ b/packages/openworkflow/postgres/postgres.ts
@@ -206,6 +206,51 @@ export function migrations(schema: string): string[] {
ON CONFLICT DO NOTHING;
COMMIT;`,
+
+ // 5 - workflow signals
+ `BEGIN;
+
+ CREATE TABLE IF NOT EXISTS ${quotedSchema}."workflow_signals" (
+ "namespace_id" TEXT NOT NULL,
+ "id" TEXT NOT NULL,
+ --
+ "workflow_run_id" TEXT NOT NULL,
+ "signal" TEXT NOT NULL,
+ "data" JSONB,
+ "idempotency_key" TEXT,
+ "consumed_step_attempt_namespace_id" TEXT,
+ "consumed_step_attempt_id" TEXT,
+ "consumed_at" TIMESTAMPTZ,
+ "created_at" TIMESTAMPTZ NOT NULL,
+ "updated_at" TIMESTAMPTZ NOT NULL,
+ PRIMARY KEY ("namespace_id", "id"),
+ FOREIGN KEY ("namespace_id", "workflow_run_id")
+ REFERENCES ${quotedSchema}."workflow_runs" ("namespace_id", "id")
+ ON DELETE CASCADE,
+ FOREIGN KEY ("consumed_step_attempt_namespace_id", "consumed_step_attempt_id")
+ REFERENCES ${quotedSchema}."step_attempts" ("namespace_id", "id")
+ ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS "workflow_signals_pending_lookup_idx"
+ ON ${quotedSchema}."workflow_signals" (
+ "namespace_id",
+ "workflow_run_id",
+ "signal",
+ "consumed_at",
+ "created_at",
+ "id"
+ );
+
+ CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_workflow_run_idempotency_key_idx"
+ ON ${quotedSchema}."workflow_signals" ("namespace_id", "workflow_run_id", "idempotency_key")
+ WHERE "idempotency_key" IS NOT NULL;
+
+ INSERT INTO ${quotedSchema}."openworkflow_migrations"("version")
+ VALUES (5)
+ ON CONFLICT DO NOTHING;
+
+ COMMIT;`,
];
}
diff --git a/packages/openworkflow/sqlite/backend.test.ts b/packages/openworkflow/sqlite/backend.test.ts
index 05e07d61..09cd6746 100644
--- a/packages/openworkflow/sqlite/backend.test.ts
+++ b/packages/openworkflow/sqlite/backend.test.ts
@@ -566,3 +566,250 @@ describe("BackendSqlite workflow wake-up reconciliation", () => {
}
});
});
+
+describe("BackendSqlite.sendWorkflowSignal fallback branches", () => {
+ type BackendSqliteCtor = new (
+ db: Database,
+ namespaceId: string,
+ ) => BackendSqlite;
+
+ test("re-reads an existing signal after an idempotent insert conflict", async () => {
+ let selectCount = 0;
+ const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
+ if (sql.startsWith('SELECT 1 FROM "workflow_signals"')) {
+ return {
+ get() {
+ selectCount += 1;
+ return selectCount === 2 ? { 1: 1 } : undefined;
+ },
+ };
+ }
+
+ if (sql.startsWith('INSERT OR IGNORE INTO "workflow_signals"')) {
+ return {
+ run() {
+ return { changes: 0 };
+ },
+ };
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendSqlite as unknown as BackendSqliteCtor)(
+ fakeDb,
+ randomUUID(),
+ );
+ const wakeWorkflowRunForSignal = vi.fn();
+ const internalBackend = backend as unknown as {
+ wakeWorkflowRunForSignal: (
+ workflowRunId: string,
+ currentTime: string,
+ ) => void;
+ };
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: { approved: true },
+ idempotencyKey: "signal-key",
+ });
+
+ expect(selectCount).toBe(2);
+ expect(wakeWorkflowRunForSignal).toHaveBeenCalledTimes(1);
+ expect(wakeWorkflowRunForSignal).toHaveBeenCalledWith(
+ "workflow-run-id",
+ expect.any(String),
+ );
+ });
+
+ test("throws the generic error when no signal row is inserted for an active run", async () => {
+ const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
+ if (sql.startsWith('INSERT INTO "workflow_signals"')) {
+ return {
+ run() {
+ return { changes: 0 };
+ },
+ };
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendSqlite as unknown as BackendSqliteCtor)(
+ fakeDb,
+ randomUUID(),
+ );
+ const wakeWorkflowRunForSignal = vi.fn();
+ const getWorkflowRun = vi.fn(() =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: "workflow-run-id",
+ status: "running",
+ }),
+ ),
+ );
+ const internalBackend = backend as unknown as {
+ wakeWorkflowRunForSignal: (
+ workflowRunId: string,
+ currentTime: string,
+ ) => void;
+ getWorkflowRun: typeof backend.getWorkflowRun;
+ };
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+ internalBackend.getWorkflowRun =
+ getWorkflowRun as unknown as typeof backend.getWorkflowRun;
+
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ }),
+ ).rejects.toThrow("Failed to send workflow signal");
+
+ expect(getWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: "workflow-run-id",
+ });
+ expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
+ });
+
+ test("throws the generic error when the idempotent fallback re-read still finds nothing", async () => {
+ let selectCount = 0;
+ const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
+ if (sql.startsWith('SELECT 1 FROM "workflow_signals"')) {
+ return {
+ get() {
+ selectCount += 1;
+ },
+ };
+ }
+
+ if (sql.startsWith('INSERT OR IGNORE INTO "workflow_signals"')) {
+ return {
+ run() {
+ return { changes: 0 };
+ },
+ };
+ }
+
+ throw new Error(`Unexpected SQL: ${sql}`);
+ });
+ const backend = new (BackendSqlite as unknown as BackendSqliteCtor)(
+ fakeDb,
+ randomUUID(),
+ );
+ const wakeWorkflowRunForSignal = vi.fn();
+ const getWorkflowRun = vi.fn(() =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: "workflow-run-id",
+ status: "running",
+ }),
+ ),
+ );
+ const internalBackend = backend as unknown as {
+ wakeWorkflowRunForSignal: (
+ workflowRunId: string,
+ currentTime: string,
+ ) => void;
+ getWorkflowRun: typeof backend.getWorkflowRun;
+ };
+ internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
+ internalBackend.getWorkflowRun =
+ getWorkflowRun as unknown as typeof backend.getWorkflowRun;
+
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: "workflow-run-id",
+ signal: "approval",
+ data: null,
+ idempotencyKey: "signal-key",
+ }),
+ ).rejects.toThrow("Failed to send workflow signal");
+
+ expect(selectCount).toBe(2);
+ expect(getWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: "workflow-run-id",
+ });
+ expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
+ });
+});
+
+function createFakeSqliteForSendWorkflowSignal(
+ handler: (sql: string) => {
+ get?: (...args: unknown[]) => unknown;
+ run?: (...args: unknown[]) => { changes: number };
+ },
+): Database {
+ return {
+ exec() {
+ throw new Error("exec should not be called in this test");
+ },
+ prepare(sql: string) {
+ const normalized = sql.replaceAll(/\s+/g, " ").trim();
+ return handler(normalized) as ReturnType;
+ },
+ close() {
+ // no-op for tests
+ },
+ } as Database;
+}
+
+function createMockWorkflowRun(
+ overrides: Partial<{
+ namespaceId: string;
+ id: string;
+ workflowName: string;
+ version: string | null;
+ status:
+ | "pending"
+ | "running"
+ | "sleeping"
+ | "completed"
+ | "succeeded"
+ | "failed"
+ | "canceled";
+ idempotencyKey: string | null;
+ config: Record;
+ context: Record | null;
+ input: Record | null;
+ output: Record | null;
+ error: Record | null;
+ attempts: number;
+ parentStepAttemptNamespaceId: string | null;
+ parentStepAttemptId: string | null;
+ workerId: string | null;
+ availableAt: Date | null;
+ deadlineAt: Date | null;
+ startedAt: Date | null;
+ finishedAt: Date | null;
+ createdAt: Date;
+ updatedAt: Date;
+ }> = {},
+) {
+ return {
+ namespaceId: "default",
+ id: "workflow-run-id",
+ workflowName: "workflow-name",
+ version: null,
+ status: "running",
+ idempotencyKey: null,
+ config: {},
+ context: null,
+ input: null,
+ output: null,
+ error: null,
+ attempts: 1,
+ parentStepAttemptNamespaceId: null,
+ parentStepAttemptId: null,
+ workerId: "worker-id",
+ availableAt: null,
+ deadlineAt: null,
+ startedAt: new Date("2026-01-01T00:00:00.000Z"),
+ finishedAt: null,
+ createdAt: new Date("2026-01-01T00:00:00.000Z"),
+ updatedAt: new Date("2026-01-01T00:00:00.000Z"),
+ ...overrides,
+ };
+}
diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts
index e12c4aed..33e43735 100644
--- a/packages/openworkflow/sqlite/backend.ts
+++ b/packages/openworkflow/sqlite/backend.ts
@@ -16,6 +16,8 @@ import {
FailStepAttemptParams,
CompleteStepAttemptParams,
SetStepAttemptChildWorkflowRunParams,
+ SendWorkflowSignalParams,
+ ConsumeWorkflowSignalParams,
FailWorkflowRunParams,
RescheduleWorkflowRunAfterFailedStepAttemptParams,
CompleteWorkflowRunParams,
@@ -236,6 +238,199 @@ export class BackendSqlite implements Backend {
return Promise.resolve(row ? rowToWorkflowRun(row) : null);
}
+ async sendWorkflowSignal(params: SendWorkflowSignalParams): Promise {
+ const currentTime = now();
+
+ if (params.idempotencyKey !== null) {
+ const existing = this.db
+ .prepare(
+ `
+ SELECT 1
+ FROM "workflow_signals"
+ WHERE "namespace_id" = ?
+ AND "workflow_run_id" = ?
+ AND "idempotency_key" = ?
+ LIMIT 1
+ `,
+ )
+ .get(this.namespaceId, params.workflowRunId, params.idempotencyKey) as
+ | { 1: number }
+ | undefined;
+
+ if (existing) {
+ this.wakeWorkflowRunForSignal(params.workflowRunId, currentTime);
+ return;
+ }
+ }
+
+ const stmt = this.db.prepare(`
+ INSERT ${params.idempotencyKey === null ? "" : "OR IGNORE "}INTO "workflow_signals" (
+ "namespace_id",
+ "id",
+ "workflow_run_id",
+ "signal",
+ "data",
+ "idempotency_key",
+ "consumed_step_attempt_namespace_id",
+ "consumed_step_attempt_id",
+ "consumed_at",
+ "created_at",
+ "updated_at"
+ )
+ SELECT ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, ?, ?
+ FROM "workflow_runs"
+ WHERE "namespace_id" = ?
+ AND "id" = ?
+ AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled')
+ `);
+
+ const result = stmt.run(
+ this.namespaceId,
+ generateUUID(),
+ params.workflowRunId,
+ params.signal,
+ toJSON(params.data),
+ params.idempotencyKey,
+ currentTime,
+ currentTime,
+ this.namespaceId,
+ params.workflowRunId,
+ );
+
+ if (result.changes === 0 && params.idempotencyKey !== null) {
+ const existing = this.db
+ .prepare(
+ `
+ SELECT 1
+ FROM "workflow_signals"
+ WHERE "namespace_id" = ?
+ AND "workflow_run_id" = ?
+ AND "idempotency_key" = ?
+ LIMIT 1
+ `,
+ )
+ .get(this.namespaceId, params.workflowRunId, params.idempotencyKey) as
+ | { 1: number }
+ | undefined;
+
+ if (existing) {
+ this.wakeWorkflowRunForSignal(params.workflowRunId, currentTime);
+ return;
+ }
+ }
+
+ if (result.changes === 0) {
+ const workflowRun = await this.getWorkflowRun({
+ workflowRunId: params.workflowRunId,
+ });
+ if (!workflowRun) {
+ throw new Error(`Workflow run ${params.workflowRunId} does not exist`);
+ }
+
+ if (
+ ["succeeded", "completed", "failed", "canceled"].includes(
+ workflowRun.status,
+ )
+ ) {
+ throw new Error(
+ `Cannot send signal to workflow run ${params.workflowRunId} with status ${workflowRun.status}`,
+ );
+ }
+
+ throw new Error("Failed to send workflow signal");
+ }
+
+ this.wakeWorkflowRunForSignal(params.workflowRunId, currentTime);
+ }
+
+ consumeWorkflowSignal(
+ params: ConsumeWorkflowSignalParams,
+ ): Promise {
+ const existing = this.db
+ .prepare(
+ `
+ SELECT "data"
+ FROM "workflow_signals"
+ WHERE "namespace_id" = ?
+ AND "workflow_run_id" = ?
+ AND "signal" = ?
+ AND "consumed_step_attempt_namespace_id" = ?
+ AND "consumed_step_attempt_id" = ?
+ ORDER BY "created_at" ASC, "id" ASC
+ LIMIT 1
+ `,
+ )
+ .get(
+ this.namespaceId,
+ params.workflowRunId,
+ params.signal,
+ this.namespaceId,
+ params.stepAttemptId,
+ ) as { data: string | null } | undefined;
+
+ if (existing) {
+ return Promise.resolve(fromJSON(existing.data) as JsonValue | null);
+ }
+
+ const currentTime = now();
+ const row = this.db
+ .prepare(
+ `
+ UPDATE "workflow_signals"
+ SET
+ "consumed_step_attempt_namespace_id" = ?,
+ "consumed_step_attempt_id" = ?,
+ "consumed_at" = ?,
+ "updated_at" = ?
+ WHERE "namespace_id" = ?
+ AND "id" = (
+ SELECT "id"
+ FROM "workflow_signals"
+ WHERE "namespace_id" = ?
+ AND "workflow_run_id" = ?
+ AND "signal" = ?
+ AND "consumed_at" IS NULL
+ ORDER BY "created_at" ASC, "id" ASC
+ LIMIT 1
+ )
+ AND "consumed_at" IS NULL
+ AND EXISTS (
+ SELECT 1
+ FROM "step_attempts" sa
+ JOIN "workflow_runs" wr
+ ON wr."namespace_id" = sa."namespace_id"
+ AND wr."id" = sa."workflow_run_id"
+ WHERE sa."namespace_id" = ?
+ AND sa."workflow_run_id" = ?
+ AND sa."id" = ?
+ AND sa."kind" = 'signal-wait'
+ AND sa."status" = 'running'
+ AND wr."status" = 'running'
+ AND wr."worker_id" = ?
+ )
+ RETURNING "data"
+ `,
+ )
+ .get(
+ this.namespaceId,
+ params.stepAttemptId,
+ currentTime,
+ currentTime,
+ this.namespaceId,
+ this.namespaceId,
+ params.workflowRunId,
+ params.signal,
+ this.namespaceId,
+ params.workflowRunId,
+ params.stepAttemptId,
+ params.workerId,
+ ) as { data: string | null } | undefined;
+
+ return Promise.resolve(
+ row ? (fromJSON(row.data) as JsonValue | null) : undefined,
+ );
+ }
+
async claimWorkflowRun(
params: ClaimWorkflowRunParams,
): Promise {
@@ -367,7 +562,8 @@ export class BackendSqlite implements Backend {
SET
"status" = 'running',
"available_at" = CASE
- WHEN EXISTS (
+ WHEN (
+ EXISTS (
SELECT 1
FROM "step_attempts" sa
JOIN "workflow_runs" child
@@ -378,6 +574,19 @@ export class BackendSqlite implements Backend {
AND sa."kind" = 'workflow'
AND sa."status" = 'running'
AND child."status" IN ('completed', 'succeeded', 'failed', 'canceled')
+ ) OR EXISTS (
+ SELECT 1
+ FROM "step_attempts" sa
+ JOIN "workflow_signals" ws
+ ON ws."namespace_id" = sa."namespace_id"
+ AND ws."workflow_run_id" = sa."workflow_run_id"
+ AND ws."signal" = json_extract(sa."context", '$.signal')
+ WHERE sa."namespace_id" = "workflow_runs"."namespace_id"
+ AND sa."workflow_run_id" = "workflow_runs"."id"
+ AND sa."kind" = 'signal-wait'
+ AND sa."status" = 'running'
+ AND ws."consumed_at" IS NULL
+ )
) AND ? > ? THEN ?
ELSE ?
END,
@@ -601,6 +810,35 @@ export class BackendSqlite implements Backend {
return updated;
}
+ private wakeWorkflowRunForSignal(
+ workflowRunId: string,
+ currentTime: string,
+ ): void {
+ this.db
+ .prepare(
+ `
+ UPDATE "workflow_runs"
+ SET
+ "available_at" = CASE
+ WHEN "available_at" IS NULL OR "available_at" > ? THEN ?
+ ELSE "available_at"
+ END,
+ "updated_at" = ?
+ WHERE "namespace_id" = ?
+ AND "id" = ?
+ AND "status" IN ('pending', 'running', 'sleeping')
+ AND "worker_id" IS NULL
+ `,
+ )
+ .run(
+ currentTime,
+ currentTime,
+ currentTime,
+ this.namespaceId,
+ workflowRunId,
+ );
+ }
+
private wakeParentWorkflowRun(childWorkflowRun: Readonly): void {
if (
!childWorkflowRun.parentStepAttemptNamespaceId ||
diff --git a/packages/openworkflow/sqlite/sqlite.test.ts b/packages/openworkflow/sqlite/sqlite.test.ts
index fddd1951..e4eee339 100644
--- a/packages/openworkflow/sqlite/sqlite.test.ts
+++ b/packages/openworkflow/sqlite/sqlite.test.ts
@@ -98,6 +98,14 @@ describe("sqlite", () => {
'CREATE TABLE IF NOT EXISTS "step_attempts"',
);
});
+
+ test("migrations create workflow_signals table", () => {
+ const migs = migrations();
+ const latestMigration = migs.at(-1);
+ expect(latestMigration).toContain(
+ 'CREATE TABLE IF NOT EXISTS "workflow_signals"',
+ );
+ });
});
describe("migrate()", () => {
@@ -206,6 +214,17 @@ describe("sqlite", () => {
)
.get() as { count: number };
expect(stepAttemptsCheck.count).toBe(1);
+
+ const workflowSignalsCheck = db
+ .prepare(
+ `
+ SELECT COUNT(*) as count
+ FROM sqlite_master
+ WHERE type = 'table' AND name = 'workflow_signals'
+ `,
+ )
+ .get() as { count: number };
+ expect(workflowSignalsCheck.count).toBe(1);
});
});
diff --git a/packages/openworkflow/sqlite/sqlite.ts b/packages/openworkflow/sqlite/sqlite.ts
index bf3fd163..2e313085 100644
--- a/packages/openworkflow/sqlite/sqlite.ts
+++ b/packages/openworkflow/sqlite/sqlite.ts
@@ -193,6 +193,50 @@ export function migrations(): string[] {
VALUES (4);
COMMIT;`,
+
+ // 5 - workflow signals
+ `BEGIN;
+
+ CREATE TABLE IF NOT EXISTS "workflow_signals" (
+ "namespace_id" TEXT NOT NULL,
+ "id" TEXT NOT NULL,
+ --
+ "workflow_run_id" TEXT NOT NULL,
+ "signal" TEXT NOT NULL,
+ "data" TEXT,
+ "idempotency_key" TEXT,
+ "consumed_step_attempt_namespace_id" TEXT,
+ "consumed_step_attempt_id" TEXT,
+ "consumed_at" TEXT,
+ "created_at" TEXT NOT NULL,
+ "updated_at" TEXT NOT NULL,
+ PRIMARY KEY ("namespace_id", "id"),
+ FOREIGN KEY ("namespace_id", "workflow_run_id")
+ REFERENCES "workflow_runs" ("namespace_id", "id")
+ ON DELETE CASCADE,
+ FOREIGN KEY ("consumed_step_attempt_namespace_id", "consumed_step_attempt_id")
+ REFERENCES "step_attempts" ("namespace_id", "id")
+ ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS "workflow_signals_pending_lookup_idx"
+ ON "workflow_signals" (
+ "namespace_id",
+ "workflow_run_id",
+ "signal",
+ "consumed_at",
+ "created_at",
+ "id"
+ );
+
+ CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_workflow_run_idempotency_key_idx"
+ ON "workflow_signals" ("namespace_id", "workflow_run_id", "idempotency_key")
+ WHERE "idempotency_key" IS NOT NULL;
+
+ INSERT OR IGNORE INTO "openworkflow_migrations" ("version")
+ VALUES (5);
+
+ COMMIT;`,
];
}
diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts
index 6383cb4c..f80c5bf0 100644
--- a/packages/openworkflow/testing/backend.testsuite.ts
+++ b/packages/openworkflow/testing/backend.testsuite.ts
@@ -1436,6 +1436,201 @@ export function testBackend(options: TestBackendOptions): void {
});
});
+ describe("workflow signals", () => {
+ test("buffers signals and consumes them in FIFO order", async () => {
+ const claimed = await createClaimedWorkflowRun(backend);
+ const firstWait = await createSignalWaitAttempt(backend, claimed.id);
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: { order: 1 },
+ idempotencyKey: null,
+ });
+ await sleep(10);
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: { order: 2 },
+ idempotencyKey: null,
+ });
+
+ const first = await backend.consumeWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ stepAttemptId: firstWait.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ });
+ const firstReplay = await backend.consumeWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ stepAttemptId: firstWait.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ });
+
+ expect(first).toEqual({ order: 1 });
+ expect(firstReplay).toEqual({ order: 1 });
+
+ await backend.completeStepAttempt({
+ workflowRunId: claimed.id,
+ stepAttemptId: firstWait.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ output: first ?? null,
+ });
+
+ const secondWait = await createSignalWaitAttempt(backend, claimed.id);
+ const second = await backend.consumeWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ stepAttemptId: secondWait.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ });
+
+ expect(second).toEqual({ order: 2 });
+ });
+
+ test("dedupes client sends by idempotency key", async () => {
+ const claimed = await createClaimedWorkflowRun(backend);
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: { accepted: true },
+ idempotencyKey: "signal-key",
+ });
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: { accepted: false },
+ idempotencyKey: "signal-key",
+ });
+
+ const waitAttempt = await createSignalWaitAttempt(backend, claimed.id);
+ const consumed = await backend.consumeWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ stepAttemptId: waitAttempt.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ });
+
+ expect(consumed).toEqual({ accepted: true });
+
+ await backend.completeStepAttempt({
+ workflowRunId: claimed.id,
+ stepAttemptId: waitAttempt.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ output: consumed ?? null,
+ });
+
+ const secondWait = await createSignalWaitAttempt(backend, claimed.id);
+ const next = await backend.consumeWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ stepAttemptId: secondWait.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ });
+
+ expect(next).toBeUndefined();
+ });
+
+ test("wakes idle runs but does not steal active leases", async () => {
+ const futureRun = await backend.createWorkflowRun({
+ workflowName: randomUUID(),
+ version: null,
+ idempotencyKey: null,
+ input: null,
+ config: {},
+ context: null,
+ parentStepAttemptNamespaceId: null,
+ parentStepAttemptId: null,
+ availableAt: newDateInOneYear(),
+ deadlineAt: null,
+ });
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: futureRun.id,
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ });
+
+ const woken = await backend.getWorkflowRun({
+ workflowRunId: futureRun.id,
+ });
+ expect(woken?.workerId).toBeNull();
+ expect(deltaSeconds(woken?.availableAt)).toBeLessThan(1);
+
+ const claimed = await createClaimedWorkflowRun(backend);
+ const leasedAvailableAt = claimed.availableAt;
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ });
+
+ const stillLeased = await backend.getWorkflowRun({
+ workflowRunId: claimed.id,
+ });
+ expect(stillLeased?.workerId).toBe(claimed.workerId);
+ expect(stillLeased?.availableAt?.getTime()).toBe(
+ leasedAvailableAt?.getTime(),
+ );
+ });
+
+ test("reconciles parked signal waits that already have pending signals", async () => {
+ const claimed = await createClaimedWorkflowRun(backend);
+ await createSignalWaitAttempt(backend, claimed.id);
+
+ await backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: { approved: true },
+ idempotencyKey: null,
+ });
+
+ const parked = await backend.sleepWorkflowRun({
+ workflowRunId: claimed.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ availableAt: newDateInOneYear(),
+ });
+
+ expect(parked.workerId).toBeNull();
+ expect(deltaSeconds(parked.availableAt)).toBeLessThan(1);
+ });
+
+ test("rejects sends to missing and terminal workflow runs", async () => {
+ const missingId = randomUUID();
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: missingId,
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ }),
+ ).rejects.toThrow(`Workflow run ${missingId} does not exist`);
+
+ const claimed = await createClaimedWorkflowRun(backend);
+ await backend.completeWorkflowRun({
+ workflowRunId: claimed.id,
+ workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
+ output: null,
+ });
+
+ await expect(
+ backend.sendWorkflowSignal({
+ workflowRunId: claimed.id,
+ signal: "approval",
+ data: null,
+ idempotencyKey: null,
+ }),
+ ).rejects.toThrow(
+ `Cannot send signal to workflow run ${claimed.id} with status completed`,
+ );
+ });
+ });
+
describe("setStepAttemptChildWorkflowRun()", () => {
test("sets child workflow linkage on a running step attempt", async () => {
const parentRun = await createClaimedWorkflowRun(backend);
@@ -2269,6 +2464,32 @@ async function createClaimedWorkflowRun(b: Backend) {
return claimed;
}
+/**
+ * Create a running signal-wait step attempt for tests.
+ * @param b - Backend
+ * @param workflowRunId - Parent workflow run id
+ * @returns Created step attempt
+ */
+async function createSignalWaitAttempt(b: Backend, workflowRunId: string) {
+ const workflowRun = await b.getWorkflowRun({ workflowRunId });
+ if (!workflowRun?.workerId) {
+ throw new Error("Expected claimed workflow run");
+ }
+
+ return await b.createStepAttempt({
+ workflowRunId,
+ workerId: workflowRun.workerId,
+ stepName: randomUUID(),
+ kind: "signal-wait",
+ config: {},
+ context: {
+ kind: "signal-wait",
+ signal: "approval",
+ timeoutAt: newDateInOneYear().toISOString(),
+ },
+ });
+}
+
/**
* Get delta in seconds from now.
* @param date - Date to compare
diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts
index 2dba8674..3bc3b45b 100644
--- a/packages/openworkflow/worker/execution.test.ts
+++ b/packages/openworkflow/worker/execution.test.ts
@@ -721,7 +721,7 @@ describe("StepExecutor", () => {
expect(status).toBe("failed");
await expect(handle.result()).rejects.toThrow(
- /Workflow timeout must be a non-negative number/,
+ /Step wait timeout must be a non-negative number/,
);
});
@@ -2480,6 +2480,927 @@ describe("StepExecutor", () => {
);
expect(childStatus).toBe("completed");
});
+
+ describe("signals", () => {
+ test("consumes buffered signals sent before the wait step runs", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-buffered-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal<{ approved: boolean }>("approval");
+ },
+ );
+
+ const handle = await workflow.run();
+ await client.sendSignal(handle.workflowRun.id, "approval", {
+ data: { approved: true },
+ });
+
+ const worker = client.newWorker();
+ const status = await tickUntilTerminal(
+ backend,
+ worker,
+ handle.workflowRun.id,
+ 200,
+ 10,
+ );
+
+ expect(status).toBe("completed");
+ await expect(handle.result()).resolves.toEqual({ approved: true });
+
+ const attempts = await backend.listStepAttempts({
+ workflowRunId: handle.workflowRun.id,
+ limit: 10,
+ });
+ expect(
+ attempts.data.find((attempt) => attempt.stepName === "approval")?.kind,
+ ).toBe("signal-wait");
+ });
+
+ test("parks while waiting and resumes when the client sends a signal", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-resume-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal<{ approved: boolean }>("approval");
+ },
+ );
+
+ const handle = await workflow.run();
+ const worker = client.newWorker();
+
+ await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10);
+
+ await client.sendSignal(handle.workflowRun.id, "approval", {
+ data: { approved: true },
+ });
+
+ const status = await tickUntilTerminal(
+ backend,
+ worker,
+ handle.workflowRun.id,
+ 200,
+ 10,
+ );
+
+ expect(status).toBe("completed");
+ await expect(handle.result()).resolves.toEqual({ approved: true });
+ });
+
+ test("returns null when a signal wait times out", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-timeout-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal("approval", { timeout: "10ms" });
+ },
+ );
+
+ const handle = await workflow.run();
+ const worker = client.newWorker();
+
+ await worker.tick();
+ await sleep(50);
+ await worker.tick();
+
+ await expect(handle.result()).resolves.toBeNull();
+ });
+
+ test("fails when a signal wait timeout number is invalid", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-invalid-timeout-number-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal("approval", { timeout: -1 });
+ },
+ );
+
+ const worker = client.newWorker();
+ const handle = await workflow.run();
+ const status = await tickUntilTerminal(
+ backend,
+ worker,
+ handle.workflowRun.id,
+ 150,
+ 10,
+ );
+
+ expect(status).toBe("failed");
+ await expect(handle.result()).rejects.toThrow(
+ /Step wait timeout must be a non-negative number/,
+ );
+ });
+
+ test("fails when a consumed signal does not match the schema", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-schema-failure-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal("approval", {
+ schema: z.object({ approved: z.boolean() }),
+ });
+ },
+ );
+
+ const handle = await workflow.run();
+ await client.sendSignal(handle.workflowRun.id, "approval", {
+ data: { approved: "yes" },
+ });
+
+ const worker = client.newWorker();
+ await worker.tick();
+ await sleep(50);
+
+ const failedRun = await backend.getWorkflowRun({
+ workflowRunId: handle.workflowRun.id,
+ });
+ expect(failedRun?.status).toBe("failed");
+ expect(failedRun?.availableAt).toBeNull();
+ expect((failedRun?.error as { message?: string } | null)?.message).toBe(
+ "Invalid input: expected boolean, received string",
+ );
+ });
+
+ test("rejects concurrent waits for the same signal in one run", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const workflow = client.defineWorkflow(
+ { name: `signal-concurrent-${randomUUID()}` },
+ async ({ step }) => {
+ await Promise.all([
+ step.waitForSignal("approval"),
+ step.waitForSignal({
+ name: "wait-approval-b",
+ signal: "approval",
+ }),
+ ]);
+ return null;
+ },
+ );
+
+ const handle = await workflow.run(undefined, {
+ deadlineAt: new Date(Date.now() + 50),
+ });
+ const worker = client.newWorker();
+ await worker.tick();
+ await sleep(100);
+
+ const failedRun = await backend.getWorkflowRun({
+ workflowRunId: handle.workflowRun.id,
+ });
+ expect(failedRun?.status).toBe("failed");
+ expect((failedRun?.error as { message?: string } | null)?.message).toBe(
+ 'Signal "approval" is already being awaited by step "approval"',
+ );
+
+ const attempts = await backend.listStepAttempts({
+ workflowRunId: handle.workflowRun.id,
+ limit: 10,
+ });
+ expect(
+ attempts.data.filter((attempt) => attempt.kind === "signal-wait"),
+ ).toHaveLength(1);
+ });
+
+ test("step.sendSignal delivers to another waiting run", async () => {
+ const backend = await createBackend();
+ const client = new OpenWorkflow({ backend });
+
+ const waiter = client.defineWorkflow(
+ { name: `signal-waiter-${randomUUID()}` },
+ async ({ step }) => {
+ return await step.waitForSignal<{ approved: boolean }>("approval");
+ },
+ );
+ const sender = client.defineWorkflow<{ targetRunId: string }, string>(
+ { name: `signal-sender-${randomUUID()}` },
+ async ({ input, step }) => {
+ await step.sendSignal(input.targetRunId, "approval", {
+ data: { approved: true },
+ });
+ return "sent";
+ },
+ );
+
+ const worker = client.newWorker({ concurrency: 2 });
+ const waiterHandle = await waiter.run();
+ await tickUntilParked(
+ backend,
+ worker,
+ waiterHandle.workflowRun.id,
+ 200,
+ 10,
+ );
+
+ const senderHandle = await sender.run({
+ targetRunId: waiterHandle.workflowRun.id,
+ });
+
+ const waiterStatus = await tickUntilTerminal(
+ backend,
+ worker,
+ waiterHandle.workflowRun.id,
+ 200,
+ 10,
+ );
+ const senderStatus = await tickUntilTerminal(
+ backend,
+ worker,
+ senderHandle.workflowRun.id,
+ 200,
+ 10,
+ );
+
+ expect(waiterStatus).toBe("completed");
+ expect(senderStatus).toBe("completed");
+ await expect(waiterHandle.result()).resolves.toEqual({ approved: true });
+ await expect(senderHandle.result()).resolves.toBe("sent");
+ });
+
+ test("reuses completed signal-send attempts on replay", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-send-replay-completed",
+ workerId: "worker-signal-send-replay-completed",
+ });
+ const sendWorkflowSignal = vi.fn(() => Promise.resolve());
+ const createStepAttempt = vi.fn();
+ const completeWorkflowRun = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "completed",
+ workerId: params.workerId,
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "signal-send-completed-attempt",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "completed",
+ output: null,
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ sendWorkflowSignal,
+ createStepAttempt,
+ completeWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ await step.sendSignal("target-workflow-run", "approval", {
+ data: { approved: true },
+ });
+ return "done";
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-send-replay-completed",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(sendWorkflowSignal).not.toHaveBeenCalled();
+ expect(createStepAttempt).not.toHaveBeenCalled();
+ expect(completeWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: workflowRun.id,
+ workerId: "worker-signal-send-replay-completed",
+ output: "done",
+ });
+ });
+
+ test("creates new signal-send attempts with null payload when options are omitted", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-send-new-null-payload",
+ workerId: "worker-signal-send-new-null-payload",
+ });
+ const createStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: "signal-send-created-attempt",
+ workflowRunId: params.workflowRunId,
+ stepName: params.stepName,
+ kind: params.kind,
+ status: "running",
+ context: params.context,
+ finishedAt: null,
+ }),
+ ),
+ );
+ const sendWorkflowSignal = vi.fn(() => Promise.resolve());
+ const completeStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "completed",
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ createStepAttempt,
+ sendWorkflowSignal,
+ completeStepAttempt,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ await step.sendSignal("target-workflow-run", "approval");
+ return "done";
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-send-new-null-payload",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(sendWorkflowSignal).toHaveBeenCalledWith({
+ workflowRunId: "target-workflow-run",
+ signal: "approval",
+ data: null,
+ idempotencyKey: "__signal:signal-send-new-null-payload:approval",
+ });
+ });
+
+ test("reuses running signal-send attempts on replay", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-send-replay-running",
+ workerId: "worker-signal-send-replay-running",
+ });
+ const runningAttempt = createMockStepAttempt({
+ id: "signal-send-running-attempt",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "running",
+ finishedAt: null,
+ });
+ const sendWorkflowSignal = vi.fn(() => Promise.resolve());
+ const createStepAttempt = vi.fn();
+ const completeStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "completed",
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [runningAttempt],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ sendWorkflowSignal,
+ createStepAttempt,
+ completeStepAttempt,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ await step.sendSignal("target-workflow-run", "approval");
+ return "done";
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-send-replay-running",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(createStepAttempt).not.toHaveBeenCalled();
+ expect(sendWorkflowSignal).toHaveBeenCalledWith({
+ workflowRunId: "target-workflow-run",
+ signal: "approval",
+ data: null,
+ idempotencyKey: "__signal:signal-send-replay-running:approval",
+ });
+ expect(completeStepAttempt).toHaveBeenCalledWith({
+ workflowRunId: workflowRun.id,
+ stepAttemptId: runningAttempt.id,
+ workerId: "worker-signal-send-replay-running",
+ output: null,
+ });
+ });
+
+ test("reschedules when replayed signal sends fail", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-send-replay-failure",
+ workerId: "worker-signal-send-replay-failure",
+ });
+ const runningAttempt = createMockStepAttempt({
+ id: "signal-send-failing-attempt",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "running",
+ finishedAt: null,
+ });
+ const sendWorkflowSignal = vi.fn(() =>
+ Promise.reject(new Error("signal emit failed")),
+ );
+ const failStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "approval",
+ kind: "signal-send",
+ status: "failed",
+ error: params.error,
+ }),
+ ),
+ );
+ const rescheduleWorkflowRunAfterFailedStepAttempt = vi.fn(
+ (
+ params: Parameters<
+ Backend["rescheduleWorkflowRunAfterFailedStepAttempt"]
+ >[0],
+ ) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "pending",
+ workerId: null,
+ availableAt: params.availableAt,
+ }),
+ ),
+ );
+ const completeWorkflowRun = vi.fn();
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [runningAttempt],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ sendWorkflowSignal,
+ failStepAttempt,
+ rescheduleWorkflowRunAfterFailedStepAttempt,
+ completeWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ await step.sendSignal("target-workflow-run", "approval");
+ return "done";
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-send-replay-failure",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ const failStepCall = failStepAttempt.mock.calls[0]?.[0];
+ if (!failStepCall) {
+ throw new Error("Expected failStepAttempt to be called");
+ }
+ expect(failStepCall.workflowRunId).toBe(workflowRun.id);
+ expect(failStepCall.stepAttemptId).toBe(runningAttempt.id);
+ expect(failStepCall.workerId).toBe("worker-signal-send-replay-failure");
+ expect(failStepCall.error).toMatchObject({
+ message: "signal emit failed",
+ });
+ expect(rescheduleWorkflowRunAfterFailedStepAttempt).toHaveBeenCalledTimes(
+ 1,
+ );
+ expect(completeWorkflowRun).not.toHaveBeenCalled();
+ });
+
+ test("returns cached signal waits on replay", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-wait-replay-completed",
+ workerId: "worker-signal-wait-replay-completed",
+ });
+ const consumeWorkflowSignal = vi.fn(() => Promise.resolve(void 0));
+ const completeWorkflowRun = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "completed",
+ workerId: params.workerId,
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "signal-wait-completed-attempt",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-wait",
+ status: "completed",
+ output: { approved: true },
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ consumeWorkflowSignal,
+ completeWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ return await step.waitForSignal<{ approved: boolean }>("approval");
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-wait-replay-completed",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(consumeWorkflowSignal).not.toHaveBeenCalled();
+ expect(completeWorkflowRun).toHaveBeenCalledWith({
+ workflowRunId: workflowRun.id,
+ workerId: "worker-signal-wait-replay-completed",
+ output: { approved: true },
+ });
+ });
+
+ test("releases signal wait reservations after createStepAttempt errors", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-wait-create-step-error",
+ workerId: "worker-signal-wait-create-step-error",
+ });
+ const createStepAttempt = vi
+ .fn()
+ .mockRejectedValueOnce(new Error("create signal wait failed"))
+ .mockResolvedValueOnce(
+ createMockStepAttempt({
+ id: "signal-wait-created-after-retry",
+ workflowRunId: workflowRun.id,
+ stepName: "approval-retry",
+ kind: "signal-wait",
+ status: "running",
+ context: {
+ kind: "signal-wait",
+ signal: "approval",
+ timeoutAt: new Date("2027-01-01T00:00:00.000Z").toISOString(),
+ },
+ finishedAt: null,
+ }),
+ );
+ const consumeWorkflowSignal = vi.fn(() =>
+ Promise.resolve({ approved: true }),
+ );
+ const completeStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "approval-retry",
+ kind: "signal-wait",
+ status: "completed",
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ createStepAttempt,
+ consumeWorkflowSignal,
+ completeStepAttempt,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ try {
+ await step.waitForSignal("approval");
+ } catch (error) {
+ expect((error as Error).message).toBe("create signal wait failed");
+ }
+
+ return await step.waitForSignal<{ approved: boolean }>({
+ name: "approval-retry",
+ signal: "approval",
+ });
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-wait-create-step-error",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(createStepAttempt).toHaveBeenCalledTimes(2);
+ expect(consumeWorkflowSignal).toHaveBeenCalledWith({
+ workflowRunId: workflowRun.id,
+ signal: "approval",
+ stepAttemptId: "signal-wait-created-after-retry",
+ workerId: "worker-signal-wait-create-step-error",
+ });
+ });
+
+ test("parks replayed signal waits with a default timeout when context is missing", async () => {
+ const createdAt = new Date("2026-02-03T04:05:06.000Z");
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-wait-missing-context",
+ workerId: "worker-signal-wait-missing-context",
+ });
+ const sleepWorkflowRun = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "running",
+ workerId: null,
+ availableAt: params.availableAt,
+ }),
+ ),
+ );
+ const createStepAttempt = vi.fn();
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "signal-send-running-side-step",
+ workflowRunId: workflowRun.id,
+ stepName: "notify",
+ kind: "signal-send",
+ status: "running",
+ finishedAt: null,
+ }),
+ createMockStepAttempt({
+ id: "signal-wait-running-missing-context",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-wait",
+ status: "running",
+ context: null,
+ createdAt,
+ finishedAt: null,
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ createStepAttempt,
+ consumeWorkflowSignal: vi.fn(() => Promise.resolve(void 0)),
+ sleepWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ return await step.waitForSignal("approval");
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-wait-missing-context",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(createStepAttempt).not.toHaveBeenCalled();
+ const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0];
+ if (!sleepCall) {
+ throw new Error("Expected sleepWorkflowRun to be called");
+ }
+ expect(sleepCall.availableAt.toISOString()).toBe(
+ "2027-02-03T04:05:06.000Z",
+ );
+ });
+
+ test("parks replayed signal waits with a default timeout when stored timeout is invalid", async () => {
+ const createdAt = new Date("2026-05-06T07:08:09.000Z");
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-wait-invalid-timeout",
+ workerId: "worker-signal-wait-invalid-timeout",
+ });
+ const sleepWorkflowRun = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "running",
+ workerId: null,
+ availableAt: params.availableAt,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "signal-wait-running-invalid-timeout",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-wait",
+ status: "running",
+ context: {
+ kind: "signal-wait",
+ signal: "approval",
+ timeoutAt: "not-a-date",
+ },
+ createdAt,
+ finishedAt: null,
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ consumeWorkflowSignal: vi.fn(() => Promise.resolve(void 0)),
+ sleepWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ return await step.waitForSignal("approval");
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-wait-invalid-timeout",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0];
+ if (!sleepCall) {
+ throw new Error("Expected sleepWorkflowRun to be called");
+ }
+ expect(sleepCall.availableAt.toISOString()).toBe(
+ "2027-05-06T07:08:09.000Z",
+ );
+ });
+
+ test("consumes replayed signal waits even when legacy context is missing", async () => {
+ const workflowRun = createMockWorkflowRun({
+ id: "signal-wait-missing-context-consume",
+ workerId: "worker-signal-wait-missing-context-consume",
+ });
+ const completeStepAttempt = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "approval",
+ kind: "signal-wait",
+ status: "completed",
+ output: params.output ?? null,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "signal-wait-running-missing-context-consume",
+ workflowRunId: workflowRun.id,
+ stepName: "approval",
+ kind: "signal-wait",
+ status: "running",
+ context: null,
+ finishedAt: null,
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ consumeWorkflowSignal: vi.fn(() => Promise.resolve({ approved: true })),
+ completeStepAttempt,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ return await step.waitForSignal<{ approved: boolean }>("approval");
+ },
+ workflowVersion: null,
+ workerId: "worker-signal-wait-missing-context-consume",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ expect(completeStepAttempt).toHaveBeenCalledWith({
+ workflowRunId: workflowRun.id,
+ stepAttemptId: "signal-wait-running-missing-context-consume",
+ workerId: "worker-signal-wait-missing-context-consume",
+ output: { approved: true },
+ });
+ });
+ });
+
+ test("parks replayed workflow waits with a default timeout when stored timeout is invalid", async () => {
+ const createdAt = new Date("2026-06-07T08:09:10.000Z");
+ const workflowRun = createMockWorkflowRun({
+ id: "workflow-wait-invalid-timeout",
+ workerId: "worker-workflow-wait-invalid-timeout",
+ });
+ const sleepWorkflowRun = vi.fn(
+ (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "running",
+ workerId: null,
+ availableAt: params.availableAt,
+ }),
+ ),
+ );
+ const backend = createExecutionBackendMock({
+ listStepAttempts: vi.fn(() =>
+ Promise.resolve({
+ data: [
+ createMockStepAttempt({
+ id: "workflow-running-invalid-timeout",
+ workflowRunId: workflowRun.id,
+ stepName: "child-workflow",
+ kind: "workflow",
+ status: "running",
+ context: {
+ kind: "workflow",
+ timeoutAt: "not-a-date",
+ },
+ childWorkflowRunNamespaceId: "default",
+ childWorkflowRunId: "child-workflow-run-id",
+ createdAt,
+ finishedAt: null,
+ }),
+ ],
+ pagination: { next: null, prev: null },
+ }),
+ ),
+ getWorkflowRun: vi.fn(() =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: "child-workflow-run-id",
+ status: "running",
+ workerId: "child-worker",
+ }),
+ ),
+ ),
+ sleepWorkflowRun,
+ });
+
+ await executeWorkflow({
+ backend,
+ workflowRun,
+ workflowFn: async ({ step }: WorkflowFunctionParams) => {
+ return await step.runWorkflow(
+ {
+ name: "child-workflow",
+ version: "1.0.0",
+ },
+ null,
+ );
+ },
+ workflowVersion: null,
+ workerId: "worker-workflow-wait-invalid-timeout",
+ retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY,
+ });
+
+ const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0];
+ if (!sleepCall) {
+ throw new Error("Expected sleepWorkflowRun to be called");
+ }
+ expect(sleepCall.availableAt.toISOString()).toBe(
+ "2027-06-07T08:09:10.000Z",
+ );
+ });
});
describe("executeWorkflow", () => {
@@ -3288,6 +4209,17 @@ describe("createStepExecutionStateFromAttempts", () => {
stepName: "step-c",
status: "running",
});
+ const runningSignalWait = createMockStepAttempt({
+ id: "running-signal",
+ stepName: "wait-for-approval",
+ kind: "signal-wait",
+ status: "running",
+ context: {
+ kind: "signal-wait",
+ signal: "approval",
+ timeoutAt: new Date("2025-01-01T00:01:00Z").toISOString(),
+ },
+ });
const state = createStepExecutionStateFromAttempts([
completed,
@@ -3295,6 +4227,7 @@ describe("createStepExecutionStateFromAttempts", () => {
failedA2,
failedB,
running,
+ runningSignalWait,
]);
expect(state.cache.size).toBe(1);
@@ -3309,6 +4242,9 @@ describe("createStepExecutionStateFromAttempts", () => {
expect(state.failedByStepName.get("step-b")).toBe(failedB);
expect(state.failedByStepName.has("step-c")).toBe(false);
expect(state.runningByStepName.get("step-c")).toBe(running);
+ expect(state.runningSignalWaitBySignal.get("approval")).toBe(
+ runningSignalWait,
+ );
expect(state.runningByStepName.has("step-b")).toBe(false);
});
@@ -3319,6 +4255,7 @@ describe("createStepExecutionStateFromAttempts", () => {
expect(state.failedCountsByStepName.size).toBe(0);
expect(state.failedByStepName.size).toBe(0);
expect(state.runningByStepName.size).toBe(0);
+ expect(state.runningSignalWaitBySignal.size).toBe(0);
});
});
@@ -3480,3 +4417,103 @@ function createMockWorkflowRun(
...overrides,
};
}
+
+function createExecutionBackendMock(overrides: Partial = {}): Backend {
+ const backend: Partial = {
+ listStepAttempts: () =>
+ Promise.resolve({
+ data: [],
+ pagination: { next: null, prev: null },
+ }),
+ createStepAttempt: (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: `created-${params.stepName}`,
+ workflowRunId: params.workflowRunId,
+ stepName: params.stepName,
+ kind: params.kind,
+ status: "running",
+ context: params.context,
+ output: null,
+ error: null,
+ finishedAt: null,
+ }),
+ ),
+ completeStepAttempt: (
+ params: Parameters[0],
+ ) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "step",
+ status: "completed",
+ output: params.output ?? null,
+ }),
+ ),
+ failStepAttempt: (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockStepAttempt({
+ id: params.stepAttemptId,
+ workflowRunId: params.workflowRunId,
+ stepName: "step",
+ status: "failed",
+ error: params.error,
+ }),
+ ),
+ completeWorkflowRun: (
+ params: Parameters[0],
+ ) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "completed",
+ workerId: params.workerId,
+ output: params.output ?? null,
+ }),
+ ),
+ failWorkflowRun: (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "failed",
+ workerId: null,
+ error: params.error,
+ }),
+ ),
+ rescheduleWorkflowRunAfterFailedStepAttempt: (
+ params: Parameters<
+ Backend["rescheduleWorkflowRunAfterFailedStepAttempt"]
+ >[0],
+ ) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "pending",
+ workerId: null,
+ availableAt: params.availableAt,
+ error: params.error,
+ }),
+ ),
+ sleepWorkflowRun: (params: Parameters[0]) =>
+ Promise.resolve(
+ createMockWorkflowRun({
+ id: params.workflowRunId,
+ status: "running",
+ workerId: null,
+ availableAt: params.availableAt,
+ }),
+ ),
+ sendWorkflowSignal: () => {
+ // no-op
+ return Promise.resolve();
+ },
+ consumeWorkflowSignal: () => Promise.resolve(void 0),
+ getWorkflowRun: () => Promise.resolve(null),
+ };
+
+ return {
+ ...backend,
+ ...overrides,
+ } as Backend;
+}
diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts
index d825d737..9f3ca172 100644
--- a/packages/openworkflow/worker/execution.ts
+++ b/packages/openworkflow/worker/execution.ts
@@ -6,6 +6,7 @@ import {
type SerializedError,
} from "../core/error.js";
import type { JsonValue } from "../core/json.js";
+import type { StandardSchemaV1 } from "../core/standard-schema.js";
import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js";
import {
getCachedStepAttempt,
@@ -13,6 +14,7 @@ import {
normalizeStepOutput,
calculateDateFromDuration,
createSleepContext,
+ createSignalWaitContext,
createWorkflowContext,
} from "../core/step-attempt.js";
import {
@@ -26,6 +28,7 @@ import type {
StepApi,
StepFunction,
StepFunctionConfig,
+ StepWaitTimeout,
WorkflowFunction,
WorkflowRunMetadata,
} from "../core/workflow-function.js";
@@ -123,11 +126,7 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = {
maximumAttempts: 10,
};
-/**
- * Retry policy for workflow step failures (no retries - the child workflow
- * is responsible for retries).
- */
-const WORKFLOW_STEP_FAILURE_RETRY_POLICY: RetryPolicy = {
+const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = {
...DEFAULT_STEP_RETRY_POLICY,
maximumAttempts: 1,
};
@@ -197,6 +196,7 @@ export interface StepExecutionState {
failedCountsByStepName: ReadonlyMap;
failedByStepName: ReadonlyMap;
runningByStepName: ReadonlyMap;
+ runningSignalWaitBySignal: ReadonlyMap;
}
/**
@@ -211,6 +211,7 @@ export function createStepExecutionStateFromAttempts(
const failedCountsByStepName = new Map();
const failedByStepName = new Map();
const runningByStepName = new Map();
+ const runningSignalWaitBySignal = new Map();
for (const attempt of attempts) {
if (attempt.status === "completed" || attempt.status === "succeeded") {
@@ -226,6 +227,9 @@ export function createStepExecutionStateFromAttempts(
}
runningByStepName.set(attempt.stepName, attempt);
+ if (attempt.context?.kind === "signal-wait") {
+ runningSignalWaitBySignal.set(attempt.context.signal, attempt);
+ }
}
return {
@@ -233,20 +237,19 @@ export function createStepExecutionStateFromAttempts(
failedCountsByStepName,
failedByStepName,
runningByStepName,
+ runningSignalWaitBySignal,
};
}
/**
- * Resolve workflow timeout input to an absolute deadline.
+ * Resolve a step wait timeout input to an absolute deadline.
* @param timeout - Relative/absolute timeout input
* @returns Absolute timeout deadline
* @throws {Error} When timeout is invalid
*/
-function resolveWorkflowTimeoutAt(
- timeout: number | string | Date | undefined,
-): Date {
+function resolveWaitTimeoutAt(timeout?: StepWaitTimeout): Date {
if (timeout === undefined) {
- return defaultWorkflowTimeoutAt();
+ return defaultWaitTimeoutAt();
}
if (timeout instanceof Date) {
@@ -255,7 +258,7 @@ function resolveWorkflowTimeoutAt(
if (typeof timeout === "number") {
if (!Number.isFinite(timeout) || timeout < 0) {
- throw new Error("Workflow timeout must be a non-negative number");
+ throw new Error("Step wait timeout must be a non-negative number");
}
return new Date(Date.now() + timeout);
}
@@ -268,11 +271,11 @@ function resolveWorkflowTimeoutAt(
}
/**
- * Default workflow timeout: 1 year from a base time.
+ * Default step wait timeout: 1 year from a base time.
* @param base - Base timestamp (defaults to now)
* @returns Timeout deadline
*/
-function defaultWorkflowTimeoutAt(base: Readonly = new Date()): Date {
+function defaultWaitTimeoutAt(base: Readonly = new Date()): Date {
const timeoutAt = new Date(base);
timeoutAt.setFullYear(timeoutAt.getFullYear() + 1);
return timeoutAt;
@@ -290,7 +293,20 @@ function getWorkflowTimeoutAt(attempt: Readonly): Date | null {
if (attempt.context.timeoutAt === null) {
// Backward compatibility for previously persisted workflow contexts.
- return defaultWorkflowTimeoutAt(attempt.createdAt);
+ return defaultWaitTimeoutAt(attempt.createdAt);
+ }
+
+ return new Date(attempt.context.timeoutAt);
+}
+
+/**
+ * Extract the signal wait timeout from a persisted step attempt's context.
+ * @param attempt - Running signal-wait step attempt
+ * @returns Timeout deadline, or null when context is not signal-wait
+ */
+function getSignalWaitTimeoutAt(attempt: Readonly): Date | null {
+ if (attempt.context?.kind !== "signal-wait") {
+ return null;
}
return new Date(attempt.context.timeoutAt);
@@ -338,18 +354,28 @@ function getRunningWaitAttemptResumeAt(
}
if (attempt.kind !== "workflow") {
- return null;
+ if (attempt.kind !== "signal-wait") {
+ return null;
+ }
+
+ const timeoutAt =
+ getSignalWaitTimeoutAt(attempt) ??
+ defaultWaitTimeoutAt(attempt.createdAt);
+ if (Number.isFinite(timeoutAt.getTime())) {
+ return timeoutAt;
+ }
+
+ return defaultWaitTimeoutAt(attempt.createdAt);
}
const timeoutAt =
- getWorkflowTimeoutAt(attempt) ??
- defaultWorkflowTimeoutAt(attempt.createdAt);
+ getWorkflowTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt);
if (Number.isFinite(timeoutAt.getTime())) {
return timeoutAt;
}
// Backward compatibility for malformed historical workflow timeout values.
- return defaultWorkflowTimeoutAt(attempt.createdAt);
+ return defaultWaitTimeoutAt(attempt.createdAt);
}
/**
@@ -463,6 +489,19 @@ function buildWorkflowIdempotencyKey(attempt: Readonly): string {
return `__workflow:${attempt.namespaceId}:${attempt.id}`;
}
+/**
+ * Build deterministic idempotency key for step.sendSignal.
+ * @param workflowRunId - Current workflow run id
+ * @param stepName - Durable step name
+ * @returns Stable idempotency key
+ */
+function buildSignalIdempotencyKey(
+ workflowRunId: string,
+ stepName: string,
+): string {
+ return `__signal:${workflowRunId}:${stepName}`;
+}
+
/**
* Configures the options for a StepExecutor.
*/
@@ -482,9 +521,31 @@ interface RunWorkflowStepRequest<
> {
workflowSpec: WorkflowSpec ;
input: RunInput | undefined;
- timeout: number | string | Date | undefined;
+ timeout: StepWaitTimeout | undefined;
}
+interface WaitForSignalStepRequest {
+ signal: string;
+ timeout: StepWaitTimeout | undefined;
+ schema: StandardSchemaV1 | undefined;
+}
+
+type WaitForSignalOptions = Readonly<{
+ timeout?: StepWaitTimeout;
+ schema?: StandardSchemaV1;
+}>;
+
+type NamedWaitForSignalOptions = Readonly<{
+ name?: string;
+ signal: string;
+ timeout?: StepWaitTimeout;
+ schema?: StandardSchemaV1;
+}>;
+
+type WaitForSignalRequestInput =
+ | string
+ | NamedWaitForSignalOptions;
+
/**
* Replays prior step attempts and persists new ones while memoizing
* deterministic step outputs.
@@ -499,6 +560,7 @@ class StepExecutor implements StepApi {
private readonly failedCountsByStepName: Map;
private readonly failedByStepName: Map;
private readonly runningByStepName: Map;
+ private readonly activeSignalWaitStepNameBySignal: Map;
private readonly expectedNextStepIndexByName: Map;
private readonly resolvedStepNames: Set;
private readonly executionFence: ExecutionFenceController;
@@ -515,6 +577,14 @@ class StepExecutor implements StepApi {
this.failedCountsByStepName = new Map(state.failedCountsByStepName);
this.failedByStepName = new Map(state.failedByStepName);
this.runningByStepName = new Map(state.runningByStepName);
+ this.activeSignalWaitStepNameBySignal = new Map(
+ [...state.runningSignalWaitBySignal.entries()].map(
+ ([signal, attempt]): readonly [string, string] => [
+ signal,
+ attempt.stepName,
+ ],
+ ),
+ );
this.expectedNextStepIndexByName = new Map();
this.resolvedStepNames = new Set();
this.executionFence = options.executionFence;
@@ -674,6 +744,230 @@ class StepExecutor implements StepApi {
throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt));
}
+ // ---- step.sendSignal ---------------------------------------------------
+
+ async sendSignal(
+ workflowRunId: string,
+ signal: string,
+ options?: Readonly<{
+ data?: JsonValue;
+ }>,
+ ): Promise {
+ const stepName = this.resolveStepName(signal);
+
+ const existingAttempt = getCachedStepAttempt(this.cache, stepName);
+ if (existingAttempt) return;
+
+ const runningAttempt = this.runningByStepName.get(stepName);
+ if (runningAttempt?.kind === "signal-send") {
+ await this.resolveRunningSignalSend(
+ stepName,
+ runningAttempt,
+ workflowRunId,
+ signal,
+ options?.data ?? null,
+ );
+ return;
+ }
+
+ this.assertExecutionActive();
+ this.ensureStepLimitNotReached();
+ const attempt = await this.backend.createStepAttempt({
+ workflowRunId: this.workflowRunId,
+ workerId: this.workerId,
+ stepName,
+ kind: "signal-send",
+ config: {},
+ context: null,
+ });
+ this.stepCount += 1;
+ this.runningByStepName.set(stepName, attempt);
+
+ await this.resolveRunningSignalSend(
+ stepName,
+ attempt,
+ workflowRunId,
+ signal,
+ options?.data ?? null,
+ );
+ }
+
+ private async resolveRunningSignalSend(
+ stepName: string,
+ attempt: Readonly,
+ workflowRunId: string,
+ signal: string,
+ data: JsonValue | null,
+ ): Promise {
+ try {
+ await this.backend.sendWorkflowSignal({
+ workflowRunId,
+ signal,
+ data,
+ idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName),
+ });
+
+ const savedAttempt = await this.backend.completeStepAttempt({
+ workflowRunId: this.workflowRunId,
+ stepAttemptId: attempt.id,
+ workerId: this.workerId,
+ output: null,
+ });
+ this.cache = addToStepAttemptCache(this.cache, savedAttempt);
+ this.runningByStepName.delete(stepName);
+ } catch (error) {
+ return await this.failStepWithError(
+ stepName,
+ attempt.id,
+ error,
+ DEFAULT_STEP_RETRY_POLICY,
+ );
+ }
+ }
+
+ // ---- step.waitForSignal -----------------------------------------------
+
+ async waitForSignal(
+ signal: string,
+ options?: WaitForSignalOptions,
+ ): Promise;
+ async waitForSignal(
+ options: NamedWaitForSignalOptions,
+ ): Promise;
+ async waitForSignal(
+ signalOrOptions: WaitForSignalRequestInput,
+ options?: WaitForSignalOptions,
+ ): Promise {
+ const request =
+ typeof signalOrOptions === "string"
+ ? {
+ name: undefined,
+ signal: signalOrOptions,
+ timeout: options?.timeout,
+ schema: options?.schema,
+ }
+ : signalOrOptions;
+ const stepName = this.resolveStepName(request.name ?? request.signal);
+
+ const existingAttempt = getCachedStepAttempt(this.cache, stepName);
+ if (existingAttempt) {
+ return existingAttempt.output as Output | null;
+ }
+
+ const runningAttempt = this.runningByStepName.get(stepName);
+ if (runningAttempt?.kind === "signal-wait") {
+ return await this.resolveRunningSignalWait(stepName, runningAttempt, {
+ signal:
+ runningAttempt.context?.kind === "signal-wait"
+ ? runningAttempt.context.signal
+ : request.signal,
+ timeout: request.timeout,
+ schema: request.schema,
+ });
+ }
+
+ const activeSignalWaitStepName = this.activeSignalWaitStepNameBySignal.get(
+ request.signal,
+ );
+ if (activeSignalWaitStepName && activeSignalWaitStepName !== stepName) {
+ throw new Error(
+ `Signal "${request.signal}" is already being awaited by step "${activeSignalWaitStepName}"`,
+ );
+ }
+
+ const timeoutAt = resolveWaitTimeoutAt(request.timeout);
+ this.assertExecutionActive();
+ this.ensureStepLimitNotReached();
+ this.activeSignalWaitStepNameBySignal.set(request.signal, stepName);
+
+ let attempt: StepAttempt;
+ try {
+ attempt = await this.backend.createStepAttempt({
+ workflowRunId: this.workflowRunId,
+ workerId: this.workerId,
+ stepName,
+ kind: "signal-wait",
+ config: {},
+ context: createSignalWaitContext(request.signal, timeoutAt),
+ });
+ } catch (error) {
+ this.releaseActiveSignalWait(request.signal, stepName);
+ throw error;
+ }
+
+ this.stepCount += 1;
+ this.runningByStepName.set(stepName, attempt);
+
+ return await this.resolveRunningSignalWait(stepName, attempt, {
+ signal: request.signal,
+ timeout: request.timeout,
+ schema: request.schema,
+ });
+ }
+
+ private releaseActiveSignalWait(signal: string, stepName: string): void {
+ if (this.activeSignalWaitStepNameBySignal.get(signal) === stepName) {
+ this.activeSignalWaitStepNameBySignal.delete(signal);
+ }
+ }
+
+ private async resolveRunningSignalWait(
+ stepName: string,
+ attempt: Readonly,
+ request: Readonly>,
+ ): Promise {
+ const signalData = await this.backend.consumeWorkflowSignal({
+ workflowRunId: this.workflowRunId,
+ signal: request.signal,
+ stepAttemptId: attempt.id,
+ workerId: this.workerId,
+ });
+
+ if (signalData !== undefined) {
+ const validationResult = await validateInput(request.schema, signalData);
+ if (!validationResult.success) {
+ this.releaseActiveSignalWait(request.signal, stepName);
+ return await this.failStepWithError(
+ stepName,
+ attempt.id,
+ new Error(validationResult.error),
+ TERMINAL_STEP_RETRY_POLICY,
+ );
+ }
+
+ const completed = await this.backend.completeStepAttempt({
+ workflowRunId: this.workflowRunId,
+ stepAttemptId: attempt.id,
+ workerId: this.workerId,
+ output: normalizeStepOutput(validationResult.value),
+ });
+ this.releaseActiveSignalWait(request.signal, stepName);
+ this.runningByStepName.delete(stepName);
+ this.cache = addToStepAttemptCache(this.cache, completed);
+ return completed.output as Output | null;
+ }
+
+ const persistedTimeoutAt = getSignalWaitTimeoutAt(attempt);
+ const timeoutAt =
+ persistedTimeoutAt && Number.isFinite(persistedTimeoutAt.getTime())
+ ? persistedTimeoutAt
+ : defaultWaitTimeoutAt(attempt.createdAt);
+ if (Date.now() >= timeoutAt.getTime()) {
+ const completed = await this.backend.completeStepAttempt({
+ workflowRunId: this.workflowRunId,
+ stepAttemptId: attempt.id,
+ workerId: this.workerId,
+ output: null,
+ });
+ this.releaseActiveSignalWait(request.signal, stepName);
+ this.runningByStepName.delete(stepName);
+ this.cache = addToStepAttemptCache(this.cache, completed);
+ return null;
+ }
+
+ throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(timeoutAt));
+ }
+
// ---- step.runWorkflow -----------------------------------------------
async runWorkflow (
@@ -713,7 +1007,7 @@ class StepExecutor implements StepApi {
throw new StepError({
stepName,
stepFailedAttempts: this.failedCountsByStepName.get(stepName) ?? 1,
- retryPolicy: WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ retryPolicy: TERMINAL_STEP_RETRY_POLICY,
error: failedError,
});
}
@@ -729,7 +1023,7 @@ class StepExecutor implements StepApi {
}
// First encounter — create the workflow step and child workflow run
- const timeoutAt = resolveWorkflowTimeoutAt(request.timeout);
+ const timeoutAt = resolveWaitTimeoutAt(request.timeout);
this.assertExecutionActive();
this.ensureStepLimitNotReached();
const attempt = await this.backend.createStepAttempt({
@@ -784,7 +1078,7 @@ class StepExecutor implements StepApi {
new Error(
`Workflow step "${stepName}" could not find linked child workflow run`,
),
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}
@@ -798,7 +1092,7 @@ class StepExecutor implements StepApi {
new Error(
`Workflow step "${stepName}" could not find linked child workflow run "${childId}"`,
),
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}
@@ -808,7 +1102,7 @@ class StepExecutor implements StepApi {
stepName,
workflowAttempt.id,
new Error("Timed out waiting for child workflow to complete"),
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}
@@ -835,7 +1129,7 @@ class StepExecutor implements StepApi {
stepName,
workflowAttempt.id,
childError,
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}
@@ -847,7 +1141,7 @@ class StepExecutor implements StepApi {
new Error(
`Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`,
),
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}
@@ -856,7 +1150,7 @@ class StepExecutor implements StepApi {
const resumeAt =
timeoutAt && Number.isFinite(timeoutAt.getTime())
? timeoutAt
- : defaultWorkflowTimeoutAt(workflowAttempt.createdAt);
+ : defaultWaitTimeoutAt(workflowAttempt.createdAt);
throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt));
}
@@ -973,7 +1267,7 @@ class StepExecutor implements StepApi {
stepName,
stepAttemptId,
error,
- WORKFLOW_STEP_FAILURE_RETRY_POLICY,
+ TERMINAL_STEP_RETRY_POLICY,
);
}