Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions codel.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,28 @@ type coDelQueue struct {
}

type packetWithTimestamp struct {
Packet
*Packet
ts time.Time
}

func newCoDelQueue(target, interval time.Duration) coDelQueue {
return coDelQueue{
target: target,
interval: interval,
q: newRingBuffer[packetWithTimestamp](128),
q: newRingBuffer[packetWithTimestamp](4),
}
}

// Enqueue adds a packet to the queue
func (q *coDelQueue) Enqueue(p Packet) {
func (q *coDelQueue) Enqueue(p *Packet) {
q.byteCount += uint64(len(p.buf))
q.q.PushBack(packetWithTimestamp{p, time.Now()})
}

// Dequeue removes and returns the next packet when it's ready for delivery
// This blocks until a packet is available AND its delivery time has been reached
// Uses a timer that can be reset if a packet with earlier delivery time arrives
func (q *coDelQueue) Dequeue() (Packet, bool) {
func (q *coDelQueue) Dequeue() (*Packet, bool) {
now := time.Now()
p, okayToDrop := q.doDequeue(now)

Expand Down Expand Up @@ -96,7 +96,7 @@ func (q *coDelQueue) Dequeue() (Packet, bool) {
q.lastCount = q.count
}

return p.Packet, len(p.Packet.buf) > 0
return p.Packet, p.Packet != nil
}

func (q *coDelQueue) drop(p packetWithTimestamp) {
Expand Down
8 changes: 4 additions & 4 deletions codel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) {

// Build a queue that stays full for more than one interval.
for i := range 4 {
q.Enqueue(Packet{buf: []byte{byte(i)}})
q.Enqueue(&Packet{buf: []byte{byte(i)}})
}

// Allow in-flight packets to accumulate sojourn time.
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestCodelQueueNoDropOnTransientQueue(t *testing.T) {
q.MTU = 1

for i := range 2 {
q.Enqueue(Packet{buf: []byte{byte(i)}})
q.Enqueue(&Packet{buf: []byte{byte(i)}})
}

// The queue goes above target but drains in less than one interval.
Expand Down Expand Up @@ -95,10 +95,10 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) {
const initSize = 30000
const queueSize = 50000

packets := make([]Packet, queueSize)
packets := make([]*Packet, queueSize)
data := []byte("test")
for i := range queueSize {
packets[i] = Packet{buf: data}
packets[i] = &Packet{buf: data}
}

q := newCoDelQueue(5*time.Millisecond, 100*time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions fq_codel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newFqCoDel(target, interval time.Duration, quantum, flowCount int) fqCoDel
}
}

func (q *fqCoDel) Enqueue(p Packet) {
func (q *fqCoDel) Enqueue(p *Packet) {
bucket := int(p.Hash(&q.hash) % uint64(len(q.flows)))

fq := q.flows[bucket]
Expand Down Expand Up @@ -74,7 +74,7 @@ func (q *fqCoDel) Enqueue(p Packet) {
/* | | Credits Exhausted
/* +-----------------+
*/
func (q *fqCoDel) Dequeue() (Packet, bool) {
func (q *fqCoDel) Dequeue() (*Packet, bool) {

for !q.newFlows.empty() {
fq := q.newFlows.peek()
Expand Down Expand Up @@ -153,7 +153,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) {
}
}

return Packet{}, false
return nil, false
}

type linkedList[T any] struct {
Expand Down
8 changes: 4 additions & 4 deletions fq_codel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ type testFlow struct {
from net.UDPAddr
}

func (f testFlow) packet(seq, size int) Packet {
func (f testFlow) packet(seq, size int) *Packet {
to := f.to
from := f.from
return Packet{
return &Packet{
To: &to,
From: &from,
buf: flowPayload(f.id, seq, size),
Expand All @@ -219,11 +219,11 @@ func (f testFlow) packet(seq, size int) Packet {
func (f testFlow) bucket(q *fqCoDel) int {
to := f.to
from := f.from
probe := Packet{To: &to, From: &from}
probe := &Packet{To: &to, From: &from}
return int(probe.Hash(&q.hash) % uint64(len(q.flows)))
}

func decodeFlowAndSeq(p Packet) (flowID, seq int) {
func decodeFlowAndSeq(p *Packet) (flowID, seq int) {
if len(p.buf) < 2 {
return -1, -1
}
Expand Down
55 changes: 43 additions & 12 deletions packetheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,59 @@ type packetWithDeliveryTimeAndOrder struct {
deliveryTime time.Time
}

// packetHeap implements heap.Interface ordered by packet delivery time.
// packetHeap is a min-heap ordered by delivery time, then order.
// It avoids container/heap to eliminate interface{} boxing allocations.
type packetHeap []packetWithDeliveryTimeAndOrder

func (h packetHeap) Len() int { return len(h) }

func (h packetHeap) Less(i, j int) bool {
func (h packetHeap) less(i, j int) bool {
return (h[i].deliveryTime.Before(h[j].deliveryTime) ||
h[i].deliveryTime.Equal(h[j].deliveryTime) && h[i].order < h[j].order)
}

func (h packetHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *packetHeap) Push(x any) {
*h = append(*h, x.(packetWithDeliveryTimeAndOrder))
func (h *packetHeap) push(x packetWithDeliveryTimeAndOrder) {
*h = append(*h, x)
h.siftUp(len(*h) - 1)
}

func (h *packetHeap) Pop() any {
func (h *packetHeap) pop() packetWithDeliveryTimeAndOrder {
old := *h
n := len(old)
item := old[n-1]
item := old[0]
old[0] = old[n-1]
old[n-1] = packetWithDeliveryTimeAndOrder{} // clear for GC
*h = old[:n-1]
if len(*h) > 0 {
h.siftDown(0)
}
return item
}

func (h packetHeap) siftUp(i int) {
for i > 0 {
parent := (i - 1) / 2
if !h.less(i, parent) {
break
}
h[i], h[parent] = h[parent], h[i]
i = parent
}
}

func (h packetHeap) siftDown(i int) {
n := len(h)
for {
left := 2*i + 1
if left >= n {
break
}
j := left
if right := left + 1; right < n && h.less(right, left) {
j = right
}
if !h.less(j, i) {
break
}
h[i], h[j] = h[j], h[i]
i = j
}
}
67 changes: 67 additions & 0 deletions packetheap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package simnet

import (
"container/heap"
"testing"
"time"
)

// stdlibPacketHeap wraps packetHeap to implement heap.Interface,
// used as the baseline to measure how much the direct implementation saves.
type stdlibPacketHeap []packetWithDeliveryTimeAndOrder

func (h stdlibPacketHeap) Len() int { return len(h) }
func (h stdlibPacketHeap) Less(i, j int) bool {
return (h[i].deliveryTime.Before(h[j].deliveryTime) ||
h[i].deliveryTime.Equal(h[j].deliveryTime) && h[i].order < h[j].order)
}
func (h stdlibPacketHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *stdlibPacketHeap) Push(x any) { *h = append(*h, x.(packetWithDeliveryTimeAndOrder)) }
func (h *stdlibPacketHeap) Pop() any {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
}

func makePacket(order int, deliveryTime time.Time) packetWithDeliveryTimeAndOrder {
return packetWithDeliveryTimeAndOrder{
Packet: &Packet{buf: []byte{byte(order)}},
order: order,
deliveryTime: deliveryTime,
}
}

func BenchmarkHeapPushPop(b *testing.B) {
// Pre-generate delivery times to avoid measuring time.Now in the loop.
base := time.Now()
const batchSize = 100

b.Run("container/heap", func(b *testing.B) {
b.ReportAllocs()
var h stdlibPacketHeap
for b.Loop() {
// Push a batch then pop them all, simulating real usage.
for j := range batchSize {
heap.Push(&h, makePacket(j, base.Add(time.Duration(batchSize-j)*time.Millisecond)))
}
for h.Len() > 0 {
heap.Pop(&h)
}
}
})

b.Run("direct", func(b *testing.B) {
b.ReportAllocs()
var h packetHeap
for b.Loop() {
for j := range batchSize {
h.push(makePacket(j, base.Add(time.Duration(batchSize-j)*time.Millisecond)))
}
for len(h) > 0 {
h.pop()
}
}
})
}
2 changes: 1 addition & 1 deletion ratelink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (l *RateLink) Reserve(now time.Time, packetSize int) time.Duration {
return r.DelayFrom(now)
}

func (l *RateLink) RecvPacket(p Packet) {
func (l *RateLink) RecvPacket(p *Packet) {
l.Receiver.RecvPacket(p)
}
4 changes: 2 additions & 2 deletions ratelink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type countingReceiver struct {
totalBytes int
}

func (c *countingReceiver) RecvPacket(p Packet) {
func (c *countingReceiver) RecvPacket(p *Packet) {
c.totalBytes += len(p.buf)
}

Expand All @@ -32,7 +32,7 @@ func TestRateLinkObservedBandwidth(t *testing.T) {

start := time.Now()
for range packets {
p := Packet{buf: chunk}
p := &Packet{buf: chunk}
time.Sleep(link.Reserve(time.Now(), len(p.buf)))
link.RecvPacket(p)
}
Expand Down
Loading
Loading