Skip to content

Commit 5f8d849

Browse files
CookiePieWwWenyXu
andauthored
feat: alter database ttl (#5035)
* feat: alter databaset ttl * fix: make clippy happy * feat: add unset database option * fix: happy ci * fix: happy clippy * chore: fmt toml * fix: fix header * refactor: introduce `AlterDatabaseKind` * chore: apply suggestions from CR * refactor: add unset database option support * test: add unit tests * test: add sqlness tests * feat: invalidate schema name value cache * Apply suggestions from code review * chore: fmt * chore: update error messages * test: add more test cases * test: add more test cases * Apply suggestions from code review * chore: apply suggestions from CR --------- Co-authored-by: WenyXu <wenymedia@gmail.com>
1 parent 3029b47 commit 5f8d849

41 files changed

Lines changed: 1320 additions & 186 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ etcd-client = "0.13"
122122
fst = "0.4.7"
123123
futures = "0.3"
124124
futures-util = "0.3"
125-
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28" }
125+
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
126126
hex = "0.4"
127127
humantime = "2.1"
128128
humantime-serde = "1.1"

src/api/src/helper.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
527527
match request.expr {
528528
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
529529
Some(Expr::CreateTable(_)) => "ddl.create_table",
530-
Some(Expr::Alter(_)) => "ddl.alter",
530+
Some(Expr::AlterTable(_)) => "ddl.alter_table",
531531
Some(Expr::DropTable(_)) => "ddl.drop_table",
532532
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
533533
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
534534
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
535535
Some(Expr::CreateView(_)) => "ddl.create_view",
536536
Some(Expr::DropView(_)) => "ddl.drop_view",
537+
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
537538
None => "ddl.empty",
538539
}
539540
}

src/catalog/src/system_schema/information_schema/schemata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
180180
.context(TableMetadataManagerSnafu)?
181181
// information_schema is not available from this
182182
// table_metadata_manager and we return None
183-
.map(|schema_opts| format!("{schema_opts}"))
183+
.map(|schema_opts| format!("{}", schema_opts.into_inner()))
184184
} else {
185185
None
186186
};

src/client/src/database.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
1818
use api::v1::greptime_request::Request;
1919
use api::v1::query_request::Query;
2020
use api::v1::{
21-
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
21+
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
2222
QueryRequest, RequestHeader,
2323
};
2424
use arrow_flight::Ticket;
@@ -211,9 +211,9 @@ impl Database {
211211
.await
212212
}
213213

214-
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
214+
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
215215
self.do_get(Request::Ddl(DdlRequest {
216-
expr: Some(DdlExpr::Alter(expr)),
216+
expr: Some(DdlExpr::AlterTable(expr)),
217217
}))
218218
.await
219219
}

src/common/grpc-expr/src/alter.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
use api::helper::ColumnDataTypeWrapper;
1616
use api::v1::add_column_location::LocationType;
17-
use api::v1::alter_expr::Kind;
17+
use api::v1::alter_table_expr::Kind;
1818
use api::v1::column_def::as_fulltext_option;
1919
use api::v1::{
20-
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, CreateTableExpr, DropColumns,
21-
ModifyColumnTypes, RenameTable, SemanticType,
20+
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
21+
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
2222
};
2323
use common_query::AddColumnLocation;
2424
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
@@ -36,8 +36,8 @@ use crate::error::{
3636
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
3737
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
3838

39-
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
40-
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
39+
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
40+
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
4141
let catalog_name = expr.catalog_name;
4242
let schema_name = expr.schema_name;
4343
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -203,7 +203,7 @@ mod tests {
203203

204204
#[test]
205205
fn test_alter_expr_to_request() {
206-
let expr = AlterExpr {
206+
let expr = AlterTableExpr {
207207
catalog_name: String::default(),
208208
schema_name: String::default(),
209209
table_name: "monitor".to_string(),
@@ -244,7 +244,7 @@ mod tests {
244244

245245
#[test]
246246
fn test_alter_expr_with_location_to_request() {
247-
let expr = AlterExpr {
247+
let expr = AlterTableExpr {
248248
catalog_name: String::default(),
249249
schema_name: String::default(),
250250
table_name: "monitor".to_string(),
@@ -321,7 +321,7 @@ mod tests {
321321

322322
#[test]
323323
fn test_modify_column_type_expr() {
324-
let expr = AlterExpr {
324+
let expr = AlterTableExpr {
325325
catalog_name: "test_catalog".to_string(),
326326
schema_name: "test_schema".to_string(),
327327
table_name: "monitor".to_string(),
@@ -355,7 +355,7 @@ mod tests {
355355

356356
#[test]
357357
fn test_drop_column_expr() {
358-
let expr = AlterExpr {
358+
let expr = AlterTableExpr {
359359
catalog_name: "test_catalog".to_string(),
360360
schema_name: "test_schema".to_string(),
361361
table_name: "monitor".to_string(),

src/common/meta/src/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
3232
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
3333
use crate::{ClusterId, DatanodeId};
3434

35+
pub mod alter_database;
3536
pub mod alter_logical_tables;
3637
pub mod alter_table;
3738
pub mod create_database;
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use async_trait::async_trait;
16+
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17+
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18+
use common_telemetry::tracing::info;
19+
use serde::{Deserialize, Serialize};
20+
use snafu::{ensure, ResultExt};
21+
use strum::AsRefStr;
22+
23+
use super::utils::handle_retry_error;
24+
use crate::cache_invalidator::Context;
25+
use crate::ddl::DdlContext;
26+
use crate::error::{Result, SchemaNotFoundSnafu};
27+
use crate::instruction::CacheIdent;
28+
use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
29+
use crate::key::DeserializedValueWithBytes;
30+
use crate::lock_key::{CatalogLock, SchemaLock};
31+
use crate::rpc::ddl::UnsetDatabaseOption::{self};
32+
use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
33+
use crate::ClusterId;
34+
35+
pub struct AlterDatabaseProcedure {
36+
pub context: DdlContext,
37+
pub data: AlterDatabaseData,
38+
}
39+
40+
fn build_new_schema_value(
41+
mut value: SchemaNameValue,
42+
alter_kind: &AlterDatabaseKind,
43+
) -> Result<SchemaNameValue> {
44+
match alter_kind {
45+
AlterDatabaseKind::SetDatabaseOptions(options) => {
46+
for option in options.0.iter() {
47+
match option {
48+
SetDatabaseOption::Ttl(ttl) => {
49+
if ttl.is_zero() {
50+
value.ttl = None;
51+
} else {
52+
value.ttl = Some(*ttl);
53+
}
54+
}
55+
}
56+
}
57+
}
58+
AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
59+
for key in keys.0.iter() {
60+
match key {
61+
UnsetDatabaseOption::Ttl => value.ttl = None,
62+
}
63+
}
64+
}
65+
}
66+
Ok(value)
67+
}
68+
69+
impl AlterDatabaseProcedure {
70+
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
71+
72+
pub fn new(
73+
cluster_id: ClusterId,
74+
task: AlterDatabaseTask,
75+
context: DdlContext,
76+
) -> Result<Self> {
77+
Ok(Self {
78+
context,
79+
data: AlterDatabaseData::new(task, cluster_id)?,
80+
})
81+
}
82+
83+
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
84+
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
85+
86+
Ok(Self { context, data })
87+
}
88+
89+
pub async fn on_prepare(&mut self) -> Result<Status> {
90+
let value = self
91+
.context
92+
.table_metadata_manager
93+
.schema_manager()
94+
.get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
95+
.await?;
96+
97+
ensure!(
98+
value.is_some(),
99+
SchemaNotFoundSnafu {
100+
table_schema: self.data.schema(),
101+
}
102+
);
103+
104+
self.data.schema_value = value;
105+
self.data.state = AlterDatabaseState::UpdateMetadata;
106+
107+
Ok(Status::executing(true))
108+
}
109+
110+
pub async fn on_update_metadata(&mut self) -> Result<Status> {
111+
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
112+
113+
// Safety: schema_value is not None.
114+
let current_schema_value = self.data.schema_value.as_ref().unwrap();
115+
116+
let new_schema_value = build_new_schema_value(
117+
current_schema_value.get_inner_ref().clone(),
118+
&self.data.kind,
119+
)?;
120+
121+
self.context
122+
.table_metadata_manager
123+
.schema_manager()
124+
.update(schema_name, current_schema_value, &new_schema_value)
125+
.await?;
126+
127+
info!("Updated database metadata for schema {schema_name}");
128+
self.data.state = AlterDatabaseState::InvalidateSchemaCache;
129+
Ok(Status::executing(true))
130+
}
131+
132+
pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
133+
let cache_invalidator = &self.context.cache_invalidator;
134+
cache_invalidator
135+
.invalidate(
136+
&Context::default(),
137+
&[CacheIdent::SchemaName(SchemaName {
138+
catalog_name: self.data.catalog().to_string(),
139+
schema_name: self.data.schema().to_string(),
140+
})],
141+
)
142+
.await?;
143+
144+
Ok(Status::done())
145+
}
146+
}
147+
148+
#[async_trait]
149+
impl Procedure for AlterDatabaseProcedure {
150+
fn type_name(&self) -> &str {
151+
Self::TYPE_NAME
152+
}
153+
154+
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
155+
match self.data.state {
156+
AlterDatabaseState::Prepare => self.on_prepare().await,
157+
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
158+
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
159+
}
160+
.map_err(handle_retry_error)
161+
}
162+
163+
fn dump(&self) -> ProcedureResult<String> {
164+
serde_json::to_string(&self.data).context(ToJsonSnafu)
165+
}
166+
167+
fn lock_key(&self) -> LockKey {
168+
let catalog = self.data.catalog();
169+
let schema = self.data.schema();
170+
171+
let lock_key = vec![
172+
CatalogLock::Read(catalog).into(),
173+
SchemaLock::write(catalog, schema).into(),
174+
];
175+
176+
LockKey::new(lock_key)
177+
}
178+
}
179+
180+
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
181+
enum AlterDatabaseState {
182+
Prepare,
183+
UpdateMetadata,
184+
InvalidateSchemaCache,
185+
}
186+
187+
/// The data of alter database procedure.
188+
#[derive(Debug, Serialize, Deserialize)]
189+
pub struct AlterDatabaseData {
190+
cluster_id: ClusterId,
191+
state: AlterDatabaseState,
192+
kind: AlterDatabaseKind,
193+
catalog_name: String,
194+
schema_name: String,
195+
schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
196+
}
197+
198+
impl AlterDatabaseData {
199+
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
200+
Ok(Self {
201+
cluster_id,
202+
state: AlterDatabaseState::Prepare,
203+
kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
204+
catalog_name: task.alter_expr.catalog_name,
205+
schema_name: task.alter_expr.schema_name,
206+
schema_value: None,
207+
})
208+
}
209+
210+
pub fn catalog(&self) -> &str {
211+
&self.catalog_name
212+
}
213+
214+
pub fn schema(&self) -> &str {
215+
&self.schema_name
216+
}
217+
}
218+
219+
#[cfg(test)]
220+
mod tests {
221+
use std::time::Duration;
222+
223+
use crate::ddl::alter_database::build_new_schema_value;
224+
use crate::key::schema_name::SchemaNameValue;
225+
use crate::rpc::ddl::{
226+
AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
227+
UnsetDatabaseOptions,
228+
};
229+
230+
#[test]
231+
fn test_build_new_schema_value() {
232+
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
233+
SetDatabaseOption::Ttl(Duration::from_secs(10)),
234+
]));
235+
let current_schema_value = SchemaNameValue::default();
236+
let new_schema_value =
237+
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
238+
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
239+
240+
let unset_ttl_alter_kind =
241+
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
242+
UnsetDatabaseOption::Ttl,
243+
]));
244+
let new_schema_value =
245+
build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
246+
assert_eq!(new_schema_value.ttl, None);
247+
}
248+
}

0 commit comments

Comments
 (0)