diff --git a/codel.go b/codel.go index c763714..c4feb9b 100644 --- a/codel.go +++ b/codel.go @@ -25,7 +25,7 @@ type coDelQueue struct { } type packetWithTimestamp struct { - Packet + *Packet ts time.Time } @@ -33,12 +33,12 @@ func newCoDelQueue(target, interval time.Duration) coDelQueue { return coDelQueue{ target: target, interval: interval, - q: newRingBuffer[packetWithTimestamp](128), + q: newRingBuffer[packetWithTimestamp](4), } } // Enqueue adds a packet to the queue -func (q *coDelQueue) Enqueue(p Packet) { +func (q *coDelQueue) Enqueue(p *Packet) { q.byteCount += uint64(len(p.buf)) q.q.PushBack(packetWithTimestamp{p, time.Now()}) } @@ -46,7 +46,7 @@ func (q *coDelQueue) Enqueue(p Packet) { // Dequeue removes and returns the next packet when it's ready for delivery // This blocks until a packet is available AND its delivery time has been reached // Uses a timer that can be reset if a packet with earlier delivery time arrives -func (q *coDelQueue) Dequeue() (Packet, bool) { +func (q *coDelQueue) Dequeue() (*Packet, bool) { now := time.Now() p, okayToDrop := q.doDequeue(now) @@ -96,7 +96,7 @@ func (q *coDelQueue) Dequeue() (Packet, bool) { q.lastCount = q.count } - return p.Packet, len(p.Packet.buf) > 0 + return p.Packet, p.Packet != nil } func (q *coDelQueue) drop(p packetWithTimestamp) { diff --git a/codel_test.go b/codel_test.go index 19e796d..23c72c1 100644 --- a/codel_test.go +++ b/codel_test.go @@ -19,7 +19,7 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) { // Build a queue that stays full for more than one interval. for i := range 4 { - q.Enqueue(Packet{buf: []byte{byte(i)}}) + q.Enqueue(&Packet{buf: []byte{byte(i)}}) } // Allow in-flight packets to accumulate sojourn time. @@ -62,7 +62,7 @@ func TestCodelQueueNoDropOnTransientQueue(t *testing.T) { q.MTU = 1 for i := range 2 { - q.Enqueue(Packet{buf: []byte{byte(i)}}) + q.Enqueue(&Packet{buf: []byte{byte(i)}}) } // The queue goes above target but drains in less than one interval. @@ -95,10 +95,10 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) { const initSize = 30000 const queueSize = 50000 - packets := make([]Packet, queueSize) + packets := make([]*Packet, queueSize) data := []byte("test") for i := range queueSize { - packets[i] = Packet{buf: data} + packets[i] = &Packet{buf: data} } q := newCoDelQueue(5*time.Millisecond, 100*time.Millisecond) diff --git a/fq_codel.go b/fq_codel.go index 06f7741..fb7a3d1 100644 --- a/fq_codel.go +++ b/fq_codel.go @@ -40,7 +40,7 @@ func newFqCoDel(target, interval time.Duration, quantum, flowCount int) fqCoDel } } -func (q *fqCoDel) Enqueue(p Packet) { +func (q *fqCoDel) Enqueue(p *Packet) { bucket := int(p.Hash(&q.hash) % uint64(len(q.flows))) fq := q.flows[bucket] @@ -74,7 +74,7 @@ func (q *fqCoDel) Enqueue(p Packet) { /* | | Credits Exhausted /* +-----------------+ */ -func (q *fqCoDel) Dequeue() (Packet, bool) { +func (q *fqCoDel) Dequeue() (*Packet, bool) { for !q.newFlows.empty() { fq := q.newFlows.peek() @@ -153,7 +153,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) { } } - return Packet{}, false + return nil, false } type linkedList[T any] struct { diff --git a/fq_codel_test.go b/fq_codel_test.go index 13ed3d9..5b997fe 100644 --- a/fq_codel_test.go +++ b/fq_codel_test.go @@ -206,10 +206,10 @@ type testFlow struct { from net.UDPAddr } -func (f testFlow) packet(seq, size int) Packet { +func (f testFlow) packet(seq, size int) *Packet { to := f.to from := f.from - return Packet{ + return &Packet{ To: &to, From: &from, buf: flowPayload(f.id, seq, size), @@ -219,11 +219,11 @@ func (f testFlow) packet(seq, size int) Packet { func (f testFlow) bucket(q *fqCoDel) int { to := f.to from := f.from - probe := Packet{To: &to, From: &from} + probe := &Packet{To: &to, From: &from} return int(probe.Hash(&q.hash) % uint64(len(q.flows))) } -func decodeFlowAndSeq(p Packet) (flowID, seq int) { +func decodeFlowAndSeq(p *Packet) (flowID, seq int) { if len(p.buf) < 2 { return -1, -1 } diff --git a/packetheap.go b/packetheap.go index 50f7ef5..d67326b 100644 --- a/packetheap.go +++ b/packetheap.go @@ -8,28 +8,59 @@ type packetWithDeliveryTimeAndOrder struct { deliveryTime time.Time } -// packetHeap implements heap.Interface ordered by packet delivery time. +// packetHeap is a min-heap ordered by delivery time, then order. +// It avoids container/heap to eliminate interface{} boxing allocations. type packetHeap []packetWithDeliveryTimeAndOrder -func (h packetHeap) Len() int { return len(h) } - -func (h packetHeap) Less(i, j int) bool { +func (h packetHeap) less(i, j int) bool { return (h[i].deliveryTime.Before(h[j].deliveryTime) || h[i].deliveryTime.Equal(h[j].deliveryTime) && h[i].order < h[j].order) } -func (h packetHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *packetHeap) Push(x any) { - *h = append(*h, x.(packetWithDeliveryTimeAndOrder)) +func (h *packetHeap) push(x packetWithDeliveryTimeAndOrder) { + *h = append(*h, x) + h.siftUp(len(*h) - 1) } -func (h *packetHeap) Pop() any { +func (h *packetHeap) pop() packetWithDeliveryTimeAndOrder { old := *h n := len(old) - item := old[n-1] + item := old[0] + old[0] = old[n-1] + old[n-1] = packetWithDeliveryTimeAndOrder{} // clear for GC *h = old[:n-1] + if len(*h) > 0 { + h.siftDown(0) + } return item } + +func (h packetHeap) siftUp(i int) { + for i > 0 { + parent := (i - 1) / 2 + if !h.less(i, parent) { + break + } + h[i], h[parent] = h[parent], h[i] + i = parent + } +} + +func (h packetHeap) siftDown(i int) { + n := len(h) + for { + left := 2*i + 1 + if left >= n { + break + } + j := left + if right := left + 1; right < n && h.less(right, left) { + j = right + } + if !h.less(j, i) { + break + } + h[i], h[j] = h[j], h[i] + i = j + } +} diff --git a/packetheap_test.go b/packetheap_test.go new file mode 100644 index 0000000..e9d7f21 --- /dev/null +++ b/packetheap_test.go @@ -0,0 +1,67 @@ +package simnet + +import ( + "container/heap" + "testing" + "time" +) + +// stdlibPacketHeap wraps packetHeap to implement heap.Interface, +// used as the baseline to measure how much the direct implementation saves. +type stdlibPacketHeap []packetWithDeliveryTimeAndOrder + +func (h stdlibPacketHeap) Len() int { return len(h) } +func (h stdlibPacketHeap) Less(i, j int) bool { + return (h[i].deliveryTime.Before(h[j].deliveryTime) || + h[i].deliveryTime.Equal(h[j].deliveryTime) && h[i].order < h[j].order) +} +func (h stdlibPacketHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *stdlibPacketHeap) Push(x any) { *h = append(*h, x.(packetWithDeliveryTimeAndOrder)) } +func (h *stdlibPacketHeap) Pop() any { + old := *h + n := len(old) + item := old[n-1] + *h = old[:n-1] + return item +} + +func makePacket(order int, deliveryTime time.Time) packetWithDeliveryTimeAndOrder { + return packetWithDeliveryTimeAndOrder{ + Packet: &Packet{buf: []byte{byte(order)}}, + order: order, + deliveryTime: deliveryTime, + } +} + +func BenchmarkHeapPushPop(b *testing.B) { + // Pre-generate delivery times to avoid measuring time.Now in the loop. + base := time.Now() + const batchSize = 100 + + b.Run("container/heap", func(b *testing.B) { + b.ReportAllocs() + var h stdlibPacketHeap + for b.Loop() { + // Push a batch then pop them all, simulating real usage. + for j := range batchSize { + heap.Push(&h, makePacket(j, base.Add(time.Duration(batchSize-j)*time.Millisecond))) + } + for h.Len() > 0 { + heap.Pop(&h) + } + } + }) + + b.Run("direct", func(b *testing.B) { + b.ReportAllocs() + var h packetHeap + for b.Loop() { + for j := range batchSize { + h.push(makePacket(j, base.Add(time.Duration(batchSize-j)*time.Millisecond))) + } + for len(h) > 0 { + h.pop() + } + } + }) +} diff --git a/ratelink.go b/ratelink.go index f3c2a02..daf7dd0 100644 --- a/ratelink.go +++ b/ratelink.go @@ -33,6 +33,6 @@ func (l *RateLink) Reserve(now time.Time, packetSize int) time.Duration { return r.DelayFrom(now) } -func (l *RateLink) RecvPacket(p Packet) { +func (l *RateLink) RecvPacket(p *Packet) { l.Receiver.RecvPacket(p) } diff --git a/ratelink_test.go b/ratelink_test.go index 3815b43..d3121d5 100644 --- a/ratelink_test.go +++ b/ratelink_test.go @@ -11,7 +11,7 @@ type countingReceiver struct { totalBytes int } -func (c *countingReceiver) RecvPacket(p Packet) { +func (c *countingReceiver) RecvPacket(p *Packet) { c.totalBytes += len(p.buf) } @@ -32,7 +32,7 @@ func TestRateLinkObservedBandwidth(t *testing.T) { start := time.Now() for range packets { - p := Packet{buf: chunk} + p := &Packet{buf: chunk} time.Sleep(link.Reserve(time.Now(), len(p.buf))) link.RecvPacket(p) } diff --git a/router.go b/router.go index 0c641c8..4519e09 100644 --- a/router.go +++ b/router.go @@ -1,7 +1,6 @@ package simnet import ( - "container/heap" "fmt" "log/slog" "net" @@ -18,16 +17,17 @@ const ( DropReasonFirewalled DropReason = "Packet firewalled" ) -type OnDrop func(packet Packet, reason DropReason) +type OnDrop func(packet *Packet, reason DropReason) func LogOnDrop(logger *slog.Logger) OnDrop { - return func(packet Packet, reason DropReason) { + return func(packet *Packet, reason DropReason) { logger.Error("Dropping packet", "from", packet.From, "to", packet.To, "reason", reason) } } type ipPortKey struct { - ip string + ip [16]byte // fixed-size to avoid string allocation + ipLen uint8 port uint16 isUDP bool } @@ -35,29 +35,25 @@ type ipPortKey struct { func (k *ipPortKey) FromNetAddr(addr net.Addr) error { switch addr := addr.(type) { case *net.UDPAddr: - *k = ipPortKey{ - ip: string(addr.IP), - port: uint16(addr.Port), - isUDP: true, - } + k.ipLen = uint8(copy(k.ip[:], addr.IP)) + k.port = uint16(addr.Port) + k.isUDP = true return nil case *net.TCPAddr: - *k = ipPortKey{ - ip: string(addr.IP), - port: uint16(addr.Port), - isUDP: false, - } + k.ipLen = uint8(copy(k.ip[:], addr.IP)) + k.port = uint16(addr.Port) + k.isUDP = false return nil default: - ip, err := netip.ParseAddrPort(addr.String()) + ap, err := netip.ParseAddrPort(addr.String()) if err != nil { return err } - *k = ipPortKey{ - ip: string(ip.Addr().AsSlice()), - port: ip.Port(), - isUDP: addr.Network() == "udp", - } + b := ap.Addr().As16() + k.ip = b + k.ipLen = 16 + k.port = ap.Port() + k.isUDP = addr.Network() == "udp" return nil } } @@ -119,7 +115,7 @@ type PerfectRouter struct { nodes addrMap[PacketReceiver] } -func (r *PerfectRouter) RecvPacket(p Packet) { +func (r *PerfectRouter) RecvPacket(p *Packet) { conn, ok := r.nodes.Get(p.To) if !ok { if r.OnDrop != nil { @@ -146,12 +142,12 @@ type VariableLatencyRouter struct { LatencyFunc func(packet *Packet) time.Duration CloseSignal chan struct{} - packets chan Packet + packets chan *Packet packetCount int h packetHeap } -func (r *VariableLatencyRouter) RecvPacket(p Packet) { +func (r *VariableLatencyRouter) RecvPacket(p *Packet) { r.packets <- p } @@ -165,8 +161,7 @@ func wgGo(wg *sync.WaitGroup, f func()) { } func (r *VariableLatencyRouter) Start(wg *sync.WaitGroup) { - r.packets = make(chan Packet, 128) - heap.Init(&r.h) + r.packets = make(chan *Packet, 128) wgGo(wg, func() { var nextDelivery time.Time @@ -179,10 +174,10 @@ func (r *VariableLatencyRouter) Start(wg *sync.WaitGroup) { return case p := <-r.packets: r.packetCount++ - latency := r.LatencyFunc(&p) + latency := r.LatencyFunc(p) deliveryTime := time.Now().Add(latency) - heap.Push(&r.h, packetWithDeliveryTimeAndOrder{ - Packet: &p, + r.h.push(packetWithDeliveryTimeAndOrder{ + Packet: p, order: r.packetCount, deliveryTime: deliveryTime, }) @@ -193,8 +188,8 @@ func (r *VariableLatencyRouter) Start(wg *sync.WaitGroup) { case <-deliveryTimer.C: now := time.Now() for len(r.h) > 0 && !r.h[0].deliveryTime.After(now) { - p := heap.Pop(&r.h).(packetWithDeliveryTimeAndOrder).Packet - r.PerfectRouter.RecvPacket(*p) + p := r.h.pop().Packet + r.PerfectRouter.RecvPacket(p) } if len(r.h) > 0 { nextDelivery = r.h[0].deliveryTime @@ -214,7 +209,7 @@ type simpleNodeFirewall struct { node PacketReceiver } -func (f *simpleNodeFirewall) MarkPacketSentOut(p Packet) { +func (f *simpleNodeFirewall) MarkPacketSentOut(p *Packet) { f.mu.Lock() defer f.mu.Unlock() if f.packetsOutTo == nil { @@ -223,7 +218,7 @@ func (f *simpleNodeFirewall) MarkPacketSentOut(p Packet) { f.packetsOutTo[p.To.String()] = struct{}{} } -func (f *simpleNodeFirewall) IsPacketInAllowed(p Packet) bool { +func (f *simpleNodeFirewall) IsPacketInAllowed(p *Packet) bool { f.mu.Lock() defer f.mu.Unlock() if f.publiclyReachable { @@ -264,7 +259,7 @@ func (r *SimpleFirewallRouter) SetAddrPubliclyReachable(addr net.Addr) { r.publiclyReachableAddrs[addr.String()] = true } -func (r *SimpleFirewallRouter) RecvPacket(p Packet) { +func (r *SimpleFirewallRouter) RecvPacket(p *Packet) { r.mu.Lock() defer r.mu.Unlock() toNode, exists := r.nodes[p.To.String()] diff --git a/router_test.go b/router_test.go index 23f357b..4f4fb19 100644 --- a/router_test.go +++ b/router_test.go @@ -17,7 +17,7 @@ func TestVariableLatencyRouterDelaysPackets(t *testing.T) { }, receiver) sendTime := time.Now() - router.RecvPacket(Packet{From: from, To: to, buf: []byte{0x1}}) + router.RecvPacket(&Packet{From: from, To: to, buf: []byte{0x1}}) delivery := receiver.waitFor(t) delay := delivery.arrival.Sub(sendTime) @@ -44,9 +44,9 @@ func TestVariableLatencyRouterAllowsReordering(t *testing.T) { return 5 * time.Millisecond }, receiver) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}}) + router.RecvPacket(&Packet{From: from, To: to, buf: []byte{1}}) time.Sleep(10 * time.Millisecond) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}}) + router.RecvPacket(&Packet{From: from, To: to, buf: []byte{2}}) first := receiver.waitFor(t) second := receiver.waitFor(t) @@ -67,8 +67,8 @@ func TestVariableLatencyRouterKeepsOrderWithEqualLatency(t *testing.T) { return latency }, receiver) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}}) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}}) + router.RecvPacket(&Packet{From: from, To: to, buf: []byte{1}}) + router.RecvPacket(&Packet{From: from, To: to, buf: []byte{2}}) first := receiver.waitFor(t) second := receiver.waitFor(t) @@ -85,7 +85,7 @@ func TestVariableLatencyRouterKeepsOrderWithEqualLatency(t *testing.T) { } type deliveredPacket struct { - packet Packet + packet *Packet arrival time.Time } @@ -99,7 +99,7 @@ func newRouterRecordingReceiver(buffer int) *routerRecordingReceiver { } } -func (r *routerRecordingReceiver) RecvPacket(p Packet) { +func (r *routerRecordingReceiver) RecvPacket(p *Packet) { r.deliveries <- deliveredPacket{ packet: p, arrival: time.Now(), @@ -117,6 +117,53 @@ func (r *routerRecordingReceiver) waitFor(t *testing.T) deliveredPacket { } } +func BenchmarkAddrMapGet(b *testing.B) { + var m addrMap[int] + addrs := []*net.UDPAddr{ + {IP: net.IPv4(10, 0, 0, 1), Port: 1234}, + {IP: net.IPv4(10, 0, 0, 2), Port: 5678}, + {IP: net.IPv4(192, 168, 1, 1), Port: 80}, + {IP: net.IPv4(203, 0, 113, 5), Port: 443}, + } + for i, a := range addrs { + m.Set(a, i) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + m.Get(addrs[i%len(addrs)]) + } +} + +func BenchmarkAddrMapSet(b *testing.B) { + var m addrMap[int] + addrs := []*net.UDPAddr{ + {IP: net.IPv4(10, 0, 0, 1), Port: 1234}, + {IP: net.IPv4(10, 0, 0, 2), Port: 5678}, + {IP: net.IPv4(192, 168, 1, 1), Port: 80}, + {IP: net.IPv4(203, 0, 113, 5), Port: 443}, + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + m.Set(addrs[i%len(addrs)], i) + } +} + +func BenchmarkIPPortKeyFromNetAddr(b *testing.B) { + addr := &net.UDPAddr{IP: net.IPv4(10, 0, 0, 1), Port: 1234} + var k ipPortKey + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + k.FromNetAddr(addr) + } + _ = k +} + func startVariableLatencyRouter(t *testing.T, latency func(*Packet) time.Duration, receiver PacketReceiver) (*VariableLatencyRouter, net.Addr, net.Addr) { t.Helper() router := &VariableLatencyRouter{ diff --git a/simconn.go b/simconn.go index 38cf75c..aabe07c 100644 --- a/simconn.go +++ b/simconn.go @@ -13,7 +13,7 @@ import ( var ErrDeadlineExceeded = errors.New("deadline exceeded") type PacketReceiver interface { - RecvPacket(p Packet) + RecvPacket(p *Packet) } // Router handles routing of packets between simulated connections. @@ -31,11 +31,27 @@ type Packet struct { func (p *Packet) Hash(h *maphash.Hash) uint64 { h.Reset() - h.WriteString(p.To.String()) - h.WriteString(p.From.String()) + hashAddr(h, p.To) + hashAddr(h, p.From) return h.Sum64() } +// hashAddr writes the address bytes to the hash without allocating a string. +func hashAddr(h *maphash.Hash, addr net.Addr) { + switch a := addr.(type) { + case *net.UDPAddr: + h.Write(a.IP) + h.WriteByte(byte(a.Port >> 8)) + h.WriteByte(byte(a.Port)) + case *net.TCPAddr: + h.Write(a.IP) + h.WriteByte(byte(a.Port >> 8)) + h.WriteByte(byte(a.Port)) + default: + h.WriteString(addr.String()) + } +} + // SimConn is a simulated network connection that implements net.PacketConn. It // provides packet-based communication through a Router for testing and // simulation purposes. All send/recv operations are handled through the @@ -57,7 +73,7 @@ type SimConn struct { myAddr *net.UDPAddr myLocalAddr net.Addr - packetsToRead chan Packet + packetsToRead chan *Packet // Controls whether to block when receiving packets if our buffer is full. // If false, drops packets. @@ -83,7 +99,7 @@ func newSimConn(addr *net.UDPAddr, block bool) *SimConn { c := &SimConn{ recvBackPressure: block, myAddr: addr, - packetsToRead: make(chan Packet, 32), + packetsToRead: make(chan *Packet, 32), closedChan: make(chan struct{}), deadlineUpdated: make(chan struct{}, 1), } @@ -126,7 +142,7 @@ func (c *SimConn) SetWriteBuffer(n int) error { return nil } -func (c *SimConn) RecvPacket(p Packet) { +func (c *SimConn) RecvPacket(p *Packet) { c.mu.Lock() if c.closed { c.mu.Unlock() @@ -180,7 +196,7 @@ func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { return 0, nil, ErrDeadlineExceeded } - var pkt Packet + var pkt *Packet var deadlineTimer <-chan time.Time if !deadline.IsZero() { deadlineTimer = time.After(time.Until(deadline)) @@ -220,7 +236,7 @@ func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { c.packetsSent.Add(1) c.bytesSent.Add(int64(len(p))) - pkt := Packet{ + pkt := &Packet{ From: c.myAddr, To: addr, buf: slices.Clone(p), diff --git a/simconn_test.go b/simconn_test.go index 42b3978..72781ef 100644 --- a/simconn_test.go +++ b/simconn_test.go @@ -2,6 +2,7 @@ package simnet import ( "bytes" + "hash/maphash" "net" "sync" "testing" @@ -223,6 +224,31 @@ func TestSimpleHolePunch(t *testing.T) { }) } +func BenchmarkPacketHash(b *testing.B) { + p := Packet{ + To: &net.UDPAddr{IP: IntToPublicIPv4(1), Port: 1234}, + From: &net.UDPAddr{IP: IntToPublicIPv4(2), Port: 5678}, + } + var h maphash.Hash + + b.Run("optimized", func(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + p.Hash(&h) + } + }) + + b.Run("string", func(b *testing.B) { + b.ReportAllocs() + for b.Loop() { + h.Reset() + h.WriteString(p.To.String()) + h.WriteString(p.From.String()) + h.Sum64() + } + }) +} + func TestPublicIP(t *testing.T) { err := quick.Check(func(n int) bool { ip := IntToPublicIPv4(n) diff --git a/simlink.go b/simlink.go index 432b39b..83ce9fd 100644 --- a/simlink.go +++ b/simlink.go @@ -81,7 +81,7 @@ func (l *Simlink) Start(wg *sync.WaitGroup) { } type linkDriver struct { - newPacket chan Packet + newPacket chan *Packet q fqCoDel rateLink *RateLink closeSignal chan struct{} @@ -96,14 +96,14 @@ func newLinkDriver( receiver PacketReceiver, closeSignal chan struct{}) *linkDriver { return &linkDriver{ - newPacket: make(chan Packet, 1_024), + newPacket: make(chan *Packet, 32), q: newFqCoDel(target, interval, quantum, flowCount), closeSignal: closeSignal, rateLink: NewRateLink(bandwidth, mtu, receiver), } } -func (d *linkDriver) RecvPacket(p Packet) { +func (d *linkDriver) RecvPacket(p *Packet) { d.newPacket <- p } @@ -112,7 +112,7 @@ func (d *linkDriver) RecvPacket(p Packet) { func (d *linkDriver) processQueue() time.Duration { for { if d.pendingPacket != nil { - d.rateLink.RecvPacket(*d.pendingPacket) + d.rateLink.RecvPacket(d.pendingPacket) d.pendingPacket = nil } @@ -120,7 +120,7 @@ func (d *linkDriver) processQueue() time.Duration { if !ok { return 0 // queue empty } - d.pendingPacket = &p + d.pendingPacket = p now := time.Now() if d.rateLink.AllowN(now, len(p.buf)) { continue // can send immediately diff --git a/simlink_test.go b/simlink_test.go index 1007021..7ae1c08 100644 --- a/simlink_test.go +++ b/simlink_test.go @@ -17,10 +17,10 @@ import ( ) type testRouter struct { - onRecv func(p Packet) + onRecv func(p *Packet) } -func (r *testRouter) RecvPacket(p Packet) { +func (r *testRouter) RecvPacket(p *Packet) { r.onRecv(p) } @@ -33,7 +33,7 @@ func TestLinkDriver(t *testing.T) { const mtu = 1500 var recvdPackets atomic.Uint32 tr := testRouter{ - onRecv: func(p Packet) { + onRecv: func(p *Packet) { recvdPackets.Add(1) }, } @@ -56,7 +56,7 @@ func TestLinkDriver(t *testing.T) { defer wg.Wait() defer close(closeSignal) - ld.RecvPacket(Packet{ + ld.RecvPacket(&Packet{ buf: []byte("Hello World"), To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")), From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")), @@ -86,7 +86,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) { recvStartTimeChan := make(chan time.Time, 1) recvStarted := false bytesRead := 0 - packetHandler := func(p Packet) { + packetHandler := func(p *Packet) { if !recvStarted { recvStarted = true recvStartTimeChan <- time.Now() @@ -118,7 +118,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) { // This sleep shouldn't limit the speed. 1400 Bytes/100us = 14KB/ms = 14MB/s = 14*8 Mbps // but it acts as a simple pacer to avoid just dropping the packets when the link is saturated. time.Sleep(100 * time.Microsecond) - p := Packet{ + p := &Packet{ buf: chunk, To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")), From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")),