feat: add durable SQLite-backed job queue for desktop mode #257#291
feat: add durable SQLite-backed job queue for desktop mode #257#291rajesh-puripanda wants to merge 6 commits into
Conversation
PR Context Summary
Suggested issue links
Use |
|
Warning Review limit reached
More reviews will be available in 15 minutes and 58 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughAdds a configurable queue backend (redis|sqlite), implements a durable SQLite-backed job queue (schema, SQLiteJob/SQLiteQueue), makes queue APIs backend-agnostic, adds clustering coalescing for SQLite, integrates status/recovery paths, provides a SQLite polling worker, and includes comprehensive tests. ChangesSQLite-backed job queue for desktop mode
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
ApprovabilityVerdict: Needs human review Unable to check for correctness in 8838ec2. This PR introduces a substantial new feature - a complete SQLite-backed job queue system with new database schema, worker implementation, and cross-cutting changes to existing queue infrastructure. New features of this scope warrant human review. You can customize Macroscope's approvability policy. Learn more. |
…y dirname, redis_conn passthrough, clustering race condition
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (3)
backend/tests/test_sqlite_queue.py (3)
51-67: ⚡ Quick winGlobal
settings.SQLITE_QUEUE_PATHmutation isn't restored in tests that skip theconnfixture.The
connfixture correctly saves and restoresqs.settings.SQLITE_QUEUE_PATH, but several tests use only thedb_pathfixture and reassign the module-level setting directly without restoring it (e.g. Lines 416, 427, 438, 511, 529, 546, 559, 571, 592). Becausedb_pathdeletes its temp file in teardown, the leaked path points at a now-deleted database. Tests that rely on the ambient global setting and don't set their own —test_no_current_job_by_default(Line 475) andtest_get_status_nonexistent(Line 598) — then become order-dependent and may fail or pass depending on execution order.Consider centralizing the patch with
monkeypatch(auto-reverted) so every test gets isolated, restored state:♻️ Example fixture using monkeypatch
`@pytest.fixture`() def set_db_path(db_path, monkeypatch): import find_api.core.queue_sqlite as qs monkeypatch.setattr(qs.settings, "SQLITE_QUEUE_PATH", db_path) return db_pathThen depend on
set_db_pathinstead of manually assigningqs.settings.SQLITE_QUEUE_PATH = db_path.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/tests/test_sqlite_queue.py` around lines 51 - 67, The suite leaks a mutated module-level setting qs.settings.SQLITE_QUEUE_PATH when tests set it directly and don't restore it; create a dedicated fixture (e.g., set_db_path) that depends on db_path and monkeypatch and uses monkeypatch.setattr(qs.settings, "SQLITE_QUEUE_PATH", db_path) to automatically revert after each test, then update tests that currently assign qs.settings.SQLITE_QUEUE_PATH = db_path to instead depend on the new set_db_path fixture (or use monkeypatch in those tests) and remove manual restores so no test leaves a stale path behind.
213-222: ⚡ Quick winTest doesn't validate queue-name isolation it claims to.
The docstring states dequeue "only returns jobs from its own queue," but the assertions only check that
d1andd2are non-None. If isolation were broken (e.g.q1.dequeue()returnedq2's job), this test would still pass. Assert the returned ids to actually exercise isolation.💚 Strengthen assertions
d1 = q1.dequeue() d2 = q2.dequeue() - assert d1 is not None - assert d2 is not None + assert d1 is not None + assert d2 is not None + assert d1.args == (1,) + assert d2.args == (2,)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/tests/test_sqlite_queue.py` around lines 213 - 222, The test_dequeue_respects_queue_name currently only checks q1.dequeue() and q2.dequeue() are non-None, which doesn't verify queue isolation; update the test to assert the actual job identities returned by SQLiteQueue.dequeue() match the jobs enqueued via SQLiteQueue.enqueue() (e.g., capture return values or job ids when calling q1.enqueue(_dummy_success, 1) and q2.enqueue(_dummy_success, 2) and assert that q1.dequeue() returns the id/job from q1 and q2.dequeue() returns the id/job from q2), referencing SQLiteQueue, enqueue, dequeue, and _dummy_success to locate the code.
367-368: 💤 Low valueAvoid
__import__("sqlite3").Row; import the module directly.
__import__("sqlite3").Row(also at Lines 61, 384, 399) is an awkward idiom.connectis already imported fromsqlite3; addingimport sqlite3and usingsqlite3.Rowis clearer.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/tests/test_sqlite_queue.py` around lines 367 - 368, The tests use the awkward __import__("sqlite3").Row for setting row_factory (seen where new_conn.row_factory is assigned and at the other occurrences), so add a direct import for sqlite3 at the top of the test module and replace all __import__("sqlite3").Row references with sqlite3.Row; keep the existing use of connect (from sqlite3) unchanged and only change the row_factory assignments to use sqlite3.Row.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/src/find_api/core/queue_sqlite.py`:
- Around line 588-626: The SELECT then INSERT around CLUSTERING_LOCK_KEY is
vulnerable to a TOCTOU race that can cause an unhandled IntegrityError; update
the logic in the block that uses conn, CLUSTERING_LOCK_KEY and inserts into
job_queue (the code that creates SQLiteQueue("default"), enqueues cluster_images
and inserts the lock/ref rows) to perform the check+insert atomically—either
wrap the check+insert in a transaction started with "BEGIN IMMEDIATE" (or
equivalent connection.begin()) so the insert cannot race, or insert the lock row
first using an atomic insert-with-conflict-handling (e.g. INSERT ... ON CONFLICT
DO NOTHING) and bail if the insert reports the row already exists, then proceed
to enqueue and insert CLUSTERING_JOB_ID_KEY; ensure IntegrityError is
caught/handled if it still can occur.
- Around line 361-365: SQLiteQueue.dequeue currently accepts a timeout parameter
but ignores it, causing callers that expect blocking semantics to spin; either
remove the unused timeout parameter from the method signature or implement
timeout-backed waiting. To fix, update the SQLiteQueue.dequeue function to honor
timeout by repeatedly attempting to claim a job with a short sleep until timeout
expires (or use a blocking condition/notification if available), or remove the
timeout argument and update all callers; add a focused unit test for
SQLiteQueue.dequeue that verifies blocking behavior when timeout is provided
(and a complementary test when timeout is None/zero). Ensure references to
SQLiteQueue.dequeue in consumers (e.g., the SQLite worker poll loop) are updated
accordingly.
- Around line 384-394: The dequeue() loop currently checks conn.total_changes
which is cumulative across the connection and can falsely indicate a successful
claim; change the UPDATE call to capture its Cursor (e.g., cursor =
conn.execute(...)) and use cursor.rowcount to determine if the per-UPDATE
affected rows > 0, then commit and only SELECT/return
SQLiteJob.from_row(updated) when cursor.rowcount > 0; replace uses of
conn.total_changes with the Cursor.rowcount check while keeping the same UPDATE
SQL and commit/SELECT flow.
In `@backend/src/find_api/routers/status.py`:
- Around line 106-110: The sqlite path in the status handler uses row.get(...)
which raises AttributeError for sqlite3.Row; in the failed-branch (where code
checks row["status"] == "failed") replace row.get("error", ...) with a safe
lookup using row["error"] if present (e.g., row["error"] if "error" in
row.keys() else fallback) and set status_info["error"] = meta.get("error",
row_error_fallback) and status_info["stage"] = meta.get("stage", "failed");
avoid catching all Exceptions around this logic so sqlite attribute errors
aren’t converted into a 404 (narrow the try/except to only expected errors or
remove it); add a focused unit/integration test that exercises the sqlite
"failed" path to assert the endpoint returns the failed status with correct
error text; and normalize response field names/types by mapping sqlite's
completed_at to the canonical ended_at (or vice versa) so clients receive
consistent fields across backends.
- Around line 85-104: The SQLite branch builds status_info from row[...] but
returns timestamps and result in a different shape than the Redis/RQ backend;
update the code that constructs status_info (the block that reads
row["created_at"], row["started_at"], row["completed_at"], row["result"] and the
meta parsing) to: 1) convert created_at and started_at (if present) from the
REAL float to ISO strings via datetime.fromtimestamp(...).isoformat(), 2) derive
ended_at (not completed_at) as an ISO string from row["completed_at"] when
present and omit/combine the completed_at key to match Redis naming, and 3) if
row["result"] is a JSON string deserialize it with json.loads before assigning
status_info["result"] so the frontend receives an object (only keep raw value if
already non-string). Keep the existing meta->stage logic. Also add a focused
test asserting the /api/status/{job_id} payload shape
(created_at/started_at/ended_at as ISO strings and result as deserialized
object) for both SQLite and Redis/RQ backends.
In `@backend/src/find_api/workers/sqlite_worker.py`:
- Around line 98-108: Daemon loop currently always sleeps after each iteration
which limits throughput; change the loop so it only sleeps when there was no job
processed. Modify the loop around run_worker_once(queue) to detect whether a job
was handled (either by having run_worker_once return a boolean indicating
work_done, or by checking queue.empty() after calling it) and only call
time.sleep(POLL_INTERVAL_SECONDS) when no work was processed; preserve the
shutdown_event check and existing exception handling (logger.exception) so
behavior on shutdown/errors is unchanged.
---
Nitpick comments:
In `@backend/tests/test_sqlite_queue.py`:
- Around line 51-67: The suite leaks a mutated module-level setting
qs.settings.SQLITE_QUEUE_PATH when tests set it directly and don't restore it;
create a dedicated fixture (e.g., set_db_path) that depends on db_path and
monkeypatch and uses monkeypatch.setattr(qs.settings, "SQLITE_QUEUE_PATH",
db_path) to automatically revert after each test, then update tests that
currently assign qs.settings.SQLITE_QUEUE_PATH = db_path to instead depend on
the new set_db_path fixture (or use monkeypatch in those tests) and remove
manual restores so no test leaves a stale path behind.
- Around line 213-222: The test_dequeue_respects_queue_name currently only
checks q1.dequeue() and q2.dequeue() are non-None, which doesn't verify queue
isolation; update the test to assert the actual job identities returned by
SQLiteQueue.dequeue() match the jobs enqueued via SQLiteQueue.enqueue() (e.g.,
capture return values or job ids when calling q1.enqueue(_dummy_success, 1) and
q2.enqueue(_dummy_success, 2) and assert that q1.dequeue() returns the id/job
from q1 and q2.dequeue() returns the id/job from q2), referencing SQLiteQueue,
enqueue, dequeue, and _dummy_success to locate the code.
- Around line 367-368: The tests use the awkward __import__("sqlite3").Row for
setting row_factory (seen where new_conn.row_factory is assigned and at the
other occurrences), so add a direct import for sqlite3 at the top of the test
module and replace all __import__("sqlite3").Row references with sqlite3.Row;
keep the existing use of connect (from sqlite3) unchanged and only change the
row_factory assignments to use sqlite3.Row.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: aa27814d-ada4-43b0-925a-d9785658d4a5
📒 Files selected for processing (8)
backend/src/find_api/core/config.pybackend/src/find_api/core/queue.pybackend/src/find_api/core/queue_sqlite.pybackend/src/find_api/core/recovery.pybackend/src/find_api/routers/status.pybackend/src/find_api/workers/jobs.pybackend/src/find_api/workers/sqlite_worker.pybackend/tests/test_sqlite_queue.py
- TOCTOU race in enqueue_clustering_job_sqlite: atomic lock acquisition with INSERT ... ON CONFLICT DO NOTHING - Remove unused timeout param from dequeue() - status.py SQLite branch: ISO timestamps, ended_at, deserialize JSON result, narrow exception chaining - sqlite_worker.py: only sleep when no job was processed - test improvements: sqlite3.Row import, set_db_path fixture with monkeypatch, queue isolation assertions, status payload shape tests
SummaryImplements a SQLite-backed job queue for desktop mode behind a Changes
Testing
Review Fixes (commit
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/src/find_api/core/recovery.py (1)
98-103:⚠️ Potential issue | 🟠 Major | ⚡ Quick winHandle SQLite status lookup failures the same way as Redis.
Line 99–103 returns
get_job_status(job_id)without a guard, but the Redis path (Line 104–108) intentionally degrades toNoneon lookup errors. A SQLite exception here can abort reconciliation for the whole batch instead of treating that job as unknown.Suggested fix
backend = get_queue_backend() if backend == "sqlite": from find_api.core.queue_sqlite import get_job_status - return get_job_status(job_id) + try: + return get_job_status(job_id) + except Exception: # noqa: BLE001 + return None🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/src/find_api/core/recovery.py` around lines 98 - 103, The SQLite branch currently returns get_job_status(job_id) directly and can raise exceptions that abort batch reconciliation; change it to mirror the Redis path by wrapping the call to get_job_status(job_id) (after importing from find_api.core.queue_sqlite) in a try/except that catches exceptions and returns None on error, so get_queue_backend() == "sqlite" degrades to unknown status instead of propagating the exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@backend/src/find_api/core/recovery.py`:
- Around line 98-103: The SQLite branch currently returns get_job_status(job_id)
directly and can raise exceptions that abort batch reconciliation; change it to
mirror the Redis path by wrapping the call to get_job_status(job_id) (after
importing from find_api.core.queue_sqlite) in a try/except that catches
exceptions and returns None on error, so get_queue_backend() == "sqlite"
degrades to unknown status instead of propagating the exception.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 837a6e17-83ca-45df-bb1b-e8a364fc7174
📒 Files selected for processing (8)
backend/src/find_api/core/config.pybackend/src/find_api/core/queue.pybackend/src/find_api/core/queue_sqlite.pybackend/src/find_api/core/recovery.pybackend/src/find_api/routers/status.pybackend/src/find_api/workers/jobs.pybackend/src/find_api/workers/sqlite_worker.pybackend/tests/test_sqlite_queue.py
🚧 Files skipped from review as they are similar to previous changes (7)
- backend/src/find_api/workers/jobs.py
- backend/src/find_api/core/config.py
- backend/src/find_api/routers/status.py
- backend/src/find_api/workers/sqlite_worker.py
- backend/src/find_api/core/queue_sqlite.py
- backend/src/find_api/core/queue.py
- backend/tests/test_sqlite_queue.py
|
All CodeRabbit findings have been addressed — including the SQLite try/except guard in recovery.py. Could you please remove the do-not-merge label and merge this PR? All 52 tests pass, lint and format are clean. |
Summary
Add a durable SQLite-backed job queue for desktop mode, removing the Redis dependency for local-first deployments while preserving Redis/RQ for Docker/server deployments.
Fixes #257
Type of change
What changed
backend/src/find_api/core/queue_sqlite.py— full SQLite queue implementation withSQLiteJob/SQLiteQueueclasses, WAL mode concurrency, thread-localget_current_job(), job persistence across restarts, and SQLite-backed clustering coalescing logicbackend/src/find_api/workers/sqlite_worker.py— polling worker that dequeues and executes jobs from the SQLite queue with graceful shutdown supportQUEUE_BACKEND("redis"/"sqlite", default"redis") andSQLITE_QUEUE_PATHsettings tobackend/src/find_api/core/config.pybackend/src/find_api/core/queue.pyinto a unified backend-agnostic interface — factory functions delegate to Redis/RQ or SQLite based onsettings.QUEUE_BACKENDbackend/src/find_api/core/recovery.py– job status lookup works with both backendsbackend/src/find_api/routers/status.py– job status endpoint supports SQLite backendsbackend/src/find_api/workers/jobs.py– uses unifiedget_current_jobfromcore/queuebackend/tests/test_sqlite_queue.py— 49 tests covering schema, enqueue, dequeue, completion, failure, metadata persistence, restart survival, clustering coalescing, worker execution, and edge casesScreenshots / recordings (for UI changes)
N/A — backend change with no UI impact.
How to test