Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions bsst/bsst.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,260 @@ type BSST struct {

var BsstCIDSampleSize = 1024

func Create2(path string, entries int64, source Source) (bsst *BSST, err error) {
var salt [32]byte
if _, err := rand.Read(salt[:]); err != nil {
return nil, xerrors.Errorf("generate salt: %w", err)
}

header := &BSSTHeader{
Salt: salt,
}

var nextLevel []multiHashHash

createSample := make([]multihash.Multihash, 0, BsstCIDSampleSize) // TODO!!!

err = source.List(func(c multihash.Multihash, offs []int64) error {
if len(createSample) < BsstCIDSampleSize {
// copy c data

c := append([]byte(nil), c...)
createSample = append(createSample, c)
}
for i, off := range offs {
nextLevel = append(nextLevel, header.makeMHH(c, int64(i), off))
}
return nil
})
if err != nil {
return nil, xerrors.Errorf("write buckets: %w", err)
}

// sort buckets

// todo parallel merge sort
sort.Slice(nextLevel, func(i, j int) bool {
return bytes.Compare(nextLevel[i].mhh[:], nextLevel[j].mhh[:]) < 0
})

bsst, err = CreateAdv(path, entries, salt, func(i int64) []byte { // todo test
var ent [EntrySize]byte
copy(ent[:], nextLevel[i].mhh[:])
binary.LittleEndian.PutUint64(ent[EntKeyBytes:], uint64(nextLevel[i].off))
return ent[:]
})
if err != nil {
return nil, xerrors.Errorf("create adv: %w", err)
}

bsst.CreateSample = createSample
return bsst, nil
}

func CreateAdv(path string, entries int64, salt [32]byte, data func(i int64) []byte) (bsst *BSST, err error) {
f, err := os.Create(path)
if err != nil {
return nil, xerrors.Errorf("open file: %w", err)
}

header := &BSSTHeader{
L0Buckets: (entries + MeanEntriesPerBucket - 1) / MeanEntriesPerBucket, // ceil entries / MeanEntriesPerBucket
BucketSize: BucketSize,
Entries: entries,

Levels: 1,
LevelFactor: LevelFactor,
Finalized: false,

Salt: salt,
}

var hdrBuf [BucketSize]byte

copy(hdrBuf[:], BsstMagic)

if err := header.MarshalCBOR(bytes.NewBuffer(hdrBuf[8:])); err != nil {
return nil, xerrors.Errorf("marshal header: %w", err)
}

if _, err := f.Write(hdrBuf[:]); err != nil {
return nil, xerrors.Errorf("write header: %w", err)
}

// sync header
if err := f.Sync(); err != nil {
return nil, xerrors.Errorf("sync header: %w", err)
}

// write buckets
bufWriter := bufio.NewWriterSize(f, 4<<20)
nullEntry := [EntrySize]byte{}

inBucket := uint64(0)
bucketEnts := uint64(0)

var bloomHead [BucketBloomFilterSize]byte

flushBucket := func(bucketIdx uint64) error {
if inBucket > bucketIdx {
panic("buckets not sorted")
}

//bSizes[bucketEnts]++

// pad up to BucketUserEntries entries
for bucketEnts < BucketUserEntries {
if _, err := bufWriter.Write(nullEntry[:]); err != nil {
return xerrors.Errorf("write padding entry: %w", err)
}
bucketEnts++
}

// write bloom filter / header padding
if _, err := bufWriter.Write(bloomHead[:]); err != nil {
return xerrors.Errorf("write header entry filter: %w", err)
}

// reset temp vars
inBucket++
bucketEnts = 0

return nil
}

var ents uint64
var level int
levelBuckets := uint64(header.L0Buckets)
prevLevelBuckets := uint64(0)

var nextLevel [][]byte
first := true
entsToIter := entries

for len(nextLevel) > 0 || first {
list := nextLevel
nextLevel = [][]byte{}

layerSource := data
if !first {
layerSource = func(i int64) []byte {
return list[i]
}
entsToIter = int64(len(list))
}

first = false

bucketRange := math.MaxUint64 / levelBuckets // todo techincally +1?? (if changing note this is also calculated below)

for i := int64(0); i < entsToIter; i++ {
entry := layerSource(i)

// first 64 bits of hash to calculate bucket
hashidx := binary.BigEndian.Uint64(entry[:8])
bucketIdx := prevLevelBuckets + (hashidx / bucketRange)
bloomEntIdx := (hashidx % bucketRange) / (bucketRange / BucketBloomFilterEntries)

for inBucket != bucketIdx {
if err := flushBucket(bucketIdx); err != nil {
return nil, err
}

// reset bloom
for i := range bloomHead[:BucketBloomFilterSize] {
bloomHead[i] = 0
}
}

// update bloom filter first, so that even if this entry won't fit in this level, we know that it doesn't
// exist in one i/o if it's not in bloom
bloomHead[bloomEntIdx/8] |= 1 << (bloomEntIdx % 8)

// check if we have space for this entry
if bucketEnts >= BucketUserEntries {
nextLevel = append(nextLevel, entry)
continue
}

// write entry
if _, err := bufWriter.Write(entry[:EntrySize]); err != nil {
return nil, xerrors.Errorf("write entry: %w", err)
}

bucketEnts++
ents++
}

// flush last buckets
for inBucket < levelBuckets+prevLevelBuckets { // todo is the condition right?
if err := flushBucket(levelBuckets + prevLevelBuckets); err != nil {
return nil, err
}

// todo could only do this once
for i := range bloomHead[:BucketBloomFilterSize] {
bloomHead[i] = 0
}
}

level++
prevLevelBuckets += levelBuckets
levelBuckets = (levelBuckets + LevelFactor - 1) / LevelFactor // ceil(levelBuckets / LevelFactor)
}

header.Levels = int64(level)

if err := bufWriter.Flush(); err != nil {
return nil, xerrors.Errorf("flush buckets: %w", err)
}

if err := f.Sync(); err != nil {
return nil, xerrors.Errorf("sync buckets: %w", err)
}

// finalize header
header.Finalized = true
if _, err := f.Seek(8, io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek to header: %w", err)
}

if err := header.MarshalCBOR(f); err != nil {
return nil, xerrors.Errorf("marshal header: %w", err)
}

if err := f.Sync(); err != nil {
return nil, xerrors.Errorf("sync header: %w", err)
}

// print bSizes sorted by key
/*var keys []int
for k := range bSizes {
keys = append(keys, int(k))
}
sort.Ints(keys)*/

/*var ibe int64
for _, k := range keys {
ibe += bSizes[uint64(k)] * int64(k)
//fmt.Printf("bs %d: %d\n", k, bSizes[uint64(k)])
}*/

//fmt.Println("ents ", ents, " bkt ", header.L0Buckets, " ibe ", ibe)

if err := f.Close(); err != nil {
return nil, xerrors.Errorf("close written bsst: %w", err)
}

// reopen in read-only mode
bsst, err = Open(path)
if err != nil {
return nil, xerrors.Errorf("reopen bsst: %w", err)
}

return bsst, nil
}

func Create(path string, entries int64, source Source) (bsst *BSST, err error) {
f, err := os.Create(path)
if err != nil {
Expand Down Expand Up @@ -471,6 +725,7 @@ func (h *BSST) Get(c []multihash.Multihash) ([]int64, error) {

top:
for i, k := range keys {
out[i] = -1
levelBuckets := uint64(h.h.L0Buckets)
prevLevelBuckets := uint64(0)

Expand Down
30 changes: 28 additions & 2 deletions bsst/bsst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func (t *testSource) List(f func(c mh.Multihash, offs []int64) error) error {
var _ Source = (*testSource)(nil)

func TestBSSTCreate(t *testing.T) {

n := int64(13421280)
n := int64(13_421_280)

bsst, err := Create(filepath.Join(t.TempDir(), "/a.bsst"), n, &testSource{n: n})
require.NoError(t, err)
Expand All @@ -60,3 +59,30 @@ func TestBSSTCreate(t *testing.T) {
require.NoError(t, err)
require.Equal(t, n, got)
}

func TestBSSTCreate2(t *testing.T) {
n := int64(13_421_280)

bsst, err := Create2(filepath.Join(t.TempDir(), "/a.bsst"), n, &testSource{n: n})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, bsst.Close()) })

var got int64

err = (&testSource{n: n}).List(func(c mh.Multihash, offs []int64) error {
h, err := bsst.Has([]mh.Multihash{c})
require.NoError(t, err)
if !h[0] {
require.True(t, h[0]) // todo probably full buckets
}

r, err := bsst.Get([]mh.Multihash{c})
require.NoError(t, err)
require.Equal(t, offs[0], r[0])

got++
return nil
})
require.NoError(t, err)
require.Equal(t, n, got)
}
1 change: 1 addition & 0 deletions carlog/carlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ func (j *CarLog) Commit() (int64, error) {
defer j.idxLk.RUnlock()

// todo log commit?
j.wIdx.Sync()

if err := j.flushBuffered(); err != nil {
return 0, xerrors.Errorf("flushing buffered data: %w", err)
Expand Down
Loading