Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions crates/segcache/src/ttl_buckets/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
// Copyright 2021 Twitter, Inc.
// Copyright 2023 Pelikan Cache contributors
// Licensed under the MIT and Apache-2.0 licenses
//! Error types for TTL bucket operations.

use thiserror::Error;

#[derive(Error, Debug)]
pub enum TtlBucketsError {
#[error("item is oversized ({size:?} bytes)")]
#[error("item oversized ({size} bytes)")]
ItemOversized { size: usize },
#[error("ttl bucket expansion failed, no free segments")]
#[error("no free segments available for TTL bucket expansion")]
NoFreeSegments,
}
16 changes: 4 additions & 12 deletions crates/segcache/src/ttl_buckets/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
// Copyright 2021 Twitter, Inc.
// Copyright 2023 Pelikan Cache contributors
// Licensed under the MIT and Apache-2.0 licenses

//! TTL buckets are used to group items by TTL to enable eager expiration.
//!
//! The total collection of [`TtlBuckets`] is a contiguous allocation of
//! [`TtlBucket`]s which cover the full range of TTLs.
//!
//! Each [`TtlBucket`] contains a segment chain holding items with a similar
//! TTL. See the blog post for more details on the segcache design:
//! <https://twitter.github.io/pelikan/2021/segcache.html>
//! TTL buckets group segments by expiration time for eager expiration.
//!
//! Each [`TtlBucket`] contains a doubly-linked chain of segments whose
//! items share a similar TTL. The [`TtlBuckets`] collection maps the
//! full TTL range across 1024 buckets with logarithmic widths.

mod error;
mod ttl_bucket;
Expand Down
18 changes: 8 additions & 10 deletions crates/segcache/src/ttl_buckets/tests.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
// Copyright 2021 Twitter, Inc.
// Copyright 2023 Pelikan Cache contributors
// Licensed under the MIT and Apache-2.0 licenses
//! Tests for TTL bucket index mapping.

use crate::*;

#[test]
fn bucket_index() {
let ttl_buckets = TtlBuckets::new();

// Zero TTL and max duration both go into the same TtlBucket
// Zero TTL and max duration both map to the last bucket.
assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(0)), 1023);
assert_eq!(
ttl_buckets.get_bucket_index(Duration::from_secs(u32::MAX)),
1023
);

// first bucket is only 7s wide because 0 is no ttl
// First bucket covers TTLs 1–7s (bucket 0).
assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(1)), 0);
assert_eq!(ttl_buckets.get_bucket_index(Duration::from_secs(7)), 0);

// buckets from 8s - 2048s (0..34 minutes) are all 8s wide
// Tier 1: 8s–2048s, buckets 1–255, each 8s wide.
for bucket in 1..256 {
let start = Duration::from_secs(8 * bucket);
let end = Duration::from_secs(8 * bucket + 7);
Expand All @@ -35,7 +33,7 @@ fn bucket_index() {
);
}

// buckets from 2048s - 32_768s (34 minutes .. 9 hours) are all 128s wide (2 minutes)
// Tier 2: 2048s–32768s, buckets 256–511, each 128s wide.
for bucket in 16..256 {
let start = Duration::from_secs(128 * bucket);
let end = Duration::from_secs(128 * bucket + 127);
Expand All @@ -51,7 +49,7 @@ fn bucket_index() {
);
}

// buckets from 32_768s - 524_288s (9 hours .. 6 days) are all 2048s wide (34 minutes)
// Tier 3: 32768s–524288s, buckets 512–767, each 2048s wide.
for bucket in 16..256 {
let start = Duration::from_secs(2048 * bucket);
let end = Duration::from_secs(2048 * bucket + 2047);
Expand All @@ -67,7 +65,7 @@ fn bucket_index() {
);
}

// buckets from 524_288s - 8_388_608s (6 days .. 97 days) are all 32_768s wide (9 hours)
// Tier 4: 524288s–8388608s, buckets 768–1023, each 32768s wide.
for bucket in 16..256 {
let start = Duration::from_secs(32_768 * bucket);
let end = Duration::from_secs(32_768 * bucket + 32_767);
Expand All @@ -83,7 +81,7 @@ fn bucket_index() {
);
}

// TTLs longer than 97 days are the max TTL
// Beyond ~97 days maps to the max bucket.
assert_eq!(
ttl_buckets.get_bucket_index(Duration::from_secs(8_388_608)) as u32,
1023
Expand Down
175 changes: 82 additions & 93 deletions crates/segcache/src/ttl_buckets/ttl_bucket.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
// Copyright 2021 Twitter, Inc.
// Copyright 2023 Pelikan Cache contributors
// Licensed under the MIT and Apache-2.0 licenses

//! TTL bucket containing a segment chain which stores items with a similar TTL
//! in an ordered fashion.
//! A single TTL bucket containing a segment chain.
//!
//! Items with similar TTLs are stored in segments linked together in a
//! doubly-linked chain. The head segment is always the oldest, enabling
//! O(1) expiration by checking only the head.
//!
//! TTL Bucket:
//! ```text
//! ┌──────────────┬──────────────┬─────────────┬──────────────┐
//! │ HEAD SEG │ TAIL SEG │ TTL │ NSEG │
Expand All @@ -29,10 +27,9 @@
use crate::*;
use core::num::NonZeroU32;

/// Each ttl bucket contains a segment chain to store items with a similar TTL
/// in an ordered fashion. The first segment to expire will be the head of the
/// segment chain. This allows us to efficiently scan across the [`TtlBuckets`]
/// and expire segments in an eager fashion.
/// A TTL bucket holding a doubly-linked segment chain.
///
/// Padded to exactly 64 bytes (one cache line).
pub struct TtlBucket {
head: Option<NonZeroU32>,
tail: Option<NonZeroU32>,
Expand All @@ -43,7 +40,7 @@ pub struct TtlBucket {
}

impl TtlBucket {
/// Create a new `TtlBucket` which will hold items with the provided TTL.
/// Create an empty bucket for the given TTL.
pub(super) fn new(ttl: i32) -> Self {
Self {
head: None,
Expand All @@ -55,29 +52,30 @@ impl TtlBucket {
}
}

/// Returns the segment ID of the head of the `TtlBucket`.
/// Head of the segment chain (oldest segment).
pub fn head(&self) -> Option<NonZeroU32> {
self.head
}

/// Set the segment ID of the head of the `TtlBucket`.
/// Set the head segment.
pub fn set_head(&mut self, id: Option<NonZeroU32>) {
self.head = id;
}

/// Returns the segment ID of the next segment to merge within the
/// `TtlBucket`.
/// Next segment to merge (for merge eviction policy).
pub fn next_to_merge(&self) -> Option<NonZeroU32> {
self.next_to_merge
}

/// Set the next segment to be merged within the `TtlBucket`.
/// Set the next merge target.
pub fn set_next_to_merge(&mut self, next: Option<NonZeroU32>) {
self.next_to_merge = next;
}

/// Expire segments from this TtlBucket, returns the number of segments
/// expired.
/// Expire segments whose TTL has elapsed.
///
/// Walks the chain from head, clearing and freeing segments whose
/// `create_at + ttl <= now`. Returns the number of segments expired.
pub(super) fn expire(
&mut self,
hashtable: &MultiChoiceHashtable,
Expand All @@ -88,37 +86,34 @@ impl TtlBucket {
}

let mut expired = 0;
let ts = Instant::now();
let now = Instant::now();

loop {
let seg_id = self.head;
if let Some(seg_id) = seg_id {
let mut segment = segments.get_mut(seg_id).unwrap();
if segment.create_at() + segment.ttl() <= ts {
if let Some(next) = segment.next_seg() {
self.head = Some(next);
} else {
self.head = None;
self.tail = None;
}
segment.clear(hashtable, true);
segments.push_free(seg_id);
let seg_id = match self.head {
Some(id) => id,
None => return expired,
};

let mut segment = segments.get_mut(seg_id).unwrap();
if segment.create_at() + segment.ttl() <= now {
self.head = segment.next_seg();
if self.head.is_none() {
self.tail = None;
}
segment.clear(hashtable, true);
segments.push_free(seg_id);

#[cfg(feature = "metrics")]
SEGMENT_EXPIRE.increment();
#[cfg(feature = "metrics")]
SEGMENT_EXPIRE.increment();

expired += 1;
} else {
return expired;
}
expired += 1;
} else {
return expired;
}
}
}

/// Clear segments from this TtlBucket, returns the number of segments
/// expired.
/// Clear all segments in this bucket. Returns the count cleared.
pub(super) fn clear(
&mut self,
hashtable: &MultiChoiceHashtable,
Expand All @@ -131,74 +126,70 @@ impl TtlBucket {
let mut cleared = 0;

loop {
let seg_id = self.head;
if let Some(seg_id) = seg_id {
let mut segment = segments.get_mut(seg_id).unwrap();
if let Some(next) = segment.next_seg() {
self.head = Some(next);
} else {
self.head = None;
self.tail = None;
}
segment.clear(hashtable, true);
segments.push_free(seg_id);

#[cfg(feature = "metrics")]
SEGMENT_CLEAR.increment();
let seg_id = match self.head {
Some(id) => id,
None => return cleared,
};

cleared += 1;
} else {
return cleared;
let mut segment = segments.get_mut(seg_id).unwrap();
self.head = segment.next_seg();
if self.head.is_none() {
self.tail = None;
}
segment.clear(hashtable, true);
segments.push_free(seg_id);

#[cfg(feature = "metrics")]
SEGMENT_CLEAR.increment();

cleared += 1;
}
}

/// Attempts to expand the `TtlBucket` by allocating a segment from the free
/// queue. If there are no segments currently free, this function will
/// return and error. It is up to the caller to handle the error and retry.
/// Allocate a new segment and link it as the tail of this bucket.
fn try_expand(&mut self, segments: &mut Segments) -> Result<(), TtlBucketsError> {
if let Some(id) = segments.pop_free() {
{
if let Some(tail_id) = self.tail {
let tail = segments.get_mut(tail_id).unwrap();
tail.set_next_seg(Some(id));
}
}
let id = segments.pop_free().ok_or(TtlBucketsError::NoFreeSegments)?;

let segment = segments.get_mut(id).unwrap();
segment.set_prev_seg(self.tail);
segment.set_next_seg(None);
segment.set_ttl(Duration::from_secs(self.ttl as u32));
if self.head.is_none() {
debug_assert!(self.tail.is_none());
self.head = Some(id);
}
self.tail = Some(id);
self.nseg += 1;
debug_assert!(!segment.evictable(), "segment should not be evictable");
segment.set_evictable(true);
segment.set_accessible(true);
Ok(())
} else {
Err(TtlBucketsError::NoFreeSegments)
// Link the new segment after the current tail.
if let Some(tail_id) = self.tail {
let tail = segments.get_mut(tail_id).unwrap();
tail.set_next_seg(Some(id));
}

let segment = segments.get_mut(id).unwrap();
segment.set_prev_seg(self.tail);
segment.set_next_seg(None);
segment.set_ttl(Duration::from_secs(self.ttl as u32));

if self.head.is_none() {
debug_assert!(self.tail.is_none());
self.head = Some(id);
}
self.tail = Some(id);
self.nseg += 1;

debug_assert!(
!segment.evictable(),
"fresh segment should not be evictable"
);
segment.set_evictable(true);
segment.set_accessible(true);
Ok(())
}

/// Reserve space in this `TtlBucket` for an item with the specified size in
/// bytes. This function will return an error if the item is oversized, or
/// if there is no space in the `TtlBucket` for the item and the `TtlBucket`
/// could not be expanded with a segment from the free queue.
/// Reserve space for an item in this bucket's tail segment.
///
/// Expands the bucket with a new segment if the current tail is full
/// or inaccessible. Returns a `ReservedItem` pointing to the allocated
/// space, or an error if the item is oversized or no segments are free.
pub(crate) fn reserve(
&mut self,
size: usize,
segments: &mut Segments,
) -> Result<ReservedItem, TtlBucketsError> {
trace!("reserving: {} bytes for ttl: {}", size, self.ttl);

let seg_size = segments.segment_size() as usize;

if size > seg_size {
debug!("item is oversized");
return Err(TtlBucketsError::ItemOversized { size });
}

Expand All @@ -209,10 +200,8 @@ impl TtlBucket {
continue;
}
let offset = segment.write_offset() as usize;
trace!("offset: {offset}");
if offset + size <= seg_size {
let size = size as i32;
let item = segment.alloc_item(size);
let item = segment.alloc_item(size as i32);
return Ok(ReservedItem::new(item, segment.id(), offset));
}
}
Expand Down
Loading
Loading