From 9daced270011af5896fad6a3ee49abb15a69b94b Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 15:40:06 -0800 Subject: [PATCH 01/23] speedup --- internal/intern/intern.go | 60 ++++++++++--------- internal/intern/intern_test.go | 104 +++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 27 deletions(-) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 7b9203c0f..e465ca840 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "github.com/bufbuild/protocompile/internal/ext/mapsx" "github.com/bufbuild/protocompile/internal/ext/unsafex" @@ -71,11 +72,12 @@ func (id ID) GoString() string { // // The zero value of Table is empty and ready to use. type Table struct { - mu sync.RWMutex - index map[string]ID - table []string + index sync.Map + table atomic.Pointer[[]string] } +var pending []string + // Intern interns the given string into this table. // // This function may be called by multiple goroutines concurrently. @@ -107,11 +109,12 @@ func (t *Table) Query(s string) (ID, bool) { return char6, true } - t.mu.RLock() - id, ok := t.index[s] - t.mu.RUnlock() + id, ok := t.index.Load(s) + if !ok { + return 0, false + } - return id, ok + return id.(ID), true } func (t *Table) internSlow(s string) ID { @@ -122,34 +125,34 @@ func (t *Table) internSlow(s string) ID { // a []byte as a string temporarily for querying the intern table. s = strings.Clone(s) - t.mu.Lock() - defer t.mu.Unlock() + // Take ownership of the table. +cas: + table := t.table.Load() + if table == &pending || !t.table.CompareAndSwap(table, &pending) { + goto cas + } + if table == nil { + table = new([]string) + } + defer t.table.Store(table) - // Check if someone raced us to intern this string. We have to check again - // because in the unsynchronized section between RUnlock and Lock, another - // goroutine might have successfully interned s. 
- // - // TODO: We can reduce the number of map hits if we switch to a different - // Map implementation that provides an upsert primitive. - if id, ok := t.index[s]; ok { - return id + // Check to see if someone beat us to the punch. + if id, ok := t.index.Load(s); ok { + return id.(ID) } // As of here, we have unique ownership of the table, and s has not been // inserted yet. - t.table = append(t.table, s) + *table = append(*table, s) // The first ID will have value 1. ID 0 is reserved for "". - id := ID(len(t.table)) + id := ID(len(*table)) if id < 0 { - panic(fmt.Sprintf("internal/intern: %d interning IDs exhausted", len(t.table))) + panic(fmt.Sprintf("internal/intern: %d interning IDs exhausted", len(*table))) } - if t.index == nil { - t.index = make(map[string]ID) - } - t.index[s] = id + t.index.Store(s, id) return id } @@ -221,9 +224,12 @@ func (t *Table) Preload(ids any) { } func (t *Table) getSlow(id ID) string { - t.mu.RLock() - defer t.mu.RUnlock() - return t.table[int(id)-1] +again: + table := t.table.Load() + if table == &pending { + goto again + } + return (*table)[int(id)-1] } // Set is a set of intern IDs. diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index b47c8ac2c..7bd71b508 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -16,11 +16,16 @@ package intern_test import ( "fmt" + "runtime" + "slices" "strings" + "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" + "github.com/bufbuild/protocompile/internal/ext/slicesx" "github.com/bufbuild/protocompile/internal/intern" ) @@ -75,3 +80,102 @@ func shouldInline(s string) bool { return true } + +func TestHammer(t *testing.T) { + t.Parallel() + + start := new(sync.WaitGroup) + end := new(sync.WaitGroup) + + n := new(atomic.Int64) + it := new(intern.Table) + + // We collect the results of every query to the table, and then ensure + // each gets a unique answer. 
+ mu := new(sync.Mutex) + query := make(map[string][]intern.ID) + value := make(map[intern.ID][]string) + + for range runtime.GOMAXPROCS(0) { + start.Add(1) + end.Add(1) + go func() { + defer end.Done() + + data := makeData(int(n.Add(1))) + m1 := make(map[string][]intern.ID) + m2 := make(map[intern.ID][]string) + + // This ensures that we have a thundering herd situation: all of + // these goroutines wake up and hammer the intern table at the + // same time. + start.Done() + start.Wait() + + for _, s := range data { + id := it.Intern(s) + m1[s] = append(m1[s], id) + + v := it.Value(id) + m2[id] = append(m2[id], v) + + assert.Equal(t, s, v) + } + + mu.Lock() + defer mu.Unlock() + for k, v := range m1 { + query[k] = append(query[k], v...) + } + for k, v := range m2 { + value[k] = append(value[k], v...) + } + }() + } + + end.Wait() + + for k, v := range query { + slices.Sort(v) + v = slicesx.Dedup(v) + assert.Equal(t, 1, len(v), "query[%v]: %v", k, v) + } + + for k, v := range value { + slices.Sort(v) + v = slicesx.Dedup(v) + assert.Equal(t, 1, len(v), "value[%v]: %v", k, v) + } +} + +func BenchmarkIntern(b *testing.B) { + n := new(atomic.Int64) + it := new(intern.Table) + b.RunParallel(func(p *testing.PB) { + data := makeData(int(n.Add(1))) + for p.Next() { + for _, s := range data { + _ = it.Value(it.Intern(s)) + } + } + }) +} + +// makeData generates deterministic pseudo-random data of poor quality, meaning +// that strings are likely to repeat in different orders across different +// seeds. 
+func makeData(seed int) []string { + var data []string + n := seed + for i := range 10000 { + n += 5 + n %= 99 + + buf := new(strings.Builder) + for j := range n { + buf.WriteRune(rune('a' + (i+j)%26)) + } + data = append(data, buf.String()) + } + return data +} From bca63881c46552bd69caf7a16e2ee159b3b67d26 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 15:55:32 -0800 Subject: [PATCH 02/23] comments --- internal/intern/intern.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index e465ca840..c93020db9 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -73,7 +73,7 @@ func (id ID) GoString() string { // The zero value of Table is empty and ready to use. type Table struct { index sync.Map - table atomic.Pointer[[]string] + table atomic.Pointer[[]string] // Spinlock; locked when it equals &pending. } var pending []string @@ -117,6 +117,7 @@ func (t *Table) Query(s string) (ID, bool) { return id.(ID), true } +//go:nosplit // Avoid preemption while holding the spinlock. func (t *Table) internSlow(s string) ID { // Intern tables are expected to be long-lived. Avoid holding onto a larger // buffer that s is an internal pointer to by cloning it. @@ -125,16 +126,19 @@ func (t *Table) internSlow(s string) ID { // a []byte as a string temporarily for querying the intern table. s = strings.Clone(s) - // Take ownership of the table. -cas: + // Take ownership of the table. We need to take this before we check the + // index for a race, because we can otherwise get a TOCTOU bug: + // 1. Ee check the index, it's missing s. + // 2. Another goroutine inserts s. + // 3. We lock the table to insert, resulting in a duplicate. 
table := t.table.Load() - if table == &pending || !t.table.CompareAndSwap(table, &pending) { - goto cas + for table == &pending || !t.table.CompareAndSwap(table, &pending) { + table = t.table.Load() } if table == nil { table = new([]string) } - defer t.table.Store(table) + defer t.table.Store(table) // This is the "unlock". // Check to see if someone beat us to the punch. if id, ok := t.index.Load(s); ok { @@ -153,7 +157,6 @@ cas: } t.index.Store(s, id) - return id } @@ -224,12 +227,14 @@ func (t *Table) Preload(ids any) { } func (t *Table) getSlow(id ID) string { -again: - table := t.table.Load() - if table == &pending { - goto again + for { + table := t.table.Load() + if table != &pending { + // If table is nil, that means no intern calls have occurred yet, + // so panicking is fine here (equivalent to an out-of-bounds access). + return (*table)[int(id)-1] + } } - return (*table)[int(id)-1] } // Set is a set of intern IDs. From 437c66c64d63984d1614f80426042490c9451129 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 16:02:52 -0800 Subject: [PATCH 03/23] lint --- internal/intern/intern.go | 29 ++++++++++++++++------------- internal/intern/intern_test.go | 10 +++++----- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index c93020db9..e3c197aed 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -19,6 +19,7 @@ package intern import ( "fmt" "reflect" + "runtime" "strings" "sync" "sync/atomic" @@ -114,7 +115,7 @@ func (t *Table) Query(s string) (ID, bool) { return 0, false } - return id.(ID), true + return id.(ID), true //nolint:errcheck } //go:nosplit // Avoid preemption while holding the spinlock. @@ -133,6 +134,7 @@ func (t *Table) internSlow(s string) ID { // 3. We lock the table to insert, resulting in a duplicate. 
table := t.table.Load() for table == &pending || !t.table.CompareAndSwap(table, &pending) { + runtime.Gosched() table = t.table.Load() } if table == nil { @@ -142,7 +144,7 @@ func (t *Table) internSlow(s string) ID { // Check to see if someone beat us to the punch. if id, ok := t.index.Load(s); ok { - return id.(ID) + return id.(ID) //nolint:errcheck } // As of here, we have unique ownership of the table, and s has not been @@ -204,6 +206,18 @@ func (t *Table) Value(id ID) string { return t.getSlow(id) } +func (t *Table) getSlow(id ID) string { + for { + table := t.table.Load() + if table != &pending { + // If table is nil, that means no intern calls have occurred yet, + // so panicking is fine here (equivalent to an out-of-bounds access). + return (*table)[int(id)-1] + } + runtime.Gosched() + } +} + // Preload takes a pointer to a struct type and initializes [ID]-typed fields // with statically-specified strings. // @@ -226,17 +240,6 @@ func (t *Table) Preload(ids any) { } } -func (t *Table) getSlow(id ID) string { - for { - table := t.table.Load() - if table != &pending { - // If table is nil, that means no intern calls have occurred yet, - // so panicking is fine here (equivalent to an out-of-bounds access). - return (*table)[int(id)-1] - } - } -} - // Set is a set of intern IDs. 
type Set map[ID]struct{} diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index 7bd71b508..b1c196fbf 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -138,13 +138,13 @@ func TestHammer(t *testing.T) { for k, v := range query { slices.Sort(v) v = slicesx.Dedup(v) - assert.Equal(t, 1, len(v), "query[%v]: %v", k, v) + assert.Len(t, v, 1, "query[%v]: %v", k, v) } for k, v := range value { slices.Sort(v) v = slicesx.Dedup(v) - assert.Equal(t, 1, len(v), "value[%v]: %v", k, v) + assert.Len(t, v, 1, "value[%v]: %v", k, v) } } @@ -165,9 +165,9 @@ func BenchmarkIntern(b *testing.B) { // that strings are likely to repeat in different orders across different // seeds. func makeData(seed int) []string { - var data []string + data := make([]string, 10000) n := seed - for i := range 10000 { + for i := range data { n += 5 n %= 99 @@ -175,7 +175,7 @@ func makeData(seed int) []string { for j := range n { buf.WriteRune(rune('a' + (i+j)%26)) } - data = append(data, buf.String()) + data[i] = buf.String() } return data } From b32958f8cc0926fab437cfe4b6d4db9448593130 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 16:13:34 -0800 Subject: [PATCH 04/23] get value to inline --- internal/intern/char6.go | 29 ++++++++++++++--------------- internal/intern/intern.go | 24 ++++++++---------------- 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/internal/intern/char6.go b/internal/intern/char6.go index 9c248244f..5623b6e6f 100644 --- a/internal/intern/char6.go +++ b/internal/intern/char6.go @@ -40,6 +40,8 @@ var ( }() ) +type inlined [maxInlined]byte + // encodeChar6 attempts to encoding data using the char6 encoding. Returns // whether encoding was successful, and an encoded value. func encodeChar6(data string) (ID, bool) { @@ -79,30 +81,27 @@ func encodeOutlined(data string) (ID, bool) { return value, true } -// decodeChar6 decodes id assuming it contains a char6-encoded string. 
-func decodeChar6(id ID) string { - // The main decoding loop is outlined to promote inlining of decodeChar6, - // and thus heap-promotion of the returned string. - data, len := decodeOutlined(id) //nolint:predeclared,revive // For `len`. - return unsafex.StringAlias(data[:len]) -} +// decodeChar6 decodes id assuming it contains a char6-encoded string, and +// writes the result to buf. +func decodeChar6(id ID, buf *inlined) string { + if id == 0 { + return "" + } -//nolint:predeclared,revive // For `len`. -func decodeOutlined(id ID) (data [maxInlined]byte, len int) { - for i := range data { - data[i] = char6ToByte[int(id&077)] + for i := range buf { + buf[i] = char6ToByte[int(id&077)] id >>= 6 } // Figure out the length by removing a maximal suffix of // '.' bytes. Note that an all-ones value will decode to "", but encode // will never return that value. - len = maxInlined - for ; len > 0; len-- { - if data[len-1] != '.' { + n := maxInlined + for ; n > 0; n-- { + if buf[n-1] != '.' { break } } - return data, len + return unsafex.StringAlias(buf[:n]) } diff --git a/internal/intern/intern.go b/internal/intern/intern.go index e3c197aed..371cb80f3 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -57,7 +57,7 @@ func (id ID) String() string { return `intern.ID("")` } if id < 0 { - return fmt.Sprintf("intern.ID(%q)", decodeChar6(id)) + return fmt.Sprintf("intern.ID(%q)", decodeChar6(id, new(inlined))) } return fmt.Sprintf("intern.ID(%d)", int(id)) } @@ -90,9 +90,6 @@ func (t *Table) Intern(s string) ID { if id, ok := t.Query(s); ok { return id } - - // Outline the fallback for when we haven't interned, to promote inlining - // of Intern(). return t.internSlow(s) } @@ -192,21 +189,16 @@ func (t *Table) QueryBytes(bytes []byte) (ID, bool) { // // This function may be called by multiple goroutines concurrently. 
func (t *Table) Value(id ID) string { - if id == 0 { - return "" - } + // NOTE: this function is carefully written such that Go inlines it into + // the caller, allowing the result to be promoted to the stack. + return t.value(id, new(inlined)) +} - if id < 0 { - return decodeChar6(id) +func (t *Table) value(id ID, buf *inlined) string { + if id <= 0 { + return decodeChar6(id, buf) } - // The locking part of Get is outlined to promote inlining of the two - // fast paths above. This in turn allows decodeChar6 to be inlined, which - // allows the returned string to be stack-promoted. - return t.getSlow(id) -} - -func (t *Table) getSlow(id ID) string { for { table := t.table.Load() if table != &pending { From 785c90210c9a6651fc1654118aca73ca5adf1f26 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 16:36:23 -0800 Subject: [PATCH 05/23] wip --- internal/intern/intern.go | 21 +++++++-------- internal/intern/intern_test.go | 48 ++++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 371cb80f3..37448f3cd 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -139,23 +139,20 @@ func (t *Table) internSlow(s string) ID { } defer t.table.Store(table) // This is the "unlock". + // Figure out the next interning ID. + // The first ID will have value 1. ID 0 is reserved for "". + id := ID(len(*table)) + 1 + if id < 0 { + panic(fmt.Sprintf("internal/intern: %d interning IDs exhausted", len(*table)+1)) + } + // Check to see if someone beat us to the punch. - if id, ok := t.index.Load(s); ok { + if id, ok := t.index.LoadOrStore(s, id); ok { return id.(ID) //nolint:errcheck } - // As of here, we have unique ownership of the table, and s has not been - // inserted yet. - + // Commit the ID we chose. *table = append(*table, s) - - // The first ID will have value 1. ID 0 is reserved for "". 
- id := ID(len(*table)) - if id < 0 { - panic(fmt.Sprintf("internal/intern: %d interning IDs exhausted", len(*table))) - } - - t.index.Store(s, id) return id } diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index b1c196fbf..0de1ae716 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -16,6 +16,7 @@ package intern_test import ( "fmt" + "math/rand" "runtime" "slices" "strings" @@ -113,6 +114,7 @@ func TestHammer(t *testing.T) { start.Wait() for _, s := range data { + s := string(s) id := it.Intern(s) m1[s] = append(m1[s], id) @@ -149,33 +151,51 @@ func TestHammer(t *testing.T) { } func BenchmarkIntern(b *testing.B) { - n := new(atomic.Int64) - it := new(intern.Table) - b.RunParallel(func(p *testing.PB) { - data := makeData(int(n.Add(1))) - for p.Next() { - for _, s := range data { - _ = it.Value(it.Intern(s)) + b.Run("Collisions", func(b *testing.B) { + n := new(atomic.Int64) + it := new(intern.Table) + b.RunParallel(func(p *testing.PB) { + data := makeData(int(n.Add(1))) + for p.Next() { + for _, s := range data { + _ = it.Value(it.InternBytes(s)) + } } - } + }) + }) + + b.Run("Unique", func(b *testing.B) { + n := new(atomic.Int64) + it := new(intern.Table) + b.RunParallel(func(p *testing.PB) { + data := makeData(int(n.Add(1))) + for p.Next() { + for i, s := range data { + s = append(s, '0') + data[i] = s + _ = it.Value(it.InternBytes(s)) + } + } + }) }) } // makeData generates deterministic pseudo-random data of poor quality, meaning // that strings are likely to repeat in different orders across different // seeds. 
-func makeData(seed int) []string { - data := make([]string, 10000) +func makeData(seed int) [][]byte { + data := make([][]byte, 10000) n := seed + r := rand.New(rand.NewSource(int64(seed))) for i := range data { n += 5 n %= 99 - buf := new(strings.Builder) - for j := range n { - buf.WriteRune(rune('a' + (i+j)%26)) + buf := make([]byte, n) + for i := range buf { + buf[i] = byte('a' + r.Intn(26)) } - data[i] = buf.String() + data[i] = buf } return data } From 07fa3df849501ec0dff59e3ca321e9500a78cad1 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Wed, 4 Mar 2026 22:54:26 -0800 Subject: [PATCH 06/23] wip --- internal/intern/intern.go | 7 +++++-- internal/intern/unsafe.go | 40 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 internal/intern/unsafe.go diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 37448f3cd..018720d59 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -75,6 +75,9 @@ func (id ID) GoString() string { type Table struct { index sync.Map table atomic.Pointer[[]string] // Spinlock; locked when it equals &pending. + + keys anyArena[string] + values anyArena[ID] } var pending []string @@ -126,7 +129,7 @@ func (t *Table) internSlow(s string) ID { // Take ownership of the table. We need to take this before we check the // index for a race, because we can otherwise get a TOCTOU bug: - // 1. Ee check the index, it's missing s. + // 1. We check the index, it's missing s. // 2. Another goroutine inserts s. // 3. We lock the table to insert, resulting in a duplicate. table := t.table.Load() @@ -147,7 +150,7 @@ func (t *Table) internSlow(s string) ID { } // Check to see if someone beat us to the punch. 
- if id, ok := t.index.LoadOrStore(s, id); ok { + if id, ok := t.index.LoadOrStore(t.keys.alloc(s), t.values.alloc(id)); ok { return id.(ID) //nolint:errcheck } diff --git a/internal/intern/unsafe.go b/internal/intern/unsafe.go new file mode 100644 index 000000000..2855cc5fb --- /dev/null +++ b/internal/intern/unsafe.go @@ -0,0 +1,40 @@ +package intern + +import ( + "reflect" + "unsafe" + + "github.com/bufbuild/protocompile/internal/ext/unsafex" +) + +// anyArena emulates the runtime.convT functions, except that it places the +// allocated value into the given arena. +// +// It only works for types T which are not indirect interface values (e.g., +// pointer-shaped types). +type anyArena[T any] []T + +func (c *anyArena[T]) alloc(value T) any { + a := *c + if len(a) == cap(a) { + // append(nil, make) ensures that the capacity is as large as possible, + // i.e., matching a size class. + a = append([]T(nil), make([]T, cap(a)*2)...)[:0] + } + + a = append(a, value) + *c = a + p := &a[len(a)-1] + + type iface struct { + itab unsafe.Pointer + data unsafe.Pointer + } + + var z T + itab := unsafex.Bitcast[iface](reflect.TypeOf(z)).data + return unsafex.Bitcast[any](iface{ + itab: itab, + data: unsafe.Pointer(p), + }) +} From 744163ec69efee79e52054e1732e1f63da9b70d6 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 16:50:28 -0800 Subject: [PATCH 07/23] use an atomic-ish log --- internal/ext/atomicx/log.go | 85 ++++++++++++++++++++++++++++++++ internal/ext/atomicx/log_test.go | 46 +++++++++++++++++ internal/intern/intern.go | 56 ++++++--------------- internal/intern/unsafe.go | 40 --------------- 4 files changed, 146 insertions(+), 81 deletions(-) create mode 100644 internal/ext/atomicx/log.go create mode 100644 internal/ext/atomicx/log_test.go delete mode 100644 internal/intern/unsafe.go diff --git a/internal/ext/atomicx/log.go b/internal/ext/atomicx/log.go new file mode 100644 index 000000000..f20cee755 --- /dev/null +++ 
b/internal/ext/atomicx/log.go @@ -0,0 +1,85 @@ +package atomicx + +import ( + "errors" + "fmt" + "math" + "sync" + "sync/atomic" + "unsafe" +) + +const debugLog = false + +var capacities sync.Map + +// Log is an append-only log. Loading operations may happen concurrently with +// append operations, but append operations may not be concurrent with each +// other. +type Log[T any] struct { + ptr atomic.Pointer[T] + len atomic.Int32 + cap int32 // Only modified by Append, does not need synchronization. +} + +// Load returns the value at the given index. +// +// This function may be called concurrently with [Log.Append]. +func (s *Log[T]) Load(idx int) T { + // Order doesn't matter here. Len is always updated after ptr, so ptr will + // always be valid for len elements. + len := s.len.Load() + ptr := s.ptr.Load() + + if debugLog { + v, _ := capacities.Load(unsafe.Pointer(ptr)) + cap := v.(int) + if cap < int(len) { + panic(fmt.Errorf("atomicx.Log: loaded %p with cap=%d < len=%d", ptr, cap, len)) + } + } + + return unsafe.Slice(ptr, len)[idx] +} + +// Append adds a new value to this slice. +// +// Append may be called concurrently with [Log.Load], but must *not* be called +// concurrently with itself. An external mutex should be used to protect calls +// to Append. +// +// Returns the new length of the log. +func (s *Log[T]) Append(v T) int { + p := s.ptr.Load() + l := s.len.Load() + c := s.cap + + if l == math.MaxInt32 { + panic(errors.New("internal/atomicx: cannot allocate more than 2^32 elements")) + } + + slice := unsafe.Slice(p, c) + + if l < c { // Don't need to grow the slice. + // Write the value first, and *then* make it visible to Load by + // incrementing the length. + slice[l] = v + return int(s.len.Add(1)) + } + + // Grow a new slice. + slice = append(slice, v) + + if debugLog { + capacities.Store(unsafe.Pointer(unsafe.SliceData(slice)), cap(slice)) + } + + // Update the pointer, length, and capacity as appropriate. 
+ // Note that we update the length *after* the pointer, so an interleaved + // call to Load will not see a longer length with an old pointer. + s.ptr.Store(unsafe.SliceData(slice)) + s.len.Store(int32(len(slice))) + s.cap = int32(cap(slice)) + + return len(slice) +} diff --git a/internal/ext/atomicx/log_test.go b/internal/ext/atomicx/log_test.go new file mode 100644 index 000000000..c8fd94899 --- /dev/null +++ b/internal/ext/atomicx/log_test.go @@ -0,0 +1,46 @@ +package atomicx_test + +import ( + "runtime" + "sync" + "testing" + + "github.com/bufbuild/protocompile/internal/ext/atomicx" + "github.com/stretchr/testify/assert" +) + +func TestLog(t *testing.T) { + t.Parallel() + + const trials = 1000 + + mu := new(sync.Mutex) + log := new(atomicx.Log[int]) + + start := new(sync.WaitGroup) + end := new(sync.WaitGroup) + + for i := range runtime.GOMAXPROCS(0) { + start.Add(1) + end.Add(1) + go func() { + defer end.Done() + + // This ensures that we have a thundering herd situation: all of + // these goroutines wake up and hammer the intern table at the + // same time. + start.Done() + start.Wait() + + for j := range trials { + n := i*trials + j + mu.Lock() + i := log.Append(n) - 1 + mu.Unlock() + assert.Equal(t, n, log.Load(i)) + } + }() + } + + end.Wait() +} diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 018720d59..c3c9fae6f 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -19,11 +19,10 @@ package intern import ( "fmt" "reflect" - "runtime" "strings" "sync" - "sync/atomic" + "github.com/bufbuild/protocompile/internal/ext/atomicx" "github.com/bufbuild/protocompile/internal/ext/mapsx" "github.com/bufbuild/protocompile/internal/ext/unsafex" ) @@ -74,14 +73,12 @@ func (id ID) GoString() string { // The zero value of Table is empty and ready to use. type Table struct { index sync.Map - table atomic.Pointer[[]string] // Spinlock; locked when it equals &pending. 
- keys anyArena[string] - values anyArena[ID] + // Ensures writers to table are exclusive, as required by Log.Append. + writer sync.Mutex + table atomicx.Log[string] } -var pending []string - // Intern interns the given string into this table. // // This function may be called by multiple goroutines concurrently. @@ -118,7 +115,6 @@ func (t *Table) Query(s string) (ID, bool) { return id.(ID), true //nolint:errcheck } -//go:nosplit // Avoid preemption while holding the spinlock. func (t *Table) internSlow(s string) ID { // Intern tables are expected to be long-lived. Avoid holding onto a larger // buffer that s is an internal pointer to by cloning it. @@ -127,35 +123,21 @@ func (t *Table) internSlow(s string) ID { // a []byte as a string temporarily for querying the intern table. s = strings.Clone(s) - // Take ownership of the table. We need to take this before we check the - // index for a race, because we can otherwise get a TOCTOU bug: - // 1. We check the index, it's missing s. - // 2. Another goroutine inserts s. - // 3. We lock the table to insert, resulting in a duplicate. - table := t.table.Load() - for table == &pending || !t.table.CompareAndSwap(table, &pending) { - runtime.Gosched() - table = t.table.Load() - } - if table == nil { - table = new([]string) + t.writer.Lock() + defer t.writer.Unlock() + + // Check if we've been beaten. This must happen while holding the writer + // lock to prevent a write happening between this check and calling Append + // below. + if id, ok := t.index.Load(s); ok { + return id.(ID) } - defer t.table.Store(table) // This is the "unlock". // Figure out the next interning ID. // The first ID will have value 1. ID 0 is reserved for "". - id := ID(len(*table)) + 1 - if id < 0 { - panic(fmt.Sprintf("internal/intern: %d interning IDs exhausted", len(*table)+1)) - } + id := ID(t.table.Append(s)) + t.index.Store(s, id) // Commit the ID. - // Check to see if someone beat us to the punch. 
- if id, ok := t.index.LoadOrStore(t.keys.alloc(s), t.values.alloc(id)); ok { - return id.(ID) //nolint:errcheck - } - - // Commit the ID we chose. - *table = append(*table, s) return id } @@ -199,15 +181,7 @@ func (t *Table) value(id ID, buf *inlined) string { return decodeChar6(id, buf) } - for { - table := t.table.Load() - if table != &pending { - // If table is nil, that means no intern calls have occurred yet, - // so panicking is fine here (equivalent to an out-of-bounds access). - return (*table)[int(id)-1] - } - runtime.Gosched() - } + return t.table.Load(int(id) - 1) } // Preload takes a pointer to a struct type and initializes [ID]-typed fields diff --git a/internal/intern/unsafe.go b/internal/intern/unsafe.go deleted file mode 100644 index 2855cc5fb..000000000 --- a/internal/intern/unsafe.go +++ /dev/null @@ -1,40 +0,0 @@ -package intern - -import ( - "reflect" - "unsafe" - - "github.com/bufbuild/protocompile/internal/ext/unsafex" -) - -// anyArena emulates the runtime.convT functions, except that it places the -// allocated value into the given arena. -// -// It only works for types T which are not indirect interface values (e.g., -// pointer-shaped types). -type anyArena[T any] []T - -func (c *anyArena[T]) alloc(value T) any { - a := *c - if len(a) == cap(a) { - // append(nil, make) ensures that the capacity is as large as possible, - // i.e., matching a size class. 
- a = append([]T(nil), make([]T, cap(a)*2)...)[:0] - } - - a = append(a, value) - *c = a - p := &a[len(a)-1] - - type iface struct { - itab unsafe.Pointer - data unsafe.Pointer - } - - var z T - itab := unsafex.Bitcast[iface](reflect.TypeOf(z)).data - return unsafex.Bitcast[any](iface{ - itab: itab, - data: unsafe.Pointer(p), - }) -} From 4683a8f8dbe9908689cb7ae561d4c76e9193035b Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 17:17:28 -0800 Subject: [PATCH 08/23] fixes --- internal/intern/intern.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index c3c9fae6f..03c615903 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -123,6 +123,7 @@ func (t *Table) internSlow(s string) ID { // a []byte as a string temporarily for querying the intern table. s = strings.Clone(s) + // This lock accounts for almost all of the time spend in this function. t.writer.Lock() defer t.writer.Unlock() @@ -176,6 +177,7 @@ func (t *Table) Value(id ID) string { return t.value(id, new(inlined)) } +//go:noinline func (t *Table) value(id ID, buf *inlined) string { if id <= 0 { return decodeChar6(id, buf) From d6d38c8b93dcaa63ff05c343784dfc044809872d Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 17:18:29 -0800 Subject: [PATCH 09/23] better benchmark --- internal/intern/intern_test.go | 62 +++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index 0de1ae716..96e398e06 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -151,33 +151,47 @@ func TestHammer(t *testing.T) { } func BenchmarkIntern(b *testing.B) { - b.Run("Collisions", func(b *testing.B) { - n := new(atomic.Int64) - it := new(intern.Table) - b.RunParallel(func(p *testing.PB) { - data := makeData(int(n.Add(1))) - for p.Next() { - for _, s := range data { - _ = 
it.Value(it.InternBytes(s)) - } + // Helper to ensure that it.Value is actually inlined, which is relevant + // for benchmarks. Calls within the body of a benchmark are never inlined. + // + // Returns the length of the string to ensure that this function is not + // DCE'd. + value := func(it *intern.Table, id intern.ID) int { + return len(it.Value(id)) + } + + run := func(name string, unique float64) { + b.Run(name, func(b *testing.B) { + // Pre-allocate data samples for each goroutine. + data := make([][][]byte, runtime.GOMAXPROCS(0)) + for i := range data { + data[i] = makeData(i) } - }) - }) - b.Run("Unique", func(b *testing.B) { - n := new(atomic.Int64) - it := new(intern.Table) - b.RunParallel(func(p *testing.PB) { - data := makeData(int(n.Add(1))) - for p.Next() { - for i, s := range data { - s = append(s, '0') - data[i] = s - _ = it.Value(it.InternBytes(s)) + n := new(atomic.Int64) + it := new(intern.Table) + b.RunParallel(func(p *testing.PB) { + n := n.Add(1) - 1 + data := data[n] + r := rand.New(rand.NewSource(n)) + + for p.Next() { + for i, s := range data { + if r.Float64() < unique { + s = append(s, '0') + data[i] = s + } + + _ = value(it, it.InternBytes(s)) + } } - } + }) }) - }) + } + + run("hits", 0.0) + run("mixed", 0.1) + run("misses", 1.0) } // makeData generates deterministic pseudo-random data of poor quality, meaning @@ -191,7 +205,7 @@ func makeData(seed int) [][]byte { n += 5 n %= 99 - buf := make([]byte, n) + buf := make([]byte, n, 10000) for i := range buf { buf[i] = byte('a' + r.Intn(26)) } From 54dc3f8d5ab3d43275038d7bbe77e750d8e4ae22 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 17:21:46 -0800 Subject: [PATCH 10/23] lint --- internal/ext/atomicx/log.go | 38 +++++++++++++++----------------- internal/ext/atomicx/log_test.go | 17 +++++++++++++- internal/intern/intern.go | 2 +- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/internal/ext/atomicx/log.go b/internal/ext/atomicx/log.go index 
f20cee755..a85fb1e57 100644 --- a/internal/ext/atomicx/log.go +++ b/internal/ext/atomicx/log.go @@ -1,18 +1,27 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//nolint:revive,predeclared package atomicx import ( "errors" - "fmt" "math" - "sync" "sync/atomic" "unsafe" ) -const debugLog = false - -var capacities sync.Map - // Log is an append-only log. Loading operations may happen concurrently with // append operations, but append operations may not be concurrent with each // other. @@ -26,19 +35,12 @@ type Log[T any] struct { // // This function may be called concurrently with [Log.Append]. func (s *Log[T]) Load(idx int) T { - // Order doesn't matter here. Len is always updated after ptr, so ptr will - // always be valid for len elements. + // Read len first. This ensures ordering such that after we load ptr, we + // don't load a len value incremented by a different call to Append that + // triggered a reallocation. len := s.len.Load() ptr := s.ptr.Load() - if debugLog { - v, _ := capacities.Load(unsafe.Pointer(ptr)) - cap := v.(int) - if cap < int(len) { - panic(fmt.Errorf("atomicx.Log: loaded %p with cap=%d < len=%d", ptr, cap, len)) - } - } - return unsafe.Slice(ptr, len)[idx] } @@ -70,10 +72,6 @@ func (s *Log[T]) Append(v T) int { // Grow a new slice. 
slice = append(slice, v) - if debugLog { - capacities.Store(unsafe.Pointer(unsafe.SliceData(slice)), cap(slice)) - } - // Update the pointer, length, and capacity as appropriate. // Note that we update the length *after* the pointer, so an interleaved // call to Load will not see a longer length with an old pointer. diff --git a/internal/ext/atomicx/log_test.go b/internal/ext/atomicx/log_test.go index c8fd94899..08449084a 100644 --- a/internal/ext/atomicx/log_test.go +++ b/internal/ext/atomicx/log_test.go @@ -1,3 +1,17 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package atomicx_test import ( @@ -5,8 +19,9 @@ import ( "sync" "testing" - "github.com/bufbuild/protocompile/internal/ext/atomicx" "github.com/stretchr/testify/assert" + + "github.com/bufbuild/protocompile/internal/ext/atomicx" ) func TestLog(t *testing.T) { diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 03c615903..b2fcdf442 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -131,7 +131,7 @@ func (t *Table) internSlow(s string) ID { // lock to prevent a write happening between this check and calling Append // below. if id, ok := t.index.Load(s); ok { - return id.(ID) + return id.(ID) //nolint:errcheck } // Figure out the next interning ID. 
From b1eed0e8c3e59eccff21d2658c0702463f8a0dd6 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 17:35:13 -0800 Subject: [PATCH 11/23] add instrumentation --- internal/intern/intern.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index b2fcdf442..68d8e1858 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "github.com/bufbuild/protocompile/internal/ext/atomicx" "github.com/bufbuild/protocompile/internal/ext/mapsx" @@ -79,6 +80,20 @@ type Table struct { table atomicx.Log[string] } +var ( + // Set to true to enable instrumentation of intern tables. + Instrument bool + hits, misses atomic.Int64 +) + +// Returns the ratio of interning operations that miss the cache. +func Misses() float64 { + hits := float64(hits.Load()) + misses := float64(misses.Load()) + + return misses / (hits + Misses()) +} + // Intern interns the given string into this table. // // This function may be called by multiple goroutines concurrently. @@ -108,6 +123,14 @@ func (t *Table) Query(s string) (ID, bool) { } id, ok := t.index.Load(s) + if Instrument { + if ok { + hits.Add(1) + } else { + misses.Add(1) + } + } + if !ok { return 0, false } From 8d6a168c674029aab282b049008df6a7d0c57ce4 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Thu, 5 Mar 2026 17:45:45 -0800 Subject: [PATCH 12/23] fix --- internal/intern/intern.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 68d8e1858..3d91215b9 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -91,7 +91,7 @@ func Misses() float64 { hits := float64(hits.Load()) misses := float64(misses.Load()) - return misses / (hits + Misses()) + return misses / (hits + misses) } // Intern interns the given string into this table. 
@@ -102,9 +102,21 @@ func (t *Table) Intern(s string) ID { // all strings are interned, so we can take a read lock to avoid needing // to trap to the scheduler on concurrent access (all calls to Intern() will // still contend mu.readCount, because RLock atomically increments it). - if id, ok := t.Query(s); ok { + id, ok := t.Query(s) + + // Instrument whether this is a hit or a miss. + if Instrument { + if ok { + hits.Add(1) + } else { + misses.Add(1) + } + } + + if ok { return id } + return t.internSlow(s) } @@ -123,14 +135,6 @@ func (t *Table) Query(s string) (ID, bool) { } id, ok := t.index.Load(s) - if Instrument { - if ok { - hits.Add(1) - } else { - misses.Add(1) - } - } - if !ok { return 0, false } From cd6ba227daf6d2c6a99292a40cb6bb53620d401d Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Fri, 6 Mar 2026 13:14:23 -0800 Subject: [PATCH 13/23] benchmarks --- internal/intern/intern_test.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index 96e398e06..5f24c186a 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -15,6 +15,7 @@ package intern_test import ( + "crypto/sha256" "fmt" "math/rand" "runtime" @@ -189,9 +190,23 @@ func BenchmarkIntern(b *testing.B) { }) } - run("hits", 0.0) - run("mixed", 0.1) - run("misses", 1.0) + run("0pct", 0.0) + run("10pct", 0.1) + run("50pct", 0.5) + run("100pct", 1.0) + + // Compare with computing a cryptographic hash, to show that's not + // worthwhile as an interning strategy. 
+ b.Run("sha256", func(b *testing.B) { + data := makeData(0) + b.ResetTimer() + + for b.Loop() { + for _, s := range data { + _ = sha256.Sum256(s) + } + } + }) } // makeData generates deterministic pseudo-random data of poor quality, meaning From 6b4241c2c53e85b5d1a9136886c8c7137cee5e9b Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Fri, 6 Mar 2026 18:11:40 -0800 Subject: [PATCH 14/23] add an inlining test --- internal/inlinetest/inlinetest.go | 65 +++++++++++++++++++++++++++++++ internal/intern/intern_test.go | 6 +++ 2 files changed, 71 insertions(+) create mode 100644 internal/inlinetest/inlinetest.go diff --git a/internal/inlinetest/inlinetest.go b/internal/inlinetest/inlinetest.go new file mode 100644 index 000000000..87db0d0df --- /dev/null +++ b/internal/inlinetest/inlinetest.go @@ -0,0 +1,65 @@ +package inlinetest + +import ( + "errors" + "fmt" + "os" + "os/exec" + "regexp" + "runtime/debug" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +// AssertInlined returns whether the compiler is willing to inline the given +// symbols in the package being tested. +// +// The symbols must be either a single identifier or of the form Type.Method. +// Pointer-receiver methods should not use the (*Type).Method syntax. +func AssertInlined(t *testing.T, symbols ...string) { + t.Helper() + for _, symbol := range symbols { + _, ok := inlined[symbol] + assert.True(t, ok, "%s is not inlined", symbol) + } +} + +var inlined = make(map[string]struct{}) + +func init() { + if !testing.Testing() { + panic("inlinetest: cannot import inlinetest except in a test") + } + + // This is based on a pattern of tests appearing in several places in Go's + // standard library. 
+ tool := "go" + if env, ok := os.LookupEnv("GO"); ok { + tool = env + } + + info, ok := debug.ReadBuildInfo() + if !ok { + panic(errors.New("inlinetest: could not read build info")) + } + + out, err := exec.Command( + tool, + "build", + "--gcflags=-m", // -m records optimization decisions. + strings.TrimSuffix(info.Path, ".test"), + ).CombinedOutput() + if err != nil { + panic(fmt.Errorf("inlinetest: go build failed: %w, %s", err, out)) + } + + remarkRe := regexp.MustCompile(`(?m)^\./\S+\.go:\d+:\d+: can inline (.+?)$`) + ptrRe := regexp.MustCompile(`\(\*(.+)\)\.`) + for _, match := range remarkRe.FindAllSubmatch(out, -1) { + match := string(match[1]) + match = ptrRe.ReplaceAllString(match, "$1.") + inlined[match] = struct{}{} + } +} diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index 5f24c186a..d95b426c0 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/bufbuild/protocompile/internal/ext/slicesx" + "github.com/bufbuild/protocompile/internal/inlinetest" "github.com/bufbuild/protocompile/internal/intern" ) @@ -83,6 +84,11 @@ func shouldInline(s string) bool { return true } +func TestInline(t *testing.T) { + t.Parallel() + inlinetest.AssertInlined(t, "Table.Value") +} + func TestHammer(t *testing.T) { t.Parallel() From 926f4229dd266ffbb9e78a93471d0e1418a47f3b Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Sat, 7 Mar 2026 01:05:58 -0800 Subject: [PATCH 15/23] lockless --- internal/ext/atomicx/log.go | 124 ++++++++++++++++++++---------- internal/ext/atomicx/log_test.go | 5 +- internal/inlinetest/inlinetest.go | 15 ++++ internal/intern/intern.go | 50 ++++++++---- internal/intern/intern_test.go | 14 ---- 5 files changed, 133 insertions(+), 75 deletions(-) diff --git a/internal/ext/atomicx/log.go b/internal/ext/atomicx/log.go index a85fb1e57..8115b2a46 100644 --- a/internal/ext/atomicx/log.go +++ 
b/internal/ext/atomicx/log.go @@ -18,66 +18,110 @@ package atomicx import ( "errors" "math" + "runtime" "sync/atomic" "unsafe" ) -// Log is an append-only log. Loading operations may happen concurrently with -// append operations, but append operations may not be concurrent with each -// other. +// Log is an append-only log. +// +// Loading and append operations may happen concurrently with each other, +// with the caveat that loading indices not yet appended may produce garbage +// values. type Log[T any] struct { - ptr atomic.Pointer[T] - len atomic.Int32 - cap int32 // Only modified by Append, does not need synchronization. + // Protected by the lock bit in cap. This means that cap must be loaded + // before loading this value, to ensure that prior writes to it are seen, + // and cap must be stored after storing this value, to ensure the write is + // published. + ptr *T + + next atomic.Int32 // The next index to fill. + cap atomic.Int32 // Top bit is used as a spinlock. } -// Load returns the value at the given index. -// -// This function may be called concurrently with [Log.Append]. +const lockbit = math.MinInt32 + +// Load returns the value at the given index. This index must have been +// previously returned by [Log.Append]. func (s *Log[T]) Load(idx int) T { - // Read len first. This ensures ordering such that after we load ptr, we - // don't load a len value incremented by a different call to Append that - // triggered a reallocation. - len := s.len.Load() - ptr := s.ptr.Load() + // Read cap first, which is required before we can read s.ptr. + cap := s.cap.Load() - return unsafe.Slice(ptr, len)[idx] + return unsafe.Slice(s.ptr, cap&^lockbit)[idx] } // Append adds a new value to this slice. // -// Append may be called concurrently with [Log.Load], but must *not* be called -// concurrently with itself. An external mutex should be used to protect calls -// to Append. -// -// Returns the new length of the log. 
+// Returns the index of the appended element, which can be looked up with +// [Log.Load]. func (s *Log[T]) Append(v T) int { - p := s.ptr.Load() - l := s.len.Load() - c := s.cap - - if l == math.MaxInt32 { + i := s.next.Add(1) + if i == 0 { panic(errors.New("internal/atomicx: cannot allocate more than 2^32 elements")) } + i-- - slice := unsafe.Slice(p, c) +again: + // Load cap first. See comment in [Load]. + c := s.cap.Load() + if c&lockbit != 0 { + runtime.Gosched() + goto again + } - if l < c { // Don't need to grow the slice. - // Write the value first, and *then* make it visible to Load by - // incrementing the length. - slice[l] = v - return int(s.len.Add(1)) + slice := unsafe.Slice(s.ptr, c) + if uint32(i) < uint32(c) { // Don't need to grow the slice. + // This is a data race. However, it's fine, because this slot is not + // valid yet, so tearing it is fine. + // + // This is, in fact, a benign race. So long as this value is not read + // at before this function returns i, no memory corruption is possible. + // In particular, Go promises to never tear pointers, so we can't make + // the GC freak out about broken pointers. + // + // See https://go.dev/ref/mem#restrictions + // + // This store is also the slowest part of this function, due to + // significant cache thrashing if the slice is resized from under us. + storeNoRace(&slice[i], v) + + if s.cap.Load() != c { + // If the value was potentially torn, it would have resulted in c + // changing, meaning we need to try again. + runtime.Gosched() + goto again + } + + return int(i) + } + + // Need to grow a slice. Lock the slice by setting the sign bit of the + // capacity. + // + // This lock is necessary so that updating ptr always happens together with + // cap. + if !s.cap.CompareAndSwap(c, c|lockbit) { + goto again } - // Grow a new slice. - slice = append(slice, v) + // Grow the slice enough to insert our value. + // Getting preempted by the call into the allocator would be... 
non-ideal, + // but there isn't really a way to prevent that. + // + // To try to tame down the number of times we need to grow the slice, since + // that cause significant cache thrash due to racing reads and writes, we + // grow the underlying buffer a little faster than O(2^n). + slice = append(slice, make([]T, i+1-c)...) + slice[i] = v + + // Drop the lock on the slice. + s.ptr = unsafe.SliceData(slice) + s.cap.Store(int32(cap(slice))) - // Update the pointer, length, and capacity as appropriate. - // Note that we update the length *after* the pointer, so an interleaved - // call to Load will not see a longer length with an old pointer. - s.ptr.Store(unsafe.SliceData(slice)) - s.len.Store(int32(len(slice))) - s.cap = int32(cap(slice)) + return int(i) +} - return len(slice) +//go:norace +func storeNoRace[T any](p *T, v T) { + *p = v } diff --git a/internal/ext/atomicx/log_test.go b/internal/ext/atomicx/log_test.go index 08449084a..b3f446b80 100644 --- a/internal/ext/atomicx/log_test.go +++ b/internal/ext/atomicx/log_test.go @@ -29,7 +29,6 @@ func TestLog(t *testing.T) { const trials = 1000 - mu := new(sync.Mutex) log := new(atomicx.Log[int]) start := new(sync.WaitGroup) @@ -49,9 +48,7 @@ func TestLog(t *testing.T) { for j := range trials { n := i*trials + j - mu.Lock() - i := log.Append(n) - 1 - mu.Unlock() + i := log.Append(n) assert.Equal(t, n, log.Load(i)) } }() diff --git a/internal/inlinetest/inlinetest.go b/internal/inlinetest/inlinetest.go index 87db0d0df..50fb3d23a 100644 --- a/internal/inlinetest/inlinetest.go +++ b/internal/inlinetest/inlinetest.go @@ -1,3 +1,17 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package inlinetest import ( @@ -45,6 +59,7 @@ func init() { panic(errors.New("inlinetest: could not read build info")) } + //nolint:gosec out, err := exec.Command( tool, "build", diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 76f117d59..88b7347e4 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -19,6 +19,7 @@ package intern import ( "fmt" "reflect" + "runtime" "strings" "sync" "sync/atomic" @@ -75,11 +76,7 @@ func (id ID) GoString() string { // The zero value of Table is empty and ready to use. type Table struct { index sync.Map - - // Ensures writers to table are exclusive, as required by Log.Append. - writer sync.Mutex - table atomicx.Log[string] - + table atomicx.Log[string] stats atomic.Pointer[stats] } @@ -195,7 +192,7 @@ func (t *Table) Query(s string) (ID, bool) { return char6, true } - id, ok := t.index.Load(s) + v, ok := t.index.Load(s) if stats != nil { stats.queries.Add(1) stats.queryBytes.Add(int64(len(s))) @@ -205,9 +202,21 @@ func (t *Table) Query(s string) (ID, bool) { return 0, false } - return id.(ID), true //nolint:errcheck + id := v.(ID) //nolint:errcheck + if id == 0 { + // Handle the case where this is a mid-insertion. + return 0, false + } + + return id, true } +// Used as a sentinel in internSlow. 0 always represents "" and is never +// present as a value in the index. +// +// This is here to avoid a call to runtime.convT32 in internSlow. +var inserting any = ID(0) + func (t *Table) internSlow(s string) ID { // Intern tables are expected to be long-lived. 
Avoid holding onto a larger // buffer that s is an internal pointer to by cloning it. @@ -216,21 +225,28 @@ func (t *Table) internSlow(s string) ID { // a []byte as a string temporarily for querying the intern table. s = strings.Clone(s) - // This lock accounts for almost all of the time spend in this function. - t.writer.Lock() - defer t.writer.Unlock() + // Pre-convert to `any`, since this triggers an allocation via + // `runtime.convTstring`. + key := any(s) + +again: + // Try to become the "leader" which is interning s. + if v, ok := t.index.LoadOrStore(key, inserting); ok { + id := v.(ID) //nolint:errcheck + if id == 0 { + // Someone *else* is doing the inserting, apparently. + runtime.Gosched() + goto again + } - // Check if we've been beaten. This must happen while holding the writer - // lock to prevent a write happening between this check and calling Append - // below. - if id, ok := t.index.Load(s); ok { - return id.(ID) //nolint:errcheck + // Someone else already inserted, we'de done. + return id } // Figure out the next interning ID. // The first ID will have value 1. ID 0 is reserved for "". - id := ID(t.table.Append(s)) - t.index.Store(s, id) // Commit the ID. + id := ID(t.table.Append(s) + 1) + t.index.Store(key, id) // Commit the ID. return id } diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index d95b426c0..7baa9c0eb 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -15,7 +15,6 @@ package intern_test import ( - "crypto/sha256" "fmt" "math/rand" "runtime" @@ -200,19 +199,6 @@ func BenchmarkIntern(b *testing.B) { run("10pct", 0.1) run("50pct", 0.5) run("100pct", 1.0) - - // Compare with computing a cryptographic hash, to show that's not - // worthwhile as an interning strategy. 
- b.Run("sha256", func(b *testing.B) { - data := makeData(0) - b.ResetTimer() - - for b.Loop() { - for _, s := range data { - _ = sha256.Sum256(s) - } - } - }) } // makeData generates deterministic pseudo-random data of poor quality, meaning From 05899caa293a479fa19a632f3a1ff1e64dbe0a04 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Sat, 7 Mar 2026 01:26:10 -0800 Subject: [PATCH 16/23] fix spurious -race failure --- internal/ext/atomicx/log.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/ext/atomicx/log.go b/internal/ext/atomicx/log.go index 8115b2a46..8bcc4a67d 100644 --- a/internal/ext/atomicx/log.go +++ b/internal/ext/atomicx/log.go @@ -114,9 +114,10 @@ again: slice = append(slice, make([]T, i+1-c)...) slice[i] = v - // Drop the lock on the slice. - s.ptr = unsafe.SliceData(slice) - s.cap.Store(int32(cap(slice))) + // Race detector does not understand that this write is protected by + // the store that immediately follows it. + storeNoRace(&s.ptr, unsafe.SliceData(slice)) + s.cap.Store(int32(cap(slice))) // Drop the lock. 
return int(i) } From cb4a02fc199180c5ac33b18b6e379bfb2661c1bd Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 11:54:22 -0700 Subject: [PATCH 17/23] cleanup --- internal/ext/synctestx/synctestx.go | 36 ++++++++++++ internal/ext/{atomicx => syncx}/log.go | 38 +++++++++---- internal/ext/{atomicx => syncx}/log_test.go | 43 ++++---------- internal/intern/container.go | 61 ++++++++++++++++++++ internal/intern/intern.go | 63 +-------------------- internal/intern/intern_test.go | 60 +++++++------------- 6 files changed, 159 insertions(+), 142 deletions(-) create mode 100644 internal/ext/synctestx/synctestx.go rename internal/ext/{atomicx => syncx}/log.go (75%) rename internal/ext/{atomicx => syncx}/log_test.go (54%) create mode 100644 internal/intern/container.go diff --git a/internal/ext/synctestx/synctestx.go b/internal/ext/synctestx/synctestx.go new file mode 100644 index 000000000..94557af77 --- /dev/null +++ b/internal/ext/synctestx/synctestx.go @@ -0,0 +1,36 @@ +package synctestx + +import ( + "runtime" + "sync" +) + +// Hammer runs f across count goroutines, ensuring that f is called +// simultaneously, simulating a thundering herd. Returns once all spawned +// goroutines have exited. +// +// If count is zero, uses GOMAXPROCS instead. +func Hammer(count int, f func()) { + if count == 0 { + count = runtime.GOMAXPROCS(0) + } + + start := new(sync.WaitGroup) + end := new(sync.WaitGroup) + for range count { + start.Add(1) + end.Add(1) + go func() { + defer end.Done() + + // This ensures that we have a thundering herd situation: all of + // these goroutines wake up and hammer f() at the same time. 
+ start.Done() + start.Wait() + + f() + }() + } + + end.Wait() +} diff --git a/internal/ext/atomicx/log.go b/internal/ext/syncx/log.go similarity index 75% rename from internal/ext/atomicx/log.go rename to internal/ext/syncx/log.go index 8bcc4a67d..c53aab203 100644 --- a/internal/ext/atomicx/log.go +++ b/internal/ext/syncx/log.go @@ -13,7 +13,7 @@ // limitations under the License. //nolint:revive,predeclared -package atomicx +package syncx import ( "errors" @@ -33,7 +33,12 @@ type Log[T any] struct { // before loading this value, to ensure that prior writes to it are seen, // and cap must be stored after storing this value, to ensure the write is // published. - ptr *T + // + // This value is atomic because otherwise a data race occurs. Go guarantees + // this is not actually a data race, because in Go all pointer loads/stores + // are relaxed atomic, but this is essentially free, because we're already + // loading cap. + ptr atomic.Pointer[T] next atomic.Int32 // The next index to fill. cap atomic.Int32 // Top bit is used as a spinlock. @@ -46,8 +51,9 @@ const lockbit = math.MinInt32 func (s *Log[T]) Load(idx int) T { // Read cap first, which is required before we can read s.ptr. cap := s.cap.Load() + ptr := s.ptr.Load() - return unsafe.Slice(s.ptr, cap&^lockbit)[idx] + return unsafe.Slice(ptr, cap&^lockbit)[idx] } // Append adds a new value to this slice. @@ -69,7 +75,8 @@ again: goto again } - slice := unsafe.Slice(s.ptr, c) + p := s.ptr.Load() + slice := unsafe.Slice(p, c) if uint32(i) < uint32(c) { // Don't need to grow the slice. // This is a data race. However, it's fine, because this slot is not // valid yet, so tearing it is fine. @@ -85,9 +92,19 @@ again: // significant cache thrashing if the slice is resized from under us. storeNoRace(&slice[i], v) - if s.cap.Load() != c { - // If the value was potentially torn, it would have resulted in c - // changing, meaning we need to try again. 
+ // If the value was potentially torn, it would have resulted in c + // changing, meaning we need to try again. + // + // A very important property is that this value never returns to the + // same value after a resize begins, preventing an ABA problem. If the + // capacity does not change across a store, it means that store + // succeeded + // + // To ensure that the value we just wrote above is visible to other + // goroutines, in particular a goroutine that wants to perform a resize, + // We need to store to the capacity. The easiest way to achieve both + // things at once is with the following CAS. + if !s.cap.CompareAndSwap(c, c) { runtime.Gosched() goto again } @@ -105,18 +122,15 @@ again: } // Grow the slice enough to insert our value. + // // Getting preempted by the call into the allocator would be... non-ideal, // but there isn't really a way to prevent that. - // - // To try to tame down the number of times we need to grow the slice, since - // that cause significant cache thrash due to racing reads and writes, we - // grow the underlying buffer a little faster than O(2^n). slice = append(slice, make([]T, i+1-c)...) slice[i] = v // Race detector does not understand that this write is protected by // the store that immediately follows it. - storeNoRace(&s.ptr, unsafe.SliceData(slice)) + s.ptr.Store(unsafe.SliceData(slice)) s.cap.Store(int32(cap(slice))) // Drop the lock. return int(i) diff --git a/internal/ext/atomicx/log_test.go b/internal/ext/syncx/log_test.go similarity index 54% rename from internal/ext/atomicx/log_test.go rename to internal/ext/syncx/log_test.go index b3f446b80..c272575f1 100644 --- a/internal/ext/atomicx/log_test.go +++ b/internal/ext/syncx/log_test.go @@ -12,16 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package atomicx_test +package syncx_test import ( - "runtime" - "sync" + "math/rand" "testing" + "github.com/bufbuild/protocompile/internal/ext/synctestx" + "github.com/bufbuild/protocompile/internal/ext/syncx" "github.com/stretchr/testify/assert" - - "github.com/bufbuild/protocompile/internal/ext/atomicx" ) func TestLog(t *testing.T) { @@ -29,30 +28,12 @@ func TestLog(t *testing.T) { const trials = 1000 - log := new(atomicx.Log[int]) - - start := new(sync.WaitGroup) - end := new(sync.WaitGroup) - - for i := range runtime.GOMAXPROCS(0) { - start.Add(1) - end.Add(1) - go func() { - defer end.Done() - - // This ensures that we have a thundering herd situation: all of - // these goroutines wake up and hammer the intern table at the - // same time. - start.Done() - start.Wait() - - for j := range trials { - n := i*trials + j - i := log.Append(n) - assert.Equal(t, n, log.Load(i)) - } - }() - } - - end.Wait() + log := new(syncx.Log[int]) + synctestx.Hammer(0, func() { + for range trials { + n := rand.Int() + i := log.Append(n) + assert.Equal(t, n, log.Load(i)) + } + }) } diff --git a/internal/intern/container.go b/internal/intern/container.go new file mode 100644 index 000000000..41ebf1f13 --- /dev/null +++ b/internal/intern/container.go @@ -0,0 +1,61 @@ +package intern + +import "github.com/bufbuild/protocompile/internal/ext/mapsx" + +// Set is a set of intern IDs. +type Set map[ID]struct{} + +// ContainsID returns whether s contains the given ID. +func (s Set) ContainsID(id ID) bool { + _, ok := s[id] + return ok +} + +// Contains returns whether s contains the given string. +func (s Set) Contains(table *Table, key string) bool { + k, ok := table.Query(key) + if !ok { + return false + } + _, ok = s[k] + return ok +} + +// AddID adds an ID to s, and returns whether it was added. +func (s Set) AddID(id ID) (inserted bool) { + return mapsx.AddZero(s, id) +} + +// Add adds a string to s, and returns whether it was added. 
+func (s Set) Add(table *Table, key string) (inserted bool) { + k := table.Intern(key) + _, ok := s[k] + if !ok { + s[k] = struct{}{} + } + return !ok +} + +// Map is a map keyed by intern IDs. +type Map[T any] map[ID]T + +// Get returns the value that key maps to. +func (m Map[T]) Get(table *Table, key string) (T, bool) { + k, ok := table.Query(key) + if !ok { + var z T + return z, false + } + v, ok := m[k] + return v, ok +} + +// AddID adds an ID to m, and returns whether it was added. +func (m Map[T]) AddID(id ID, v T) (mapped T, inserted bool) { + return mapsx.Add(m, id, v) +} + +// Add adds a string to m, and returns whether it was added. +func (m Map[T]) Add(table *Table, key string, v T) (mapped T, inserted bool) { + return m.AddID(table.Intern(key), v) +} diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 88b7347e4..5c64af76e 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -24,9 +24,8 @@ import ( "sync" "sync/atomic" - "github.com/bufbuild/protocompile/internal/ext/atomicx" "github.com/bufbuild/protocompile/internal/ext/bitsx" - "github.com/bufbuild/protocompile/internal/ext/mapsx" + "github.com/bufbuild/protocompile/internal/ext/syncx" "github.com/bufbuild/protocompile/internal/ext/unsafex" ) @@ -76,7 +75,7 @@ func (id ID) GoString() string { // The zero value of Table is empty and ready to use. type Table struct { index sync.Map - table atomicx.Log[string] + table syncx.Log[string] stats atomic.Pointer[stats] } @@ -316,61 +315,3 @@ func (t *Table) Preload(ids any) { } } } - -// Set is a set of intern IDs. -type Set map[ID]struct{} - -// ContainsID returns whether s contains the given ID. -func (s Set) ContainsID(id ID) bool { - _, ok := s[id] - return ok -} - -// Contains returns whether s contains the given string. 
-func (s Set) Contains(table *Table, key string) bool { - k, ok := table.Query(key) - if !ok { - return false - } - _, ok = s[k] - return ok -} - -// AddID adds an ID to s, and returns whether it was added. -func (s Set) AddID(id ID) (inserted bool) { - return mapsx.AddZero(s, id) -} - -// Add adds a string to s, and returns whether it was added. -func (s Set) Add(table *Table, key string) (inserted bool) { - k := table.Intern(key) - _, ok := s[k] - if !ok { - s[k] = struct{}{} - } - return !ok -} - -// Map is a map keyed by intern IDs. -type Map[T any] map[ID]T - -// Get returns the value that key maps to. -func (m Map[T]) Get(table *Table, key string) (T, bool) { - k, ok := table.Query(key) - if !ok { - var z T - return z, false - } - v, ok := m[k] - return v, ok -} - -// AddID adds an ID to m, and returns whether it was added. -func (m Map[T]) AddID(id ID, v T) (mapped T, inserted bool) { - return mapsx.Add(m, id, v) -} - -// Add adds a string to m, and returns whether it was added. -func (m Map[T]) Add(table *Table, key string, v T) (mapped T, inserted bool) { - return m.AddID(table.Intern(key), v) -} diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index 7baa9c0eb..db423e54c 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/bufbuild/protocompile/internal/ext/slicesx" + "github.com/bufbuild/protocompile/internal/ext/synctestx" "github.com/bufbuild/protocompile/internal/inlinetest" "github.com/bufbuild/protocompile/internal/intern" ) @@ -91,9 +92,6 @@ func TestInline(t *testing.T) { func TestHammer(t *testing.T) { t.Parallel() - start := new(sync.WaitGroup) - end := new(sync.WaitGroup) - n := new(atomic.Int64) it := new(intern.Table) @@ -103,45 +101,31 @@ func TestHammer(t *testing.T) { query := make(map[string][]intern.ID) value := make(map[intern.ID][]string) - for range runtime.GOMAXPROCS(0) { - start.Add(1) - end.Add(1) - go 
func() { - defer end.Done() - - data := makeData(int(n.Add(1))) - m1 := make(map[string][]intern.ID) - m2 := make(map[intern.ID][]string) - - // This ensures that we have a thundering herd situation: all of - // these goroutines wake up and hammer the intern table at the - // same time. - start.Done() - start.Wait() + synctestx.Hammer(0, func() { + data := makeData(int(n.Add(1))) + m1 := make(map[string][]intern.ID) + m2 := make(map[intern.ID][]string) - for _, s := range data { - s := string(s) - id := it.Intern(s) - m1[s] = append(m1[s], id) - - v := it.Value(id) - m2[id] = append(m2[id], v) + for _, s := range data { + s := string(s) + id := it.Intern(s) + m1[s] = append(m1[s], id) - assert.Equal(t, s, v) - } + v := it.Value(id) + m2[id] = append(m2[id], v) - mu.Lock() - defer mu.Unlock() - for k, v := range m1 { - query[k] = append(query[k], v...) - } - for k, v := range m2 { - value[k] = append(value[k], v...) - } - }() - } + assert.Equal(t, s, v) + } - end.Wait() + mu.Lock() + defer mu.Unlock() + for k, v := range m1 { + query[k] = append(query[k], v...) + } + for k, v := range m2 { + value[k] = append(value[k], v...) + } + }) for k, v := range query { slices.Sort(v) From a1a2fdfe0e1cfdd4f85aaf0f4cafcfd9091ac9d9 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 12:19:17 -0700 Subject: [PATCH 18/23] lint --- internal/ext/synctestx/synctestx.go | 14 ++++++++++++++ internal/intern/container.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/internal/ext/synctestx/synctestx.go b/internal/ext/synctestx/synctestx.go index 94557af77..ad9128e66 100644 --- a/internal/ext/synctestx/synctestx.go +++ b/internal/ext/synctestx/synctestx.go @@ -1,3 +1,17 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package synctestx import ( diff --git a/internal/intern/container.go b/internal/intern/container.go index 41ebf1f13..6f870804d 100644 --- a/internal/intern/container.go +++ b/internal/intern/container.go @@ -1,3 +1,17 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package intern import "github.com/bufbuild/protocompile/internal/ext/mapsx" From d68bb4a495f49a072c44406c4f22172110a4ea00 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 13:40:47 -0700 Subject: [PATCH 19/23] lint --- internal/ext/syncx/log_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/ext/syncx/log_test.go b/internal/ext/syncx/log_test.go index c272575f1..7cc32f5dc 100644 --- a/internal/ext/syncx/log_test.go +++ b/internal/ext/syncx/log_test.go @@ -18,9 +18,10 @@ import ( "math/rand" "testing" + "github.com/stretchr/testify/assert" + "github.com/bufbuild/protocompile/internal/ext/synctestx" "github.com/bufbuild/protocompile/internal/ext/syncx" - "github.com/stretchr/testify/assert" ) func TestLog(t *testing.T) { From 1a7baa11765eed2bb3b2568db88b6377c7fac4d5 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 16:04:51 -0700 Subject: [PATCH 20/23] make misuse of log impossible --- internal/ext/slicesx/slicesx.go | 7 ++++ internal/ext/syncx/log.go | 60 ++++++++++++++++++++------- internal/ext/syncx/log_export_test.go | 21 ++++++++++ internal/ext/syncx/log_test.go | 12 ++++++ 4 files changed, 86 insertions(+), 14 deletions(-) create mode 100644 internal/ext/syncx/log_export_test.go diff --git a/internal/ext/slicesx/slicesx.go b/internal/ext/slicesx/slicesx.go index 1a20b1ba9..3fdd4e479 100644 --- a/internal/ext/slicesx/slicesx.go +++ b/internal/ext/slicesx/slicesx.go @@ -37,6 +37,13 @@ func One[E any](p *E) []E { return unsafe.Slice(p, 1) } +// New returns a new slice with at least the given length. +func New[S ~[]E, E any](count int) []E { + // Append will always round up to a size class for us. + s := append(S(nil), make(S, count)...)[:] + return s[:cap(s)] +} + // Get performs a bounds check and returns the value at idx. // // If the bounds check fails, returns the zero value and false. 
diff --git a/internal/ext/syncx/log.go b/internal/ext/syncx/log.go index c53aab203..d36f2fbec 100644 --- a/internal/ext/syncx/log.go +++ b/internal/ext/syncx/log.go @@ -17,6 +17,7 @@ package syncx import ( "errors" + "fmt" "math" "runtime" "sync/atomic" @@ -25,9 +26,7 @@ import ( // Log is an append-only log. // -// Loading and append operations may happen concurrently with each other, -// with the caveat that loading indices not yet appended may produce garbage -// values. +// Loading and append operations may happen concurrently with each other. type Log[T any] struct { // Protected by the lock bit in cap. This means that cap must be loaded // before loading this value, to ensure that prior writes to it are seen, @@ -40,20 +39,33 @@ type Log[T any] struct { // loading cap. ptr atomic.Pointer[T] + // Array of bits with length equal to cap divided by the bit size of uintptr, + // rounded up. + bits atomic.Pointer[atomic.Uintptr] + next atomic.Int32 // The next index to fill. cap atomic.Int32 // Top bit is used as a spinlock. } -const lockbit = math.MinInt32 +const ( + lockbit = math.MinInt32 + wordBits = int(unsafe.Sizeof(uintptr(0)) * 8) +) -// Load returns the value at the given index. This index must have been -// previously returned by [Log.Append]. +// Load returns the value at the given index. +// +// Panics if no value is at that index. func (s *Log[T]) Load(idx int) T { // Read cap first, which is required before we can read s.ptr. - cap := s.cap.Load() + cap := s.cap.Load() &^ lockbit ptr := s.ptr.Load() - return unsafe.Slice(ptr, cap&^lockbit)[idx] + bits := unsafe.Slice(s.bits.Load(), logBits(int(cap))) + if n := idx / wordBits; n >= len(bits) || bits[n].Load()&(1<<(idx%wordBits)) == 0 { + panic(fmt.Errorf("internal/syncx: index out of bounds [%v]", idx)) + } + + return unsafe.Slice(ptr, cap)[idx] } // Append adds a new value to this slice. @@ -62,8 +74,8 @@ func (s *Log[T]) Load(idx int) T { // [Log.Load]. 
func (s *Log[T]) Append(v T) int { i := s.next.Add(1) - if i == 0 { - panic(errors.New("internal/atomicx: cannot allocate more than 2^32 elements")) + if i < 0 { + panic(errors.New("internal/syncx: cannot allocate more than 2^32 elements")) } i-- @@ -76,8 +88,12 @@ again: } p := s.ptr.Load() + b := s.bits.Load() slice := unsafe.Slice(p, c) + bits := unsafe.Slice(b, logBits(int(c))) if uint32(i) < uint32(c) { // Don't need to grow the slice. + i := int(i) + // This is a data race. However, it's fine, because this slot is not // valid yet, so tearing it is fine. // @@ -92,6 +108,9 @@ again: // significant cache thrashing if the slice is resized from under us. storeNoRace(&slice[i], v) + // Mark this index as claimed. + bits[i/wordBits].Or(1 << (i % wordBits)) + // If the value was potentially torn, it would have resulted in c // changing, meaning we need to try again. // @@ -109,7 +128,7 @@ again: goto again } - return int(i) + return i } // Need to grow a slice. Lock the slice by setting the sign bit of the @@ -125,14 +144,23 @@ again: // // Getting preempted by the call into the allocator would be... non-ideal, // but there isn't really a way to prevent that. - slice = append(slice, make([]T, i+1-c)...) + slice = append(slice, make([]T, max(i+1, 16)-c)...) slice[i] = v - // Race detector does not understand that this write is protected by - // the store that immediately follows it. + // Now, grow the bit array. + bits2 := make([]atomic.Uintptr, logBits(cap(slice))) + for i := range bits { + bits2[i].Store(bits[i].Load()) + } + bits = bits2 + s.ptr.Store(unsafe.SliceData(slice)) + s.bits.Store(unsafe.SliceData(bits)) s.cap.Store(int32(cap(slice))) // Drop the lock. + // Mark this index as claimed. 
+ bits[int(i)/wordBits].Or(1 << (int(i) % wordBits)) + return int(i) } @@ -140,3 +168,7 @@ again: func storeNoRace[T any](p *T, v T) { *p = v } + +func logBits(cap int) int { + return (cap + wordBits - 1) / wordBits +} diff --git a/internal/ext/syncx/log_export_test.go b/internal/ext/syncx/log_export_test.go new file mode 100644 index 000000000..ecedfc4cb --- /dev/null +++ b/internal/ext/syncx/log_export_test.go @@ -0,0 +1,21 @@ +// Copyright 2020-2025 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncx + +import "math" + +func (s *Log[T]) SetFull() { + s.cap.Store(math.MaxInt32) +} diff --git a/internal/ext/syncx/log_test.go b/internal/ext/syncx/log_test.go index 7cc32f5dc..352127882 100644 --- a/internal/ext/syncx/log_test.go +++ b/internal/ext/syncx/log_test.go @@ -37,4 +37,16 @@ func TestLog(t *testing.T) { assert.Equal(t, n, log.Load(i)) } }) + + // Verify that mis-using an index panics. 
+ i := log.Append(0) + assert.Panics(t, func() { log.Load(i + 1) }) +} + +func TestExhaust(t *testing.T) { + t.Parallel() + + log := new(syncx.Log[int]) + log.SetFull() + assert.Panics(t, func() { log.Append(0) }) } From 9a313c4c25c6b8d6c209082ed862c1b97e7c9833 Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 16:39:21 -0700 Subject: [PATCH 21/23] remove data race --- internal/ext/syncx/log.go | 138 +++++++++----------------------------- internal/intern/intern.go | 21 +++--- 2 files changed, 41 insertions(+), 118 deletions(-) diff --git a/internal/ext/syncx/log.go b/internal/ext/syncx/log.go index d36f2fbec..3bae0ca24 100644 --- a/internal/ext/syncx/log.go +++ b/internal/ext/syncx/log.go @@ -17,8 +17,6 @@ package syncx import ( "errors" - "fmt" - "math" "runtime" "sync/atomic" "unsafe" @@ -27,45 +25,23 @@ import ( // Log is an append-only log. // // Loading and append operations may happen concurrently with each other. +// Can hold at most 2^31 elements. type Log[T any] struct { - // Protected by the lock bit in cap. This means that cap must be loaded - // before loading this value, to ensure that prior writes to it are seen, - // and cap must be stored after storing this value, to ensure the write is - // published. - // - // This value is atomic because otherwise a data race occurs. Go guarantees - // this is not actually a data race, because in Go all pointer loads/stores - // are relaxed atomic, but this is essentially free, because we're already - // loading cap. ptr atomic.Pointer[T] - // Array of bits with length equal to cap divided by the bit size of uintptr, - // rounded up. - bits atomic.Pointer[atomic.Uintptr] - - next atomic.Int32 // The next index to fill. - cap atomic.Int32 // Top bit is used as a spinlock. + next, len atomic.Int32 // The next index to fill. + cap atomic.Int32 // Top bit is used as a spinlock. 
} -const ( - lockbit = math.MinInt32 - wordBits = int(unsafe.Sizeof(uintptr(0)) * 8) -) - // Load returns the value at the given index. // // Panics if no value is at that index. func (s *Log[T]) Load(idx int) T { // Read cap first, which is required before we can read s.ptr. - cap := s.cap.Load() &^ lockbit + len := s.len.Load() ptr := s.ptr.Load() - bits := unsafe.Slice(s.bits.Load(), logBits(int(cap))) - if n := idx / wordBits; n >= len(bits) || bits[n].Load()&(1<<(idx%wordBits)) == 0 { - panic(fmt.Errorf("internal/syncx: index out of bounds [%v]", idx)) - } - - return unsafe.Slice(ptr, cap)[idx] + return unsafe.Slice(ptr, len)[idx] } // Append adds a new value to this slice. @@ -75,100 +51,50 @@ func (s *Log[T]) Load(idx int) T { func (s *Log[T]) Append(v T) int { i := s.next.Add(1) if i < 0 { - panic(errors.New("internal/syncx: cannot allocate more than 2^32 elements")) + panic(errors.New("internal/syncx: cannot allocate more than 2^31 elements")) } i-- -again: - // Load cap first. See comment in [Load]. + // Wait for the capacity to be large enough for our index, or for us to + // be responsible for growing it (i == c). c := s.cap.Load() - if c&lockbit != 0 { + for i > c { runtime.Gosched() - goto again + c = s.cap.Load() } - p := s.ptr.Load() - b := s.bits.Load() - slice := unsafe.Slice(p, c) - bits := unsafe.Slice(b, logBits(int(c))) - if uint32(i) < uint32(c) { // Don't need to grow the slice. - i := int(i) - - // This is a data race. However, it's fine, because this slot is not - // valid yet, so tearing it is fine. - // - // This is, in fact, a benign race. So long as this value is not read - // at before this function returns i, no memory corruption is possible. - // In particular, Go promises to never tear pointers, so we can't make - // the GC freak out about broken pointers. 
-		//
-		// See https://go.dev/ref/mem#restrictions
-		//
-		// This store is also the slowest part of this function, due to
-		// significant cache thrashing if the slice is resized from under us.
-		storeNoRace(&slice[i], v)
+	// Fast path (i < c): slice is already large enough.
+	if i < c {
+		p := s.ptr.Load()
+		unsafe.Slice(p, c)[i] = v
 
-		// Mark this index as claimed.
-		bits[i/wordBits].Or(1 << (i % wordBits))
+		s.len.Add(1)
+		for s.len.Load() <= i {
+			// Make sure that every index before us also completes, to ensure
+			// that Load does not panic.
-		// If the value was potentially torn, it would have resulted in c
-		// changing, meaning we need to try again.
-		//
-		// A very important property is that this value never returns to the
-		// same value after a resize begins, preventing an ABA problem. If the
-		// capacity does not change across a store, it means that store
-		// succeeded
-		//
-		// To ensure that the value we just wrote above is visible to other
-		// goroutines, in particular a goroutine that wants to perform a resize,
-		// We need to store to the capacity. The easiest way to achieve both
-		// things at once is with the following CAS.
-		if !s.cap.CompareAndSwap(c, c) {
 			runtime.Gosched()
-			goto again
 		}
 
-		return i
+		return int(i)
 	}
 
-	// Need to grow a slice. Lock the slice by setting the sign bit of the
-	// capacity.
-	//
-	// This lock is necessary so that updating ptr always happens together with
-	// cap.
-	if !s.cap.CompareAndSwap(c, c|lockbit) {
-		goto again
+	// Slow path (i == c): we are responsible for growing the slice. Need to
+	// wait for all fast-path writers to finish. Any further writers will
+	// spin in the i > c loop.
+	for s.len.Load() != c {
+		runtime.Gosched()
 	}
 
-	// Grow the slice enough to insert our value.
-	//
-	// Getting preempted by the call into the allocator would be... non-ideal,
-	// but there isn't really a way to prevent that.
-	slice = append(slice, make([]T, max(i+1, 16)-c)...)
- slice[i] = v - - // Now, grow the bit array. - bits2 := make([]atomic.Uintptr, logBits(cap(slice))) - for i := range bits { - bits2[i].Store(bits[i].Load()) - } - bits = bits2 + // Grow the slice. + // i == c, so we are appending exactly one element right now. + p := s.ptr.Load() + slice := append(unsafe.Slice(p, c), v) + // Publish the new slice to readers and waiting writers. + // Pointer must be stored before capacity to prevent out-of-bounds panics in Load. s.ptr.Store(unsafe.SliceData(slice)) - s.bits.Store(unsafe.SliceData(bits)) - s.cap.Store(int32(cap(slice))) // Drop the lock. - - // Mark this index as claimed. - bits[int(i)/wordBits].Or(1 << (int(i) % wordBits)) + s.cap.Store(int32(cap(slice))) + s.len.Add(1) return int(i) } - -//go:norace -func storeNoRace[T any](p *T, v T) { - *p = v -} - -func logBits(cap int) int { - return (cap + wordBits - 1) / wordBits -} diff --git a/internal/intern/intern.go b/internal/intern/intern.go index 5c64af76e..dc7ac48ef 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -74,7 +74,7 @@ func (id ID) GoString() string { // // The zero value of Table is empty and ready to use. type Table struct { - index sync.Map + index sync.Map // [string, atomic.Int32] table syncx.Log[string] stats atomic.Pointer[stats] } @@ -201,7 +201,7 @@ func (t *Table) Query(s string) (ID, bool) { return 0, false } - id := v.(ID) //nolint:errcheck + id := ID(v.(*atomic.Int32).Load()) //nolint:errcheck if id == 0 { // Handle the case where this is a mid-insertion. return 0, false @@ -210,12 +210,6 @@ func (t *Table) Query(s string) (ID, bool) { return id, true } -// Used as a sentinel in internSlow. 0 always represents "" and is never -// present as a value in the index. -// -// This is here to avoid a call to runtime.convT32 in internSlow. -var inserting any = ID(0) - func (t *Table) internSlow(s string) ID { // Intern tables are expected to be long-lived. 
Avoid holding onto a larger // buffer that s is an internal pointer to by cloning it. @@ -229,9 +223,12 @@ func (t *Table) internSlow(s string) ID { key := any(s) again: - // Try to become the "leader" which is interning s. - if v, ok := t.index.LoadOrStore(key, inserting); ok { - id := v.(ID) //nolint:errcheck + // Try to become the "leader" which is interning s. Insert a 0, which is + // "" (never interned), to mark this slot as taken. + v, ok := t.index.LoadOrStore(key, new(atomic.Int32)) + p := v.(*atomic.Int32) //nolint:errcheck + if ok { + id := ID(p.Load()) if id == 0 { // Someone *else* is doing the inserting, apparently. runtime.Gosched() @@ -245,7 +242,7 @@ again: // Figure out the next interning ID. // The first ID will have value 1. ID 0 is reserved for "". id := ID(t.table.Append(s) + 1) - t.index.Store(key, id) // Commit the ID. + p.Store(int32(id)) return id } From a1048db555098e9bbc2ad09a10057ba1c3ad503f Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Mon, 9 Mar 2026 16:56:39 -0700 Subject: [PATCH 22/23] unbreak --- internal/ext/syncx/log_export_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/ext/syncx/log_export_test.go b/internal/ext/syncx/log_export_test.go index ecedfc4cb..ce02f1d2b 100644 --- a/internal/ext/syncx/log_export_test.go +++ b/internal/ext/syncx/log_export_test.go @@ -17,5 +17,5 @@ package syncx import "math" func (s *Log[T]) SetFull() { - s.cap.Store(math.MaxInt32) + s.next.Store(math.MaxInt32) } From 9eb9d65abbd2472c4b241846055261c607cfd0fc Mon Sep 17 00:00:00 2001 From: Miguel Young de la Sota Date: Tue, 10 Mar 2026 13:47:20 -0700 Subject: [PATCH 23/23] ensure exit --- internal/ext/syncx/log.go | 25 ++++++++++++++--- internal/ext/syncx/log_test.go | 10 ++++--- .../export_test.go} | 8 +++--- internal/intern/intern.go | 28 +++++++++++++++---- internal/intern/intern_test.go | 14 ++++++++++ 5 files changed, 68 insertions(+), 17 deletions(-) rename internal/{ext/syncx/log_export_test.go 
=> intern/export_test.go} (81%) diff --git a/internal/ext/syncx/log.go b/internal/ext/syncx/log.go index 3bae0ca24..2cb6883ff 100644 --- a/internal/ext/syncx/log.go +++ b/internal/ext/syncx/log.go @@ -17,11 +17,15 @@ package syncx import ( "errors" + "math" "runtime" "sync/atomic" + "testing" "unsafe" ) +var ErrLogExhausted = errors.New("internal/syncx: cannot allocate more than 2^31 elements") + // Log is an append-only log. // // Loading and append operations may happen concurrently with each other. @@ -48,10 +52,12 @@ func (s *Log[T]) Load(idx int) T { // // Returns the index of the appended element, which can be looked up with // [Log.Load]. -func (s *Log[T]) Append(v T) int { +// +// Returns an error if indices are exhausted. +func (s *Log[T]) Append(v T) (int, error) { i := s.next.Add(1) if i < 0 { - panic(errors.New("internal/syncx: cannot allocate more than 2^31 elements")) + return 0, ErrLogExhausted } i-- @@ -75,7 +81,7 @@ func (s *Log[T]) Append(v T) int { runtime.Gosched() } - return int(i) + return int(i), nil } // Slow path (i == c): we are responsible for growing the slice. Need to @@ -96,5 +102,16 @@ func (s *Log[T]) Append(v T) int { s.cap.Store(int32(cap(slice))) s.len.Add(1) - return int(i) + return int(i), nil +} + +// SetFullForTesting sets this log to full, so that future calls to [Log.Append] +// panic. +// +// Must not be called concurrently. Can only be called from a unit test. +func (s *Log[T]) SetFullForTesting() { + if !testing.Testing() { + panic("called SetFull outside of a test") + } + s.next.Store(math.MaxInt32) } diff --git a/internal/ext/syncx/log_test.go b/internal/ext/syncx/log_test.go index 352127882..ac01e2058 100644 --- a/internal/ext/syncx/log_test.go +++ b/internal/ext/syncx/log_test.go @@ -33,13 +33,13 @@ func TestLog(t *testing.T) { synctestx.Hammer(0, func() { for range trials { n := rand.Int() - i := log.Append(n) + i, _ := log.Append(n) assert.Equal(t, n, log.Load(i)) } }) // Verify that mis-using an index panics. 
- i := log.Append(0) + i, _ := log.Append(0) assert.Panics(t, func() { log.Load(i + 1) }) } @@ -47,6 +47,8 @@ func TestExhaust(t *testing.T) { t.Parallel() log := new(syncx.Log[int]) - log.SetFull() - assert.Panics(t, func() { log.Append(0) }) + log.SetFullForTesting() + + _, err := log.Append(0) + assert.Error(t, err) } diff --git a/internal/ext/syncx/log_export_test.go b/internal/intern/export_test.go similarity index 81% rename from internal/ext/syncx/log_export_test.go rename to internal/intern/export_test.go index ce02f1d2b..20644aade 100644 --- a/internal/ext/syncx/log_export_test.go +++ b/internal/intern/export_test.go @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package syncx +package intern -import "math" +import "github.com/bufbuild/protocompile/internal/ext/syncx" -func (s *Log[T]) SetFull() { - s.next.Store(math.MaxInt32) +func (t *Table) Table() *syncx.Log[string] { + return &t.table } diff --git a/internal/intern/intern.go b/internal/intern/intern.go index dc7ac48ef..7d70ea22b 100644 --- a/internal/intern/intern.go +++ b/internal/intern/intern.go @@ -201,7 +201,12 @@ func (t *Table) Query(s string) (ID, bool) { return 0, false } - id := ID(v.(*atomic.Int32).Load()) //nolint:errcheck + p := v.(*atomic.Int32) //nolint:errcheck + if p == nil { + // This key has been poisoned because we ran out of entries. + return 0, false + } + id := ID(p.Load()) if id == 0 { // Handle the case where this is a mid-insertion. return 0, false @@ -225,9 +230,14 @@ func (t *Table) internSlow(s string) ID { again: // Try to become the "leader" which is interning s. Insert a 0, which is // "" (never interned), to mark this slot as taken. - v, ok := t.index.LoadOrStore(key, new(atomic.Int32)) + v, loaded := t.index.LoadOrStore(key, new(atomic.Int32)) p := v.(*atomic.Int32) //nolint:errcheck - if ok { + if loaded { + if p == nil { + // We ran out of IDs for this key. 
+ panic(syncx.ErrLogExhausted) + } + id := ID(p.Load()) if id == 0 { // Someone *else* is doing the inserting, apparently. @@ -240,8 +250,16 @@ again: } // Figure out the next interning ID. - // The first ID will have value 1. ID 0 is reserved for "". - id := ID(t.table.Append(s) + 1) + i, err := t.table.Append(s) + if err != nil { + // Poison this key. This will cause any goroutines waiting for interning + // to complete to also panic. + t.index.Store(key, (*atomic.Int32)(nil)) + panic(err) + } + + // Commit the new ID. + id := ID(i + 1) p.Store(int32(id)) return id diff --git a/internal/intern/intern_test.go b/internal/intern/intern_test.go index db423e54c..5f7770c91 100644 --- a/internal/intern/intern_test.go +++ b/internal/intern/intern_test.go @@ -28,6 +28,7 @@ import ( "github.com/bufbuild/protocompile/internal/ext/slicesx" "github.com/bufbuild/protocompile/internal/ext/synctestx" + "github.com/bufbuild/protocompile/internal/ext/syncx" "github.com/bufbuild/protocompile/internal/inlinetest" "github.com/bufbuild/protocompile/internal/intern" ) @@ -140,6 +141,19 @@ func TestHammer(t *testing.T) { } } +func TestExhaust(t *testing.T) { + t.Parallel() + + // Validate that if IDs are exhausted, every thread potentially waiting on + // that panics and does not hang. + it := new(intern.Table) + it.Table().SetFullForTesting() + synctestx.Hammer(0, func() { + defer func() { assert.Equal(t, syncx.ErrLogExhausted, recover()) }() + it.Intern("uh oh") + }) +} + func BenchmarkIntern(b *testing.B) { // Helper to ensure that it.Value is actually inlined, which is relevant // for benchmarks. Calls within the body of a benchmark are never inlined.