diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index 6b934808ab269..bb02b94845aac 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -18,6 +18,9 @@ use databend_common_ast::ast; use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::types::UInt64Type; use databend_common_meta_app::schema::TableIndexType; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_storages_fuse::FuseTable; @@ -86,18 +89,18 @@ impl Interpreter for RefreshTableIndexInterpreter { let index_version = index.version.clone(); let index_schema = table_schema.project(&field_indices); - let mut build_res = PipelineBuildResult::create(); let fuse_table = FuseTable::try_from_table(table.as_ref())?; let index_type = match self.plan.index_type { ast::TableIndexType::Inverted => TableIndexType::Inverted, ast::TableIndexType::Ngram => TableIndexType::Ngram, ast::TableIndexType::Vector => TableIndexType::Vector, - ast::TableIndexType::Spatial => todo!(), + ast::TableIndexType::Spatial => TableIndexType::Spatial, ast::TableIndexType::Aggregating => unreachable!(), }; - match self.plan.index_type { + let mut build_res = PipelineBuildResult::create(); + let refreshed_blocks = match self.plan.index_type { ast::TableIndexType::Inverted => { // TODO: Refactor refresh inverted index fuse_table @@ -110,22 +113,34 @@ impl Interpreter for RefreshTableIndexInterpreter { segment_locs, &mut build_res.main_pipeline, ) - .await?; + .await? 
} _ => { - assert!(segment_locs.is_none()); do_refresh_table_index( fuse_table, self.ctx.clone(), index_name, index_type, index_schema.into(), + segment_locs, &mut build_res.main_pipeline, ) - .await?; + .await? } + }; + + let result_block = + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![refreshed_blocks])]); + + if build_res.main_pipeline.is_empty() { + return PipelineBuildResult::from_blocks(vec![result_block]); } - Ok(build_res) + let mut result_res = PipelineBuildResult::from_blocks(vec![result_block])?; + result_res + .sources_pipelines + .extend(build_res.sources_pipelines); + result_res.sources_pipelines.push(build_res.main_pipeline); + Ok(result_res) } } diff --git a/src/query/service/tests/it/indexes/inverted_index/index_refresh.rs b/src/query/service/tests/it/indexes/inverted_index/index_refresh.rs index f48eff3e83e83..8443914f5c8da 100644 --- a/src/query/service/tests/it/indexes/inverted_index/index_refresh.rs +++ b/src/query/service/tests/it/indexes/inverted_index/index_refresh.rs @@ -38,6 +38,7 @@ use databend_query::test_kits::append_string_sample_data; use databend_query::test_kits::*; use databend_storages_common_cache::LoadParams; use databend_storages_common_io::ReadSettings; +use futures_util::TryStreamExt; use tantivy::schema::IndexRecordOption; #[tokio::test(flavor = "multi_thread")] @@ -93,7 +94,11 @@ async fn test_fuse_do_refresh_inverted_index() -> anyhow::Result<()> { segment_locs: None, }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? 
+ .try_collect::>() + .await?; let new_table = table.refresh(ctx.as_ref()).await?; let new_fuse_table = FuseTable::create_without_refresh_table_info( diff --git a/src/query/service/tests/it/indexes/inverted_index/pruning.rs b/src/query/service/tests/it/indexes/inverted_index/pruning.rs index bcadedaadc549..a7c0dae0fa5fc 100644 --- a/src/query/service/tests/it/indexes/inverted_index/pruning.rs +++ b/src/query/service/tests/it/indexes/inverted_index/pruning.rs @@ -54,6 +54,7 @@ use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; +use futures_util::TryStreamExt; use opendal::Operator; async fn apply_block_pruning( @@ -128,7 +129,11 @@ async fn test_block_pruner() -> anyhow::Result<()> { }; let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? + .try_collect::>() + .await?; // get table let catalog = ctx.get_catalog("default").await?; @@ -539,7 +544,11 @@ async fn test_block_pruner() -> anyhow::Result<()> { segment_locs: None, }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? 
+ .try_collect::>() + .await?; let new_table = table.refresh(ctx.as_ref()).await?; let fuse_table = FuseTable::create_without_refresh_table_info( diff --git a/src/query/service/tests/it/indexes/spatial_index/index_refresh.rs b/src/query/service/tests/it/indexes/spatial_index/index_refresh.rs new file mode 100644 index 0000000000000..45d84b541b80e --- /dev/null +++ b/src/query/service/tests/it/indexes/spatial_index/index_refresh.rs @@ -0,0 +1,265 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeMap; +use std::collections::HashSet; +use std::sync::Arc; + +use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::TableDataType; +use databend_common_expression::types::GeometryType; +use databend_common_expression::types::number::UInt64Type; +use databend_common_io::geometry::geometry_from_str; +use databend_common_meta_app::schema::CreateOption; +use databend_common_meta_app::schema::CreateTableIndexReq; +use databend_common_meta_app::schema::TableIndexType; +use databend_common_sql::plans::RefreshTableIndexPlan; +use databend_common_storage::read_parquet_schema_async_rs; +use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::TableContext; +use databend_common_storages_fuse::io::MetaReaders; +use databend_query::interpreters::Interpreter; +use databend_query::interpreters::RefreshTableIndexInterpreter; +use databend_query::sessions::QueryContext; +use databend_query::test_kits::*; +use databend_storages_common_cache::LoadParams; +use futures_util::TryStreamExt; + +fn build_block(rows: &[(u64, &str, &str)]) -> Result { + let ids = rows.iter().map(|(id, _, _)| *id).collect::>(); + let geom1 = rows + .iter() + .map(|(_, g1, _)| geometry_from_str(&format!("SRID=4326;{g1}"), None)) + .collect::>>()?; + let geom2 = rows + .iter() + .map(|(_, _, g2)| geometry_from_str(&format!("SRID=4326;{g2}"), None)) + .collect::>>()?; + + Ok(DataBlock::new_from_columns(vec![ + UInt64Type::from_data(ids).wrap_nullable(None), + GeometryType::from_data(geom1).wrap_nullable(None), + GeometryType::from_data(geom2).wrap_nullable(None), + ])) +} + +fn find_column_id(table: &Arc, column_name: &str) -> u32 { + table + .get_table_info() + .meta + .schema + .fields() + .iter() + .find(|field| field.name() == column_name) + .unwrap() + 
.column_id() +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_do_refresh_spatial_index() -> anyhow::Result<()> { + let fixture = TestFixture::setup().await?; + + fixture + .default_session() + .get_settings() + .set_data_retention_time_in_days(0)?; + fixture.create_default_database().await?; + + let ctx = fixture.new_query_ctx().await?; + let catalog = ctx.get_catalog(&fixture.default_catalog_name()).await?; + + let sql = format!( + "CREATE TABLE {}.{}.{} ( + id UInt64, + geom1 Geometry, + geom2 Geometry + ) ENGINE=FUSE", + fixture.default_catalog_name(), + fixture.default_db_name(), + fixture.default_table_name() + ); + fixture.execute_command(&sql).await?; + + let table = fixture.latest_default_table().await?; + let blocks = vec![ + build_block(&[ + (1, "POINT(0 0)", "POINT(100 100)"), + (2, "POINT(1 1)", "POINT(101 101)"), + (3, "POINT(2 2)", "POINT(102 102)"), + ])?, + build_block(&[ + (4, "POINT(10 10)", "POINT(200 200)"), + (5, "POINT(11 11)", "POINT(201 201)"), + (6, "POINT(12 12)", "POINT(202 202)"), + ])?, + ]; + fixture + .append_commit_blocks(table.clone(), blocks, false, true) + .await?; + + let table = table.refresh(ctx.as_ref()).await?; + let table_id = table.get_id(); + let tenant = ctx.get_tenant(); + let geom1_column_id = find_column_id(&table, "geom1"); + let geom2_column_id = find_column_id(&table, "geom2"); + + let index_name1 = "idx_geom1".to_string(); + let req1 = CreateTableIndexReq { + create_option: CreateOption::Create, + table_id, + tenant: tenant.clone(), + name: index_name1.clone(), + column_ids: vec![geom1_column_id], + sync_creation: false, + options: BTreeMap::new(), + index_type: TableIndexType::Spatial, + }; + catalog.create_table_index(req1).await?; + + let index_name2 = "idx_geom2".to_string(); + let req2 = CreateTableIndexReq { + create_option: CreateOption::Create, + table_id, + tenant, + name: index_name2.clone(), + column_ids: vec![geom2_column_id], + sync_creation: false, + options: BTreeMap::new(), + 
index_type: TableIndexType::Spatial, + }; + catalog.create_table_index(req2).await?; + + let refresh_index_plan1 = RefreshTableIndexPlan { + index_type: databend_common_ast::ast::TableIndexType::Spatial, + catalog: fixture.default_catalog_name(), + database: fixture.default_db_name(), + table: fixture.default_table_name(), + index_name: index_name1.clone(), + segment_locs: None, + }; + let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan1)?; + let _ = interpreter + .execute(ctx.clone()) + .await? + .try_collect::>() + .await?; + + check_index_data(ctx.clone(), table.clone(), vec![index_name1.clone()]).await?; + + let ctx = fixture.new_query_ctx().await?; + let refresh_index_plan2 = RefreshTableIndexPlan { + index_type: databend_common_ast::ast::TableIndexType::Spatial, + catalog: fixture.default_catalog_name(), + database: fixture.default_db_name(), + table: fixture.default_table_name(), + index_name: index_name2.clone(), + segment_locs: None, + }; + let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan2)?; + let _ = interpreter + .execute(ctx.clone()) + .await? 
+ .try_collect::>() + .await?; + + check_index_data(ctx.clone(), table.clone(), vec![index_name1, index_name2]).await?; + + Ok(()) +} + +async fn check_index_data( + ctx: Arc, + table: Arc, + index_names: Vec, +) -> Result<()> { + let new_table = table.refresh(ctx.as_ref()).await?; + let table_info = new_table.get_table_info().clone(); + let table_indexes = table_info.meta.indexes.clone(); + let geometry_column_ids = table_info + .meta + .schema + .fields() + .iter() + .filter(|field| matches!(field.data_type().remove_nullable(), TableDataType::Geometry)) + .map(|field| field.column_id()) + .collect::>(); + + let new_fuse_table = FuseTable::create_without_refresh_table_info( + table_info, + ctx.get_settings().get_s3_storage_class()?, + )?; + + let snapshot = new_fuse_table.read_table_snapshot().await?.unwrap(); + let dal = new_fuse_table.get_operator_ref(); + let segment_reader = + MetaReaders::segment_info_reader(new_fuse_table.get_operator(), new_fuse_table.schema()); + + let mut block_metas = Vec::new(); + for (segment_loc, ver) in &snapshot.segments { + let segment_info = segment_reader + .read(&LoadParams { + location: segment_loc.to_string(), + len_hint: None, + ver: *ver, + put_cache: false, + }) + .await?; + for block_meta in segment_info.block_metas()? 
{ + block_metas.push(block_meta); + } + } + + for block_meta in block_metas { + assert!(block_meta.spatial_index_location.is_some()); + assert!(block_meta.spatial_index_size.is_some()); + + let path = block_meta.spatial_index_location.clone().unwrap(); + let file_size = block_meta.spatial_index_size; + let index_schema = read_parquet_schema_async_rs(dal, &path.0, file_size).await?; + let spatial_stats = block_meta.spatial_stats.clone().unwrap(); + + let mut expected_fields = HashSet::new(); + for index_name in &index_names { + let table_index = table_indexes.get(index_name).unwrap(); + let index_version = index_schema.metadata.get(index_name).unwrap(); + assert_eq!(index_version, &table_index.version); + + for column_id in &table_index.column_ids { + expected_fields.insert(column_id.to_string()); + + let spatial_stat = spatial_stats.get(column_id).unwrap(); + assert!(spatial_stat.is_valid); + assert_eq!(spatial_stat.srid, 4326); + } + } + + for field in index_schema.fields() { + assert!(expected_fields.remove(field.name())); + } + assert!(expected_fields.is_empty()); + + for column_id in &geometry_column_ids { + let spatial_stat = spatial_stats.get(column_id).unwrap(); + assert!(spatial_stat.is_valid); + assert_eq!(spatial_stat.srid, 4326); + } + assert_eq!(spatial_stats.len(), geometry_column_ids.len()); + } + + Ok(()) +} diff --git a/src/query/service/tests/it/indexes/spatial_index/mod.rs b/src/query/service/tests/it/indexes/spatial_index/mod.rs index 7791be1573836..f882d23237f1e 100644 --- a/src/query/service/tests/it/indexes/spatial_index/mod.rs +++ b/src/query/service/tests/it/indexes/spatial_index/mod.rs @@ -13,5 +13,6 @@ // limitations under the License. 
mod builder; +mod index_refresh; mod pruning; mod runtime_filter; diff --git a/src/query/service/tests/it/indexes/vector_index/index_refresh.rs b/src/query/service/tests/it/indexes/vector_index/index_refresh.rs index ae3d83512948c..48e99f04f251a 100644 --- a/src/query/service/tests/it/indexes/vector_index/index_refresh.rs +++ b/src/query/service/tests/it/indexes/vector_index/index_refresh.rs @@ -32,6 +32,7 @@ use databend_query::interpreters::RefreshTableIndexInterpreter; use databend_query::sessions::QueryContext; use databend_query::test_kits::*; use databend_storages_common_cache::LoadParams; +use futures_util::TryStreamExt; #[tokio::test(flavor = "multi_thread")] async fn test_fuse_do_refresh_vector_index() -> anyhow::Result<()> { @@ -136,7 +137,11 @@ async fn test_fuse_do_refresh_vector_index() -> anyhow::Result<()> { }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan1)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? + .try_collect::>() + .await?; let index_names = vec![index_name1.clone()]; check_index_data(ctx.clone(), table.clone(), index_names).await?; @@ -152,7 +157,11 @@ async fn test_fuse_do_refresh_vector_index() -> anyhow::Result<()> { }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan2)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? + .try_collect::>() + .await?; let index_names = vec![index_name1.clone(), index_name2.clone()]; check_index_data(ctx.clone(), table.clone(), index_names).await?; @@ -189,7 +198,11 @@ async fn test_fuse_do_refresh_vector_index() -> anyhow::Result<()> { }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan3)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? 
+ .try_collect::>() + .await?; let index_names = vec![index_name1, index_name2]; check_index_data(ctx.clone(), table.clone(), index_names).await?; diff --git a/src/query/service/tests/it/indexes/vector_index/pruning.rs b/src/query/service/tests/it/indexes/vector_index/pruning.rs index 55583879096be..b0d39e5766ba5 100644 --- a/src/query/service/tests/it/indexes/vector_index/pruning.rs +++ b/src/query/service/tests/it/indexes/vector_index/pruning.rs @@ -63,6 +63,7 @@ use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; +use futures_util::TryStreamExt; use opendal::Operator; async fn apply_block_pruning( @@ -157,7 +158,11 @@ async fn test_block_pruner() -> anyhow::Result<()> { }; let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?; - let _ = interpreter.execute(ctx.clone()).await?; + let _ = interpreter + .execute(ctx.clone()) + .await? 
+ .try_collect::>() + .await?; // get table let catalog = ctx.get_catalog("default").await?; diff --git a/src/query/sql/src/planner/binder/ddl/index.rs b/src/query/sql/src/planner/binder/ddl/index.rs index efd8ceea33a51..9e9bfef8083fa 100644 --- a/src/query/sql/src/planner/binder/ddl/index.rs +++ b/src/query/sql/src/planner/binder/ddl/index.rs @@ -946,7 +946,10 @@ impl Binder { if !matches!( index_type, - AstTableIndexType::Inverted | AstTableIndexType::Ngram | AstTableIndexType::Vector + AstTableIndexType::Inverted + | AstTableIndexType::Ngram + | AstTableIndexType::Vector + | AstTableIndexType::Spatial ) { return Err(ErrorCode::UnsupportedIndex(format!( "Table index {} does not support refresh", diff --git a/src/query/sql/src/planner/plans/ddl/index.rs b/src/query/sql/src/planner/plans/ddl/index.rs index 3cd003ebb196f..3b32e65898ea7 100644 --- a/src/query/sql/src/planner/plans/ddl/index.rs +++ b/src/query/sql/src/planner/plans/ddl/index.rs @@ -13,9 +13,15 @@ // limitations under the License. 
use std::collections::BTreeMap; +use std::sync::Arc; use databend_common_ast::ast::TableIndexType; use databend_common_expression::ColumnId; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::TableInfo; @@ -85,3 +91,12 @@ pub struct RefreshTableIndexPlan { pub index_name: String, pub segment_locs: Option>, } + +impl RefreshTableIndexPlan { + pub fn schema(&self) -> DataSchemaRef { + Arc::new(DataSchema::new(vec![DataField::new( + "refreshed_blocks", + DataType::Number(NumberDataType::UInt64), + )])) + } +} diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 85e8d0bcefb23..4ed393d4ef152 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -597,6 +597,7 @@ impl Plan { Plan::DescribeTask(plan) => plan.schema(), Plan::RefreshVirtualColumn(plan) => plan.schema(), Plan::VacuumVirtualColumn(plan) => plan.schema(), + Plan::RefreshTableIndex(plan) => plan.schema(), Plan::ShowTasks(plan) => plan.schema(), Plan::ExecuteTask(plan) => plan.schema(), Plan::DescRowAccessPolicy(plan) => plan.schema(), diff --git a/src/query/storages/common/table_meta/src/meta/v2/statistics.rs b/src/query/storages/common/table_meta/src/meta/v2/statistics.rs index 8bfe05d7ab638..5ae0a4454bdc8 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/statistics.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/statistics.rs @@ -73,7 +73,7 @@ pub struct ClusterStatistics { pub pages: Option>, } -/// Spatial statistics for geometry/geography columns. +/// Spatial statistics for geometry columns. 
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Default, FrozenAPI)] pub struct SpatialStatistics { pub min_x: OrderedFloat, diff --git a/src/query/storages/fuse/src/io/write/spatial_index_writer.rs b/src/query/storages/fuse/src/io/write/spatial_index_writer.rs index fb298449a8737..37e38ae649884 100644 --- a/src/query/storages/fuse/src/io/write/spatial_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/spatial_index_writer.rs @@ -36,7 +36,9 @@ use databend_common_meta_app::schema::TableIndex; use databend_common_meta_app::schema::TableIndexType; use databend_common_metrics::storage::metrics_inc_block_spatial_index_generate_milliseconds; use databend_storages_common_blocks::blocks_to_parquet; +use databend_storages_common_io::ReadSettings; use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::SingleColumnMeta; use databend_storages_common_table_meta::meta::StatisticsOfSpatialColumns; use databend_storages_common_table_meta::table::TableCompression; use geo::algorithm::bounding_rect::BoundingRect; @@ -44,8 +46,10 @@ use geo_index::rtree::RTreeBuilder; use geo_index::rtree::sort::HilbertSort; use log::debug; use log::info; +use opendal::Operator; use parquet::file::metadata::KeyValue; +use crate::io::read::load_spatial_index_files; use crate::statistics::SpatialStatsBuilder; #[derive(Debug, Clone)] @@ -210,33 +214,9 @@ impl SpatialIndexBuilder { ); if let Some(result) = self.build_spatial_index()? 
{ - let SpatialIndexResult { - index_fields, - index_columns, - metadata, - } = result; - - let index_schema = TableSchemaRefExt::create(index_fields); - let index_block = DataBlock::new(index_columns, 1); - - let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); - let _ = blocks_to_parquet( - index_schema.as_ref(), - vec![index_block], - &mut data, - // Zstd has the best compression ratio - TableCompression::Zstd, - // No dictionary page for spatial index - false, - Some(metadata), - )?; - - let size = data.len() as u64; - index_state = Some(SpatialIndexState { - location: location.clone(), - size, - data, - }); + let state = Self::serialize_spatial_index(result, location)?; + let size = state.size; + index_state = Some(state); // Perf. let elapsed_ms = start.elapsed().as_millis() as u64; @@ -257,6 +237,91 @@ impl SpatialIndexBuilder { }) } + #[async_backtrace::framed] + pub async fn finalize_with_existing( + &mut self, + operator: Operator, + settings: &ReadSettings, + location: &Location, + existing_location: Option<&Location>, + existing_column_metas: Option>, + existing_index_meta: Option>, + ) -> Result { + if existing_location.is_none() + || (existing_column_metas.is_none() && existing_index_meta.is_none()) + { + return self.finalize(location); + } + + let start = Instant::now(); + info!( + "Start build merged spatial R-Tree index for location: {}", + location.0 + ); + + let existing_location = existing_location.unwrap(); + let existing_column_metas = existing_column_metas.unwrap_or_default(); + let existing_column_names = existing_column_metas + .iter() + .map(|(name, _)| name.clone()) + .collect::>(); + let existing_columns = if existing_column_names.is_empty() { + Vec::new() + } else { + load_spatial_index_files( + operator, + settings, + &existing_column_names, + &existing_location.0, + ) + .await? 
+ }; + + let mut result = self.build_spatial_index()?.unwrap_or(SpatialIndexResult { + index_fields: Vec::new(), + index_columns: Vec::new(), + metadata: Vec::new(), + }); + + for (name, _) in existing_column_metas.into_iter() { + result + .index_fields + .push(TableField::new(&name, TableDataType::Binary)); + } + for existing_column in existing_columns.into_iter() { + result + .index_columns + .push(BlockEntry::Column(existing_column)); + } + if let Some(existing_index_meta) = existing_index_meta { + for (key, value) in existing_index_meta { + result.metadata.push(KeyValue { + key, + value: Some(value), + }); + } + } + + let index_state = if result.index_fields.is_empty() { + None + } else { + Some(Self::serialize_spatial_index(result, location)?) + }; + let spatial_stats = self.finalize_spatial_stats(); + + let elapsed_ms = start.elapsed().as_millis() as u64; + metrics_inc_block_spatial_index_generate_milliseconds(elapsed_ms); + info!( + "Finish build merged spatial index: location={}, cost={} ms", + location.0, elapsed_ms + ); + + Ok(SpatialIndexBuildResult { + index_state, + spatial_stats, + }) + } + fn build_spatial_index(&mut self) -> Result> { let mut columns = HashMap::new(); for offset in &self.field_offsets_set { @@ -354,6 +419,37 @@ impl SpatialIndexBuilder { } (!statistics.is_empty()).then_some(statistics) } + + fn serialize_spatial_index( + result: SpatialIndexResult, + location: &Location, + ) -> Result { + let SpatialIndexResult { + index_fields, + index_columns, + metadata, + } = result; + + let index_schema = TableSchemaRefExt::create(index_fields); + let index_block = DataBlock::new(index_columns, 1); + + let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); + let _ = blocks_to_parquet( + index_schema.as_ref(), + vec![index_block], + &mut data, + TableCompression::Zstd, + false, + Some(metadata), + )?; + + let size = data.len() as u64; + Ok(SpatialIndexState { + location: location.clone(), + size, + data, + }) + } } struct 
SpatialIndexResult { diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index f9eefd6e23d71..c93ee0cbe5760 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -81,9 +81,9 @@ impl FuseTable { index_schema: TableSchemaRef, segment_locs: Option>, pipeline: &mut Pipeline, - ) -> Result<()> { + ) -> Result { let Some(snapshot) = self.read_table_snapshot().await? else { - return Ok(()); + return Ok(0); }; let table_schema = self.schema(); @@ -113,7 +113,7 @@ impl FuseTable { }; if segment_locs.is_empty() { - return Ok(()); + return Ok(0); } let operator = self.get_operator_ref(); @@ -143,7 +143,7 @@ impl FuseTable { } } if block_metas.is_empty() { - return Ok(()); + return Ok(0); } let data_schema = Arc::new(DataSchema::from(index_schema.as_ref())); @@ -182,7 +182,7 @@ impl FuseTable { pipeline.try_resize(1)?; pipeline.add_sink(|input| InvertedIndexSink::try_create(input, block_nums))?; - Ok(()) + Ok(block_nums as u64) } } diff --git a/src/query/storages/fuse/src/operations/table_index.rs b/src/query/storages/fuse/src/operations/table_index.rs index cb4838939ea12..937a8dd772013 100644 --- a/src/query/storages/fuse/src/operations/table_index.rs +++ b/src/query/storages/fuse/src/operations/table_index.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::collections::BTreeMap;
+use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -25,6 +26,7 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
@@ -48,6 +50,7 @@ use databend_storages_common_io::ReadSettings;
 use databend_storages_common_table_meta::meta::BlockHLLState;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ExtendedBlockMeta;
+use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::RawBlockHLL;
 use databend_storages_common_table_meta::meta::SegmentStatistics;
 use databend_storages_common_table_meta::meta::SingleColumnMeta;
@@ -67,10 +70,12 @@ use crate::io::BlockReader;
 use crate::io::BlockWriter;
 use crate::io::BloomIndexState;
 use crate::io::MetaReaders;
+use crate::io::SpatialIndexBuilder;
 use crate::io::TableMetaLocationGenerator;
 use crate::io::VectorIndexBuilder;
 use crate::io::read::bloom::block_filter_reader::load_bloom_filter_by_columns;
 use crate::io::read::bloom::block_filter_reader::load_index_meta;
+use crate::io::read::load_spatial_index_meta;
 use crate::io::read::load_vector_index_meta;
 use crate::io::read::read_segment_stats;
 use crate::operations::BlockMetaIndex;
@@ -86,9 +91,13 @@ pub async fn do_refresh_table_index(
     index_name: String,
     index_type: TableIndexType,
     index_schema: TableSchemaRef,
+    segment_locs: Option<Vec<Location>>,
     pipeline: &mut Pipeline,
-) -> Result<()> {
-    if !matches!(index_type, TableIndexType::Ngram | TableIndexType::Vector) {
+) -> Result<u64> {
+    if !matches!(
+        index_type,
+        TableIndexType::Ngram | TableIndexType::Vector | TableIndexType::Spatial
+    ) {
         return Err(ErrorCode::RefreshIndexError(format!(
             "Refresh index type {} not support",
             index_type
@@ -103,7 +112,7 @@ pub async fn do_refresh_table_index(

     let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
         // no snapshot
-        return Ok(());
+        return Ok(0);
     };

     info!("Start refresh {} index {}", index_type, index_name);
@@ -127,16 +136,23 @@ pub async fn do_refresh_table_index(
     let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema);

     if snapshot.segments.is_empty() {
-        return Ok(());
+        return Ok(0);
     }

     let operator = fuse_table.get_operator_ref();
     let table_meta = &fuse_table.get_table_info().meta;
     let index_arg = build_refresh_index_arg(&index_name, &index_type, table_meta, &index_schema)?;
+    let target_segments = segment_locs.map(|locs| locs.into_iter().collect::<HashSet<_>>());

     // Read the segment infos and collect the block metas that need to generate the index.
     let mut index_metas = VecDeque::new();
     for (segment_idx, (segment_loc, ver)) in snapshot.segments.iter().enumerate() {
+        if target_segments
+            .as_ref()
+            .is_some_and(|segments| !segments.contains(&(segment_loc.clone(), *ver)))
+        {
+            continue;
+        }
         let segment_info = segment_reader
             .read(&LoadParams {
                 location: segment_loc.to_string(),
@@ -172,7 +188,7 @@ pub async fn do_refresh_table_index(
             "Finish refresh {} index {}, all indexes has generated",
             index_type, index_name
         );
-        return Ok(());
+        return Ok(0);
     }

     let settings = ReadSettings::from_ctx(&ctx)?;
@@ -224,6 +240,20 @@
                 )
             });
         }
+        RefreshIndexArg::Spatial(spatial_index_arg) => {
+            let mut table_indexes = BTreeMap::new();
+            table_indexes.insert(index_name.clone(), table_index.clone());
+            pipeline.add_async_transformer(|| {
+                SpatialIndexTransform::new(
+                    operator.clone(),
+                    settings,
+                    table_indexes.clone(),
+                    index_schema.clone(),
+                    meta_locations.clone(),
+                    spatial_index_arg.existing_names_prefix.clone(),
+                )
+            });
+        }
     }

     pipeline.try_resize(1)?;
@@ -261,7 +291,7 @@ pub async fn 
do_refresh_table_index(

     info!("Finish refresh {} index {}", index_type, index_name);

-    Ok(())
+    Ok(block_nums as u64)
 }

 // build the index arguments used for refresh
@@ -315,7 +345,24 @@ fn build_refresh_index_arg(
             };
             Ok(RefreshIndexArg::Vector(vector_arg))
         }
-        _ => todo!(),
+        TableIndexType::Spatial => {
+            let index = table_meta.indexes.get(index_name).unwrap();
+
+            let existing_names_prefix = index
+                .column_ids
+                .iter()
+                .map(|id| format!("{id}"))
+                .collect::<Vec<_>>();
+
+            let spatial_arg = RefreshSpatialIndexArg {
+                index_name: index_name.clone(),
+                index_version: index.version.clone(),
+                existing_column_ids: index.column_ids.clone(),
+                existing_names_prefix,
+            };
+            Ok(RefreshIndexArg::Spatial(spatial_arg))
+        }
+        _ => unreachable!(),
     }
 }

@@ -351,6 +398,17 @@ async fn check_index_generated(
             )
             .await
         }
+        RefreshIndexArg::Spatial(spatial_index_arg) => {
+            check_spatial_index_generated(
+                operator.clone(),
+                segment_idx,
+                block_idx,
+                block_meta,
+                stats,
+                spatial_index_arg,
+            )
+            .await
+        }
     }
 }

@@ -480,6 +538,97 @@ async fn check_vector_index_generated(
     Ok(Some(vector_index_meta))
 }

+async fn check_spatial_index_generated(
+    operator: Operator,
+    segment_idx: usize,
+    block_idx: usize,
+    block_meta: Arc<BlockMeta>,
+    stats: Option<Arc<SegmentStatistics>>,
+    spatial_index_arg: &RefreshSpatialIndexArg,
+) -> Result<Option<RefreshIndexMeta>> {
+    let mut index_columns = None;
+    let mut index_meta = None;
+    let mut needs_refresh = match block_meta.spatial_stats.as_ref() {
+        Some(stats) => spatial_index_arg
+            .existing_column_ids
+            .iter()
+            .any(|column_id| !stats.keys().any(|id| id == column_id)),
+        None => true,
+    };
+
+    if let Some(spatial_index_location) = &block_meta.spatial_index_location {
+        let index_location = &spatial_index_location.0;
+        if let Ok(_content_length) = operator
+            .stat(index_location)
+            .await
+            .map(|meta| meta.content_length())
+        {
+            let spatial_index_meta =
+                load_spatial_index_meta(operator.clone(), index_location).await?;
+
+            let current_index_generated = spatial_index_meta
+                .metadata
.get(&spatial_index_arg.index_name)
+                .is_some_and(|version| version == &spatial_index_arg.index_version)
+                && spatial_index_arg
+                    .existing_names_prefix
+                    .iter()
+                    .all(|column_id| {
+                        spatial_index_meta
+                            .columns
+                            .iter()
+                            .any(|(name, _)| name == column_id)
+                    });
+
+            if current_index_generated && !needs_refresh {
+                return Ok(None);
+            }
+            needs_refresh = true;
+
+            let mut metadata = spatial_index_meta.metadata.clone();
+            metadata.remove(&spatial_index_arg.index_name);
+            if !metadata.is_empty() {
+                index_meta = Some(metadata);
+            }
+
+            let mut spatial_index_columns = Vec::with_capacity(spatial_index_meta.columns.len());
+            for column in &spatial_index_meta.columns {
+                let name = column.0.to_string();
+                if spatial_index_arg.existing_names_prefix.contains(&name) {
+                    continue;
+                }
+                spatial_index_columns.push(column.clone());
+            }
+            if !spatial_index_columns.is_empty() {
+                index_columns = Some(spatial_index_columns);
+            }
+        } else {
+            needs_refresh = true;
+        }
+    } else {
+        needs_refresh = true;
+    }
+
+    if !needs_refresh {
+        return Ok(None);
+    }
+
+    let spatial_index_meta = RefreshIndexMeta {
+        index: BlockMetaIndex {
+            segment_idx,
+            block_idx,
+        },
+        block_meta,
+        column_hlls: stats
+            .as_ref()
+            .and_then(|v| v.block_hlls.get(block_idx))
+            .cloned(),
+        index_columns,
+        index_meta,
+    };
+    Ok(Some(spatial_index_meta))
+}
+
 pub struct IndexSource {
     settings: ReadSettings,
     storage_format: FuseStorageFormat,
@@ -767,6 +916,111 @@ impl AsyncTransform for VectorIndexTransform {
     }
 }

+pub struct SpatialIndexTransform {
+    operator: Operator,
+    settings: ReadSettings,
+    table_indexes: BTreeMap<String, TableIndex>,
+    index_schema: TableSchemaRef,
+    meta_locations: TableMetaLocationGenerator,
+    existing_names_prefix: Vec<String>,
+}
+
+impl SpatialIndexTransform {
+    pub fn new(
+        operator: Operator,
+        settings: ReadSettings,
+        table_indexes: BTreeMap<String, TableIndex>,
+        index_schema: TableSchemaRef,
+        meta_locations: TableMetaLocationGenerator,
+        existing_names_prefix: Vec<String>,
+    ) -> Self {
+        Self {
+            operator,
+            settings,
table_indexes, + index_schema, + meta_locations, + existing_names_prefix, + } + } +} + +#[async_trait::async_trait] +impl AsyncTransform for SpatialIndexTransform { + const NAME: &'static str = "SpatialIndexTransform"; + + #[async_backtrace::framed] + async fn transform(&mut self, data_block: DataBlock) -> Result { + let RefreshIndexMeta { + index, + block_meta, + column_hlls, + index_columns, + index_meta, + } = data_block + .get_meta() + .and_then(RefreshIndexMeta::downcast_ref_from) + .unwrap(); + + let mut new_block_meta = Arc::unwrap_or_clone(block_meta.clone()); + + let mut builder = + SpatialIndexBuilder::try_create(&self.table_indexes, self.index_schema.clone(), false) + .unwrap(); + builder.add_block(&data_block)?; + + let spatial_index_location = self.meta_locations.block_spatial_index_location(); + let existing_location = &block_meta.spatial_index_location; + let spatial_result = builder + .finalize_with_existing( + self.operator.clone(), + &self.settings, + &spatial_index_location, + existing_location.as_ref(), + index_columns.clone(), + index_meta.clone(), + ) + .await?; + + new_block_meta.spatial_index_size = spatial_result.index_state.as_ref().map(|v| v.size); + new_block_meta.spatial_index_location = spatial_result + .index_state + .as_ref() + .map(|v| v.location.clone()); + + let mut spatial_stats = block_meta.spatial_stats.clone().unwrap_or_default(); + spatial_stats.retain(|column_id, _| { + !self + .existing_names_prefix + .iter() + .any(|prefix| prefix == &column_id.to_string()) + }); + if let Some(new_spatial_stats) = spatial_result.spatial_stats { + spatial_stats.extend(new_spatial_stats); + } + new_block_meta.spatial_stats = (!spatial_stats.is_empty()).then_some(spatial_stats); + + BlockWriter::write_down_spatial_index_state(&self.operator, spatial_result.index_state) + .await?; + + let extended_block_meta = ExtendedBlockMeta { + block_meta: new_block_meta, + draft_virtual_block_meta: None, + column_hlls: 
column_hlls.clone().map(BlockHLLState::Serialized),
+        };
+
+        let entry = MutationLogEntry::ReplacedBlock {
+            index: index.clone(),
+            block_meta: Arc::new(extended_block_meta),
+        };
+        let meta = MutationLogs {
+            entries: vec![entry],
+        };
+        let new_block = DataBlock::empty_with_meta(Box::new(meta));
+        Ok(new_block)
+    }
+}
+
 #[derive(Clone)]
 pub struct RefreshIndexMeta {
     index: BlockMetaIndex,
@@ -790,6 +1044,7 @@ impl BlockMetaInfo for RefreshIndexMeta {}
 enum RefreshIndexArg {
     Ngram(RefreshNgramIndexArg),
     Vector(RefreshVectorIndexArg),
+    Spatial(RefreshSpatialIndexArg),
 }

 struct RefreshNgramIndexArg {
@@ -803,3 +1058,10 @@ struct RefreshVectorIndexArg {
     index_version: String,
     existing_names_prefix: Vec<String>,
 }
+
+struct RefreshSpatialIndexArg {
+    index_name: String,
+    index_version: String,
+    existing_column_ids: Vec<ColumnId>,
+    existing_names_prefix: Vec<String>,
+}
diff --git a/src/query/storages/fuse/src/table_functions/fuse_block.rs b/src/query/storages/fuse/src/table_functions/fuse_block.rs
index 39666d75fee2b..f75d4094fda72 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_block.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_block.rs
@@ -71,6 +71,10 @@ impl TableMetaFunc for FuseBlock {
             "vector_index_size",
             TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
         ),
+        TableField::new(
+            "spatial_index_size",
+            TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
+        ),
         TableField::new(
             "virtual_column_size",
             TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
@@ -98,6 +102,7 @@ impl TableMetaFunc for FuseBlock {
         let mut inverted_index_size = Vec::with_capacity(len);
         let mut ngram_index_size = Vec::with_capacity(len);
         let mut vector_index_size = Vec::with_capacity(len);
+        let mut spatial_index_size = Vec::with_capacity(len);
         let mut virtual_column_size = Vec::with_capacity(len);

         let segments_io = SegmentsIO::create(ctx.clone(), tbl.operator.clone(), tbl.schema());
@@ -128,6 
+133,7 @@ impl TableMetaFunc for FuseBlock { inverted_index_size.push(block.inverted_index_size); ngram_index_size.push(block.ngram_filter_index_size); vector_index_size.push(block.vector_index_size); + spatial_index_size.push(block.spatial_index_size); virtual_column_size.push( block .virtual_block_meta @@ -156,6 +162,7 @@ impl TableMetaFunc for FuseBlock { UInt64Type::from_opt_data(inverted_index_size).into(), UInt64Type::from_opt_data(ngram_index_size).into(), UInt64Type::from_opt_data(vector_index_size).into(), + UInt64Type::from_opt_data(spatial_index_size).into(), UInt64Type::from_opt_data(virtual_column_size).into(), ], num_rows, diff --git a/tests/sqllogictests/suites/query/index/10_spatial_index/10_0000_spatial_index_base.test b/tests/sqllogictests/suites/query/index/10_spatial_index/10_0000_spatial_index_base.test index 9826f15ed40b2..25911609d69bf 100644 --- a/tests/sqllogictests/suites/query/index/10_spatial_index/10_0000_spatial_index_base.test +++ b/tests/sqllogictests/suites/query/index/10_spatial_index/10_0000_spatial_index_base.test @@ -8,12 +8,12 @@ statement ok USE test_spatial_index statement ok -CREATE TABLE IF NOT EXISTS t(id Int, geom Geometry, geog Geography, SPATIAL INDEX idx (geom)) Engine = Fuse +CREATE TABLE IF NOT EXISTS t(id Int, geom Geometry, geom2 Geometry, SPATIAL INDEX idx (geom)) Engine = Fuse row_per_block=2 query TT SHOW CREATE TABLE t ---- -t CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geog GEOGRAPHY NULL, SYNC SPATIAL INDEX idx (geom) ) ENGINE=FUSE +t CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geom2 GEOMETRY NULL, SYNC SPATIAL INDEX idx (geom) ) ENGINE=FUSE statement ok DROP SPATIAL INDEX idx ON t; @@ -22,16 +22,54 @@ statement error CREATE SPATIAL INDEX idx2 ON t(id); statement ok -CREATE SPATIAL INDEX idx2 ON t(geom); +CREATE SPATIAL INDEX idx2 ON t(geom2); query TT SHOW CREATE TABLE t ---- -t CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geog GEOGRAPHY NULL, SYNC SPATIAL INDEX idx2 (geom) ) ENGINE=FUSE +t 
CREATE TABLE t ( id INT NULL, geom GEOMETRY NULL, geom2 GEOMETRY NULL, SYNC SPATIAL INDEX idx2 (geom2) ) ENGINE=FUSE statement error DROP INVERTED INDEX idx2 ON t; +statement ok +DROP SPATIAL INDEX idx2 ON t; + +statement ok +CREATE SPATIAL INDEX idx ON t(geom); + +statement ok +CREATE ASYNC SPATIAL INDEX idx_refresh ON t(geom2); + +statement ok +INSERT INTO t VALUES +(1, TO_GEOMETRY('POINT(10 10)'), TO_GEOMETRY('POINT(100 100)')), +(2, TO_GEOMETRY('POINT(11 11)'), TO_GEOMETRY('POINT(101 101)')), +(3, TO_GEOMETRY('POINT(20 20)'), TO_GEOMETRY('POINT(200 200)')), +(4, TO_GEOMETRY('POINT(21 21)'), TO_GEOMETRY('POINT(201 201)')), +(5, TO_GEOMETRY('POINT(30 30)'), TO_GEOMETRY('POINT(300 300)')), +(6, TO_GEOMETRY('POINT(40 40)'), TO_GEOMETRY('POINT(400 400)')); + +query I +SELECT count() FROM fuse_block('test_spatial_index', 't') WHERE spatial_index_size IS NULL +---- +0 + +query I +REFRESH SPATIAL INDEX idx_refresh ON t +---- +3 + +query I +SELECT count() FROM fuse_block('test_spatial_index', 't') WHERE spatial_index_size IS NULL +---- +0 + +query I +REFRESH SPATIAL INDEX idx_refresh ON t +---- +0 + statement ok CREATE TABLE spatial_stores (store_id INT, category String, store_name String, status String, location Geometry, SPATIAL INDEX idx (location)) row_per_block=2