Conversation
… backoff
- Make consumer callback synchronous so amqplib delivers messages concurrently up to the prefetch limit (default 5)
- Add retry loop in runWorkerTask with configurable max retries (WORKER_MAX_RETRIES env var, default 3) and exponential backoff
- Only emit error events to clients on final retry failure
- On consumer-level errors, requeue message once before discarding
- Remove duplicate releaseActiveRun call from worker.ts (already handled in worker/index.ts)

Co-authored-by: sirily11 <32106111+sirily11@users.noreply.github.com>
Agent-Logs-Url: https://github.com/rxtech-lab/universal-translation/sessions/4a4908a8-f7b5-4745-9b67-a358b121acaf
Pull request overview
This PR updates the worker’s AMQP consumption and task execution flow to better utilize RabbitMQ prefetch concurrency and to add retry behavior with exponential backoff for task failures.
Changes:
- Switches the RabbitMQ `consume` callback to a synchronous function and explicitly `ack`/`nack` via `.then()`/`.catch()` to make concurrency behavior explicit.
- Adds retry-with-exponential-backoff logic to `runWorkerTask`, emitting client-visible errors only on final failure.
- Removes duplicated active-run lock release from `lib/translation/worker.ts` (lock lifecycle handled in `worker/index.ts`).
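The synchronous-callback change can be sketched as follows. The tiny fake `channel` and `handleTask` below are stand-ins (not the PR's actual code) so the ack/nack pattern is runnable on its own; with amqplib, the same shape applies to `channel.consume`:

```typescript
type Msg = { id: number };

const acked: number[] = [];
const channel = {
  ack: (m: Msg) => acked.push(m.id),
  nack: (m: Msg, _allUpTo: boolean, _requeue: boolean) => acked.push(-m.id),
};

// Stand-in for the worker's task handler; id 2 simulates a failing task.
async function handleTask(msg: Msg): Promise<void> {
  if (msg.id === 2) throw new Error("boom");
}

// The callback is synchronous: it starts the task and returns immediately,
// so the broker can keep delivering messages up to the prefetch limit while
// each in-flight task settles its own ack/nack.
function onMessage(msg: Msg): void {
  handleTask(msg)
    .then(() => channel.ack(msg))
    .catch(() => channel.nack(msg, false, false));
}
```

An `async` callback would compile to the same thing from amqplib's point of view (the returned Promise is ignored), but the explicit `.then()/.catch()` makes it obvious that nothing awaits between deliveries.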
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| lib/translation/worker.ts | Adds retry/backoff loop around task execution and removes redundant lock release. |
| lib/queue/consumer.ts | Refactors the consumer callback to non-async and handles ack/nack in promise handlers for explicit concurrency. |
lib/translation/worker.ts
Outdated
```ts
  logWorker("chat_finished", taskDetails(task));
}

const WORKER_MAX_RETRIES = Number(process.env.WORKER_MAX_RETRIES ?? "3");
```
WORKER_MAX_RETRIES is parsed with Number(...). If the env var is non-numeric, this becomes NaN, causing the retry loop condition (attempt <= WORKER_MAX_RETRIES) to be false and the task to be silently skipped after logging "task_runner_started". Consider parsing as an integer and falling back to the default when the value is not finite / < 0 (and optionally clamping to a reasonable max).
```diff
-const WORKER_MAX_RETRIES = Number(process.env.WORKER_MAX_RETRIES ?? "3");
+function getWorkerMaxRetries(): number {
+  const raw = process.env.WORKER_MAX_RETRIES;
+  const parsed =
+    raw !== undefined ? Number.parseInt(raw, 10) : Number.NaN;
+  if (!Number.isFinite(parsed) || parsed < 0) {
+    return 3;
+  }
+  const MAX_RETRIES = 10;
+  return Math.min(parsed, MAX_RETRIES);
+}
+const WORKER_MAX_RETRIES = getWorkerMaxRetries();
```
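The failure mode the suggestion guards against is easy to demonstrate in isolation:

```typescript
// A non-numeric env value becomes NaN under Number(...), and every comparison
// against NaN is false, so the `attempt <= WORKER_MAX_RETRIES` loop body
// would never execute.
const fromEnv = Number("three retries");
console.log(fromEnv <= 3);             // false: the retry loop is silently skipped
console.log(Number.isFinite(fromEnv)); // false: the suggested guard rejects it
console.log(Number.parseInt("3", 10)); // 3
```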
lib/translation/worker.ts
Outdated
```ts
  `${taskDetails(task)} attempt=${attempt + 1}/${WORKER_MAX_RETRIES + 1} delay=${delay}ms`,
);
await sleep(delay);
await renewActiveRun(task.projectId, task.runId);
```
renewActiveRun() can return false when the active-run lock is no longer owned by this runId. Right now the retry path ignores the return value, so the worker may keep retrying (and emitting events) even after losing the lock, potentially allowing concurrent processing for the same project. Consider checking the boolean result and aborting retries (and/or emitting the same lock-failure error used elsewhere) if the lock cannot be renewed.
```diff
-await renewActiveRun(task.projectId, task.runId);
+const renewed = await renewActiveRun(task.projectId, task.runId);
+if (!renewed) {
+  logWorker(
+    "task_runner_lock_lost",
+    `${taskDetails(task)} attempt=${attempt + 1}/${WORKER_MAX_RETRIES + 1}`,
+  );
+  throw new Error("Active run lock lost");
+}
```
```ts
for (let attempt = 0; attempt <= WORKER_MAX_RETRIES; attempt++) {
  try {
    if (attempt > 0) {
      const delay = retryDelayMs(attempt - 1);
      logWorker(
```
This retry loop introduces new behavior (multiple attempts with backoff and conditional final-failure reporting) but isn’t covered by unit tests. Since this file already has tests (e.g., getChatPayload), consider adding tests for runWorkerTask retry/backoff and final-failure emission to prevent regressions.
```diff
     const task = parseJson<TranslationTask>(message);
     if (!task) {
       logQueue(
-        "message_nacked",
-        `deliveryTag=${message.fields.deliveryTag} error=${error instanceof Error ? error.message : String(error)}`,
+        "message_invalid",
+        `deliveryTag=${message.fields.deliveryTag}`,
       );
       channel.nack(message, false, false);
       return;
```
parseJson() can throw on invalid JSON. With the removal of the surrounding try/catch, a malformed message will now throw out of the consume callback and won’t be nacked/acked (and may crash the consumer). Consider wrapping parseJson(message) in a try/catch here (or making parseJson return null on JSON.parse failure) and nack/discard the message on parse errors.
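A minimal sketch of the non-throwing variant, assuming a simplified signature (the real `parseJson` takes an AMQP message; a string stands in here):

```typescript
// Returns null on malformed JSON instead of throwing, so the consume
// callback can nack-and-discard cleanly without a surrounding try/catch.
function parseJsonSafe<T>(raw: string): T | null {
  try {
    return JSON.parse(raw) as T;
  } catch {
    return null;
  }
}

console.log(parseJsonSafe<{ type: string }>('{"type":"translate"}')); // parsed object
console.log(parseJsonSafe("{not json"));                              // null
```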
```ts
if (task.type === "translate") {
  await runTranslateTask(task, emit);
} else {
  await runChatTask(task, emit);
}
```
With the new retry loop, runTranslateTask/runChatTask can be executed multiple times for the same runId. Those functions have one-time side effects (e.g., translate clears the cancel flag and clears the run event cache) that may now run again on retries; in particular, a user cancellation that happens between attempts can be wiped by clearRunCancelled inside the task runner. Consider making the retried portion idempotent (or moving one-time initialization/clears outside the retry loop / behind an isRetry guard).
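One way to structure this is to lift one-time side effects out of the retried body. The helper below is a hypothetical sketch, not the PR's actual code; `initOnce` stands in for things like the cancel-flag and event-cache clears:

```typescript
// One-time initialization runs exactly once, so a user cancellation that
// lands between attempts is not wiped by a retry re-running the clears.
async function runWithRetries(
  maxRetries: number,
  initOnce: () => Promise<void>, // e.g. clearRunCancelled / event-cache reset
  body: () => Promise<void>,     // the retried translate/chat execution
): Promise<void> {
  await initOnce();
  let lastErr: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      await body();
      return;
    } catch (err) {
      lastErr = err;
    }
  }
  throw lastErr;
}
```

An equivalent alternative is an `if (attempt === 0)` guard inside the existing loop, which avoids restructuring the call site.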
@copilot error: Failed to run "next" due to signal SIGILL
…k check, cancellation
- Wrap JSON.parse in try/catch in parseJson to prevent consumer crash on malformed messages
- Parse WORKER_MAX_RETRIES with parseInt + validation, fallback to 3 for non-finite/negative values, clamp to max 10
- Check renewActiveRun return value and abort retries when lock is lost
- Check isRunCancelled before each retry to respect user cancellation

Co-authored-by: sirily11 <32106111+sirily11@users.noreply.github.com>
Agent-Logs-Url: https://github.com/rxtech-lab/universal-translation/sessions/48ad1b04-d3da-4a33-a441-93fd86d4108b
... The Docker build failure is a Bun runtime crash (signal SIGILL). In commit 5f578a1 I've also addressed all the automated review feedback (parseJson safety, retry NaN guard, lock renewal check, cancellation before retry).
Worker processes translation tasks sequentially despite prefetch > 1, and silently discards failed tasks with no retry.
Concurrent processing
The `channel.consume` callback was `async`; amqplib ignores the returned Promise, making the concurrency behavior implicit and fragile. Changed to a synchronous callback with explicit `.then()`/`.catch()`. Tasks now run concurrently up to the `WORKER_PREFETCH` limit (default 5).
Retry on error
`runWorkerTask` retries up to `WORKER_MAX_RETRIES` times (default 3) with exponential backoff (1s, 2s, 4s).
Cleanup
Removed the duplicate `releaseActiveRun` call in `worker.ts`; it is already made in `worker/index.ts`'s `handleTask` finally block.
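The retry shape described above can be sketched like this (names assumed; a simplification of the actual worker code):

```typescript
const MAX_RETRIES = 3;

// Exponential backoff: retry 0 -> 1s, retry 1 -> 2s, retry 2 -> 4s.
function retryDelayMs(retry: number): number {
  return 1000 * 2 ** retry;
}

async function runWithBackoff(task: () => Promise<void>): Promise<void> {
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    try {
      await task();
      return;
    } catch (err) {
      // Only the final failure is rethrown, so clients see one error
      // event instead of one per attempt.
      if (attempt === MAX_RETRIES) throw err;
      await new Promise((resolve) => setTimeout(resolve, retryDelayMs(attempt)));
    }
  }
}
```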