
fix: enable concurrent task processing and add retry with exponential backoff#40

Merged
sirily11 merged 3 commits into main from copilot/fix-worker-timeout-issues
Mar 25, 2026

Conversation

Contributor

Copilot AI commented Mar 25, 2026

The worker processes translation tasks sequentially despite a prefetch greater than 1, and silently discards failed tasks with no retry.

Concurrent processing

The channel.consume callback was async; amqplib ignores the returned Promise, so the concurrency behavior was implicit and fragile. It is now a synchronous callback with explicit .then()/.catch():

// Before: async callback, amqplib ignores the promise
async (message) => {
  await handler(task);
  channel.ack(message);
}

// After: synchronous callback, concurrent by design
(message) => {
  handler(task)
    .then(() => channel.ack(message))
    .catch((error) => channel.nack(message, false, !message.fields.redelivered));
}

Tasks now run concurrently up to the WORKER_PREFETCH limit (default 5).
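The consumer pattern can be sketched as follows. This is a minimal sketch, not the PR's actual code: the Message and Channel shapes are reduced stand-ins for amqplib's types, and makeConsumer is a hypothetical wrapper used only to make the pattern testable.

```typescript
// Reduced stand-ins for the amqplib message/channel shapes used here.
type Message = { fields: { deliveryTag: number; redelivered: boolean } };

interface Channel {
  ack(msg: Message): void;
  nack(msg: Message, allUpTo: boolean, requeue: boolean): void;
}

function makeConsumer(
  channel: Channel,
  handler: (msg: Message) => Promise<void>,
) {
  // Synchronous callback: it returns immediately, so amqplib keeps
  // delivering messages up to the prefetch limit while earlier tasks
  // are still running.
  return (message: Message) => {
    handler(message)
      .then(() => channel.ack(message))
      // Requeue once on first failure; discard on redelivery.
      .catch(() => channel.nack(message, false, !message.fields.redelivered));
  };
}
```

The key point is that nothing in the callback awaits the handler, so the ack/nack happens asynchronously per message while the consume loop stays free.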

Retry on error

  • runWorkerTask retries up to WORKER_MAX_RETRIES (default 3) with exponential backoff (1s, 2s, 4s)
  • Error events emitted to clients only on final failure, not intermediate attempts
  • Active-run lock is renewed before each retry to prevent expiry
  • Consumer requeues messages once on infrastructure-level failures (Redis/AMQP down); discards on second redelivery
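The retry behavior above can be sketched like this. runWithRetries is a hypothetical stand-in for the loop inside runWorkerTask; lock renewal, cancellation checks, and client event emission are elided.

```typescript
const WORKER_MAX_RETRIES = 3;

const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

// Exponential backoff: 1s, 2s, 4s for retries 0, 1, 2.
const retryDelayMs = (retry: number) => 1000 * 2 ** retry;

async function runWithRetries(run: () => Promise<void>): Promise<void> {
  for (let attempt = 0; attempt <= WORKER_MAX_RETRIES; attempt++) {
    try {
      if (attempt > 0) {
        // Back off before retrying; the real worker also renews the
        // active-run lock here so it cannot expire mid-retry.
        await sleep(retryDelayMs(attempt - 1));
      }
      await run();
      return;
    } catch (error) {
      if (attempt === WORKER_MAX_RETRIES) {
        // Final failure: only now is the error surfaced to clients.
        throw error;
      }
    }
  }
}
```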

Cleanup

  • Removed duplicate releaseActiveRun in worker.ts — already called in worker/index.ts's handleTask finally block

Copilot AI and others added 2 commits March 25, 2026 08:27
… backoff

- Make consumer callback synchronous so amqplib delivers messages
  concurrently up to the prefetch limit (default 5)
- Add retry loop in runWorkerTask with configurable max retries
  (WORKER_MAX_RETRIES env var, default 3) and exponential backoff
- Only emit error events to clients on final retry failure
- On consumer-level errors, requeue message once before discarding
- Remove duplicate releaseActiveRun call from worker.ts (already
  handled in worker/index.ts)

Co-authored-by: sirily11 <32106111+sirily11@users.noreply.github.com>
Agent-Logs-Url: https://github.com/rxtech-lab/universal-translation/sessions/4a4908a8-f7b5-4745-9b67-a358b121acaf
@sirily11 sirily11 marked this pull request as ready for review March 25, 2026 15:20
Copilot AI review requested due to automatic review settings March 25, 2026 15:20
@autopilot-project-manager autopilot-project-manager bot added bug Something isn't working enhancement New feature or request labels Mar 25, 2026
@vercel
vercel bot commented Mar 25, 2026

The latest updates on your projects:

Project: universal-translation
Deployment: Ready
Actions: Preview, Comment
Updated (UTC): Mar 25, 2026 3:44pm



Copilot AI left a comment


Pull request overview

This PR updates the worker’s AMQP consumption and task execution flow to better utilize RabbitMQ prefetch concurrency and to add retry behavior with exponential backoff for task failures.

Changes:

  • Switches the RabbitMQ consume callback to a synchronous function and explicitly ack/nack via .then()/.catch() to make concurrency behavior explicit.
  • Adds retry-with-exponential-backoff logic to runWorkerTask, emitting client-visible errors only on final failure.
  • Removes duplicated active-run lock release from lib/translation/worker.ts (lock lifecycle handled in worker/index.ts).

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File Description
lib/translation/worker.ts Adds retry/backoff loop around task execution and removes redundant lock release.
lib/queue/consumer.ts Refactors the consumer callback to non-async and handles ack/nack in promise handlers for explicit concurrency.

logWorker("chat_finished", taskDetails(task));
}

const WORKER_MAX_RETRIES = Number(process.env.WORKER_MAX_RETRIES ?? "3");

Copilot AI Mar 25, 2026


WORKER_MAX_RETRIES is parsed with Number(...). If the env var is non-numeric, this becomes NaN, causing the retry loop condition (attempt <= WORKER_MAX_RETRIES) to be false and the task to be silently skipped after logging "task_runner_started". Consider parsing as an integer and falling back to the default when the value is not finite / < 0 (and optionally clamping to a reasonable max).

Suggested change
- const WORKER_MAX_RETRIES = Number(process.env.WORKER_MAX_RETRIES ?? "3");
+ function getWorkerMaxRetries(): number {
+   const raw = process.env.WORKER_MAX_RETRIES;
+   const parsed = raw !== undefined ? Number.parseInt(raw, 10) : Number.NaN;
+   if (!Number.isFinite(parsed) || parsed < 0) {
+     return 3;
+   }
+   const MAX_RETRIES = 10;
+   return Math.min(parsed, MAX_RETRIES);
+ }
+ const WORKER_MAX_RETRIES = getWorkerMaxRetries();

`${taskDetails(task)} attempt=${attempt + 1}/${WORKER_MAX_RETRIES + 1} delay=${delay}ms`,
);
await sleep(delay);
await renewActiveRun(task.projectId, task.runId);

Copilot AI Mar 25, 2026


renewActiveRun() can return false when the active-run lock is no longer owned by this runId. Right now the retry path ignores the return value, so the worker may keep retrying (and emitting events) even after losing the lock, potentially allowing concurrent processing for the same project. Consider checking the boolean result and aborting retries (and/or emitting the same lock-failure error used elsewhere) if the lock cannot be renewed.

Suggested change
- await renewActiveRun(task.projectId, task.runId);
+ const renewed = await renewActiveRun(task.projectId, task.runId);
+ if (!renewed) {
+   logWorker(
+     "task_runner_lock_lost",
+     `${taskDetails(task)} attempt=${attempt + 1}/${WORKER_MAX_RETRIES + 1}`,
+   );
+   throw new Error("Active run lock lost");
+ }

Comment on lines +748 to +752
for (let attempt = 0; attempt <= WORKER_MAX_RETRIES; attempt++) {
try {
if (attempt > 0) {
const delay = retryDelayMs(attempt - 1);
logWorker(

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This retry loop introduces new behavior (multiple attempts with backoff and conditional final-failure reporting) but isn’t covered by unit tests. Since this file already has tests (e.g., getChatPayload), consider adding tests for runWorkerTask retry/backoff and final-failure emission to prevent regressions.

Comment on lines +39 to +46
  const task = parseJson<TranslationTask>(message);
  if (!task) {
    logQueue(
-     "message_nacked",
-     `deliveryTag=${message.fields.deliveryTag} error=${error instanceof Error ? error.message : String(error)}`,
+     "message_invalid",
+     `deliveryTag=${message.fields.deliveryTag}`,
    );
    channel.nack(message, false, false);
    return;

Copilot AI Mar 25, 2026


parseJson() can throw on invalid JSON. With the removal of the surrounding try/catch, a malformed message will now throw out of the consume callback and won’t be nacked/acked (and may crash the consumer). Consider wrapping parseJson(message) in a try/catch here (or making parseJson return null on JSON.parse failure) and nack/discard the message on parse errors.
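A parseJson that returns null instead of throwing, as the comment suggests, might look like this. This is a sketch that assumes the message exposes a content buffer, as amqplib messages do; it is not the repository's actual implementation.

```typescript
// Parse a message body as JSON, returning null on malformed input so the
// consume callback can nack/discard instead of crashing.
function parseJson<T>(message: { content: Buffer }): T | null {
  try {
    return JSON.parse(message.content.toString()) as T;
  } catch {
    return null;
  }
}
```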

Comment on lines +760 to +764
if (task.type === "translate") {
await runTranslateTask(task, emit);
} else {
await runChatTask(task, emit);
}

Copilot AI Mar 25, 2026


With the new retry loop, runTranslateTask/runChatTask can be executed multiple times for the same runId. Those functions have one-time side effects (e.g., translate clears the cancel flag and clears the run event cache) that may now run again on retries; in particular, a user cancellation that happens between attempts can be wiped by clearRunCancelled inside the task runner. Consider making the retried portion idempotent (or moving one-time initialization/clears outside the retry loop / behind an isRetry guard).
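One way to apply the suggested guard, sketched with hypothetical oneTimeInit/body stand-ins for the real clear-and-run helpers:

```typescript
// Run a task attempt, performing one-time side effects (e.g. clearing the
// cancel flag or the run event cache) only on the first attempt.
async function runTask(
  attempt: number,
  oneTimeInit: () => Promise<void>,
  body: () => Promise<void>,
): Promise<void> {
  const isRetry = attempt > 0;
  if (!isRetry) {
    // Safe only before the first try: a retry must not wipe a
    // cancellation issued between attempts.
    await oneTimeInit();
  }
  await body();
}
```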

@sirily11
Contributor

@copilot error: Failed to run "next" due to signal SIGILL
76.35 Illegal instruction (core dumped)

WARNING: No output specified with docker-container driver. Build result will only remain in the build cache. To push result image into registry use --push or to load image into docker use --load
Dockerfile:13

11 | COPY --from=deps /app/node_modules ./node_modules
12 | COPY . .
13 | >>> RUN bun next build
14 |
15 | FROM base AS runner

ERROR: failed to build: failed to solve: process "/bin/sh -c bun next build" did not complete successfully: exit code: 132

…k check, cancellation

- Wrap JSON.parse in try/catch in parseJson to prevent consumer crash on
  malformed messages
- Parse WORKER_MAX_RETRIES with parseInt + validation, fallback to 3 for
  non-finite/negative values, clamp to max 10
- Check renewActiveRun return value and abort retries when lock is lost
- Check isRunCancelled before each retry to respect user cancellation

Co-authored-by: sirily11 <32106111+sirily11@users.noreply.github.com>
Agent-Logs-Url: https://github.com/rxtech-lab/universal-translation/sessions/48ad1b04-d3da-4a33-a441-93fd86d4108b
Contributor Author

Copilot AI commented Mar 25, 2026

@copilot error: Failed to run "next" due to signal SIGILL
76.35 Illegal instruction (core dumped)

...

The Docker build failure is a Bun runtime crash (panic: Segmentation fault / SIGILL) inside oven/bun:1 — not caused by the code changes in this PR. The crash report links to bun.report confirming it's a Bun bug. The changed files (lib/queue/consumer.ts, lib/translation/worker.ts) are only imported by the worker entry point (worker/index.ts) and are not part of the Next.js build graph.

In commit 5f578a1 I've also addressed all the automated review feedback (parseJson safety, retry NaN guard, lock renewal check, cancellation before retry).

Copilot AI requested a review from sirily11 March 25, 2026 15:44
@sirily11 sirily11 merged commit f8ddfcf into main Mar 25, 2026
6 checks passed
@sirily11 sirily11 deleted the copilot/fix-worker-timeout-issues branch March 25, 2026 16:02