feat: defer embedding messages #704
Walkthrough
Messages now create per-chunk pending MessageEmbedding rows (via embedding_client.prepare_chunks) instead of inline embedding; EmbeddingClient gains token-aware batching and prepare_chunks(); a reconciler and a new embed_messages_now fast-path persist embeddings to Postgres or external stores; tests and fixtures updated.
Changes
Asynchronous Embedding Pipeline
Sequence Diagram
sequenceDiagram
participant CreateMsg as create_messages()
participant DB as Database
participant EmbClient as EmbeddingClient
participant Reconciler as run_vector_reconciliation_cycle
participant ExtStore as ExternalVectorStore
CreateMsg->>DB: INSERT Message rows + COMMIT
CreateMsg->>EmbClient: prepare_chunks(message_id->text)
EmbClient-->>CreateMsg: dict[msg_id, chunk_list]
CreateMsg->>DB: INSERT MessageEmbedding rows (pending, embedding=NULL)
Reconciler->>DB: select distinct message_ids -> claim chunks FOR UPDATE SKIP LOCKED
Reconciler->>EmbClient: batch_embed(chunks) with MESSAGE_CREATE
alt external store configured
Reconciler->>ExtStore: upsert_many(vector_records)
ExtStore-->>DB: per-row UPDATE sync_state/embedding
else pgvector-only
Reconciler->>DB: per-row UPDATE embedding + sync_state
end
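As a rough illustration of the first half of the diagram (chunk locally, then insert pending rows with no vector), here is a minimal sketch. The `PendingEmbedding` dataclass, the word-count chunker, and `build_pending_rows` are hypothetical stand-ins; the real `prepare_chunks` is described as token-aware and its signature is not shown in this thread.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class PendingEmbedding:
    """Hypothetical in-memory stand-in for a MessageEmbedding row."""
    message_id: str
    chunk_index: int
    content: str
    embedding: Optional[List[float]] = None  # filled in later by the reconciler
    sync_state: str = "pending"


def prepare_chunks(texts: Dict[str, str], max_words: int = 4) -> Dict[str, List[str]]:
    """Assumption: split on whitespace as a cheap proxy for token-aware chunking."""
    chunks_by_id: Dict[str, List[str]] = {}
    for message_id, text in texts.items():
        words = text.split()
        if not words:
            continue  # nothing to embed for empty content
        chunks_by_id[message_id] = [
            " ".join(words[i:i + max_words])
            for i in range(0, len(words), max_words)
        ]
    return chunks_by_id


def build_pending_rows(texts: Dict[str, str]) -> List[PendingEmbedding]:
    """One pending row per chunk, in chunk order, with no vector yet."""
    rows: List[PendingEmbedding] = []
    for message_id, chunks in prepare_chunks(texts).items():
        for index, chunk in enumerate(chunks):
            rows.append(PendingEmbedding(message_id, index, chunk))
    return rows
```

No network call happens here, which is the point of the deferral: the rows only record what needs to be embedded later.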
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
🧹 Nitpick comments (1)
tests/deriver/test_vector_reconciliation.py (1)
900-950: ⚡ Quick win
Add a mixed-eligibility regression here too.
This only proves the all-eligible case. A sibling chunk with a recent last_sync_at would currently let _get_message_embeddings_needing_sync() return a partial message, so a one-ineligible-chunk case would lock the intended invariant down.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/deriver/test_vector_reconciliation.py` around lines 900 - 950, Extend test_all_chunks_of_a_message_claimed_together to cover a mixed-eligibility case: create a second Message with chunk_count chunks where one MessageEmbedding has last_sync_at set to now (making it ineligible) and the rest are "pending", then call _get_message_embeddings_needing_sync(db_session, batch_size=1) and assert that the returned claimed set does not include any embeddings for that second message (i.e., the function must not return a partial message); refer to the test function name test_all_chunks_of_a_message_claimed_together, the helper _get_message_embeddings_needing_sync, and the MessageEmbedding.last_sync_at / sync_state fields to locate and modify the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/crud/message.py`:
- Around line 276-306: The create_messages flow currently delays persisting
Message rows until after preparing and enqueueing embeddings, so failures in
embedding_client.prepare_chunks or models.MessageEmbedding inserts can roll back
the messages; fix by committing the new Message rows immediately (call await
db.commit() right after messages are added in create_messages) before checking
settings.EMBED_MESSAGES, then perform embedding_client.prepare_chunks and
db.add_all(pending MessageEmbedding) in a separate transaction/session (or wrap
in try/except) so errors inserting MessageEmbedding do not rollback the
already-committed messages and do not hold the advisory lock through chunking
work.
In `@src/reconciler/sync_vectors.py`:
- Around line 144-160: The current selection in rows_stmt can return only a
subset of a message's chunks because _backoff_eligible and FOR UPDATE SKIP
LOCKED are applied per-row; change the logic to first compute the set of
message_ids whose entire set of pending chunks are eligible to be claimed (e.g.,
query MessageEmbedding grouped by message_id and compare counts: total pending
chunks vs pending+eligible chunks using _backoff_eligible predicate) and only
then select all MessageEmbedding rows for those fully-eligible message_ids with
.with_for_update(skip_locked=True) to claim every chunk for a message
atomically; update the code paths that reference message_ids, rows_stmt, and
_backoff_eligible accordingly so you fetch complete chunk sets per message
instead of partial subsets.
---
Nitpick comments:
In `@tests/deriver/test_vector_reconciliation.py`:
- Around line 900-950: Extend test_all_chunks_of_a_message_claimed_together to
cover a mixed-eligibility case: create a second Message with chunk_count chunks
where one MessageEmbedding has last_sync_at set to now (making it ineligible)
and the rest are "pending", then call
_get_message_embeddings_needing_sync(db_session, batch_size=1) and assert that
the returned claimed set does not include any embeddings for that second message
(i.e., the function must not return a partial message); refer to the test
function name test_all_chunks_of_a_message_claimed_together, the helper
_get_message_embeddings_needing_sync, and the MessageEmbedding.last_sync_at /
sync_state fields to locate and modify the test.
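The sync_vectors.py comment above asks for message IDs whose pending chunks are all eligible, rather than filtering chunk by chunk. A minimal sketch of that grouping, using the standard-library `sqlite3` module in place of Postgres: the table layout, the integer timestamps, and the backoff rule (never attempted, or last attempt older than `cutoff`) are assumptions for illustration, not the project's actual `_backoff_eligible` predicate.

```python
import sqlite3

# Assumed schema, for illustration only.
SCHEMA = """
CREATE TABLE message_embeddings (
    id INTEGER PRIMARY KEY,
    message_id TEXT NOT NULL,
    sync_state TEXT NOT NULL,
    last_sync_at INTEGER
)
"""

# A message qualifies only when every pending chunk is backoff-eligible:
# the count of pending chunks must equal the count of eligible pending chunks.
FULLY_ELIGIBLE_SQL = """
SELECT message_id
FROM message_embeddings
WHERE sync_state = 'pending'
GROUP BY message_id
HAVING COUNT(*) = SUM(
    CASE WHEN last_sync_at IS NULL OR last_sync_at < ? THEN 1 ELSE 0 END
)
ORDER BY message_id
"""


def fully_eligible_message_ids(conn, cutoff):
    """Return message IDs whose pending chunks can all be claimed together."""
    return [row[0] for row in conn.execute(FULLY_ELIGIBLE_SQL, (cutoff,))]


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO message_embeddings (message_id, sync_state, last_sync_at) "
        "VALUES (?, ?, ?)",
        [
            ("m1", "pending", None),  # never attempted
            ("m1", "pending", None),
            ("m2", "pending", None),
            ("m2", "pending", 100),   # attempted recently, still in backoff
            ("m3", "pending", 10),    # old attempt, eligible again
        ],
    )
    return fully_eligible_message_ids(conn, cutoff=50)
```

In Postgres, FOR UPDATE cannot be combined with GROUP BY, so a query like this would only pick the IDs; a second statement would then lock the chunk rows for those IDs.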
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f997b22b-a0b8-4a56-a30e-3bb311adcbd2
📒 Files selected for processing (9)
- src/crud/message.py
- src/embedding_client.py
- src/reconciler/sync_vectors.py
- src/telemetry/prometheus/metrics.py
- tests/conftest.py
- tests/deriver/test_vector_reconciliation.py
- tests/integration/test_message_embeddings.py
- tests/llm/test_embedding_client.py
- tests/startup/test_embedding_validator.py
♻️ Duplicate comments (1)
src/reconciler/sync_vectors.py (1)
145-161: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Step 2 can still claim only a subset of a message's chunks.
The _backoff_eligible() predicate at line 154 filters individual rows. If message M has chunks C1 (eligible) and C2 (recently attempted, still in backoff), Step 1 selects M, but Step 2 skips C2 due to the backoff filter. This leaves the message partially processed.
The intent is "all chunks together," but the current query can return incomplete sets when sibling chunks have different last_sync_at timestamps.
Potential fix: remove backoff filter from Step 2
  rows_stmt = (
      select(models.MessageEmbedding)
      .where(
          and_(
              models.MessageEmbedding.message_id.in_(message_ids),
              models.MessageEmbedding.sync_state == "pending",
-             _backoff_eligible(models.MessageEmbedding.last_sync_at),
          )
      )
      .order_by(models.MessageEmbedding.message_id, models.MessageEmbedding.id)
      .with_for_update(skip_locked=True)
  )
Step 1 already guarantees the message has at least one eligible chunk. Step 2 should claim all pending chunks for those messages, regardless of individual chunk backoff state, to maintain atomicity.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/reconciler/sync_vectors.py` around lines 145 - 161, The rows selection in rows_stmt incorrectly applies the _backoff_eligible(models.MessageEmbedding.last_sync_at) predicate per-chunk, which can cause partial claims (e.g., C1 claimed but C2 skipped) even though message_ids was chosen based on at least one eligible chunk; remove the _backoff_eligible(...) condition from rows_stmt so it only filters MessageEmbedding.message_id.in_(message_ids) and models.MessageEmbedding.sync_state == "pending" (keeping the ordering and with_for_update(skip_locked=True)), thereby claiming all pending chunks for the selected message_ids while relying on the prior step that used _backoff_eligible to pick messages.
🧹 Nitpick comments (1)
src/crud/message.py (1)
276-308: ⚡ Quick win
Previous concern is partially addressed but transaction isolation can still be improved.
The prepare_chunks() call is now purely local (no network), so the advisory lock isn't held during external calls. However, if chunk preparation or MessageEmbedding row insertion fails, both the messages and embeddings roll back together.
Since embeddings are now truly non-critical (reconciler will create them if missing), consider wrapping the embedding block in try/except to let messages commit even if embedding setup fails:
Suggested improvement
  db.add_all(message_objects)
+ await db.commit()

  # If embedding is enabled, locally chunk the content and insert
  # one pending MessageEmbedding row per chunk in chunk order. The actual
  # embedding work is deferred to the reconciler
  if settings.EMBED_MESSAGES:
+     try:
      id_resource_dict = {
          ...
      }
      if id_resource_dict:
          ...
          if pending_rows:
              db.add_all(pending_rows)
+             await db.commit()
+     except Exception:
+         logger.exception("Failed to enqueue pending message embeddings")
-
- await db.commit()
  return message_objects
This ensures message creation never fails due to embedding setup issues. Based on learnings: defer non-critical "post-message" work so it does not block the critical path of create_messages().
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/crud/message.py` around lines 276 - 308, The embedding setup currently runs inside the same transaction and can roll back message creation if it fails; wrap the EMBED_MESSAGES block (the call to embedding_client.prepare_chunks and the creation/db.add_all of models.MessageEmbedding pending_rows) in a try/except so any exception is caught, logged, and swallowed, ensuring the function (e.g., create_messages) proceeds to await db.commit() for the messages; alternatively move the embedding creation to after the messages commit and still guard it with try/except so failures won't prevent the message commit. Ensure you reference embedding_client.prepare_chunks, models.MessageEmbedding, pending_rows, and db.add_all in the change.
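The commit-first, best-effort-enqueue ordering suggested above can be sketched in isolation. `FakeSession`, the tuple-shaped pending rows, and the `prepare_chunks` callable passed in as a parameter are all hypothetical; the real function works against an async SQLAlchemy session and `models.MessageEmbedding`.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeSession:
    """Hypothetical in-memory stand-in for an async DB session."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def add_all(self, rows):
        self.pending.extend(rows)

    async def commit(self):
        self.committed.extend(self.pending)
        self.pending.clear()

    async def rollback(self):
        self.pending.clear()


async def create_messages(db, messages, prepare_chunks):
    db.add_all(messages)
    await db.commit()  # messages are durable before any embedding setup runs

    try:
        pending_rows = [
            ("pending", message, index, chunk)
            for message in messages
            for index, chunk in enumerate(prepare_chunks(message))
        ]
        if pending_rows:
            db.add_all(pending_rows)
            await db.commit()
    except Exception:
        # Embedding setup is non-critical here: log, discard the partial work,
        # and still return the already-committed messages.
        logger.exception("Failed to enqueue pending message embeddings")
        await db.rollback()
    return messages
```

A failure in chunking then costs only the pending rows, which the thread says the reconciler can recreate later.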
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 17f77c42-43b7-4c8d-a87e-bce14d1be189
📒 Files selected for processing (5)
- src/crud/message.py
- src/embedding_client.py
- src/reconciler/sync_vectors.py
- src/telemetry/events/llm.py
- tests/startup/test_embedding_validator.py
✅ Files skipped from review due to trivial changes (1)
- src/telemetry/events/llm.py
…766) Adds embed_messages_now background task so newly created messages are searchable within seconds instead of waiting up to the reconciler interval. Three-phase claim/lease → embed → persist never holds a DB session across the embedding call; the reconciler remains the fallback for failures and stragglers.
Actionable comments posted: 2
🧹 Nitpick comments (1)
tests/deriver/test_embed_now.py (1)
150-235: ⚡ Quick win
Cover the external-store failure branch.
_persist_external() now has explicit fallback behavior for VectorStoreError and unexpected upsert_many() failures, but this suite only exercises successful upserts. Add one case that makes upsert_many raise and asserts the rows stay pending/unembedded so the reconciler fallback stays protected.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/deriver/test_embed_now.py` around lines 150 - 235, Add a test that simulates external-store failures by patching get_external_vector_store to return a mock whose upsert_many raises VectorStoreError (and optionally a generic Exception) and then call embed_messages_now([message_id]); assert upsert_many was awaited and that all affected MessageEmbedding rows (look up by emb_ids returned from _create_message_with_pending_chunks) remain with sync_state == "pending" (i.e., not marked "synced"), ensuring _persist_external's fallback is exercised; reference functions/classes: _persist_external, embed_messages_now, get_external_vector_store, upsert_many, VectorStoreError, and the helper _create_message_with_pending_chunks to find the correct spot to add the test.
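The shape of such a failure-branch test can be sketched with `unittest.mock.AsyncMock`. Everything below is a simplified stand-in: `persist_external`, the dict-shaped rows, and the local `VectorStoreError` class are hypothetical and only mirror the behavior described above (rows stay pending when `upsert_many` raises).

```python
import asyncio
from unittest.mock import AsyncMock


class VectorStoreError(Exception):
    """Hypothetical stand-in for the project's error type."""


async def persist_external(rows, vectors, store):
    """Simplified stand-in: mark rows synced only if the upsert succeeds."""
    try:
        await store.upsert_many(vectors)
    except Exception:
        # Leave rows untouched so a later reconciler pass can retry them.
        return rows
    for row in rows:
        row["sync_state"] = "synced"
    return rows


def run_failure_case(error):
    store = AsyncMock()
    store.upsert_many.side_effect = error  # raised when the mock is awaited
    rows = [
        {"id": 1, "sync_state": "pending"},
        {"id": 2, "sync_state": "pending"},
    ]
    asyncio.run(persist_external(rows, [[0.1], [0.2]], store))
    store.upsert_many.assert_awaited_once()
    return [row["sync_state"] for row in rows]
```

Running the helper once with `VectorStoreError` and once with a generic exception covers both branches the comment mentions.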
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 49b48a83-2317-4361-ac5c-35ba94bfcc73
📒 Files selected for processing (7)
- src/config.py
- src/reconciler/embed_now.py
- src/reconciler/sync_vectors.py
- src/routers/messages.py
- tests/conftest.py
- tests/deriver/test_embed_now.py
- tests/routes/test_messages.py
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/conftest.py
- src/reconciler/sync_vectors.py
    async with tracked_db("embed_now_persist") as db:
        if external is None:
            await _persist_pgvector(db, claimed, vector_by_id)
        else:
            await _persist_external(
                db, message_ids, claimed, vector_by_id, store_in_postgres, external
            )
        await db.commit()
Close the DB session before calling the external vector store.
tracked_db("embed_now_persist") stays open through external.upsert_many(...), so this path keeps a transaction/connection checked out while waiting on the network. That undermines the short-lived transaction design here and can block other workers when the vector store is slow.
♻️ Suggested structure
  async def _persist(...):
      ...
-     async with tracked_db("embed_now_persist") as db:
-         if external is None:
-             await _persist_pgvector(db, claimed, vector_by_id)
-         else:
-             await _persist_external(
-                 db, message_ids, claimed, vector_by_id, store_in_postgres, external
-             )
-         await db.commit()
+     if external is None:
+         async with tracked_db("embed_now_persist") as db:
+             await _persist_pgvector(db, claimed, vector_by_id)
+             await db.commit()
+         return
+
+     async with tracked_db("embed_now_positions") as db:
+         chunk_position = await compute_chunk_positions(db, message_ids)
+
+     synced_chunk_ids = await _upsert_external(
+         claimed, vector_by_id, chunk_position, external
+     )
+
+     async with tracked_db("embed_now_persist") as db:
+         await _mark_external_chunks_synced(
+             db, claimed, vector_by_id, synced_chunk_ids, store_in_postgres
+         )
+         await db.commit()
As per coding guidelines, "Never hold a DB session during external calls (LLM, embedding, HTTP) in Python code; compute external results first and pass them as parameters".
Also applies to: 245-275
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/reconciler/embed_now.py` around lines 198 - 205, The
tracked_db("embed_now_persist") context is kept open while calling the external
vector store (via _persist_external -> external.upsert_many), holding a DB
session/transaction during a network call; refactor so all external calls happen
outside the async with: compute or call external.upsert_many and any
network/LLM/embedding operations before entering tracked_db or after exiting
(i.e., call _persist_external only after awaiting db.commit() and closing the
context), and instead pass the precomputed
message_ids/vector_by_id/store_in_postgres results into the DB-only persistence
helpers (_persist_pgvector/_persist_external) so no external I/O occurs while
tracked_db is open (also apply same change for the similar block around lines
245-275).
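The read, external call, write ordering requested here can be checked with a small self-contained sketch. `SessionTracker`, `FakeVectorStore`, and this `persist` function are hypothetical stand-ins for `tracked_db`, the external store, and the real helpers; the fake store raises if it is called while any session is still open.

```python
import asyncio
from contextlib import asynccontextmanager


class SessionTracker:
    """Hypothetical stand-in that records when sessions open and close."""

    def __init__(self):
        self.open_count = 0
        self.events = []

    @asynccontextmanager
    async def tracked_db(self, name):
        self.open_count += 1
        self.events.append("open:" + name)
        try:
            yield {}
        finally:
            self.open_count -= 1
            self.events.append("close:" + name)


class FakeVectorStore:
    def __init__(self, tracker):
        self.tracker = tracker

    async def upsert_many(self, records):
        # Fails loudly if a DB session is still checked out.
        if self.tracker.open_count:
            raise AssertionError("DB session held during external call")
        self.tracker.events.append("upsert")
        return [record["id"] for record in records]


async def persist(tracker, store, claimed_ids, vector_by_id):
    # Phase 1: short read-only session to gather what the upsert needs.
    async with tracker.tracked_db("positions"):
        positions = {cid: index for index, cid in enumerate(claimed_ids)}

    # Phase 2: network call with no session open.
    records = [
        {"id": cid, "vector": vector_by_id[cid], "position": positions[cid]}
        for cid in claimed_ids
    ]
    synced_ids = await store.upsert_many(records)

    # Phase 3: short write session that only records the outcome.
    async with tracker.tracked_db("persist") as db:
        db["synced"] = synced_ids
    return synced_ids


def demo():
    tracker = SessionTracker()
    store = FakeVectorStore(tracker)
    synced = asyncio.run(persist(tracker, store, [7, 8], {7: [0.1], 8: [0.2]}))
    return synced, tracker.events
```

The recorded event order makes the invariant easy to assert in a test: every session closes before the upsert starts.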
    with patch(
        "src.routers.messages.embed_messages_now", new=AsyncMock()
    ) as mock_embed_now:
        response = client.post(
            f"/v3/workspaces/{test_workspace.name}/sessions/{test_session.name}/messages",
            json={"messages": [{"content": "hello", "peer_id": test_peer.name}]},
        )
        assert response.status_code == 201
        public_id = response.json()[0]["id"]
        mock_embed_now.assert_awaited_once_with([public_id])
Make this test force EMBED_MESSAGES=True.
This assertion currently depends on the ambient settings default. If another test or env override flips that flag, this stops verifying the intended contract. Patch the setting to True here, the same way the negative case patches it to False.
🧪 Suggested fix
- with patch(
-     "src.routers.messages.embed_messages_now", new=AsyncMock()
- ) as mock_embed_now:
+ with (
+     patch("src.config.settings.EMBED_MESSAGES", True),
+     patch("src.routers.messages.embed_messages_now", new=AsyncMock())
+     as mock_embed_now,
+ ):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/routes/test_messages.py` around lines 62 - 71, Patch the EMBED_MESSAGES
flag to True for this test so it doesn't rely on ambient settings: when you
patch "src.routers.messages.embed_messages_now" also patch
"src.routers.messages.EMBED_MESSAGES" (or use patch.object on the
src.routers.messages module) to True (mirroring how the negative case sets it to
False), then run the same request and assertions so mock_embed_now is guaranteed
to be awaited once with [public_id].