From fff3f9d3510f68407c0f52ca2e32198b073844ce Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 16:27:06 +0200 Subject: [PATCH 1/8] perf(cubestore): make disk-space single-flight lock wait configurable The disk-space scan single-flight on main waited up to a hardcoded 10s for the compute lock. While a worker waits, its metastore RPC connection stays open, growing the simultaneous-open-connection footprint and the window for a reset. Cap it via CUBESTORE_DISK_SPACE_LOCK_WAIT_MS (default 1000ms, in milliseconds so sub-second values work) and fall back to the existing fail-open path on timeout. --- rust/cubestore/cubestore/src/config/mod.rs | 14 +++++++++++ rust/cubestore/cubestore/src/metastore/mod.rs | 24 +++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 44ea99bf871f5..81188080691ed 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -633,6 +633,8 @@ pub trait ConfigObj: DIService { fn disk_space_cache_duration_secs(&self) -> u64; + fn disk_space_compute_lock_timeout_ms(&self) -> u64; + fn transport_max_message_size(&self) -> usize; fn transport_max_frame_size(&self) -> usize; @@ -764,6 +766,7 @@ pub struct ConfigObjImpl { pub max_disk_space: u64, pub max_disk_space_per_worker: u64, pub disk_space_cache_duration_secs: u64, + pub disk_space_compute_lock_timeout_ms: u64, pub transport_max_message_size: usize, pub transport_max_frame_size: usize, pub local_files_cleanup_interval_secs: u64, @@ -1172,6 +1175,10 @@ impl ConfigObj for ConfigObjImpl { self.disk_space_cache_duration_secs } + fn disk_space_compute_lock_timeout_ms(&self) -> u64 { + self.disk_space_compute_lock_timeout_ms + } + fn transport_max_message_size(&self) -> usize { self.transport_max_message_size } @@ -1850,6 +1857,12 @@ impl Config { * 1024 * 1024, disk_space_cache_duration_secs: 300, + disk_space_compute_lock_timeout_ms: env_parse_duration( + "CUBESTORE_DISK_SPACE_LOCK_WAIT_MS", + 1000, + Some(60_000), + None, + ), transport_max_message_size, transport_max_frame_size: env_parse_size( "CUBESTORE_TRANSPORT_MAX_FRAME_SIZE", @@ -2045,6 +2058,7 @@ impl Config { max_disk_space: 0, max_disk_space_per_worker: 0, disk_space_cache_duration_secs: 0, + disk_space_compute_lock_timeout_ms: 1000, transport_max_message_size: 64 << 20, transport_max_frame_size: 16 << 20, local_files_cleanup_interval_secs: 600, diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index b281cb7cda145..c3754722751cb 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -55,7 +55,7 @@ use crate::metastore::wal::{WALIndexKey, WALRocksIndex}; use crate::table::{Row, TableValue}; -use crate::util::lock::acquire_lock; +use crate::util::lock::{acquire_lock, acquire_lock_duration}; use crate::util::WorkerLoop; use crate::{meta_store_table_impl, CubeError}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -2829,21 +2829,25 @@ impl MetaStore for RocksMetaStore { // Single-flight: serialize the scan so a burst of concurrent callers // (e.g. many partition writes during an import/repartition) share one // computation instead of each materializing a full metastore scan. - let _compute_guard = - match acquire_lock("disk space compute", self.disk_space_compute_lock.lock()).await - { - Ok(guard) => guard, - Err(e) => { - log::error!( + let _compute_guard = match acquire_lock_duration( + "disk space compute", + self.disk_space_compute_lock.lock(), + Duration::from_millis(self.store.config.disk_space_compute_lock_timeout_ms()), + ) + .await + { + Ok(guard) => guard, + Err(e) => { + log::error!( "Timed out waiting for the disk space scan lock: {}. The single-flight \ scan is stuck; reporting 0 used disk space so the disk-space check \ passes. THE DISK-SPACE LIMIT IS NOT BEING ENFORCED until the scan \ recovers.", e ); - return Ok(0); - } - }; + return Ok(0); + } + }; if let Some(sizes) = self.disk_space_cached().await? { sizes } else { From 7b6c60173c33db3c6452c88d143765493f4bfd16 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 16:50:46 +0200 Subject: [PATCH 2/8] perf(cubestore): load the table once per partitioning job, not per chunk add_chunk_columns and post_process_columns each re-fetched the table over the metastore RPC for every chunk they produced, even though the table is the same for the whole import/repartition job and immutable for its duration. On a wide table this is one redundant read RPC per chunk, each a separate worker->main connection feeding the connect-per-RPC storm. Thread the table down instead: build_index_chunks loads it once before the index loop, partition_rows loads it once for the single-index path, and add_persistent_chunk loads it once. compact() now reuses the table already returned by get_partition_for_compaction instead of re-fetching it. --- .../cubestore/src/store/compaction.rs | 22 ++++++--- rust/cubestore/cubestore/src/store/mod.rs | 45 +++++++++++-------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index 601e14d6e9e7d..e61a1c42e6131 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -694,10 +694,8 @@ impl CompactionService for CompactionServiceImpl { None => Arc::new(EmptyExec::new(schema.clone())), }; - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; + // `table` is already loaded by get_partition_for_compaction above and is immutable for + // the duration of the job, so reuse it instead of re-fetching over the metastore RPC. let unique_key = table.get_row().unique_key_columns(); let aggregate_columns = match index.get_row().get_type() { IndexType::Regular => None, @@ -2361,7 +2359,13 @@ mod tests { ]; let (chunk, _) = chunk_store - .add_chunk_columns(aggr_index.clone(), partition.clone(), data1.clone(), false) + .add_chunk_columns( + aggr_index.clone(), + &table, + partition.clone(), + data1.clone(), + false, + ) .await .unwrap() .await @@ -2370,7 +2374,13 @@ mod tests { metastore.chunk_uploaded(chunk.get_id()).await.unwrap(); let (chunk, _) = chunk_store - .add_chunk_columns(aggr_index.clone(), partition.clone(), data2.clone(), false) + .add_chunk_columns( + aggr_index.clone(), + &table, + partition.clone(), + data2.clone(), + false, + ) .await .unwrap() .await diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 1be584b9384b5..cb0e25ffe1ad5 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -859,8 +859,13 @@ impl ChunkDataStore for ChunkStore { partition: IdRow, batch: RecordBatch, ) -> Result<(IdRow, Option), CubeError> { + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; self.add_chunk_columns( index.clone(), + &table, partition.clone(), batch.columns().to_vec(), false, @@ -1555,7 +1560,7 @@ mod tests { let data = rows_to_columns(&col, data_frame.get_rows().as_slice()); let (chunk, file_size) = chunk_store - .add_chunk_columns(index, partition, data.clone(), false) + .add_chunk_columns(index, &table, partition, data.clone(), false) .await .unwrap() .await @@ -1783,7 +1788,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -1973,7 +1978,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2142,7 +2147,7 @@ mod tests { for _ in 0..2 { let data = rows_to_columns(&col, &[]); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2307,7 +2312,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2500,13 +2505,18 @@ impl ChunkStore { in_memory: bool, ) -> Result, Option), CubeError>>>, CubeError> { let index = self.meta_store.get_index(index_id).await?; - self.partition_rows_for_index(&index, columns, in_memory) + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; + self.partition_rows_for_index(&index, &table, columns, in_memory) .await } #[tracing::instrument(level = "trace", skip(self, columns))] async fn partition_rows_for_index( &self, index: &IdRow, + table: &IdRow, mut columns: Vec, in_memory: bool, ) -> Result, Option), CubeError>>>, CubeError> { @@ -2591,10 +2601,13 @@ impl ChunkStore { .iter() .map(|c| datafusion::arrow::compute::take(c.as_ref(), &to_write, None)) .collect::, _>>()?; - let columns = self.post_process_columns(index.clone(), columns).await?; + let columns = self + .post_process_columns(index.clone(), table, columns) + .await?; futures.push(self.add_chunk_columns( index.clone(), + table, partition.clone(), columns, in_memory, @@ -2651,15 +2664,12 @@ impl ChunkStore { async fn post_process_columns( &self, index: IdRow, + table: &IdRow
, data: Vec, ) -> Result, CubeError> { match index.get_row().get_type() { IndexType::Regular => Ok(data), IndexType::Aggregate => { - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; let schema = Arc::new(arrow_schema(&index.get_row())); let batch = RecordBatch::try_new(schema.clone(), data)?; @@ -2731,6 +2741,7 @@ impl ChunkStore { async fn add_chunk_columns( &self, index: IdRow, + table: &IdRow
, partition: IdRow, data: Vec, in_memory: bool, @@ -2767,18 +2778,13 @@ impl ChunkStore { let metadata_cache_factory: Arc = self.metadata_cache_factory.clone(); - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; - let parquet = ParquetTableStore::new( index.get_row().clone(), ROW_GROUP_SIZE, metadata_cache_factory, ); - let writer_props = parquet.writer_props(&table).await?; + let writer_props = parquet.writer_props(table).await?; cube_ext::spawn_blocking(move || -> Result<(), CubeError> { parquet.write_data_given_props(&local_file_copy, data, writer_props) }) @@ -2804,6 +2810,9 @@ impl ChunkStore { in_memory: bool, ) -> Result, CubeError> { let mut rows = rows.0; + // The table is the same for every index/chunk produced by this call, so load it once + // here instead of re-fetching it per chunk over the metastore RPC downstream. + let table = self.meta_store.get_table_by_id(table_id).await?; let mut futures = Vec::new(); for index in indexes.iter() { let index_columns = index.get_row().columns(); @@ -2816,7 +2825,7 @@ impl ChunkStore { .await?; let remapped = remapped?; rows = rows_again; - futures.push(self.partition_rows_for_index(&index, remapped, in_memory)); + futures.push(self.partition_rows_for_index(&index, &table, remapped, in_memory)); } let new_chunks = join_all(futures) From e41fe00fcf21a29dccc4a1857a1e7e6c1b18f7fd Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 17:21:26 +0200 Subject: [PATCH 3/8] perf(cubestore): dedup the per-node disk-space check across a partitioning job partition_rows_for_index ran check_node_disk_space for every partition it wrote to, each a separate get_used_disk_space_out_of_queue metastore RPC. The check resolves to a per-node total, so checking each distinct target node once is enough and the answer is identical. Track checked nodes in a set and skip repeats, cutting the per-partition RPC fan-out to one per node. --- rust/cubestore/cubestore/src/store/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index cb0e25ffe1ad5..908ecf51ef5e6 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -2572,6 +2572,9 @@ impl ChunkStore { } let mut futures = Vec::new(); + // The disk-space check resolves to a per-node total, so checking each distinct target + // node once is enough; this avoids one metastore RPC per partition written. + let mut checked_nodes: HashSet = HashSet::new(); for partition in partitions.into_iter() { let min = partition.get_row().get_min_val().as_ref(); let max = partition.get_row().get_max_val().as_ref(); @@ -2593,7 +2596,10 @@ impl ChunkStore { ) > Ordering::Equal) }); if to_write.len() > 0 { - if !in_memory { + if !in_memory + && checked_nodes + .insert(node_name_by_partition(self.config.as_ref(), &partition)) + { self.check_node_disk_space(&partition).await?; } let to_write = UInt64Array::from(to_write); From 5fd8f1abdd0a976a31f48460b137696bb6af610a Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 17:47:17 +0200 Subject: [PATCH 4/8] perf(cubestore): batch active-partition fetch across indexes behind a flag During a partitioning job build_index_chunks fetched active partitions with one get_active_partitions_by_index_id RPC per index. Add a batched get_active_partitions_for_indexes(Vec) that returns them for all indexes in a single metastore round-trip, gated behind CUBESTORE_METASTORE_BATCH_RPC (default off). When the flag is off the per-index path is unchanged. partition_rows_for_index takes the active partitions as an optional preset; a missing map entry in batch mode is an internal error rather than a silent empty set (which would trip the corrupt-data path). Tests cover the metastore method against the per-index method and the flag-on path through build_index_chunks with multiple indexes. --- rust/cubestore/cubestore/src/config/mod.rs | 9 + rust/cubestore/cubestore/src/metastore/mod.rs | 115 +++++++++++++ .../cubestore/src/queryplanner/test_utils.rs | 7 + rust/cubestore/cubestore/src/store/mod.rs | 161 +++++++++++++++++- 4 files changed, 286 insertions(+), 6 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 81188080691ed..5cba4c1222959 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -635,6 +635,8 @@ pub trait ConfigObj: DIService { fn disk_space_compute_lock_timeout_ms(&self) -> u64; + fn metastore_batch_rpc(&self) -> bool; + fn transport_max_message_size(&self) -> usize; fn transport_max_frame_size(&self) -> usize; @@ -767,6 +769,7 @@ pub struct ConfigObjImpl { pub max_disk_space_per_worker: u64, pub disk_space_cache_duration_secs: u64, pub disk_space_compute_lock_timeout_ms: u64, + pub metastore_batch_rpc: bool, pub transport_max_message_size: usize, pub transport_max_frame_size: usize, pub local_files_cleanup_interval_secs: u64, @@ -1179,6 +1182,10 @@ impl ConfigObj for ConfigObjImpl { self.disk_space_compute_lock_timeout_ms } + fn metastore_batch_rpc(&self) -> bool { + self.metastore_batch_rpc + } + fn transport_max_message_size(&self) -> usize { self.transport_max_message_size } @@ -1863,6 +1870,7 @@ impl Config { Some(60_000), None, ), + metastore_batch_rpc: env_parse("CUBESTORE_METASTORE_BATCH_RPC", false), transport_max_message_size, transport_max_frame_size: env_parse_size( "CUBESTORE_TRANSPORT_MAX_FRAME_SIZE", @@ -2059,6 +2067,7 @@ impl Config { max_disk_space_per_worker: 0, disk_space_cache_duration_secs: 0, disk_space_compute_lock_timeout_ms: 1000, + metastore_batch_rpc: false, transport_max_message_size: 64 << 20, transport_max_frame_size: 16 << 20, local_files_cleanup_interval_secs: 600, diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index c3754722751cb..c587f1685b826 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -973,6 +973,10 @@ pub trait MetaStore: DIService + Send + Sync { &self, index_id: u64, ) -> Result>, CubeError>; + async fn get_active_partitions_for_indexes( + &self, + index_ids: Vec, + ) -> Result>>, CubeError>; async fn get_index(&self, index_id: u64) -> Result, CubeError>; async fn get_index_with_active_partitions_out_of_queue( @@ -3602,6 +3606,29 @@ impl MetaStore for RocksMetaStore { .await } + async fn get_active_partitions_for_indexes( + &self, + index_ids: Vec, + ) -> Result>>, CubeError> { + self.read_operation_out_of_queue("get_active_partitions_for_indexes", move |db_ref| { + let rocks_partition = PartitionRocksTable::new(db_ref); + let mut result = HashMap::with_capacity(index_ids.len()); + for index_id in index_ids { + let partitions = rocks_partition + .get_rows_by_index( + &PartitionIndexKey::ByIndexId(index_id), + &PartitionRocksIndex::IndexId, + )? + .into_iter() + .filter(|r| r.get_row().active) + .collect::>(); + result.insert(index_id, partitions); + } + Ok(result) + }) + .await + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_index(&self, index_id: u64) -> Result, CubeError> { self.read_operation("get_index", move |db_ref| { @@ -5805,6 +5832,94 @@ mod tests { Ok(()) } + #[tokio::test] + async fn get_active_partitions_for_indexes_test() -> Result<(), CubeError> { + init_test_logger().await; + + let (_remote_fs, meta_store) = + RocksMetaStore::prepare_test_metastore("get_active_partitions_for_indexes"); + + meta_store.create_schema("foo".to_string(), false).await?; + let columns = vec![ + Column::new("col1".to_string(), ColumnType::Int, 0), + Column::new("col2".to_string(), ColumnType::String, 1), + ]; + // Two tables → two default indexes, each with its own initial active partition. + let table1 = meta_store + .create_table( + "foo".to_string(), + "t1".to_string(), + columns.clone(), + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + let table2 = meta_store + .create_table( + "foo".to_string(), + "t2".to_string(), + columns.clone(), + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + + let index1 = meta_store.get_default_index(table1.get_id()).await?; + let index2 = meta_store.get_default_index(table2.get_id()).await?; + + let single1 = meta_store + .get_active_partitions_by_index_id(index1.get_id()) + .await?; + let single2 = meta_store + .get_active_partitions_by_index_id(index2.get_id()) + .await?; + + // Batch over both indexes plus a non-existent one must match the per-index calls and + // return an empty vec (not an error) for the unknown index. + let unknown_index_id = index2.get_id() + 1000; + let batch = meta_store + .get_active_partitions_for_indexes(vec![ + index1.get_id(), + index2.get_id(), + unknown_index_id, + ]) + .await?; + + let ids = |ps: &Vec>| ps.iter().map(|p| p.get_id()).collect::>(); + assert_eq!(batch.len(), 3); + assert_eq!(ids(&batch[&index1.get_id()]), ids(&single1)); + assert_eq!(ids(&batch[&index2.get_id()]), ids(&single2)); + assert!(batch[&unknown_index_id].is_empty()); + + Ok(()) + } + #[tokio::test] async fn table_test() -> Result<(), CubeError> { init_test_logger().await; diff --git a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs index 7580c4d3c97e2..50664de3b57ac 100644 --- a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs +++ b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs @@ -330,6 +330,13 @@ impl MetaStore for MetaStoreMock { panic!("MetaStore mock!") } + async fn get_active_partitions_for_indexes( + &self, + _index_ids: Vec, + ) -> Result>>, CubeError> { + panic!("MetaStore mock!") + } + async fn get_index(&self, _index_id: u64) -> Result, CubeError> { panic!("MetaStore mock!") } diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 908ecf51ef5e6..7578f553ca7c1 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -1581,6 +1581,119 @@ mod tests { let _ = fs::remove_dir_all(chunk_store_path.clone()); let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); } + + #[tokio::test] + async fn partition_data_with_batch_rpc() { + // Exercises the CUBESTORE_METASTORE_BATCH_RPC path: build_index_chunks fetches active + // partitions for all indexes in one get_active_partitions_for_indexes call and routes + // them per index. The produced chunks must still cover every input row. + let config = Config::test("partition_data_with_batch_rpc").update_config(|mut c| { + c.metastore_batch_rpc = true; + c + }); + let path = "/tmp/test_partition_data_batch"; + let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); + let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); + + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + { + let remote_fs = LocalDirRemoteFs::new( + Some(PathBuf::from(chunk_remote_store_path.clone())), + PathBuf::from(chunk_store_path.clone()), + ); + let meta_store = RocksMetaStore::new( + Path::new(path), + BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), + config.config_obj(), + ) + .unwrap(); + let chunk_store = ChunkStore::new( + meta_store.clone(), + remote_fs.clone(), + Arc::new(MockCluster::new()), + config.config_obj(), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), + 10, + ); + + let col = vec![ + Column::new("foo_int".to_string(), ColumnType::Int, 0), + Column::new("foo".to_string(), ColumnType::String, 1), + Column::new("boo".to_string(), ColumnType::String, 2), + ]; + let rows = (0..35) + .map(|i| { + Row::new(vec![ + TableValue::Int(34 - i), + TableValue::String(format!("Foo {}", 34 - i)), + TableValue::String(format!("Boo {}", 34 - i)), + ]) + }) + .collect::>(); + let data_frame = DataFrame::new(col.clone(), rows); + meta_store + .create_schema("foo".to_string(), false) + .await + .unwrap(); + // A secondary regular index so build_index_chunks loops over >1 index on the batch + // path and must route each index its own partitions. + let secondary = IndexDef { + name: "by_foo".to_string(), + columns: vec!["foo".to_string(), "foo_int".to_string()], + multi_index: None, + index_type: IndexType::Regular, + }; + let table = meta_store + .create_table( + "foo".to_string(), + "bar".to_string(), + col.clone(), + None, + None, + vec![secondary], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await + .unwrap(); + let index_count = meta_store + .get_table_indexes(table.get_id()) + .await + .unwrap() + .len(); + assert_eq!(index_count, 2, "default + secondary index expected"); + + let data = rows_to_columns(&col, data_frame.get_rows().as_slice()); + let jobs = chunk_store + .partition_data(table.get_id(), data, &col, false) + .await + .unwrap(); + assert!(!jobs.is_empty()); + let mut total_rows = 0u64; + for job in jobs { + let (chunk, _file_size) = job.await.unwrap().unwrap(); + total_rows += chunk.get_row().get_row_count(); + } + // Each index receives all 35 rows, routed to its own partition. + assert_eq!(total_rows, 35 * index_count as u64); + } + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + } + #[tokio::test] async fn create_aggr_chunk_test() { let config = Config::test("create_aggr_chunk_test"); @@ -2509,7 +2622,7 @@ impl ChunkStore { .meta_store .get_table_by_id(index.get_row().table_id()) .await?; - self.partition_rows_for_index(&index, &table, columns, in_memory) + self.partition_rows_for_index(&index, &table, None, columns, in_memory) .await } #[tracing::instrument(level = "trace", skip(self, columns))] @@ -2517,14 +2630,19 @@ impl ChunkStore { &self, index: &IdRow, table: &IdRow
, + preset_partitions: Option>>, mut columns: Vec, in_memory: bool, ) -> Result, Option), CubeError>>>, CubeError> { let index_id = index.get_id(); - let partitions = self - .meta_store - .get_active_partitions_by_index_id(index_id) - .await?; + let partitions = match preset_partitions { + Some(partitions) => partitions, + None => { + self.meta_store + .get_active_partitions_by_index_id(index_id) + .await? + } + }; let sort_key_size = index.get_row().sort_key_size() as usize; let expected_columns = index.get_row().get_columns().len(); @@ -2819,6 +2937,18 @@ impl ChunkStore { // The table is the same for every index/chunk produced by this call, so load it once // here instead of re-fetching it per chunk over the metastore RPC downstream. let table = self.meta_store.get_table_by_id(table_id).await?; + // When batching is enabled, fetch the active partitions of all indexes in one RPC + // instead of one per index inside partition_rows_for_index. + let mut partitions_by_index = if self.config.metastore_batch_rpc() { + let index_ids = indexes.iter().map(|i| i.get_id()).collect::>(); + Some( + self.meta_store + .get_active_partitions_for_indexes(index_ids) + .await?, + ) + } else { + None + }; let mut futures = Vec::new(); for index in indexes.iter() { let index_columns = index.get_row().columns(); @@ -2831,7 +2961,26 @@ impl ChunkStore { .await?; let remapped = remapped?; rows = rows_again; - futures.push(self.partition_rows_for_index(&index, &table, remapped, in_memory)); + // In batch mode the map always has an entry per requested index id (the RPC inserts + // one for every id, empty vec included). A missing key means the requested ids and + // the indexes iterated here diverged — an internal bug we surface rather than + // silently passing an empty set, which would trip the corrupt-data path below. + let preset_partitions = match partitions_by_index.as_mut() { + Some(map) => Some(map.remove(&index.get_id()).ok_or_else(|| { + CubeError::internal(format!( + "Active partitions missing for index {} in batched fetch", + index.get_id() + )) + })?), + None => None, + }; + futures.push(self.partition_rows_for_index( + &index, + &table, + preset_partitions, + remapped, + in_memory, + )); } let new_chunks = join_all(futures) From 358441b40f9fac186fb266d2770ba109a3a0a061 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 18:01:47 +0200 Subject: [PATCH 5/8] perf(cubestore): batch child-partition creation in splits behind the flag compact() and the multi-partition split both created child partitions one create_partition RPC at a time inside a loop. Add create_partitions(Vec) that inserts them all in one metastore write transaction, gated behind CUBESTORE_METASTORE_BATCH_RPC. This cuts N write RPCs / N write-lock acquisitions to one and makes the creation all-or-nothing, removing the orphan-partition window if a mid-loop insert failed. When the flag is off the per-item loop is unchanged. MultiSplit carries the flag from the compaction config. Tested at the metastore level and through the compaction split path with the flag on. --- rust/cubestore/cubestore/src/metastore/mod.rs | 76 +++++++++++++++++++ .../cubestore/src/queryplanner/test_utils.rs | 7 ++ .../cubestore/src/store/compaction.rs | 61 ++++++++++----- 3 files changed, 126 insertions(+), 18 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index c587f1685b826..aa2eac201e4a2 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -884,6 +884,10 @@ pub trait MetaStore: DIService + Send + Sync { fn partition_table(&self) -> PartitionMetaStoreTable; async fn create_partition(&self, partition: Partition) -> Result, CubeError>; + async fn create_partitions( + &self, + partitions: Vec, + ) -> Result>, CubeError>; async fn get_partition(&self, partition_id: u64) -> Result, CubeError>; async fn get_partition_out_of_queue( &self, @@ -2764,6 +2768,21 @@ impl MetaStore for RocksMetaStore { .await } + async fn create_partitions( + &self, + partitions: Vec, + ) -> Result>, CubeError> { + self.write_operation("create_partitions", move |db_ref, batch_pipe| { + let table = PartitionRocksTable::new(db_ref.clone()); + let mut result = Vec::with_capacity(partitions.len()); + for partition in partitions { + result.push(table.insert(partition, batch_pipe)?); + } + Ok(result) + }) + .await + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_partition(&self, partition_id: u64) -> Result, CubeError> { self.read_operation("get_partition", move |db_ref| { @@ -5920,6 +5939,63 @@ mod tests { Ok(()) } + #[tokio::test] + async fn create_partitions_test() -> Result<(), CubeError> { + init_test_logger().await; + + let (_remote_fs, meta_store) = RocksMetaStore::prepare_test_metastore("create_partitions"); + + meta_store.create_schema("foo".to_string(), false).await?; + let columns = vec![Column::new("col1".to_string(), ColumnType::Int, 0)]; + let table = meta_store + .create_table( + "foo".to_string(), + "t1".to_string(), + columns, + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + let index = meta_store.get_default_index(table.get_id()).await?; + let parent = meta_store + .get_active_partitions_by_index_id(index.get_id()) + .await?[0] + .clone(); + + let created = meta_store + .create_partitions(vec![ + Partition::new_child(&parent, None), + Partition::new_child(&parent, None), + ]) + .await?; + + assert_eq!(created.len(), 2); + assert_ne!(created[0].get_id(), created[1].get_id()); + // Both rows must be persisted and point at the same parent partition. + for child in &created { + let fetched = meta_store.get_partition(child.get_id()).await?; + assert_eq!( + fetched.get_row().parent_partition_id(), + &Some(parent.get_id()) + ); + } + + Ok(()) + } + #[tokio::test] async fn table_test() -> Result<(), CubeError> { init_test_logger().await; diff --git a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs index 50664de3b57ac..2b77b4861e2bc 100644 --- a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs +++ b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs @@ -173,6 +173,13 @@ impl MetaStore for MetaStoreMock { panic!("MetaStore mock!") } + async fn create_partitions( + &self, + _partitions: Vec, + ) -> Result>, CubeError> { + panic!("MetaStore mock!") + } + async fn get_partition(&self, _partition_id: u64) -> Result, CubeError> { panic!("MetaStore mock!") } diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index e61a1c42e6131..9576f2ed0cdec 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -608,12 +608,19 @@ impl CompactionService for CompactionServiceImpl { let new_partitions_count = new_partitions_count_by_rows.max(new_partitions_count_by_file_size); - for _ in 0..new_partitions_count { - new_partitions.push( - self.meta_store - .create_partition(Partition::new_child(&partition, None)) - .await?, - ); + if self.config.metastore_batch_rpc() { + let children = (0..new_partitions_count) + .map(|_| Partition::new_child(&partition, None)) + .collect::>(); + new_partitions = self.meta_store.create_partitions(children).await?; + } else { + for _ in 0..new_partitions_count { + new_partitions.push( + self.meta_store + .create_partition(Partition::new_child(&partition, None)) + .await?, + ); + } } } @@ -958,6 +965,7 @@ impl CompactionService for CompactionServiceImpl { self.meta_store.clone(), self.remote_fs.clone(), self.metadata_cache_factory.clone(), + self.config.metastore_batch_rpc(), keys, key_len, multi_partition_id, @@ -1001,6 +1009,7 @@ impl CompactionService for CompactionServiceImpl { self.meta_store.clone(), self.remote_fs.clone(), self.metadata_cache_factory.clone(), + self.config.metastore_batch_rpc(), keys, key_len, multi_partition_id, @@ -1788,6 +1797,9 @@ mod tests { .expect_compaction_split_by_total_file_size_enabled() .returning(|| false); + // Exercise the batched create_partitions path for the split below. + config.expect_metastore_batch_rpc().returning(|| true); + let compaction_service = CompactionServiceImpl::new( metastore.clone(), Arc::new(chunk_store), @@ -2866,6 +2878,7 @@ struct MultiSplit { meta: Arc, fs: Arc, metadata_cache_factory: Arc, + metastore_batch_rpc: bool, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2882,6 +2895,7 @@ impl MultiSplit { meta: Arc, fs: Arc, metadata_cache_factory: Arc, + metastore_batch_rpc: bool, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2892,6 +2906,7 @@ impl MultiSplit { meta, fs, metadata_cache_factory, + metastore_batch_rpc, keys, key_len, multi_partition_id, @@ -2912,18 +2927,28 @@ impl MultiSplit { let new_partition_rows = &mut self.new_partition_rows; let uploads = &mut self.uploads; - let mut children = Vec::with_capacity(mchildren.len()); - for mc in mchildren.iter() { - let c = Partition::new_child(&p.partition, Some(mc.get_id())); - let c = c.update_min_max_and_row_count( - mc.get_row().min_row().cloned(), - mc.get_row().max_row().cloned(), - 0, - None, - None, - ); - children.push(self.meta.create_partition(c).await?) - } + let child_defs = mchildren + .iter() + .map(|mc| { + let c = Partition::new_child(&p.partition, Some(mc.get_id())); + c.update_min_max_and_row_count( + mc.get_row().min_row().cloned(), + mc.get_row().max_row().cloned(), + 0, + None, + None, + ) + }) + .collect::>(); + let children = if self.metastore_batch_rpc { + self.meta.create_partitions(child_defs).await? + } else { + let mut children = Vec::with_capacity(child_defs.len()); + for c in child_defs { + children.push(self.meta.create_partition(c).await?); + } + children + }; let mut in_files = Vec::new(); collect_remote_files(&p, &mut in_files); From e4f0fac8e521d3b90f71d64a6f8a5062c8943293 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 18:23:31 +0200 Subject: [PATCH 6/8] perf(cubestore): batch child-chunk creation in range repartition behind the flag merge_chunk_group_into_children created one child chunk per written file with a create_chunk RPC inside a loop. When CUBESTORE_METASTORE_BATCH_RPC is on, build all child chunk specs and create them in one insert_chunks write, then upload their files; the chunks are inactive until the swap, so create-before-upload matches the per-item visibility and any chunk left behind by a failed upload is swept by the existing remove_inactive_not_uploaded_chunks reaper. The per-item path is unchanged when the flag is off. This is the repartition cascade that wide imports trigger, so it cuts a large share of the create_chunk write RPCs. Reuses the existing insert_chunks metastore method. Covered by repartition_chunk_range_merges_only_range run with the flag on. --- rust/cubestore/cubestore/src/store/mod.rs | 68 ++++++++++++++++++----- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 7578f553ca7c1..7a20de3754edf 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -1187,25 +1187,56 @@ impl ChunkStore { .await?; let mut new_chunk_ids: Vec<(u64, Option)> = Vec::new(); - for w in written { - if w.num_rows == 0 { - let _ = tokio::fs::remove_file(&w.file).await; - continue; - } - let child = &children[w.child_index]; - let chunk = self - .meta_store - .create_chunk( + if self.config.metastore_batch_rpc() { + // Create all child chunks in one metastore write, then upload their files. The + // chunks are inactive until the swap below, so creating them before the uploads + // matches the per-item path's visibility. + let mut specs = Vec::new(); + let mut spec_files = Vec::new(); + for w in written { + if w.num_rows == 0 { + let _ = tokio::fs::remove_file(&w.file).await; + continue; + } + let child = &children[w.child_index]; + specs.push(Chunk::new( child.get_id(), w.num_rows, Some(Row::new(w.min)), Some(Row::new(w.max)), false, - ) - .await?; - let remote = ChunkStore::chunk_file_name(chunk.clone()); - let file_size = self.remote_fs.upload_file(w.file, remote).await?; - new_chunk_ids.push((chunk.get_id(), Some(file_size))); + )); + spec_files.push(w.file); + } + if !specs.is_empty() { + let chunks = self.meta_store.insert_chunks(specs).await?; + for (file, chunk) in spec_files.into_iter().zip(chunks) { + let remote = ChunkStore::chunk_file_name(chunk.clone()); + let file_size = self.remote_fs.upload_file(file, remote).await?; + new_chunk_ids.push((chunk.get_id(), Some(file_size))); + } + } + } else { + for w in written { + if w.num_rows == 0 { + let _ = tokio::fs::remove_file(&w.file).await; + continue; + } + let child = &children[w.child_index]; + let chunk = self + .meta_store + .create_chunk( + child.get_id(), + w.num_rows, + Some(Row::new(w.min)), + Some(Row::new(w.max)), + false, + ) + .await?; + let remote = ChunkStore::chunk_file_name(chunk.clone()); + let file_size = self.remote_fs.upload_file(w.file, remote).await?; + new_chunk_ids.push((chunk.get_id(), Some(file_size))); + } } let group_ids: Vec = group.iter().map(|c| c.get_id()).collect(); @@ -2021,7 +2052,14 @@ mod tests { async fn repartition_chunk_range_merges_only_range() { // repartition_chunk_range must merge only the active persisted chunks within // [start, end], leaving the rest active, and conserve rows into the children. - let config = Config::test("repartition_chunk_range_merges_only_range"); + // Run with batched metastore RPC on so the batched insert_chunks path in + // merge_chunk_group_into_children is exercised (per-item path covered by the other + // repartition tests, which default the flag off). + let config = + Config::test("repartition_chunk_range_merges_only_range").update_config(|mut c| { + c.metastore_batch_rpc = true; + c + }); let path = "/tmp/test_repartition_range"; let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); From a024cfd596f35d3460fbef22560d19178be95c67 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 18:31:28 +0200 Subject: [PATCH 7/8] fix(cubestore): make batched active-partition fetch RPC-serializable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit get_active_partitions_for_indexes returned HashMap, but the metastore RPC serializes with flexbuffers, which rejects non-string map keys (KeyMustBeString) — it failed only over a real worker->main RPC, surfacing as "Connection closed unexpectedly" during imports with the flag on, while the node-local unit tests passed. Return a positional Vec>> aligned with the requested index ids instead. build_index_chunks zips by position and validates the length; a length mismatch is an internal error rather than a silent empty set. Full cubestore lib suite passes with the flag forced on. --- rust/cubestore/cubestore/src/metastore/mod.rs | 21 ++++++----- .../cubestore/src/queryplanner/test_utils.rs | 2 +- rust/cubestore/cubestore/src/store/mod.rs | 37 +++++++++---------- 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index aa2eac201e4a2..5d6fadd650638 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -977,10 +977,13 @@ pub trait MetaStore: DIService + Send + Sync { &self, index_id: u64, ) -> Result>, CubeError>; + /// Active partitions for each index id, positionally aligned with `index_ids` + /// (result[i] corresponds to index_ids[i]). Returns a Vec rather than a map because the + /// metastore RPC serializes with flexbuffers, which rejects non-string map keys. async fn get_active_partitions_for_indexes( &self, index_ids: Vec, - ) -> Result>>, CubeError>; + ) -> Result>>, CubeError>; async fn get_index(&self, index_id: u64) -> Result, CubeError>; async fn get_index_with_active_partitions_out_of_queue( @@ -3628,10 +3631,10 @@ impl MetaStore for RocksMetaStore { async fn get_active_partitions_for_indexes( &self, index_ids: Vec, - ) -> Result>>, CubeError> { + ) -> Result>>, CubeError> { self.read_operation_out_of_queue("get_active_partitions_for_indexes", move |db_ref| { let rocks_partition = PartitionRocksTable::new(db_ref); - let mut result = HashMap::with_capacity(index_ids.len()); + let mut result = Vec::with_capacity(index_ids.len()); for index_id in index_ids { let partitions = rocks_partition .get_rows_by_index( @@ -3641,7 +3644,7 @@ impl MetaStore for RocksMetaStore { .into_iter() .filter(|r| r.get_row().active) .collect::>(); - result.insert(index_id, partitions); + result.push(partitions); } Ok(result) }) @@ -5919,8 +5922,8 @@ mod tests { .get_active_partitions_by_index_id(index2.get_id()) .await?; - // Batch over both indexes plus a non-existent one must match the per-index calls and - // return an empty vec (not an error) for the unknown index. + // Batch result is positionally aligned with the requested ids; it must match the + // per-index calls and return an empty vec (not an error) for the unknown index. let unknown_index_id = index2.get_id() + 1000; let batch = meta_store .get_active_partitions_for_indexes(vec![ @@ -5932,9 +5935,9 @@ mod tests { let ids = |ps: &Vec>| ps.iter().map(|p| p.get_id()).collect::>(); assert_eq!(batch.len(), 3); - assert_eq!(ids(&batch[&index1.get_id()]), ids(&single1)); - assert_eq!(ids(&batch[&index2.get_id()]), ids(&single2)); - assert!(batch[&unknown_index_id].is_empty()); + assert_eq!(ids(&batch[0]), ids(&single1)); + assert_eq!(ids(&batch[1]), ids(&single2)); + assert!(batch[2].is_empty()); Ok(()) } diff --git a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs index 2b77b4861e2bc..dc14d74226d3a 100644 --- a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs +++ b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs @@ -340,7 +340,7 @@ impl MetaStore for MetaStoreMock { async fn get_active_partitions_for_indexes( &self, _index_ids: Vec, - ) -> Result>>, CubeError> { + ) -> Result>>, CubeError> { panic!("MetaStore mock!") } diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 7a20de3754edf..2b5db9e20087e 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -2977,18 +2977,27 @@ impl ChunkStore { let table = self.meta_store.get_table_by_id(table_id).await?; // When batching is enabled, fetch the active partitions of all indexes in one RPC // instead of one per index inside partition_rows_for_index. + // When batching is enabled the result is positionally aligned with `indexes` + // (result[i] holds index `indexes[i]`'s active partitions). let mut partitions_by_index = if self.config.metastore_batch_rpc() { let index_ids = indexes.iter().map(|i| i.get_id()).collect::>(); - Some( - self.meta_store - .get_active_partitions_for_indexes(index_ids) - .await?, - ) + let fetched = self + .meta_store + .get_active_partitions_for_indexes(index_ids) + .await?; + if fetched.len() != indexes.len() { + return Err(CubeError::internal(format!( + "Batched active-partition fetch returned {} entries for {} indexes", + fetched.len(), + indexes.len() + ))); + } + Some(fetched) } else { None }; let mut futures = Vec::new(); - for index in indexes.iter() { + for (i, index) in indexes.iter().enumerate() { let index_columns = index.get_row().columns(); let index_columns_copy = index_columns.clone(); let columns = columns.to_vec(); @@ -2999,19 +3008,9 @@ impl ChunkStore { .await?; let remapped = remapped?; rows = rows_again; - // In batch mode the map always has an entry per requested index id (the RPC inserts - // one for every id, empty vec included). A missing key means the requested ids and - // the indexes iterated here diverged — an internal bug we surface rather than - // silently passing an empty set, which would trip the corrupt-data path below. - let preset_partitions = match partitions_by_index.as_mut() { - Some(map) => Some(map.remove(&index.get_id()).ok_or_else(|| { - CubeError::internal(format!( - "Active partitions missing for index {} in batched fetch", - index.get_id() - )) - })?), - None => None, - }; + let preset_partitions = partitions_by_index + .as_mut() + .map(|all| std::mem::take(&mut all[i])); futures.push(self.partition_rows_for_index( &index, &table, From 7cab9400b4a483bfe80ac5049f5140ad322c9603 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 16 Jun 2026 18:57:50 +0200 Subject: [PATCH 8/8] perf(cubestore): skip disk-space dedup work when the limit is disabled Address review: hoist a disk_check_enabled flag so the per-partition node-name build + dedup set are skipped entirely when max_disk_space_per_worker is 0 (the common case), and guard the batched active-partition positional pairing with a debug_assert that each entry belongs to the index at the same position. --- rust/cubestore/cubestore/src/store/mod.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 2b5db9e20087e..bfb5684707257 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -2729,7 +2729,9 @@ impl ChunkStore { let mut futures = Vec::new(); // The disk-space check resolves to a per-node total, so checking each distinct target - // node once is enough; this avoids one metastore RPC per partition written. + // node once is enough; this avoids one metastore RPC per partition written. When the + // limit is disabled (the common case) skip the node-name/dedup work entirely. + let disk_check_enabled = !in_memory && self.config.max_disk_space_per_worker() != 0; let mut checked_nodes: HashSet = HashSet::new(); for partition in partitions.into_iter() { let min = partition.get_row().get_min_val().as_ref(); @@ -2752,7 +2754,7 @@ impl ChunkStore { ) > Ordering::Equal) }); if to_write.len() > 0 { - if !in_memory + if disk_check_enabled && checked_nodes .insert(node_name_by_partition(self.config.as_ref(), &partition)) { @@ -2976,9 +2978,8 @@ impl ChunkStore { // here instead of re-fetching it per chunk over the metastore RPC downstream. let table = self.meta_store.get_table_by_id(table_id).await?; // When batching is enabled, fetch the active partitions of all indexes in one RPC - // instead of one per index inside partition_rows_for_index. - // When batching is enabled the result is positionally aligned with `indexes` - // (result[i] holds index `indexes[i]`'s active partitions). + // instead of one per index inside partition_rows_for_index. The result is positionally + // aligned with `indexes` (result[i] holds index indexes[i]'s active partitions). let mut partitions_by_index = if self.config.metastore_batch_rpc() { let index_ids = indexes.iter().map(|i| i.get_id()).collect::>(); let fetched = self @@ -2992,6 +2993,18 @@ impl ChunkStore { indexes.len() ))); } + // Guard the positional pairing below: each entry's partitions must belong to the + // index at the same position. Cheap insurance against a future caller passing a + // reordered/sub-selected index slice. + debug_assert!( + fetched + .iter() + .zip(indexes.iter()) + .all(|(parts, index)| parts + .iter() + .all(|p| p.get_row().get_index_id() == index.get_id())), + "batched active-partition fetch is not aligned with the requested indexes" + ); Some(fetched) } else { None