diff --git a/crates/segcache/src/ttl_buckets/error.rs b/crates/segcache/src/ttl_buckets/error.rs index 9f21e7b..4fdd5a6 100644 --- a/crates/segcache/src/ttl_buckets/error.rs +++ b/crates/segcache/src/ttl_buckets/error.rs @@ -1,13 +1,11 @@ -// Copyright 2021 Twitter, Inc. -// Copyright 2023 Pelikan Cache contributors -// Licensed under the MIT and Apache-2.0 licenses +//! Error types for TTL bucket operations. use thiserror::Error; #[derive(Error, Debug)] pub enum TtlBucketsError { - #[error("item is oversized ({size:?} bytes)")] + #[error("item oversized ({size} bytes)")] ItemOversized { size: usize }, - #[error("ttl bucket expansion failed, no free segments")] + #[error("no free segments available for TTL bucket expansion")] NoFreeSegments, } diff --git a/crates/segcache/src/ttl_buckets/mod.rs b/crates/segcache/src/ttl_buckets/mod.rs index 9ec70dc..5c96213 100644 --- a/crates/segcache/src/ttl_buckets/mod.rs +++ b/crates/segcache/src/ttl_buckets/mod.rs @@ -1,16 +1,8 @@ -// Copyright 2021 Twitter, Inc. -// Copyright 2023 Pelikan Cache contributors -// Licensed under the MIT and Apache-2.0 licenses - -//! TTL buckets are used to group items by TTL to enable eager expiration. -//! -//! The total collection of [`TtlBuckets`] is a contiguous allocation of -//! [`TtlBucket`]s which cover the full range of TTLs. -//! -//! Each [`TtlBucket`] contains a segment chain holding items with a similar -//! TTL. See the blog post for more details on the segcache design: -//! +//! TTL buckets group segments by expiration time for eager expiration. //! +//! Each [`TtlBucket`] contains a doubly-linked chain of segments whose +//! items share a similar TTL. The [`TtlBuckets`] collection maps the +//! full TTL range across 1024 buckets with logarithmic widths. 
mod error; mod ttl_bucket; diff --git a/crates/segcache/src/ttl_buckets/tests.rs b/crates/segcache/src/ttl_buckets/tests.rs index 6f8b259..da3f95f 100644 --- a/crates/segcache/src/ttl_buckets/tests.rs +++ b/crates/segcache/src/ttl_buckets/tests.rs @@ -1,6 +1,4 @@ -// Copyright 2021 Twitter, Inc. -// Copyright 2023 Pelikan Cache contributors -// Licensed under the MIT and Apache-2.0 licenses +//! Tests for TTL bucket index mapping. use crate::*; @@ -8,18 +6,18 @@ use crate::*; fn bucket_index() { let ttl_buckets = TtlBuckets::new(); - // Zero TTL and max duration both go into the same TtlBucket + // Zero TTL and max duration both map to the last bucket. assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(0)), 1023); assert_eq!( ttl_buckets.get_bucket_index(Duration::from_secs(u32::MAX)), 1023 ); - // first bucket is only 7s wide because 0 is no ttl + // First bucket covers TTLs 1–7s (bucket 0); width is 7s because TTL 0 means no expiry. assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(1)), 0); assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(7)), 0); - // buckets from 8s - 2048s (0..34 minutes) are all 8s wide + // Tier 1: 8s–2048s, buckets 1–255, each 8s wide. for bucket in 1..256 { let start = Duration::from_secs(8 * bucket); let end = Duration::from_secs(8 * bucket + 7); @@ -35,7 +33,7 @@ fn bucket_index() { ); } - // buckets from 2048s - 32_768s (34 minutes .. 9 hours) are all 128s wide (2 minutes) + // Tier 2: 2048s–32768s, buckets 272–511, each 128s wide (indices 256–271 are never mapped). for bucket in 16..256 { let start = Duration::from_secs(128 * bucket); let end = Duration::from_secs(128 * bucket + 127); @@ -51,7 +49,7 @@ fn bucket_index() { ); } - // buckets from 32_768s - 524_288s (9 hours .. 6 days) are all 2048s wide (34 minutes) + // Tier 3: 32768s–524288s, buckets 528–767, each 2048s wide. 
for bucket in 16..256 { let start = Duration::from_secs(2048 * bucket); let end = Duration::from_secs(2048 * bucket + 2047); @@ -67,7 +65,7 @@ fn bucket_index() { ); } - // buckets from 524_288s - 8_388_608s (6 days .. 97 days) are all 32_768s wide (9 hours) + // Tier 4: 524288s–8388608s, buckets 784–1023, each 32768s wide. for bucket in 16..256 { let start = Duration::from_secs(32_768 * bucket); let end = Duration::from_secs(32_768 * bucket + 32_767); @@ -83,7 +81,7 @@ fn bucket_index() { ); } - // TTLs longer than 97 days are the max TTL + // Beyond ~97 days maps to the max bucket. assert_eq!( ttl_buckets.get_bucket_index(Duration::from_secs(8_388_608)) as u32, 1023 diff --git a/crates/segcache/src/ttl_buckets/ttl_bucket.rs b/crates/segcache/src/ttl_buckets/ttl_bucket.rs index b6b801a..5a0bd59 100644 --- a/crates/segcache/src/ttl_buckets/ttl_bucket.rs +++ b/crates/segcache/src/ttl_buckets/ttl_bucket.rs @@ -1,11 +1,9 @@ -// Copyright 2021 Twitter, Inc. -// Copyright 2023 Pelikan Cache contributors -// Licensed under the MIT and Apache-2.0 licenses - -//! TTL bucket containing a segment chain which stores items with a similar TTL -//! in an ordered fashion. +//! A single TTL bucket containing a segment chain. +//! +//! Items with similar TTLs are stored in segments linked together in a +//! doubly-linked chain. The head segment is always the oldest, enabling +//! O(1) expiration by checking only the head. //! -//! TTL Bucket: //! ```text //! ┌──────────────┬──────────────┬─────────────┬──────────────┐ //! │ HEAD SEG │ TAIL SEG │ TTL │ NSEG │ //! └──────────────┴──────────────┴─────────────┴──────────────┘ @@ -29,10 +27,9 @@ use crate::*; use core::num::NonZeroU32; -/// Each ttl bucket contains a segment chain to store items with a similar TTL -/// in an ordered fashion. The first segment to expire will be the head of the -/// segment chain. This allows us to efficiently scan across the [`TtlBuckets`] -/// and expire segments in an eager fashion. +/// A TTL bucket holding a doubly-linked segment chain. 
+/// +/// Padded to exactly 64 bytes (one cache line). pub struct TtlBucket { head: Option, tail: Option, @@ -43,7 +40,7 @@ pub struct TtlBucket { } impl TtlBucket { - /// Create a new `TtlBucket` which will hold items with the provided TTL. + /// Create an empty bucket for the given TTL. pub(super) fn new(ttl: i32) -> Self { Self { head: None, @@ -55,29 +52,30 @@ impl TtlBucket { } } - /// Returns the segment ID of the head of the `TtlBucket`. + /// Head of the segment chain (oldest segment). pub fn head(&self) -> Option { self.head } - /// Set the segment ID of the head of the `TtlBucket`. + /// Set the head segment. pub fn set_head(&mut self, id: Option) { self.head = id; } - /// Returns the segment ID of the next segment to merge within the - /// `TtlBucket`. + /// Next segment to merge (for merge eviction policy). pub fn next_to_merge(&self) -> Option { self.next_to_merge } - /// Set the next segment to be merged within the `TtlBucket`. + /// Set the next merge target. pub fn set_next_to_merge(&mut self, next: Option) { self.next_to_merge = next; } - /// Expire segments from this TtlBucket, returns the number of segments - /// expired. + /// Expire segments whose TTL has elapsed. + /// + /// Walks the chain from head, clearing and freeing segments whose + /// `create_at + ttl <= now`. Returns the number of segments expired. 
pub(super) fn expire( &mut self, hashtable: &MultiChoiceHashtable, @@ -88,37 +86,34 @@ impl TtlBucket { } let mut expired = 0; - let ts = Instant::now(); + let now = Instant::now(); loop { - let seg_id = self.head; - if let Some(seg_id) = seg_id { - let mut segment = segments.get_mut(seg_id).unwrap(); - if segment.create_at() + segment.ttl() <= ts { - if let Some(next) = segment.next_seg() { - self.head = Some(next); - } else { - self.head = None; - self.tail = None; - } - segment.clear(hashtable, true); - segments.push_free(seg_id); + let seg_id = match self.head { + Some(id) => id, + None => return expired, + }; + + let mut segment = segments.get_mut(seg_id).unwrap(); + if segment.create_at() + segment.ttl() <= now { + self.head = segment.next_seg(); + if self.head.is_none() { + self.tail = None; + } + segment.clear(hashtable, true); + segments.push_free(seg_id); - #[cfg(feature = "metrics")] - SEGMENT_EXPIRE.increment(); + #[cfg(feature = "metrics")] + SEGMENT_EXPIRE.increment(); - expired += 1; - } else { - return expired; - } + expired += 1; } else { return expired; } } } - /// Clear segments from this TtlBucket, returns the number of segments - /// expired. + /// Clear all segments in this bucket. Returns the count cleared. 
pub(super) fn clear( &mut self, hashtable: &MultiChoiceHashtable, @@ -131,74 +126,70 @@ impl TtlBucket { let mut cleared = 0; loop { - let seg_id = self.head; - if let Some(seg_id) = seg_id { - let mut segment = segments.get_mut(seg_id).unwrap(); - if let Some(next) = segment.next_seg() { - self.head = Some(next); - } else { - self.head = None; - self.tail = None; - } - segment.clear(hashtable, true); - segments.push_free(seg_id); - - #[cfg(feature = "metrics")] - SEGMENT_CLEAR.increment(); + let seg_id = match self.head { + Some(id) => id, + None => return cleared, + }; - cleared += 1; - } else { - return cleared; + let mut segment = segments.get_mut(seg_id).unwrap(); + self.head = segment.next_seg(); + if self.head.is_none() { + self.tail = None; } + segment.clear(hashtable, true); + segments.push_free(seg_id); + + #[cfg(feature = "metrics")] + SEGMENT_CLEAR.increment(); + + cleared += 1; } } - /// Attempts to expand the `TtlBucket` by allocating a segment from the free - /// queue. If there are no segments currently free, this function will - /// return and error. It is up to the caller to handle the error and retry. + /// Allocate a new segment and link it as the tail of this bucket. 
fn try_expand(&mut self, segments: &mut Segments) -> Result<(), TtlBucketsError> { - if let Some(id) = segments.pop_free() { - { - if let Some(tail_id) = self.tail { - let tail = segments.get_mut(tail_id).unwrap(); - tail.set_next_seg(Some(id)); - } - } + let id = segments.pop_free().ok_or(TtlBucketsError::NoFreeSegments)?; - let segment = segments.get_mut(id).unwrap(); - segment.set_prev_seg(self.tail); - segment.set_next_seg(None); - segment.set_ttl(Duration::from_secs(self.ttl as u32)); - if self.head.is_none() { - debug_assert!(self.tail.is_none()); - self.head = Some(id); - } - self.tail = Some(id); - self.nseg += 1; - debug_assert!(!segment.evictable(), "segment should not be evictable"); - segment.set_evictable(true); - segment.set_accessible(true); - Ok(()) - } else { - Err(TtlBucketsError::NoFreeSegments) + // Link the new segment after the current tail. + if let Some(tail_id) = self.tail { + let tail = segments.get_mut(tail_id).unwrap(); + tail.set_next_seg(Some(id)); } + + let segment = segments.get_mut(id).unwrap(); + segment.set_prev_seg(self.tail); + segment.set_next_seg(None); + segment.set_ttl(Duration::from_secs(self.ttl as u32)); + + if self.head.is_none() { + debug_assert!(self.tail.is_none()); + self.head = Some(id); + } + self.tail = Some(id); + self.nseg += 1; + + debug_assert!( + !segment.evictable(), + "fresh segment should not be evictable" + ); + segment.set_evictable(true); + segment.set_accessible(true); + Ok(()) } - /// Reserve space in this `TtlBucket` for an item with the specified size in - /// bytes. This function will return an error if the item is oversized, or - /// if there is no space in the `TtlBucket` for the item and the `TtlBucket` - /// could not be expanded with a segment from the free queue. + /// Reserve space for an item in this bucket's tail segment. + /// + /// Expands the bucket with a new segment if the current tail is full + /// or inaccessible. 
Returns a `ReservedItem` pointing to the allocated + /// space, or an error if the item is oversized or no segments are free. pub(crate) fn reserve( &mut self, size: usize, segments: &mut Segments, ) -> Result { - trace!("reserving: {} bytes for ttl: {}", size, self.ttl); - let seg_size = segments.segment_size() as usize; if size > seg_size { - debug!("item is oversized"); return Err(TtlBucketsError::ItemOversized { size }); } @@ -209,10 +200,8 @@ impl TtlBucket { continue; } let offset = segment.write_offset() as usize; - trace!("offset: {offset}"); if offset + size <= seg_size { - let size = size as i32; - let item = segment.alloc_item(size); + let item = segment.alloc_item(size as i32); return Ok(ReservedItem::new(item, segment.id(), offset)); } } diff --git a/crates/segcache/src/ttl_buckets/ttl_buckets.rs b/crates/segcache/src/ttl_buckets/ttl_buckets.rs index 5f47d22..c613fb2 100644 --- a/crates/segcache/src/ttl_buckets/ttl_buckets.rs +++ b/crates/segcache/src/ttl_buckets/ttl_buckets.rs @@ -1,126 +1,98 @@ -// Copyright 2021 Twitter, Inc. -// Copyright 2023 Pelikan Cache contributors -// Licensed under the MIT and Apache-2.0 licenses - -//! A collection of [`TtlBucket`]s which covers the full range of TTLs. +//! Collection of TTL buckets covering the full TTL range. +//! +//! 1024 buckets organized in 4 logarithmic tiers: //! -//! We use a total of 1024 buckets to represent the full range of TTLs. We -//! divide the buckets into 4 ranges: -//! * 1-2048s (1 second - ~34 minutes) are stored in buckets which are 8s wide. -//! * 2048-32_768s (~34 minutes - ~9 hours) are stored in buckets which are 128s -//! (~2 minutes) wide. -//! * 32_768-524_288s (~9 hours - ~6 days) are stored in buckets which are 2048s -//! (~34 minutes) wide. -//! * 524_288-8_388_608s (~6 days - ~97 days) are stored in buckets which are -//! 32_768s (~9 hours) wide. -//! * TTLs beyond 8_388_608s (~97 days) and TTLs of 0 are all treated as the max -//! TTL. +//! 
| Tier | TTL range | Bucket width | Buckets | +//! |------|--------------------|--------------|---------| +//! | 1 | 1s – 2048s | 8s | 256 | +//! | 2 | 2048s – 32,768s | 128s | 256 | +//! | 3 | 32,768s – 524,288s | 2,048s | 256 | +//! | 4 | 524,288s – 8.4Ms | 32,768s | 256 | //! -//! See the -//! [Segcache paper](https://www.usenix.org/system/files/nsdi21-yang.pdf) for -//! more detail. +//! TTL of 0 (no expiry) and TTLs beyond ~97 days map to the last bucket. use crate::*; -const N_BUCKET_PER_STEP_N_BIT: usize = 8; -const N_BUCKET_PER_STEP: usize = 1 << N_BUCKET_PER_STEP_N_BIT; - -const TTL_BUCKET_INTERVAL_N_BIT_1: usize = 3; -const TTL_BUCKET_INTERVAL_N_BIT_2: usize = 7; -const TTL_BUCKET_INTERVAL_N_BIT_3: usize = 11; -const TTL_BUCKET_INTERVAL_N_BIT_4: usize = 15; +const BUCKETS_PER_TIER: usize = 256; +const TIER_COUNT: usize = 4; +const TOTAL_BUCKETS: usize = BUCKETS_PER_TIER * TIER_COUNT; -const TTL_BUCKET_INTERVAL_1: usize = 1 << TTL_BUCKET_INTERVAL_N_BIT_1; -const TTL_BUCKET_INTERVAL_2: usize = 1 << TTL_BUCKET_INTERVAL_N_BIT_2; -const TTL_BUCKET_INTERVAL_3: usize = 1 << TTL_BUCKET_INTERVAL_N_BIT_3; -const TTL_BUCKET_INTERVAL_4: usize = 1 << TTL_BUCKET_INTERVAL_N_BIT_4; +// Tier widths as bit shifts (each tier is 4x wider than the previous). +const TIER_1_SHIFT: usize = 3; // 8s +const TIER_2_SHIFT: usize = 7; // 128s +const TIER_3_SHIFT: usize = 11; // 2048s +const TIER_4_SHIFT: usize = 15; // 32768s -const TTL_BOUNDARY_1: i32 = 1 << (TTL_BUCKET_INTERVAL_N_BIT_1 + N_BUCKET_PER_STEP_N_BIT); -const TTL_BOUNDARY_2: i32 = 1 << (TTL_BUCKET_INTERVAL_N_BIT_2 + N_BUCKET_PER_STEP_N_BIT); -const TTL_BOUNDARY_3: i32 = 1 << (TTL_BUCKET_INTERVAL_N_BIT_3 + N_BUCKET_PER_STEP_N_BIT); - -const MAX_N_TTL_BUCKET: usize = N_BUCKET_PER_STEP * 4; -const MAX_TTL_BUCKET_IDX: usize = MAX_N_TTL_BUCKET - 1; +// Tier boundaries: the max TTL (exclusive) that fits in each tier. 
+const TIER_1_MAX: i32 = 1 << (TIER_1_SHIFT + 8); // 2,048 +const TIER_2_MAX: i32 = 1 << (TIER_2_SHIFT + 8); // 32,768 +const TIER_3_MAX: i32 = 1 << (TIER_3_SHIFT + 8); // 524,288 +/// The full collection of TTL buckets. pub struct TtlBuckets { pub(crate) buckets: Box<[TtlBucket]>, pub(crate) last_expired: Instant, } impl TtlBuckets { - /// Create a new set of `TtlBuckets` which cover the full range of TTLs. See - /// the module-level documentation for how the range of TTLs are stored. + /// Create a new set of 1024 TTL buckets covering the full TTL range. pub fn new() -> Self { - let intervals = [ - TTL_BUCKET_INTERVAL_1, - TTL_BUCKET_INTERVAL_2, - TTL_BUCKET_INTERVAL_3, - TTL_BUCKET_INTERVAL_4, + let widths = [ + 1 << TIER_1_SHIFT, + 1 << TIER_2_SHIFT, + 1 << TIER_3_SHIFT, + 1 << TIER_4_SHIFT, ]; - let mut buckets = Vec::with_capacity(0); - buckets.reserve_exact(intervals.len() * N_BUCKET_PER_STEP); - - for interval in &intervals { - for j in 0..N_BUCKET_PER_STEP { - let ttl = interval * j + 1; - let bucket = TtlBucket::new(ttl as i32); - buckets.push(bucket); + let mut buckets = Vec::with_capacity(TOTAL_BUCKETS); + for width in &widths { + for j in 0..BUCKETS_PER_TIER { + let ttl = width * j + 1; + buckets.push(TtlBucket::new(ttl as i32)); } } - let buckets = buckets.into_boxed_slice(); - let last_expired = Instant::now(); - Self { - buckets, - last_expired, + buckets: buckets.into_boxed_slice(), + last_expired: Instant::now(), } } - /// Get the index of the `TtlBucket` for the given TTL. + /// Map a TTL duration to its bucket index (0–1023). 
pub(crate) fn get_bucket_index(&self, ttl: Duration) -> usize { - let ttl = ttl.as_secs() as i32; - if ttl <= 0 { + let secs = ttl.as_secs() as i32; + if secs <= 0 { self.buckets.len() - 1 - } else if ttl & !(TTL_BOUNDARY_1 - 1) == 0 { - (ttl >> TTL_BUCKET_INTERVAL_N_BIT_1) as usize - } else if ttl & !(TTL_BOUNDARY_2 - 1) == 0 { - (ttl >> TTL_BUCKET_INTERVAL_N_BIT_2) as usize + N_BUCKET_PER_STEP - } else if ttl & !(TTL_BOUNDARY_3 - 1) == 0 { - (ttl >> TTL_BUCKET_INTERVAL_N_BIT_3) as usize + N_BUCKET_PER_STEP * 2 + } else if secs & !(TIER_1_MAX - 1) == 0 { + (secs >> TIER_1_SHIFT) as usize + } else if secs & !(TIER_2_MAX - 1) == 0 { + (secs >> TIER_2_SHIFT) as usize + BUCKETS_PER_TIER + } else if secs & !(TIER_3_MAX - 1) == 0 { + (secs >> TIER_3_SHIFT) as usize + BUCKETS_PER_TIER * 2 } else { - let bucket_idx = (ttl >> TTL_BUCKET_INTERVAL_N_BIT_4) as usize + N_BUCKET_PER_STEP * 3; - if bucket_idx > MAX_TTL_BUCKET_IDX { - MAX_TTL_BUCKET_IDX - } else { - bucket_idx - } + let idx = (secs >> TIER_4_SHIFT) as usize + BUCKETS_PER_TIER * 3; + idx.min(TOTAL_BUCKETS - 1) } } - // TODO(bmartin): confirm handling for negative TTLs here... - /// Get a mutable reference to the `TtlBucket` for the given TTL. + /// Get a mutable reference to the bucket for the given TTL. pub(crate) fn get_mut_bucket(&mut self, ttl: Duration) -> &mut TtlBucket { let index = self.get_bucket_index(ttl); - - // NOTE: since get_bucket_index() must return an index within the slice, - // we do not need to worry about UB here. + // SAFETY: get_bucket_index always returns a valid index. unsafe { self.buckets.get_unchecked_mut(index) } } + /// Run eager expiration across all buckets. Returns total segments expired. 
pub(crate) fn expire( &mut self, hashtable: &MultiChoiceHashtable, segments: &mut Segments, ) -> usize { let now = Instant::now(); - if now == self.last_expired { return 0; - } else { - self.last_expired = now; } + self.last_expired = now; let start = Instant::now(); let mut expired = 0; @@ -136,6 +108,7 @@ impl TtlBuckets { expired } + /// Clear all segments across all buckets. Returns total segments cleared. pub(crate) fn clear( &mut self, hashtable: &MultiChoiceHashtable, @@ -147,7 +120,7 @@ impl TtlBuckets { cleared += bucket.clear(hashtable, segments); } let duration = start.elapsed(); - debug!("expired: {cleared} segments in {duration:?}"); + debug!("cleared: {cleared} segments in {duration:?}"); #[cfg(feature = "metrics")] CLEAR_TIME.add(duration.as_nanos() as _);