Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b307c97
add buffer pool & spsc_queue
iaojnh Feb 2, 2026
a96e684
add buffer pool & open buffer storage ut
iaojnh Feb 4, 2026
03e4dbc
modify buffer pool
iaojnh Feb 6, 2026
7df2716
upd buffer pool
iaojnh Feb 9, 2026
2c610cf
Merge branch 'main' into feat/buffer_storage_vec
iaojnh Feb 9, 2026
11a0e47
clang format
iaojnh Feb 9, 2026
55e6f1b
clang format
iaojnh Feb 10, 2026
b24d921
clang format
iaojnh Feb 10, 2026
8916f90
clang format
iaojnh Feb 10, 2026
e3d014c
fix bugs
iaojnh Feb 10, 2026
4694835
Merge branch 'main' into feat/buffer_storage_vec
iaojnh Feb 10, 2026
ed6a3f2
fix complie“
iaojnh Feb 11, 2026
95b1c16
clang format
iaojnh Feb 11, 2026
d6db41d
fix ut
iaojnh Feb 11, 2026
8dd8e48
fix: combined indexer should use key instead of index (#87)
chinaux Feb 12, 2026
753cc0d
feat: support ai extension (#88)
Cuiyus Feb 12, 2026
b83cf52
fix(py): py with build-in package typing not typing_extensions (#99)
Cuiyus Feb 12, 2026
c79f0b0
minor: add installation instruction for node.js package (#103)
zhourrr Feb 13, 2026
42fa524
feat(ci): macos ci with github-runner (#94)
Cuiyus Feb 13, 2026
34e7ced
minor: add links to package repository
zhourrr Feb 13, 2026
a4f3de8
chore: add trend badge (#132)
feihongxu0824 Feb 15, 2026
d72a074
docs: fix repository URL in CONTRIBUTING.md (#139)
Junio243 Feb 17, 2026
39f0437
fix(docs): fix typo in README align attr and Python version in CONTRI…
cluster2600 Feb 20, 2026
e956192
ci: continuous benching (#110)
JalinWang Feb 25, 2026
779c63d
docs: adjust join us in the readme. (#168)
Cuiyus Feb 25, 2026
a7c6aa1
chore: enable the conventional-pre-commit run sucess and update to la…
SYaoJun Feb 25, 2026
fc988e3
Upgrade GitHub Actions for Node 24 compatibility (#129)
salmanmkc Feb 25, 2026
49e2d34
Upgrade GitHub Actions to latest versions (#130)
salmanmkc Feb 25, 2026
1dfeda6
feat(ci): ci workflow with github-hosted runner (#171)
Cuiyus Feb 26, 2026
1fef6e6
feat: add jina embeddings v5 support (#156)
hanxiao Feb 26, 2026
573f20f
Merge branch 'main' into feat/buffer_storage_vec
iaojnh Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions src/ailego/buffer/buffer_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include <zvec/ailego/buffer/buffer_pool.h>
#include <zvec/core/framework/index_logger.h>

namespace zvec {
namespace ailego {

int LRUCache::init(size_t block_size) {
block_size_ = block_size;
for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) {
queues_.push_back(ConcurrentQueue(block_size));
}
return 0;
}

bool LRUCache::evict_single_block(BlockType &item) {
bool found = false;
for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) {
found = queues_[i].try_dequeue(item);
if (found) {
break;
}
}
return found;
}

bool LRUCache::add_single_block(const LPMap *lp_map, const BlockType &block,
int block_type) {
bool ok = queues_[block_type].try_enqueue(block);
evict_queue_insertions_.fetch_add(1, std::memory_order_relaxed);
if (evict_queue_insertions_ % block_size_ == 0) {
this->clear_dead_node(lp_map);
}
return ok;
}

void LRUCache::clear_dead_node(const LPMap *lp_map) {
for (int i = 0; i < CATCH_QUEUE_NUM; i++) {
int clear_size = block_size_ * 2;
if (queues_[i].size_approx() < clear_size * 4) {
continue;
}
int clear_count = 0;
ConcurrentQueue tmp(block_size_);
BlockType item;
while (queues_[i].try_dequeue(item) && (clear_count++ < clear_size)) {
if (!lp_map->isDeadBlock(item)) {
tmp.try_enqueue(item);
}
}
while (tmp.try_dequeue(item)) {
if (!lp_map->isDeadBlock(item)) {
queues_[i].try_enqueue(item);
}
}
}
}

void LPMap::init(size_t entry_num) {
if (entries_) {
delete[] entries_;
}
entry_num_ = entry_num;
entries_ = new Entry[entry_num_];
for (size_t i = 0; i < entry_num_; i++) {
entries_[i].ref_count.store(std::numeric_limits<int>::min());
entries_[i].load_count.store(0);
entries_[i].buffer = nullptr;
}
cache_.init(entry_num * 4);
}

char *LPMap::acquire_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
if (entry.ref_count.load(std::memory_order_relaxed) == 0) {
entry.load_count.fetch_add(1, std::memory_order_relaxed);
}
entry.ref_count.fetch_add(1, std::memory_order_relaxed);
if (entry.ref_count.load(std::memory_order_relaxed) < 0) {
return nullptr;
}
return entry.buffer;
Comment on lines +75 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

race condition: ref_count is checked at line 75, incremented at 78, then checked again at 79 - another thread could evict the block between these operations, making entry.buffer invalid by line 82

}

void LPMap::release_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];

if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
LRUCache::BlockType block;
block.first = block_id;
block.second = entry.load_count.load();
cache_.add_single_block(this, block, 0);
}
}

char *LPMap::evict_block(block_id_t block_id) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
int expected = 0;
if (entry.ref_count.compare_exchange_strong(
expected, std::numeric_limits<int>::min())) {
char *buffer = entry.buffer;
entry.buffer = nullptr;
return buffer;
} else {
return nullptr;
}
}

char *LPMap::set_block_acquired(block_id_t block_id, char *buffer) {
assert(block_id < entry_num_);
Entry &entry = entries_[block_id];
if (entry.ref_count.load(std::memory_order_relaxed) >= 0) {
entry.ref_count.fetch_add(1, std::memory_order_relaxed);
return entry.buffer;
}
entry.buffer = buffer;
entry.ref_count.store(1, std::memory_order_relaxed);
entry.load_count.fetch_add(1, std::memory_order_relaxed);
return buffer;
}

void LPMap::recycle(moodycamel::ConcurrentQueue<char *> &free_buffers) {
LRUCache::BlockType block;
do {
bool ok = cache_.evict_single_block(block);
if (!ok) {
return;
}
} while (isDeadBlock(block));
char *buffer = evict_block(block.first);
if (buffer) {
free_buffers.try_enqueue(buffer);
}
}

VecBufferPool::VecBufferPool(const std::string &filename) {
fd_ = open(filename.c_str(), O_RDONLY);
if (fd_ < 0) {
throw std::runtime_error("Failed to open file: " + filename);
}
struct stat st;
if (fstat(fd_, &st) < 0) {
throw std::runtime_error("Failed to stat file: " + filename);
}
file_size_ = st.st_size;
}

int VecBufferPool::init(size_t pool_capacity, size_t block_size) {
pool_capacity_ = pool_capacity;
size_t buffer_num = pool_capacity_ / block_size + 10;
size_t block_num = file_size_ / block_size + 10;
lp_map_.init(block_num);
for (size_t i = 0; i < buffer_num; i++) {
char *buffer = (char *)aligned_alloc(64, block_size);
if (buffer != nullptr) {
bool ok = free_buffers_.try_enqueue(buffer);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused variable ok

Suggested change
bool ok = free_buffers_.try_enqueue(buffer);
free_buffers_.try_enqueue(buffer);

}
Comment on lines +157 to +160
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak when aligned_alloc fails - allocated buffers are not freed on failure, and no error is returned

}
LOG_DEBUG("Buffer pool num: %zu, entry num: %zu", buffer_num,
lp_map_.entry_num());
return 0;
}

VecBufferPoolHandle VecBufferPool::get_handle() {
return VecBufferPoolHandle(*this);
}

char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset,
size_t size, int retry) {
char *buffer = lp_map_.acquire_block(block_id);
if (buffer) {
return buffer;
}
{
bool found = free_buffers_.try_dequeue(buffer);
if (!found) {
for (int i = 0; i < retry; i++) {
lp_map_.recycle(free_buffers_);
found = free_buffers_.try_dequeue(buffer);
if (found) {
break;
}
}
}
if (!found) {
LOG_ERROR("Buffer pool failed to get free buffer");
return nullptr;
}
}

ssize_t read_bytes = pread(fd_, buffer, size, offset);
if (read_bytes != static_cast<ssize_t>(size)) {
LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
return nullptr;
}
Comment on lines +194 to +198
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak when pread fails - buffer is not returned to free_buffers_, causing the pool to permanently lose this buffer

char *placed_buffer = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
placed_buffer = lp_map_.set_block_acquired(block_id, buffer);
}
if (placed_buffer != buffer) {
// another thread has set the block
free_buffers_.try_enqueue(buffer);
}
return placed_buffer;
}

int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
ssize_t read_bytes = pread(fd_, buffer, length, offset);
if (read_bytes != static_cast<ssize_t>(length)) {
LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
return -1;
}
return 0;
}

char *VecBufferPoolHandle::get_block(size_t offset, size_t size,
size_t block_id) {
char *buffer = pool.acquire_buffer(block_id, offset, size, 5);
return buffer;
}

int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) {
return pool.get_meta(offset, length, buffer);
}

void VecBufferPoolHandle::release_one(block_id_t block_id) {
pool.lp_map_.release_block(block_id);
}

void VecBufferPoolHandle::acquire_one(block_id_t block_id) {
pool.lp_map_.acquire_block(block_id);
}

} // namespace ailego
} // namespace zvec
10 changes: 9 additions & 1 deletion src/core/algorithm/flat/flat_streamer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,18 @@ class FlatStreamerContext : public IndexStreamer::Context {
group_topk_heaps_.clear();
}

void reset() override {}
void reset() override {
for (auto &it : results_) {
it.clear();
}
for (auto &it : group_results_) {
it.clear();
}
}

//! Reset the context
void reset(const FlatStreamer<BATCH_SIZE> *owner) {
this->reset();
magic_ = owner->magic();
feature_size_ = owner->meta().element_size();

Expand Down
4 changes: 4 additions & 0 deletions src/core/algorithm/hnsw/hnsw_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class HnswContext : public IndexContext {

//! Reset context
void reset(void) override {
this->clear();
set_filter(nullptr);
reset_threshold();
set_fetch_vector(false);
Expand Down Expand Up @@ -422,6 +423,9 @@ class HnswContext : public IndexContext {
for (auto &it : results_) {
it.clear();
}
for (auto &it : group_results_) {
it.clear();
}
}

uint32_t *mutable_stats_get_neighbors() {
Expand Down
3 changes: 1 addition & 2 deletions src/core/algorithm/hnsw/hnsw_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ struct Neighbors {
Neighbors(uint32_t cnt_in, const node_id_t *data_in)
: cnt{cnt_in}, data{data_in} {}

Neighbors(IndexStorage::MemoryBlock &&mem_block)
: neighbor_block{std::move(mem_block)} {
Neighbors(IndexStorage::MemoryBlock &mem_block) : neighbor_block{mem_block} {
auto hd = reinterpret_cast<const NeighborsHeader *>(neighbor_block.data());
cnt = hd->neighbor_cnt;
data = hd->neighbors;
Expand Down
2 changes: 1 addition & 1 deletion src/core/algorithm/hnsw/hnsw_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const Neighbors HnswStreamerEntity::get_neighbors(level_t level,
LOG_ERROR("Read neighbor header failed, ret=%zu", size);
return Neighbors();
}
return Neighbors(std::move(neighbor_block));
return Neighbors(neighbor_block);
}

//! Get vector data by key
Expand Down
7 changes: 5 additions & 2 deletions src/core/interface/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,9 @@ int Index::Search(const VectorData &vector_data,
}

// dense support refiner, but sparse doesn't
int ret = 0;
if (search_param->refiner_param == nullptr) {
return _dense_search(vector_data, search_param, result, context);
ret = _dense_search(vector_data, search_param, result, context);
} else {
auto &reference_index = search_param->refiner_param->reference_index;
if (reference_index == nullptr) {
Expand Down Expand Up @@ -441,8 +442,10 @@ int Index::Search(const VectorData &vector_data,
// TODO: should copy other params?
flat_search_param->bf_pks = std::make_shared<std::vector<uint64_t>>(keys);

return reference_index->Search(vector_data, flat_search_param, result);
ret = reference_index->Search(vector_data, flat_search_param, result);
}
context->reset();
return ret;
}


Expand Down
Loading