Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6175e7d0b9ebe892b8baa8bd6dd39ea789657d4c" }
Comment thread
WenyXu marked this conversation as resolved.
Outdated
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down
44 changes: 38 additions & 6 deletions src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ mod region_request;
mod table_cache_keys;
mod update_metadata;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use futures_util::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
Expand All @@ -30,7 +32,7 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;

use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
Expand All @@ -39,7 +41,7 @@ use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::find_leaders;
use crate::rpc::router::{find_leaders, RegionRoute};

pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
Expand Down Expand Up @@ -125,14 +127,20 @@ impl AlterLogicalTablesProcedure {
});
}

// Collects responses from datanodes.
let phy_raw_schemas = future::join_all(alter_region_tasks)
let mut results = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

// Collects responses from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();

if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
return Ok(Status::executing(true));
}
Expand All @@ -155,10 +163,34 @@ impl AlterLogicalTablesProcedure {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}

async fn submit_sync_region_requests(
&self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
if let Err(err) = sync_follower_regions(
&self.context,
self.data.physical_table_id,
results,
region_routes,
table_info.meta.engine.as_str(),
)
.await
{
error!(err; "Failed to sync regions for table {}, table_id: {}",
format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
self.data.physical_table_id
);
}
}

pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;
Expand Down
33 changes: 29 additions & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod update_metadata;

use std::vec;

use api::region::RegionResponse;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
Expand All @@ -29,7 +30,7 @@ use common_procedure::{
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use futures::future;
use futures::future::{self};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
Expand All @@ -38,7 +39,9 @@ use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;

use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults};
use crate::ddl::utils::{
add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
};
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::instruction::CacheIdent;
Expand All @@ -48,7 +51,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};

/// The alter table procedure
pub struct AlterTableProcedure {
Expand Down Expand Up @@ -194,7 +197,9 @@ impl AlterTableProcedure {
// Just returns the error, and wait for the next try.
Err(error)
}
MultipleResults::Ok => {
MultipleResults::Ok(results) => {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing_with_clean_poisons(true))
}
Expand All @@ -211,6 +216,26 @@ impl AlterTableProcedure {
}
}

async fn submit_sync_region_requests(
&mut self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
// Safety: filled in `prepare` step.
let table_info = self.data.table_info().unwrap();
if let Err(err) = sync_follower_regions(
&self.context,
self.data.table_id(),
results,
region_routes,
table_info.meta.engine.as_str(),
)
.await
{
error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
}
}

/// Update table metadata.
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
Expand Down
40 changes: 34 additions & 6 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ mod metadata;
mod region_request;
mod update_metadata;

use api::region::RegionResponse;
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::{debug, warn};
use futures_util::future::join_all;
use common_telemetry::{debug, error, warn};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
Expand All @@ -31,7 +33,7 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::key::table_route::TableRouteValue;
Expand Down Expand Up @@ -156,14 +158,20 @@ impl CreateLogicalTablesProcedure {
});
}

// Collects response from datanodes.
let phy_raw_schemas = join_all(create_region_tasks)
let mut results = future::join_all(create_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

// Collects response from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();

if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
Expand All @@ -186,10 +194,30 @@ impl CreateLogicalTablesProcedure {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;

Ok(Status::executing(true))
}

async fn submit_sync_region_requests(
&self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
if let Err(err) = sync_follower_regions(
&self.context,
self.data.physical_table_id,
results,
region_routes,
METRIC_ENGINE,
)
.await
{
error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
}
}
}

#[async_trait]
Expand Down
57 changes: 53 additions & 4 deletions src/common/meta/src/ddl/drop_table/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
use std::collections::HashMap;

use api::v1::region::{
region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest,
RegionRequest, RegionRequestHeader,
};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::ensure;
Expand All @@ -36,7 +37,8 @@ use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
find_follower_regions, find_followers, find_leader_regions, find_leaders,
operating_leader_regions, RegionRoute,
};

/// [Control] indicated to the caller whether to go to the next step.
Expand Down Expand Up @@ -210,10 +212,10 @@ impl DropTableExecutor {
region_routes: &[RegionRoute],
fast_path: bool,
) -> Result<()> {
// Drops leader regions on datanodes.
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;

for datanode in leaders {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
Expand Down Expand Up @@ -252,6 +254,53 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;

// Drops follower regions on datanodes.
let followers = find_followers(region_routes);
let mut close_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

for region_id in region_ids {
debug!("Closing region {region_id} on Datanode {datanode:?}");
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Close(PbCloseRegionRequest {
region_id: region_id.as_u64(),
})),
};

let datanode = datanode.clone();
let requester = requester.clone();
close_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
});
}
}

// Failure to close follower regions is not critical.
// When a leader region is dropped, follower regions will be unable to renew their leases via metasrv.
// Eventually, these follower regions will be automatically closed by the region livekeeper.
if let Err(err) = join_all(close_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
{
error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
}

// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
Expand Down
5 changes: 4 additions & 1 deletion src/common/meta/src/ddl/test_util/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE, MITO_ENGINE,
};
use datatypes::schema::RawSchema;
use derive_builder::Builder;
use store_api::storage::TableId;
Expand Down Expand Up @@ -164,6 +166,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.engine(MITO_ENGINE)
.build()
.unwrap()
.into();
Expand Down
Loading
Loading