Skip to content
Merged
23 changes: 23 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,10 @@ pub trait ConfigObj: DIService {

/// Disk-space cache duration, in seconds (unit taken from the name; the consumer of this
/// value is not visible in this excerpt — TODO confirm exact semantics).
fn disk_space_cache_duration_secs(&self) -> u64;

/// Maximum time, in milliseconds, to wait for the metastore's "disk space compute" lock.
/// Populated from `CUBESTORE_DISK_SPACE_LOCK_WAIT_MS` with a default of 1000.
fn disk_space_compute_lock_timeout_ms(&self) -> u64;

/// Value of the `CUBESTORE_METASTORE_BATCH_RPC` flag (default `false`).
/// NOTE(review): presumably switches callers to the batched metastore calls
/// (`create_partitions`, `get_active_partitions_for_indexes`) — confirm against call sites.
fn metastore_batch_rpc(&self) -> bool;

/// Transport maximum message size (presumably bytes — TODO confirm).
fn transport_max_message_size(&self) -> usize;
/// Transport maximum frame size; read from `CUBESTORE_TRANSPORT_MAX_FRAME_SIZE`
/// (presumably bytes — TODO confirm).
fn transport_max_frame_size(&self) -> usize;

Expand Down Expand Up @@ -764,6 +768,8 @@ pub struct ConfigObjImpl {
pub max_disk_space: u64,
pub max_disk_space_per_worker: u64,
pub disk_space_cache_duration_secs: u64,
pub disk_space_compute_lock_timeout_ms: u64,
pub metastore_batch_rpc: bool,
pub transport_max_message_size: usize,
pub transport_max_frame_size: usize,
pub local_files_cleanup_interval_secs: u64,
Expand Down Expand Up @@ -1172,6 +1178,14 @@ impl ConfigObj for ConfigObjImpl {
self.disk_space_cache_duration_secs
}

/// Returns the stored `disk_space_compute_lock_timeout_ms` field unchanged (milliseconds).
fn disk_space_compute_lock_timeout_ms(&self) -> u64 {
self.disk_space_compute_lock_timeout_ms
}

/// Returns the stored `metastore_batch_rpc` flag unchanged.
fn metastore_batch_rpc(&self) -> bool {
self.metastore_batch_rpc
}

/// Returns the stored `transport_max_message_size` field unchanged.
fn transport_max_message_size(&self) -> usize {
self.transport_max_message_size
}
Expand Down Expand Up @@ -1850,6 +1864,13 @@ impl Config {
* 1024
* 1024,
disk_space_cache_duration_secs: 300,
disk_space_compute_lock_timeout_ms: env_parse_duration(
"CUBESTORE_DISK_SPACE_LOCK_WAIT_MS",
1000,
Some(60_000),
None,
),
Comment thread
claude[bot] marked this conversation as resolved.
metastore_batch_rpc: env_parse("CUBESTORE_METASTORE_BATCH_RPC", false),
transport_max_message_size,
transport_max_frame_size: env_parse_size(
"CUBESTORE_TRANSPORT_MAX_FRAME_SIZE",
Expand Down Expand Up @@ -2045,6 +2066,8 @@ impl Config {
max_disk_space: 0,
max_disk_space_per_worker: 0,
disk_space_cache_duration_secs: 0,
disk_space_compute_lock_timeout_ms: 1000,
metastore_batch_rpc: false,
transport_max_message_size: 64 << 20,
transport_max_frame_size: 16 << 20,
local_files_cleanup_interval_secs: 600,
Expand Down
218 changes: 208 additions & 10 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::metastore::wal::{WALIndexKey, WALRocksIndex};

use crate::table::{Row, TableValue};

use crate::util::lock::acquire_lock;
use crate::util::lock::{acquire_lock, acquire_lock_duration};
use crate::util::WorkerLoop;
use crate::{meta_store_table_impl, CubeError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
Expand Down Expand Up @@ -884,6 +884,10 @@ pub trait MetaStore: DIService + Send + Sync {

fn partition_table(&self) -> PartitionMetaStoreTable;
async fn create_partition(&self, partition: Partition) -> Result<IdRow<Partition>, CubeError>;
/// Batch counterpart of `create_partition`: creates every partition in `partitions` with a
/// single call. The `RocksMetaStore` implementation performs all inserts inside one write
/// operation and returns the created rows in the same order as the input.
async fn create_partitions(
&self,
partitions: Vec<Partition>,
) -> Result<Vec<IdRow<Partition>>, CubeError>;
async fn get_partition(&self, partition_id: u64) -> Result<IdRow<Partition>, CubeError>;
async fn get_partition_out_of_queue(
&self,
Expand Down Expand Up @@ -973,6 +977,13 @@ pub trait MetaStore: DIService + Send + Sync {
&self,
index_id: u64,
) -> Result<Vec<IdRow<Partition>>, CubeError>;
/// Active partitions for each index id, positionally aligned with `index_ids`
/// (result[i] corresponds to index_ids[i]). Returns a Vec rather than a map because the
/// metastore RPC serializes with flexbuffers, which rejects non-string map keys.
///
/// In the `RocksMetaStore` implementation, partitions whose `active` flag is false are
/// filtered out, and an index id that matches no partition rows yields an empty inner Vec
/// rather than an error.
async fn get_active_partitions_for_indexes(
&self,
index_ids: Vec<u64>,
) -> Result<Vec<Vec<IdRow<Partition>>>, CubeError>;
async fn get_index(&self, index_id: u64) -> Result<IdRow<Index>, CubeError>;

async fn get_index_with_active_partitions_out_of_queue(
Expand Down Expand Up @@ -2760,6 +2771,21 @@ impl MetaStore for RocksMetaStore {
.await
}

async fn create_partitions(
&self,
partitions: Vec<Partition>,
) -> Result<Vec<IdRow<Partition>>, CubeError> {
self.write_operation("create_partitions", move |db_ref, batch_pipe| {
let table = PartitionRocksTable::new(db_ref.clone());
let mut result = Vec::with_capacity(partitions.len());
for partition in partitions {
result.push(table.insert(partition, batch_pipe)?);
}
Ok(result)
})
.await
}
Comment on lines +2774 to +2787

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding #[tracing::instrument(level = "trace", skip(self, partitions))] to match create_partition above — keeps tracing parity between the two paths when toggling the flag during incident triage. Same suggestion applies to get_active_partitions_for_indexes below.


#[tracing::instrument(level = "trace", skip(self))]
async fn get_partition(&self, partition_id: u64) -> Result<IdRow<Partition>, CubeError> {
self.read_operation("get_partition", move |db_ref| {
Expand Down Expand Up @@ -2829,21 +2855,25 @@ impl MetaStore for RocksMetaStore {
// Single-flight: serialize the scan so a burst of concurrent callers
// (e.g. many partition writes during an import/repartition) share one
// computation instead of each materializing a full metastore scan.
let _compute_guard =
match acquire_lock("disk space compute", self.disk_space_compute_lock.lock()).await
{
Ok(guard) => guard,
Err(e) => {
log::error!(
let _compute_guard = match acquire_lock_duration(
"disk space compute",
self.disk_space_compute_lock.lock(),
Duration::from_millis(self.store.config.disk_space_compute_lock_timeout_ms()),
)
.await
{
Ok(guard) => guard,
Err(e) => {
log::error!(
"Timed out waiting for the disk space scan lock: {}. The single-flight \
scan is stuck; reporting 0 used disk space so the disk-space check \
passes. THE DISK-SPACE LIMIT IS NOT BEING ENFORCED until the scan \
recovers.",
e
);
return Ok(0);
}
};
return Ok(0);
}
};
if let Some(sizes) = self.disk_space_cached().await? {
sizes
} else {
Expand Down Expand Up @@ -3598,6 +3628,29 @@ impl MetaStore for RocksMetaStore {
.await
}

async fn get_active_partitions_for_indexes(
&self,
index_ids: Vec<u64>,
) -> Result<Vec<Vec<IdRow<Partition>>>, CubeError> {
self.read_operation_out_of_queue("get_active_partitions_for_indexes", move |db_ref| {
let rocks_partition = PartitionRocksTable::new(db_ref);
let mut result = Vec::with_capacity(index_ids.len());
for index_id in index_ids {
let partitions = rocks_partition
.get_rows_by_index(
&PartitionIndexKey::ByIndexId(index_id),
&PartitionRocksIndex::IndexId,
)?
.into_iter()
.filter(|r| r.get_row().active)
.collect::<Vec<_>>();
result.push(partitions);
}
Ok(result)
})
.await
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_index(&self, index_id: u64) -> Result<IdRow<Index>, CubeError> {
self.read_operation("get_index", move |db_ref| {
Expand Down Expand Up @@ -5801,6 +5854,151 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn get_active_partitions_for_indexes_test() -> Result<(), CubeError> {
init_test_logger().await;

let (_remote_fs, meta_store) =
RocksMetaStore::prepare_test_metastore("get_active_partitions_for_indexes");

meta_store.create_schema("foo".to_string(), false).await?;
let columns = vec![
Column::new("col1".to_string(), ColumnType::Int, 0),
Column::new("col2".to_string(), ColumnType::String, 1),
];
// Two tables → two default indexes, each with its own initial active partition.
let table1 = meta_store
.create_table(
"foo".to_string(),
"t1".to_string(),
columns.clone(),
None,
None,
vec![],
true,
None,
None,
None,
None,
None,
None,
None,
None,
None,
false,
None,
)
.await?;
let table2 = meta_store
.create_table(
"foo".to_string(),
"t2".to_string(),
columns.clone(),
None,
None,
vec![],
true,
None,
None,
None,
None,
None,
None,
None,
None,
None,
false,
None,
)
.await?;

let index1 = meta_store.get_default_index(table1.get_id()).await?;
let index2 = meta_store.get_default_index(table2.get_id()).await?;

let single1 = meta_store
.get_active_partitions_by_index_id(index1.get_id())
.await?;
let single2 = meta_store
.get_active_partitions_by_index_id(index2.get_id())
.await?;

// Batch result is positionally aligned with the requested ids; it must match the
// per-index calls and return an empty vec (not an error) for the unknown index.
let unknown_index_id = index2.get_id() + 1000;
let batch = meta_store
.get_active_partitions_for_indexes(vec![
index1.get_id(),
index2.get_id(),
unknown_index_id,
])
.await?;

let ids = |ps: &Vec<IdRow<Partition>>| ps.iter().map(|p| p.get_id()).collect::<Vec<_>>();
assert_eq!(batch.len(), 3);
assert_eq!(ids(&batch[0]), ids(&single1));
assert_eq!(ids(&batch[1]), ids(&single2));
assert!(batch[2].is_empty());

Ok(())
}

#[tokio::test]
async fn create_partitions_test() -> Result<(), CubeError> {
    init_test_logger().await;

    let (_remote_fs, meta_store) = RocksMetaStore::prepare_test_metastore("create_partitions");

    meta_store.create_schema("foo".to_string(), false).await?;
    let table = meta_store
        .create_table(
            "foo".to_string(),
            "t1".to_string(),
            vec![Column::new("col1".to_string(), ColumnType::Int, 0)],
            None,
            None,
            vec![],
            true,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            false,
            None,
        )
        .await?;

    // The first active partition of the table's default index serves as the parent.
    let default_index = meta_store.get_default_index(table.get_id()).await?;
    let active = meta_store
        .get_active_partitions_by_index_id(default_index.get_id())
        .await?;
    let parent = active[0].clone();

    // Two children of the same parent, created through one batch call.
    let children = (0..2)
        .map(|_| Partition::new_child(&parent, None))
        .collect::<Vec<_>>();
    let created = meta_store.create_partitions(children).await?;

    assert_eq!(created.len(), 2);
    assert_ne!(created[0].get_id(), created[1].get_id());

    // Each returned row must be readable back and reference the shared parent.
    let expected_parent = Some(parent.get_id());
    for child in created.iter() {
        let stored = meta_store.get_partition(child.get_id()).await?;
        assert_eq!(stored.get_row().parent_partition_id(), &expected_parent);
    }

    Ok(())
}

#[tokio::test]
async fn table_test() -> Result<(), CubeError> {
init_test_logger().await;
Expand Down
14 changes: 14 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ impl MetaStore for MetaStoreMock {
panic!("MetaStore mock!")
}

/// Mock stub: not implemented, always panics with "MetaStore mock!".
async fn create_partitions(
&self,
_partitions: Vec<Partition>,
) -> Result<Vec<IdRow<Partition>>, CubeError> {
panic!("MetaStore mock!")
}

/// Mock stub: not implemented, always panics with "MetaStore mock!".
async fn get_partition(&self, _partition_id: u64) -> Result<IdRow<Partition>, CubeError> {
panic!("MetaStore mock!")
}
Expand Down Expand Up @@ -330,6 +337,13 @@ impl MetaStore for MetaStoreMock {
panic!("MetaStore mock!")
}

/// Mock stub: not implemented, always panics with "MetaStore mock!".
async fn get_active_partitions_for_indexes(
&self,
_index_ids: Vec<u64>,
) -> Result<Vec<Vec<IdRow<Partition>>>, CubeError> {
panic!("MetaStore mock!")
}

/// Mock stub: not implemented, always panics with "MetaStore mock!".
async fn get_index(&self, _index_id: u64) -> Result<IdRow<Index>, CubeError> {
panic!("MetaStore mock!")
}
Expand Down
Loading
Loading