Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8a88448eb5b015fbf12e67c72c620c9d314f2375" }
Comment thread
WenyXu marked this conversation as resolved.
Outdated
Comment thread
WenyXu marked this conversation as resolved.
Outdated
hex = "0.4"
humantime = "2.1"
humantime-serde = "1.1"
Expand Down
3 changes: 2 additions & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::AlterTable(_)) => "ddl.alter_table",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty",
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
Expand Down Expand Up @@ -211,9 +211,9 @@ impl Database {
.await
}

pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
expr: Some(DdlExpr::AlterTable(expr)),
}))
.await
}
Expand Down
22 changes: 11 additions & 11 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::as_fulltext_option;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, CreateTableExpr, DropColumns,
ModifyColumnTypes, RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
Expand All @@ -36,9 +36,9 @@ use crate::error::{
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
let alter_table_expr = expr.catalog_name;
Comment thread
WenyXu marked this conversation as resolved.
Outdated
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
let alter_kind = match kind {
Expand Down Expand Up @@ -129,7 +129,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
};

let request = AlterTableRequest {
catalog_name,
catalog_name: alter_table_expr,
schema_name,
table_name: expr.table_name,
table_id,
Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {

#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -244,7 +244,7 @@ mod tests {

#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -321,7 +321,7 @@ mod tests {

#[test]
fn test_modify_column_type_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {

#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};

pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
Expand Down
181 changes: 181 additions & 0 deletions src/common/meta/src/ddl/alter_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::alter_database_expr::Kind;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use super::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::key::schema_name::SchemaNameKey;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::rpc::ddl::AlterDatabaseTask;
use crate::ClusterId;

pub struct AlterDatabaseProcedure {
pub context: DdlContext,
pub data: AlterDatabaseData,
}

impl AlterDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";

pub fn new(cluster_id: ClusterId, task: AlterDatabaseTask, context: DdlContext) -> Self {
Self {
context,
data: AlterDatabaseData {
state: AlterDatabaseState::Prepare,
cluster_id,
task,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

pub async fn on_prepare(&mut self) -> Result<Status> {
let exists = self
.context
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
.await?;

ensure!(
exists,
SchemaNotFoundSnafu {
table_schema: self.data.schema(),
}
);

self.data.task.validate()?;
self.data.state = AlterDatabaseState::UpdateMetadata;

Ok(Status::executing(true))
}

pub async fn on_update_metadata(&mut self) -> Result<Status> {
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
// Safety: Validated in on_prepare
let alter_kind = self.data.task.alter_expr.kind.as_ref().unwrap();
match alter_kind {
Kind::SetDatabaseOptions(options) => {
let option_map = options
.set_database_options
.iter()
.map(|option| (option.key.clone(), option.value.clone()))
.collect::<HashMap<String, String>>();
self.validate_options(&option_map)?;
let schema_value = (&option_map).try_into()?;
self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, schema_value)
.await?;
}
Kind::UnsetDatabaseOptions(options) => {
let option_map = options
.keys
.iter()
.map(|option| (option.clone(), "".to_string()))
.collect::<HashMap<String, String>>();
self.validate_options(&option_map)?;
let schema_value = (&option_map).try_into()?;
self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, schema_value)
.await?;
}
};
Ok(Status::done())
}

fn validate_options(&self, _options: &HashMap<String, String>) -> Result<()> {
todo!()
Comment thread
WenyXu marked this conversation as resolved.
Outdated
}
}

#[async_trait]
impl Procedure for AlterDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
AlterDatabaseState::Prepare => self.on_prepare().await,
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let catalog = self.data.catalog();
let schema = self.data.schema();

let lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
];

LockKey::new(lock_key)
}
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
Prepare,
UpdateMetadata,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
cluster_id: ClusterId,
state: AlterDatabaseState,
task: AlterDatabaseTask,
}

impl AlterDatabaseData {
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Self {
Self {
cluster_id,
state: AlterDatabaseState::Prepare,
task,
}
}

pub fn catalog(&self) -> &str {
self.task.catalog()
}

pub fn schema(&self) -> &str {
self.task.schema()
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_logical_tables/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashSet;

use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use snafu::{ensure, OptionExt};

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use api::v1;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod update_metadata;

use std::vec;

use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/alter_table/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use common_catalog::format_full_table_name;
use snafu::ensure;
Expand Down
Loading