diff --git a/Cargo.lock b/Cargo.lock index b8947d5c2b16..f89cf85f0835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1837,6 +1837,7 @@ dependencies = [ "datatypes", "etcd-client", "futures", + "humantime-serde", "hyper", "lazy_static", "prost", @@ -4154,7 +4155,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc#ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a277f27caa035a801d5b9c020a0449777736614#4a277f27caa035a801d5b9c020a0449777736614" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index bfbc1049d98e..b3c3209558b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,8 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a277f27caa035a801d5b9c020a0449777736614" } +humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" once_cell = "1.18" diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index fb40b3a7db29..685591cf6289 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -205,7 +205,7 @@ impl MigrateTableMetadata { async fn migrate_schema_key(&self, key: &v1SchemaKey) -> Result<()> { let new_key = SchemaNameKey::new(&key.catalog_name, &key.schema_name); - let schema_name_value = SchemaNameValue; + let schema_name_value = SchemaNameValue::default(); info!("Creating '{new_key}'"); diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 68c1cef20855..f978d737451e 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -15,6 +15,7 @@ common-telemetry = { workspace = true } common-time = { workspace = true } etcd-client.workspace = true futures.workspace = true +humantime-serde.workspace = true lazy_static.workspace = true prost.workspace = true regex.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ef892a24e14e..b066bee50dd5 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -54,6 +54,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse value {} into key {}", value, key))] + ParseOption { + key: String, + value: String, + location: Location, + }, + #[snafu(display("Corrupted table route data, err: {}", err_msg))] RouteInfoCorrupted { err_msg: String, location: Location }, @@ -151,6 +158,7 @@ impl ErrorExt for Error { IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal, SerdeJson { .. } + | ParseOption { .. } | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } | InvalidTableMetadata { .. } diff --git a/src/common/meta/src/key/schema_name.rs b/src/common/meta/src/key/schema_name.rs index 9fee6a60fefd..c162e31b1617 100644 --- a/src/common/meta/src/key/schema_name.rs +++ b/src/common/meta/src/key/schema_name.rs @@ -12,22 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::Display; use std::sync::Arc; +use std::time::Duration; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use futures::stream::BoxStream; use futures::StreamExt; +use humantime_serde::re::humantime; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use crate::error::{self, Error, InvalidTableMetadataSnafu, Result}; +use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result}; use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::{PutRequest, RangeRequest}; use crate::rpc::KeyValue; +const OPT_KEY_TTL: &str = "ttl"; + #[derive(Debug, Clone, Copy, PartialEq)] pub struct SchemaNameKey<'a> { pub catalog: &'a str, @@ -43,8 +48,33 @@ impl<'a> Default for SchemaNameKey<'a> { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct SchemaNameValue; +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct SchemaNameValue { + #[serde(default)] + #[serde(with = "humantime_serde")] + pub ttl: Option, +} + +impl TryFrom<&HashMap> for SchemaNameValue { + type Error = Error; + + fn try_from(value: &HashMap) -> std::result::Result { + let ttl = value + .get(OPT_KEY_TTL) + .map(|ttl_str| { + ttl_str.parse::().map_err(|_| { + ParseOptionSnafu { + key: OPT_KEY_TTL, + value: ttl_str.clone(), + } + .build() + }) + }) + .transpose()? + .map(|ttl| ttl.into()); + Ok(Self { ttl }) + } +} impl<'a> SchemaNameKey<'a> { pub fn new(catalog: &'a str, schema: &'a str) -> Self { @@ -108,11 +138,15 @@ impl SchemaManager { } /// Creates `SchemaNameKey`. - pub async fn create(&self, schema: SchemaNameKey<'_>) -> Result<()> { + pub async fn create( + &self, + schema: SchemaNameKey<'_>, + value: Option, + ) -> Result<()> { let raw_key = schema.as_raw_key(); let req = PutRequest::new() .with_key(raw_key) - .with_value(SchemaNameValue.try_as_raw_value()?); + .with_value(value.unwrap_or_default().try_as_raw_value()?); self.kv_backend.put(req).await?; @@ -125,6 +159,14 @@ impl SchemaManager { Ok(self.kv_backend.get(&raw_key).await?.is_some()) } + pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result> { + let raw_key = schema.as_raw_key(); + let value = self.kv_backend.get(&raw_key).await?; + value + .map(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref())) + .transpose() + } + /// Returns a schema stream, it lists all schemas belong to the target `catalog`. pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result> { let start_key = SchemaNameKey::range_start_key(catalog); @@ -143,25 +185,35 @@ impl SchemaManager { #[cfg(test)] mod tests { + use super::*; use crate::kv_backend::memory::MemoryKvBackend; #[test] fn test_serialization() { let key = SchemaNameKey::new("my-catalog", "my-schema"); - assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema"); let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap(); - assert_eq!(key, parsed); + + let value = SchemaNameValue { + ttl: Some(Duration::from_secs(10)), + }; + let mut opts: HashMap = HashMap::new(); + opts.insert("ttl".to_string(), "10s".to_string()); + let from_value = SchemaNameValue::try_from(&opts).unwrap(); + assert_eq!(value, from_value); + + let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap(); + assert_eq!(value, parsed); } #[tokio::test] async fn test_key_exist() { let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default())); let schema_key = SchemaNameKey::new("my-catalog", "my-schema"); - manager.create(schema_key).await.unwrap(); + manager.create(schema_key, None).await.unwrap(); assert!(manager.exist(schema_key).await.unwrap()); diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 3f61988175f7..c09ed6e1e90f 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -12,7 +12,7 @@ common-error = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } futures.workspace = true -humantime-serde = "1.1" +humantime-serde.workspace = true object-store = { workspace = true } serde.workspace = true serde_json = "1.0" diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index caa71d44e643..e613684e8760 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -40,7 +40,7 @@ datatypes = { workspace = true } file-table-engine = { workspace = true } futures = "0.3" futures-util.workspace = true -humantime-serde = "1.1" +humantime-serde.workspace = true hyper = { version = "0.14", features = ["full"] } key-lock = "0.1" log-store = { workspace = true } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 2d9e527f694b..d3cec2b13853 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -365,6 +365,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "my_database".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); @@ -418,6 +419,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "my_database".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); @@ -485,6 +487,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "my_database".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); @@ -589,6 +592,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "my_database".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); @@ -661,6 +665,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "my_database".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 8ab85e1bec43..0ea5c683e57f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -39,7 +39,7 @@ datatypes = { workspace = true } file-table-engine = { workspace = true } futures = "0.3" futures-util.workspace = true -humantime-serde = "1.1" +humantime-serde.workspace = true itertools.workspace = true meta-client = { workspace = true } # Although it is not used, please do not delete it. diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 091810efee50..bccbb321619d 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -34,7 +34,7 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::key::schema_name::SchemaNameKey; +use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest}; @@ -67,8 +67,9 @@ use crate::catalog::FrontendCatalogManager; use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, - RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu, - TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu, + RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu, + TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, TableSnafu, + UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::instance::distributed::deleter::DistDeleter; @@ -104,6 +105,25 @@ impl DistInstance { partitions: Option, ) -> Result { let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE); + // 1. get schema info + let schema = self + .catalog_manager + .table_metadata_manager_ref() + .schema_manager() + .get(SchemaNameKey::new( + &create_table.catalog_name, + &create_table.schema_name, + )) + .await + .context(TableMetadataManagerSnafu)?; + + let Some(schema_opts) = schema else { + return SchemaNotFoundSnafu { + schema_info: &create_table.schema_name, + } + .fail(); + }; + let table_name = TableName::new( &create_table.catalog_name, &create_table.schema_name, @@ -112,7 +132,7 @@ impl DistInstance { let (partitions, partition_cols) = parse_partitions(create_table, partitions)?; - let mut table_info = create_table_info(create_table, partition_cols)?; + let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?; let resp = self .create_table_procedure(create_table, partitions, table_info.clone()) @@ -340,6 +360,7 @@ impl DistInstance { let expr = CreateDatabaseExpr { database_name: stmt.name.to_string(), create_if_not_exists: stmt.if_not_exists, + options: Default::default(), }; self.handle_create_database(expr, query_ctx).await } @@ -477,10 +498,12 @@ impl DistInstance { } ); + let schema_value = + SchemaNameValue::try_from(&expr.options).context(error::TableMetadataManagerSnafu)?; self.catalog_manager .table_metadata_manager_ref() .schema_manager() - .create(schema) + .create(schema, Some(schema_value)) .await .context(error::TableMetadataManagerSnafu)?; @@ -772,6 +795,7 @@ fn create_partitions_stmt(partitions: Vec) -> Result, + schema_opts: SchemaNameValue, ) -> Result { let mut column_schemas = Vec::with_capacity(create_table.column_defs.len()); let mut column_name_to_index_map = HashMap::new(); @@ -818,6 +842,10 @@ fn create_table_info( }) .collect::>>()?; + let table_options = TableOptions::try_from(&create_table.table_options) + .context(UnrecognizedTableOptionSnafu)?; + let table_options = merge_options(table_options, schema_opts); + let meta = RawTableMeta { schema: raw_schema, primary_key_indices, @@ -826,8 +854,7 @@ fn create_table_info( next_column_id: column_schemas.len() as u32, region_numbers: vec![], engine_options: HashMap::new(), - options: TableOptions::try_from(&create_table.table_options) - .context(UnrecognizedTableOptionSnafu)?, + options: table_options, created_on: DateTime::default(), partition_key_indices, }; @@ -854,6 +881,11 @@ fn create_table_info( Ok(table_info) } +fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions { + table_opts.ttl = table_opts.ttl.or(schema_opts.ttl); + table_opts +} + fn parse_partitions( create_table: &CreateTableExpr, partitions: Option, diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index 8443c0f28376..9039f4198769 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -205,7 +205,7 @@ mod tests { .await .unwrap(); schema_manager - .create(SchemaNameKey::default()) + .create(SchemaNameKey::default(), None) .await .unwrap(); diff --git a/src/meta-srv/src/metadata_service.rs b/src/meta-srv/src/metadata_service.rs index d17d7d99f462..09efdbe6d308 100644 --- a/src/meta-srv/src/metadata_service.rs +++ b/src/meta-srv/src/metadata_service.rs @@ -90,7 +90,7 @@ impl MetadataService for DefaultMetadataService { if !exist { self.table_metadata_manager .schema_manager() - .create(schema) + .create(schema, None) .await .context(error::TableMetadataManagerSnafu)?; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index d7fd06c4f9b7..d855eec5f7aa 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -44,7 +44,7 @@ headers = "0.3" hex = { version = "0.4" } hostname = "0.3.1" http-body = "0.4" -humantime-serde = "1.1" +humantime-serde.workspace = true hyper = { version = "0.14", features = ["full"] } influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } itertools.workspace = true diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index f934a1d96324..168548b9ad92 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -27,7 +27,7 @@ datatypes = { workspace = true } derive_builder.workspace = true futures.workspace = true humantime = "2.1" -humantime-serde = "1.1" +humantime-serde.workspace = true paste = "1.0" serde.workspace = true snafu = { version = "0.7", features = ["backtraces"] } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 6b9ba75cec40..6e15edd99641 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -69,6 +69,7 @@ mod test { expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { database_name: "database_created_through_grpc".to_string(), create_if_not_exists: true, + options: Default::default(), })), }); let output = query(instance, request).await;