Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions codel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newCoDelQueue(target, interval time.Duration) coDelQueue {

// Enqueue adds a packet to the queue
func (q *coDelQueue) Enqueue(p Packet) {
q.byteCount += uint64(len(p.buf))
q.byteCount += uint64(len(p.Buf))
q.q.PushBack(packetWithTimestamp{p, time.Now()})
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func (q *coDelQueue) Dequeue() (Packet, bool) {
q.lastCount = q.count
}

return p.Packet, len(p.Packet.buf) > 0
return p.Packet, len(p.Packet.Buf) > 0
}

func (q *coDelQueue) drop(p packetWithTimestamp) {
Expand All @@ -110,7 +110,7 @@ func (q *coDelQueue) doDequeue(now time.Time) (p packetWithTimestamp, okToDrop b
}

p = q.q.PopFront()
q.byteCount -= uint64(len(p.Packet.buf))
q.byteCount -= uint64(len(p.Packet.Buf))
sojournTime := now.Sub(p.ts)
if sojournTime < q.target || q.byteCount < uint64(q.MTU) {
q.firstAbove = time.Time{}
Expand Down
24 changes: 12 additions & 12 deletions codel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) {

// Build a queue that stays full for more than one interval.
for i := range 4 {
q.Enqueue(Packet{buf: []byte{byte(i)}})
q.Enqueue(Packet{Buf: []byte{byte(i)}})
}

// Allow in-flight packets to accumulate sojourn time.
time.Sleep(3 * q.interval)

pkt, ok := q.Dequeue()
if !ok || len(pkt.buf) == 0 {
if !ok || len(pkt.Buf) == 0 {
t.Fatal("expected packet before CoDel enters drop state")
}
if got := pkt.buf[0]; got != 0 {
if got := pkt.Buf[0]; got != 0 {
t.Fatalf("first dequeue returned %d, want 0", got)
}
if q.dropping {
Expand All @@ -40,10 +40,10 @@ func TestCodelQueueDropsPersistentBadQueue(t *testing.T) {
time.Sleep(3 * q.interval)

pkt, ok = q.Dequeue()
if !ok || len(pkt.buf) == 0 {
if !ok || len(pkt.Buf) == 0 {
t.Fatal("expected packet after CoDel begins dropping")
}
if got := pkt.buf[0]; got != 2 {
if got := pkt.Buf[0]; got != 2 {
t.Fatalf("persistent queue should drop packet 1, got %d", got)
}
if !q.dropping {
Expand All @@ -62,27 +62,27 @@ func TestCodelQueueNoDropOnTransientQueue(t *testing.T) {
q.MTU = 1

for i := range 2 {
q.Enqueue(Packet{buf: []byte{byte(i)}})
q.Enqueue(Packet{Buf: []byte{byte(i)}})
}

// The queue goes above target but drains in less than one interval.
time.Sleep(q.target + q.interval/10)

pkt, ok := q.Dequeue()
if !ok || len(pkt.buf) == 0 {
if !ok || len(pkt.Buf) == 0 {
t.Fatal("expected first packet delivery")
}
if got := pkt.buf[0]; got != 0 {
if got := pkt.Buf[0]; got != 0 {
t.Fatalf("expected packet 0, got %d", got)
}

time.Sleep(q.interval / 2)

pkt, ok = q.Dequeue()
if !ok || len(pkt.buf) == 0 {
if !ok || len(pkt.Buf) == 0 {
t.Fatal("expected second packet delivery")
}
if got := pkt.buf[0]; got != 1 {
if got := pkt.Buf[0]; got != 1 {
t.Fatalf("expected packet 1, got %d", got)
}
if q.dropping {
Expand All @@ -98,7 +98,7 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) {
packets := make([]Packet, queueSize)
data := []byte("test")
for i := range queueSize {
packets[i] = Packet{buf: data}
packets[i] = Packet{Buf: data}
}

q := newCoDelQueue(5*time.Millisecond, 100*time.Millisecond)
Expand All @@ -112,7 +112,7 @@ func BenchmarkCodelQueueEnqueueDequeue(b *testing.B) {
i = (i + 1) % queueSize

pkt, ok := q.Dequeue()
if !ok || len(pkt.buf) == 0 {
if !ok || len(pkt.Buf) == 0 {
b.Fatal("unexpected empty dequeue")
}
}
Expand Down
4 changes: 2 additions & 2 deletions fq_codel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) {

// Otherwise, that queue is selected for dequeue.
if p, ok := fq.Dequeue(); ok {
fq.credits -= len(p.buf)
fq.credits -= len(p.Buf)
return p, true
} else {
// If the CoDel algorithm does not return a packet, then the
Expand Down Expand Up @@ -136,7 +136,7 @@ func (q *fqCoDel) Dequeue() (Packet, bool) {
// algorithm, it subtracts the size of the packet from the byte credits
// for the selected queue and returns the packet as the result of the
// dequeue operation.
fq.credits -= len(p.buf)
fq.credits -= len(p.Buf)
return p, true
} else {
// Finally, if the CoDel algorithm does not return a packet, then the
Expand Down
6 changes: 3 additions & 3 deletions fq_codel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (f testFlow) packet(seq, size int) Packet {
return Packet{
To: &to,
From: &from,
buf: flowPayload(f.id, seq, size),
Buf: flowPayload(f.id, seq, size),
}
}

Expand All @@ -224,10 +224,10 @@ func (f testFlow) bucket(q *fqCoDel) int {
}

func decodeFlowAndSeq(p Packet) (flowID, seq int) {
if len(p.buf) < 2 {
if len(p.Buf) < 2 {
return -1, -1
}
return int(p.buf[0]), int(p.buf[1])
return int(p.Buf[0]), int(p.Buf[1])
}

func flowPayload(flowID, seq, size int) []byte {
Expand Down
6 changes: 3 additions & 3 deletions ratelink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type countingReceiver struct {
}

func (c *countingReceiver) RecvPacket(p Packet) {
c.totalBytes += len(p.buf)
c.totalBytes += len(p.Buf)
}

func TestRateLinkObservedBandwidth(t *testing.T) {
Expand All @@ -32,8 +32,8 @@ func TestRateLinkObservedBandwidth(t *testing.T) {

start := time.Now()
for range packets {
p := Packet{buf: chunk}
time.Sleep(link.Reserve(time.Now(), len(p.buf)))
p := Packet{Buf: chunk}
time.Sleep(link.Reserve(time.Now(), len(p.Buf)))
link.RecvPacket(p)
}
duration := time.Since(start)
Expand Down
30 changes: 15 additions & 15 deletions router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestVariableLatencyRouterDelaysPackets(t *testing.T) {
}, receiver)

sendTime := time.Now()
router.RecvPacket(Packet{From: from, To: to, buf: []byte{0x1}})
router.RecvPacket(Packet{From: from, To: to, Buf: []byte{0x1}})

delivery := receiver.waitFor(t)
delay := delivery.arrival.Sub(sendTime)
Expand All @@ -35,26 +35,26 @@ func TestVariableLatencyRouterAllowsReordering(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
receiver := newRouterRecordingReceiver(2)
router, from, to := startVariableLatencyRouter(t, func(p *Packet) time.Duration {
if len(p.buf) == 0 {
if len(p.Buf) == 0 {
return 0
}
if p.buf[0] == 1 {
if p.Buf[0] == 1 {
return 60 * time.Millisecond
}
return 5 * time.Millisecond
}, receiver)

router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}})
router.RecvPacket(Packet{From: from, To: to, Buf: []byte{1}})
time.Sleep(10 * time.Millisecond)
router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}})
router.RecvPacket(Packet{From: from, To: to, Buf: []byte{2}})

first := receiver.waitFor(t)
second := receiver.waitFor(t)
if first.packet.buf[0] != 2 {
t.Fatalf("expected packet 2 to arrive first, got %d", first.packet.buf[0])
if first.packet.Buf[0] != 2 {
t.Fatalf("expected packet 2 to arrive first, got %d", first.packet.Buf[0])
}
if second.packet.buf[0] != 1 {
t.Fatalf("expected packet 1 to arrive second, got %d", second.packet.buf[0])
if second.packet.Buf[0] != 1 {
t.Fatalf("expected packet 1 to arrive second, got %d", second.packet.Buf[0])
}
})
}
Expand All @@ -67,16 +67,16 @@ func TestVariableLatencyRouterKeepsOrderWithEqualLatency(t *testing.T) {
return latency
}, receiver)

router.RecvPacket(Packet{From: from, To: to, buf: []byte{1}})
router.RecvPacket(Packet{From: from, To: to, buf: []byte{2}})
router.RecvPacket(Packet{From: from, To: to, Buf: []byte{1}})
router.RecvPacket(Packet{From: from, To: to, Buf: []byte{2}})

first := receiver.waitFor(t)
second := receiver.waitFor(t)
if first.packet.buf[0] != 1 {
t.Fatalf("expected packet 1 to arrive first, got %d", first.packet.buf[0])
if first.packet.Buf[0] != 1 {
t.Fatalf("expected packet 1 to arrive first, got %d", first.packet.Buf[0])
}
if second.packet.buf[0] != 2 {
t.Fatalf("expected packet 2 to arrive second, got %d", second.packet.buf[0])
if second.packet.Buf[0] != 2 {
t.Fatalf("expected packet 2 to arrive second, got %d", second.packet.Buf[0])
}
if second.arrival.Before(first.arrival) {
t.Fatalf("expected packets with same latency to preserve order, got first=%v second=%v", first.arrival, second.arrival)
Expand Down
8 changes: 4 additions & 4 deletions simconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Router interface {
type Packet struct {
To net.Addr
From net.Addr
buf []byte
Buf []byte
}

func (p *Packet) Hash(h *maphash.Hash) uint64 {
Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *SimConn) RecvPacket(p Packet) {
}
c.mu.Unlock()
c.packetsRcvd.Add(1)
c.bytesRcvd.Add(int64(len(p.buf)))
c.bytesRcvd.Add(int64(len(p.Buf)))

if c.recvBackPressure {
select {
Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *SimConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
return 0, nil, ErrDeadlineExceeded
}

n = copy(p, pkt.buf)
n = copy(p, pkt.Buf)
// if the provided buffer is not enough to read the whole packet, we drop
// the rest of the data. this is similar to what `recvfrom` does on Linux
// and macOS.
Expand All @@ -223,7 +223,7 @@ func (c *SimConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
pkt := Packet{
From: c.myAddr,
To: addr,
buf: slices.Clone(p),
Buf: slices.Clone(p),
}
if c.upPacketReceiver == nil {
panic("upPacketReceiver is nil. Did you forget to call simconn.SetUpPacketReceiver?")
Expand Down
24 changes: 12 additions & 12 deletions simlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type LinkSettings struct {
// Simlink simulates a bidirectional network link with variable latency,
// bandwidth limiting, and CoDel-based bufferbloat mitigation
type Simlink struct {
up *linkDriver
down *linkDriver
up *LinkDriver
down *LinkDriver
}

func NewSimlink(
Expand Down Expand Up @@ -62,15 +62,15 @@ func NewSimlink(
}

return &Simlink{
up: newLinkDriver(
up: NewLinkDriver(
target, interval,
linkSettings.Uplink.MTU,
linkSettings.Uplink.FlowBucketCount,
linkSettings.Uplink.MTU, linkSettings.Uplink.BitsPerSecond,
upPacketReceiver,
closeSignal,
),
down: newLinkDriver(
down: NewLinkDriver(
target, interval,
linkSettings.Downlink.MTU,
linkSettings.Downlink.FlowBucketCount,
Expand All @@ -86,33 +86,33 @@ func (l *Simlink) Start(wg *sync.WaitGroup) {
l.down.Start(wg)
}

type linkDriver struct {
type LinkDriver struct {
newPacket chan Packet
q fqCoDel
rateLink *RateLink
closeSignal chan struct{}
}

func newLinkDriver(
func NewLinkDriver(
target, interval time.Duration,
quantum int,
flowCount int,
mtu int, bandwidth int,
receiver PacketReceiver,
closeSignal chan struct{}) *linkDriver {
return &linkDriver{
closeSignal chan struct{}) *LinkDriver {
return &LinkDriver{
newPacket: make(chan Packet, 1_024),
q: newFqCoDel(target, interval, quantum, flowCount),
closeSignal: closeSignal,
rateLink: NewRateLink(bandwidth, mtu, receiver),
}
}

func (d *linkDriver) RecvPacket(p Packet) {
func (d *LinkDriver) RecvPacket(p Packet) {
d.newPacket <- p
}

func (d *linkDriver) Start(wg *sync.WaitGroup) {
func (d *LinkDriver) Start(wg *sync.WaitGroup) {
wgGo(wg, func() {
deqTimer := time.NewTimer(0)
deqTimer.Stop()
Expand All @@ -138,10 +138,10 @@ func (d *linkDriver) Start(wg *sync.WaitGroup) {
if ok {
pendingPacket = &p
now := time.Now()
if d.rateLink.AllowN(now, len(p.buf)) {
if d.rateLink.AllowN(now, len(p.Buf)) {
continue
}
delayDeq := d.rateLink.Reserve(now, len(p.buf))
delayDeq := d.rateLink.Reserve(now, len(p.Buf))
deqTimer.Reset(delayDeq)
}
break
Expand Down
8 changes: 4 additions & 4 deletions simlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestLinkDriver(t *testing.T) {
}

closeSignal := make(chan struct{})
ld := newLinkDriver(
ld := NewLinkDriver(
5*time.Millisecond,
100*time.Millisecond,
10*mtu,
Expand All @@ -57,7 +57,7 @@ func TestLinkDriver(t *testing.T) {
defer close(closeSignal)

ld.RecvPacket(Packet{
buf: []byte("Hello World"),
Buf: []byte("Hello World"),
To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")),
From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")),
})
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) {
recvStarted = true
recvStartTimeChan <- time.Now()
}
bytesRead += len(p.buf)
bytesRead += len(p.Buf)
}

router := &testRouter{}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestBandwidthLimiter_synctest(t *testing.T) {
// but it acts as a simple pacer to avoid just dropping the packets when the link is saturated.
time.Sleep(100 * time.Microsecond)
p := Packet{
buf: chunk,
Buf: chunk,
To: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.4:1234")),
From: net.UDPAddrFromAddrPort(netip.MustParseAddrPort("1.2.3.5:1234")),
}
Expand Down