Skip to content

feat: add durable SQLite-backed job queue for desktop mode (#257)#299

Open
rajesh-puripanda wants to merge 3 commits into
Abhash-Chakraborty:mainfrom
rajesh-puripanda:issue-257-sqlite-job-queue
Open

feat: add durable SQLite-backed job queue for desktop mode (#257)#299
rajesh-puripanda wants to merge 3 commits into
Abhash-Chakraborty:mainfrom
rajesh-puripanda:issue-257-sqlite-job-queue

Conversation

@rajesh-puripanda
Copy link
Copy Markdown

@rajesh-puripanda rajesh-puripanda commented Jun 1, 2026

Implements a SQLite-backed job queue that replaces Redis/RQ for desktop mode.

Fixes #257

@github-actions github-actions Bot added the needs linked issue Pull request needs to link a valid issue before review. label Jun 1, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 1, 2026

PR Context Summary

Suggested issue links

  • No strong issue match found yet.

Use Fixes #123 or Closes #123 in the PR body when one of the suggestions is the intended issue.
Manual rerun: Actions > PR Context Triage > Run workflow > set pr_number and force_review=true.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jun 1, 2026

Review Change Stack

Warning

Review limit reached

@rajesh-puripanda, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 16 minutes and 50 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 36aeb36d-7f3b-4c28-ae12-af08436fb639

📥 Commits

Reviewing files that changed from the base of the PR and between d24f648 and 37067a2.

📒 Files selected for processing (1)
  • backend/src/find_api/core/config.py
📝 Walkthrough

Walkthrough

Adds a durable SQLite-backed job queue and a backend-agnostic queue layer that routes between Redis/RQ and SQLite. Implements SqliteQueue and SqliteJob, a SQLite worker thread, updates recovery and status routes to use unified helpers, adds configuration for QUEUE_MODE/QUEUE_DB_PATH, and provides comprehensive tests.

Changes

SQLite Job Queue Implementation

Layer / File(s) Summary
Configuration and backend selection
backend/src/find_api/core/config.py
Adds QUEUE_MODE and QUEUE_DB_PATH settings.
Backend-agnostic queue interface
backend/src/find_api/core/queue.py
Adds lazy backend selection (_get_backend()), get_redis_connection(), get_task_queue(), enqueue_job(), get_job(), and backend-aware clustering helpers.
SQLite queue data store and job lifecycle
backend/src/find_api/core/sqlite_queue.py
Implements SqliteQueue and SqliteJob, durable DDL, atomic dequeue, status transitions, maintenance, clustering lock persistence, job-type registry, and serialization/time helpers.
SQLite background worker process
backend/src/find_api/workers/sqlite_worker.py
Implements run_worker_once(), run_worker_loop(), dispatcher, start_worker_thread(), and stop_worker_thread() with graceful shutdown.
Application startup and shutdown
backend/src/find_api/main.py
Starts/stops SQLite worker thread from app lifespan when QUEUE_MODE == "sqlite".

Recovery and Status Router Backend Abstraction

Layer / File(s) Summary
Backend-agnostic job status resolution
backend/src/find_api/core/recovery.py
Replaces Redis/RQ-specific wiring with get_job()-based status retrieval, expands active statuses, and adjusts terminal-state handling.
Backend-agnostic job and Redis access
backend/src/find_api/routers/status.py
Refactors /status/models to use get_redis_connection() safely and /status/{job_id} to use get_job() with explicit 404s and timestamp/error normalization.

Test Coverage

Layer / File(s) Summary
SQLite queue lifecycle and persistence tests
backend/tests/test_sqlite_queue.py
End-to-end tests for enqueue/dequeue, transitions, persistence, introspection, worker dispatch, and concurrency.
Queue backend abstraction integration tests
backend/tests/test_queue_integration.py
Integration tests for backend selection, clustering deduplication, and Redis/SQLite mode behaviors.
Recovery and status endpoint tests
backend/tests/test_recovery.py, backend/tests/test_status.py
Refactors to patch _get_job_status/get_job and to mock get_redis_connection(); updates fake-job helpers and expectations.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

  • Abhash-Chakraborty/Find#139: Related to refactoring reconcile_abandoned_analysis_jobs and recovery behavior to use a unified job-status interface.

Suggested labels

backend, type:feature, level:advanced, architecture, type:testing, quality:clean

Suggested reviewers

  • Abhash-Chakraborty

Poem

🐰 In a burrow where queues softly hum,
SQLite listens where Redis once strum,
Workers awaken and jobs gently run,
Locks held in stone till their time is done,
A rabbit applauds — two backends, one sun.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.46% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ⚠️ Warning The pull request description is minimal and does not follow the provided template structure. Expand the description to include all template sections: Summary, Type of change (with checkboxes), What changed (with bullet points), How to test, and Checklist items. Add detail about the SQLite implementation and testing approach.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: adding a durable SQLite-backed job queue for desktop mode. It is specific, concise, and directly reflects the primary feature being implemented.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@macroscopeapp
Copy link
Copy Markdown
Contributor

macroscopeapp Bot commented Jun 1, 2026

Approvability

Verdict: Needs human review

Unable to check for correctness in d24f648. This PR introduces a significant new feature: a complete SQLite-backed job queue system as an alternative to Redis for desktop mode. The ~1400 lines of new code add new infrastructure components, threading, and database interactions. Additionally, several review comments about error handling and worker lifecycle remain unaddressed. New features of this scope warrant human review.

You can customize Macroscope's approvability policy. Learn more.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

🧹 Nitpick comments (1)
backend/tests/test_sqlite_queue.py (1)

100-100: ⚡ Quick win

Unused local variables will trip Ruff F841.

j1 (line 100) and j2/j3 (lines 189-190) are assigned but never read, which Ruff flags as F841 and would fail the required ruff check.

🧹 Proposed fix
-        j1 = queue.enqueue("first")
+        queue.enqueue("first")
         queue.dequeue()  # claims j1
         queue.enqueue("a")
-        j2 = queue.enqueue("b")
-        j3 = queue.enqueue("c")
+        queue.enqueue("b")
+        queue.enqueue("c")
         j1 = queue.dequeue()  # claim "a"

As per coding guidelines, "Backend code targets Python 3.12 and is checked with Ruff".

Also applies to: 189-190

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/tests/test_sqlite_queue.py` at line 100, Remove the unused local
variable assignments that trigger Ruff F841: replace the assignments to j1, j2,
and j3 in test_sqlite_queue.py with either direct calls (e.g., call
queue.enqueue("...") without assigning) or capture them into a throwaway name
(e.g., _ = queue.enqueue(...)) or immediately assert their value if needed
(e.g., assert queue.enqueue(...) is not None); update the three occurrences
referring to j1, j2, j3 so no assigned variable remains unused.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/src/find_api/core/queue.py`:
- Around line 134-151: The current SQLite path in _set_clustering_lock is
non-atomic (scans backend.list_active), so change it to an atomic DB-backed
lock: before checking active jobs, delete any expired lock rows using
_cluster_lock_ttl(), then perform a single INSERT (e.g. INSERT INTO
clustering_locks(key, reason, expires_at) VALUES (CLUSTERING_LOCK_KEY, reason,
now+ttl) ON CONFLICT DO NOTHING) against the backend DB and return True iff the
insert affected a row; if the insert fails (no row inserted) return False. Use
CLUSTERING_LOCK_KEY and _cluster_lock_ttl() for the key/expiry and keep the
existing redis branch unchanged. Ensure the lock table creation/migration exists
or create it lazily if missing.
- Around line 70-81: The enqueue_job path forwards RQ-only control kwargs into
SQLite jobs causing those kwargs to be replayed into the target callable; in
enqueue_job, when _get_backend().mode == "sqlite" drop RQ control keys (e.g.
"job_timeout" and "result_ttl" — and any other RQ-specific enqueue kwargs you
use) from kwargs before calling backend.enqueue_call(func, *args, **kwargs) so
only the target callable args are stored; implement this by creating a
filtered_kwargs = {k:v for k,v in kwargs.items() if k not in
("job_timeout","result_ttl")} and pass filtered_kwargs to backend.enqueue_call.

In `@backend/src/find_api/core/recovery.py`:
- Around line 99-108: The helper _get_job_status currently swallows all
exceptions from get_job and returns None, which causes transient queue/DB errors
to be treated as missing jobs; change _get_job_status so it does not catch broad
Exception — simply call get_job(job_id) and if it returns None return None,
otherwise return job.get_status(); remove the try/except (or re-raise any
exceptions) so errors propagate to run_analysis_recovery_loop instead of being
interpreted as an abandoned job (referencing _get_job_status and get_job).

In `@backend/src/find_api/core/sqlite_queue.py`:
- Around line 85-87: The SQL_CLEAR_COMPLETED query currently deletes based on
created_at; change it to use the job completion timestamp so retention is
relative to completion, not enqueue. Update the SQL_CLEAR_COMPLETED constant in
sqlite_queue.py to reference completed_at (e.g., "DELETE FROM job_queue WHERE
status = 'completed' AND completed_at < ?") and ensure any callers that bind the
cutoff value still pass the intended completion-based cutoff; also verify the
job_queue schema and code that sets completed_at (if any) uses the same column
name.
- Around line 362-363: _now_str currently ignores its offset_seconds parameter
causing clear_completed and reset_stale_running to compute cutoffs incorrectly;
change _now_str in sqlite_queue.py to apply the offset by adding
datetime.timedelta(seconds=offset_seconds) to datetime.now(timezone.utc) before
calling isoformat(), and add the necessary timedelta import if missing so that
calls from clear_completed and reset_stale_running with negative offsets produce
the intended earlier timestamps.

In `@backend/src/find_api/routers/status.py`:
- Around line 26-40: The Redis read block using get_redis_connection() and
iterating redis_conn.scan_iter("find:model_status:*") can raise exceptions other
than RuntimeError (e.g., scan_iter() or get() failures) and should fall back to
the local snapshot; wrap the redis scan/get operations in a broader try/except
(or catch exceptions from scan_iter/get inside the outer try) so any exception
while reading Redis causes you to skip Redis and preserve the existing
local_status snapshot (i.e., ensure process_status remains populated from
local_status and no exception is propagated), referencing get_redis_connection,
scan_iter, redis_conn.get, and process_status/local_status to locate the code to
change.
- Around line 68-95: The current handler converts all unexpected exceptions into
a 404; keep the explicit None -> 404 branch for get_job(job_id) but remove or
replace the broad except Exception that maps everything to 404. In the status
route (the block that calls get_job), let non-HTTP exceptions surface as 5xx by
either re-raising the original exception or catching Exception as e and raising
HTTPException(status_code=503, detail=f"Error fetching job {job_id}") after
logging; preserve the existing except HTTPException: raise to pass through
client errors. Update the try/except around get_job/job processing (refer to
get_job, job.is_finished, job.is_failed, and _attr_iso) accordingly.

In `@backend/src/find_api/workers/sqlite_worker.py`:
- Around line 134-138: The stop_worker_thread helper currently only sets the
thread's _stop_event; update stop_worker_thread(thread: threading.Thread) to set
the event (using getattr(thread, "_stop_event", None)) and then call
thread.join(timeout=some_bounded_seconds) to wait for the worker to exit,
optionally checking thread.is_alive() after join and logging a warning or error
if it did not stop within the timeout; use a small, configurable timeout
constant (e.g., WORKER_SHUTDOWN_TIMEOUT) and reference the function name
stop_worker_thread and the _stop_event attribute so the change is applied in the
same helper.
- Around line 63-68: run_worker_once currently dequeues and calls _dispatch
without initializing handlers, causing fresh processes to fail with "Unknown job
type"; update run_worker_once to call _ensure_registrations() (the same setup
used by the loop path) before dequeuing or at least before calling _dispatch so
registrations are initialized for the one-shot path; locate run_worker_once and
add a call to _ensure_registrations() (referencing _ensure_registrations,
run_worker_once, and _dispatch) to ensure consistent initialization.
- Around line 119-130: The worker startup currently defers creation/validation
of the queue and job registrations into the daemon thread (run_worker_loop),
which hides initialization failures; instead, instantiate and validate the
SqliteQueue (or otherwise ensure `queue` is non-None) and perform any job
registration/initialization on the main thread before calling thread.start(); if
validation fails, raise/log and avoid starting the thread so the service fails
fast — update code paths around the `queue` variable, SqliteQueue construction,
and any registration helpers invoked by run_worker_loop to run in the parent
thread prior to thread.start().

In `@backend/tests/test_queue_integration.py`:
- Around line 91-95: The test test_enqueue_clustering_dedup is incomplete: it
only calls enqueue_clustering_job once, leaves r1 unused, and has no assertions;
update the test to call enqueue_clustering_job a second time with the same
deduping parameters, assert that the second call returns the same job id/object
as the first (or that it signals an existing job), and add assertions to
validate dedup behavior (e.g., compare r1 and r2 or check job status). Ensure r1
is used to avoid an unused-variable error (Ruff F841) and include any necessary
cleanup or mocking used elsewhere in the test suite so the test remains
deterministic.
- Around line 24-32: The fixture resets are overzealous: qm._BACKEND is the
cache gate used by _get_backend(), so only qm._BACKEND needs to be cleared to
avoid leaks; qm._get_backend.queue is vestigial and qm._get_backend.mode /
qm._get_backend.redis_conn are reinitialized by _get_backend() when _BACKEND is
None so resetting them is redundant. Simplify the fixture by removing
assignments to qm._get_backend.queue, qm._get_backend.mode, and
qm._get_backend.redis_conn and only set qm._BACKEND = None before and after the
yield so _get_backend() can perform its own initialization.

In `@backend/tests/test_sqlite_queue.py`:
- Line 18: The test file imports SqliteJob but never uses it and assigns
variables j1, j2, j3 without using them; remove SqliteJob from the import list
in the from ... import line (leave SqliteQueue and _resolve_job_type) and either
use or mark the unused local variables by renaming them to _j1, _j2, _j3 (or add
minimal assertions that reference them) so Ruff no longer reports F401/F841;
update references to those variables accordingly in tests that need them.

---

Nitpick comments:
In `@backend/tests/test_sqlite_queue.py`:
- Line 100: Remove the unused local variable assignments that trigger Ruff F841:
replace the assignments to j1, j2, and j3 in test_sqlite_queue.py with either
direct calls (e.g., call queue.enqueue("...") without assigning) or capture them
into a throwaway name (e.g., _ = queue.enqueue(...)) or immediately assert their
value if needed (e.g., assert queue.enqueue(...) is not None); update the three
occurrences referring to j1, j2, j3 so no assigned variable remains unused.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 986f8997-404b-475d-8dbf-2806d5381776

📥 Commits

Reviewing files that changed from the base of the PR and between 20b306b and bc55530.

📒 Files selected for processing (11)
  • backend/src/find_api/core/config.py
  • backend/src/find_api/core/queue.py
  • backend/src/find_api/core/recovery.py
  • backend/src/find_api/core/sqlite_queue.py
  • backend/src/find_api/main.py
  • backend/src/find_api/routers/status.py
  • backend/src/find_api/workers/sqlite_worker.py
  • backend/tests/test_queue_integration.py
  • backend/tests/test_recovery.py
  • backend/tests/test_sqlite_queue.py
  • backend/tests/test_status.py

Comment thread backend/src/find_api/core/queue.py
Comment thread backend/src/find_api/core/queue.py Outdated
Comment thread backend/src/find_api/core/recovery.py
Comment thread backend/src/find_api/core/sqlite_queue.py
Comment thread backend/src/find_api/core/sqlite_queue.py Outdated
Comment thread backend/src/find_api/workers/sqlite_worker.py
Comment thread backend/src/find_api/workers/sqlite_worker.py
Comment thread backend/tests/test_queue_integration.py Outdated
Comment thread backend/tests/test_queue_integration.py
Comment thread backend/tests/test_sqlite_queue.py Outdated
- Filter RQ control kwargs in enqueue_job for SQLite mode
- Use atomic INSERT ... ON CONFLICT for clustering lock (SQLite)
- Fix SQL_CLEAR_COMPLETED to use completed_at column
- Fix _now_str to honor offset_seconds parameter
- Broaden Redis exception handling in status.py
- Return 503 instead of 404 for backend errors in status endpoint
- Add _ensure_registrations() call in run_worker_once
- Join worker thread with timeout in stop_worker_thread
- Validate worker queue/registrations on caller thread before start
- Remove broad try/except from _get_job_status in recovery.py
- Clean up test fixtures and fix unused variables
- Fix incomplete dedup test with proper assertions
@rajesh-puripanda
Copy link
Copy Markdown
Author

Update: All CodeRabbit findings resolved

I have addressed all review comments across 7 files:

Critical fixes: Filter RQ-only kwargs from SQLite enqueue, atomic clustering lock (INSERT ... ON CONFLICT), SQL_CLEAR_COMPLETED uses completed_at, _now_str applies offset_seconds

Error handling: status.py broadened Redis handling + 503 for backend errors, recovery.py exceptions propagate to recovery loop

Worker: run_worker_once calls _ensure_registrations, stop_worker_thread joins with 5s timeout, start_worker_thread validates before starting

Tests: fixture cleanup, dedup assertions, unused import/variable cleanup

Also updated clear_clustering_job_state() for SQLite mode.

@Abhash-Chakraborty kindly review and merge!

@github-actions github-actions Bot removed the needs linked issue Pull request needs to link a valid issue before review. label Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: add durable SQLite-backed job queue for desktop mode

1 participant