From a74f33eca3d2e0c96240aa9ee6f0f23c3bd0e4c5 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Sun, 10 May 2026 22:11:45 +0200 Subject: [PATCH] Change max_num_concurrent_split_searches from warmup limit to processing limit --- quickwit/quickwit-search/src/leaf.rs | 1 - .../src/search_permit_provider.rs | 66 +++++-------------- 2 files changed, 17 insertions(+), 50 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 765ec9be589..63504e474b3 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -526,7 +526,6 @@ async fn leaf_search_single_split( .leaf_search_single_split_warmup_num_bytes .observe(warmup_size.as_u64() as f64); search_permit.update_memory_usage(warmup_size); - search_permit.free_warmup_slot(); let split_num_docs = split.num_docs; diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index c009d5c1f13..502c54710a3 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -29,11 +29,11 @@ use crate::metrics::SearchTaskMetrics; /// Distributor of permits to perform split search operation. /// -/// Requests are served in order. Each permit initially reserves a slot for the -/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once -/// the warmup is completed, the actual memory usage is set and the warmup slot -/// is released. Once the search is completed and the permit is dropped, the -/// remaining memory is also released. +/// Requests are served in order. Each permit reserves a slot for concurrent +/// search execution and a pessimistic amount of memory. The slot is held for +/// the entire duration of the search. Once the actual memory usage is known, +/// it can be updated via `update_memory_usage()`. When the permit is dropped, +/// both the search slot and memory are released. #[derive(Clone)] pub struct SearchPermitProvider { message_sender: mpsc::UnboundedSender, @@ -49,10 +49,8 @@ pub enum SearchPermitMessage { UpdateMemory { memory_delta: i64, }, - FreeWarmupSlot, Drop { memory_size: u64, - warmup_slot_freed: bool, }, } @@ -82,7 +80,7 @@ pub fn compute_initial_memory_allocation( impl SearchPermitProvider { pub fn new( - num_download_slots: usize, + max_num_concurrent_split_searches: usize, memory_budget: ByteSize, metrics: SearchTaskMetrics, ) -> Self { @@ -92,7 +90,7 @@ impl SearchPermitProvider { let actor = SearchPermitActor { msg_receiver: message_receiver, msg_sender: message_sender.downgrade(), - num_warmup_slots_available: num_download_slots, + num_search_slots_available: max_num_concurrent_split_searches, total_memory_budget: memory_budget.as_u64(), permits_requests: BinaryHeap::new(), total_memory_allocated: 0u64, @@ -140,7 +138,7 @@ struct SearchPermitActor { metrics: SearchTaskMetrics, msg_receiver: mpsc::UnboundedReceiver, msg_sender: mpsc::WeakUnboundedSender, - num_warmup_slots_available: usize, + num_search_slots_available: usize, /// Note it is possible for memory_allocated to exceed memory_budget temporarily, /// if and only if a split leaf search task ended up using more than `initial_allocation`. /// When it happens, new permits will not be assigned until the memory is freed. @@ -263,17 +261,8 @@ impl SearchPermitActor { (self.total_memory_allocated as i64 + memory_delta) as u64; self.assign_available_permits(); } - SearchPermitMessage::FreeWarmupSlot => { - self.num_warmup_slots_available += 1; - self.assign_available_permits(); - } - SearchPermitMessage::Drop { - memory_size, - warmup_slot_freed, - } => { - if !warmup_slot_freed { - self.num_warmup_slots_available += 1; - } + SearchPermitMessage::Drop { memory_size } => { + self.num_search_slots_available += 1; self.total_memory_allocated = self .total_memory_allocated .checked_sub(memory_size) @@ -284,7 +273,7 @@ impl SearchPermitActor { } fn pop_next_request_if_serviceable(&mut self) -> Option { - if self.num_warmup_slots_available == 0 { + if self.num_search_slots_available == 0 { return None; } let available_memory = self @@ -307,14 +296,13 @@ impl SearchPermitActor { let mut ongoing_gauge_guard = GaugeGuard::from_gauge(ongoing_tasks_metric); ongoing_gauge_guard.add(1); self.total_memory_allocated += permit_request.permit_size; - self.num_warmup_slots_available -= 1; + self.num_search_slots_available -= 1; permit_request .permit_sender .send(SearchPermit { _ongoing_gauge_guard: ongoing_gauge_guard, msg_sender: self.msg_sender.clone(), memory_allocation: permit_request.permit_size, - warmup_slot_freed: false, }) // if the requester dropped its receiver, we drop the newly // created SearchPermit which releases the resources @@ -333,7 +321,6 @@ pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, - warmup_slot_freed: bool, } impl SearchPermit { @@ -347,16 +334,6 @@ impl SearchPermit { self.send_if_still_running(SearchPermitMessage::UpdateMemory { memory_delta }); } - /// Drop the warmup permit, allowing more downloads to be started. Only one - /// slot is attached to each permit so calling this again has no effect. - pub fn free_warmup_slot(&mut self) { - if self.warmup_slot_freed { - return; - } - self.warmup_slot_freed = true; - self.send_if_still_running(SearchPermitMessage::FreeWarmupSlot); - } - pub fn memory_allocation(&self) -> ByteSize { ByteSize(self.memory_allocation) } @@ -376,7 +353,6 @@ impl Drop for SearchPermit { fn drop(&mut self) { self.send_if_still_running(SearchPermitMessage::Drop { memory_size: self.memory_allocation, - warmup_slot_freed: self.warmup_slot_freed, }); } } @@ -615,7 +591,7 @@ mod tests { } #[tokio::test] - async fn test_warmup_slot() { + async fn test_concurrent_search_slots() { let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), test_metrics()); let mut permit_futs = permit_provider .get_permits(repeat_n(ByteSize::mb(1), 16)) @@ -627,27 +603,19 @@ mod tests { .buffered(1) .collect() .await; - // the next permit is blocked by the warmup slots + // the next permit is blocked by the concurrent search slots let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); try_get(next_blocked_permit_fut).await.err().unwrap(); // if we drop one of the permits, we can get a new one permits.drain(0..1); let next_permit_fut = remaining_permit_futs.next().unwrap(); permits.push(try_get(next_permit_fut).await.unwrap()); - // the next permit is blocked again by the warmup slots + // the next permit is blocked again by the concurrent search slots let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); try_get(next_blocked_permit_fut).await.err().unwrap(); - // we can explicitly free the warmup slot on a permit - permits[0].free_warmup_slot(); + // dropping a permit frees up a slot + permits.drain(0..1); let next_permit_fut = remaining_permit_futs.next().unwrap(); permits.push(try_get(next_permit_fut).await.unwrap()); - // dropping that same permit does not free up another slot - permits.drain(0..1); - let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - try_get(next_blocked_permit_fut).await.err().unwrap(); - // but dropping a permit for which the slot wasn't explicitly free does free up a slot - permits.drain(0..1); - let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - permits.push(try_get(next_blocked_permit_fut).await.unwrap()); } }