Skip to content

Latest commit

 

History

History
177 lines (136 loc) · 5.4 KB

File metadata and controls

177 lines (136 loc) · 5.4 KB

Workers

Workers are a pull-based alternative to webhooks. Instead of CueAPI pushing an HTTP request to your endpoint, your worker process polls CueAPI for pending executions, processes them, and reports the outcome.

When to use workers vs webhooks

Webhooks Workers
Setup Provide a public URL Run a worker process
Network CueAPI must reach your endpoint Worker must reach CueAPI
Best for Simple integrations, serverless Long-running tasks, firewalled environments, AI agents
Timeout Limited by WEBHOOK_TIMEOUT_SECONDS Limited by WORKER_CLAIM_TIMEOUT_SECONDS (default 15 min)

Use workers when:

  • Your tasks run longer than 30 seconds
  • Your processing environment is behind a firewall or NAT
  • You want fine-grained control over concurrency and retries
  • You are orchestrating AI agent workflows

Creating a worker-transport cue

Set transport to "worker". No url is needed.

curl -X POST http://localhost:8000/v1/cues \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer cue_sk_..." \
  -d '{
    "name": "process-reports",
    "schedule": {"type": "recurring", "cron": "0 9 * * *", "timezone": "UTC"},
    "transport": "worker",
    "payload": {"report_type": "daily"}
  }'

Worker lifecycle

1. Poll for claimable executions

curl http://localhost:8000/v1/executions/claimable \
  -H "Authorization: Bearer cue_sk_..."

Response:

{
  "executions": [
    {
      "execution_id": "exec_abc123",
      "cue_id": "cue_...",
      "cue_name": "process-reports",
      "payload": {"report_type": "daily"},
      "scheduled_for": "2025-01-01T09:00:00Z",
      "attempt": 1
    }
  ]
}

If no executions are available, executions will be an empty array. Poll again after a short delay.

2. Claim an execution

curl -X POST http://localhost:8000/v1/executions/exec_abc123/claim \
  -H "Authorization: Bearer cue_sk_..." \
  -H "Content-Type: application/json" \
  -d '{"worker_id": "my-worker-001"}'

Response:

{
  "id": "exec_abc123",
  "status": "claimed",
  "claimed_at": "2025-01-01T09:00:01Z"
}

Once claimed, no other worker can claim the same execution. The claim is held for WORKER_CLAIM_TIMEOUT_SECONDS (default 900 seconds / 15 minutes). If you do not report an outcome before the timeout, the execution is released back to the queue.

3. Do your work

Process the payload however you need. This is your application logic.

4. Report the outcome

Success:

curl -X POST http://localhost:8000/v1/executions/exec_abc123/outcome \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer cue_sk_..." \
  -d '{
    "success": true,
    "result": "rows_processed: 142"
  }'

Failure:

curl -X POST http://localhost:8000/v1/executions/exec_abc123/outcome \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer cue_sk_..." \
  -d '{
    "success": false,
    "error": "Database connection refused"
  }'

Running multiple workers

You can run as many worker processes as you need. The claim mechanism ensures each execution is processed by exactly one worker. Workers can run on different machines, in different regions, or in different containers.

Tips for multi-worker setups:

  • Each worker polls independently. Stagger poll intervals slightly to reduce contention.
  • A typical poll interval is 1-5 seconds. Adjust based on your latency requirements.
  • Workers do not need to coordinate with each other. CueAPI handles distribution.

Timeouts and heartbeats

Two timeouts govern worker behavior:

  • WORKER_HEARTBEAT_TIMEOUT_SECONDS (default 180): If a worker has not polled /v1/executions/claimable within this window, CueAPI considers it stale. Executions claimed by stale workers are released.

  • WORKER_CLAIM_TIMEOUT_SECONDS (default 900): Maximum time a worker can hold a claimed execution. If no outcome is reported within this window, the execution is released back to the queue for another worker to claim.

Set WORKER_CLAIM_TIMEOUT_SECONDS to be longer than your longest expected task duration. If your tasks routinely take 30 minutes, set it to at least 2100 (35 minutes).

Minimal worker example (Python)

import time
import requests

API = "http://localhost:8000"
TOKEN = "cue_sk_..."
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

while True:
    # Poll
    resp = requests.get(f"{API}/v1/executions/claimable", headers=HEADERS)
    executions = resp.json().get("executions", [])

    for execution in executions:
        exec_id = execution["execution_id"]

        # Claim
        claim = requests.post(
            f"{API}/v1/executions/{exec_id}/claim",
            headers={**HEADERS, "Content-Type": "application/json"},
            json={"worker_id": "my-worker-001"},
        )
        if claim.status_code != 200:
            continue

        # Process
        try:
            result = do_work(execution["payload"])
            requests.post(
                f"{API}/v1/executions/{exec_id}/outcome",
                headers={**HEADERS, "Content-Type": "application/json"},
                json={"success": True, "result": str(result)},
            )
        except Exception as e:
            requests.post(
                f"{API}/v1/executions/{exec_id}/outcome",
                headers={**HEADERS, "Content-Type": "application/json"},
                json={"success": False, "error": str(e)},
            )

    time.sleep(2)