diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 87a38b79..8671c550 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -105,6 +105,7 @@ of coordination. There is no separate orchestrator server. | | | - workflow_runs | | - step_attempts | + | - workflow_signals | +------------------------------+ ``` @@ -122,9 +123,9 @@ of coordination. There is no separate orchestrator server. `npx @openworkflow/cli worker start` with auto-discovery of workflow files. - **Backend**: The source of truth. It stores workflow runs and step attempts. - The `workflow_runs` table serves as the job queue for the workers, while the - `step_attempts` table serves as a record of started and completed work, - enabling memoization. + The `workflow_runs` table serves as the job queue for the workers, the + `step_attempts` table serves as a record of started and completed work, and + the `workflow_signals` table buffers signals until a waiting step consumes them. ### 2.3. Basic Execution Flow @@ -149,7 +150,7 @@ of coordination. There is no separate orchestrator server. `step_attempt` record with status `running`, executes the step function, and then updates the `step_attempt` to `completed` upon completion. The Worker continues executing inline until the workflow code completes or encounters a - sleep. + durable wait such as sleep, child-workflow waiting, or signal waiting. 6. **State Update**: The Worker updates the Backend with each `step_attempt` as it is created and completed, and updates the status of the `workflow_run` (e.g., `completed`, `running` for parked waits). @@ -234,10 +235,23 @@ target workflow name in `spec`) and `options.timeout` controls the wait timeout (default 1y). When the timeout is reached, the parent step fails but the child workflow continues running independently. -All step APIs (`step.run`, `step.sleep`, and `step.runWorkflow`) share the same -collision logic for durable keys. 
If duplicate base names are encountered in one -execution pass, OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, -and so on so each step call maps to a distinct step attempt. +**`step.sendSignal(workflowRunId, signal, options?)`**: Sends a signal to another +workflow run as a durable step. The signal is stored in `workflow_signals`, and +the send is retried safely using a deterministic idempotency key derived from +the logical step name. + +**`step.waitForSignal(signal, options?)`**: Waits durably for the next buffered +signal with that name for the current workflow run. The common-case API uses +the signal string itself as the durable step name. When needed, +`step.waitForSignal({ name, signal, timeout, schema })` lets callers override +the durable step name explicitly. Waits return the parsed payload or `null` on +timeout. + +All step APIs (`step.run`, `step.sleep`, `step.runWorkflow`, +`step.sendSignal`, and `step.waitForSignal`) share the same collision logic for +durable keys. If duplicate base names are encountered in one execution pass, +OpenWorkflow auto-indexes them as `name`, `name:1`, `name:2`, and so on so each +step call maps to a distinct step attempt. ## 4. Error Handling & Retries diff --git a/packages/dashboard/src/routes/runs/$runId.tsx b/packages/dashboard/src/routes/runs/$runId.tsx index 4952f4cd..f143c36f 100644 --- a/packages/dashboard/src/routes/runs/$runId.tsx +++ b/packages/dashboard/src/routes/runs/$runId.tsx @@ -352,8 +352,7 @@ function RunDetailsPage() { const config = STEP_STATUS_CONFIG[step.status]; const StatusIcon = config.icon; const iconColor = config.color; - const stepTypeLabel = - step.kind === "function" ? 
"function" : step.kind; + const stepTypeLabel = formatStepKindLabel(step.kind); const stepDuration = computeDuration( step.startedAt, step.finishedAt, @@ -740,6 +739,10 @@ function StepInspectorPanel({ value={attemptCount.toString()} mono /> + ): unknown { if (value instanceof Error) { return { diff --git a/packages/docs/docs.json b/packages/docs/docs.json index 13eb3f90..4d34e3bf 100644 --- a/packages/docs/docs.json +++ b/packages/docs/docs.json @@ -42,6 +42,7 @@ "docs/parallel-steps", "docs/dynamic-steps", "docs/child-workflows", + "docs/signals", "docs/retries", "docs/type-safety", "docs/versioning", diff --git a/packages/docs/docs/openworkflow-vs-temporal.mdx b/packages/docs/docs/openworkflow-vs-temporal.mdx index 06b0a2c8..5305b066 100644 --- a/packages/docs/docs/openworkflow-vs-temporal.mdx +++ b/packages/docs/docs/openworkflow-vs-temporal.mdx @@ -50,8 +50,9 @@ Temporal exposes multiple timeout classes and retry options per workflow and activity. That flexibility is useful when you need it. OpenWorkflow keeps the default surface smaller (`step.run`, `step.sleep`, -retries with backoff, optional `deadlineAt`) and is often faster to adopt for -TypeScript teams that want fewer moving parts. +`step.runWorkflow`, `step.waitForSignal`, retries with backoff, optional +`deadlineAt`) and is often faster to adopt for TypeScript teams that want fewer +moving parts. ### Tradeoff: happy-path overhead vs resume-path replay CPU @@ -67,8 +68,7 @@ resume. - **Polyglot platform**: You need one orchestration platform across multiple languages. -- **Advanced runtime interaction**: You need Signals, Queries, Updates, or - Activity heartbeats. These are coming to OpenWorkflow but are available right - now in Temporal. +- **Advanced runtime interaction**: You need the full Temporal surface such as + Queries, Updates, or Activity heartbeats. - **Dedicated platform ownership**: Your team can operate Temporal Server as shared infrastructure. 
diff --git a/packages/docs/docs/overview.mdx b/packages/docs/docs/overview.mdx index 75ac999c..a83e631f 100644 --- a/packages/docs/docs/overview.mdx +++ b/packages/docs/docs/overview.mdx @@ -27,7 +27,8 @@ input)`), creating a `pending` run in the database 3. The worker replays workflow code from the beginning 4. Completed steps return cached outputs; new steps execute and persist results 5. The run transitions to `completed`, `failed`, or `canceled` (or stays - `running` while durably parked for sleep) + `running` while durably parked for sleep, child-workflow waiting, or + signal waiting) Because replay is deterministic, crash recovery is reliable. A replacement worker can continue from the last durable checkpoint without redoing completed @@ -37,6 +38,8 @@ work. - **Memoized steps** prevent duplicate side effects on retries - **Durable sleep** (`step.sleep`) pauses runs without holding worker capacity +- **Durable signals** (`step.waitForSignal`) buffer signals until a waiting step + consumes them - **Heartbeats + leases** (`availableAt`) allow automatic crash recovery - **Database as source of truth** avoids a separate orchestration service diff --git a/packages/docs/docs/roadmap.mdx b/packages/docs/docs/roadmap.mdx index f97bc187..210ae320 100644 --- a/packages/docs/docs/roadmap.mdx +++ b/packages/docs/docs/roadmap.mdx @@ -20,10 +20,10 @@ description: What's coming next for OpenWorkflow - ✅ Idempotency keys - ✅ Prometheus `/metrics` endpoint - ✅ Child workflows (`step.runWorkflow`) +- ✅ Signals (`step.waitForSignal`, `step.sendSignal`, `ow.sendSignal`) ## Coming Soon -- Signals - Cron / scheduling - Rollback / compensation functions - Priority and concurrency controls diff --git a/packages/docs/docs/signals.mdx b/packages/docs/docs/signals.mdx new file mode 100644 index 00000000..7e58d807 --- /dev/null +++ b/packages/docs/docs/signals.mdx @@ -0,0 +1,157 @@ +--- +title: Signals +description: Pause a workflow until your app or another workflow sends a named 
signal +--- + +Use signals when a workflow needs to wait for an event that does not happen at +a fixed time, like a human approval, a webhook callback, or a result from +another workflow. + +A signal is addressed to one specific workflow run by run ID. OpenWorkflow +buffers signals durably, so they can arrive before or after the workflow reaches +`step.waitForSignal()`. + +## Basic Usage + +Wait for a signal inside a workflow: + +```ts +import { defineWorkflow } from "openworkflow"; + +export const reviewOrder = defineWorkflow( + { name: "review-order" }, + async ({ input, step }) => { + await step.run({ name: "save-order" }, async () => { + await db.orders.insert(input); + }); + + const approval = await step.waitForSignal<{ + approved: boolean; + reviewerId: string; + }>("approval", { + timeout: "7d", + }); + + if (!approval?.approved) { + throw new Error("Order was not approved"); + } + + await step.run({ name: "submit-order" }, async () => { + await orders.submit(input.orderId, approval.reviewerId); + }); + }, +); +``` + +Send the signal from your application code: + +```ts +const handle = await reviewOrder.run({ orderId: "order_123" }); + +await ow.sendSignal(handle.workflowRun.id, "approval", { + data: { + approved: true, + reviewerId: "manager_42", + }, +}); +``` + +## Send from Your App + +Use `ow.sendSignal()` when the event comes from outside workflow code, such as +an HTTP route, webhook handler, or admin action: + +```ts +await ow.sendSignal(runId, "approval", { + data: { + approved: true, + reviewerId: "manager_42", + }, + idempotencyKey: `approval:${event.id}`, +}); +``` + +Use `idempotencyKey` when the sender might retry. If the same +`workflowRunId` + `idempotencyKey` is sent again, OpenWorkflow stores it only +once. 
+ +## Send from Another Workflow + +Use `step.sendSignal()` when one workflow should unblock another and you want +the send itself to be durable: + +```ts +await step.sendSignal(targetRunId, "approval", { + data: { approved: true, reviewerId: "manager_42" }, +}); +``` + +This is useful when workflows coordinate as peers rather than as +parent-and-child runs. + +## Timeout and Validation + +`step.waitForSignal()` returns the payload when a matching signal arrives, or +`null` if the timeout is reached first: + +```ts +const approval = await step.waitForSignal("approval", { + timeout: "3d", + schema: z.object({ + approved: z.boolean(), + reviewerId: z.string(), + }), +}); + +if (approval === null) { + await step.run({ name: "mark-expired" }, async () => { + await db.requests.update(requestId, { status: "expired" }); + }); +} +``` + +If the signal payload fails schema validation, the wait step fails immediately. + +## Step Names + +By default, the signal string is also the durable step name: + +```ts +await step.waitForSignal("approval"); +await step.sendSignal(targetRunId, "approval"); +``` + +Step names share one namespace across all step types. If the same name appears +again during one execution, OpenWorkflow applies its normal `:1`, `:2`, and so +on disambiguation. + +If you want a more explicit durable step name, use the object form: + +```ts +await step.waitForSignal({ + name: "manager-approval", + signal: "approval", + timeout: "7d", +}); +``` + +## Important Behavior + +- Signals are buffered per `workflowRunId` and signal name until consumed. +- If multiple buffered signals match, the oldest pending one is consumed first. +- A run can have only one active waiter for a given signal name at a time. +- Signals target a specific run ID, not every run of a workflow. 
+ +## When to Use Signals + +Signals are a good fit for: + +- Human approvals and manual review steps +- Webhook or callback-style resumptions +- Cross-workflow coordination when you do not want a child workflow + + + If you need to wait for another workflow's final result, prefer + [`step.runWorkflow()`](/docs/child-workflows). If you need to wait until a + specific time, prefer [`step.sleep()`](/docs/sleeping). + diff --git a/packages/docs/docs/steps.mdx b/packages/docs/docs/steps.mdx index 5eaa2f80..349cb8a1 100644 --- a/packages/docs/docs/steps.mdx +++ b/packages/docs/docs/steps.mdx @@ -115,7 +115,7 @@ await step.run({ name: "log-event" }, async () => { ## Step Types -OpenWorkflow provides three step types: +OpenWorkflow provides five step types: ### `step.run()` @@ -148,6 +148,42 @@ const childOutput = await step.runWorkflow( ); ``` +### `step.sendSignal()` + +Sends a signal to another workflow run as a durable step: + +```ts +await step.sendSignal(targetRunId, "approval", { + data: { approved: true }, +}); +``` + +See [Signals](/docs/signals) for buffered delivery, external sends with +`ow.sendSignal()`, and coordination patterns. + +### `step.waitForSignal()` + +Waits durably for the next matching signal for the current workflow run: + +```ts +const approval = await step.waitForSignal("approval", { + timeout: "7d", +}); +``` + +If you need to override the durable step name explicitly, use the object form: + +```ts +const approval = await step.waitForSignal({ + name: "manager-approval", + signal: "approval", + timeout: "7d", +}); +``` + +See [Signals](/docs/signals) for end-to-end examples, timeout behavior, and +delivery semantics. + ## Retry Policy (Optional) Control backoff and retry limits for an individual step: diff --git a/packages/docs/docs/workflows.mdx b/packages/docs/docs/workflows.mdx index 5d8ae39e..7ef12e52 100644 --- a/packages/docs/docs/workflows.mdx +++ b/packages/docs/docs/workflows.mdx @@ -63,6 +63,15 @@ for the workflow to complete. 
runnable workflow that supports `.run(...)` directly. +To wake an existing run from outside workflow code, send a buffered signal to +its run ID: + +```ts +await ow.sendSignal(handle.workflowRun.id, "approval", { + data: { approved: true }, +}); +``` + ### Scheduling a Workflow Run You can schedule a workflow run for a specific time by passing `availableAt`: @@ -213,12 +222,12 @@ create a separate run. The workflow function receives an object with four properties: -| Parameter | Type | Description | -| --------- | --------------------- | --------------------------------------------------------------------- | -| `input` | Generic | The input data passed when starting the workflow | -| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`) | -| `version` | `string \| null` | The workflow version, if specified | -| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) | +| Parameter | Type | Description | +| --------- | --------------------- | -------------------------------------------------------------------------------------------------------------- | +| `input` | Generic | The input data passed when starting the workflow | +| `step` | `StepApi` | API for defining steps (`step.run`, `step.sleep`, `step.runWorkflow`, `step.sendSignal`, `step.waitForSignal`) | +| `version` | `string \| null` | The workflow version, if specified | +| `run` | `WorkflowRunMetadata` | Read-only run metadata snapshot (`run.id`, etc.) 
| ```ts defineWorkflow({ name: "example" }, async ({ input, step, version, run }) => { diff --git a/packages/openworkflow/client/client.test.ts b/packages/openworkflow/client/client.test.ts index 910eb418..6d8e84f1 100644 --- a/packages/openworkflow/client/client.test.ts +++ b/packages/openworkflow/client/client.test.ts @@ -15,7 +15,7 @@ import { OpenWorkflow } from "./client.js"; import { type as arkType } from "arktype"; import { randomUUID } from "node:crypto"; import * as v from "valibot"; -import { describe, expect, test } from "vitest"; +import { describe, expect, test, vi } from "vitest"; import { number as yupNumber, object as yupObject, @@ -506,6 +506,27 @@ describe("OpenWorkflow", () => { expect(workflowRun?.finishedAt).not.toBeNull(); }); + test("sends workflow signals via the backend", async () => { + const sendWorkflowSignal = vi.fn(() => Promise.resolve()); + const client = new OpenWorkflow({ + backend: { + sendWorkflowSignal, + } as unknown as Backend, + }); + + await client.sendSignal("run-123", "approval", { + data: { approved: true }, + idempotencyKey: "signal-1", + }); + + expect(sendWorkflowSignal).toHaveBeenCalledWith({ + workflowRunId: "run-123", + signal: "approval", + data: { approved: true }, + idempotencyKey: "signal-1", + }); + }); + test("throws when canceling a non-existent workflow run", async () => { const backend = await createBackend(); const client = new OpenWorkflow({ backend }); diff --git a/packages/openworkflow/client/client.ts b/packages/openworkflow/client/client.ts index bb936dce..98fc67d3 100644 --- a/packages/openworkflow/client/client.ts +++ b/packages/openworkflow/client/client.ts @@ -1,5 +1,6 @@ import type { Backend } from "../core/backend.js"; import type { DurationString } from "../core/duration.js"; +import type { JsonValue } from "../core/json.js"; import type { StandardSchemaV1 } from "../core/standard-schema.js"; import { calculateDateFromDuration } from "../core/step-attempt.js"; import { @@ -173,6 +174,28 @@ 
export class OpenWorkflow { async cancelWorkflowRun(workflowRunId: string): Promise { await this.backend.cancelWorkflowRun({ workflowRunId }); } + + /** + * Sends a buffered signal to a workflow run. + * @param workflowRunId - Target workflow run id + * @param signal - Signal name + * @param options - Optional signal payload and client idempotency key + */ + async sendSignal( + workflowRunId: string, + signal: string, + options?: Readonly<{ + data?: JsonValue; + idempotencyKey?: string; + }>, + ): Promise { + await this.backend.sendWorkflowSignal({ + workflowRunId, + signal, + data: options?.data ?? null, + idempotencyKey: options?.idempotencyKey ?? null, + }); + } } /** diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index d79dc5e6..2d2bf23e 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -68,6 +68,12 @@ export interface Backend { params: Readonly, ): Promise; + // Signals + sendWorkflowSignal(params: Readonly): Promise; + consumeWorkflowSignal( + params: Readonly, + ): Promise; + // Lifecycle stop(): Promise; } @@ -173,6 +179,20 @@ export interface SetStepAttemptChildWorkflowRunParams { childWorkflowRunId: string; } +export interface SendWorkflowSignalParams { + workflowRunId: string; + signal: string; + data: JsonValue | null; + idempotencyKey: string | null; +} + +export interface ConsumeWorkflowSignalParams { + workflowRunId: string; + signal: string; + stepAttemptId: string; + workerId: string; +} + export interface PaginationOptions { limit?: number; after?: string; diff --git a/packages/openworkflow/core/step-attempt.test.ts b/packages/openworkflow/core/step-attempt.test.ts index ce5f207f..c26a0e7a 100644 --- a/packages/openworkflow/core/step-attempt.test.ts +++ b/packages/openworkflow/core/step-attempt.test.ts @@ -6,6 +6,7 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, + createSignalWaitContext, createWorkflowContext, } from 
"./step-attempt.js"; import type { StepAttempt, StepAttemptCache } from "./step-attempt.js"; @@ -339,6 +340,19 @@ describe("createWorkflowContext", () => { }); }); +describe("createSignalWaitContext", () => { + test("creates signal-wait context with signal and timeout", () => { + const timeoutAt = new Date("2025-06-15T10:30:00.000Z"); + const context = createSignalWaitContext("approval", timeoutAt); + + expect(context).toEqual({ + kind: "signal-wait", + signal: "approval", + timeoutAt: "2025-06-15T10:30:00.000Z", + }); + }); +}); + function createMockStepAttempt( overrides: Partial = {}, ): StepAttempt { diff --git a/packages/openworkflow/core/step-attempt.ts b/packages/openworkflow/core/step-attempt.ts index 888120ac..61d30970 100644 --- a/packages/openworkflow/core/step-attempt.ts +++ b/packages/openworkflow/core/step-attempt.ts @@ -7,7 +7,12 @@ import { err, ok } from "./result.js"; /** * The kind of step in a workflow. */ -export type StepKind = "function" | "sleep" | "workflow"; +export type StepKind = + | "function" + | "sleep" + | "workflow" + | "signal-send" + | "signal-wait"; /** * Status of a step attempt through its lifecycle. @@ -34,12 +39,22 @@ export interface WorkflowStepAttemptContext { timeoutAt: string | null; } +/** + * Context for a signal-wait step attempt. + */ +export interface SignalWaitStepAttemptContext { + kind: "signal-wait"; + signal: string; + timeoutAt: string; +} + /** * Context for a step attempt. */ export type StepAttemptContext = | SleepStepAttemptContext - | WorkflowStepAttemptContext; + | WorkflowStepAttemptContext + | SignalWaitStepAttemptContext; /** * StepAttempt represents a single attempt of a step within a workflow. @@ -171,3 +186,20 @@ export function createWorkflowContext( timeoutAt: timeoutAt?.toISOString() ?? null, }; } + +/** + * Create the context object for a signal-wait step attempt. 
+ * @param signal - Signal name + * @param timeoutAt - Wait timeout deadline + * @returns The context object for the signal-wait step + */ +export function createSignalWaitContext( + signal: string, + timeoutAt: Readonly, +): SignalWaitStepAttemptContext { + return { + kind: "signal-wait" as const, + signal, + timeoutAt: timeoutAt.toISOString(), + }; +} diff --git a/packages/openworkflow/core/workflow-function.ts b/packages/openworkflow/core/workflow-function.ts index 01745679..068b19b2 100644 --- a/packages/openworkflow/core/workflow-function.ts +++ b/packages/openworkflow/core/workflow-function.ts @@ -1,7 +1,11 @@ import type { DurationString } from "./duration.js"; +import type { JsonValue } from "./json.js"; +import type { StandardSchemaV1 } from "./standard-schema.js"; import type { RetryPolicy, WorkflowSpec } from "./workflow-definition.js"; import type { WorkflowRun } from "./workflow-run.js"; +export type StepWaitTimeout = number | string | Date; + /** * Config for an individual step defined with `step.run()`. */ @@ -37,13 +41,13 @@ export interface StepRunWorkflowOptions { /** * Maximum time to wait for the child workflow to complete. */ - timeout?: number | string | Date; + timeout?: StepWaitTimeout; } /** * Represents the API for defining steps within a workflow. Used within a * workflow handler to define steps by calling `step.run()`, `step.sleep()`, - * and `step.runWorkflow()`. + * `step.runWorkflow()`, `step.sendSignal()`, and `step.waitForSignal()`. 
*/ export interface StepApi { run: ( @@ -56,6 +60,30 @@ export interface StepApi { options?: Readonly, ) => Promise; sleep: (name: string, duration: DurationString) => Promise; + sendSignal: ( + workflowRunId: string, + signal: string, + options?: Readonly<{ + data?: JsonValue; + }>, + ) => Promise; + waitForSignal: { + ( + signal: string, + options?: Readonly<{ + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ): Promise; + ( + options: Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; + }>, + ): Promise; + }; } /** diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index 31fecd90..7fe6816e 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -8,7 +8,7 @@ import { } from "./postgres.js"; import assert from "node:assert"; import { randomUUID } from "node:crypto"; -import { describe, expect, test } from "vitest"; +import { describe, expect, test, vi } from "vitest"; test("it is a test file (workaround for sonarjs/no-empty-test-file linter)", () => { assert.ok(true); @@ -632,3 +632,356 @@ describe("BackendPostgres workflow wake-up reconciliation", () => { } }); }); + +describe("BackendPostgres.sendWorkflowSignal fallback branches", () => { + type BackendPostgresCtor = new ( + pg: unknown, + namespaceId: string, + schema: string, + ) => BackendPostgres; + + test("re-reads an existing signal after an idempotent insert conflict", async () => { + let selectCount = 0; + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith('SELECT "id"')) { + selectCount += 1; + return selectCount === 2 ? 
[{ id: "existing-signal" }] : []; + } + + if (sql.startsWith("INSERT INTO")) { + return []; + } + + return []; + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve()); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + workflowRunsTable: () => string; + wakeWorkflowRunForSignal: (workflowRunId: string) => Promise; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal; + + await backend.sendWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + data: { approved: true }, + idempotencyKey: "signal-key", + }); + + expect(selectCount).toBe(2); + expect(wakeWorkflowRunForSignal).toHaveBeenCalledWith("workflow-run-id"); + }); + + test("throws the generic error when no signal row is inserted for an active run", async () => { + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith("INSERT INTO")) { + return []; + } + + return []; + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve()); + const getWorkflowRun = vi.fn(() => + Promise.resolve( + createMockWorkflowRun({ + id: "workflow-run-id", + status: "running", + }), + ), + ); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + workflowRunsTable: () => string; + wakeWorkflowRunForSignal: (workflowRunId: string) => Promise; + getWorkflowRun: typeof backend.getWorkflowRun; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal; + 
internalBackend.getWorkflowRun = + getWorkflowRun as unknown as typeof backend.getWorkflowRun; + + await expect( + backend.sendWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + data: null, + idempotencyKey: null, + }), + ).rejects.toThrow("Failed to send workflow signal"); + + expect(getWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: "workflow-run-id", + }); + expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled(); + }); + + test("throws the generic error when the idempotent fallback re-read still finds nothing", async () => { + let selectCount = 0; + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith('SELECT "id"')) { + selectCount += 1; + return []; + } + + if (sql.startsWith("INSERT INTO")) { + return []; + } + + return []; + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const wakeWorkflowRunForSignal = vi.fn(() => Promise.resolve()); + const getWorkflowRun = vi.fn(() => + Promise.resolve( + createMockWorkflowRun({ + id: "workflow-run-id", + status: "running", + }), + ), + ); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + workflowRunsTable: () => string; + wakeWorkflowRunForSignal: (workflowRunId: string) => Promise; + getWorkflowRun: typeof backend.getWorkflowRun; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal; + internalBackend.getWorkflowRun = + getWorkflowRun as unknown as typeof backend.getWorkflowRun; + + await expect( + backend.sendWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + data: null, + idempotencyKey: "signal-key", + }), + ).rejects.toThrow("Failed to send workflow signal"); + + expect(selectCount).toBe(2); + expect(getWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: 
"workflow-run-id", + }); + expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled(); + }); +}); + +describe("BackendPostgres.consumeWorkflowSignal edge cases", () => { + type BackendPostgresCtor = new ( + pg: unknown, + namespaceId: string, + schema: string, + ) => BackendPostgres; + + test("returns null for previously consumed signals with null data", async () => { + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith('SELECT "data"')) { + return [{ data: null }]; + } + + throw new Error(`Unexpected SQL: ${sql}`); + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + stepAttemptsTable: () => string; + workflowRunsTable: () => string; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.stepAttemptsTable = () => '"step_attempts"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + + await expect( + backend.consumeWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + stepAttemptId: "step-attempt-id", + workerId: "worker-id", + }), + ).resolves.toBeNull(); + }); + + test("returns undefined when no signal can be consumed", async () => { + let selectCount = 0; + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith('SELECT "data"')) { + selectCount += 1; + return []; + } + + if (sql.startsWith("WITH candidate AS")) { + return []; + } + + throw new Error(`Unexpected SQL: ${sql}`); + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + stepAttemptsTable: () => string; + workflowRunsTable: () => string; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.stepAttemptsTable = () => 
'"step_attempts"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + + await expect( + backend.consumeWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + stepAttemptId: "step-attempt-id", + workerId: "worker-id", + }), + ).resolves.toBeUndefined(); + + expect(selectCount).toBe(1); + }); + + test("returns null when a freshly consumed signal has null data", async () => { + const fakePg = createFakePostgresQueryHandler((sql) => { + if (sql.startsWith('SELECT "data"')) { + return []; + } + + if (sql.startsWith("WITH candidate AS")) { + return [{ data: null }]; + } + + throw new Error(`Unexpected SQL: ${sql}`); + }); + const backend = new (BackendPostgres as unknown as BackendPostgresCtor)( + fakePg, + randomUUID(), + DEFAULT_SCHEMA, + ); + const internalBackend = backend as unknown as { + workflowSignalsTable: () => string; + stepAttemptsTable: () => string; + workflowRunsTable: () => string; + }; + internalBackend.workflowSignalsTable = () => '"workflow_signals"'; + internalBackend.stepAttemptsTable = () => '"step_attempts"'; + internalBackend.workflowRunsTable = () => '"workflow_runs"'; + + await expect( + backend.consumeWorkflowSignal({ + workflowRunId: "workflow-run-id", + signal: "approval", + stepAttemptId: "step-attempt-id", + workerId: "worker-id", + }), + ).resolves.toBeNull(); + }); +}); + +function createFakePostgresQueryHandler( + handler: (sql: string) => readonly unknown[], +): unknown { + const fakePg = ((input: TemplateStringsArray | string) => { + if ( + Array.isArray(input) && + Object.prototype.hasOwnProperty.call(input, "raw") + ) { + const sql = input.join(" ").replaceAll(/\s+/g, " ").trim(); + return handler(sql); + } + + return []; + }) as { + (input: TemplateStringsArray | string, ...values: unknown[]): unknown; + json: (value: unknown) => unknown; + end: () => Promise; + }; + fakePg.json = (value) => value; + fakePg.end = async () => { + // no-op for tests + }; + + return fakePg; +} + +function 
createMockWorkflowRun( + overrides: Partial<{ + namespaceId: string; + id: string; + workflowName: string; + version: string | null; + status: + | "pending" + | "running" + | "sleeping" + | "completed" + | "succeeded" + | "failed" + | "canceled"; + idempotencyKey: string | null; + config: Record; + context: Record | null; + input: Record | null; + output: Record | null; + error: Record | null; + attempts: number; + parentStepAttemptNamespaceId: string | null; + parentStepAttemptId: string | null; + workerId: string | null; + availableAt: Date | null; + deadlineAt: Date | null; + startedAt: Date | null; + finishedAt: Date | null; + createdAt: Date; + updatedAt: Date; + }> = {}, +) { + return { + namespaceId: "default", + id: "workflow-run-id", + workflowName: "workflow-name", + version: null, + status: "running", + idempotencyKey: null, + config: {}, + context: null, + input: null, + output: null, + error: null, + attempts: 1, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + workerId: "worker-id", + availableAt: null, + deadlineAt: null, + startedAt: new Date("2026-01-01T00:00:00.000Z"), + finishedAt: null, + createdAt: new Date("2026-01-01T00:00:00.000Z"), + updatedAt: new Date("2026-01-01T00:00:00.000Z"), + ...overrides, + }; +} diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index c2ac3187..fe7dfe66 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -17,6 +17,8 @@ import { FailStepAttemptParams, CompleteStepAttemptParams, SetStepAttemptChildWorkflowRunParams, + SendWorkflowSignalParams, + ConsumeWorkflowSignalParams, FailWorkflowRunParams, RescheduleWorkflowRunAfterFailedStepAttemptParams, CompleteWorkflowRunParams, @@ -247,6 +249,168 @@ export class BackendPostgres implements Backend { return workflowRun ?? 
/**
 * Buffers a signal for a workflow run in `workflow_signals` and then nudges
 * the run via `wakeWorkflowRunForSignal`.
 *
 * When `params.idempotencyKey` is non-null and a signal with that key already
 * exists for the run, nothing is inserted; the run is only woken. The insert
 * itself selects from `workflow_runs`, so it produces no row when the run is
 * missing or in a terminal status.
 *
 * @param params - Target run id, signal name, JSON payload and idempotency key.
 * @throws Error if the run does not exist, is in a terminal status
 *   (`succeeded`, `completed`, `failed`, `canceled`), or no row was inserted
 *   for any other reason.
 */
async sendWorkflowSignal(params: SendWorkflowSignalParams): Promise<void> {
  const workflowSignalsTable = this.workflowSignalsTable();
  const workflowRunsTable = this.workflowRunsTable();
  const { workflowRunId, idempotencyKey } = params;

  // Fast path: an earlier send with the same key already stored the signal.
  if (
    idempotencyKey !== null &&
    (await this.workflowSignalExistsForIdempotencyKey(
      workflowRunId,
      idempotencyKey,
    ))
  ) {
    await this.wakeWorkflowRunForSignal(workflowRunId);
    return;
  }

  // Only keyed sends can collide with the partial unique index.
  const conflictClause =
    idempotencyKey === null
      ? this.pg``
      : this.pg`
        ON CONFLICT ("namespace_id", "workflow_run_id", "idempotency_key")
        WHERE "idempotency_key" IS NOT NULL
        DO NOTHING
      `;

  const [inserted] = await this.pg<{ id: string }[]>`
    INSERT INTO ${workflowSignalsTable} (
      "namespace_id",
      "id",
      "workflow_run_id",
      "signal",
      "data",
      "idempotency_key",
      "consumed_step_attempt_namespace_id",
      "consumed_step_attempt_id",
      "consumed_at",
      "created_at",
      "updated_at"
    )
    SELECT
      ${this.namespaceId},
      gen_random_uuid(),
      ${workflowRunId},
      ${params.signal},
      ${this.pg.json(params.data)},
      ${idempotencyKey},
      NULL,
      NULL,
      NULL,
      date_trunc('milliseconds', NOW()),
      NOW()
    FROM ${workflowRunsTable}
    WHERE "namespace_id" = ${this.namespaceId}
      AND "id" = ${workflowRunId}
      AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled')
    ${conflictClause}
    RETURNING "id"
  `;

  if (inserted) {
    await this.wakeWorkflowRunForSignal(workflowRunId);
    return;
  }

  // Nothing inserted: a concurrent send with the same key may have won the
  // race between the fast-path check and the insert.
  if (
    idempotencyKey !== null &&
    (await this.workflowSignalExistsForIdempotencyKey(
      workflowRunId,
      idempotencyKey,
    ))
  ) {
    await this.wakeWorkflowRunForSignal(workflowRunId);
    return;
  }

  // Still nothing: work out why so the caller gets a specific error.
  const workflowRun = await this.getWorkflowRun({ workflowRunId });
  if (!workflowRun) {
    throw new Error(`Workflow run ${workflowRunId} does not exist`);
  }

  if (
    ["succeeded", "completed", "failed", "canceled"].includes(
      workflowRun.status,
    )
  ) {
    throw new Error(
      `Cannot send signal to workflow run ${workflowRunId} with status ${workflowRun.status}`,
    );
  }

  throw new Error("Failed to send workflow signal");
}

/**
 * Reports whether a signal with the given idempotency key is already stored
 * for the workflow run in this backend's namespace.
 */
private async workflowSignalExistsForIdempotencyKey(
  workflowRunId: string,
  idempotencyKey: string,
): Promise<boolean> {
  const workflowSignalsTable = this.workflowSignalsTable();

  const [existing] = await this.pg<{ id: string }[]>`
    SELECT "id"
    FROM ${workflowSignalsTable}
    WHERE "namespace_id" = ${this.namespaceId}
      AND "workflow_run_id" = ${workflowRunId}
      AND "idempotency_key" = ${idempotencyKey}
    LIMIT 1
  `;

  return Boolean(existing);
}

/**
 * Consumes the oldest unconsumed signal named `params.signal` for a run on
 * behalf of a signal-wait step attempt.
 *
 * If a signal is already marked as consumed by this step attempt, its data is
 * returned again, so repeated calls for the same attempt give the same answer.
 * Otherwise the oldest pending signal (by `created_at`, then `id`) is claimed,
 * but only while the step attempt is a running `signal-wait` step and the run
 * is `running` under `params.workerId`.
 *
 * @param params - Run id, signal name, consuming step attempt id and worker id.
 * @returns The signal's data (`null` when the stored data is null), or
 *   `undefined` when no signal was consumed.
 */
async consumeWorkflowSignal(
  params: ConsumeWorkflowSignalParams,
): Promise<JsonValue | null | undefined> {
  const workflowSignalsTable = this.workflowSignalsTable();
  const stepAttemptsTable = this.stepAttemptsTable();
  const workflowRunsTable = this.workflowRunsTable();

  // Replay path: this step attempt already consumed a signal earlier.
  const [existing] = await this.pg<{ data: JsonValue | null }[]>`
    SELECT "data"
    FROM ${workflowSignalsTable}
    WHERE "namespace_id" = ${this.namespaceId}
      AND "workflow_run_id" = ${params.workflowRunId}
      AND "signal" = ${params.signal}
      AND "consumed_step_attempt_namespace_id" = ${this.namespaceId}
      AND "consumed_step_attempt_id" = ${params.stepAttemptId}
    ORDER BY "created_at" ASC, "id" ASC
    LIMIT 1
  `;

  if (existing) {
    return existing.data ?? null;
  }

  // Claim the oldest pending signal; the joins on step_attempts and
  // workflow_runs make the update a no-op unless the caller still owns the run.
  const [consumed] = await this.pg<{ data: JsonValue | null }[]>`
    WITH candidate AS (
      SELECT "id", "data"
      FROM ${workflowSignalsTable}
      WHERE "namespace_id" = ${this.namespaceId}
        AND "workflow_run_id" = ${params.workflowRunId}
        AND "signal" = ${params.signal}
        AND "consumed_at" IS NULL
      ORDER BY "created_at" ASC, "id" ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    UPDATE ${workflowSignalsTable} ws
    SET
      "consumed_step_attempt_namespace_id" = ${this.namespaceId},
      "consumed_step_attempt_id" = ${params.stepAttemptId},
      "consumed_at" = NOW(),
      "updated_at" = NOW()
    FROM candidate, ${stepAttemptsTable} sa, ${workflowRunsTable} wr
    WHERE ws."namespace_id" = ${this.namespaceId}
      AND ws."id" = candidate."id"
      AND sa."namespace_id" = ${this.namespaceId}
      AND sa."workflow_run_id" = ${params.workflowRunId}
      AND sa."id" = ${params.stepAttemptId}
      AND sa."kind" = 'signal-wait'
      AND sa."status" = 'running'
      AND wr."namespace_id" = sa."namespace_id"
      AND wr."id" = sa."workflow_run_id"
      AND wr."status" = 'running'
      AND wr."worker_id" = ${params.workerId}
    RETURNING candidate."data" AS "data"
  `;

  return consumed ? (consumed.data ?? null) : undefined;
}

/**
 * Makes an unleased run immediately available after a signal arrives.
 *
 * Only touches runs in `pending`, `running` or `sleeping` status that have no
 * `worker_id`; `available_at` is pulled forward to `NOW()` when it is null or
 * in the future and is otherwise left alone.
 */
private async wakeWorkflowRunForSignal(workflowRunId: string): Promise<void> {
  const workflowRunsTable = this.workflowRunsTable();

  await this.pg`
    UPDATE ${workflowRunsTable}
    SET
      "available_at" = CASE
        WHEN "available_at" IS NULL OR "available_at" > NOW()
        THEN NOW()
        ELSE "available_at"
      END,
      "updated_at" = NOW()
    WHERE "namespace_id" = ${this.namespaceId}
      AND "id" = ${workflowRunId}
      AND "status" IN ('pending', 'running', 'sleeping')
      AND "worker_id" IS NULL
  `;
}

/** Schema-qualified SQL fragment for the `workflow_signals` table. */
private workflowSignalsTable(pg: Postgres = this.pg) {
  return pg`${pg(this.schema)}.${pg("workflow_signals")}`;
}
a/packages/openworkflow/postgres/postgres.test.ts b/packages/openworkflow/postgres/postgres.test.ts index 30e8c450..bd0656bd 100644 --- a/packages/openworkflow/postgres/postgres.test.ts +++ b/packages/openworkflow/postgres/postgres.test.ts @@ -46,6 +46,11 @@ describe("postgres", () => { test("throws for schema names longer than 63 bytes", () => { expect(() => migrations("a".repeat(64))).toThrow(/at most 63 bytes/i); }); + + test("includes workflow_signals in the latest migration", () => { + const latestMigration = migrations(DEFAULT_SCHEMA).at(-1); + expect(latestMigration).toContain('"workflow_signals"'); + }); }); describe("migrate()", () => { diff --git a/packages/openworkflow/postgres/postgres.ts b/packages/openworkflow/postgres/postgres.ts index 7d641e44..5dfddad4 100644 --- a/packages/openworkflow/postgres/postgres.ts +++ b/packages/openworkflow/postgres/postgres.ts @@ -206,6 +206,51 @@ export function migrations(schema: string): string[] { ON CONFLICT DO NOTHING; COMMIT;`, + + // 5 - workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS ${quotedSchema}."workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "workflow_run_id" TEXT NOT NULL, + "signal" TEXT NOT NULL, + "data" JSONB, + "idempotency_key" TEXT, + "consumed_step_attempt_namespace_id" TEXT, + "consumed_step_attempt_id" TEXT, + "consumed_at" TIMESTAMPTZ, + "created_at" TIMESTAMPTZ NOT NULL, + "updated_at" TIMESTAMPTZ NOT NULL, + PRIMARY KEY ("namespace_id", "id"), + FOREIGN KEY ("namespace_id", "workflow_run_id") + REFERENCES ${quotedSchema}."workflow_runs" ("namespace_id", "id") + ON DELETE CASCADE, + FOREIGN KEY ("consumed_step_attempt_namespace_id", "consumed_step_attempt_id") + REFERENCES ${quotedSchema}."step_attempts" ("namespace_id", "id") + ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS "workflow_signals_pending_lookup_idx" + ON ${quotedSchema}."workflow_signals" ( + "namespace_id", + "workflow_run_id", + "signal", + "consumed_at", + "created_at", + "id" 
describe("BackendSqlite.sendWorkflowSignal fallback branches", () => {
  type BackendSqliteCtor = new (
    db: Database,
    namespaceId: string,
  ) => BackendSqlite;

  /** Statement stub for an INSERT that reports zero changed rows. */
  const insertChangingNoRows = {
    run() {
      return { changes: 0 };
    },
  };

  /**
   * Builds a backend on top of `fakeDb` and replaces the two collaborators
   * that sendWorkflowSignal reaches for after the insert: the wake-up helper
   * and `getWorkflowRun` (which resolves to an active, running run).
   */
  function createBackendWithMocks(fakeDb: Database) {
    const backend = new (BackendSqlite as unknown as BackendSqliteCtor)(
      fakeDb,
      randomUUID(),
    );
    const wakeWorkflowRunForSignal = vi.fn();
    const getWorkflowRun = vi.fn(() =>
      Promise.resolve(
        createMockWorkflowRun({
          id: "workflow-run-id",
          status: "running",
        }),
      ),
    );
    const internalBackend = backend as unknown as {
      wakeWorkflowRunForSignal: (
        workflowRunId: string,
        currentTime: string,
      ) => void;
      getWorkflowRun: typeof backend.getWorkflowRun;
    };
    internalBackend.wakeWorkflowRunForSignal = wakeWorkflowRunForSignal;
    internalBackend.getWorkflowRun =
      getWorkflowRun as unknown as typeof backend.getWorkflowRun;

    return { backend, wakeWorkflowRunForSignal, getWorkflowRun };
  }

  test("re-reads an existing signal after an idempotent insert conflict", async () => {
    let selectCount = 0;
    const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
      if (sql.startsWith('SELECT 1 FROM "workflow_signals"')) {
        return {
          get() {
            // First lookup misses; the post-insert re-read finds the row.
            selectCount += 1;
            return selectCount === 2 ? { 1: 1 } : undefined;
          },
        };
      }

      if (sql.startsWith('INSERT OR IGNORE INTO "workflow_signals"')) {
        return insertChangingNoRows;
      }

      throw new Error(`Unexpected SQL: ${sql}`);
    });
    const { backend, wakeWorkflowRunForSignal, getWorkflowRun } =
      createBackendWithMocks(fakeDb);

    await backend.sendWorkflowSignal({
      workflowRunId: "workflow-run-id",
      signal: "approval",
      data: { approved: true },
      idempotencyKey: "signal-key",
    });

    expect(selectCount).toBe(2);
    expect(wakeWorkflowRunForSignal).toHaveBeenCalledTimes(1);
    expect(wakeWorkflowRunForSignal).toHaveBeenCalledWith(
      "workflow-run-id",
      expect.any(String),
    );
    // The diagnostic lookup is only for the failure path.
    expect(getWorkflowRun).not.toHaveBeenCalled();
  });

  test("throws the generic error when no signal row is inserted for an active run", async () => {
    const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
      if (sql.startsWith('INSERT INTO "workflow_signals"')) {
        return insertChangingNoRows;
      }

      throw new Error(`Unexpected SQL: ${sql}`);
    });
    const { backend, wakeWorkflowRunForSignal, getWorkflowRun } =
      createBackendWithMocks(fakeDb);

    await expect(
      backend.sendWorkflowSignal({
        workflowRunId: "workflow-run-id",
        signal: "approval",
        data: null,
        idempotencyKey: null,
      }),
    ).rejects.toThrow("Failed to send workflow signal");

    expect(getWorkflowRun).toHaveBeenCalledWith({
      workflowRunId: "workflow-run-id",
    });
    expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
  });

  test("throws the generic error when the idempotent fallback re-read still finds nothing", async () => {
    let selectCount = 0;
    const fakeDb = createFakeSqliteForSendWorkflowSignal((sql) => {
      if (sql.startsWith('SELECT 1 FROM "workflow_signals"')) {
        return {
          get() {
            // Every lookup misses (implicitly returns undefined).
            selectCount += 1;
          },
        };
      }

      if (sql.startsWith('INSERT OR IGNORE INTO "workflow_signals"')) {
        return insertChangingNoRows;
      }

      throw new Error(`Unexpected SQL: ${sql}`);
    });
    const { backend, wakeWorkflowRunForSignal, getWorkflowRun } =
      createBackendWithMocks(fakeDb);

    await expect(
      backend.sendWorkflowSignal({
        workflowRunId: "workflow-run-id",
        signal: "approval",
        data: null,
        idempotencyKey: "signal-key",
      }),
    ).rejects.toThrow("Failed to send workflow signal");

    expect(selectCount).toBe(2);
    expect(getWorkflowRun).toHaveBeenCalledWith({
      workflowRunId: "workflow-run-id",
    });
    expect(wakeWorkflowRunForSignal).not.toHaveBeenCalled();
  });
});

/**
 * Builds a fake SQLite `Database` whose `prepare` collapses whitespace in the
 * SQL text and asks `handler` for the statement stub to return. `exec` throws
 * because these tests are not expected to call it; `close` does nothing.
 *
 * NOTE(review): the `ReturnType` type argument was missing from the source;
 * `Database["prepare"]` is presumed — confirm against the original file.
 *
 * @param handler - Maps normalized SQL text to a stub exposing `get` and/or `run`.
 */
function createFakeSqliteForSendWorkflowSignal(
  handler: (sql: string) => {
    get?: (...args: unknown[]) => unknown;
    run?: (...args: unknown[]) => { changes: number };
  },
): Database {
  return {
    exec() {
      throw new Error("exec should not be called in this test");
    },
    prepare(sql: string) {
      const normalized = sql.replaceAll(/\s+/g, " ").trim();
      return handler(normalized) as ReturnType<Database["prepare"]>;
    },
    close() {
      // no-op for tests
    },
  } as Database;
}

/**
 * Creates a plain workflow-run-shaped object for tests.
 *
 * Defaults describe a claimed, running run (`status: "running"`,
 * `workerId: "worker-id"`); any field can be replaced through `overrides`.
 *
 * NOTE(review): the `Record` type arguments were missing from the source;
 * `Record<string, unknown>` is presumed — confirm against the original file.
 */
function createMockWorkflowRun(
  overrides: Partial<{
    namespaceId: string;
    id: string;
    workflowName: string;
    version: string | null;
    status:
      | "pending"
      | "running"
      | "sleeping"
      | "completed"
      | "succeeded"
      | "failed"
      | "canceled";
    idempotencyKey: string | null;
    config: Record<string, unknown>;
    context: Record<string, unknown> | null;
    input: Record<string, unknown> | null;
    output: Record<string, unknown> | null;
    error: Record<string, unknown> | null;
    attempts: number;
    parentStepAttemptNamespaceId: string | null;
    parentStepAttemptId: string | null;
    workerId: string | null;
    availableAt: Date | null;
    deadlineAt: Date | null;
    startedAt: Date | null;
    finishedAt: Date | null;
    createdAt: Date;
    updatedAt: Date;
  }> = {},
) {
  return {
    namespaceId: "default",
    id: "workflow-run-id",
    workflowName: "workflow-name",
    version: null,
    status: "running",
    idempotencyKey: null,
    config: {},
    context: null,
    input: null,
    output: null,
    error: null,
    attempts: 1,
    parentStepAttemptNamespaceId: null,
    parentStepAttemptId: null,
    workerId: "worker-id",
    availableAt: null,
    deadlineAt: null,
    startedAt: new Date("2026-01-01T00:00:00.000Z"),
    finishedAt: null,
    createdAt: new Date("2026-01-01T00:00:00.000Z"),
    updatedAt: new Date("2026-01-01T00:00:00.000Z"),
    ...overrides,
  };
}
/**
 * Buffers a signal for a workflow run in `workflow_signals` and then nudges
 * the run via `wakeWorkflowRunForSignal`.
 *
 * When `params.idempotencyKey` is non-null and a signal with that key already
 * exists for the run, nothing is inserted; the run is only woken. The insert
 * selects from `workflow_runs`, so it changes no rows when the run is missing
 * or in a terminal status; keyed sends use `INSERT OR IGNORE`.
 *
 * @param params - Target run id, signal name, JSON payload and idempotency key.
 * @throws Error if the run does not exist, is in a terminal status
 *   (`succeeded`, `completed`, `failed`, `canceled`), or no row was inserted
 *   for any other reason.
 */
async sendWorkflowSignal(params: SendWorkflowSignalParams): Promise<void> {
  const currentTime = now();
  const { workflowRunId, idempotencyKey } = params;

  // Fast path: an earlier send with the same key already stored the signal.
  if (
    idempotencyKey !== null &&
    this.workflowSignalExistsForIdempotencyKey(workflowRunId, idempotencyKey)
  ) {
    this.wakeWorkflowRunForSignal(workflowRunId, currentTime);
    return;
  }

  const stmt = this.db.prepare(`
    INSERT ${idempotencyKey === null ? "" : "OR IGNORE "}INTO "workflow_signals" (
      "namespace_id",
      "id",
      "workflow_run_id",
      "signal",
      "data",
      "idempotency_key",
      "consumed_step_attempt_namespace_id",
      "consumed_step_attempt_id",
      "consumed_at",
      "created_at",
      "updated_at"
    )
    SELECT ?, ?, ?, ?, ?, ?, NULL, NULL, NULL, ?, ?
    FROM "workflow_runs"
    WHERE "namespace_id" = ?
      AND "id" = ?
      AND "status" NOT IN ('succeeded', 'completed', 'failed', 'canceled')
  `);

  const result = stmt.run(
    this.namespaceId,
    generateUUID(),
    workflowRunId,
    params.signal,
    toJSON(params.data),
    idempotencyKey,
    currentTime,
    currentTime,
    this.namespaceId,
    workflowRunId,
  );

  if (result.changes !== 0) {
    this.wakeWorkflowRunForSignal(workflowRunId, currentTime);
    return;
  }

  // Nothing inserted: the keyed insert may have been ignored because the
  // signal was stored after the fast-path check.
  if (
    idempotencyKey !== null &&
    this.workflowSignalExistsForIdempotencyKey(workflowRunId, idempotencyKey)
  ) {
    this.wakeWorkflowRunForSignal(workflowRunId, currentTime);
    return;
  }

  // Still nothing: work out why so the caller gets a specific error.
  const workflowRun = await this.getWorkflowRun({ workflowRunId });
  if (!workflowRun) {
    throw new Error(`Workflow run ${workflowRunId} does not exist`);
  }

  if (
    ["succeeded", "completed", "failed", "canceled"].includes(
      workflowRun.status,
    )
  ) {
    throw new Error(
      `Cannot send signal to workflow run ${workflowRunId} with status ${workflowRun.status}`,
    );
  }

  throw new Error("Failed to send workflow signal");
}

/**
 * Reports whether a signal with the given idempotency key is already stored
 * for the workflow run in this backend's namespace.
 */
private workflowSignalExistsForIdempotencyKey(
  workflowRunId: string,
  idempotencyKey: string,
): boolean {
  const existing = this.db
    .prepare(
      `
      SELECT 1
      FROM "workflow_signals"
      WHERE "namespace_id" = ?
        AND "workflow_run_id" = ?
        AND "idempotency_key" = ?
      LIMIT 1
    `,
    )
    .get(this.namespaceId, workflowRunId, idempotencyKey) as
    | { 1: number }
    | undefined;

  return Boolean(existing);
}

/**
 * Consumes the oldest unconsumed signal named `params.signal` for a run on
 * behalf of a signal-wait step attempt.
 *
 * If a signal is already marked as consumed by this step attempt, its data is
 * returned again, so repeated calls for the same attempt give the same answer.
 * Otherwise the oldest pending signal (by `created_at`, then `id`) is claimed,
 * but only while the step attempt is a running `signal-wait` step and the run
 * is `running` under `params.workerId`.
 *
 * @param params - Run id, signal name, consuming step attempt id and worker id.
 * @returns A promise of the value `fromJSON` produces for the stored `data`
 *   column, or of `undefined` when no signal was consumed.
 */
consumeWorkflowSignal(
  params: ConsumeWorkflowSignalParams,
): Promise<JsonValue | null | undefined> {
  // Replay path: this step attempt already consumed a signal earlier.
  const existing = this.db
    .prepare(
      `
      SELECT "data"
      FROM "workflow_signals"
      WHERE "namespace_id" = ?
        AND "workflow_run_id" = ?
        AND "signal" = ?
        AND "consumed_step_attempt_namespace_id" = ?
        AND "consumed_step_attempt_id" = ?
      ORDER BY "created_at" ASC, "id" ASC
      LIMIT 1
    `,
    )
    .get(
      this.namespaceId,
      params.workflowRunId,
      params.signal,
      this.namespaceId,
      params.stepAttemptId,
    ) as { data: string | null } | undefined;

  if (existing) {
    return Promise.resolve(fromJSON(existing.data) as JsonValue | null);
  }

  // Claim the oldest pending signal; the EXISTS guard makes the update a
  // no-op unless the caller still owns the running run and step attempt.
  const currentTime = now();
  const row = this.db
    .prepare(
      `
      UPDATE "workflow_signals"
      SET
        "consumed_step_attempt_namespace_id" = ?,
        "consumed_step_attempt_id" = ?,
        "consumed_at" = ?,
        "updated_at" = ?
      WHERE "namespace_id" = ?
        AND "id" = (
          SELECT "id"
          FROM "workflow_signals"
          WHERE "namespace_id" = ?
            AND "workflow_run_id" = ?
            AND "signal" = ?
            AND "consumed_at" IS NULL
          ORDER BY "created_at" ASC, "id" ASC
          LIMIT 1
        )
        AND "consumed_at" IS NULL
        AND EXISTS (
          SELECT 1
          FROM "step_attempts" sa
          JOIN "workflow_runs" wr
            ON wr."namespace_id" = sa."namespace_id"
            AND wr."id" = sa."workflow_run_id"
          WHERE sa."namespace_id" = ?
            AND sa."workflow_run_id" = ?
            AND sa."id" = ?
            AND sa."kind" = 'signal-wait'
            AND sa."status" = 'running'
            AND wr."status" = 'running'
            AND wr."worker_id" = ?
        )
      RETURNING "data"
    `,
    )
    .get(
      this.namespaceId,
      params.stepAttemptId,
      currentTime,
      currentTime,
      this.namespaceId,
      this.namespaceId,
      params.workflowRunId,
      params.signal,
      this.namespaceId,
      params.workflowRunId,
      params.stepAttemptId,
      params.workerId,
    ) as { data: string | null } | undefined;

  return Promise.resolve(
    row ? (fromJSON(row.data) as JsonValue | null) : undefined,
  );
}

/**
 * Makes an unleased run immediately available after a signal arrives.
 *
 * Only touches runs in `pending`, `running` or `sleeping` status that have no
 * `worker_id`; `available_at` is pulled forward to `currentTime` when it is
 * null or later than `currentTime` and is otherwise left alone.
 *
 * @param workflowRunId - Run to wake.
 * @param currentTime - Timestamp string used for both the comparison and the update.
 */
private wakeWorkflowRunForSignal(
  workflowRunId: string,
  currentTime: string,
): void {
  this.db
    .prepare(
      `
      UPDATE "workflow_runs"
      SET
        "available_at" = CASE
          WHEN "available_at" IS NULL OR "available_at" > ? THEN ?
          ELSE "available_at"
        END,
        "updated_at" = ?
      WHERE "namespace_id" = ?
        AND "id" = ?
        AND "status" IN ('pending', 'running', 'sleeping')
        AND "worker_id" IS NULL
    `,
    )
    .run(
      currentTime,
      currentTime,
      currentTime,
      this.namespaceId,
      workflowRunId,
    );
}
- workflow signals + `BEGIN; + + CREATE TABLE IF NOT EXISTS "workflow_signals" ( + "namespace_id" TEXT NOT NULL, + "id" TEXT NOT NULL, + -- + "workflow_run_id" TEXT NOT NULL, + "signal" TEXT NOT NULL, + "data" TEXT, + "idempotency_key" TEXT, + "consumed_step_attempt_namespace_id" TEXT, + "consumed_step_attempt_id" TEXT, + "consumed_at" TEXT, + "created_at" TEXT NOT NULL, + "updated_at" TEXT NOT NULL, + PRIMARY KEY ("namespace_id", "id"), + FOREIGN KEY ("namespace_id", "workflow_run_id") + REFERENCES "workflow_runs" ("namespace_id", "id") + ON DELETE CASCADE, + FOREIGN KEY ("consumed_step_attempt_namespace_id", "consumed_step_attempt_id") + REFERENCES "step_attempts" ("namespace_id", "id") + ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS "workflow_signals_pending_lookup_idx" + ON "workflow_signals" ( + "namespace_id", + "workflow_run_id", + "signal", + "consumed_at", + "created_at", + "id" + ); + + CREATE UNIQUE INDEX IF NOT EXISTS "workflow_signals_workflow_run_idempotency_key_idx" + ON "workflow_signals" ("namespace_id", "workflow_run_id", "idempotency_key") + WHERE "idempotency_key" IS NOT NULL; + + INSERT OR IGNORE INTO "openworkflow_migrations" ("version") + VALUES (5); + + COMMIT;`, ]; } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 6383cb4c..f80c5bf0 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -1436,6 +1436,201 @@ export function testBackend(options: TestBackendOptions): void { }); }); + describe("workflow signals", () => { + test("buffers signals and consumes them in FIFO order", async () => { + const claimed = await createClaimedWorkflowRun(backend); + const firstWait = await createSignalWaitAttempt(backend, claimed.id); + + await backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: { order: 1 }, + idempotencyKey: null, + }); + await sleep(10); + await 
backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: { order: 2 }, + idempotencyKey: null, + }); + + const first = await backend.consumeWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + stepAttemptId: firstWait.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + const firstReplay = await backend.consumeWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + stepAttemptId: firstWait.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + + expect(first).toEqual({ order: 1 }); + expect(firstReplay).toEqual({ order: 1 }); + + await backend.completeStepAttempt({ + workflowRunId: claimed.id, + stepAttemptId: firstWait.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + output: first ?? null, + }); + + const secondWait = await createSignalWaitAttempt(backend, claimed.id); + const second = await backend.consumeWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + stepAttemptId: secondWait.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + + expect(second).toEqual({ order: 2 }); + }); + + test("dedupes client sends by idempotency key", async () => { + const claimed = await createClaimedWorkflowRun(backend); + + await backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: { accepted: true }, + idempotencyKey: "signal-key", + }); + await backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: { accepted: false }, + idempotencyKey: "signal-key", + }); + + const waitAttempt = await createSignalWaitAttempt(backend, claimed.id); + const consumed = await backend.consumeWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + stepAttemptId: waitAttempt.id, + workerId: claimed.workerId!, // eslint-disable-line 
@typescript-eslint/no-non-null-assertion + }); + + expect(consumed).toEqual({ accepted: true }); + + await backend.completeStepAttempt({ + workflowRunId: claimed.id, + stepAttemptId: waitAttempt.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + output: consumed ?? null, + }); + + const secondWait = await createSignalWaitAttempt(backend, claimed.id); + const next = await backend.consumeWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + stepAttemptId: secondWait.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + + expect(next).toBeUndefined(); + }); + + test("wakes idle runs but does not steal active leases", async () => { + const futureRun = await backend.createWorkflowRun({ + workflowName: randomUUID(), + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: newDateInOneYear(), + deadlineAt: null, + }); + + await backend.sendWorkflowSignal({ + workflowRunId: futureRun.id, + signal: "approval", + data: null, + idempotencyKey: null, + }); + + const woken = await backend.getWorkflowRun({ + workflowRunId: futureRun.id, + }); + expect(woken?.workerId).toBeNull(); + expect(deltaSeconds(woken?.availableAt)).toBeLessThan(1); + + const claimed = await createClaimedWorkflowRun(backend); + const leasedAvailableAt = claimed.availableAt; + + await backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: null, + idempotencyKey: null, + }); + + const stillLeased = await backend.getWorkflowRun({ + workflowRunId: claimed.id, + }); + expect(stillLeased?.workerId).toBe(claimed.workerId); + expect(stillLeased?.availableAt?.getTime()).toBe( + leasedAvailableAt?.getTime(), + ); + }); + + test("reconciles parked signal waits that already have pending signals", async () => { + const claimed = await 
createClaimedWorkflowRun(backend); + await createSignalWaitAttempt(backend, claimed.id); + + await backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: { approved: true }, + idempotencyKey: null, + }); + + const parked = await backend.sleepWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + availableAt: newDateInOneYear(), + }); + + expect(parked.workerId).toBeNull(); + expect(deltaSeconds(parked.availableAt)).toBeLessThan(1); + }); + + test("rejects sends to missing and terminal workflow runs", async () => { + const missingId = randomUUID(); + await expect( + backend.sendWorkflowSignal({ + workflowRunId: missingId, + signal: "approval", + data: null, + idempotencyKey: null, + }), + ).rejects.toThrow(`Workflow run ${missingId} does not exist`); + + const claimed = await createClaimedWorkflowRun(backend); + await backend.completeWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + output: null, + }); + + await expect( + backend.sendWorkflowSignal({ + workflowRunId: claimed.id, + signal: "approval", + data: null, + idempotencyKey: null, + }), + ).rejects.toThrow( + `Cannot send signal to workflow run ${claimed.id} with status completed`, + ); + }); + }); + describe("setStepAttemptChildWorkflowRun()", () => { test("sets child workflow linkage on a running step attempt", async () => { const parentRun = await createClaimedWorkflowRun(backend); @@ -2269,6 +2464,32 @@ async function createClaimedWorkflowRun(b: Backend) { return claimed; } +/** + * Create a running signal-wait step attempt for tests. 
+ * @param b - Backend + * @param workflowRunId - Parent workflow run id + * @returns Created step attempt + */ +async function createSignalWaitAttempt(b: Backend, workflowRunId: string) { + const workflowRun = await b.getWorkflowRun({ workflowRunId }); + if (!workflowRun?.workerId) { + throw new Error("Expected claimed workflow run"); + } + + return await b.createStepAttempt({ + workflowRunId, + workerId: workflowRun.workerId, + stepName: randomUUID(), + kind: "signal-wait", + config: {}, + context: { + kind: "signal-wait", + signal: "approval", + timeoutAt: newDateInOneYear().toISOString(), + }, + }); +} + /** * Get delta in seconds from now. * @param date - Date to compare diff --git a/packages/openworkflow/worker/execution.test.ts b/packages/openworkflow/worker/execution.test.ts index 2dba8674..3bc3b45b 100644 --- a/packages/openworkflow/worker/execution.test.ts +++ b/packages/openworkflow/worker/execution.test.ts @@ -721,7 +721,7 @@ describe("StepExecutor", () => { expect(status).toBe("failed"); await expect(handle.result()).rejects.toThrow( - /Workflow timeout must be a non-negative number/, + /Step wait timeout must be a non-negative number/, ); }); @@ -2480,6 +2480,927 @@ describe("StepExecutor", () => { ); expect(childStatus).toBe("completed"); }); + + describe("signals", () => { + test("consumes buffered signals sent before the wait step runs", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-buffered-${randomUUID()}` }, + async ({ step }) => { + return await step.waitForSignal<{ approved: boolean }>("approval"); + }, + ); + + const handle = await workflow.run(); + await client.sendSignal(handle.workflowRun.id, "approval", { + data: { approved: true }, + }); + + const worker = client.newWorker(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + expect(status).toBe("completed"); + 
await expect(handle.result()).resolves.toEqual({ approved: true }); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 10, + }); + expect( + attempts.data.find((attempt) => attempt.stepName === "approval")?.kind, + ).toBe("signal-wait"); + }); + + test("parks while waiting and resumes when the client sends a signal", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-resume-${randomUUID()}` }, + async ({ step }) => { + return await step.waitForSignal<{ approved: boolean }>("approval"); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + + await tickUntilParked(backend, worker, handle.workflowRun.id, 200, 10); + + await client.sendSignal(handle.workflowRun.id, "approval", { + data: { approved: true }, + }); + + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 200, + 10, + ); + + expect(status).toBe("completed"); + await expect(handle.result()).resolves.toEqual({ approved: true }); + }); + + test("returns null when a signal wait times out", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-timeout-${randomUUID()}` }, + async ({ step }) => { + return await step.waitForSignal("approval", { timeout: "10ms" }); + }, + ); + + const handle = await workflow.run(); + const worker = client.newWorker(); + + await worker.tick(); + await sleep(50); + await worker.tick(); + + await expect(handle.result()).resolves.toBeNull(); + }); + + test("fails when a signal wait timeout number is invalid", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-invalid-timeout-number-${randomUUID()}` }, + async ({ step }) => { + 
return await step.waitForSignal("approval", { timeout: -1 }); + }, + ); + + const worker = client.newWorker(); + const handle = await workflow.run(); + const status = await tickUntilTerminal( + backend, + worker, + handle.workflowRun.id, + 150, + 10, + ); + + expect(status).toBe("failed"); + await expect(handle.result()).rejects.toThrow( + /Step wait timeout must be a non-negative number/, + ); + }); + + test("fails when a consumed signal does not match the schema", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-schema-failure-${randomUUID()}` }, + async ({ step }) => { + return await step.waitForSignal("approval", { + schema: z.object({ approved: z.boolean() }), + }); + }, + ); + + const handle = await workflow.run(); + await client.sendSignal(handle.workflowRun.id, "approval", { + data: { approved: "yes" }, + }); + + const worker = client.newWorker(); + await worker.tick(); + await sleep(50); + + const failedRun = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(failedRun?.status).toBe("failed"); + expect(failedRun?.availableAt).toBeNull(); + expect((failedRun?.error as { message?: string } | null)?.message).toBe( + "Invalid input: expected boolean, received string", + ); + }); + + test("rejects concurrent waits for the same signal in one run", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const workflow = client.defineWorkflow( + { name: `signal-concurrent-${randomUUID()}` }, + async ({ step }) => { + await Promise.all([ + step.waitForSignal("approval"), + step.waitForSignal({ + name: "wait-approval-b", + signal: "approval", + }), + ]); + return null; + }, + ); + + const handle = await workflow.run(undefined, { + deadlineAt: new Date(Date.now() + 50), + }); + const worker = client.newWorker(); + await worker.tick(); + await sleep(100); + + const 
failedRun = await backend.getWorkflowRun({ + workflowRunId: handle.workflowRun.id, + }); + expect(failedRun?.status).toBe("failed"); + expect((failedRun?.error as { message?: string } | null)?.message).toBe( + 'Signal "approval" is already being awaited by step "approval"', + ); + + const attempts = await backend.listStepAttempts({ + workflowRunId: handle.workflowRun.id, + limit: 10, + }); + expect( + attempts.data.filter((attempt) => attempt.kind === "signal-wait"), + ).toHaveLength(1); + }); + + test("step.sendSignal delivers to another waiting run", async () => { + const backend = await createBackend(); + const client = new OpenWorkflow({ backend }); + + const waiter = client.defineWorkflow( + { name: `signal-waiter-${randomUUID()}` }, + async ({ step }) => { + return await step.waitForSignal<{ approved: boolean }>("approval"); + }, + ); + const sender = client.defineWorkflow<{ targetRunId: string }, string>( + { name: `signal-sender-${randomUUID()}` }, + async ({ input, step }) => { + await step.sendSignal(input.targetRunId, "approval", { + data: { approved: true }, + }); + return "sent"; + }, + ); + + const worker = client.newWorker({ concurrency: 2 }); + const waiterHandle = await waiter.run(); + await tickUntilParked( + backend, + worker, + waiterHandle.workflowRun.id, + 200, + 10, + ); + + const senderHandle = await sender.run({ + targetRunId: waiterHandle.workflowRun.id, + }); + + const waiterStatus = await tickUntilTerminal( + backend, + worker, + waiterHandle.workflowRun.id, + 200, + 10, + ); + const senderStatus = await tickUntilTerminal( + backend, + worker, + senderHandle.workflowRun.id, + 200, + 10, + ); + + expect(waiterStatus).toBe("completed"); + expect(senderStatus).toBe("completed"); + await expect(waiterHandle.result()).resolves.toEqual({ approved: true }); + await expect(senderHandle.result()).resolves.toBe("sent"); + }); + + test("reuses completed signal-send attempts on replay", async () => { + const workflowRun = createMockWorkflowRun({ + 
id: "signal-send-replay-completed", + workerId: "worker-signal-send-replay-completed", + }); + const sendWorkflowSignal = vi.fn(() => Promise.resolve()); + const createStepAttempt = vi.fn(); + const completeWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "completed", + workerId: params.workerId, + output: params.output ?? null, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "signal-send-completed-attempt", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-send", + status: "completed", + output: null, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + sendWorkflowSignal, + createStepAttempt, + completeWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + await step.sendSignal("target-workflow-run", "approval", { + data: { approved: true }, + }); + return "done"; + }, + workflowVersion: null, + workerId: "worker-signal-send-replay-completed", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(sendWorkflowSignal).not.toHaveBeenCalled(); + expect(createStepAttempt).not.toHaveBeenCalled(); + expect(completeWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: workflowRun.id, + workerId: "worker-signal-send-replay-completed", + output: "done", + }); + }); + + test("creates new signal-send attempts with null payload when options are omitted", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-send-new-null-payload", + workerId: "worker-signal-send-new-null-payload", + }); + const createStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: "signal-send-created-attempt", + workflowRunId: params.workflowRunId, + stepName: params.stepName, + kind: params.kind, + status: "running", + 
context: params.context, + finishedAt: null, + }), + ), + ); + const sendWorkflowSignal = vi.fn(() => Promise.resolve()); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "approval", + kind: "signal-send", + status: "completed", + output: params.output ?? null, + }), + ), + ); + const backend = createExecutionBackendMock({ + createStepAttempt, + sendWorkflowSignal, + completeStepAttempt, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + await step.sendSignal("target-workflow-run", "approval"); + return "done"; + }, + workflowVersion: null, + workerId: "worker-signal-send-new-null-payload", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(sendWorkflowSignal).toHaveBeenCalledWith({ + workflowRunId: "target-workflow-run", + signal: "approval", + data: null, + idempotencyKey: "__signal:signal-send-new-null-payload:approval", + }); + }); + + test("reuses running signal-send attempts on replay", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-send-replay-running", + workerId: "worker-signal-send-replay-running", + }); + const runningAttempt = createMockStepAttempt({ + id: "signal-send-running-attempt", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-send", + status: "running", + finishedAt: null, + }); + const sendWorkflowSignal = vi.fn(() => Promise.resolve()); + const createStepAttempt = vi.fn(); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "approval", + kind: "signal-send", + status: "completed", + output: params.output ?? 
null, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [runningAttempt], + pagination: { next: null, prev: null }, + }), + ), + sendWorkflowSignal, + createStepAttempt, + completeStepAttempt, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + await step.sendSignal("target-workflow-run", "approval"); + return "done"; + }, + workflowVersion: null, + workerId: "worker-signal-send-replay-running", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(createStepAttempt).not.toHaveBeenCalled(); + expect(sendWorkflowSignal).toHaveBeenCalledWith({ + workflowRunId: "target-workflow-run", + signal: "approval", + data: null, + idempotencyKey: "__signal:signal-send-replay-running:approval", + }); + expect(completeStepAttempt).toHaveBeenCalledWith({ + workflowRunId: workflowRun.id, + stepAttemptId: runningAttempt.id, + workerId: "worker-signal-send-replay-running", + output: null, + }); + }); + + test("reschedules when replayed signal sends fail", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-send-replay-failure", + workerId: "worker-signal-send-replay-failure", + }); + const runningAttempt = createMockStepAttempt({ + id: "signal-send-failing-attempt", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-send", + status: "running", + finishedAt: null, + }); + const sendWorkflowSignal = vi.fn(() => + Promise.reject(new Error("signal emit failed")), + ); + const failStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "approval", + kind: "signal-send", + status: "failed", + error: params.error, + }), + ), + ); + const rescheduleWorkflowRunAfterFailedStepAttempt = vi.fn( + ( + params: Parameters< + Backend["rescheduleWorkflowRunAfterFailedStepAttempt"] + >[0], + ) 
=> + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "pending", + workerId: null, + availableAt: params.availableAt, + }), + ), + ); + const completeWorkflowRun = vi.fn(); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [runningAttempt], + pagination: { next: null, prev: null }, + }), + ), + sendWorkflowSignal, + failStepAttempt, + rescheduleWorkflowRunAfterFailedStepAttempt, + completeWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + await step.sendSignal("target-workflow-run", "approval"); + return "done"; + }, + workflowVersion: null, + workerId: "worker-signal-send-replay-failure", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const failStepCall = failStepAttempt.mock.calls[0]?.[0]; + if (!failStepCall) { + throw new Error("Expected failStepAttempt to be called"); + } + expect(failStepCall.workflowRunId).toBe(workflowRun.id); + expect(failStepCall.stepAttemptId).toBe(runningAttempt.id); + expect(failStepCall.workerId).toBe("worker-signal-send-replay-failure"); + expect(failStepCall.error).toMatchObject({ + message: "signal emit failed", + }); + expect(rescheduleWorkflowRunAfterFailedStepAttempt).toHaveBeenCalledTimes( + 1, + ); + expect(completeWorkflowRun).not.toHaveBeenCalled(); + }); + + test("returns cached signal waits on replay", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-wait-replay-completed", + workerId: "worker-signal-wait-replay-completed", + }); + const consumeWorkflowSignal = vi.fn(() => Promise.resolve(void 0)); + const completeWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "completed", + workerId: params.workerId, + output: params.output ?? 
null, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "signal-wait-completed-attempt", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-wait", + status: "completed", + output: { approved: true }, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + consumeWorkflowSignal, + completeWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + return await step.waitForSignal<{ approved: boolean }>("approval"); + }, + workflowVersion: null, + workerId: "worker-signal-wait-replay-completed", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(consumeWorkflowSignal).not.toHaveBeenCalled(); + expect(completeWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: workflowRun.id, + workerId: "worker-signal-wait-replay-completed", + output: { approved: true }, + }); + }); + + test("releases signal wait reservations after createStepAttempt errors", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-wait-create-step-error", + workerId: "worker-signal-wait-create-step-error", + }); + const createStepAttempt = vi + .fn() + .mockRejectedValueOnce(new Error("create signal wait failed")) + .mockResolvedValueOnce( + createMockStepAttempt({ + id: "signal-wait-created-after-retry", + workflowRunId: workflowRun.id, + stepName: "approval-retry", + kind: "signal-wait", + status: "running", + context: { + kind: "signal-wait", + signal: "approval", + timeoutAt: new Date("2027-01-01T00:00:00.000Z").toISOString(), + }, + finishedAt: null, + }), + ); + const consumeWorkflowSignal = vi.fn(() => + Promise.resolve({ approved: true }), + ); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "approval-retry", + 
kind: "signal-wait", + status: "completed", + output: params.output ?? null, + }), + ), + ); + const backend = createExecutionBackendMock({ + createStepAttempt, + consumeWorkflowSignal, + completeStepAttempt, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + try { + await step.waitForSignal("approval"); + } catch (error) { + expect((error as Error).message).toBe("create signal wait failed"); + } + + return await step.waitForSignal<{ approved: boolean }>({ + name: "approval-retry", + signal: "approval", + }); + }, + workflowVersion: null, + workerId: "worker-signal-wait-create-step-error", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(createStepAttempt).toHaveBeenCalledTimes(2); + expect(consumeWorkflowSignal).toHaveBeenCalledWith({ + workflowRunId: workflowRun.id, + signal: "approval", + stepAttemptId: "signal-wait-created-after-retry", + workerId: "worker-signal-wait-create-step-error", + }); + }); + + test("parks replayed signal waits with a default timeout when context is missing", async () => { + const createdAt = new Date("2026-02-03T04:05:06.000Z"); + const workflowRun = createMockWorkflowRun({ + id: "signal-wait-missing-context", + workerId: "worker-signal-wait-missing-context", + }); + const sleepWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "running", + workerId: null, + availableAt: params.availableAt, + }), + ), + ); + const createStepAttempt = vi.fn(); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "signal-send-running-side-step", + workflowRunId: workflowRun.id, + stepName: "notify", + kind: "signal-send", + status: "running", + finishedAt: null, + }), + createMockStepAttempt({ + id: "signal-wait-running-missing-context", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: 
"signal-wait", + status: "running", + context: null, + createdAt, + finishedAt: null, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + createStepAttempt, + consumeWorkflowSignal: vi.fn(() => Promise.resolve(void 0)), + sleepWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + return await step.waitForSignal("approval"); + }, + workflowVersion: null, + workerId: "worker-signal-wait-missing-context", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(createStepAttempt).not.toHaveBeenCalled(); + const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0]; + if (!sleepCall) { + throw new Error("Expected sleepWorkflowRun to be called"); + } + expect(sleepCall.availableAt.toISOString()).toBe( + "2027-02-03T04:05:06.000Z", + ); + }); + + test("parks replayed signal waits with a default timeout when stored timeout is invalid", async () => { + const createdAt = new Date("2026-05-06T07:08:09.000Z"); + const workflowRun = createMockWorkflowRun({ + id: "signal-wait-invalid-timeout", + workerId: "worker-signal-wait-invalid-timeout", + }); + const sleepWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "running", + workerId: null, + availableAt: params.availableAt, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "signal-wait-running-invalid-timeout", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-wait", + status: "running", + context: { + kind: "signal-wait", + signal: "approval", + timeoutAt: "not-a-date", + }, + createdAt, + finishedAt: null, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + consumeWorkflowSignal: vi.fn(() => Promise.resolve(void 0)), + sleepWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: 
async ({ step }: WorkflowFunctionParams) => { + return await step.waitForSignal("approval"); + }, + workflowVersion: null, + workerId: "worker-signal-wait-invalid-timeout", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0]; + if (!sleepCall) { + throw new Error("Expected sleepWorkflowRun to be called"); + } + expect(sleepCall.availableAt.toISOString()).toBe( + "2027-05-06T07:08:09.000Z", + ); + }); + + test("consumes replayed signal waits even when legacy context is missing", async () => { + const workflowRun = createMockWorkflowRun({ + id: "signal-wait-missing-context-consume", + workerId: "worker-signal-wait-missing-context-consume", + }); + const completeStepAttempt = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "approval", + kind: "signal-wait", + status: "completed", + output: params.output ?? null, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "signal-wait-running-missing-context-consume", + workflowRunId: workflowRun.id, + stepName: "approval", + kind: "signal-wait", + status: "running", + context: null, + finishedAt: null, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + consumeWorkflowSignal: vi.fn(() => Promise.resolve({ approved: true })), + completeStepAttempt, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + return await step.waitForSignal<{ approved: boolean }>("approval"); + }, + workflowVersion: null, + workerId: "worker-signal-wait-missing-context-consume", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + expect(completeStepAttempt).toHaveBeenCalledWith({ + workflowRunId: workflowRun.id, + stepAttemptId: "signal-wait-running-missing-context-consume", + workerId: 
"worker-signal-wait-missing-context-consume", + output: { approved: true }, + }); + }); + }); + + test("parks replayed workflow waits with a default timeout when stored timeout is invalid", async () => { + const createdAt = new Date("2026-06-07T08:09:10.000Z"); + const workflowRun = createMockWorkflowRun({ + id: "workflow-wait-invalid-timeout", + workerId: "worker-workflow-wait-invalid-timeout", + }); + const sleepWorkflowRun = vi.fn( + (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "running", + workerId: null, + availableAt: params.availableAt, + }), + ), + ); + const backend = createExecutionBackendMock({ + listStepAttempts: vi.fn(() => + Promise.resolve({ + data: [ + createMockStepAttempt({ + id: "workflow-running-invalid-timeout", + workflowRunId: workflowRun.id, + stepName: "child-workflow", + kind: "workflow", + status: "running", + context: { + kind: "workflow", + timeoutAt: "not-a-date", + }, + childWorkflowRunNamespaceId: "default", + childWorkflowRunId: "child-workflow-run-id", + createdAt, + finishedAt: null, + }), + ], + pagination: { next: null, prev: null }, + }), + ), + getWorkflowRun: vi.fn(() => + Promise.resolve( + createMockWorkflowRun({ + id: "child-workflow-run-id", + status: "running", + workerId: "child-worker", + }), + ), + ), + sleepWorkflowRun, + }); + + await executeWorkflow({ + backend, + workflowRun, + workflowFn: async ({ step }: WorkflowFunctionParams) => { + return await step.runWorkflow( + { + name: "child-workflow", + version: "1.0.0", + }, + null, + ); + }, + workflowVersion: null, + workerId: "worker-workflow-wait-invalid-timeout", + retryPolicy: DEFAULT_WORKFLOW_RETRY_POLICY, + }); + + const sleepCall = sleepWorkflowRun.mock.calls[0]?.[0]; + if (!sleepCall) { + throw new Error("Expected sleepWorkflowRun to be called"); + } + expect(sleepCall.availableAt.toISOString()).toBe( + "2027-06-07T08:09:10.000Z", + ); + }); }); describe("executeWorkflow", () => { @@ -3288,6 
+4209,17 @@ describe("createStepExecutionStateFromAttempts", () => { stepName: "step-c", status: "running", }); + const runningSignalWait = createMockStepAttempt({ + id: "running-signal", + stepName: "wait-for-approval", + kind: "signal-wait", + status: "running", + context: { + kind: "signal-wait", + signal: "approval", + timeoutAt: new Date("2025-01-01T00:01:00Z").toISOString(), + }, + }); const state = createStepExecutionStateFromAttempts([ completed, @@ -3295,6 +4227,7 @@ describe("createStepExecutionStateFromAttempts", () => { failedA2, failedB, running, + runningSignalWait, ]); expect(state.cache.size).toBe(1); @@ -3309,6 +4242,9 @@ describe("createStepExecutionStateFromAttempts", () => { expect(state.failedByStepName.get("step-b")).toBe(failedB); expect(state.failedByStepName.has("step-c")).toBe(false); expect(state.runningByStepName.get("step-c")).toBe(running); + expect(state.runningSignalWaitBySignal.get("approval")).toBe( + runningSignalWait, + ); expect(state.runningByStepName.has("step-b")).toBe(false); }); @@ -3319,6 +4255,7 @@ describe("createStepExecutionStateFromAttempts", () => { expect(state.failedCountsByStepName.size).toBe(0); expect(state.failedByStepName.size).toBe(0); expect(state.runningByStepName.size).toBe(0); + expect(state.runningSignalWaitBySignal.size).toBe(0); }); }); @@ -3480,3 +4417,103 @@ function createMockWorkflowRun( ...overrides, }; } + +function createExecutionBackendMock(overrides: Partial = {}): Backend { + const backend: Partial = { + listStepAttempts: () => + Promise.resolve({ + data: [], + pagination: { next: null, prev: null }, + }), + createStepAttempt: (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: `created-${params.stepName}`, + workflowRunId: params.workflowRunId, + stepName: params.stepName, + kind: params.kind, + status: "running", + context: params.context, + output: null, + error: null, + finishedAt: null, + }), + ), + completeStepAttempt: ( + params: Parameters[0], + ) => + 
Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "step", + status: "completed", + output: params.output ?? null, + }), + ), + failStepAttempt: (params: Parameters[0]) => + Promise.resolve( + createMockStepAttempt({ + id: params.stepAttemptId, + workflowRunId: params.workflowRunId, + stepName: "step", + status: "failed", + error: params.error, + }), + ), + completeWorkflowRun: ( + params: Parameters[0], + ) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "completed", + workerId: params.workerId, + output: params.output ?? null, + }), + ), + failWorkflowRun: (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "failed", + workerId: null, + error: params.error, + }), + ), + rescheduleWorkflowRunAfterFailedStepAttempt: ( + params: Parameters< + Backend["rescheduleWorkflowRunAfterFailedStepAttempt"] + >[0], + ) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "pending", + workerId: null, + availableAt: params.availableAt, + error: params.error, + }), + ), + sleepWorkflowRun: (params: Parameters[0]) => + Promise.resolve( + createMockWorkflowRun({ + id: params.workflowRunId, + status: "running", + workerId: null, + availableAt: params.availableAt, + }), + ), + sendWorkflowSignal: () => { + // no-op + return Promise.resolve(); + }, + consumeWorkflowSignal: () => Promise.resolve(void 0), + getWorkflowRun: () => Promise.resolve(null), + }; + + return { + ...backend, + ...overrides, + } as Backend; +} diff --git a/packages/openworkflow/worker/execution.ts b/packages/openworkflow/worker/execution.ts index d825d737..9f3ca172 100644 --- a/packages/openworkflow/worker/execution.ts +++ b/packages/openworkflow/worker/execution.ts @@ -6,6 +6,7 @@ import { type SerializedError, } from "../core/error.js"; import type { JsonValue } from "../core/json.js"; +import type { 
StandardSchemaV1 } from "../core/standard-schema.js"; import type { StepAttempt, StepAttemptCache } from "../core/step-attempt.js"; import { getCachedStepAttempt, @@ -13,6 +14,7 @@ import { normalizeStepOutput, calculateDateFromDuration, createSleepContext, + createSignalWaitContext, createWorkflowContext, } from "../core/step-attempt.js"; import { @@ -26,6 +28,7 @@ import type { StepApi, StepFunction, StepFunctionConfig, + StepWaitTimeout, WorkflowFunction, WorkflowRunMetadata, } from "../core/workflow-function.js"; @@ -123,11 +126,7 @@ const DEFAULT_STEP_RETRY_POLICY: RetryPolicy = { maximumAttempts: 10, }; -/** - * Retry policy for workflow step failures (no retries - the child workflow - * is responsible for retries). - */ -const WORKFLOW_STEP_FAILURE_RETRY_POLICY: RetryPolicy = { +const TERMINAL_STEP_RETRY_POLICY: RetryPolicy = { ...DEFAULT_STEP_RETRY_POLICY, maximumAttempts: 1, }; @@ -197,6 +196,7 @@ export interface StepExecutionState { failedCountsByStepName: ReadonlyMap; failedByStepName: ReadonlyMap; runningByStepName: ReadonlyMap; + runningSignalWaitBySignal: ReadonlyMap; } /** @@ -211,6 +211,7 @@ export function createStepExecutionStateFromAttempts( const failedCountsByStepName = new Map(); const failedByStepName = new Map(); const runningByStepName = new Map(); + const runningSignalWaitBySignal = new Map(); for (const attempt of attempts) { if (attempt.status === "completed" || attempt.status === "succeeded") { @@ -226,6 +227,9 @@ export function createStepExecutionStateFromAttempts( } runningByStepName.set(attempt.stepName, attempt); + if (attempt.context?.kind === "signal-wait") { + runningSignalWaitBySignal.set(attempt.context.signal, attempt); + } } return { @@ -233,20 +237,19 @@ export function createStepExecutionStateFromAttempts( failedCountsByStepName, failedByStepName, runningByStepName, + runningSignalWaitBySignal, }; } /** - * Resolve workflow timeout input to an absolute deadline. 
+ * Resolve a step wait timeout input to an absolute deadline. * @param timeout - Relative/absolute timeout input * @returns Absolute timeout deadline * @throws {Error} When timeout is invalid */ -function resolveWorkflowTimeoutAt( - timeout: number | string | Date | undefined, -): Date { +function resolveWaitTimeoutAt(timeout?: StepWaitTimeout): Date { if (timeout === undefined) { - return defaultWorkflowTimeoutAt(); + return defaultWaitTimeoutAt(); } if (timeout instanceof Date) { @@ -255,7 +258,7 @@ function resolveWorkflowTimeoutAt( if (typeof timeout === "number") { if (!Number.isFinite(timeout) || timeout < 0) { - throw new Error("Workflow timeout must be a non-negative number"); + throw new Error("Step wait timeout must be a non-negative number"); } return new Date(Date.now() + timeout); } @@ -268,11 +271,11 @@ function resolveWorkflowTimeoutAt( } /** - * Default workflow timeout: 1 year from a base time. + * Default step wait timeout: 1 year from a base time. * @param base - Base timestamp (defaults to now) * @returns Timeout deadline */ -function defaultWorkflowTimeoutAt(base: Readonly = new Date()): Date { +function defaultWaitTimeoutAt(base: Readonly = new Date()): Date { const timeoutAt = new Date(base); timeoutAt.setFullYear(timeoutAt.getFullYear() + 1); return timeoutAt; @@ -290,7 +293,20 @@ function getWorkflowTimeoutAt(attempt: Readonly): Date | null { if (attempt.context.timeoutAt === null) { // Backward compatibility for previously persisted workflow contexts. - return defaultWorkflowTimeoutAt(attempt.createdAt); + return defaultWaitTimeoutAt(attempt.createdAt); + } + + return new Date(attempt.context.timeoutAt); +} + +/** + * Extract the signal wait timeout from a persisted step attempt's context. 
+ * @param attempt - Running signal-wait step attempt + * @returns Timeout deadline, or null when context is not signal-wait + */ +function getSignalWaitTimeoutAt(attempt: Readonly): Date | null { + if (attempt.context?.kind !== "signal-wait") { + return null; } return new Date(attempt.context.timeoutAt); @@ -338,18 +354,28 @@ function getRunningWaitAttemptResumeAt( } if (attempt.kind !== "workflow") { - return null; + if (attempt.kind !== "signal-wait") { + return null; + } + + const timeoutAt = + getSignalWaitTimeoutAt(attempt) ?? + defaultWaitTimeoutAt(attempt.createdAt); + if (Number.isFinite(timeoutAt.getTime())) { + return timeoutAt; + } + + return defaultWaitTimeoutAt(attempt.createdAt); } const timeoutAt = - getWorkflowTimeoutAt(attempt) ?? - defaultWorkflowTimeoutAt(attempt.createdAt); + getWorkflowTimeoutAt(attempt) ?? defaultWaitTimeoutAt(attempt.createdAt); if (Number.isFinite(timeoutAt.getTime())) { return timeoutAt; } // Backward compatibility for malformed historical workflow timeout values. - return defaultWorkflowTimeoutAt(attempt.createdAt); + return defaultWaitTimeoutAt(attempt.createdAt); } /** @@ -463,6 +489,19 @@ function buildWorkflowIdempotencyKey(attempt: Readonly): string { return `__workflow:${attempt.namespaceId}:${attempt.id}`; } +/** + * Build deterministic idempotency key for step.sendSignal. + * @param workflowRunId - Current workflow run id + * @param stepName - Durable step name + * @returns Stable idempotency key + */ +function buildSignalIdempotencyKey( + workflowRunId: string, + stepName: string, +): string { + return `__signal:${workflowRunId}:${stepName}`; +} + /** * Configures the options for a StepExecutor. 
*/ @@ -482,9 +521,31 @@ interface RunWorkflowStepRequest< > { workflowSpec: WorkflowSpec; input: RunInput | undefined; - timeout: number | string | Date | undefined; + timeout: StepWaitTimeout | undefined; } +interface WaitForSignalStepRequest { + signal: string; + timeout: StepWaitTimeout | undefined; + schema: StandardSchemaV1 | undefined; +} + +type WaitForSignalOptions = Readonly<{ + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; +}>; + +type NamedWaitForSignalOptions = Readonly<{ + name?: string; + signal: string; + timeout?: StepWaitTimeout; + schema?: StandardSchemaV1; +}>; + +type WaitForSignalRequestInput = + | string + | NamedWaitForSignalOptions; + /** * Replays prior step attempts and persists new ones while memoizing * deterministic step outputs. @@ -499,6 +560,7 @@ class StepExecutor implements StepApi { private readonly failedCountsByStepName: Map; private readonly failedByStepName: Map; private readonly runningByStepName: Map; + private readonly activeSignalWaitStepNameBySignal: Map; private readonly expectedNextStepIndexByName: Map; private readonly resolvedStepNames: Set; private readonly executionFence: ExecutionFenceController; @@ -515,6 +577,14 @@ class StepExecutor implements StepApi { this.failedCountsByStepName = new Map(state.failedCountsByStepName); this.failedByStepName = new Map(state.failedByStepName); this.runningByStepName = new Map(state.runningByStepName); + this.activeSignalWaitStepNameBySignal = new Map( + [...state.runningSignalWaitBySignal.entries()].map( + ([signal, attempt]): readonly [string, string] => [ + signal, + attempt.stepName, + ], + ), + ); this.expectedNextStepIndexByName = new Map(); this.resolvedStepNames = new Set(); this.executionFence = options.executionFence; @@ -674,6 +744,230 @@ class StepExecutor implements StepApi { throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); } + // ---- step.sendSignal --------------------------------------------------- + + async sendSignal( + 
workflowRunId: string, + signal: string, + options?: Readonly<{ + data?: JsonValue; + }>, + ): Promise { + const stepName = this.resolveStepName(signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) return; + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-send") { + await this.resolveRunningSignalSend( + stepName, + runningAttempt, + workflowRunId, + signal, + options?.data ?? null, + ); + return; + } + + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + const attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-send", + config: {}, + context: null, + }); + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + await this.resolveRunningSignalSend( + stepName, + attempt, + workflowRunId, + signal, + options?.data ?? null, + ); + } + + private async resolveRunningSignalSend( + stepName: string, + attempt: Readonly, + workflowRunId: string, + signal: string, + data: JsonValue | null, + ): Promise { + try { + await this.backend.sendWorkflowSignal({ + workflowRunId, + signal, + data, + idempotencyKey: buildSignalIdempotencyKey(this.workflowRunId, stepName), + }); + + const savedAttempt = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output: null, + }); + this.cache = addToStepAttemptCache(this.cache, savedAttempt); + this.runningByStepName.delete(stepName); + } catch (error) { + return await this.failStepWithError( + stepName, + attempt.id, + error, + DEFAULT_STEP_RETRY_POLICY, + ); + } + } + + // ---- step.waitForSignal ----------------------------------------------- + + async waitForSignal( + signal: string, + options?: WaitForSignalOptions, + ): Promise; + async waitForSignal( + options: NamedWaitForSignalOptions, + ): Promise; + async waitForSignal( + 
signalOrOptions: WaitForSignalRequestInput, + options?: WaitForSignalOptions, + ): Promise { + const request = + typeof signalOrOptions === "string" + ? { + name: undefined, + signal: signalOrOptions, + timeout: options?.timeout, + schema: options?.schema, + } + : signalOrOptions; + const stepName = this.resolveStepName(request.name ?? request.signal); + + const existingAttempt = getCachedStepAttempt(this.cache, stepName); + if (existingAttempt) { + return existingAttempt.output as Output | null; + } + + const runningAttempt = this.runningByStepName.get(stepName); + if (runningAttempt?.kind === "signal-wait") { + return await this.resolveRunningSignalWait(stepName, runningAttempt, { + signal: + runningAttempt.context?.kind === "signal-wait" + ? runningAttempt.context.signal + : request.signal, + timeout: request.timeout, + schema: request.schema, + }); + } + + const activeSignalWaitStepName = this.activeSignalWaitStepNameBySignal.get( + request.signal, + ); + if (activeSignalWaitStepName && activeSignalWaitStepName !== stepName) { + throw new Error( + `Signal "${request.signal}" is already being awaited by step "${activeSignalWaitStepName}"`, + ); + } + + const timeoutAt = resolveWaitTimeoutAt(request.timeout); + this.assertExecutionActive(); + this.ensureStepLimitNotReached(); + this.activeSignalWaitStepNameBySignal.set(request.signal, stepName); + + let attempt: StepAttempt; + try { + attempt = await this.backend.createStepAttempt({ + workflowRunId: this.workflowRunId, + workerId: this.workerId, + stepName, + kind: "signal-wait", + config: {}, + context: createSignalWaitContext(request.signal, timeoutAt), + }); + } catch (error) { + this.releaseActiveSignalWait(request.signal, stepName); + throw error; + } + + this.stepCount += 1; + this.runningByStepName.set(stepName, attempt); + + return await this.resolveRunningSignalWait(stepName, attempt, { + signal: request.signal, + timeout: request.timeout, + schema: request.schema, + }); + } + + private 
releaseActiveSignalWait(signal: string, stepName: string): void { + if (this.activeSignalWaitStepNameBySignal.get(signal) === stepName) { + this.activeSignalWaitStepNameBySignal.delete(signal); + } + } + + private async resolveRunningSignalWait( + stepName: string, + attempt: Readonly, + request: Readonly>, + ): Promise { + const signalData = await this.backend.consumeWorkflowSignal({ + workflowRunId: this.workflowRunId, + signal: request.signal, + stepAttemptId: attempt.id, + workerId: this.workerId, + }); + + if (signalData !== undefined) { + const validationResult = await validateInput(request.schema, signalData); + if (!validationResult.success) { + this.releaseActiveSignalWait(request.signal, stepName); + return await this.failStepWithError( + stepName, + attempt.id, + new Error(validationResult.error), + TERMINAL_STEP_RETRY_POLICY, + ); + } + + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output: normalizeStepOutput(validationResult.value), + }); + this.releaseActiveSignalWait(request.signal, stepName); + this.runningByStepName.delete(stepName); + this.cache = addToStepAttemptCache(this.cache, completed); + return completed.output as Output | null; + } + + const persistedTimeoutAt = getSignalWaitTimeoutAt(attempt); + const timeoutAt = + persistedTimeoutAt && Number.isFinite(persistedTimeoutAt.getTime()) + ? 
persistedTimeoutAt + : defaultWaitTimeoutAt(attempt.createdAt); + if (Date.now() >= timeoutAt.getTime()) { + const completed = await this.backend.completeStepAttempt({ + workflowRunId: this.workflowRunId, + stepAttemptId: attempt.id, + workerId: this.workerId, + output: null, + }); + this.releaseActiveSignalWait(request.signal, stepName); + this.runningByStepName.delete(stepName); + this.cache = addToStepAttemptCache(this.cache, completed); + return null; + } + + throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(timeoutAt)); + } + // ---- step.runWorkflow ----------------------------------------------- async runWorkflow( @@ -713,7 +1007,7 @@ class StepExecutor implements StepApi { throw new StepError({ stepName, stepFailedAttempts: this.failedCountsByStepName.get(stepName) ?? 1, - retryPolicy: WORKFLOW_STEP_FAILURE_RETRY_POLICY, + retryPolicy: TERMINAL_STEP_RETRY_POLICY, error: failedError, }); } @@ -729,7 +1023,7 @@ class StepExecutor implements StepApi { } // First encounter — create the workflow step and child workflow run - const timeoutAt = resolveWorkflowTimeoutAt(request.timeout); + const timeoutAt = resolveWaitTimeoutAt(request.timeout); this.assertExecutionActive(); this.ensureStepLimitNotReached(); const attempt = await this.backend.createStepAttempt({ @@ -784,7 +1078,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -798,7 +1092,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" could not find linked child workflow run "${childId}"`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -808,7 +1102,7 @@ class StepExecutor implements StepApi { stepName, workflowAttempt.id, new Error("Timed out waiting for child workflow to complete"), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -835,7 +1129,7 @@ 
class StepExecutor implements StepApi { stepName, workflowAttempt.id, childError, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -847,7 +1141,7 @@ class StepExecutor implements StepApi { new Error( `Workflow step "${stepName}" failed because child workflow run "${childRun.id}" was canceled`, ), - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); } @@ -856,7 +1150,7 @@ class StepExecutor implements StepApi { const resumeAt = timeoutAt && Number.isFinite(timeoutAt.getTime()) ? timeoutAt - : defaultWorkflowTimeoutAt(workflowAttempt.createdAt); + : defaultWaitTimeoutAt(workflowAttempt.createdAt); throw new SleepSignal(this.resolveEarliestRunningWaitResumeAt(resumeAt)); } @@ -973,7 +1267,7 @@ class StepExecutor implements StepApi { stepName, stepAttemptId, error, - WORKFLOW_STEP_FAILURE_RETRY_POLICY, + TERMINAL_STEP_RETRY_POLICY, ); }