diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 44ea99bf871f5..5cba4c1222959 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -633,6 +633,10 @@ pub trait ConfigObj: DIService { fn disk_space_cache_duration_secs(&self) -> u64; + fn disk_space_compute_lock_timeout_ms(&self) -> u64; + + fn metastore_batch_rpc(&self) -> bool; + fn transport_max_message_size(&self) -> usize; fn transport_max_frame_size(&self) -> usize; @@ -764,6 +768,8 @@ pub struct ConfigObjImpl { pub max_disk_space: u64, pub max_disk_space_per_worker: u64, pub disk_space_cache_duration_secs: u64, + pub disk_space_compute_lock_timeout_ms: u64, + pub metastore_batch_rpc: bool, pub transport_max_message_size: usize, pub transport_max_frame_size: usize, pub local_files_cleanup_interval_secs: u64, @@ -1172,6 +1178,14 @@ impl ConfigObj for ConfigObjImpl { self.disk_space_cache_duration_secs } + fn disk_space_compute_lock_timeout_ms(&self) -> u64 { + self.disk_space_compute_lock_timeout_ms + } + + fn metastore_batch_rpc(&self) -> bool { + self.metastore_batch_rpc + } + fn transport_max_message_size(&self) -> usize { self.transport_max_message_size } @@ -1850,6 +1864,13 @@ impl Config { * 1024 * 1024, disk_space_cache_duration_secs: 300, + disk_space_compute_lock_timeout_ms: env_parse_duration( + "CUBESTORE_DISK_SPACE_LOCK_WAIT_MS", + 1000, + Some(60_000), + None, + ), + metastore_batch_rpc: env_parse("CUBESTORE_METASTORE_BATCH_RPC", false), transport_max_message_size, transport_max_frame_size: env_parse_size( "CUBESTORE_TRANSPORT_MAX_FRAME_SIZE", @@ -2045,6 +2066,8 @@ impl Config { max_disk_space: 0, max_disk_space_per_worker: 0, disk_space_cache_duration_secs: 0, + disk_space_compute_lock_timeout_ms: 1000, + metastore_batch_rpc: false, transport_max_message_size: 64 << 20, transport_max_frame_size: 16 << 20, local_files_cleanup_interval_secs: 600, diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index b281cb7cda145..5d6fadd650638 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -55,7 +55,7 @@ use crate::metastore::wal::{WALIndexKey, WALRocksIndex}; use crate::table::{Row, TableValue}; -use crate::util::lock::acquire_lock; +use crate::util::lock::{acquire_lock, acquire_lock_duration}; use crate::util::WorkerLoop; use crate::{meta_store_table_impl, CubeError}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -884,6 +884,10 @@ pub trait MetaStore: DIService + Send + Sync { fn partition_table(&self) -> PartitionMetaStoreTable; async fn create_partition(&self, partition: Partition) -> Result, CubeError>; + async fn create_partitions( + &self, + partitions: Vec, + ) -> Result>, CubeError>; async fn get_partition(&self, partition_id: u64) -> Result, CubeError>; async fn get_partition_out_of_queue( &self, @@ -973,6 +977,13 @@ pub trait MetaStore: DIService + Send + Sync { &self, index_id: u64, ) -> Result>, CubeError>; + /// Active partitions for each index id, positionally aligned with `index_ids` + /// (result[i] corresponds to index_ids[i]). Returns a Vec rather than a map because the + /// metastore RPC serializes with flexbuffers, which rejects non-string map keys. + async fn get_active_partitions_for_indexes( + &self, + index_ids: Vec, + ) -> Result>>, CubeError>; async fn get_index(&self, index_id: u64) -> Result, CubeError>; async fn get_index_with_active_partitions_out_of_queue( @@ -2760,6 +2771,21 @@ impl MetaStore for RocksMetaStore { .await } + async fn create_partitions( + &self, + partitions: Vec, + ) -> Result>, CubeError> { + self.write_operation("create_partitions", move |db_ref, batch_pipe| { + let table = PartitionRocksTable::new(db_ref.clone()); + let mut result = Vec::with_capacity(partitions.len()); + for partition in partitions { + result.push(table.insert(partition, batch_pipe)?); + } + Ok(result) + }) + .await + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_partition(&self, partition_id: u64) -> Result, CubeError> { self.read_operation("get_partition", move |db_ref| { @@ -2829,21 +2855,25 @@ impl MetaStore for RocksMetaStore { // Single-flight: serialize the scan so a burst of concurrent callers // (e.g. many partition writes during an import/repartition) share one // computation instead of each materializing a full metastore scan. - let _compute_guard = - match acquire_lock("disk space compute", self.disk_space_compute_lock.lock()).await - { - Ok(guard) => guard, - Err(e) => { - log::error!( + let _compute_guard = match acquire_lock_duration( + "disk space compute", + self.disk_space_compute_lock.lock(), + Duration::from_millis(self.store.config.disk_space_compute_lock_timeout_ms()), + ) + .await + { + Ok(guard) => guard, + Err(e) => { + log::error!( "Timed out waiting for the disk space scan lock: {}. The single-flight \ scan is stuck; reporting 0 used disk space so the disk-space check \ passes. THE DISK-SPACE LIMIT IS NOT BEING ENFORCED until the scan \ recovers.", e ); - return Ok(0); - } - }; + return Ok(0); + } + }; if let Some(sizes) = self.disk_space_cached().await? { sizes } else { @@ -3598,6 +3628,29 @@ impl MetaStore for RocksMetaStore { .await } + async fn get_active_partitions_for_indexes( + &self, + index_ids: Vec, + ) -> Result>>, CubeError> { + self.read_operation_out_of_queue("get_active_partitions_for_indexes", move |db_ref| { + let rocks_partition = PartitionRocksTable::new(db_ref); + let mut result = Vec::with_capacity(index_ids.len()); + for index_id in index_ids { + let partitions = rocks_partition + .get_rows_by_index( + &PartitionIndexKey::ByIndexId(index_id), + &PartitionRocksIndex::IndexId, + )? + .into_iter() + .filter(|r| r.get_row().active) + .collect::>(); + result.push(partitions); + } + Ok(result) + }) + .await + } + #[tracing::instrument(level = "trace", skip(self))] async fn get_index(&self, index_id: u64) -> Result, CubeError> { self.read_operation("get_index", move |db_ref| { @@ -5801,6 +5854,151 @@ mod tests { Ok(()) } + #[tokio::test] + async fn get_active_partitions_for_indexes_test() -> Result<(), CubeError> { + init_test_logger().await; + + let (_remote_fs, meta_store) = + RocksMetaStore::prepare_test_metastore("get_active_partitions_for_indexes"); + + meta_store.create_schema("foo".to_string(), false).await?; + let columns = vec![ + Column::new("col1".to_string(), ColumnType::Int, 0), + Column::new("col2".to_string(), ColumnType::String, 1), + ]; + // Two tables → two default indexes, each with its own initial active partition. + let table1 = meta_store + .create_table( + "foo".to_string(), + "t1".to_string(), + columns.clone(), + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + let table2 = meta_store + .create_table( + "foo".to_string(), + "t2".to_string(), + columns.clone(), + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + + let index1 = meta_store.get_default_index(table1.get_id()).await?; + let index2 = meta_store.get_default_index(table2.get_id()).await?; + + let single1 = meta_store + .get_active_partitions_by_index_id(index1.get_id()) + .await?; + let single2 = meta_store + .get_active_partitions_by_index_id(index2.get_id()) + .await?; + + // Batch result is positionally aligned with the requested ids; it must match the + // per-index calls and return an empty vec (not an error) for the unknown index. + let unknown_index_id = index2.get_id() + 1000; + let batch = meta_store + .get_active_partitions_for_indexes(vec![ + index1.get_id(), + index2.get_id(), + unknown_index_id, + ]) + .await?; + + let ids = |ps: &Vec>| ps.iter().map(|p| p.get_id()).collect::>(); + assert_eq!(batch.len(), 3); + assert_eq!(ids(&batch[0]), ids(&single1)); + assert_eq!(ids(&batch[1]), ids(&single2)); + assert!(batch[2].is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn create_partitions_test() -> Result<(), CubeError> { + init_test_logger().await; + + let (_remote_fs, meta_store) = RocksMetaStore::prepare_test_metastore("create_partitions"); + + meta_store.create_schema("foo".to_string(), false).await?; + let columns = vec![Column::new("col1".to_string(), ColumnType::Int, 0)]; + let table = meta_store + .create_table( + "foo".to_string(), + "t1".to_string(), + columns, + None, + None, + vec![], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await?; + let index = meta_store.get_default_index(table.get_id()).await?; + let parent = meta_store + .get_active_partitions_by_index_id(index.get_id()) + .await?[0] + .clone(); + + let created = meta_store + .create_partitions(vec![ + Partition::new_child(&parent, None), + Partition::new_child(&parent, None), + ]) + .await?; + + assert_eq!(created.len(), 2); + assert_ne!(created[0].get_id(), created[1].get_id()); + // Both rows must be persisted and point at the same parent partition. + for child in &created { + let fetched = meta_store.get_partition(child.get_id()).await?; + assert_eq!( + fetched.get_row().parent_partition_id(), + &Some(parent.get_id()) + ); + } + + Ok(()) + } + #[tokio::test] async fn table_test() -> Result<(), CubeError> { init_test_logger().await; diff --git a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs index 7580c4d3c97e2..dc14d74226d3a 100644 --- a/rust/cubestore/cubestore/src/queryplanner/test_utils.rs +++ b/rust/cubestore/cubestore/src/queryplanner/test_utils.rs @@ -173,6 +173,13 @@ impl MetaStore for MetaStoreMock { panic!("MetaStore mock!") } + async fn create_partitions( + &self, + _partitions: Vec, + ) -> Result>, CubeError> { + panic!("MetaStore mock!") + } + async fn get_partition(&self, _partition_id: u64) -> Result, CubeError> { panic!("MetaStore mock!") } @@ -330,6 +337,13 @@ impl MetaStore for MetaStoreMock { panic!("MetaStore mock!") } + async fn get_active_partitions_for_indexes( + &self, + _index_ids: Vec, + ) -> Result>>, CubeError> { + panic!("MetaStore mock!") + } + async fn get_index(&self, _index_id: u64) -> Result, CubeError> { panic!("MetaStore mock!") } diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index 601e14d6e9e7d..9576f2ed0cdec 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -608,12 +608,19 @@ impl CompactionService for CompactionServiceImpl { let new_partitions_count = new_partitions_count_by_rows.max(new_partitions_count_by_file_size); - for _ in 0..new_partitions_count { - new_partitions.push( - self.meta_store - .create_partition(Partition::new_child(&partition, None)) - .await?, - ); + if self.config.metastore_batch_rpc() { + let children = (0..new_partitions_count) + .map(|_| Partition::new_child(&partition, None)) + .collect::>(); + new_partitions = self.meta_store.create_partitions(children).await?; + } else { + for _ in 0..new_partitions_count { + new_partitions.push( + self.meta_store + .create_partition(Partition::new_child(&partition, None)) + .await?, + ); + } } } @@ -694,10 +701,8 @@ impl CompactionService for CompactionServiceImpl { None => Arc::new(EmptyExec::new(schema.clone())), }; - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; + // `table` is already loaded by get_partition_for_compaction above and is immutable for + // the duration of the job, so reuse it instead of re-fetching over the metastore RPC. let unique_key = table.get_row().unique_key_columns(); let aggregate_columns = match index.get_row().get_type() { IndexType::Regular => None, @@ -960,6 +965,7 @@ impl CompactionService for CompactionServiceImpl { self.meta_store.clone(), self.remote_fs.clone(), self.metadata_cache_factory.clone(), + self.config.metastore_batch_rpc(), keys, key_len, multi_partition_id, @@ -1003,6 +1009,7 @@ impl CompactionService for CompactionServiceImpl { self.meta_store.clone(), self.remote_fs.clone(), self.metadata_cache_factory.clone(), + self.config.metastore_batch_rpc(), keys, key_len, multi_partition_id, @@ -1790,6 +1797,9 @@ mod tests { .expect_compaction_split_by_total_file_size_enabled() .returning(|| false); + // Exercise the batched create_partitions path for the split below. + config.expect_metastore_batch_rpc().returning(|| true); + let compaction_service = CompactionServiceImpl::new( metastore.clone(), Arc::new(chunk_store), @@ -2361,7 +2371,13 @@ mod tests { ]; let (chunk, _) = chunk_store - .add_chunk_columns(aggr_index.clone(), partition.clone(), data1.clone(), false) + .add_chunk_columns( + aggr_index.clone(), + &table, + partition.clone(), + data1.clone(), + false, + ) .await .unwrap() .await @@ -2370,7 +2386,13 @@ mod tests { metastore.chunk_uploaded(chunk.get_id()).await.unwrap(); let (chunk, _) = chunk_store - .add_chunk_columns(aggr_index.clone(), partition.clone(), data2.clone(), false) + .add_chunk_columns( + aggr_index.clone(), + &table, + partition.clone(), + data2.clone(), + false, + ) .await .unwrap() .await @@ -2856,6 +2878,7 @@ struct MultiSplit { meta: Arc, fs: Arc, metadata_cache_factory: Arc, + metastore_batch_rpc: bool, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2872,6 +2895,7 @@ impl MultiSplit { meta: Arc, fs: Arc, metadata_cache_factory: Arc, + metastore_batch_rpc: bool, keys: Vec, key_len: usize, multi_partition_id: u64, @@ -2882,6 +2906,7 @@ impl MultiSplit { meta, fs, metadata_cache_factory, + metastore_batch_rpc, keys, key_len, multi_partition_id, @@ -2902,18 +2927,28 @@ impl MultiSplit { let new_partition_rows = &mut self.new_partition_rows; let uploads = &mut self.uploads; - let mut children = Vec::with_capacity(mchildren.len()); - for mc in mchildren.iter() { - let c = Partition::new_child(&p.partition, Some(mc.get_id())); - let c = c.update_min_max_and_row_count( - mc.get_row().min_row().cloned(), - mc.get_row().max_row().cloned(), - 0, - None, - None, - ); - children.push(self.meta.create_partition(c).await?) - } + let child_defs = mchildren + .iter() + .map(|mc| { + let c = Partition::new_child(&p.partition, Some(mc.get_id())); + c.update_min_max_and_row_count( + mc.get_row().min_row().cloned(), + mc.get_row().max_row().cloned(), + 0, + None, + None, + ) + }) + .collect::>(); + let children = if self.metastore_batch_rpc { + self.meta.create_partitions(child_defs).await? + } else { + let mut children = Vec::with_capacity(child_defs.len()); + for c in child_defs { + children.push(self.meta.create_partition(c).await?); + } + children + }; let mut in_files = Vec::new(); collect_remote_files(&p, &mut in_files); diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 1be584b9384b5..bfb5684707257 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -859,8 +859,13 @@ impl ChunkDataStore for ChunkStore { partition: IdRow, batch: RecordBatch, ) -> Result<(IdRow, Option), CubeError> { + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; self.add_chunk_columns( index.clone(), + &table, partition.clone(), batch.columns().to_vec(), false, @@ -1182,25 +1187,56 @@ impl ChunkStore { .await?; let mut new_chunk_ids: Vec<(u64, Option)> = Vec::new(); - for w in written { - if w.num_rows == 0 { - let _ = tokio::fs::remove_file(&w.file).await; - continue; - } - let child = &children[w.child_index]; - let chunk = self - .meta_store - .create_chunk( + if self.config.metastore_batch_rpc() { + // Create all child chunks in one metastore write, then upload their files. The + // chunks are inactive until the swap below, so creating them before the uploads + // matches the per-item path's visibility. + let mut specs = Vec::new(); + let mut spec_files = Vec::new(); + for w in written { + if w.num_rows == 0 { + let _ = tokio::fs::remove_file(&w.file).await; + continue; + } + let child = &children[w.child_index]; + specs.push(Chunk::new( child.get_id(), w.num_rows, Some(Row::new(w.min)), Some(Row::new(w.max)), false, - ) - .await?; - let remote = ChunkStore::chunk_file_name(chunk.clone()); - let file_size = self.remote_fs.upload_file(w.file, remote).await?; - new_chunk_ids.push((chunk.get_id(), Some(file_size))); + )); + spec_files.push(w.file); + } + if !specs.is_empty() { + let chunks = self.meta_store.insert_chunks(specs).await?; + for (file, chunk) in spec_files.into_iter().zip(chunks) { + let remote = ChunkStore::chunk_file_name(chunk.clone()); + let file_size = self.remote_fs.upload_file(file, remote).await?; + new_chunk_ids.push((chunk.get_id(), Some(file_size))); + } + } + } else { + for w in written { + if w.num_rows == 0 { + let _ = tokio::fs::remove_file(&w.file).await; + continue; + } + let child = &children[w.child_index]; + let chunk = self + .meta_store + .create_chunk( + child.get_id(), + w.num_rows, + Some(Row::new(w.min)), + Some(Row::new(w.max)), + false, + ) + .await?; + let remote = ChunkStore::chunk_file_name(chunk.clone()); + let file_size = self.remote_fs.upload_file(w.file, remote).await?; + new_chunk_ids.push((chunk.get_id(), Some(file_size))); + } } let group_ids: Vec = group.iter().map(|c| c.get_id()).collect(); @@ -1555,7 +1591,7 @@ mod tests { let data = rows_to_columns(&col, data_frame.get_rows().as_slice()); let (chunk, file_size) = chunk_store - .add_chunk_columns(index, partition, data.clone(), false) + .add_chunk_columns(index, &table, partition, data.clone(), false) .await .unwrap() .await @@ -1576,6 +1612,119 @@ mod tests { let _ = fs::remove_dir_all(chunk_store_path.clone()); let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); } + + #[tokio::test] + async fn partition_data_with_batch_rpc() { + // Exercises the CUBESTORE_METASTORE_BATCH_RPC path: build_index_chunks fetches active + // partitions for all indexes in one get_active_partitions_for_indexes call and routes + // them per index. The produced chunks must still cover every input row. + let config = Config::test("partition_data_with_batch_rpc").update_config(|mut c| { + c.metastore_batch_rpc = true; + c + }); + let path = "/tmp/test_partition_data_batch"; + let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); + let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); + + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + { + let remote_fs = LocalDirRemoteFs::new( + Some(PathBuf::from(chunk_remote_store_path.clone())), + PathBuf::from(chunk_store_path.clone()), + ); + let meta_store = RocksMetaStore::new( + Path::new(path), + BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), + config.config_obj(), + ) + .unwrap(); + let chunk_store = ChunkStore::new( + meta_store.clone(), + remote_fs.clone(), + Arc::new(MockCluster::new()), + config.config_obj(), + CubestoreMetadataCacheFactoryImpl::new(Arc::new(BasicMetadataCacheFactory::new())), + 10, + ); + + let col = vec![ + Column::new("foo_int".to_string(), ColumnType::Int, 0), + Column::new("foo".to_string(), ColumnType::String, 1), + Column::new("boo".to_string(), ColumnType::String, 2), + ]; + let rows = (0..35) + .map(|i| { + Row::new(vec![ + TableValue::Int(34 - i), + TableValue::String(format!("Foo {}", 34 - i)), + TableValue::String(format!("Boo {}", 34 - i)), + ]) + }) + .collect::>(); + let data_frame = DataFrame::new(col.clone(), rows); + meta_store + .create_schema("foo".to_string(), false) + .await + .unwrap(); + // A secondary regular index so build_index_chunks loops over >1 index on the batch + // path and must route each index its own partitions. + let secondary = IndexDef { + name: "by_foo".to_string(), + columns: vec!["foo".to_string(), "foo_int".to_string()], + multi_index: None, + index_type: IndexType::Regular, + }; + let table = meta_store + .create_table( + "foo".to_string(), + "bar".to_string(), + col.clone(), + None, + None, + vec![secondary], + true, + None, + None, + None, + None, + None, + None, + None, + None, + None, + false, + None, + ) + .await + .unwrap(); + let index_count = meta_store + .get_table_indexes(table.get_id()) + .await + .unwrap() + .len(); + assert_eq!(index_count, 2, "default + secondary index expected"); + + let data = rows_to_columns(&col, data_frame.get_rows().as_slice()); + let jobs = chunk_store + .partition_data(table.get_id(), data, &col, false) + .await + .unwrap(); + assert!(!jobs.is_empty()); + let mut total_rows = 0u64; + for job in jobs { + let (chunk, _file_size) = job.await.unwrap().unwrap(); + total_rows += chunk.get_row().get_row_count(); + } + // Each index receives all 35 rows, routed to its own partition. + assert_eq!(total_rows, 35 * index_count as u64); + } + let _ = DB::destroy(&Options::default(), path); + let _ = fs::remove_dir_all(chunk_store_path.clone()); + let _ = fs::remove_dir_all(chunk_remote_store_path.clone()); + } + #[tokio::test] async fn create_aggr_chunk_test() { let config = Config::test("create_aggr_chunk_test"); @@ -1783,7 +1932,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -1903,7 +2052,14 @@ mod tests { async fn repartition_chunk_range_merges_only_range() { // repartition_chunk_range must merge only the active persisted chunks within // [start, end], leaving the rest active, and conserve rows into the children. - let config = Config::test("repartition_chunk_range_merges_only_range"); + // Run with batched metastore RPC on so the batched insert_chunks path in + // merge_chunk_group_into_children is exercised (per-item path covered by the other + // repartition tests, which default the flag off). + let config = + Config::test("repartition_chunk_range_merges_only_range").update_config(|mut c| { + c.metastore_batch_rpc = true; + c + }); let path = "/tmp/test_repartition_range"; let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); @@ -1973,7 +2129,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2142,7 +2298,7 @@ mod tests { for _ in 0..2 { let data = rows_to_columns(&col, &[]); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2307,7 +2463,7 @@ mod tests { .collect::>(); let data = rows_to_columns(&col, &rows); let (chunk, file_size) = chunk_store - .add_chunk_columns(index.clone(), partition.clone(), data, false) + .add_chunk_columns(index.clone(), &table, partition.clone(), data, false) .await .unwrap() .await @@ -2500,21 +2656,31 @@ impl ChunkStore { in_memory: bool, ) -> Result, Option), CubeError>>>, CubeError> { let index = self.meta_store.get_index(index_id).await?; - self.partition_rows_for_index(&index, columns, in_memory) + let table = self + .meta_store + .get_table_by_id(index.get_row().table_id()) + .await?; + self.partition_rows_for_index(&index, &table, None, columns, in_memory) .await } #[tracing::instrument(level = "trace", skip(self, columns))] async fn partition_rows_for_index( &self, index: &IdRow, + table: &IdRow, + preset_partitions: Option>>, mut columns: Vec, in_memory: bool, ) -> Result, Option), CubeError>>>, CubeError> { let index_id = index.get_id(); - let partitions = self - .meta_store - .get_active_partitions_by_index_id(index_id) - .await?; + let partitions = match preset_partitions { + Some(partitions) => partitions, + None => { + self.meta_store + .get_active_partitions_by_index_id(index_id) + .await? + } + }; let sort_key_size = index.get_row().sort_key_size() as usize; let expected_columns = index.get_row().get_columns().len(); @@ -2562,6 +2728,11 @@ impl ChunkStore { } let mut futures = Vec::new(); + // The disk-space check resolves to a per-node total, so checking each distinct target + // node once is enough; this avoids one metastore RPC per partition written. When the + // limit is disabled (the common case) skip the node-name/dedup work entirely. + let disk_check_enabled = !in_memory && self.config.max_disk_space_per_worker() != 0; + let mut checked_nodes: HashSet = HashSet::new(); for partition in partitions.into_iter() { let min = partition.get_row().get_min_val().as_ref(); let max = partition.get_row().get_max_val().as_ref(); @@ -2583,7 +2754,10 @@ impl ChunkStore { ) > Ordering::Equal) }); if to_write.len() > 0 { - if !in_memory { + if disk_check_enabled + && checked_nodes + .insert(node_name_by_partition(self.config.as_ref(), &partition)) + { self.check_node_disk_space(&partition).await?; } let to_write = UInt64Array::from(to_write); @@ -2591,10 +2765,13 @@ impl ChunkStore { .iter() .map(|c| datafusion::arrow::compute::take(c.as_ref(), &to_write, None)) .collect::, _>>()?; - let columns = self.post_process_columns(index.clone(), columns).await?; + let columns = self + .post_process_columns(index.clone(), table, columns) + .await?; futures.push(self.add_chunk_columns( index.clone(), + table, partition.clone(), columns, in_memory, @@ -2651,15 +2828,12 @@ impl ChunkStore { async fn post_process_columns( &self, index: IdRow, + table: &IdRow
, data: Vec, ) -> Result, CubeError> { match index.get_row().get_type() { IndexType::Regular => Ok(data), IndexType::Aggregate => { - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; let schema = Arc::new(arrow_schema(&index.get_row())); let batch = RecordBatch::try_new(schema.clone(), data)?; @@ -2731,6 +2905,7 @@ impl ChunkStore { async fn add_chunk_columns( &self, index: IdRow, + table: &IdRow
, partition: IdRow, data: Vec, in_memory: bool, @@ -2767,18 +2942,13 @@ impl ChunkStore { let metadata_cache_factory: Arc = self.metadata_cache_factory.clone(); - let table = self - .meta_store - .get_table_by_id(index.get_row().table_id()) - .await?; - let parquet = ParquetTableStore::new( index.get_row().clone(), ROW_GROUP_SIZE, metadata_cache_factory, ); - let writer_props = parquet.writer_props(&table).await?; + let writer_props = parquet.writer_props(table).await?; cube_ext::spawn_blocking(move || -> Result<(), CubeError> { parquet.write_data_given_props(&local_file_copy, data, writer_props) }) @@ -2804,8 +2974,43 @@ impl ChunkStore { in_memory: bool, ) -> Result, CubeError> { let mut rows = rows.0; + // The table is the same for every index/chunk produced by this call, so load it once + // here instead of re-fetching it per chunk over the metastore RPC downstream. + let table = self.meta_store.get_table_by_id(table_id).await?; + // When batching is enabled, fetch the active partitions of all indexes in one RPC + // instead of one per index inside partition_rows_for_index. The result is positionally + // aligned with `indexes` (result[i] holds index indexes[i]'s active partitions). + let mut partitions_by_index = if self.config.metastore_batch_rpc() { + let index_ids = indexes.iter().map(|i| i.get_id()).collect::>(); + let fetched = self + .meta_store + .get_active_partitions_for_indexes(index_ids) + .await?; + if fetched.len() != indexes.len() { + return Err(CubeError::internal(format!( + "Batched active-partition fetch returned {} entries for {} indexes", + fetched.len(), + indexes.len() + ))); + } + // Guard the positional pairing below: each entry's partitions must belong to the + // index at the same position. Cheap insurance against a future caller passing a + // reordered/sub-selected index slice. + debug_assert!( + fetched + .iter() + .zip(indexes.iter()) + .all(|(parts, index)| parts + .iter() + .all(|p| p.get_row().get_index_id() == index.get_id())), + "batched active-partition fetch is not aligned with the requested indexes" + ); + Some(fetched) + } else { + None + }; let mut futures = Vec::new(); - for index in indexes.iter() { + for (i, index) in indexes.iter().enumerate() { let index_columns = index.get_row().columns(); let index_columns_copy = index_columns.clone(); let columns = columns.to_vec(); @@ -2816,7 +3021,16 @@ impl ChunkStore { .await?; let remapped = remapped?; rows = rows_again; - futures.push(self.partition_rows_for_index(&index, remapped, in_memory)); + let preset_partitions = partitions_by_index + .as_mut() + .map(|all| std::mem::take(&mut all[i])); + futures.push(self.partition_rows_for_index( + &index, + &table, + preset_partitions, + remapped, + in_memory, + )); } let new_chunks = join_all(futures)