From b54db8b554d2558c7959603873e2bd6d07adc88d Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 12 Jan 2026 15:06:40 +0530 Subject: [PATCH] export FQ_CODEL This allows users to use the FQ_CODEL logic without using rest of simnet. --- codel.go | 6 +++--- codel_test.go | 24 ++++++++++++------------ fq_codel.go | 4 ++-- fq_codel_test.go | 6 +++--- ratelink_test.go | 6 +++--- router_test.go | 30 +++++++++++++++--------------- simconn.go | 8 ++++---- simlink.go | 24 ++++++++++++------------ simlink_test.go | 8 ++++---- 9 files changed, 58 insertions(+), 58 deletions(-) diff --git a/codel.go b/codel.go index c763714..9c497b0 100644 --- a/codel.go +++ b/codel.go @@ -39,7 +39,7 @@ func newCoDelQueue(target, interval time.Duration) coDelQueue { // Enqueue adds a packet to the queue func (q *coDelQueue) Enqueue(p Packet) { - q.byteCount += uint64(len(p.buf)) + q.byteCount += uint64(len(p.Buf)) q.q.PushBack(packetWithTimestamp{p, time.Now()}) } @@ -96,7 +96,7 @@ func (q *coDelQueue) Dequeue() (Packet, bool) { q.lastCount = q.count } - return p.Packet, len(p.Packet.buf) > 0 + return p.Packet, len(p.Packet.Buf) > 0 } func (q *coDelQueue) drop(p packetWithTimestamp) { @@ -110,7 +110,7 @@ func (q *coDelQueue) doDequeue(now time.Time) (p packetWithTimestamp, okToDrop b } p = q.q.PopFront() - q.byteCount -= uint64(len(p.Packet.buf)) + q.byteCount -= uint64(len(p.Packet.Buf)) sojournTime := now.Sub(p.ts) if sojournTime < q.target || q.byteCount < uint64(q.MTU) { q.firstAbove = time.Time{} diff --git a/codel_test.go b/codel_test.go index 19e796d..89b149b 100644 --- a/codel_test.go +++ b/codel_test.go @@ -19,17 +19,17 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) { // Build a queue that stays full for more than one interval. for i := range 4 { - q.Enqueue(Packet{buf: []byte{byte(i)}}) + q.Enqueue(Packet{Buf: []byte{byte(i)}}) } // Allow in-flight packets to accumulate sojourn time. time.Sleep(3 * q.interval) pkt, ok := q.Dequeue() - if !ok || len(pkt.buf) == 0 { + if !ok || len(pkt.Buf) == 0 { t.Fatal("expected packet before CoDel enters drop state") } - if got := pkt.buf[0]; got != 0 { + if got := pkt.Buf[0]; got != 0 { t.Fatalf("first dequeue returned %d, want 0", got) } if q.dropping { @@ -40,10 +40,10 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) { time.Sleep(3 * q.interval) pkt, ok = q.Dequeue() - if !ok || len(pkt.buf) == 0 { + if !ok || len(pkt.Buf) == 0 { t.Fatal("expected packet after CoDel begins dropping") } - if got := pkt.buf[0]; got != 2 { + if got := pkt.Buf[0]; got != 2 { t.Fatalf("persistent queue should drop packet 1, got %d", got) } if !q.dropping { @@ -62,27 +62,27 @@ func TestCodelQueueNoDropOnTransientQueue(t *testing.T) { q.MTU = 1 for i := range 2 { - q.Enqueue(Packet{buf: []byte{byte(i)}}) + q.Enqueue(Packet{Buf: []byte{byte(i)}}) } // The queue goes above target but drains in less than one interval. time.Sleep(q.target + q.interval/10) pkt, ok := q.Dequeue() - if !ok || len(pkt.buf) == 0 { + if !ok || len(pkt.Buf) == 0 { t.Fatal("expected first packet delivery") } - if got := pkt.buf[0]; got != 0 { + if got := pkt.Buf[0]; got != 0 { t.Fatalf("expected packet 0, got %d", got) } time.Sleep(q.interval / 2) pkt, ok = q.Dequeue() - if !ok || len(pkt.buf) == 0 { + if !ok || len(pkt.Buf) == 0 { t.Fatal("expected second packet delivery") } - if got := pkt.buf[0]; got != 1 { + if got := pkt.Buf[0]; got != 1 { t.Fatalf("expected packet 1, got %d", got) } if q.dropping { @@ -98,7 +98,7 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) { packets := make([]Packet, queueSize) data := []byte("test") for i := range queueSize { - packets[i] = Packet{buf: data} + packets[i] = Packet{Buf: data} } q := newCoDelQueue(5*time.Millisecond, 100*time.Millisecond) @@ -112,7 +112,7 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) { i = (i + 1) % queueSize pkt, ok := q.Dequeue() - if !ok || len(pkt.buf) == 0 { + if !ok || len(pkt.Buf) == 0 { b.Fatal("unexpected empty dequeue") } } diff --git a/fq_codel.go b/fq_codel.go index 06f7741..e785ca2 100644 --- a/fq_codel.go +++ b/fq_codel.go @@ -94,7 +94,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) { // Otherwise, that queue is selected for dequeue. if p, ok := fq.Dequeue(); ok { - fq.credits -= len(p.buf) + fq.credits -= len(p.Buf) return p, true } else { // If the CoDel algorithm does not return a packet, then the @@ -136,7 +136,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) { // algorithm, it subtracts the size of the packet from the byte credits // for the selected queue and returns the packet as the result of the // dequeue operation. - fq.credits -= len(p.buf) + fq.credits -= len(p.Buf) return p, true } else { // Finally, if the CoDel algorithm does not return a packet, then the diff --git a/fq_codel_test.go b/fq_codel_test.go index 13ed3d9..1292083 100644 --- a/fq_codel_test.go +++ b/fq_codel_test.go @@ -212,7 +212,7 @@ func (f testFlow) packet(seq, size int) Packet { return Packet{ To: &to, From: &from, - buf: flowPayload(f.id, seq, size), + Buf: flowPayload(f.id, seq, size), } } @@ -224,10 +224,10 @@ func (f testFlow) bucket(q *fqCoDel) int { } func decodeFlowAndSeq(p Packet) (flowID, seq int) { - if len(p.buf) < 2 { + if len(p.Buf) < 2 { return -1, -1 } - return int(p.buf[0]), int(p.buf[1]) + return int(p.Buf[0]), int(p.Buf[1]) } func flowPayload(flowID, seq, size int) []byte { diff --git a/ratelink_test.go b/ratelink_test.go index 3815b43..c2ad852 100644 --- a/ratelink_test.go +++ b/ratelink_test.go @@ -12,7 +12,7 @@ type countingReceiver struct { } func (c *countingReceiver) RecvPacket(p Packet) { - c.totalBytes += len(p.buf) + c.totalBytes += len(p.Buf) } func TestRateLinkObservedBandwidth(t *testing.T) { @@ -32,8 +32,8 @@ func TestRateLinkObservedBandwidth(t *testing.T) { start := time.Now() for range packets { - p := Packet{buf: chunk} - time.Sleep(link.Reserve(time.Now(), len(p.buf))) + p := Packet{Buf: chunk} + time.Sleep(link.Reserve(time.Now(), len(p.Buf))) link.RecvPacket(p) } duration := time.Since(start) diff --git a/router_test.go b/router_test.go index 23f357b..f65c664 100644 --- a/router_test.go +++ b/router_test.go @@ -17,7 +17,7 @@ func TestVariableLatencyRouterDelaysPackets(t *testing.T) { }, receiver) sendTime := time.Now() - router.RecvPacket(Packet{From: from, To: to, buf: []byte{0x1}}) + router.RecvPacket(Packet{From: from, To: to, Buf: []byte{0x1}}) delivery := receiver.waitFor(t) delay := delivery.arrival.Sub(sendTime) @@ -35,26 +35,26 @@ func TestVariableLatencyRouterAllowsReordering(t *testing.T) { synctest.Test(t, func(t *testing.T) { receiver := newRouterRecordingReceiver(2) router, from, to := startVariableLatencyRouter(t, func(p *Packet) time.Duration { - if len(p.buf) == 0 { + if len(p.Buf) == 0 { return 0 } - if p.buf[0] == 1 { + if p.Buf[0] == 1 { return 60 * time.Millisecond } return 5 * time.Millisecond }, receiver) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}}) + router.RecvPacket(Packet{From: from, To: to, Buf: []byte{1}}) time.Sleep(10 * time.Millisecond) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}}) + router.RecvPacket(Packet{From: from, To: to, Buf: []byte{2}}) first := receiver.waitFor(t) second := receiver.waitFor(t) - if first.packet.buf[0] != 2 { - t.Fatalf("expected packet 2 to arrive first, got %d", first.packet.buf[0]) + if first.packet.Buf[0] != 2 { + t.Fatalf("expected packet 2 to arrive first, got %d", first.packet.Buf[0]) } - if second.packet.buf[0] != 1 { - t.Fatalf("expected packet 1 to arrive second, got %d", second.packet.buf[0]) + if second.packet.Buf[0] != 1 { + t.Fatalf("expected packet 1 to arrive second, got %d", second.packet.Buf[0]) } }) } @@ -67,16 +67,16 @@ func TestVariableLatencyRouterKeepsOrderWithEqualLatency(t *testing.T) { return latency }, receiver) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}}) - router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}}) + router.RecvPacket(Packet{From: from, To: to, Buf: []byte{1}}) + router.RecvPacket(Packet{From: from, To: to, Buf: []byte{2}}) first := receiver.waitFor(t) second := receiver.waitFor(t) - if first.packet.buf[0] != 1 { - t.Fatalf("expected packet 1 to arrive first, got %d", first.packet.buf[0]) + if first.packet.Buf[0] != 1 { + t.Fatalf("expected packet 1 to arrive first, got %d", first.packet.Buf[0]) } - if second.packet.buf[0] != 2 { - t.Fatalf("expected packet 2 to arrive second, got %d", second.packet.buf[0]) + if second.packet.Buf[0] != 2 { + t.Fatalf("expected packet 2 to arrive second, got %d", second.packet.Buf[0]) } if second.arrival.Before(first.arrival) { t.Fatalf("expected packets with same latency to preserve order, got first=%v second=%v", first.arrival, second.arrival) diff --git a/simconn.go b/simconn.go index 38cf75c..9f7ab0c 100644 --- a/simconn.go +++ b/simconn.go @@ -26,7 +26,7 @@ type Router interface { type Packet struct { To net.Addr From net.Addr - buf []byte + Buf []byte } func (p *Packet) Hash(h *maphash.Hash) uint64 { @@ -134,7 +134,7 @@ func (c *SimConn) RecvPacket(p Packet) { } c.mu.Unlock() c.packetsRcvd.Add(1) - c.bytesRcvd.Add(int64(len(p.buf))) + c.bytesRcvd.Add(int64(len(p.Buf))) if c.recvBackPressure { select { @@ -196,7 +196,7 @@ func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { return 0, nil, ErrDeadlineExceeded } - n = copy(p, pkt.buf) + n = copy(p, pkt.Buf) // if the provided buffer is not enough to read the whole packet, we drop // the rest of the data. this is similar to what `recvfrom` does on Linux // and macOS. @@ -223,7 +223,7 @@ func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { pkt := Packet{ From: c.myAddr, To: addr, - buf: slices.Clone(p), + Buf: slices.Clone(p), } if c.upPacketReceiver == nil { panic("upPacketReceiver is nil. Did you forget to call simconn.SetUpPacketReceiver?") diff --git a/simlink.go b/simlink.go index 402a69b..b06f87f 100644 --- a/simlink.go +++ b/simlink.go @@ -31,8 +31,8 @@ type LinkSettings struct { // Simlink simulates a bidirectional network link with variable latency, // bandwidth limiting, and CoDel-based bufferbloat mitigation type Simlink struct { - up *linkDriver - down *linkDriver + up *LinkDriver + down *LinkDriver } func NewSimlink( @@ -62,7 +62,7 @@ func NewSimlink( } return &Simlink{ - up: newLinkDriver( + up: NewLinkDriver( target, interval, linkSettings.Uplink.MTU, linkSettings.Uplink.FlowBucketCount, @@ -70,7 +70,7 @@ func NewSimlink( upPacketReceiver, closeSignal, ), - down: newLinkDriver( + down: NewLinkDriver( target, interval, linkSettings.Downlink.MTU, linkSettings.Downlink.FlowBucketCount, @@ -86,21 +86,21 @@ func (l *Simlink) Start(wg *sync.WaitGroup) { l.down.Start(wg) } -type linkDriver struct { +type LinkDriver struct { newPacket chan Packet q fqCoDel rateLink *RateLink closeSignal chan struct{} } -func newLinkDriver( +func NewLinkDriver( target, interval time.Duration, quantum int, flowCount int, mtu int, bandwidth int, receiver PacketReceiver, - closeSignal chan struct{}) *linkDriver { - return &linkDriver{ + closeSignal chan struct{}) *LinkDriver { + return &LinkDriver{ newPacket: make(chan Packet, 1_024), q: newFqCoDel(target, interval, quantum, flowCount), closeSignal: closeSignal, @@ -108,11 +108,11 @@ func newLinkDriver( } } -func (d *linkDriver) RecvPacket(p Packet) { +func (d *LinkDriver) RecvPacket(p Packet) { d.newPacket <- p } -func (d *linkDriver) Start(wg *sync.WaitGroup) { +func (d *LinkDriver) Start(wg *sync.WaitGroup) { wgGo(wg, func() { deqTimer := time.NewTimer(0) deqTimer.Stop() @@ -138,10 +138,10 @@ func (d *linkDriver) Start(wg *sync.WaitGroup) { if ok { pendingPacket = &p now := time.Now() - if d.rateLink.AllowN(now, len(p.buf)) { + if d.rateLink.AllowN(now, len(p.Buf)) { continue } - delayDeq := d.rateLink.Reserve(now, len(p.buf)) + delayDeq := d.rateLink.Reserve(now, len(p.Buf)) deqTimer.Reset(delayDeq) } break diff --git a/simlink_test.go b/simlink_test.go index 1007021..b8f6d40 100644 --- a/simlink_test.go +++ b/simlink_test.go @@ -39,7 +39,7 @@ func TestLinkDriver(t *testing.T) { } closeSignal := make(chan struct{}) - ld := newLinkDriver( + ld := NewLinkDriver( 5*time.Millisecond, 100*time.Millisecond, 10*mtu, @@ -57,7 +57,7 @@ func TestLinkDriver(t *testing.T) { defer close(closeSignal) ld.RecvPacket(Packet{ - buf: []byte("Hello World"), + Buf: []byte("Hello World"), To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")), From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")), }) @@ -91,7 +91,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) { recvStarted = true recvStartTimeChan <- time.Now() } - bytesRead += len(p.buf) + bytesRead += len(p.Buf) } router := &testRouter{} @@ -119,7 +119,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) { // but it acts as a simple pacer to avoid just dropping the packets when the link is saturated. time.Sleep(100 * time.Microsecond) p := Packet{ - buf: chunk, + Buf: chunk, To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")), From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")), }