From 8ad7913a8adf6495a991f61ff29536f0e854d5fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 3 Feb 2024 11:01:15 +0100 Subject: [PATCH 1/4] wip better carlog index --- carlog/carlog.go | 1 + carlog/idx_log_bsst.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 carlog/idx_log_bsst.go diff --git a/carlog/carlog.go b/carlog/carlog.go index e36b9dd..140a9f2 100644 --- a/carlog/carlog.go +++ b/carlog/carlog.go @@ -717,6 +717,7 @@ func (j *CarLog) Commit() (int64, error) { defer j.idxLk.RUnlock() // todo log commit? + j.wIdx.Sync() if err := j.flushBuffered(); err != nil { return 0, xerrors.Errorf("flushing buffered data: %w", err) diff --git a/carlog/idx_log_bsst.go b/carlog/idx_log_bsst.go new file mode 100644 index 0000000..c1c2504 --- /dev/null +++ b/carlog/idx_log_bsst.go @@ -0,0 +1,34 @@ +package carlog + +import ( + mh "github.com/multiformats/go-multihash" +) + +type LogBsstIndex struct { +} + +func (l *LogBsstIndex) Put(c []mh.Multihash, offs []int64) error { + // append to current log + + // if log is too big, create new log, schedule compaction for old log + + //TODO implement me + panic("implement me") +} + +func (l *LogBsstIndex) Del(c []mh.Multihash) error { + //TODO implement me + panic("implement me") +} + +func (l *LogBsstIndex) ToTruncate(atOrAbove int64) ([]mh.Multihash, error) { + //TODO implement me + panic("implement me") +} + +func (l *LogBsstIndex) Close() error { + //TODO implement me + panic("implement me") +} + +var _ WritableIndex = (*LogBsstIndex)(nil) From d7b610aca1a29a2833a41612b7980c0885c379d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 3 Feb 2024 17:07:19 +0100 Subject: [PATCH 2/4] bssti: Mostly implemented Put --- carlog/idx_log_bsst.go | 274 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 270 insertions(+), 4 deletions(-) diff --git a/carlog/idx_log_bsst.go b/carlog/idx_log_bsst.go index c1c2504..eaf02b3 100644 --- a/carlog/idx_log_bsst.go +++ b/carlog/idx_log_bsst.go @@ -1,22 +1,265 @@ package carlog import ( + "bufio" + "encoding/binary" + "fmt" + "github.com/lotus-web3/ribs/bsst" + "github.com/minio/sha256-simd" mh "github.com/multiformats/go-multihash" + "golang.org/x/xerrors" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" +) + +const ( + LogBufSize = 128 << 10 + LogWriteCh = 128 + + LogMaxSize = 1 << 20 // 1 Mi entries + + StringLogExt = ".sl" + BsstProgExt = ".bsst.prog" + BsstExt = ".bsst" ) type LogBsstIndex struct { + root string // root dir + + // mhh salt + Salt [32]byte + + partitions []*partition // last one is write log, rest is compacting or compacted partitions + + writeLk sync.Mutex } +type partition struct { + compacted atomic.Bool // unless compacted read from logIndex + + // Log state + + logFile *os.File + + logIndexLk sync.Mutex // maps ops are fast, so rw lock may actually be slower (todo benchmark) + logIndex map[string]int64 + + // writing + writesSent int64 + //writesRecv atomic.Int64 + writesDone atomic.Int64 + + writeCh chan []byte + closing bool + + bw *bufio.Writer + + writeFlushed atomic.Int64 + + lastFinishedWrite atomic.Int64 + + // Compacted state +} + +func OpenLogBsstIndex(root string) (*LogBsstIndex, error) { + if err := os.MkdirAll(root, 0755); err != nil { + return nil, xerrors.Errorf("mkdirall: %w", err) + } + + lbi := &LogBsstIndex{ + root: root, + } + + // Load salt (if not present we're creating a new index) + + // Load partitions + // * Every non-last log is a compacting log + // * Every log with .bsst is a compacted log + // * If a log with .bsst has a .sl file, it's a finished compaction but .sl wasn't removed (we do that as cleanup on startup) + // * Logs with .bsst.prog are unfinished compactions + // * Last log is a write log unless it has a .bsst.prog file, in which case it's a compacting log + // * If there's no last writable log, new one will be created in Put + + return lbi, nil +} + +/* +---------------------------- +---------- WRITE ---------- +---------------------------- +*/ + func (l *LogBsstIndex) Put(c []mh.Multihash, offs []int64) error { - // append to current log - - // if log is too big, create new log, schedule compaction for old log + if len(c) != len(offs) { + return xerrors.New("mismatched input lengths") + } + if len(c) == 0 { + return nil + } + + // make entries + /*writeBuf := pool.Get(len(c) * bsst.EntrySize) + defer pool.Put(writeBuf)*/ + writeBuf := make([]byte, len(c)*bsst.EntrySize) + + for i, h := range c { + k := l.makeMHKey(h, 0) // always 0th instance, we assume entries don't repeat + + // bsst.EntKeyBytes + copy(writeBuf[i*bsst.EntrySize:], k[:bsst.EntKeyBytes]) + binary.LittleEndian.PutUint64(writeBuf[i*bsst.EntrySize+bsst.EntKeyBytes:], uint64(offs[i])) + } + + l.writeLk.Lock() + + { + // ensure space in current log + + needNewLog := true + if len(l.partitions) > 0 { + lastLog := l.partitions[len(l.partitions)-1] + if lastLog.writesSent+int64(len(c)) < LogMaxSize { + needNewLog = false + } + } + + if needNewLog { + if err := l.newLog(); err != nil { + l.writeLk.Unlock() + return err + } + } + } + + l.partitions[len(l.partitions)-1].writesSent += int64(len(c)) + l.partitions[len(l.partitions)-1].writeCh <- writeBuf + + l.partitions[len(l.partitions)-1].logIndexLk.Lock() + l.writeLk.Unlock() + + for i, h := range c { + l.partitions[len(l.partitions)-1].logIndex[string(h)] = offs[i] + } + l.partitions[len(l.partitions)-1].logIndexLk.Unlock() + + return nil +} + +func (l *LogBsstIndex) newLog() error { + logPath := filepath.Join(l.root, fmt.Sprintf("log%d.sl", len(l.partitions))) + + f, err := os.Create(logPath) + if err != nil { + return xerrors.Errorf("create new log file: %w", err) + } + + ilo := &partition{ + logFile: f, + logIndex: map[string]int64{}, // todo map pool? + writeCh: make(chan []byte, LogWriteCh), + bw: bufio.NewWriterSize(f, LogBufSize), + } + + prevLast := l.partitions[len(l.partitions)-1] + close(prevLast.writeCh) + l.partitions = append(l.partitions, ilo) + go ilo.run() + + return nil +} + +func (l *LogBsstIndex) Del(c []mh.Multihash) error { + panic("implement me") +} + +func (i *partition) run() { + var writesRecv int64 + + for b := range i.writeCh { + if len(b) == 0 { + if err := i.flush(writesRecv); err != nil { + log.Errorf("flushing log: %s", err) + return + } + continue + } + + writesRecv++ + + _, err := i.bw.Write(b) + if err != nil { + log.Errorf("writing to log: %s", err) + return // this will make things hang, but we can't really do anything else (todo wider panik) + } + } + + if err := i.flush(writesRecv); err != nil { + log.Errorf("flushing log: %s", err) + return + } + + if i.closing { + if err := i.logFile.Close(); err != nil { + log.Errorf("closing log file: %s", err) + return + } + + return + } + + i.compact() +} + +func (i *partition) compact() { + nonSlPath := strings.TrimSuffix(i.logFile.Name(), StringLogExt) + bsstPath := nonSlPath + BsstExt + + bsst.CreateAdv() +} + +func (i *partition) flush(writesRecv int64) error { + if writesRecv == 0 { + return nil + } + + if err := i.bw.Flush(); err != nil { + return xerrors.Errorf("flushing log: %w", err) + } + + if err := i.logFile.Sync(); err != nil { + return xerrors.Errorf("syncing log: %w", err) + } + + i.writeFlushed.Store(writesRecv) + return nil +} + +/* +---------------------------- +----------- READ ----------- +---------------------------- +*/ + +func (l *LogBsstIndex) Has(c []mh.Multihash) ([]bool, error) { //TODO implement me panic("implement me") } -func (l *LogBsstIndex) Del(c []mh.Multihash) error { +func (l *LogBsstIndex) Get(c []mh.Multihash) ([]int64, error) { + //TODO implement me + panic("implement me") +} + +func (l *LogBsstIndex) Entries() (int64, error) { + //TODO implement me + panic("implement me") +} + +func (l *LogBsstIndex) List(f func(c mh.Multihash, offs []int64) error) error { //TODO implement me panic("implement me") } @@ -31,4 +274,27 @@ func (l *LogBsstIndex) Close() error { panic("implement me") } +type multiHashHash struct { + mhh [32]byte + off int64 +} + +func (l *LogBsstIndex) makeMHH(c mh.Multihash, i int64, off int64) multiHashHash { + return multiHashHash{ + mhh: l.makeMHKey(c, i), + off: off, + } +} + +func (l *LogBsstIndex) makeMHKey(c mh.Multihash, i int64) [32]byte { + // buf = [salt][i: le64][c[:64]] + var buf [(32 + 8) + (32 * 2)]byte + copy(buf[:], l.Salt[:]) + binary.LittleEndian.PutUint64(buf[32:], uint64(i)) + copy(buf[32+8:], c) + + return sha256.Sum256(buf[:]) +} + var _ WritableIndex = (*LogBsstIndex)(nil) +var _ ReadableIndex = (*LogBsstIndex)(nil) From 6e514fd7a0a1e8bce14df86ef4333f9b8f0d9b86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 4 Feb 2024 12:30:38 +0100 Subject: [PATCH 3/4] bssti: partition compact --- bsst/bsst.go | 254 +++++++++++++++++++++++++++++++++++++++++ bsst/bsst_test.go | 30 ++++- carlog/idx_log_bsst.go | 99 +++++++++++++--- 3 files changed, 366 insertions(+), 17 deletions(-) diff --git a/bsst/bsst.go b/bsst/bsst.go index f9967c2..0508bba 100644 --- a/bsst/bsst.go +++ b/bsst/bsst.go @@ -79,6 +79,260 @@ type BSST struct { var BsstCIDSampleSize = 1024 +func Create2(path string, entries int64, source Source) (bsst *BSST, err error) { + var salt [32]byte + if _, err := rand.Read(salt[:]); err != nil { + return nil, xerrors.Errorf("generate salt: %w", err) + } + + header := &BSSTHeader{ + Salt: salt, + } + + var nextLevel []multiHashHash + + createSample := make([]multihash.Multihash, 0, BsstCIDSampleSize) // TODO!!! + + err = source.List(func(c multihash.Multihash, offs []int64) error { + if len(createSample) < BsstCIDSampleSize { + // copy c data + + c := append([]byte(nil), c...) + createSample = append(createSample, c) + } + for i, off := range offs { + nextLevel = append(nextLevel, header.makeMHH(c, int64(i), off)) + } + return nil + }) + if err != nil { + return nil, xerrors.Errorf("write buckets: %w", err) + } + + // sort buckets + + // todo parallel merge sort + sort.Slice(nextLevel, func(i, j int) bool { + return bytes.Compare(nextLevel[i].mhh[:], nextLevel[j].mhh[:]) < 0 + }) + + bsst, err = CreateAdv(path, entries, salt, func(i int64) []byte { // todo test + var ent [EntrySize]byte + copy(ent[:], nextLevel[i].mhh[:]) + binary.LittleEndian.PutUint64(ent[EntKeyBytes:], uint64(nextLevel[i].off)) + return ent[:] + }) + if err != nil { + return nil, xerrors.Errorf("create adv: %w", err) + } + + bsst.CreateSample = createSample + return bsst, nil +} + +func CreateAdv(path string, entries int64, salt [32]byte, data func(i int64) []byte) (bsst *BSST, err error) { + f, err := os.Create(path) + if err != nil { + return nil, xerrors.Errorf("open file: %w", err) + } + + header := &BSSTHeader{ + L0Buckets: (entries + MeanEntriesPerBucket - 1) / MeanEntriesPerBucket, // ceil entries / MeanEntriesPerBucket + BucketSize: BucketSize, + Entries: entries, + + Levels: 1, + LevelFactor: LevelFactor, + Finalized: false, + + Salt: salt, + } + + var hdrBuf [BucketSize]byte + + copy(hdrBuf[:], BsstMagic) + + if err := header.MarshalCBOR(bytes.NewBuffer(hdrBuf[8:])); err != nil { + return nil, xerrors.Errorf("marshal header: %w", err) + } + + if _, err := f.Write(hdrBuf[:]); err != nil { + return nil, xerrors.Errorf("write header: %w", err) + } + + // sync header + if err := f.Sync(); err != nil { + return nil, xerrors.Errorf("sync header: %w", err) + } + + // write buckets + bufWriter := bufio.NewWriterSize(f, 4<<20) + nullEntry := [EntrySize]byte{} + + inBucket := uint64(0) + bucketEnts := uint64(0) + + var bloomHead [BucketBloomFilterSize]byte + + flushBucket := func(bucketIdx uint64) error { + if inBucket > bucketIdx { + panic("buckets not sorted") + } + + //bSizes[bucketEnts]++ + + // pad up to BucketUserEntries entries + for bucketEnts < BucketUserEntries { + if _, err := bufWriter.Write(nullEntry[:]); err != nil { + return xerrors.Errorf("write padding entry: %w", err) + } + bucketEnts++ + } + + // write bloom filter / header padding + if _, err := bufWriter.Write(bloomHead[:]); err != nil { + return xerrors.Errorf("write header entry filter: %w", err) + } + + // reset temp vars + inBucket++ + bucketEnts = 0 + + return nil + } + + var ents uint64 + var level int + levelBuckets := uint64(header.L0Buckets) + prevLevelBuckets := uint64(0) + + var nextLevel [][]byte + first := true + entsToIter := entries + + for len(nextLevel) > 0 || first { + list := nextLevel + nextLevel = [][]byte{} + + layerSource := data + if !first { + layerSource = func(i int64) []byte { + return list[i] + } + entsToIter = int64(len(list)) + } + + first = false + + bucketRange := math.MaxUint64 / levelBuckets // todo techincally +1?? (if changing note this is also calculated below) + + for i := int64(0); i < entsToIter; i++ { + entry := layerSource(i) + + // first 64 bits of hash to calculate bucket + hashidx := binary.BigEndian.Uint64(entry[:8]) + bucketIdx := prevLevelBuckets + (hashidx / bucketRange) + bloomEntIdx := (hashidx % bucketRange) / (bucketRange / BucketBloomFilterEntries) + + for inBucket != bucketIdx { + if err := flushBucket(bucketIdx); err != nil { + return nil, err + } + + // reset bloom + for i := range bloomHead[:BucketBloomFilterSize] { + bloomHead[i] = 0 + } + } + + // update bloom filter first, so that even if this entry won't fit in this level, we know that it doesn't + // exist in one i/o if it's not in bloom + bloomHead[bloomEntIdx/8] |= 1 << (bloomEntIdx % 8) + + // check if we have space for this entry + if bucketEnts >= BucketUserEntries { + nextLevel = append(nextLevel, entry) + continue + } + + // write entry + if _, err := bufWriter.Write(entry[:EntrySize]); err != nil { + return nil, xerrors.Errorf("write entry: %w", err) + } + + bucketEnts++ + ents++ + } + + // flush last buckets + for inBucket < levelBuckets+prevLevelBuckets { // todo is the condition right? + if err := flushBucket(levelBuckets + prevLevelBuckets); err != nil { + return nil, err + } + + // todo could only do this once + for i := range bloomHead[:BucketBloomFilterSize] { + bloomHead[i] = 0 + } + } + + level++ + prevLevelBuckets += levelBuckets + levelBuckets = (levelBuckets + LevelFactor - 1) / LevelFactor // ceil(levelBuckets / LevelFactor) + } + + header.Levels = int64(level) + + if err := bufWriter.Flush(); err != nil { + return nil, xerrors.Errorf("flush buckets: %w", err) + } + + if err := f.Sync(); err != nil { + return nil, xerrors.Errorf("sync buckets: %w", err) + } + + // finalize header + header.Finalized = true + if _, err := f.Seek(8, io.SeekStart); err != nil { + return nil, xerrors.Errorf("seek to header: %w", err) + } + + if err := header.MarshalCBOR(f); err != nil { + return nil, xerrors.Errorf("marshal header: %w", err) + } + + if err := f.Sync(); err != nil { + return nil, xerrors.Errorf("sync header: %w", err) + } + + // print bSizes sorted by key + /*var keys []int + for k := range bSizes { + keys = append(keys, int(k)) + } + sort.Ints(keys)*/ + + /*var ibe int64 + for _, k := range keys { + ibe += bSizes[uint64(k)] * int64(k) + //fmt.Printf("bs %d: %d\n", k, bSizes[uint64(k)]) + }*/ + + //fmt.Println("ents ", ents, " bkt ", header.L0Buckets, " ibe ", ibe) + + if err := f.Close(); err != nil { + return nil, xerrors.Errorf("close written bsst: %w", err) + } + + // reopen in read-only mode + bsst, err = Open(path) + if err != nil { + return nil, xerrors.Errorf("reopen bsst: %w", err) + } + + return bsst, nil +} + func Create(path string, entries int64, source Source) (bsst *BSST, err error) { f, err := os.Create(path) if err != nil { diff --git a/bsst/bsst_test.go b/bsst/bsst_test.go index 772e636..c321c1c 100644 --- a/bsst/bsst_test.go +++ b/bsst/bsst_test.go @@ -34,8 +34,7 @@ func (t *testSource) List(f func(c mh.Multihash, offs []int64) error) error { var _ Source = (*testSource)(nil) func TestBSSTCreate(t *testing.T) { - - n := int64(13421280) + n := int64(13_421_280) bsst, err := Create(filepath.Join(t.TempDir(), "/a.bsst"), n, &testSource{n: n}) require.NoError(t, err) @@ -60,3 +59,30 @@ func TestBSSTCreate(t *testing.T) { require.NoError(t, err) require.Equal(t, n, got) } + +func TestBSSTCreate2(t *testing.T) { + n := int64(13_421_280) + + bsst, err := Create2(filepath.Join(t.TempDir(), "/a.bsst"), n, &testSource{n: n}) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, bsst.Close()) }) + + var got int64 + + err = (&testSource{n: n}).List(func(c mh.Multihash, offs []int64) error { + h, err := bsst.Has([]mh.Multihash{c}) + require.NoError(t, err) + if !h[0] { + require.True(t, h[0]) // todo probably full buckets + } + + r, err := bsst.Get([]mh.Multihash{c}) + require.NoError(t, err) + require.Equal(t, offs[0], r[0]) + + got++ + return nil + }) + require.NoError(t, err) + require.Equal(t, n, got) +} diff --git a/carlog/idx_log_bsst.go b/carlog/idx_log_bsst.go index eaf02b3..248f635 100644 --- a/carlog/idx_log_bsst.go +++ b/carlog/idx_log_bsst.go @@ -2,14 +2,17 @@ package carlog import ( "bufio" + "bytes" "encoding/binary" "fmt" + pool "github.com/libp2p/go-buffer-pool" "github.com/lotus-web3/ribs/bsst" "github.com/minio/sha256-simd" mh "github.com/multiformats/go-multihash" "golang.org/x/xerrors" "os" "path/filepath" + "sort" "strings" "sync" "sync/atomic" @@ -30,8 +33,9 @@ type LogBsstIndex struct { root string // root dir // mhh salt - Salt [32]byte + salt [32]byte + partsRWLk sync.RWMutex // note that writeLk is an implicit read lock for partitions as we only write partitions while holding it partitions []*partition // last one is write log, rest is compacting or compacted partitions writeLk sync.Mutex @@ -50,18 +54,20 @@ type partition struct { // writing writesSent int64 //writesRecv atomic.Int64 - writesDone atomic.Int64 writeCh chan []byte closing bool bw *bufio.Writer - writeFlushed atomic.Int64 + writeFlushed atomic.Int64 // TODO SET ON REOPEN lastFinishedWrite atomic.Int64 // Compacted state + salt [32]byte + + bss *bsst.BSST } func OpenLogBsstIndex(root string) (*LogBsstIndex, error) { @@ -161,13 +167,15 @@ func (l *LogBsstIndex) newLog() error { logIndex: map[string]int64{}, // todo map pool? writeCh: make(chan []byte, LogWriteCh), bw: bufio.NewWriterSize(f, LogBufSize), + salt: l.salt, + // writeFlushed = 0 is correct, new index } prevLast := l.partitions[len(l.partitions)-1] close(prevLast.writeCh) l.partitions = append(l.partitions, ilo) - go ilo.run() + go ilo.run(0) // new index so no writes yet return nil } @@ -176,9 +184,7 @@ func (l *LogBsstIndex) Del(c []mh.Multihash) error { panic("implement me") } -func (i *partition) run() { - var writesRecv int64 - +func (i *partition) run(writesRecv int64) { for b := range i.writeCh { if len(b) == 0 { if err := i.flush(writesRecv); err != nil { @@ -202,12 +208,12 @@ func (i *partition) run() { return } - if i.closing { - if err := i.logFile.Close(); err != nil { - log.Errorf("closing log file: %s", err) - return - } + if err := i.logFile.Close(); err != nil { + log.Errorf("closing log file: %s", err) + return + } + if i.closing { return } @@ -216,12 +222,75 @@ func (i *partition) run() { func (i *partition) compact() { nonSlPath := strings.TrimSuffix(i.logFile.Name(), StringLogExt) - bsstPath := nonSlPath + BsstExt + bsstPath := nonSlPath + BsstProgExt + bsstPathFinal := nonSlPath + BsstExt + + // load the whole log + logSize := i.writeFlushed.Load() * bsst.EntrySize + logData := pool.Get(int(logSize)) + defer pool.Put(logData) + + data := entrySlice(logData) + + // sort it + sort.Sort(data) + + // write bsst + bss, err := bsst.CreateAdv(bsstPath, int64(data.Len()), i.salt, data.Entry) + if err != nil { + log.Errorf("creating bsst: %s", err) + return // on restart we'll see that the bsst is or isn't correctly created and potentially redo the compaction + } + + i.bss = bss + + // rename bsst to final name + if err := os.Rename(bsstPath, bsstPathFinal); err != nil { + log.Errorf("renaming bsst: %s", err) + return + } + + // remove log + if err := os.Remove(i.logFile.Name()); err != nil { + log.Errorf("removing log: %s", err) + return + } + + // swap status to compacted + i.compacted.Store(true) + i.logIndexLk.Lock() + i.logIndex = nil // allow gc + i.logIndexLk.Unlock() +} + +type entrySlice []byte + +func (e entrySlice) Len() int { + return len(e) / bsst.EntrySize +} + +func (e entrySlice) Less(i, j int) bool { + return bytes.Compare(e[i*bsst.EntrySize:i*bsst.EntrySize+bsst.EntKeyBytes], e[j*bsst.EntrySize:j*bsst.EntKeyBytes+bsst.EntKeyBytes]) < 0 +} + +func (e entrySlice) Swap(i, j int) { + ii := i * bsst.EntrySize + jj := j * bsst.EntrySize - bsst.CreateAdv() + var temp [bsst.EntrySize]byte + + copy(temp[:], e[ii:ii+bsst.EntrySize]) + copy(e[ii:ii+bsst.EntrySize], e[jj:jj+bsst.EntrySize]) + copy(e[jj:jj+bsst.EntrySize], temp[:]) +} + +func (e entrySlice) Entry(i int64) []byte { + return e[i*bsst.EntrySize : (i+1)*bsst.EntrySize] } func (i *partition) flush(writesRecv int64) error { + // note: this method is only called from partition.run + if writesRecv == 0 { return nil } @@ -289,7 +358,7 @@ func (l *LogBsstIndex) makeMHH(c mh.Multihash, i int64, off int64) multiHashHash func (l *LogBsstIndex) makeMHKey(c mh.Multihash, i int64) [32]byte { // buf = [salt][i: le64][c[:64]] var buf [(32 + 8) + (32 * 2)]byte - copy(buf[:], l.Salt[:]) + copy(buf[:], l.salt[:]) binary.LittleEndian.PutUint64(buf[32:], uint64(i)) copy(buf[32+8:], c) From 69ea16f33eb16305f1c9c1a09d9ac66046a79d13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 4 Feb 2024 15:49:01 +0100 Subject: [PATCH 4/4] bssti: Mostly implemented read path --- bsst/bsst.go | 1 + carlog/idx_log_bsst.go | 313 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 279 insertions(+), 35 deletions(-) diff --git a/bsst/bsst.go b/bsst/bsst.go index 0508bba..70a04e7 100644 --- a/bsst/bsst.go +++ b/bsst/bsst.go @@ -725,6 +725,7 @@ func (h *BSST) Get(c []multihash.Multihash) ([]int64, error) { top: for i, k := range keys { + out[i] = -1 levelBuckets := uint64(h.h.L0Buckets) prevLevelBuckets := uint64(0) diff --git a/carlog/idx_log_bsst.go b/carlog/idx_log_bsst.go index 248f635..c04e13b 100644 --- a/carlog/idx_log_bsst.go +++ b/carlog/idx_log_bsst.go @@ -16,13 +16,14 @@ import ( "strings" "sync" "sync/atomic" + "time" ) const ( LogBufSize = 128 << 10 LogWriteCh = 128 - LogMaxSize = 1 << 20 // 1 Mi entries + LogMaxSize = 2 << 20 // 2 Mi entries, ~64MiB in memory StringLogExt = ".sl" BsstProgExt = ".bsst.prog" @@ -35,14 +36,14 @@ type LogBsstIndex struct { // mhh salt salt [32]byte - partsRWLk sync.RWMutex // note that writeLk is an implicit read lock for partitions as we only write partitions while holding it + partsLk sync.Mutex partitions []*partition // last one is write log, rest is compacting or compacted partitions writeLk sync.Mutex } type partition struct { - compacted atomic.Bool // unless compacted read from logIndex + compacting, compacted atomic.Bool // unless compacted read from logIndex // Log state @@ -62,8 +63,6 @@ type partition struct { writeFlushed atomic.Int64 // TODO SET ON REOPEN - lastFinishedWrite atomic.Int64 - // Compacted state salt [32]byte @@ -171,10 +170,20 @@ func (l *LogBsstIndex) newLog() error { // writeFlushed = 0 is correct, new index } + l.partsLk.Lock() + + // close previous last log prevLast := l.partitions[len(l.partitions)-1] - close(prevLast.writeCh) + // add new log l.partitions = append(l.partitions, ilo) + + l.partsLk.Unlock() + + // if we weren't holding the l.writeLk here this close would have to be under partsLk + close(prevLast.writeCh) + prevLast.writeCh = nil + go ilo.run(0) // new index so no writes yet return nil @@ -185,7 +194,9 @@ func (l *LogBsstIndex) Del(c []mh.Multihash) error { } func (i *partition) run(writesRecv int64) { - for b := range i.writeCh { + writeCh := i.writeCh + + for b := range writeCh { if len(b) == 0 { if err := i.flush(writesRecv); err != nil { log.Errorf("flushing log: %s", err) @@ -208,12 +219,11 @@ func (i *partition) run(writesRecv int64) { return } - if err := i.logFile.Close(); err != nil { - log.Errorf("closing log file: %s", err) - return - } - if i.closing { + if err := i.logFile.Close(); err != nil { + log.Errorf("closing log file: %s", err) + return + } return } @@ -221,19 +231,19 @@ func (i *partition) run(writesRecv int64) { } func (i *partition) compact() { + i.compacting.Store(true) + nonSlPath := strings.TrimSuffix(i.logFile.Name(), StringLogExt) bsstPath := nonSlPath + BsstProgExt bsstPathFinal := nonSlPath + BsstExt - // load the whole log - logSize := i.writeFlushed.Load() * bsst.EntrySize - logData := pool.Get(int(logSize)) - defer pool.Put(logData) - - data := entrySlice(logData) - - // sort it - sort.Sort(data) + // load and sorted log + data, err := i.mhhSortedLog(i.writeFlushed.Load()) + if err != nil { + log.Errorf("loading sorted log: %s", err) + return + } + defer pool.Put(data) // write bsst bss, err := bsst.CreateAdv(bsstPath, int64(data.Len()), i.salt, data.Entry) @@ -242,7 +252,14 @@ func (i *partition) compact() { return // on restart we'll see that the bsst is or isn't correctly created and potentially redo the compaction } - i.bss = bss + // swap status to compacted + i.bss = bss // first ready for reads from bsst + + i.compacted.Store(true) // say that reads should be from bsst + + i.logIndexLk.Lock() + i.logIndex = nil // remove memory index, allow gc + i.logIndexLk.Unlock() // rename bsst to final name if err := os.Rename(bsstPath, bsstPathFinal); err != nil { @@ -250,17 +267,35 @@ func (i *partition) compact() { return } + if err := i.logFile.Close(); err != nil { + log.Errorf("closing log file: %s", err) + return + } + i.logFile = nil + // remove log if err := os.Remove(i.logFile.Name()); err != nil { log.Errorf("removing log: %s", err) return } +} - // swap status to compacted - i.compacted.Store(true) - i.logIndexLk.Lock() - i.logIndex = nil // allow gc - i.logIndexLk.Unlock() +// callers MUST call pool.Put on the returned slice +func (i *partition) mhhSortedLog(sizeEntries int64) (entrySlice, error) { + logSize := sizeEntries * bsst.EntrySize + logData := pool.Get(int(logSize)) + + _, err := i.logFile.ReadAt(logData, 0) + if err != nil { + return nil, err + } + + data := entrySlice(logData) + + // sort it + sort.Sort(data) + + return data, nil } type entrySlice []byte @@ -307,6 +342,47 @@ func (i *partition) flush(writesRecv int64) error { return nil } +// MUST be called with LogBsstIndex.writeLk held +func (i *partition) startSync() int64 { + if i.writeCh == nil { + // either closing on compaction requested. in both cases that stops writes + // and triggers a flush, so we can just return the current writesSent + return i.writesSent + } + + waitUntil := i.writesSent + i.writeCh <- nil + return waitUntil +} + +const ( + syncPollIntervalMax = 20 * time.Millisecond + syncPollIntervalMin = 1 * time.Microsecond + + syncTimeout = 2 * time.Minute +) + +func (i *partition) waitSync(waitUntil int64) error { + nextWait := syncPollIntervalMin + start := time.Now() + + for i.writeFlushed.Load() < waitUntil { + time.Sleep(nextWait) + + if time.Since(start) > syncTimeout { + return xerrors.Errorf("sync timeout") + } + + if nextWait < syncPollIntervalMax { + nextWait *= 2 + } else { + nextWait = syncPollIntervalMax + } + } + + return nil +} + /* ---------------------------- ----------- READ ----------- @@ -314,23 +390,190 @@ func (i *partition) flush(writesRecv int64) error { */ func (l *LogBsstIndex) Has(c []mh.Multihash) ([]bool, error) { - //TODO implement me - panic("implement me") + l.partsLk.Lock() + partitions := l.partitions // copy so that appends in write don't swap the slice under us + partsLen := len(partitions) // append can change len too, so save it here + l.partsLk.Unlock() + + results := make([]bool, len(c)) + + for i, multihash := range c { + // look through partitions starting with last (reads into recent writes are more likely) + for p := partsLen - 1; p >= 0; p-- { + part := partitions[p] + if !part.compacted.Load() { + // it's maybe not a compacted partition + part.logIndexLk.Lock() + + // compaction may finish before taking logIndexLk, so check again + if part.logIndex != nil { + if _, ok := part.logIndex[string(multihash)]; ok { + results[i] = true + part.logIndexLk.Unlock() + break + } + } + + part.logIndexLk.Unlock() + } + + // we HAVE TO load again, compaction may have juust finished + if part.compacted.Load() { + off, err := part.bss.Has([]mh.Multihash{multihash}) // todo HasSingle, not much advantage here from slices + if err != nil { + return nil, err + } + + if off[0] { + results[i] = true + break + } + } + } + } + + return results, nil } func (l *LogBsstIndex) Get(c []mh.Multihash) ([]int64, error) { - //TODO implement me - panic("implement me") + l.partsLk.Lock() + partitions := l.partitions // copy so that appends in write don't swap the slice under us + partsLen := len(partitions) // append can change len too, so save it here + l.partsLk.Unlock() + + results := make([]int64, len(c)) + + for i, multihash := range c { + results[i] = -1 + + // look through partitions starting with last (reads into recent writes are more likely) + for p := partsLen - 1; p >= 0; p-- { + part := partitions[p] + if !part.compacted.Load() { + // it's maybe not a compacted partition + part.logIndexLk.Lock() + + // compaction may finish before taking logIndexLk, so check again + if part.logIndex != nil { + if off, ok := part.logIndex[string(multihash)]; ok { + results[i] = off + part.logIndexLk.Unlock() + break + } + } + + part.logIndexLk.Unlock() + } + + // we HAVE TO load again, compaction may have juust finished + if part.compacted.Load() { + off, err := part.bss.Get([]mh.Multihash{multihash}) // todo GetSingle, not much advantage here from slices + if err != nil { + return nil, err + } + + if off[0] != -1 { + results[i] = off[0] + break + } + } + + } + } + + return results, nil } func (l *LogBsstIndex) Entries() (int64, error) { - //TODO implement me - panic("implement me") + l.partsLk.Lock() + defer l.partsLk.Unlock() + + var total int64 + for _, p := range l.partitions { + total += p.writeFlushed.Load() + } + + return total, nil } func (l *LogBsstIndex) List(f func(c mh.Multihash, offs []int64) error) error { - //TODO implement me - panic("implement me") + return xerrors.Errorf("use advanced") +} + +func (l *LogBsstIndex) ListAdv(f func([32]byte) error) error { + // don't allow writes while outputting the list + // writes shouldn't be routed into this index anyways when this is called + + l.writeLk.Lock() + defer l.writeLk.Unlock() + + l.partsLk.Lock() // technically we don't even need this lock, but might as well take it + partitions := l.partitions // copy so that appends in write don't swap the slice under us + partsLen := len(partitions) // append can change len too, so save it here + l.partsLk.Unlock() + + syncs := make([]int64, partsLen) + + for p := 0; p < partsLen; p++ { + syncs[p] = partitions[p].startSync() + } + for p := 0; p < partsLen; p++ { + if err := partitions[p].waitSync(syncs[p]); err != nil { + return xerrors.Errorf("waiting for sync: %w", err) + } + } + + partitionIterators := make([]func() ([32]byte, error), partsLen) + + for p := 0; p < partsLen; p++ { + part := partitions[p] + + if !part.compacted.Load() { + // not compacted, soo: + // * if in process of compaction, wait for it to finish + // * if not in process of compaction, read and sort the log + + if !part.compacting.Load() { + // not compacting, read from log + // sync is done, so we just read the log file and sort it like in compaction + + part.logIndexLk.Lock() + + // one last, last level of making sure we're not reading from a compacted partition + // if part.logIndex is nil, the partition somehow got compacted between now and reading part.compacting above + if part.logIndex != nil { + data, err := part.mhhSortedLog(syncs[p]) + part.logIndexLk.Unlock() + + if err != nil { + return xerrors.Errorf("loading sorted log for partition %d: %w", p, err) + } + + partitionIterators[p] = func() ([32]byte, error) { + + } + + continuee + } + part.logIndexLk.Unlock() + + // if we're here, magically somehow the partition got compacted. This is weird but actually fine + partitionIterators[p] = func() ([32]byte, error) { + + } + + } + + // compaction in progress, wait for it to finish + for !part.compacted.Load() { + time.Sleep(50 * time.Millisecond) + } + } + + // definitely compacted here, read from bss + + } + } func (l *LogBsstIndex) ToTruncate(atOrAbove int64) ([]mh.Multihash, error) {