Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 93 additions & 171 deletions codel.go
Original file line number Diff line number Diff line change
@@ -1,212 +1,134 @@
package simnet

import (
"container/heap"
"math"
"sync"
"time"
)

// codelQueue is a FIFO queue with CoDel bufferbloat control
type codelQueue struct {
mu sync.Mutex
packets packetHeap
newPacket chan struct{}
closed bool
pushCount int
// coDelQueue is a FIFO queue with CoDel bufferbloat control.
// Refer to RFC 8289
type coDelQueue struct {
q ringBuffer[packetWithTimestamp]

byteCount uint64
MTU uint16

// CoDel state
dropping bool
firstAbove time.Time
dropNext time.Time
count int
target time.Duration // target queue delay (e.g., 5ms)
interval time.Duration // interval for sustained bad queue (e.g., 100ms)
lastDropTime time.Time
target time.Duration // target queue delay (e.g., 5ms)
interval time.Duration // interval for sustained bad queue (e.g., 100ms)

dropping bool
firstAbove time.Time
dropNext time.Time
count int
lastCount int
}

func newCodelQueue(target, interval time.Duration) *codelQueue {
q := &codelQueue{
target: target,
interval: interval,
newPacket: make(chan struct{}, 1),
}
heap.Init(&q.packets)
return q
type packetWithTimestamp struct {
Packet
ts time.Time
}

// Enqueue adds a packet to the queue
func (q *codelQueue) Enqueue(p *packetWithDeliveryTime) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
func newCoDelQueue(target, interval time.Duration) coDelQueue {
return coDelQueue{
target: target,
interval: interval,
q: newRingBuffer[packetWithTimestamp](128),
}
q.pushCount++
heap.Push(&q.packets, packetWithDeliveryTimeAndOrder{packetWithDeliveryTime: p, count: q.pushCount})
}

// Signal that a new packet arrived (non-blocking)
select {
case q.newPacket <- struct{}{}:
default:
}
// Enqueue adds a packet to the queue
func (q *coDelQueue) Enqueue(p Packet) {
q.byteCount += uint64(len(p.buf))
q.q.PushBack(packetWithTimestamp{p, time.Now()})
}

// Dequeue removes and returns the next packet when it's ready for delivery
// This blocks until a packet is available AND its delivery time has been reached
// Uses a timer that can be reset if a packet with earlier delivery time arrives
func (q *codelQueue) Dequeue() (*packetWithDeliveryTime, bool) {
timer := time.NewTimer(time.Hour)
timer.Stop()

for {
q.mu.Lock()
func (q *coDelQueue) Dequeue() (Packet, bool) {
now := time.Now()
p, okayToDrop := q.doDequeue(now)

if q.closed {
q.mu.Unlock()
timer.Stop()
return nil, false
}

if len(q.packets) == 0 {
// No packets, wait for one to arrive
q.mu.Unlock()
select {
case <-q.newPacket:
timer.Stop()
continue
case <-timer.C:
continue
}
if q.dropping {
if !okayToDrop {
// sojourn below target leave dropping
q.dropping = false
}

earliest := q.packets[0]
earliestTime := earliest.DeliveryTime

now := time.Now()
if now.Before(earliestTime) {
// Not ready yet, wait until delivery time or new packet
waitDuration := earliestTime.Sub(now)
timer.Reset(waitDuration)
q.mu.Unlock()

select {
case <-timer.C:
// Timer expired, check again
continue
case <-q.newPacket:
// New packet arrived, might have earlier delivery time
timer.Stop()
continue
for !now.Before(q.dropNext) && q.dropping {
// implicitly drop the packet
q.drop(p)
q.count++
p, okayToDrop = q.doDequeue(now)
if !okayToDrop {
// leave drop state
q.dropping = false
} else {
// schedule next drop
q.dropNext = controlLaw(q.dropNext, q.interval, q.count)
}
}
} else if okayToDrop {
// If we get here, we're not in drop state. The `okToDrop`
// return from doDequeue means that the sojourn time has been above
// 'TARGET' for 'INTERVAL', so enter drop state.
q.drop(p)
p, _ = q.doDequeue(now)
q.dropping = true

// Packet is ready, remove from queue and return it
po := heap.Pop(&q.packets).(packetWithDeliveryTimeAndOrder)
p := po.packetWithDeliveryTime

// Reset CoDel state when queue becomes empty
if len(q.packets) == 0 {
q.dropping = false
q.firstAbove = time.Time{}
// If min went above TARGET close to when it last went
// below, assume that the drop rate that controlled the
// queue on the last cycle is a good starting point to
// control it now. (`dropNext` will be at most 'INTERVAL'
// later than the time of the last drop, so 'now - dropNext'
// is a good approximation of the time from the last drop
// until now.) Implementations vary slightly here; this is
// the Linux version, which is more widely deployed and
// tested.
delta := q.count - q.lastCount
q.count = 1
if delta > 1 && now.Sub(q.dropNext) < 16*q.interval {
q.count = delta
}

q.mu.Unlock()

return p, true
q.dropNext = controlLaw(now, q.interval, q.count)
q.lastCount = q.count
}
}

// shouldDrop implements the CoDel dropping decision (thread-safe version)
func (q *codelQueue) shouldDrop(sojournTime time.Duration) bool {
q.mu.Lock()
defer q.mu.Unlock()
return q.codelShouldDrop(sojournTime, time.Now())
return p.Packet, len(p.Packet.buf) > 0
}

// codelShouldDrop implements the CoDel dropping decision
func (q *codelQueue) codelShouldDrop(sojournTime time.Duration, now time.Time) bool {
// Reset CoDel state when queue is empty (checked by caller before dequeue)
// This is handled by resetting state when queue becomes empty in Dequeue
func (q *coDelQueue) drop(p packetWithTimestamp) {
// TODO add stats
}

if sojournTime < q.target {
// Queue is good, reset state
func (q *coDelQueue) doDequeue(now time.Time) (p packetWithTimestamp, okToDrop bool) {
if q.q.Empty() {
q.firstAbove = time.Time{}
q.dropping = false
return false
}

// Queue delay is above target
if q.firstAbove.IsZero() {
// First time above target, start tracking
q.firstAbove = now.Add(q.interval)
return false
}

if now.Before(q.firstAbove) {
// Haven't been above target for long enough
return false
}

// We've been above target for the full interval
if !q.dropping {
// Enter dropping state
q.dropping = true
q.count = 1
q.dropNext = now
q.lastDropTime = now
return true
return
}

// Already in dropping state
if now.After(q.dropNext) {
// Time to drop another packet
q.count++
// Calculate next drop time using control law: interval / sqrt(count)
delta := time.Duration(float64(q.interval) / math.Sqrt(float64(q.count)))
q.dropNext = now.Add(delta)
q.lastDropTime = now
return true
p = q.q.PopFront()
q.byteCount -= uint64(len(p.Packet.buf))
sojournTime := now.Sub(p.ts)
if sojournTime < q.target || q.byteCount < uint64(q.MTU) {
q.firstAbove = time.Time{}
} else {
if q.firstAbove.IsZero() {
// Just went above from below. If still above later will say it's
// okay to drop
q.firstAbove = now.Add(q.interval)
} else if !now.Before(q.firstAbove) {
okToDrop = true
}
}

return false
}

// Close closes the queue
func (q *codelQueue) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
close(q.newPacket)
}

type packetWithDeliveryTimeAndOrder struct {
count int
*packetWithDeliveryTime
}

// packetHeap implements heap.Interface ordered by packet delivery time.
type packetHeap []packetWithDeliveryTimeAndOrder

func (h packetHeap) Len() int { return len(h) }

func (h packetHeap) Less(i, j int) bool {
return (h[i].DeliveryTime.Before(h[j].DeliveryTime) ||
h[i].DeliveryTime.Equal(h[j].DeliveryTime) && h[i].count < h[j].count)
}

func (h packetHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *packetHeap) Push(x any) {
*h = append(*h, x.(packetWithDeliveryTimeAndOrder))
return
}

func (h *packetHeap) Pop() any {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
func controlLaw(t time.Time, interval time.Duration, count int) time.Time {
return t.Add(time.Duration(
float64(time.Second) *
(interval.Seconds() / math.Sqrt(float64(count)))))
}
Loading