Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e3f5b83
[WIP]:delete sql
jun0315 Feb 4, 2023
e8da65e
[fix]:time parser bug
jun0315 Feb 5, 2023
56448e0
Merge branch 'develop' into xqj_delete
jun0315 Feb 5, 2023
f96c345
[fix]:resolve conflict
jun0315 Feb 5, 2023
76581cc
[fmt]:cargo fmt
jun0315 Feb 5, 2023
3782bb8
[fix]:remove unless log
jun0315 Feb 5, 2023
821df85
[fix]:test
jun0315 Feb 8, 2023
42ea2a7
[feat]:add error parse
Feb 8, 2023
c1307a0
Merge branch 'develop' into xqj_delete
Feb 8, 2023
d7beacb
[fix]:resolve conflict
Feb 8, 2023
c01d77c
[fix]:remove unless code
Feb 8, 2023
e2f872b
[fix]:remove unless code
Feb 8, 2023
b32f62a
[test]:add IT
Feb 9, 2023
5be930f
[fix]:add license
Feb 9, 2023
452b0e1
[fix]:ci
Feb 9, 2023
83f5fe3
[fix]:ci
Feb 9, 2023
43b73bf
[fix]:ci
Feb 9, 2023
81c8bd9
[fix]:remove
Feb 9, 2023
b082769
[fix]:ci
Feb 9, 2023
f679429
[feat]:add sql
Feb 9, 2023
2e1dceb
[fix]:modify sql
Feb 9, 2023
0c7a9cd
[feat]:refactor parser_expr
Feb 9, 2023
6c36f38
[feat]:rm backtrace
jun0315 Feb 9, 2023
5fb29df
Merge branch 'develop' into xqj_delete
jun0315 Feb 9, 2023
c28d3a3
[fix]:ci
jun0315 Feb 9, 2023
d5eebe4
[fix]: conversation
jun0315 Feb 10, 2023
558a3f6
[fix]: conversation
jun0315 Feb 10, 2023
f70f3cd
feat:refactor delete
jun0315 Feb 10, 2023
3afdb2e
feat:refactor delete
jun0315 Feb 10, 2023
6f0ad69
fix:resolve conversation
jun0315 Feb 13, 2023
86789ca
fix:ut
jun0315 Feb 13, 2023
83b6f5c
Merge branch 'develop' into xqj_delete
jun0315 Feb 13, 2023
593555a
fix:ut
jun0315 Feb 13, 2023
d38d87c
fix:conversation
jun0315 Feb 14, 2023
9f0efaa
Merge branch 'develop' into xqj_delete
jun0315 Feb 14, 2023
83038e5
Merge branch 'develop' into xqj_delete
jun0315 Feb 14, 2023
d339ce1
fix:conversation
jun0315 Feb 14, 2023
6e8f17a
fix:conservation
jun0315 Feb 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ pub(crate) fn build_table_deletion_request(
let table_key = format_table_entry_key(&request.catalog, &request.schema, table_id);
DeleteRequest {
key_column_values: build_primary_key_columns(EntryType::Table, table_key.as_bytes()),
catalog_name: request.catalog.clone(),
schema_name: request.schema.clone(),
table_name: request.table_name.clone(),
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ pub enum Error {
source: TableError,
},

#[snafu(display("Failed to delete value to table: {}, source: {}", table_name, source))]
Comment thread
jun0315 marked this conversation as resolved.
Outdated
Delete {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},

#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
Expand Down Expand Up @@ -161,7 +168,7 @@ pub enum Error {
},

#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String, backtrace: Backtrace },
InvalidSql { msg: String },

#[snafu(display("Failed to create schema when creating table, source: {}", source))]
CreateSchema {
Expand Down Expand Up @@ -343,6 +350,7 @@ impl ErrorExt for Error {
Error::DropTable { source, .. } => source.status_code(),

Error::Insert { source, .. } => source.status_code(),
Error::Delete { source, .. } => source.status_code(),

Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl Instance {
)?;
self.sql_handler.execute(request, query_ctx).await
}
QueryStatement::Sql(Statement::Delete(d)) => {
let request = self.sql_handler.delete_to_request(*d, query_ctx.clone())?;
self.sql_handler.execute(request, query_ctx).await
}

QueryStatement::Sql(Statement::CreateDatabase(c)) => {
let request = CreateDatabaseRequest {
Expand Down
3 changes: 3 additions & 0 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::instance::sql::table_idents_to_full_name;

mod alter;
mod create;
mod delete;
mod drop_table;
mod insert;

Expand All @@ -45,6 +46,7 @@ pub enum SqlRequest {
ShowTables(ShowTables),
DescribeTable(DescribeTable),
Explain(Box<Explain>),
Delete(DeleteRequest),
Comment thread
jun0315 marked this conversation as resolved.
Outdated
}

// Handler to execute SQL except query
Expand Down Expand Up @@ -78,6 +80,7 @@ impl SqlHandler {
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::Delete(req) => self.delete(req).await,
SqlRequest::ShowDatabases(stmt) => {
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
Expand Down
139 changes: 139 additions & 0 deletions src/datanode/src/sql/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;

use common_query::Output;
use common_telemetry::warn;
use datatypes::prelude::VectorRef;
use datatypes::vectors::{BooleanVector, Float64Vector, StringVector, TimestampMillisecondVector};
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{BinaryOperator, Expr, Value};
use sql::statements::delete::Delete;
use table::engine::TableReference;
use table::requests::DeleteRequest;

use crate::error::{DeleteSnafu, InvalidSqlSnafu, Result};
use crate::instance::sql::table_idents_to_full_name;
use crate::sql::{SqlHandler, SqlRequest};

impl SqlHandler {
pub(crate) async fn delete(&self, req: DeleteRequest) -> Result<Output> {
let table_ref = TableReference {
catalog: &req.catalog_name.to_string(),
schema: &req.schema_name.to_string(),
table: &req.table_name.to_string(),
};

let table = self.get_table(&table_ref)?;

let affected_rows = table.delete(req).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;

Ok(Output::AffectedRows(affected_rows))
}

pub(crate) fn delete_to_request(
&self,
stmt: Delete,
query_ctx: QueryContextRef,
) -> Result<SqlRequest> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(stmt.table_name(), query_ctx)?;
let key_column_values = parser_selection(stmt.selection())?;
Ok(SqlRequest::Delete(DeleteRequest {
key_column_values,
catalog_name,
schema_name,
table_name,
}))
}
}

/// parse selection, currently supported format is `tagkey1 = 'tagvalue1' and tagkey2 = 'tagvalue2'`.
/// (only uses =, and in the where clause and provides all columns needed by the key.)
fn parser_selection(selection: &Option<Expr>) -> Result<HashMap<String, VectorRef>> {
Comment thread
jun0315 marked this conversation as resolved.
Outdated
let mut key_column_values = HashMap::new();
if let Some(expr) = selection {
parser_expr(expr, &mut key_column_values)?;
}
Ok(key_column_values)
}

fn parser_expr(expr: &Expr, key_column_values: &mut HashMap<String, VectorRef>) -> Result<()> {
Comment thread
jun0315 marked this conversation as resolved.
Outdated
// match BinaryOp
if let Expr::BinaryOp { left, op, right } = expr {
match (left.deref(), op, right.deref()) {
// match And operator
(Expr::BinaryOp { .. }, BinaryOperator::And, Expr::BinaryOp { .. }) => {
parser_expr(left.deref(), key_column_values)?;
Comment thread
evenyag marked this conversation as resolved.
Outdated
parser_expr(right.deref(), key_column_values)?;
return Ok(());
}
Comment thread
jun0315 marked this conversation as resolved.
Outdated
// match Eq operator
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Value(value)) => {
key_column_values.insert(
column_name.to_string(),
value_to_vector(&column_name.to_string(), value)?,
);
return Ok(());
}
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Identifier(value)) => {
key_column_values.insert(
column_name.to_string(),
Arc::new(StringVector::from(vec![value.to_string()])),
);
return Ok(());
}
_ => {}
}
}
return InvalidSqlSnafu {
msg: format!("Failed to parse expr:{expr}"),
}
.fail();
}

/// parse value to vector
fn value_to_vector(column_name: &String, value: &Value) -> Result<VectorRef> {
match value {
Value::Number(n, _) => {
if column_name == "ts" {
Ok(Arc::new(TimestampMillisecondVector::from_vec(vec![
i64::from_str(n).unwrap(),
])))
} else {
Ok(Arc::new(Float64Vector::from_vec(vec![
f64::from_str(n).unwrap()
])))
}
Comment thread
jun0315 marked this conversation as resolved.
Outdated
}
Value::SingleQuotedString(s) | sql::ast::Value::DoubleQuotedString(s) => {
Ok(Arc::new(StringVector::from(vec![s.to_string()])))
}
Value::Boolean(b) => Ok(Arc::new(BooleanVector::from(vec![*b]))),
_ => {
warn!("Current value type is not supported, value:{value}");
return InvalidSqlSnafu {
msg: format!("Failed to parse value:{value}"),
}
.fail();
}
}
}
49 changes: 49 additions & 0 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,55 @@ async fn test_use_database() {
check_output_stream(output, expected).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_delete() {
Comment thread
jun0315 marked this conversation as resolved.
let instance = MockInstance::new("test_delete").await;

let output = execute_sql(
&instance,
r#"create table test_table(
host string,
ts timestamp,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));

let output = execute_sql(
&instance,
r#"insert into test_table(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 77.7, 2048, 1655276558000),
('host3', 88.8, 3072, 1655276559000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(3)));

let output = execute_sql(
&instance,
"delete from test_table where host = host1 and ts = 1655276557000 ",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));

let output = execute_sql(&instance, "select * from test_table").await;
let expect = "\
+-------+---------------------+------+--------+
| host | ts | cpu | memory |
+-------+---------------------+------+--------+
| host2 | 2022-06-15T07:02:38 | 77.7 | 2048 |
| host3 | 2022-06-15T07:02:39 | 88.8 | 3072 |
+-------+---------------------+------+--------+\
"
.to_string();
check_output_stream(output, expect).await;
}

async fn execute_sql(instance: &MockInstance, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,7 @@ pub enum Error {
},

#[snafu(display("Invalid SQL, error: {}", err_msg))]
InvalidSql {
err_msg: String,
backtrace: Backtrace,
},
InvalidSql { err_msg: String },

#[snafu(display("Illegal Frontend state: {}", err_msg))]
IllegalFrontendState {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ impl Instance {
| Statement::Explain(_)
| Statement::Query(_)
| Statement::Insert(_)
| Statement::Delete(_)
| Statement::Alter(_)
| Statement::DropTable(_) => self.sql_handler.do_statement_query(stmt, query_ctx).await,
Statement::Use(db) => self.handle_use(db, query_ctx),
Expand Down
7 changes: 6 additions & 1 deletion src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,12 @@ mod tests {
let mut key_column_values = HashMap::with_capacity(2);
key_column_values.insert("host".to_string(), del_hosts);
key_column_values.insert("ts".to_string(), del_tss);
let del_req = DeleteRequest { key_column_values };
let del_req = DeleteRequest {
key_column_values,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
};
table.delete(del_req).await.unwrap();

let session_ctx = SessionContext::new();
Expand Down
1 change: 1 addition & 0 deletions src/query/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ where
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Explain(explain) => self.explain_to_plan(explain),
Statement::ShowTables(_)
| Statement::Delete(_)
| Statement::ShowDatabases(_)
| Statement::ShowCreateTable(_)
| Statement::DescribeTable(_)
Expand Down
5 changes: 3 additions & 2 deletions src/sql/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub use sqlparser::ast::{
ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function, FunctionArg,
FunctionArgExpr, Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo, Value,
BinaryOperator, ColumnDef, ColumnOption, ColumnOptionDef, DataType, Expr, Function,
FunctionArg, FunctionArgExpr, Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo,
Value,
};
2 changes: 1 addition & 1 deletion src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub enum Error {
InvalidTimeIndex { msg: String, backtrace: Backtrace },

#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String, backtrace: Backtrace },
InvalidSql { msg: String },

#[snafu(display("Invalid column option, column name: {}, error: {}", name, msg))]
InvalidColumnOption {
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ impl<'a> ParserContext<'a> {
self.parse_show()
}

Keyword::DELETE => self.parse_delete(),

Keyword::DESCRIBE | Keyword::DESC => {
self.parser.next_token();
self.parse_describe()
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@

mod alter_parser;
pub(crate) mod create_parser;
pub(crate) mod delete_parser;
pub(crate) mod insert_parser;
pub(crate) mod query_parser;
Loading