feat(deriver): add stale batch timeout to prevent indefinite queue stall#703
maxxqf-ai wants to merge 1 commit into
When REPRESENTATION_BATCH_MAX_TOKENS is set and FLUSH_ENABLED is false, work units that never accumulate enough tokens remain stuck in the queue indefinitely. This is especially problematic for short conversational sessions where the FLUSH size is smaller than the batch threshold.

The problem: FLUSH mode (immediate processing) produces fragmented observations lacking context; information is broken into tiny pieces with high redundancy. The token-batch threshold solves this by grouping messages for richer context. However, work units that never reach the threshold are never processed at all.

Solution: Add STALE_BATCH_HOURS config (default 0 = disabled). When set to a positive value, work units whose oldest queue item exceeds this age are force-processed regardless of token count. This provides a safety net that preserves batch quality for active sessions while ensuring no work unit is abandoned forever.

Changes:
- src/config.py: Add STALE_BATCH_HOURS field to DeriverSettings
- src/deriver/queue_manager.py: Extend get_and_claim_work_units() to include stale work units when STALE_BATCH_HOURS > 0
Walkthrough
This PR adds a stale batch timeout mechanism to the deriver queue manager.
+1, would love to see this merged. We're hitting the exact symptom on the managed instance ( Our context:
What we tried before finding this PR:
Why this PR is the right shape:
Requests, if helpful:
Happy to test against a self-hosted build with this patch applied if it helps validate before merge.
Problem
When `REPRESENTATION_BATCH_MAX_TOKENS` is configured and `FLUSH_ENABLED` is `false`, work units that never accumulate enough tokens remain stuck in the queue indefinitely. This is especially problematic for short conversational sessions where individual session message tokens never reach the batch threshold.

In our deployment (Hermes agent with FLUSH at ~2K tokens but batch threshold at 4K), short sessions would produce ~2K tokens per flush, never reaching the 4K threshold. This resulted in 245 queue items accumulated over 10 days with zero processing.
Why not just use FLUSH_ENABLED=true?
FLUSH mode processes every message immediately, which produces fragmented observations lacking context. The extracted information is broken into tiny pieces with high redundancy and poor quality. The token-batch threshold exists to group related messages together for richer, more coherent observations.
The trade-off: batch mode gives better quality, but work units that never reach the threshold are never processed at all.
Solution
Add a `STALE_BATCH_HOURS` configuration option (default `0` = disabled) to `DeriverSettings`. When set to a positive value, work units whose oldest queue item exceeds this age are force-processed regardless of token count.

This provides a safety net that:
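To make the selection rule concrete, here is a minimal sketch, not the PR's code. It uses the standard-library `sqlite3` module in place of the project's actual ORM and database, and the `queue_items` table, its columns, and the `claimable_work_units` function are all hypothetical names chosen for illustration. A work unit is selected when its pending tokens reach the batch threshold, or when the timeout is enabled and its oldest item is older than the cutoff.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def claimable_work_units(conn, now, batch_max_tokens, stale_batch_hours):
    """Return keys of work units that are ready: enough tokens, or stale."""
    # With stale_batch_hours == 0 the cutoff stays None, so the stale
    # branch of the HAVING clause can never match (timeout disabled).
    cutoff = None
    if stale_batch_hours > 0:
        cutoff = (now - timedelta(hours=stale_batch_hours)).timestamp()
    rows = conn.execute(
        """
        SELECT work_unit_key
        FROM queue_items
        GROUP BY work_unit_key
        HAVING SUM(token_count) >= :max_tokens
            OR (:cutoff IS NOT NULL AND MIN(created_at) < :cutoff)
        ORDER BY work_unit_key
        """,
        {"max_tokens": batch_max_tokens, "cutoff": cutoff},
    ).fetchall()
    return [row[0] for row in rows]


def demo():
    """Build an in-memory queue with one busy and two short work units."""
    conn = sqlite3.connect(":memory:")
    # created_at is stored as Unix epoch seconds to keep comparisons simple.
    conn.execute(
        "CREATE TABLE queue_items "
        "(work_unit_key TEXT, token_count INTEGER, created_at REAL)"
    )
    now = datetime(2025, 1, 11, tzinfo=timezone.utc)
    ten_days_ago = (now - timedelta(days=10)).timestamp()
    one_hour_ago = (now - timedelta(hours=1)).timestamp()
    conn.executemany(
        "INSERT INTO queue_items VALUES (?, ?, ?)",
        [
            ("short-old", 2000, ten_days_ago),  # below threshold, very old
            ("short-new", 2000, one_hour_ago),  # below threshold, recent
            ("busy", 2500, one_hour_ago),       # two items that together
            ("busy", 2500, one_hour_ago),       # exceed the threshold
        ],
    )
    return conn, now
```

With a 4000-token threshold and the timeout disabled, only `busy` is selected and `short-old` waits forever. With a 24-hour timeout, `short-old` is also selected while `short-new` keeps accumulating.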
Changes
- `src/config.py`: Add `STALE_BATCH_HOURS` field to `DeriverSettings` with validation (0-168 hours)
- `src/deriver/queue_manager.py`: Extend `get_and_claim_work_units()` to include stale work units via a subquery on `MIN(QueueItem.created_at)` when `STALE_BATCH_HOURS > 0`

Configuration Example
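As an illustration of the validated setting, here is a minimal sketch that is not the PR's code. `DeriverSettingsSketch` is a hypothetical stand-in for `DeriverSettings` built on a standard-library dataclass, the environment variable name `DERIVER_STALE_BATCH_HOURS` is an assumption, and the 0-168 range is assumed to be inclusive at both ends.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class DeriverSettingsSketch:
    """Hypothetical stand-in for DeriverSettings; models only the new field."""

    STALE_BATCH_HOURS: int = 0  # 0 disables the stale batch timeout

    def __post_init__(self):
        # Assumed inclusive bounds: 0 (disabled) up to 168 hours (one week).
        if not 0 <= self.STALE_BATCH_HOURS <= 168:
            raise ValueError("STALE_BATCH_HOURS must be between 0 and 168")


def load_settings(env=None):
    """Read the setting from a mapping, defaulting to the process environment."""
    # DERIVER_STALE_BATCH_HOURS is an assumed variable name for this sketch.
    env = os.environ if env is None else env
    return DeriverSettingsSketch(int(env.get("DERIVER_STALE_BATCH_HOURS", "0")))
```

For example, `load_settings({"DERIVER_STALE_BATCH_HOURS": "24"})` would force-process work units whose oldest item is more than a day old, while an unset variable leaves the timeout disabled.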
Related
Possibly related to #494 (messages ingested but derived memory does not appear automatically).
Summary by CodeRabbit
Configuration
`STALE_BATCH_HOURS` setting (0-168 hours) to control work unit processing thresholds.

Improvements