From 36e076aa5eae2257e7986cf0afe08be29864eb2c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 11 Apr 2025 09:12:18 +0000 Subject: [PATCH 1/6] feat: close follower regions after dropping leader regions --- .../meta/src/ddl/drop_table/executor.rs | 57 +++++++++++++++++-- src/common/meta/src/ddl/tests/drop_table.rs | 28 ++++++--- src/common/meta/src/rpc/router.rs | 24 ++++++++ 3 files changed, 97 insertions(+), 12 deletions(-) diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 1204629a1e0b..7aae31b13a0f 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -15,12 +15,13 @@ use std::collections::HashMap; use api::v1::region::{ - region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader, + region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest, + RegionRequest, RegionRequestHeader, }; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{debug, error}; use common_wal::options::WalOptions; use futures::future::join_all; use snafu::ensure; @@ -36,7 +37,8 @@ use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::rpc::router::{ - find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, + find_follower_regions, find_followers, find_leader_regions, find_leaders, + operating_leader_regions, RegionRoute, }; /// [Control] indicated to the caller whether to go to the next step. @@ -210,10 +212,10 @@ impl DropTableExecutor { region_routes: &[RegionRoute], fast_path: bool, ) -> Result<()> { + // Drops leader regions on datanodes. 
let leaders = find_leaders(region_routes); let mut drop_region_tasks = Vec::with_capacity(leaders.len()); let table_id = self.table_id; - for datanode in leaders { let requester = ctx.node_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); @@ -252,6 +254,53 @@ impl DropTableExecutor { .into_iter() .collect::>>()?; + // Drops follower regions on datanodes. + let followers = find_followers(region_routes); + let mut close_region_tasks = Vec::with_capacity(followers.len()); + for datanode in followers { + let requester = ctx.node_manager.datanode(&datanode).await; + let regions = find_follower_regions(region_routes, &datanode); + let region_ids = regions + .iter() + .map(|region_number| RegionId::new(table_id, *region_number)) + .collect::>(); + + for region_id in region_ids { + debug!("Closing region {region_id} on Datanode {datanode:?}"); + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Close(PbCloseRegionRequest { + region_id: region_id.as_u64(), + })), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + close_region_tasks.push(async move { + if let Err(err) = requester.handle(request).await { + if err.status_code() != StatusCode::RegionNotFound { + return Err(add_peer_context_if_needed(datanode)(err)); + } + } + Ok(()) + }); + } + } + + // Failure to close follower regions is not critical. + // When a leader region is dropped, follower regions will be unable to renew their leases via metasrv. + // Eventually, these follower regions will be automatically closed by the region livekeeper. + if let Err(err) = join_all(close_region_tasks) + .await + .into_iter() + .collect::>>() + { + error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id); + } + // Deletes the leader region from registry. 
let region_ids = operating_leader_regions(region_routes); ctx.leader_region_registry diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 3e09f65422e2..031a309c4c80 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -148,27 +148,39 @@ async fn test_on_datanode_drop_regions() { let check = |peer: Peer, request: RegionRequest, expected_peer_id: u64, - expected_region_id: RegionId| { + expected_region_id: RegionId, + follower: bool| { assert_eq!(peer.id, expected_peer_id); - let Some(region_request::Body::Drop(req)) = request.body else { - unreachable!(); + if follower { + let Some(region_request::Body::Close(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, expected_region_id); + } else { + let Some(region_request::Body::Drop(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, expected_region_id); }; - assert_eq!(req.region_id, expected_region_id); }; let mut results = Vec::new(); - for _ in 0..3 { + for _ in 0..5 { let result = rx.try_recv().unwrap(); results.push(result); } results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id)); let (peer, request) = results.remove(0); - check(peer, request, 1, RegionId::new(table_id, 1)); + check(peer, request, 1, RegionId::new(table_id, 1), false); let (peer, request) = results.remove(0); - check(peer, request, 2, RegionId::new(table_id, 2)); + check(peer, request, 2, RegionId::new(table_id, 2), false); let (peer, request) = results.remove(0); - check(peer, request, 3, RegionId::new(table_id, 3)); + check(peer, request, 3, RegionId::new(table_id, 3), false); + let (peer, request) = results.remove(0); + check(peer, request, 4, RegionId::new(table_id, 2), true); + let (peer, request) = results.remove(0); + check(peer, request, 5, RegionId::new(table_id, 1), true); } #[tokio::test] diff --git a/src/common/meta/src/rpc/router.rs 
b/src/common/meta/src/rpc/router.rs index 03a1e0bfa032..7ddd104c61b7 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -62,6 +62,7 @@ pub struct TableRoute { region_leaders: HashMap>, } +/// Returns the leader peers of the table. pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { region_routes .iter() @@ -70,6 +71,15 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet { .collect() } +/// Returns the followers of the table. +pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet { + region_routes + .iter() + .flat_map(|x| &x.follower_peers) + .cloned() + .collect() +} + /// Returns the operating leader regions with corresponding [DatanodeId]. pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> { region_routes @@ -108,6 +118,7 @@ pub fn find_region_leader( .cloned() } +/// Returns the region numbers of the leader regions on the target datanode. pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec { region_routes .iter() @@ -122,6 +133,19 @@ pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Ve .collect() } +/// Returns the region numbers of the follower regions on the target datanode. 
+pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec { + region_routes + .iter() + .filter_map(|x| { + if x.follower_peers.contains(datanode) { + return Some(x.region.id.region_number()); + } + None + }) + .collect() +} + impl TableRoute { pub fn new(table: Table, region_routes: Vec) -> Self { let region_leaders = region_routes From 97f8043e7cc226137076aa5cd624034b79c9043f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 14 Apr 2025 07:09:03 +0000 Subject: [PATCH 2/6] chore: upgrade greptime-proto --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/datanode/src/region_server.rs | 35 +++++++++++++++++++++++++++-- src/metric-engine/src/engine.rs | 1 + src/mito2/src/request.rs | 2 ++ src/store-api/src/region_request.rs | 26 +++++++++++++++++++++ 6 files changed, 64 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8e5f15cdfd7..e18b7b80d7dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4722,7 +4722,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=583daa3fbbbe39c90b7b92d13646bc3291d9c941#583daa3fbbbe39c90b7b92d13646bc3291d9c941" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c#6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 919b4f8d441b..e3ee1ee0d729 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c" } hex = "0.4" http = "1" humantime = "2.1" diff --git 
a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 072c1682cc11..fd4f8027361c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -55,8 +55,8 @@ use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, + RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, + SetRegionRoleStateResponse, SettableRegionRoleState, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest, @@ -910,6 +910,7 @@ impl RegionServerInner { | RegionRequest::Compact(_) | RegionRequest::Truncate(_) => RegionChange::None, RegionRequest::Catchup(_) => RegionChange::Catchup, + RegionRequest::Sync(_) => RegionChange::Sync, }; let engine = match self.get_engine(region_id, ®ion_change)? 
{ @@ -948,6 +949,32 @@ impl RegionServerInner { } } + fn add_manifest_version_to_response( + region_id: RegionId, + engine: Arc, + result: &mut RegionResponse, + ) { + if let Some(region_stat) = engine.region_statistic(region_id) { + let metadata_manifest_version = region_stat.manifest.metadata_manifest_version(); + let data_manifest_version = region_stat.manifest.data_manifest_version(); + + if let Some(metadata_manifest_version) = metadata_manifest_version { + result.extensions.insert( + METADATA_MANIFEST_VERSION_EXTENSION_KEY.to_string(), + metadata_manifest_version.to_string().into_bytes(), + ); + } + result.extensions.insert( + DATA_MANIFEST_VERSION_EXTENSION_KEY.to_string(), + data_manifest_version.to_string().into_bytes(), + ); + info!( + "Region {} is altered, data_manifest_version: {}, metadata_manifest_version: {:?}", + region_id, data_manifest_version, metadata_manifest_version + ); + } + } + fn set_region_status_not_ready( &self, region_id: RegionId, @@ -987,6 +1014,7 @@ impl RegionServerInner { .insert(region_id, RegionEngineWithStatus::Ready(engine.clone())); } RegionChange::Catchup => {} + RegionChange::Sync => {} } } @@ -1035,6 +1063,9 @@ impl RegionServerInner { self.register_logical_regions(&engine, region_id).await?; } } + RegionChange::Sync => { + // TODO(weny): sync the region + } } Ok(()) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 71cb843ab16c..764d61c16a80 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -180,6 +180,7 @@ impl RegionEngine for MetricEngine { let mut extension_return_value = HashMap::new(); let result = match request { + RegionRequest::Sync(_) => unreachable!(), RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Create(create) => { self.inner diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 5331ba6fdc35..37148d2b1cc0 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ 
-603,6 +603,8 @@ impl WorkerRequest { ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { + // Handles out side of worker. + RegionRequest::Sync(_) => unreachable!(), RegionRequest::Put(v) => { let mut write_request = WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())? diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 08ad2ac55992..50af68cbf4fc 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -22,6 +22,7 @@ use api::v1::add_column_location::LocationType; use api::v1::column_def::{ as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type, }; +use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests, @@ -55,6 +56,7 @@ use crate::mito_engine_options::{ TWCS_TIME_WINDOW, }; use crate::path_utils::region_dir; +use crate::region_engine::RegionManifestInfo; use crate::storage::{ColumnId, RegionId, ScanRequest}; #[derive(Debug, IntoStaticStr)] @@ -293,6 +295,30 @@ fn make_region_alters(alters: AlterRequests) -> Result Result> { + let region_id = sync.region_id.into(); + let region_manifest_info = match sync.manifest_info { + Some(ManifestInfo::MitoManifestInfo(manifest_info)) => RegionManifestInfo::Mito { + manifest_version: manifest_info.data_manifest_version, + flushed_entry_id: 0, + }, + Some(ManifestInfo::MetricManifestInfo(manifest_info)) => RegionManifestInfo::Metric { + data_manifest_version: manifest_info.data_manifest_version, + data_flushed_entry_id: 0, + metadata_manifest_version: manifest_info.metadata_manifest_version, + metadata_flushed_entry_id: 0, + }, + None => todo!(), + }; + Ok(vec![( + region_id, + RegionRequest::Sync(RegionSyncRequest { + region_id, + region_manifest_info, + }), + 
)]) +} + fn make_region_flush(flush: FlushRequest) -> Result> { let region_id = flush.region_id.into(); Ok(vec![( From 367a0eb8a6cc2a3a7bdca8816c2c7bcb0b6ce907 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 14 Apr 2025 06:36:34 +0000 Subject: [PATCH 3/6] feat: sync region followers after alter region operations --- .../meta/src/ddl/alter_logical_tables.rs | 44 ++++- src/common/meta/src/ddl/alter_table.rs | 33 +++- .../meta/src/ddl/create_logical_tables.rs | 40 ++++- src/common/meta/src/ddl/utils.rs | 151 ++++++++++++++++-- src/datanode/src/region_server.rs | 144 +++++++++-------- src/metric-engine/src/engine.rs | 27 +--- src/metric-engine/src/engine/alter.rs | 8 +- src/metric-engine/src/engine/create.rs | 11 +- src/metric-engine/src/error.rs | 11 +- src/metric-engine/src/utils.rs | 75 ++++++++- src/mito2/src/engine.rs | 38 ++++- src/mito2/src/request.rs | 2 - src/store-api/src/metadata.rs | 7 + src/store-api/src/metric_engine_consts.rs | 4 + src/store-api/src/region_engine.rs | 10 ++ src/store-api/src/region_request.rs | 32 +--- 16 files changed, 482 insertions(+), 155 deletions(-) diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index ea741accf319..982b2d0e7ad3 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -18,10 +18,12 @@ mod region_request; mod table_cache_keys; mod update_metadata; +use api::region::RegionResponse; use async_trait::async_trait; +use common_catalog::format_full_table_name; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context, LockKey, Procedure, Status}; -use common_telemetry::{info, warn}; +use common_telemetry::{error, info, warn}; use futures_util::future; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -30,7 +32,7 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use 
table::metadata::TableId; -use crate::ddl::utils::add_peer_context_if_needed; +use crate::ddl::utils::{add_peer_context_if_needed, sync_regions}; use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; @@ -39,7 +41,7 @@ use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::rpc::ddl::AlterTableTask; -use crate::rpc::router::find_leaders; +use crate::rpc::router::{find_leaders, RegionRoute}; pub struct AlterLogicalTablesProcedure { pub context: DdlContext, @@ -125,14 +127,20 @@ impl AlterLogicalTablesProcedure { }); } - // Collects responses from datanodes. - let phy_raw_schemas = future::join_all(alter_region_tasks) + let mut results = future::join_all(alter_region_tasks) .await .into_iter() - .map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; + // Collects responses from datanodes. 
+ let phy_raw_schemas = results + .iter_mut() + .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)) + .collect::>(); + if phy_raw_schemas.is_empty() { + self.submit_sync_region_requests(results, &physical_table_route.region_routes) + .await; self.data.state = AlterTablesState::UpdateMetadata; return Ok(Status::executing(true)); } @@ -155,10 +163,34 @@ impl AlterLogicalTablesProcedure { warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } + self.submit_sync_region_requests(results, &physical_table_route.region_routes) + .await; self.data.state = AlterTablesState::UpdateMetadata; Ok(Status::executing(true)) } + async fn submit_sync_region_requests( + &self, + results: Vec, + region_routes: &[RegionRoute], + ) { + let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info; + if let Err(err) = sync_regions( + &self.context, + self.data.physical_table_id, + results, + region_routes, + table_info.meta.engine.as_str(), + ) + .await + { + error!(err; "Failed to sync regions for table {}, table_id: {}", + format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name), + self.data.physical_table_id + ); + } + } + pub(crate) async fn on_update_metadata(&mut self) -> Result { self.update_physical_table_metadata().await?; self.update_logical_tables_metadata().await?; diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 89406e6a963a..bb9d698b5585 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -19,6 +19,7 @@ mod update_metadata; use std::vec; +use api::region::RegionResponse; use api::v1::alter_table_expr::Kind; use api::v1::RenameTable; use async_trait::async_trait; @@ -29,7 +30,7 @@ use common_procedure::{ PoisonKeys, Procedure, ProcedureId, Status, StringKey, }; use common_telemetry::{debug, error, info}; -use 
futures::future; +use futures::future::{self}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::storage::RegionId; @@ -38,7 +39,9 @@ use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; -use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults}; +use crate::ddl::utils::{ + add_peer_context_if_needed, handle_multiple_results, sync_regions, MultipleResults, +}; use crate::ddl::DdlContext; use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result}; use crate::instruction::CacheIdent; @@ -48,7 +51,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::metrics; use crate::poison_key::table_poison_key; use crate::rpc::ddl::AlterTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution}; +use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute}; /// The alter table procedure pub struct AlterTableProcedure { @@ -194,7 +197,9 @@ impl AlterTableProcedure { // Just returns the error, and wait for the next try. Err(error) } - MultipleResults::Ok => { + MultipleResults::Ok(results) => { + self.submit_sync_region_requests(results, &physical_table_route.region_routes) + .await; self.data.state = AlterTableState::UpdateMetadata; Ok(Status::executing_with_clean_poisons(true)) } @@ -211,6 +216,26 @@ impl AlterTableProcedure { } } + async fn submit_sync_region_requests( + &mut self, + results: Vec, + region_routes: &[RegionRoute], + ) { + // Safety: filled in `prepare` step. 
+ let table_info = self.data.table_info().unwrap(); + if let Err(err) = sync_regions( + &self.context, + self.data.table_id(), + results, + region_routes, + table_info.meta.engine.as_str(), + ) + .await + { + error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id()); + } + } + /// Update table metadata. pub(crate) async fn on_update_metadata(&mut self) -> Result { let table_id = self.data.table_id(); diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 59882ec49190..f2c67e892991 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -17,12 +17,14 @@ mod metadata; mod region_request; mod update_metadata; +use api::region::RegionResponse; use api::v1::CreateTableExpr; use async_trait::async_trait; +use common_catalog::consts::METRIC_ENGINE; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::{debug, warn}; -use futures_util::future::join_all; +use common_telemetry::{debug, error, warn}; +use futures::future; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -31,7 +33,7 @@ use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_regions}; use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_route::TableRouteValue; @@ -156,14 +158,20 @@ impl CreateLogicalTablesProcedure { }); } - // Collects response from datanodes. 
- let phy_raw_schemas = join_all(create_region_tasks) + let mut results = future::join_all(create_region_tasks) .await .into_iter() - .map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; + // Collects response from datanodes. + let phy_raw_schemas = results + .iter_mut() + .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)) + .collect::>(); + if phy_raw_schemas.is_empty() { + self.submit_sync_region_requests(results, region_routes) + .await; self.data.state = CreateTablesState::CreateMetadata; return Ok(Status::executing(false)); } @@ -186,10 +194,30 @@ impl CreateLogicalTablesProcedure { warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } + self.submit_sync_region_requests(results, region_routes) + .await; self.data.state = CreateTablesState::CreateMetadata; Ok(Status::executing(true)) } + + async fn submit_sync_region_requests( + &self, + results: Vec, + region_routes: &[RegionRoute], + ) { + if let Err(err) = sync_regions( + &self.context, + self.data.physical_table_id, + results, + region_routes, + METRIC_ENGINE, + ) + .await + { + error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id); + } + } } #[async_trait] diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 2e909946e004..020c2edf3406 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -15,27 +15,37 @@ use std::collections::HashMap; use std::fmt::Debug; -use common_catalog::consts::METRIC_ENGINE; +use api::region::RegionResponse; +use api::v1::region::sync_request::ManifestInfo; +use api::v1::region::{ + region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader, + SyncRequest, +}; +use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE}; use common_error::ext::BoxedError; use common_procedure::error::Error as 
ProcedureError; -use common_telemetry::{error, warn}; +use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{error, info, warn}; use common_wal::options::WalOptions; +use futures::future::join_all; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; -use store_api::storage::RegionNumber; +use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY}; +use store_api::region_engine::RegionManifestInfo; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use table::table_reference::TableReference; -use crate::ddl::DetectingRegion; +use crate::ddl::{DdlContext, DetectingRegion}; use crate::error::{ - Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu, + self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, + UnsupportedSnafu, }; use crate::key::datanode_table::DatanodeTableValue; use crate::key::table_name::TableNameKey; use crate::key::TableMetadataManagerRef; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::RegionRoute; +use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute}; /// Adds [Peer] context if the error is unretryable. pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error { @@ -192,8 +202,8 @@ pub fn extract_region_wal_options( /// - PartialNonRetryable: if any operation is non retryable, the result is non retryable. /// - AllRetryable: all operations are retryable. /// - AllNonRetryable: all operations are not retryable. -pub enum MultipleResults { - Ok, +pub enum MultipleResults { + Ok(Vec), PartialRetryable(Error), PartialNonRetryable(Error), AllRetryable(Error), @@ -205,9 +215,9 @@ pub enum MultipleResults { /// For partial success, we need to check if the errors are retryable. /// If all the errors are retryable, we return a retryable error. 
/// Otherwise, we return the first error. -pub fn handle_multiple_results(results: Vec>) -> MultipleResults { +pub fn handle_multiple_results(results: Vec>) -> MultipleResults { if results.is_empty() { - return MultipleResults::Ok; + return MultipleResults::Ok(Vec::new()); } let num_results = results.len(); let mut retryable_results = Vec::new(); @@ -216,7 +226,7 @@ pub fn handle_multiple_results(results: Vec>) -> MultipleRes for result in results { match result { - Ok(_) => ok_results.push(result), + Ok(value) => ok_results.push(value), Err(err) => { if err.is_retry_later() { retryable_results.push(err); @@ -243,7 +253,7 @@ pub fn handle_multiple_results(results: Vec>) -> MultipleRes } return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap()); } else if ok_results.len() == num_results { - return MultipleResults::Ok; + return MultipleResults::Ok(ok_results); } else if !retryable_results.is_empty() && !ok_results.is_empty() && non_retryable_results.is_empty() @@ -264,6 +274,121 @@ pub fn handle_multiple_results(results: Vec>) -> MultipleRes MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap()) } +/// Parses manifest infos from extensions. 
+pub fn parse_manifest_infos_from_extensions( + extensions: &HashMap>, +) -> Result> { + let data_manifest_version = + extensions + .get(MANIFEST_INFO_EXTENSION_KEY) + .context(error::UnexpectedSnafu { + err_msg: "manifest info extension not found", + })?; + let data_manifest_version = + RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?; + Ok(data_manifest_version) +} + +pub async fn sync_regions( + context: &DdlContext, + table_id: TableId, + results: Vec, + region_routes: &[RegionRoute], + engine: &str, +) -> Result<()> { + if engine != MITO_ENGINE && engine != METRIC_ENGINE { + info!( + "Skip submitting sync region requests for table_id: {}, engine: {}", + table_id, engine + ); + return Ok(()); + } + + let results = results + .into_iter() + .map(|response| parse_manifest_infos_from_extensions(&response.extensions)) + .collect::>>()? + .into_iter() + .flatten() + .collect::>(); + + let is_mito_engine = engine == MITO_ENGINE; + + let followers = find_followers(region_routes); + let mut sync_region_tasks = Vec::with_capacity(followers.len()); + for datanode in followers { + let requester = context.node_manager.datanode(&datanode).await; + let regions = find_follower_regions(region_routes, &datanode); + for region in regions { + let region_id = RegionId::new(table_id, region); + let manifest_info = if is_mito_engine { + let region_manifest_info = + results.get(®ion_id).context(error::UnexpectedSnafu { + err_msg: format!("No manifest info found for region {}", region_id), + })?; + ensure!( + region_manifest_info.is_mito(), + error::UnexpectedSnafu { + err_msg: format!("Region {} is not a mito region", region_id) + } + ); + ManifestInfo::MitoManifestInfo(MitoManifestInfo { + data_manifest_version: region_manifest_info.data_manifest_version(), + }) + } else { + let region_manifest_info = + results.get(®ion_id).context(error::UnexpectedSnafu { + err_msg: format!("No manifest info found for region {}", region_id), + })?; + ensure!( 
+ region_manifest_info.is_metric(), + error::UnexpectedSnafu { + err_msg: format!("Region {} is not a metric region", region_id) + } + ); + ManifestInfo::MetricManifestInfo(MetricManifestInfo { + data_manifest_version: region_manifest_info.data_manifest_version(), + metadata_manifest_version: region_manifest_info + .metadata_manifest_version() + .unwrap_or_default(), + }) + }; + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Sync(SyncRequest { + region_id: region_id.as_u64(), + manifest_info: Some(manifest_info), + })), + }; + + let datanode = datanode.clone(); + let requester = requester.clone(); + sync_region_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) + }); + } + } + + // Failure to sync region is not critical. + // We try our best to sync the regions. + if let Err(err) = join_all(sync_region_tasks) + .await + .into_iter() + .collect::>>() + { + error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id); + } + info!("Sync follower regions on datanodes, table_id: {}", table_id); + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index fd4f8027361c..9bbb899bf139 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -19,7 +19,8 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use api::region::RegionResponse; -use api::v1::region::{region_request, RegionResponse as RegionResponseV1}; +use api::v1::region::sync_request::ManifestInfo; +use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest}; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; use async_trait::async_trait; @@ -55,8 +56,8 @@ use store_api::metric_engine_consts::{ 
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, + RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse, + SettableRegionRoleState, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest, @@ -308,38 +309,6 @@ impl RegionServer { .with_context(|_| HandleRegionRequestSnafu { region_id }) } - /// Sync region manifest and registers new opened logical regions. - pub async fn sync_region_manifest( - &self, - region_id: RegionId, - manifest_info: RegionManifestInfo, - ) -> Result<()> { - let engine_with_status = self - .inner - .region_map - .get(®ion_id) - .with_context(|| RegionNotFoundSnafu { region_id })?; - - let Some(new_opened_regions) = engine_with_status - .sync_region(region_id, manifest_info) - .await - .with_context(|_| HandleRegionRequestSnafu { region_id })? - .new_opened_logical_region_ids() - else { - return Ok(()); - }; - - for region in new_opened_regions { - self.inner.region_map.insert( - region, - RegionEngineWithStatus::Ready(engine_with_status.engine().clone()), - ); - info!("Logical region {} is registered!", region); - } - - Ok(()) - } - /// Set region role state gracefully. 
/// /// For [SettableRegionRoleState::Follower]: @@ -468,6 +437,52 @@ impl RegionServer { extensions, }) } + + async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let manifest_info = request + .manifest_info + .context(error::MissingRequiredFieldSnafu { + name: "manifest_info", + })?; + + let manifest_info = match manifest_info { + ManifestInfo::MitoManifestInfo(info) => { + RegionManifestInfo::mito(info.data_manifest_version, 0) + } + ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric( + info.data_manifest_version, + 0, + info.metadata_manifest_version, + 0, + ), + }; + + let tracing_context = TracingContext::from_current_span(); + let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request")); + + self.sync_region(region_id, manifest_info) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) + } + + /// Sync region manifest and registers new opened logical regions. 
+ pub async fn sync_region( + &self, + region_id: RegionId, + manifest_info: RegionManifestInfo, + ) -> Result<()> { + let engine_with_status = self + .inner + .region_map + .get(®ion_id) + .with_context(|| RegionNotFoundSnafu { region_id })?; + + self.inner + .handle_sync_region(engine_with_status.engine(), region_id, manifest_info) + .await + } } #[async_trait] @@ -480,6 +495,9 @@ impl RegionServerHandler for RegionServer { region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => { self.handle_requests_in_parallel(request).await } + region_request::Body::Sync(sync_request) => { + self.handle_sync_region_request(sync_request).await + } _ => self.handle_requests_in_serial(request).await, } .map_err(BoxedError::new) @@ -861,8 +879,8 @@ impl RegionServerInner { match result { Ok(result) => { - for (region_id, region_change) in region_changes { - self.set_region_status_ready(region_id, engine.clone(), region_change) + for (region_id, region_change) in ®ion_changes { + self.set_region_status_ready(*region_id, engine.clone(), *region_change) .await?; } @@ -910,7 +928,6 @@ impl RegionServerInner { | RegionRequest::Compact(_) | RegionRequest::Truncate(_) => RegionChange::None, RegionRequest::Catchup(_) => RegionChange::Catchup, - RegionRequest::Sync(_) => RegionChange::Sync, }; let engine = match self.get_engine(region_id, ®ion_change)? { @@ -934,8 +951,9 @@ impl RegionServerInner { .inc_by(result.affected_rows as u64); } // Sets corresponding region status to ready. - self.set_region_status_ready(region_id, engine, region_change) + self.set_region_status_ready(region_id, engine.clone(), region_change) .await?; + Ok(RegionResponse { affected_rows: result.affected_rows, extensions: result.extensions, @@ -949,30 +967,30 @@ impl RegionServerInner { } } - fn add_manifest_version_to_response( + /// Handles the sync region request. 
+ pub async fn handle_sync_region( + &self, + engine: &RegionEngineRef, region_id: RegionId, - engine: Arc, - result: &mut RegionResponse, - ) { - if let Some(region_stat) = engine.region_statistic(region_id) { - let metadata_manifest_version = region_stat.manifest.metadata_manifest_version(); - let data_manifest_version = region_stat.manifest.data_manifest_version(); - - if let Some(metadata_manifest_version) = metadata_manifest_version { - result.extensions.insert( - METADATA_MANIFEST_VERSION_EXTENSION_KEY.to_string(), - metadata_manifest_version.to_string().into_bytes(), - ); - } - result.extensions.insert( - DATA_MANIFEST_VERSION_EXTENSION_KEY.to_string(), - data_manifest_version.to_string().into_bytes(), - ); - info!( - "Region {} is altered, data_manifest_version: {}, metadata_manifest_version: {:?}", - region_id, data_manifest_version, metadata_manifest_version - ); + manifest_info: RegionManifestInfo, + ) -> Result<()> { + let Some(new_opened_regions) = engine + .sync_region(region_id, manifest_info) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id })? 
+ .new_opened_logical_region_ids() + else { + warn!("No new opened logical regions"); + return Ok(()); + }; + + for region in new_opened_regions { + self.region_map + .insert(region, RegionEngineWithStatus::Ready(engine.clone())); + info!("Logical region {} is registered!", region); } + + Ok(()) } fn set_region_status_not_ready( @@ -1014,7 +1032,6 @@ impl RegionServerInner { .insert(region_id, RegionEngineWithStatus::Ready(engine.clone())); } RegionChange::Catchup => {} - RegionChange::Sync => {} } } @@ -1063,9 +1080,6 @@ impl RegionServerInner { self.register_logical_regions(&engine, region_id).await?; } } - RegionChange::Sync => { - // TODO(weny): sync the region - } } Ok(()) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 764d61c16a80..28709264b3c5 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -53,7 +53,7 @@ use crate::data_region::DataRegion; use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; use crate::row_modifier::RowModifier; -use crate::utils; +use crate::utils::{self, get_region_statistic}; #[cfg_attr(doc, aquamarine::aquamarine)] /// # Metric Engine @@ -180,7 +180,6 @@ impl RegionEngine for MetricEngine { let mut extension_return_value = HashMap::new(); let result = match request { - RegionRequest::Sync(_) => unreachable!(), RegionRequest::Put(put) => self.inner.put_region(region_id, put).await, RegionRequest::Create(create) => { self.inner @@ -265,29 +264,7 @@ impl RegionEngine for MetricEngine { /// Note: Returns `None` if it's a logical region. 
fn region_statistic(&self, region_id: RegionId) -> Option { if self.inner.is_physical_region(region_id) { - let metadata_region_id = utils::to_metadata_region_id(region_id); - let data_region_id = utils::to_data_region_id(region_id); - - let metadata_stat = self.inner.mito.region_statistic(metadata_region_id); - let data_stat = self.inner.mito.region_statistic(data_region_id); - - match (metadata_stat, data_stat) { - (Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic { - num_rows: metadata_stat.num_rows + data_stat.num_rows, - memtable_size: metadata_stat.memtable_size + data_stat.memtable_size, - wal_size: metadata_stat.wal_size + data_stat.wal_size, - manifest_size: metadata_stat.manifest_size + data_stat.manifest_size, - sst_size: metadata_stat.sst_size + data_stat.sst_size, - index_size: metadata_stat.index_size + data_stat.index_size, - manifest: RegionManifestInfo::Metric { - data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(), - data_manifest_version: data_stat.manifest.data_manifest_version(), - metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(), - metadata_manifest_version: metadata_stat.manifest.data_manifest_version(), - }, - }), - _ => None, - } + get_region_statistic(&self.inner.mito, region_id) } else { None } diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 1d82149a7dac..22ef54cab8cf 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -30,7 +30,7 @@ use crate::error::{ LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, }; -use crate::utils::to_data_region_id; +use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id}; impl MetricEngineInner { pub async fn alter_regions( @@ -63,11 +63,15 @@ impl MetricEngineInner { .unwrap() .get_physical_region_id(region_id) .with_context(|| LogicalRegionNotFoundSnafu { 
region_id })?; + let mut manifest_infos = Vec::with_capacity(1); self.alter_logical_regions(physical_region_id, requests, extension_return_value) .await?; + append_manifest_info(&self.mito, region_id, &mut manifest_infos); + encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?; } else { let grouped_requests = self.group_logical_region_requests_by_physical_region_id(requests)?; + let mut manifest_infos = Vec::with_capacity(grouped_requests.len()); for (physical_region_id, requests) in grouped_requests { self.alter_logical_regions( physical_region_id, @@ -75,7 +79,9 @@ impl MetricEngineInner { extension_return_value, ) .await?; + append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos); } + encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?; } } Ok(0) diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 1ceb20d20656..78d229330284 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -51,7 +51,10 @@ use crate::error::{ Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu, }; use crate::metrics::PHYSICAL_REGION_COUNT; -use crate::utils::{self, to_data_region_id, to_metadata_region_id}; +use crate::utils::{ + self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id, + to_metadata_region_id, +}; const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024; @@ -88,11 +91,15 @@ impl MetricEngineInner { if requests.len() == 1 { let request = &requests.first().unwrap().1; let physical_region_id = parse_physical_region_id(request)?; + let mut manifest_infos = Vec::with_capacity(1); self.create_logical_regions(physical_region_id, requests, extension_return_value) .await?; + append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos); + encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?; } else { let grouped_requests = 
group_create_logical_region_requests_by_physical_region_id(requests)?; + let mut manifest_infos = Vec::with_capacity(grouped_requests.len()); for (physical_region_id, requests) in grouped_requests { self.create_logical_regions( physical_region_id, @@ -100,7 +107,9 @@ impl MetricEngineInner { extension_return_value, ) .await?; + append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos); } + encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?; } } else { return MissingRegionOptionSnafu {}.fail(); diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 5f853037e127..015e9d9f981d 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -67,6 +67,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to serialize region manifest info"))] + SerializeRegionManifestInfo { + #[snafu(source)] + error: serde_json::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to decode base64 column value"))] DecodeColumnValue { #[snafu(source)] @@ -304,7 +312,8 @@ impl ErrorExt for Error { | DecodeColumnValue { .. } | ParseRegionId { .. } | InvalidMetadata { .. } - | SetSkippingIndexOption { .. } => StatusCode::Unexpected, + | SetSkippingIndexOption { .. } + | SerializeRegionManifestInfo { .. } => StatusCode::Unexpected, PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => { StatusCode::RegionNotFound diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs index 183ba2eaa369..ad0695f54a4c 100644 --- a/src/metric-engine/src/utils.rs +++ b/src/metric-engine/src/utils.rs @@ -12,9 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use store_api::metric_engine_consts::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP};
+use std::collections::HashMap;
+
+use common_telemetry::{info, warn};
+use mito2::engine::MitoEngine;
+use snafu::ResultExt;
+use store_api::metric_engine_consts::{
+    MANIFEST_INFO_EXTENSION_KEY, METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP,
+};
+use store_api::region_engine::{RegionEngine, RegionManifestInfo, RegionStatistic};
 use store_api::storage::RegionId;

+use crate::error::{Result, SerializeRegionManifestInfoSnafu};
+
 /// Change the given [RegionId]'s region group to [METRIC_METADATA_REGION_GROUP].
 pub fn to_metadata_region_id(region_id: RegionId) -> RegionId {
     let table_id = region_id.table_id();
@@ -29,6 +39,69 @@ pub fn to_data_region_id(region_id: RegionId) -> RegionId {
     RegionId::with_group_and_seq(table_id, METRIC_DATA_REGION_GROUP, region_sequence)
 }

+/// Get the region statistic of the given [RegionId].
+pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option<RegionStatistic> {
+    let metadata_region_id = to_metadata_region_id(region_id);
+    let data_region_id = to_data_region_id(region_id);
+
+    let metadata_stat = mito.region_statistic(metadata_region_id);
+    let data_stat = mito.region_statistic(data_region_id);
+
+    match (&metadata_stat, &data_stat) {
+        (Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic {
+            num_rows: metadata_stat.num_rows + data_stat.num_rows,
+            memtable_size: metadata_stat.memtable_size + data_stat.memtable_size,
+            wal_size: metadata_stat.wal_size + data_stat.wal_size,
+            manifest_size: metadata_stat.manifest_size + data_stat.manifest_size,
+            sst_size: metadata_stat.sst_size + data_stat.sst_size,
+            index_size: metadata_stat.index_size + data_stat.index_size,
+            manifest: RegionManifestInfo::Metric {
+                data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(),
+                data_manifest_version: data_stat.manifest.data_manifest_version(),
+                metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
+
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(), + }, + }), + _ => { + warn!( + "Failed to get region statistic for region {}, metadata_stat: {:?}, data_stat: {:?}", + region_id, metadata_stat, data_stat + ); + None + } + } +} + +/// Appends the given [RegionId]'s manifest info to the given list. +pub(crate) fn append_manifest_info( + mito: &MitoEngine, + region_id: RegionId, + manifest_infos: &mut Vec<(RegionId, RegionManifestInfo)>, +) { + if let Some(statistic) = get_region_statistic(mito, region_id) { + manifest_infos.push((region_id, statistic.manifest)); + } +} + +/// Encodes the given list of ([RegionId], [RegionManifestInfo]) to extensions(key: MANIFEST_INFO_EXTENSION_KEY). +pub(crate) fn encode_manifest_info_to_extensions( + manifest_infos: &[(RegionId, RegionManifestInfo)], + extensions: &mut HashMap>, +) -> Result<()> { + extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(manifest_infos) + .context(SerializeRegionManifestInfoSnafu)?, + ); + for (region_id, manifest_info) in manifest_infos { + info!( + "Added manifest info: {:?} to extensions, region_id: {:?}", + manifest_info, region_id + ); + } + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 7b3c7352dac5..a138547ce32c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -70,7 +70,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_meta::key::SchemaMetadataManagerRef; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing; +use common_telemetry::{info, tracing}; use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; use futures::future::{join_all, try_join_all}; use object_store::manager::ObjectStoreManagerRef; @@ -80,6 +80,7 @@ use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; +use 
store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; use store_api::region_engine::{ BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, @@ -224,6 +225,23 @@ impl MitoEngine { pub(crate) fn get_region(&self, id: RegionId) -> Option { self.inner.workers.get_region(id) } + + fn encode_manifest_info_to_extensions( + region_id: &RegionId, + manifest_info: RegionManifestInfo, + extensions: &mut HashMap>, + ) { + let region_manifest_info = vec![(*region_id, manifest_info)]; + + extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(®ion_manifest_info).unwrap(), + ); + info!( + "Added manifest info: {:?} to extensions, region_id: {:?}", + region_manifest_info, region_id + ); + } } /// Check whether the region edit is valid. Only adding files to region is considered valid now. @@ -557,11 +575,25 @@ impl RegionEngine for MitoEngine { .with_label_values(&[request.request_type()]) .start_timer(); - self.inner + let is_alter = matches!(request, RegionRequest::Alter(_)); + let mut response = self + .inner .handle_request(region_id, request) .await .map(RegionResponse::new) - .map_err(BoxedError::new) + .map_err(BoxedError::new)?; + + if is_alter { + if let Some(statistic) = self.region_statistic(region_id) { + Self::encode_manifest_info_to_extensions( + ®ion_id, + statistic.manifest, + &mut response.extensions, + ); + } + } + + Ok(response) } #[tracing::instrument(skip_all)] diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 37148d2b1cc0..5331ba6fdc35 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -603,8 +603,6 @@ impl WorkerRequest { ) -> Result<(WorkerRequest, Receiver>)> { let (sender, receiver) = oneshot::channel(); let worker_request = match value { - // Handles out side of worker. 
- RegionRequest::Sync(_) => unreachable!(), RegionRequest::Put(v) => { let mut write_request = WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())? diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index bb622ac7f8ee..426e3f69ca23 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -966,6 +966,13 @@ pub enum MetadataError { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unexpected: {}", reason))] + Unexpected { + reason: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for MetadataError { diff --git a/src/store-api/src/metric_engine_consts.rs b/src/store-api/src/metric_engine_consts.rs index af2a523e7034..b2994b0b0087 100644 --- a/src/store-api/src/metric_engine_consts.rs +++ b/src/store-api/src/metric_engine_consts.rs @@ -73,6 +73,10 @@ pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table"; /// Represent a list of column metadata that are added to physical table. pub const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL"; +/// HashMap key to be used in the region server's extension response. +/// Represent the manifest info of a region. +pub const MANIFEST_INFO_EXTENSION_KEY: &str = "MANIFEST_INFO"; + /// Returns true if it's a internal column of the metric engine. pub fn is_metric_engine_internal_column(name: &str) -> bool { name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME || name == DATA_SCHEMA_TSID_COLUMN_NAME diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 8522a2e1ca95..a2db2d9f527f 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -549,6 +549,16 @@ impl RegionManifestInfo { } => Some(*metadata_flushed_entry_id), } } + + /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array. 
+ pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result> { + serde_json::to_vec(manifest_infos) + } + + /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array. + pub fn decode_list(value: &[u8]) -> serde_json::Result> { + serde_json::from_slice(value) + } } impl Default for RegionManifestInfo { diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 50af68cbf4fc..12ec69c7b86f 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -22,7 +22,6 @@ use api::v1::add_column_location::LocationType; use api::v1::column_def::{ as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type, }; -use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests, @@ -47,7 +46,7 @@ use crate::logstore::entry; use crate::metadata::{ ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu, - InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, + InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu, }; use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::mito_engine_options::{ @@ -56,7 +55,6 @@ use crate::mito_engine_options::{ TWCS_TIME_WINDOW, }; use crate::path_utils::region_dir; -use crate::region_engine::RegionManifestInfo; use crate::storage::{ColumnId, RegionId, ScanRequest}; #[derive(Debug, IntoStaticStr)] @@ -155,6 +153,10 @@ impl RegionRequest { region_request::Body::Drops(drops) => make_region_drops(drops), region_request::Body::Alters(alters) => make_region_alters(alters), region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk), + region_request::Body::Sync(_) => UnexpectedSnafu { + 
reason: "Sync request should be handled separately by RegionServer::handle_sync_region_request", + } + .fail(), } } @@ -295,30 +297,6 @@ fn make_region_alters(alters: AlterRequests) -> Result Result> { - let region_id = sync.region_id.into(); - let region_manifest_info = match sync.manifest_info { - Some(ManifestInfo::MitoManifestInfo(manifest_info)) => RegionManifestInfo::Mito { - manifest_version: manifest_info.data_manifest_version, - flushed_entry_id: 0, - }, - Some(ManifestInfo::MetricManifestInfo(manifest_info)) => RegionManifestInfo::Metric { - data_manifest_version: manifest_info.data_manifest_version, - data_flushed_entry_id: 0, - metadata_manifest_version: manifest_info.metadata_manifest_version, - metadata_flushed_entry_id: 0, - }, - None => todo!(), - }; - Ok(vec![( - region_id, - RegionRequest::Sync(RegionSyncRequest { - region_id, - region_manifest_info, - }), - )]) -} - fn make_region_flush(flush: FlushRequest) -> Result> { let region_id = flush.region_id.into(); Ok(vec![( From e6ccfee71b4b5d7ede832f4849816a5e31d10d1a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 17 Apr 2025 06:29:16 +0000 Subject: [PATCH 4/6] test: add tests --- .../meta/src/ddl/test_util/create_table.rs | 5 +- .../src/ddl/test_util/datanode_handler.rs | 33 +++- .../src/ddl/tests/alter_logical_tables.rs | 93 ++++++++++- src/common/meta/src/ddl/tests/alter_table.rs | 150 ++++++++++++++---- .../src/ddl/tests/create_logical_tables.rs | 87 +++++++++- src/common/meta/src/ddl/tests/drop_table.rs | 2 +- src/common/meta/src/ddl/utils.rs | 3 + src/mito2/src/worker/handle_flush.rs | 4 + 8 files changed, 340 insertions(+), 37 deletions(-) diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index 12896fbf915b..9d99bbf5c673 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -18,7 +18,9 @@ use api::v1::column_def::try_as_column_schema; use 
api::v1::meta::Partition;
 use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
 use chrono::DateTime;
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
+use common_catalog::consts::{
+    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE, MITO_ENGINE,
+};
 use datatypes::schema::RawSchema;
 use derive_builder::Builder;
 use store_api::storage::TableId;
@@ -164,6 +166,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
         .time_index("ts")
         .primary_keys(["host".into()])
         .table_name(name)
+        .engine(MITO_ENGINE)
         .build()
         .unwrap()
         .into();
diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs
index 7f02d9cc5a9a..bed78724a5f2 100644
--- a/src/common/meta/src/ddl/test_util/datanode_handler.rs
+++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs
@@ -45,14 +45,41 @@ impl MockDatanodeHandler for () {
 }

 #[derive(Clone)]
-pub struct DatanodeWatcher(pub mpsc::Sender<(Peer, RegionRequest)>);
+pub struct DatanodeWatcher {
+    sender: mpsc::Sender<(Peer, RegionRequest)>,
+    handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
+}
+
+impl DatanodeWatcher {
+    pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
+        Self {
+            sender,
+            handler: None,
+        }
+    }
+
+    pub fn with_handler(
+        mut self,
+        user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
+    ) -> Self {
+        self.handler = Some(user_handler);
+        self
+    }
+}

 #[async_trait::async_trait]
 impl MockDatanodeHandler for DatanodeWatcher {
     async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
         debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-        self.0.send((peer.clone(), request)).await.unwrap();
-        Ok(RegionResponse::new(0))
+        self.sender
+            .send((peer.clone(), request.clone()))
+            .await
+            .unwrap();
+        if let Some(handler) = self.handler {
+            handler(peer.clone(), request)
+        } else {
+            Ok(RegionResponse::new(0))
+        }
     }

     async fn handle_query(
diff --git
a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 1c22cdf6f4c6..01ab8e513ce6 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -15,19 +15,33 @@ use std::assert_matches::assert_matches; use std::sync::Arc; +use api::region::RegionResponse; +use api::v1::meta::Peer; +use api::v1::region::sync_request::ManifestInfo; +use api::v1::region::{region_request, MetricManifestInfo, RegionRequest, SyncRequest}; use api::v1::{ColumnDataType, SemanticType}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::{Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; +use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; +use store_api::region_engine::RegionManifestInfo; +use store_api::storage::RegionId; +use tokio::sync::mpsc; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; use crate::ddl::test_util::columns::TestColumnDefBuilder; -use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler; -use crate::ddl::test_util::{create_logical_table, create_physical_table}; +use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; +use crate::ddl::test_util::{ + create_logical_table, create_physical_table, create_physical_table_metadata, + test_create_physical_table_task, +}; use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound}; +use crate::error::Result; use crate::key::table_name::TableNameKey; +use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; use crate::rpc::ddl::AlterTableTask; +use crate::rpc::router::{Region, RegionRoute}; use crate::test_util::{new_ddl_context, MockDatanodeManager}; fn make_alter_logical_table_add_column_task( @@ -407,3 +421,78 @@ async fn 
test_on_part_duplicate_alter_request() { ] ); } + +fn alters_request_handler(_peer: Peer, request: RegionRequest) -> Result { + if let region_request::Body::Alters(_) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + // Default region id for physical table. + let region_id = RegionId::new(1000, 1); + response.extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))]) + .unwrap(), + ); + return Ok(response); + } + + Ok(RegionResponse::new(0)) +} + +#[tokio::test] +async fn test_on_submit_alter_region_request() { + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let handler = DatanodeWatcher::new(tx).with_handler(alters_request_handler); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + let ddl_context = new_ddl_context(node_manager); + + let mut create_physical_table_task = test_create_physical_table_task("phy"); + let phy_id = 1000u32; + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(phy_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_state: None, + leader_down_since: None, + }]; + create_physical_table_task.set_table_id(phy_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)), + ) + .await; + create_logical_table(ddl_context.clone(), phy_id, "table1").await; + create_logical_table(ddl_context.clone(), phy_id, "table2").await; + + let tasks = vec![ + make_alter_logical_table_add_column_task(None, "table1", vec!["new_col".to_string()]), + make_alter_logical_table_add_column_task(None, "table2", vec!["mew_col".to_string()]), + ]; + + let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context); + procedure.on_prepare().await.unwrap(); + 
procedure.on_submit_alter_region_requests().await.unwrap(); + let mut results = Vec::new(); + for _ in 0..2 { + let result = rx.try_recv().unwrap(); + results.push(result); + } + rx.try_recv().unwrap_err(); + let (peer, request) = results.remove(0); + assert_eq!(peer.id, 1); + assert_matches!(request.body.unwrap(), region_request::Body::Alters(_)); + let (peer, request) = results.remove(0); + assert_eq!(peer.id, 5); + assert_matches!( + request.body.unwrap(), + region_request::Body::Sync(SyncRequest { + manifest_info: Some(ManifestInfo::MetricManifestInfo(MetricManifestInfo { + data_manifest_version: 1, + metadata_manifest_version: 2, + .. + })), + .. + }) + ); +} diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index c8d0450b900e..6f24870e6e04 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -16,7 +16,9 @@ use std::assert_matches::assert_matches; use std::collections::HashMap; use std::sync::Arc; +use api::region::RegionResponse; use api::v1::alter_table_expr::Kind; +use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{region_request, RegionRequest}; use api::v1::{ AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn, @@ -28,6 +30,8 @@ use common_error::status_code::StatusCode; use common_procedure::store::poison_store::PoisonStore; use common_procedure::{ProcedureId, Status}; use common_procedure_test::MockContextProvider; +use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; +use store_api::region_engine::RegionManifestInfo; use store_api::storage::RegionId; use table::requests::TTL_KEY; use tokio::sync::mpsc::{self}; @@ -39,7 +43,7 @@ use crate::ddl::test_util::datanode_handler::{ AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler, RequestOutdatedErrorDatanodeHandler, }; -use crate::error::Error; +use crate::error::{Error, Result}; use 
crate::key::datanode_table::DatanodeTableKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -120,10 +124,71 @@ async fn test_on_prepare_table_not_exists_err() { assert_matches!(err.status_code(), StatusCode::TableNotFound); } +fn test_alter_table_task(table_name: &str) -> AlterTableTask { + AlterTableTask { + alter_table: AlterTableExpr { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + kind: Some(Kind::DropColumns(DropColumns { + drop_columns: vec![DropColumn { + name: "cpu".to_string(), + }], + })), + }, + } +} + +fn assert_alter_request( + peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_id: RegionId, +) { + assert_eq!(peer.id, expected_peer_id); + let Some(region_request::Body::Alter(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, expected_region_id); +} + +fn assert_sync_request( + peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_id: RegionId, + expected_manifest_version: u64, +) { + assert_eq!(peer.id, expected_peer_id); + let Some(region_request::Body::Sync(req)) = request.body else { + unreachable!(); + }; + let Some(ManifestInfo::MitoManifestInfo(info)) = req.manifest_info else { + unreachable!(); + }; + assert_eq!(info.data_manifest_version, expected_manifest_version); + assert_eq!(req.region_id, expected_region_id); +} + +fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result { + if let region_request::Body::Alter(req) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + let region_id = RegionId::from(req.region_id); + response.extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1))]) + .unwrap(), + ); + return Ok(response); + } + + Ok(RegionResponse::new(0)) +} + #[tokio::test] async fn 
test_on_submit_alter_request() { let (tx, mut rx) = mpsc::channel(8); - let datanode_handler = DatanodeWatcher(tx); + let datanode_handler = DatanodeWatcher::new(tx).with_handler(alter_request_handler); let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); let table_id = 1024; @@ -140,18 +205,7 @@ async fn test_on_submit_alter_request() { .await .unwrap(); - let alter_table_task = AlterTableTask { - alter_table: AlterTableExpr { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - kind: Some(Kind::DropColumns(DropColumns { - drop_columns: vec![DropColumn { - name: "cpu".to_string(), - }], - })), - }, - }; + let alter_table_task = test_alter_table_task(table_name); let procedure_id = ProcedureId::random(); let provider = Arc::new(MockContextProvider::default()); let mut procedure = @@ -162,30 +216,72 @@ async fn test_on_submit_alter_request() { .await .unwrap(); - let check = |peer: Peer, - request: RegionRequest, - expected_peer_id: u64, - expected_region_id: RegionId| { - assert_eq!(peer.id, expected_peer_id); - let Some(region_request::Body::Alter(req)) = request.body else { - unreachable!(); - }; - assert_eq!(req.region_id, expected_region_id); - }; + let mut results = Vec::new(); + for _ in 0..5 { + let result = rx.try_recv().unwrap(); + results.push(result); + } + rx.try_recv().unwrap_err(); + results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id)); + + let (peer, request) = results.remove(0); + assert_alter_request(peer, request, 1, RegionId::new(table_id, 1)); + let (peer, request) = results.remove(0); + assert_alter_request(peer, request, 2, RegionId::new(table_id, 2)); + let (peer, request) = results.remove(0); + assert_alter_request(peer, request, 3, RegionId::new(table_id, 3)); + let (peer, request) = results.remove(0); + assert_sync_request(peer, request, 4, RegionId::new(table_id, 2), 1); + let 
(peer, request) = results.remove(0); + assert_sync_request(peer, request, 5, RegionId::new(table_id, 1), 1); +} + +#[tokio::test] +async fn test_on_submit_alter_request_without_sync_request() { + let (tx, mut rx) = mpsc::channel(8); + // without use `alter_request_handler`, so no sync request will be sent. + let datanode_handler = DatanodeWatcher::new(tx); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let ddl_context = new_ddl_context(node_manager); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + prepare_table_route(table_id), + HashMap::new(), + ) + .await + .unwrap(); + + let alter_table_task = test_alter_table_task(table_name); + let procedure_id = ProcedureId::random(); + let provider = Arc::new(MockContextProvider::default()); + let mut procedure = + AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap(); + procedure.on_prepare().await.unwrap(); + procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); let mut results = Vec::new(); for _ in 0..3 { let result = rx.try_recv().unwrap(); results.push(result); } + rx.try_recv().unwrap_err(); results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id)); let (peer, request) = results.remove(0); - check(peer, request, 1, RegionId::new(table_id, 1)); + assert_alter_request(peer, request, 1, RegionId::new(table_id, 1)); let (peer, request) = results.remove(0); - check(peer, request, 2, RegionId::new(table_id, 2)); + assert_alter_request(peer, request, 2, RegionId::new(table_id, 2)); let (peer, request) = results.remove(0); - check(peer, request, 3, RegionId::new(table_id, 3)); + assert_alter_request(peer, request, 3, RegionId::new(table_id, 3)); } #[tokio::test] diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs 
b/src/common/meta/src/ddl/tests/create_logical_tables.rs index fb5518d463e9..af0af4ccdb7b 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -15,20 +15,28 @@ use std::assert_matches::assert_matches; use std::sync::Arc; +use api::region::RegionResponse; +use api::v1::meta::Peer; +use api::v1::region::sync_request::ManifestInfo; +use api::v1::region::{region_request, MetricManifestInfo, RegionRequest, SyncRequest}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; +use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; +use store_api::region_engine::RegionManifestInfo; use store_api::storage::RegionId; +use tokio::sync::mpsc; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler; +use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; use crate::ddl::test_util::{ create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task, }; use crate::ddl::TableMetadata; -use crate::error::Error; -use crate::key::table_route::TableRouteValue; +use crate::error::{Error, Result}; +use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use crate::rpc::router::{Region, RegionRoute}; use crate::test_util::{new_ddl_context, MockDatanodeManager}; #[tokio::test] @@ -390,3 +398,76 @@ async fn test_on_create_metadata_err() { let error = procedure.execute(&ctx).await.unwrap_err(); assert!(!error.is_retry_later()); } + +fn creates_request_handler(_peer: Peer, request: RegionRequest) -> Result { + if let region_request::Body::Creates(_) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + // Default region id for physical table. 
+ let region_id = RegionId::new(1024, 1); + response.extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))]) + .unwrap(), + ); + return Ok(response); + } + + Ok(RegionResponse::new(0)) +} + +#[tokio::test] +async fn test_on_submit_create_request() { + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let handler = DatanodeWatcher::new(tx).with_handler(creates_request_handler); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + let ddl_context = new_ddl_context(node_manager); + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let table_id = 1024u32; + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_state: None, + leader_down_since: None, + }]; + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)), + ) + .await; + let physical_table_id = table_id; + let task = test_create_logical_table_task("foo"); + let yet_another_task = test_create_logical_table_task("bar"); + let mut procedure = CreateLogicalTablesProcedure::new( + vec![task, yet_another_task], + physical_table_id, + ddl_context, + ); + procedure.on_prepare().await.unwrap(); + procedure.on_datanode_create_regions().await.unwrap(); + let mut results = Vec::new(); + for _ in 0..2 { + let result = rx.try_recv().unwrap(); + results.push(result); + } + rx.try_recv().unwrap_err(); + let (peer, request) = results.remove(0); + assert_eq!(peer.id, 1); + assert_matches!(request.body.unwrap(), region_request::Body::Creates(_)); + let (peer, request) = results.remove(0); + assert_eq!(peer.id, 5); + assert_matches!( + request.body.unwrap(), + 
region_request::Body::Sync(SyncRequest { + manifest_info: Some(ManifestInfo::MetricManifestInfo(MetricManifestInfo { + data_manifest_version: 1, + metadata_manifest_version: 2, + .. + })), + .. + }) + ); +} diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 031a309c4c80..9983e19ec55e 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -100,7 +100,7 @@ async fn test_on_prepare_table() { #[tokio::test] async fn test_on_datanode_drop_regions() { let (tx, mut rx) = mpsc::channel(8); - let datanode_handler = DatanodeWatcher(tx); + let datanode_handler = DatanodeWatcher::new(tx); let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); let table_id = 1024; diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 020c2edf3406..bc2276f2d9ee 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -315,6 +315,9 @@ pub async fn sync_regions( let is_mito_engine = engine == MITO_ENGINE; let followers = find_followers(region_routes); + if followers.is_empty() { + return Ok(()); + } let mut sync_region_tasks = Vec::with_capacity(followers.len()); for datanode in followers { let requester = context.node_manager.datanode(&datanode).await; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index e846809a5ddb..4018814958aa 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -216,6 +216,10 @@ impl RegionWorkerLoop { "Region {} flush finished, tries to bump wal to {}", region_id, request.flushed_entry_id ); + // We use update_high_watermark here instead of schedule_flush because: + // 1. The flush operation has already completed successfully, so we just need to mark the WAL entries as flushed + // 2. 
schedule_flush would trigger a new flush operation, which is unnecessary since we just finished one + // 3. update_high_watermark is the correct operation to update the system's state about what data is safely persisted if let Err(e) = self .wal .obsolete(region_id, request.flushed_entry_id, ®ion.provider) From 4e9760ba9f46f905fb859daafde09415396fecaf Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 18 Apr 2025 06:08:23 +0000 Subject: [PATCH 5/6] chore: apply suggestions from CR --- src/common/meta/src/ddl/alter_logical_tables.rs | 4 ++-- src/common/meta/src/ddl/alter_table.rs | 4 ++-- src/common/meta/src/ddl/create_logical_tables.rs | 4 ++-- src/common/meta/src/ddl/utils.rs | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 982b2d0e7ad3..74706a8ddbb5 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -32,7 +32,7 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use table::metadata::TableId; -use crate::ddl::utils::{add_peer_context_if_needed, sync_regions}; +use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions}; use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; @@ -175,7 +175,7 @@ impl AlterLogicalTablesProcedure { region_routes: &[RegionRoute], ) { let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info; - if let Err(err) = sync_regions( + if let Err(err) = sync_follower_regions( &self.context, self.data.physical_table_id, results, diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index bb9d698b5585..ee7b15509c79 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -40,7 +40,7 @@ use 
table::table_reference::TableReference; use crate::cache_invalidator::Context; use crate::ddl::utils::{ - add_peer_context_if_needed, handle_multiple_results, sync_regions, MultipleResults, + add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults, }; use crate::ddl::DdlContext; use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result}; @@ -223,7 +223,7 @@ impl AlterTableProcedure { ) { // Safety: filled in `prepare` step. let table_info = self.data.table_info().unwrap(); - if let Err(err) = sync_regions( + if let Err(err) = sync_follower_regions( &self.context, self.data.table_id(), results, diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index f2c67e892991..628f17d3984c 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -33,7 +33,7 @@ use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_regions}; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions}; use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_route::TableRouteValue; @@ -206,7 +206,7 @@ impl CreateLogicalTablesProcedure { results: Vec, region_routes: &[RegionRoute], ) { - if let Err(err) = sync_regions( + if let Err(err) = sync_follower_regions( &self.context, self.data.physical_table_id, results, diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index bc2276f2d9ee..8c48d9f419cf 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -289,7 +289,8 @@ pub fn parse_manifest_infos_from_extensions( Ok(data_manifest_version) } -pub async fn sync_regions( +/// Sync follower regions on datanodes. 
+pub async fn sync_follower_regions( context: &DdlContext, table_id: TableId, results: Vec, From 79f9d7ea1d90da38288f06ec28bf2b0df1874199 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 18 Apr 2025 09:16:22 +0000 Subject: [PATCH 6/6] chore: apply suggestions from CR --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/mito2/src/engine.rs | 8 +++++--- src/mito2/src/worker/handle_flush.rs | 4 ---- src/store-api/src/region_request.rs | 2 +- 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e18b7b80d7dc..0678edbafc6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4722,7 +4722,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c#6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6d9cffd43c4e6358805a798f17e03e232994b82#b6d9cffd43c4e6358805a798f17e03e232994b82" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index e3ee1ee0d729..f6a0e5e6d342 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index a138547ce32c..fbedef3c3583 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -230,17 +230,18 @@ impl MitoEngine { region_id: &RegionId, manifest_info: RegionManifestInfo, extensions: &mut HashMap>, - ) { + ) -> Result<()> { let region_manifest_info = vec![(*region_id, manifest_info)]; 
extensions.insert( MANIFEST_INFO_EXTENSION_KEY.to_string(), - RegionManifestInfo::encode_list(®ion_manifest_info).unwrap(), + RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?, ); info!( "Added manifest info: {:?} to extensions, region_id: {:?}", region_manifest_info, region_id ); + Ok(()) } } @@ -589,7 +590,8 @@ impl RegionEngine for MitoEngine { ®ion_id, statistic.manifest, &mut response.extensions, - ); + ) + .map_err(BoxedError::new)?; } } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 4018814958aa..e846809a5ddb 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -216,10 +216,6 @@ impl RegionWorkerLoop { "Region {} flush finished, tries to bump wal to {}", region_id, request.flushed_entry_id ); - // We use update_high_watermark here instead of schedule_flush because: - // 1. The flush operation has already completed successfully, so we just need to mark the WAL entries as flushed - // 2. schedule_flush would trigger a new flush operation, which is unnecessary since we just finished one - // 3. update_high_watermark is the correct operation to update the system's state about what data is safely persisted if let Err(e) = self .wal .obsolete(region_id, request.flushed_entry_id, ®ion.provider) diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 12ec69c7b86f..5a6e1289c6fe 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -154,7 +154,7 @@ impl RegionRequest { region_request::Body::Alters(alters) => make_region_alters(alters), region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk), region_request::Body::Sync(_) => UnexpectedSnafu { - reason: "Sync request should be handled separately by RegionServer::handle_sync_region_request", + reason: "Sync request should be handled separately by RegionServer", } .fail(), }