Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns,
RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterExpr, ChangeColumnTypes, CreateTableExpr,
DropColumns, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ChangeColumnTypeRequest};

use crate::error::{
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
Expand Down Expand Up @@ -64,13 +65,33 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
columns: add_column_requests,
}
}
Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types,
}) => {
let change_column_type_requests = change_column_types
.into_iter()
.map(|cct| {
let target_type =
ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
.into();

Ok(ChangeColumnTypeRequest {
column_name: cct.column_name,
target_type,
})
})
.collect::<Result<Vec<_>>>()?;

AlterKind::ChangeColumnTypes {
columns: change_column_type_requests,
}
}
Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
},
Kind::RenameTable(RenameTable { new_table_name }) => {
AlterKind::RenameTable { new_table_name }
}
Kind::ChangeColumnTypes(_) => unimplemented!(),
};

let request = AlterTableRequest {
Expand Down Expand Up @@ -138,7 +159,10 @@ fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation

#[cfg(test)]
mod tests {
use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, SemanticType};
use api::v1::{
AddColumn, AddColumns, ChangeColumnType, ColumnDataType, ColumnDef, DropColumn,
SemanticType,
};
use datatypes::prelude::ConcreteDataType;

use super::*;
Expand Down Expand Up @@ -261,6 +285,40 @@ mod tests {
assert_eq!(Some(AddColumnLocation::First), add_column.location);
}

#[test]
fn test_change_column_type_expr() {
let expr = AlterExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),

kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
column_name: "mem_usage".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
}],
})),
};

let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "test_catalog");
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);

let mut change_column_types = match alter_request.alter_kind {
AlterKind::ChangeColumnTypes { columns } => columns,
_ => unreachable!(),
};

let change_column_type = change_column_types.pop().unwrap();
assert_eq!("mem_usage", change_column_type.column_name);
assert_eq!(
ConcreteDataType::string_datatype(),
change_column_type.target_type
);
}

#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
Expand Down
75 changes: 66 additions & 9 deletions src/common/meta/src/ddl/alter_table/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ fn create_proto_alter_kind(
add_columns,
})))
}
Kind::ChangeColumnTypes(x) => Ok(Some(alter_request::Kind::ChangeColumnTypes(x.clone()))),
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
Expand All @@ -105,7 +106,6 @@ fn create_proto_alter_kind(
})))
}
Kind::RenameTable(_) => Ok(None),
Kind::ChangeColumnTypes(_) => unimplemented!(),
}
}

Expand All @@ -119,27 +119,27 @@ mod tests {
use api::v1::region::region_request::Body;
use api::v1::region::RegionColumnDef;
use api::v1::{
region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ColumnDataType,
ColumnDef as PbColumnDef, SemanticType,
region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ChangeColumnType,
ChangeColumnTypes, ColumnDataType, ColumnDef as PbColumnDef, SemanticType,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, TableId};

use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::create_table::{
build_raw_table_info_from_expr, TestCreateTableExprBuilder,
};
use crate::ddl::DdlContext;
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};

#[tokio::test]
async fn test_make_alter_region_request() {
let node_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(node_manager);
async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let table_id = 1024;
let region_id = RegionId::new(table_id, 1);
Expand Down Expand Up @@ -194,12 +194,25 @@ mod tests {
)
.await
.unwrap();
(
ddl_context,
cluster_id,
table_id,
region_id,
table_name.to_string(),
)
}

#[tokio::test]
async fn test_make_alter_region_request() {
let (ddl_context, cluster_id, table_id, region_id, table_name) =
prepare_ddl_context().await;

let task = AlterTableTask {
alter_table: AlterExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_name,
kind: Some(Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(PbColumnDef {
Expand Down Expand Up @@ -256,4 +269,48 @@ mod tests {
))
);
}

#[tokio::test]
async fn test_make_alter_column_type_region_request() {
let (ddl_context, cluster_id, table_id, region_id, table_name) =
prepare_ddl_context().await;

let task = AlterTableTask {
alter_table: AlterExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
}],
})),
},
};

let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
else {
unreachable!()
};
assert_eq!(alter_region_request.region_id, region_id.as_u64());
assert_eq!(alter_region_request.schema_version, 1);
assert_eq!(
alter_region_request.kind,
Some(region::alter_request::Kind::ChangeColumnTypes(
ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
}]
}
))
);
}
}
62 changes: 58 additions & 4 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension,
CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType,
TableName,
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType,
ColumnDataTypeExtension, CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable,
SemanticType, TableName,
Comment thread
waynexia marked this conversation as resolved.
Outdated
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
Expand All @@ -37,7 +37,9 @@ use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX};
use sql::statements::{column_def_to_schema, sql_column_def_to_grpc_column_def};
use sql::statements::{
column_def_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
};
use sql::util::extract_tables_from_query;
use table::requests::{TableOptions, FILE_TABLE_META_KEY};
use table::table_reference::TableReference;
Expand Down Expand Up @@ -473,6 +475,23 @@ pub(crate) fn to_alter_expr(
location: location.as_ref().map(From::from),
}],
}),
AlterTableOperation::ChangeColumnType {
column_name,
target_type,
} => {
let target_type =
sql_data_type_to_concrete_data_type(target_type).context(ParseSqlSnafu)?;
let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
.map(|w| w.to_parts())
.context(ColumnDataTypeSnafu)?;
Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
column_name: column_name.value.to_string(),
target_type: target_type as i32,
target_type_extension,
}],
})
}
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: name.value.to_string(),
Expand Down Expand Up @@ -710,4 +729,39 @@ mod tests {
if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
);
}

#[test]
fn test_to_alter_change_column_type_expr() {
let sql = "ALTER TABLE monitor MODIFY mem_usage STRING;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::Alter(alter_table) = stmt else {
unreachable!()
};

// query context with system timezone UTC.
let expr = to_alter_expr(alter_table.clone(), QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();

let Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types,
}) = kind
else {
unreachable!()
};

assert_eq!(1, change_column_types.len());
let change_column_type = &change_column_types[0];

assert_eq!("mem_usage", change_column_type.column_name);
assert_eq!(
ColumnDataType::String as i32,
change_column_type.target_type
);
assert!(change_column_type.target_type_extension.is_none());
}
}
Loading