Skip to content

feat: defer embedding messages#704

Open
Rajat-Ahuja1997 wants to merge 5 commits into
mainfrom
DEV-1789
Open

feat: defer embedding messages#704
Rajat-Ahuja1997 wants to merge 5 commits into
mainfrom
DEV-1789

Conversation

@Rajat-Ahuja1997
Copy link
Copy Markdown
Collaborator

@Rajat-Ahuja1997 Rajat-Ahuja1997 commented May 20, 2026

Summary by CodeRabbit

  • New Features

    • Message embeddings are now created as pending per-chunk rows and processed asynchronously; added an immediate "embed now" fast path and router scheduling for it when enabled.
  • Improvements

    • Token- and request-aware batching and chunk preparation for embeddings; reconciler now claims and processes whole-message chunks atomically.
    • Optional pgvector-only persistence and a new concurrency cap for embedding tasks.
  • Tests

    • Expanded coverage for embedding flow, batching, chunking, reconciliation, and the immediate-embed path.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 20, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d003f85d-137b-49f5-bcd6-f6086ca0f4d5

📥 Commits

Reviewing files that changed from the base of the PR and between 7a63e4b and f362292.

📒 Files selected for processing (1)
  • src/config.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/config.py

Walkthrough

Messages now create per-chunk pending MessageEmbedding rows (via embedding_client.prepare_chunks) instead of inline embedding; EmbeddingClient gains token-aware batching and prepare_chunks(); a reconciler and a new embed_messages_now fast-path persist embeddings to Postgres or external stores; tests and fixtures updated.

Changes

Asynchronous Embedding Pipeline

Layer / File(s) Summary
Embedding Client: Token-aware batching and chunking
src/embedding_client.py, tests/llm/test_embedding_client.py
simple_batch_embed() now uses token-aware batching, validating per-input limits and batching by request token budget; added prepare_chunks() to compute chunk lists without API calls; public wrappers delegate to internal client. Tests cover batching, oversize rejection, and chunk ordering.
Message Creation: Deferred embedding via pending rows
src/crud/message.py, tests/integration/test_message_embeddings.py
create_messages commits Message rows, calls prepare_chunks, and inserts one MessageEmbedding row per chunk with sync_state="pending" and embedding=None. Removed immediate embedding/upsert logic; imports cleaned. Integration tests assert batch_embed is not called inline and embeddings remain pending after creation.
Immediate embedding fast-path
src/reconciler/embed_now.py, tests/deriver/test_embed_now.py
New embed_messages_now(message_ids) claims eligible chunk rows, embeds under a semaphore via simple_batch_embed, and persists per-row to Postgres or an external store (with optional dual-write). Provides reset_embed_semaphore() test hook and lease/claim semantics. Tests cover pgvector-only, external upsert, failures, concurrency, and idempotency.
Vector Reconciliation: Pgvector-only mode and improved batch claiming
src/reconciler/sync_vectors.py, tests/deriver/test_vector_reconciliation.py
Claiming is two-step: select distinct message_ids then claim all pending chunks for those IDs with FOR UPDATE SKIP LOCKED. _sync_message_embeddings accepts optional external_vector_store; pgvector-only path re-embeds and writes embeddings per-row and marks rows synced. External-store path builds stable {message_id}_{chunk_position} IDs and updates rows per-row with conditional embedding writes. Loop ordering adjusted for pgvector-only reconciliation.
Routers: schedule immediate embed job
src/routers/messages.py, tests/routes/test_messages.py
After creating messages (session/file flows), routers now schedule embed_messages_now in background tasks when settings.EMBED_MESSAGES is enabled. Route tests assert scheduling behavior and the disabled path.
Tests & fixtures: prepare_chunks mock and tracked DB refactor
tests/conftest.py, tests/*
mock_openai_embeddings fixture now patches prepare_chunks (returns single-chunk per id) and yields it; mock_tracked_db uses contextlib.ExitStack to patch multiple targets. Integration/unit tests updated for deferred embedding and embed-now behavior.
Config & telemetry
src/config.py, src/telemetry/events/llm.py
Added EmbeddingSettings.MAX_CONCURRENT_EMBEDDINGS setting. Added clarifying comment in EmbeddingCallPurpose documenting MESSAGE_CREATE semantics (pending rows created at message time, embedded later by reconciler).
Startup validator test: env isolation
tests/startup/test_embedding_validator.py
Test builds a filtered subprocess env excluding existing VECTOR_STORE*/EMBEDDING* keys and sets PYTHON_DOTENV_DISABLED and EMBEDDING_VECTOR_DIMENSIONS before running AppSettings.

Sequence Diagram

sequenceDiagram
  participant CreateMsg as create_messages()
  participant DB as Database
  participant EmbClient as EmbeddingClient
  participant Reconciler as run_vector_reconciliation_cycle
  participant ExtStore as ExternalVectorStore
  CreateMsg->>DB: INSERT Message rows + COMMIT
  CreateMsg->>EmbClient: prepare_chunks(message_id->text)
  EmbClient-->>CreateMsg: dict[msg_id, chunk_list]
  CreateMsg->>DB: INSERT MessageEmbedding rows (pending, embedding=NULL)
  Reconciler->>DB: select distinct message_ids -> claim chunks FOR UPDATE SKIP LOCKED
  Reconciler->>EmbClient: batch_embed(chunks) with MESSAGE_CREATE
  alt external store configured
    Reconciler->>ExtStore: upsert_many(vector_records)
    ExtStore-->>DB: per-row UPDATE sync_state/embedding
  else pgvector-only
    Reconciler->>DB: per-row UPDATE embedding + sync_state
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • plastic-labs/honcho#678: Modifies src/embedding_client.py embedding behavior; overlaps in changes to batching and embedding call paths.

Suggested reviewers

  • VVoruganti
  • ajspig

Poem

A rabbit hops through embeddings deferred,
Chunks are queued, their vectors postponed,
A semaphore hums, the reconciler stirred,
Stable IDs stitched where each piece is owned.
🐰✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: defer embedding messages' clearly and concisely summarizes the main architectural change: moving from inline embedding during message creation to deferred embedding via a reconciliation pipeline.
Docstring Coverage ✅ Passed Docstring coverage is 82.76% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch DEV-1789

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/deriver/test_vector_reconciliation.py (1)

900-950: ⚡ Quick win

Add a mixed-eligibility regression here too.

This only proves the all-eligible case. A sibling chunk with a recent last_sync_at would currently let _get_message_embeddings_needing_sync() return a partial message, so a one-ineligible-chunk case would lock the intended invariant down.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/deriver/test_vector_reconciliation.py` around lines 900 - 950, Extend
test_all_chunks_of_a_message_claimed_together to cover a mixed-eligibility case:
create a second Message with chunk_count chunks where one MessageEmbedding has
last_sync_at set to now (making it ineligible) and the rest are "pending", then
call _get_message_embeddings_needing_sync(db_session, batch_size=1) and assert
that the returned claimed set does not include any embeddings for that second
message (i.e., the function must not return a partial message); refer to the
test function name test_all_chunks_of_a_message_claimed_together, the helper
_get_message_embeddings_needing_sync, and the MessageEmbedding.last_sync_at /
sync_state fields to locate and modify the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/crud/message.py`:
- Around line 276-306: The create_messages flow currently delays persisting
Message rows until after preparing and enqueueing embeddings, so failures in
embedding_client.prepare_chunks or models.MessageEmbedding inserts can roll back
the messages; fix by committing the new Message rows immediately (call await
db.commit() right after messages are added in create_messages) before checking
settings.EMBED_MESSAGES, then perform embedding_client.prepare_chunks and
db.add_all(pending MessageEmbedding) in a separate transaction/session (or wrap
in try/except) so errors inserting MessageEmbedding do not rollback the
already-committed messages and do not hold the advisory lock through chunking
work.

In `@src/reconciler/sync_vectors.py`:
- Around line 144-160: The current selection in rows_stmt can return only a
subset of a message's chunks because _backoff_eligible and FOR UPDATE SKIP
LOCKED are applied per-row; change the logic to first compute the set of
message_ids whose entire set of pending chunks are eligible to be claimed (e.g.,
query MessageEmbedding grouped by message_id and compare counts: total pending
chunks vs pending+eligible chunks using _backoff_eligible predicate) and only
then select all MessageEmbedding rows for those fully-eligible message_ids with
.with_for_update(skip_locked=True) to claim every chunk for a message
atomically; update the code paths that reference message_ids, rows_stmt, and
_backoff_eligible accordingly so you fetch complete chunk sets per message
instead of partial subsets.

---

Nitpick comments:
In `@tests/deriver/test_vector_reconciliation.py`:
- Around line 900-950: Extend test_all_chunks_of_a_message_claimed_together to
cover a mixed-eligibility case: create a second Message with chunk_count chunks
where one MessageEmbedding has last_sync_at set to now (making it ineligible)
and the rest are "pending", then call
_get_message_embeddings_needing_sync(db_session, batch_size=1) and assert that
the returned claimed set does not include any embeddings for that second message
(i.e., the function must not return a partial message); refer to the test
function name test_all_chunks_of_a_message_claimed_together, the helper
_get_message_embeddings_needing_sync, and the MessageEmbedding.last_sync_at /
sync_state fields to locate and modify the test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f997b22b-a0b8-4a56-a30e-3bb311adcbd2

📥 Commits

Reviewing files that changed from the base of the PR and between b0f0295 and 47f9733.

📒 Files selected for processing (9)
  • src/crud/message.py
  • src/embedding_client.py
  • src/reconciler/sync_vectors.py
  • src/telemetry/prometheus/metrics.py
  • tests/conftest.py
  • tests/deriver/test_vector_reconciliation.py
  • tests/integration/test_message_embeddings.py
  • tests/llm/test_embedding_client.py
  • tests/startup/test_embedding_validator.py

Comment thread src/crud/message.py
Comment thread src/reconciler/sync_vectors.py
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
src/reconciler/sync_vectors.py (1)

145-161: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Step 2 can still claim only a subset of a message's chunks.

The _backoff_eligible() predicate at line 154 filters individual rows. If message M has chunks C1 (eligible) and C2 (recently attempted, still in backoff), Step 1 selects M, but Step 2 skips C2 due to the backoff filter. This leaves the message partially processed.

The intent is "all chunks together," but the current query can return incomplete sets when sibling chunks have different last_sync_at timestamps.

Potential fix: remove backoff filter from Step 2
     rows_stmt = (
         select(models.MessageEmbedding)
         .where(
             and_(
                 models.MessageEmbedding.message_id.in_(message_ids),
                 models.MessageEmbedding.sync_state == "pending",
-                _backoff_eligible(models.MessageEmbedding.last_sync_at),
             )
         )
         .order_by(models.MessageEmbedding.message_id, models.MessageEmbedding.id)
         .with_for_update(skip_locked=True)
     )

Step 1 already guarantees the message has at least one eligible chunk. Step 2 should claim all pending chunks for those messages, regardless of individual chunk backoff state, to maintain atomicity.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/reconciler/sync_vectors.py` around lines 145 - 161, The rows selection in
rows_stmt incorrectly applies the
_backoff_eligible(models.MessageEmbedding.last_sync_at) predicate per-chunk,
which can cause partial claims (e.g., C1 claimed but C2 skipped) even though
message_ids was chosen based on at least one eligible chunk; remove the
_backoff_eligible(...) condition from rows_stmt so it only filters
MessageEmbedding.message_id.in_(message_ids) and
models.MessageEmbedding.sync_state == "pending" (keeping the ordering and
with_for_update(skip_locked=True)), thereby claiming all pending chunks for the
selected message_ids while relying on the prior step that used _backoff_eligible
to pick messages.
🧹 Nitpick comments (1)
src/crud/message.py (1)

276-308: ⚡ Quick win

Previous concern is partially addressed but transaction isolation can still be improved.

The prepare_chunks() call is now purely local (no network), so the advisory lock isn't held during external calls. However, if chunk preparation or MessageEmbedding row insertion fails, both the messages and embeddings roll back together.

Since embeddings are now truly non-critical (reconciler will create them if missing), consider wrapping the embedding block in try/except to let messages commit even if embedding setup fails:

Suggested improvement
 db.add_all(message_objects)
+await db.commit()

 # If embedding is enabled, locally chunk the content and insert
 # one pending MessageEmbedding row per chunk in chunk order. The actual
 # embedding work is deferred to the reconciler
 if settings.EMBED_MESSAGES:
+    try:
         id_resource_dict = {
             ...
         }
         if id_resource_dict:
             ...
             if pending_rows:
                 db.add_all(pending_rows)
+                await db.commit()
+    except Exception:
+        logger.exception("Failed to enqueue pending message embeddings")
-
-await db.commit()

 return message_objects

This ensures message creation never fails due to embedding setup issues. Based on learnings: defer non-critical "post-message" work so it does not block the critical path of create_messages().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/crud/message.py` around lines 276 - 308, The embedding setup currently
runs inside the same transaction and can roll back message creation if it fails;
wrap the EMBED_MESSAGES block (the call to embedding_client.prepare_chunks and
the creation/db.add_all of models.MessageEmbedding pending_rows) in a try/except
so any exception is caught, logged, and swallowed, ensuring the function (e.g.,
create_messages) proceeds to await db.commit() for the messages; alternatively
move the embedding creation to after the messages commit and still guard it with
try/except so failures won't prevent the message commit. Ensure you reference
embedding_client.prepare_chunks, models.MessageEmbedding, pending_rows, and
db.add_all in the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@src/reconciler/sync_vectors.py`:
- Around line 145-161: The rows selection in rows_stmt incorrectly applies the
_backoff_eligible(models.MessageEmbedding.last_sync_at) predicate per-chunk,
which can cause partial claims (e.g., C1 claimed but C2 skipped) even though
message_ids was chosen based on at least one eligible chunk; remove the
_backoff_eligible(...) condition from rows_stmt so it only filters
MessageEmbedding.message_id.in_(message_ids) and
models.MessageEmbedding.sync_state == "pending" (keeping the ordering and
with_for_update(skip_locked=True)), thereby claiming all pending chunks for the
selected message_ids while relying on the prior step that used _backoff_eligible
to pick messages.

---

Nitpick comments:
In `@src/crud/message.py`:
- Around line 276-308: The embedding setup currently runs inside the same
transaction and can roll back message creation if it fails; wrap the
EMBED_MESSAGES block (the call to embedding_client.prepare_chunks and the
creation/db.add_all of models.MessageEmbedding pending_rows) in a try/except so
any exception is caught, logged, and swallowed, ensuring the function (e.g.,
create_messages) proceeds to await db.commit() for the messages; alternatively
move the embedding creation to after the messages commit and still guard it with
try/except so failures won't prevent the message commit. Ensure you reference
embedding_client.prepare_chunks, models.MessageEmbedding, pending_rows, and
db.add_all in the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 17f77c42-43b7-4c8d-a87e-bce14d1be189

📥 Commits

Reviewing files that changed from the base of the PR and between b5f23a1 and aa5a519.

📒 Files selected for processing (5)
  • src/crud/message.py
  • src/embedding_client.py
  • src/reconciler/sync_vectors.py
  • src/telemetry/events/llm.py
  • tests/startup/test_embedding_validator.py
✅ Files skipped from review due to trivial changes (1)
  • src/telemetry/events/llm.py

…766)

Adds embed_messages_now background task so newly created messages are
searchable within seconds instead of waiting up to the reconciler
interval. Three-phase claim/lease → embed → persist never holds a DB
session across the embedding call; the reconciler remains the fallback
for failures and stragglers.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
tests/deriver/test_embed_now.py (1)

150-235: ⚡ Quick win

Cover the external-store failure branch.

_persist_external() now has explicit fallback behavior for VectorStoreError and unexpected upsert_many() failures, but this suite only exercises successful upserts. Add one case that makes upsert_many raise and asserts the rows stay pending/unembedded so the reconciler fallback stays protected.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/deriver/test_embed_now.py` around lines 150 - 235, Add a test that
simulates external-store failures by patching get_external_vector_store to
return a mock whose upsert_many raises VectorStoreError (and optionally a
generic Exception) and then call embed_messages_now([message_id]); assert
upsert_many was awaited and that all affected MessageEmbedding rows (look up by
emb_ids returned from _create_message_with_pending_chunks) remain with
sync_state == "pending" (i.e., not marked "synced"), ensuring
_persist_external's fallback is exercised; reference functions/classes:
_persist_external, embed_messages_now, get_external_vector_store, upsert_many,
VectorStoreError, and the helper _create_message_with_pending_chunks to find the
correct spot to add the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/reconciler/embed_now.py`:
- Around line 198-205: The tracked_db("embed_now_persist") context is kept open
while calling the external vector store (via _persist_external ->
external.upsert_many), holding a DB session/transaction during a network call;
refactor so all external calls happen outside the async with: compute or call
external.upsert_many and any network/LLM/embedding operations before entering
tracked_db or after exiting (i.e., call _persist_external only after awaiting
db.commit() and closing the context), and instead pass the precomputed
message_ids/vector_by_id/store_in_postgres results into the DB-only persistence
helpers (_persist_pgvector/_persist_external) so no external I/O occurs while
tracked_db is open (also apply same change for the similar block around lines
245-275).

In `@tests/routes/test_messages.py`:
- Around line 62-71: Patch the EMBED_MESSAGES flag to True for this test so it
doesn't rely on ambient settings: when you patch
"src.routers.messages.embed_messages_now" also patch
"src.routers.messages.EMBED_MESSAGES" (or use patch.object on the
src.routers.messages module) to True (mirroring how the negative case sets it to
False), then run the same request and assertions so mock_embed_now is guaranteed
to be awaited once with [public_id].

---

Nitpick comments:
In `@tests/deriver/test_embed_now.py`:
- Around line 150-235: Add a test that simulates external-store failures by
patching get_external_vector_store to return a mock whose upsert_many raises
VectorStoreError (and optionally a generic Exception) and then call
embed_messages_now([message_id]); assert upsert_many was awaited and that all
affected MessageEmbedding rows (look up by emb_ids returned from
_create_message_with_pending_chunks) remain with sync_state == "pending" (i.e.,
not marked "synced"), ensuring _persist_external's fallback is exercised;
reference functions/classes: _persist_external, embed_messages_now,
get_external_vector_store, upsert_many, VectorStoreError, and the helper
_create_message_with_pending_chunks to find the correct spot to add the test.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 49b48a83-2317-4361-ac5c-35ba94bfcc73

📥 Commits

Reviewing files that changed from the base of the PR and between aa5a519 and 7a63e4b.

📒 Files selected for processing (7)
  • src/config.py
  • src/reconciler/embed_now.py
  • src/reconciler/sync_vectors.py
  • src/routers/messages.py
  • tests/conftest.py
  • tests/deriver/test_embed_now.py
  • tests/routes/test_messages.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/conftest.py
  • src/reconciler/sync_vectors.py

Comment on lines +198 to +205
async with tracked_db("embed_now_persist") as db:
if external is None:
await _persist_pgvector(db, claimed, vector_by_id)
else:
await _persist_external(
db, message_ids, claimed, vector_by_id, store_in_postgres, external
)
await db.commit()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Close the DB session before calling the external vector store.

tracked_db("embed_now_persist") stays open through external.upsert_many(...), so this path keeps a transaction/connection checked out while waiting on the network. That undermines the short-lived transaction design here and can block other workers when the vector store is slow.

♻️ Suggested structure
 async def _persist(...):
     ...
-    async with tracked_db("embed_now_persist") as db:
-        if external is None:
-            await _persist_pgvector(db, claimed, vector_by_id)
-        else:
-            await _persist_external(
-                db, message_ids, claimed, vector_by_id, store_in_postgres, external
-            )
-        await db.commit()
+    if external is None:
+        async with tracked_db("embed_now_persist") as db:
+            await _persist_pgvector(db, claimed, vector_by_id)
+            await db.commit()
+        return
+
+    async with tracked_db("embed_now_positions") as db:
+        chunk_position = await compute_chunk_positions(db, message_ids)
+
+    synced_chunk_ids = await _upsert_external(
+        claimed, vector_by_id, chunk_position, external
+    )
+
+    async with tracked_db("embed_now_persist") as db:
+        await _mark_external_chunks_synced(
+            db, claimed, vector_by_id, synced_chunk_ids, store_in_postgres
+        )
+        await db.commit()

As per coding guidelines, "Never hold a DB session during external calls (LLM, embedding, HTTP) in Python code; compute external results first and pass them as parameters".

Also applies to: 245-275

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/reconciler/embed_now.py` around lines 198 - 205, The
tracked_db("embed_now_persist") context is kept open while calling the external
vector store (via _persist_external -> external.upsert_many), holding a DB
session/transaction during a network call; refactor so all external calls happen
outside the async with: compute or call external.upsert_many and any
network/LLM/embedding operations before entering tracked_db or after exiting
(i.e., call _persist_external only after awaiting db.commit() and closing the
context), and instead pass the precomputed
message_ids/vector_by_id/store_in_postgres results into the DB-only persistence
helpers (_persist_pgvector/_persist_external) so no external I/O occurs while
tracked_db is open (also apply same change for the similar block around lines
245-275).

Comment on lines +62 to +71
with patch(
"src.routers.messages.embed_messages_now", new=AsyncMock()
) as mock_embed_now:
response = client.post(
f"/v3/workspaces/{test_workspace.name}/sessions/{test_session.name}/messages",
json={"messages": [{"content": "hello", "peer_id": test_peer.name}]},
)
assert response.status_code == 201
public_id = response.json()[0]["id"]
mock_embed_now.assert_awaited_once_with([public_id])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Make this test force EMBED_MESSAGES=True.

This assertion currently depends on the ambient settings default. If another test or env override flips that flag, this stops verifying the intended contract. Patch the setting to True here, the same way the negative case patches it to False.

🧪 Suggested fix
-    with patch(
-        "src.routers.messages.embed_messages_now", new=AsyncMock()
-    ) as mock_embed_now:
+    with (
+        patch("src.config.settings.EMBED_MESSAGES", True),
+        patch("src.routers.messages.embed_messages_now", new=AsyncMock())
+        as mock_embed_now,
+    ):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
with patch(
"src.routers.messages.embed_messages_now", new=AsyncMock()
) as mock_embed_now:
response = client.post(
f"/v3/workspaces/{test_workspace.name}/sessions/{test_session.name}/messages",
json={"messages": [{"content": "hello", "peer_id": test_peer.name}]},
)
assert response.status_code == 201
public_id = response.json()[0]["id"]
mock_embed_now.assert_awaited_once_with([public_id])
with (
patch("src.config.settings.EMBED_MESSAGES", True),
patch("src.routers.messages.embed_messages_now", new=AsyncMock())
as mock_embed_now,
):
response = client.post(
f"/v3/workspaces/{test_workspace.name}/sessions/{test_session.name}/messages",
json={"messages": [{"content": "hello", "peer_id": test_peer.name}]},
)
assert response.status_code == 201
public_id = response.json()[0]["id"]
mock_embed_now.assert_awaited_once_with([public_id])
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/routes/test_messages.py` around lines 62 - 71, Patch the EMBED_MESSAGES
flag to True for this test so it doesn't rely on ambient settings: when you
patch "src.routers.messages.embed_messages_now" also patch
"src.routers.messages.EMBED_MESSAGES" (or use patch.object on the
src.routers.messages module) to True (mirroring how the negative case sets it to
False), then run the same request and assertions so mock_embed_now is guaranteed
to be awaited once with [public_id].

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants