Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a277f27caa035a801d5b9c020a0449777736614" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl MigrateTableMetadata {

async fn migrate_schema_key(&self, key: &v1SchemaKey) -> Result<()> {
let new_key = SchemaNameKey::new(&key.catalog_name, &key.schema_name);
let schema_name_value = SchemaNameValue;
let schema_name_value = SchemaNameValue::default();

info!("Creating '{new_key}'");

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common-telemetry = { workspace = true }
common-time = { workspace = true }
etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
prost.workspace = true
regex.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse value {} into key {}", value, key))]
ParseOption {
key: String,
value: String,
location: Location,
},

#[snafu(display("Corrupted table route data, err: {}", err_msg))]
RouteInfoCorrupted { err_msg: String, location: Location },

Expand Down Expand Up @@ -151,6 +158,7 @@ impl ErrorExt for Error {
IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal,

SerdeJson { .. }
| ParseOption { .. }
| RouteInfoCorrupted { .. }
| InvalidProtoMsg { .. }
| InvalidTableMetadata { .. }
Expand Down
68 changes: 60 additions & 8 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use futures::StreamExt;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::KeyValue;

const OPT_KEY_TTL: &str = "ttl";

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SchemaNameKey<'a> {
pub catalog: &'a str,
Expand All @@ -43,8 +48,33 @@ impl<'a> Default for SchemaNameKey<'a> {
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaNameValue;
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
}

impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
type Error = Error;

fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
let ttl = value
.get(OPT_KEY_TTL)
.map(|ttl_str| {
ttl_str.parse::<humantime::Duration>().map_err(|_| {
ParseOptionSnafu {
key: OPT_KEY_TTL,
value: ttl_str.clone(),
}
.build()
})
})
.transpose()?
.map(|ttl| ttl.into());
Ok(Self { ttl })
}
}

impl<'a> SchemaNameKey<'a> {
pub fn new(catalog: &'a str, schema: &'a str) -> Self {
Expand Down Expand Up @@ -108,11 +138,15 @@ impl SchemaManager {
}

/// Creates `SchemaNameKey`.
pub async fn create(&self, schema: SchemaNameKey<'_>) -> Result<()> {
pub async fn create(
&self,
schema: SchemaNameKey<'_>,
value: Option<SchemaNameValue>,
) -> Result<()> {
let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(SchemaNameValue.try_as_raw_value()?);
.with_value(value.unwrap_or_default().try_as_raw_value()?);

self.kv_backend.put(req).await?;

Expand All @@ -125,6 +159,14 @@ impl SchemaManager {
Ok(self.kv_backend.get(&raw_key).await?.is_some())
}

pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
let raw_key = schema.as_raw_key();
let value = self.kv_backend.get(&raw_key).await?;
value
.map(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()))
.transpose()
}

/// Returns a schema stream, it lists all schemas belong to the target `catalog`.
pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
let start_key = SchemaNameKey::range_start_key(catalog);
Expand All @@ -143,25 +185,35 @@ impl SchemaManager {

#[cfg(test)]
mod tests {

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;

#[test]
fn test_serialization() {
let key = SchemaNameKey::new("my-catalog", "my-schema");

assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");

let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap();

assert_eq!(key, parsed);

let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);

let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
assert_eq!(value, parsed);
}

#[tokio::test]
async fn test_key_exist() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key).await.unwrap();
manager.create(schema_key, None).await.unwrap();

assert!(manager.exist(schema_key).await.unwrap());

Expand Down
2 changes: 1 addition & 1 deletion src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ common-error = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
futures.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
object-store = { workspace = true }
serde.workspace = true
serde_json = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ datatypes = { workspace = true }
file-table-engine = { workspace = true }
futures = "0.3"
futures-util.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
key-lock = "0.1"
log-store = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
Expand Down Expand Up @@ -418,6 +419,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
Expand Down Expand Up @@ -485,6 +487,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
Expand Down Expand Up @@ -589,6 +592,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
Expand Down Expand Up @@ -661,6 +665,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ datatypes = { workspace = true }
file-table-engine = { workspace = true }
futures = "0.3"
futures-util.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
itertools.workspace = true
meta-client = { workspace = true }
# Although it is not used, please do not delete it.
Expand Down
46 changes: 39 additions & 7 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest};
Expand Down Expand Up @@ -67,8 +67,9 @@ use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, TableSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
Expand Down Expand Up @@ -104,6 +105,25 @@ impl DistInstance {
partitions: Option<Partitions>,
) -> Result<TableRef> {
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE);
// 1. get schema info
let schema = self
.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;

let Some(schema_opts) = schema else {
return SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
.fail();
};

let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
Expand All @@ -112,7 +132,7 @@ impl DistInstance {

let (partitions, partition_cols) = parse_partitions(create_table, partitions)?;

let mut table_info = create_table_info(create_table, partition_cols)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;

let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
Expand Down Expand Up @@ -340,6 +360,7 @@ impl DistInstance {
let expr = CreateDatabaseExpr {
database_name: stmt.name.to_string(),
create_if_not_exists: stmt.if_not_exists,
options: Default::default(),
};
self.handle_create_database(expr, query_ctx).await
}
Expand Down Expand Up @@ -477,10 +498,12 @@ impl DistInstance {
}
);

let schema_value =
SchemaNameValue::try_from(&expr.options).context(error::TableMetadataManagerSnafu)?;
self.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.create(schema)
.create(schema, Some(schema_value))
.await
.context(error::TableMetadataManagerSnafu)?;

Expand Down Expand Up @@ -772,6 +795,7 @@ fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Parti
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: SchemaNameValue,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
Expand Down Expand Up @@ -818,6 +842,10 @@ fn create_table_info(
})
.collect::<Result<Vec<_>>>()?;

let table_options = TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);

let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
Expand All @@ -826,8 +854,7 @@ fn create_table_info(
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?,
options: table_options,
created_on: DateTime::default(),
partition_key_indices,
};
Expand All @@ -854,6 +881,11 @@ fn create_table_info(
Ok(table_info)
}

fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
}

fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/distributed/inserter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ mod tests {
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default())
.create(SchemaNameKey::default(), None)
.await
.unwrap();

Expand Down
Loading