Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ async fn leaf_search_single_split(
.leaf_search_single_split_warmup_num_bytes
.observe(warmup_size.as_u64() as f64);
search_permit.update_memory_usage(warmup_size);
search_permit.free_warmup_slot();

let split_num_docs = split.num_docs;

Expand Down
66 changes: 17 additions & 49 deletions quickwit/quickwit-search/src/search_permit_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use crate::metrics::SearchTaskMetrics;

/// Distributor of permits to perform split search operations.
///
/// Requests are served in order. Each permit initially reserves a slot for the
/// warmup (limit concurrent downloads) and a pessimistic amount of memory. Once
/// the warmup is completed, the actual memory usage is set and the warmup slot
/// is released. Once the search is completed and the permit is dropped, the
/// remaining memory is also released.
/// Requests are served in order. Each permit reserves a slot for concurrent
/// search execution and a pessimistic amount of memory. The slot is held for
/// the entire duration of the search. Once the actual memory usage is known,
/// it can be updated via `update_memory_usage()`. When the permit is dropped,
/// both the search slot and memory are released.
#[derive(Clone)]
pub struct SearchPermitProvider {
message_sender: mpsc::UnboundedSender<SearchPermitMessage>,
Expand All @@ -49,10 +49,8 @@ pub enum SearchPermitMessage {
UpdateMemory {
memory_delta: i64,
},
FreeWarmupSlot,
Drop {
memory_size: u64,
warmup_slot_freed: bool,
},
}

Expand Down Expand Up @@ -82,7 +80,7 @@ pub fn compute_initial_memory_allocation(

impl SearchPermitProvider {
pub fn new(
num_download_slots: usize,
max_num_concurrent_split_searches: usize,
memory_budget: ByteSize,
metrics: SearchTaskMetrics,
) -> Self {
Comment thread
rdettai-sk marked this conversation as resolved.
Expand All @@ -92,7 +90,7 @@ impl SearchPermitProvider {
let actor = SearchPermitActor {
msg_receiver: message_receiver,
msg_sender: message_sender.downgrade(),
num_warmup_slots_available: num_download_slots,
num_search_slots_available: max_num_concurrent_split_searches,
total_memory_budget: memory_budget.as_u64(),
permits_requests: BinaryHeap::new(),
total_memory_allocated: 0u64,
Expand Down Expand Up @@ -140,7 +138,7 @@ struct SearchPermitActor {
metrics: SearchTaskMetrics,
msg_receiver: mpsc::UnboundedReceiver<SearchPermitMessage>,
msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
num_warmup_slots_available: usize,
num_search_slots_available: usize,
/// Note it is possible for memory_allocated to exceed memory_budget temporarily,
/// if and only if a split leaf search task ended up using more than `initial_allocation`.
/// When it happens, new permits will not be assigned until the memory is freed.
Expand Down Expand Up @@ -263,17 +261,8 @@ impl SearchPermitActor {
(self.total_memory_allocated as i64 + memory_delta) as u64;
self.assign_available_permits();
}
SearchPermitMessage::FreeWarmupSlot => {
self.num_warmup_slots_available += 1;
self.assign_available_permits();
}
SearchPermitMessage::Drop {
memory_size,
warmup_slot_freed,
} => {
if !warmup_slot_freed {
self.num_warmup_slots_available += 1;
}
SearchPermitMessage::Drop { memory_size } => {
self.num_search_slots_available += 1;
self.total_memory_allocated = self
.total_memory_allocated
.checked_sub(memory_size)
Expand All @@ -284,7 +273,7 @@ impl SearchPermitActor {
}

fn pop_next_request_if_serviceable(&mut self) -> Option<SingleSplitPermitRequest> {
if self.num_warmup_slots_available == 0 {
if self.num_search_slots_available == 0 {
return None;
}
let available_memory = self
Expand All @@ -307,14 +296,13 @@ impl SearchPermitActor {
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(ongoing_tasks_metric);
ongoing_gauge_guard.add(1);
self.total_memory_allocated += permit_request.permit_size;
self.num_warmup_slots_available -= 1;
self.num_search_slots_available -= 1;
permit_request
.permit_sender
.send(SearchPermit {
_ongoing_gauge_guard: ongoing_gauge_guard,
msg_sender: self.msg_sender.clone(),
memory_allocation: permit_request.permit_size,
warmup_slot_freed: false,
})
// if the requester dropped its receiver, we drop the newly
// created SearchPermit which releases the resources
Expand All @@ -333,7 +321,6 @@ pub struct SearchPermit {
_ongoing_gauge_guard: GaugeGuard<'static>,
msg_sender: mpsc::WeakUnboundedSender<SearchPermitMessage>,
memory_allocation: u64,
warmup_slot_freed: bool,
}

impl SearchPermit {
Expand All @@ -347,16 +334,6 @@ impl SearchPermit {
self.send_if_still_running(SearchPermitMessage::UpdateMemory { memory_delta });
}

/// Releases this permit's warmup slot so that more downloads can be started.
///
/// A permit carries a single warmup slot: the first call flips the
/// `warmup_slot_freed` flag and sends `FreeWarmupSlot`; every later call
/// returns without doing anything.
pub fn free_warmup_slot(&mut self) {
    if !self.warmup_slot_freed {
        self.warmup_slot_freed = true;
        self.send_if_still_running(SearchPermitMessage::FreeWarmupSlot);
    }
}

/// Returns the value of this permit's `memory_allocation` field as a `ByteSize`.
pub fn memory_allocation(&self) -> ByteSize {
    let allocated_bytes = self.memory_allocation;
    ByteSize(allocated_bytes)
}
Expand All @@ -376,7 +353,6 @@ impl Drop for SearchPermit {
/// Builds a `Drop` message carrying the permit's memory allocation and its
/// warmup-slot flag, then hands it to `send_if_still_running`.
fn drop(&mut self) {
    let release_msg = SearchPermitMessage::Drop {
        memory_size: self.memory_allocation,
        warmup_slot_freed: self.warmup_slot_freed,
    };
    self.send_if_still_running(release_msg);
}
}
Expand Down Expand Up @@ -615,7 +591,7 @@ mod tests {
}

#[tokio::test]
async fn test_warmup_slot() {
async fn test_concurrent_search_slots() {
let permit_provider = SearchPermitProvider::new(10, ByteSize::mb(100), test_metrics());
let mut permit_futs = permit_provider
.get_permits(repeat_n(ByteSize::mb(1), 16))
Expand All @@ -627,27 +603,19 @@ mod tests {
.buffered(1)
.collect()
.await;
// the next permit is blocked by the warmup slots
// the next permit is blocked by the concurrent search slots
let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
try_get(next_blocked_permit_fut).await.err().unwrap();
// if we drop one of the permits, we can get a new one
permits.drain(0..1);
let next_permit_fut = remaining_permit_futs.next().unwrap();
permits.push(try_get(next_permit_fut).await.unwrap());
// the next permit is blocked again by the warmup slots
// the next permit is blocked again by the concurrent search slots
let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
try_get(next_blocked_permit_fut).await.err().unwrap();
// we can explicitly free the warmup slot on a permit
permits[0].free_warmup_slot();
// dropping a permit frees up a slot
permits.drain(0..1);
let next_permit_fut = remaining_permit_futs.next().unwrap();
permits.push(try_get(next_permit_fut).await.unwrap());
// dropping that same permit does not free up another slot
permits.drain(0..1);
let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
try_get(next_blocked_permit_fut).await.err().unwrap();
// but dropping a permit whose slot wasn't explicitly freed does free up a slot
permits.drain(0..1);
let next_blocked_permit_fut = remaining_permit_futs.next().unwrap();
permits.push(try_get(next_blocked_permit_fut).await.unwrap());
}
}
Loading