Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions codel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type codelQueue struct {
packets packetHeap
newPacket chan struct{}
closed bool
pushCount int

// CoDel state
dropping bool
Expand Down Expand Up @@ -41,7 +42,8 @@ func (q *codelQueue) Enqueue(p *packetWithDeliveryTime) {
if q.closed {
return
}
heap.Push(&q.packets, p)
q.pushCount++
heap.Push(&q.packets, packetWithDeliveryTimeAndOrder{packetWithDeliveryTime: p, count: q.pushCount})

// Signal that a new packet arrived (non-blocking)
select {
Expand Down Expand Up @@ -100,7 +102,8 @@ func (q *codelQueue) Dequeue() (*packetWithDeliveryTime, bool) {
}

// Packet is ready, remove from queue and return it
p := heap.Pop(&q.packets).(*packetWithDeliveryTime)
po := heap.Pop(&q.packets).(packetWithDeliveryTimeAndOrder)
p := po.packetWithDeliveryTime

// Reset CoDel state when queue becomes empty
if len(q.packets) == 0 {
Expand Down Expand Up @@ -177,21 +180,27 @@ func (q *codelQueue) Close() {
close(q.newPacket)
}

type packetWithDeliveryTimeAndOrder struct {
count int
*packetWithDeliveryTime
}

// packetHeap implements heap.Interface ordered by packet delivery time.
type packetHeap []*packetWithDeliveryTime
type packetHeap []packetWithDeliveryTimeAndOrder

func (h packetHeap) Len() int { return len(h) }

func (h packetHeap) Less(i, j int) bool {
return h[i].DeliveryTime.Before(h[j].DeliveryTime)
return (h[i].DeliveryTime.Before(h[j].DeliveryTime) ||
h[i].DeliveryTime.Equal(h[j].DeliveryTime) && h[i].count < h[j].count)
}

func (h packetHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *packetHeap) Push(x any) {
*h = append(*h, x.(*packetWithDeliveryTime))
*h = append(*h, x.(packetWithDeliveryTimeAndOrder))
}

func (h *packetHeap) Pop() any {
Expand Down