Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e3f5b83
[WIP]:delete sql
jun0315 Feb 4, 2023
e8da65e
[fix]:time parser bug
jun0315 Feb 5, 2023
56448e0
Merge branch 'develop' into xqj_delete
jun0315 Feb 5, 2023
f96c345
[fix]:resolve conflict
jun0315 Feb 5, 2023
76581cc
[fmt]:cargo fmt
jun0315 Feb 5, 2023
3782bb8
[fix]:remove unless log
jun0315 Feb 5, 2023
821df85
[fix]:test
jun0315 Feb 8, 2023
42ea2a7
[feat]:add error parse
Feb 8, 2023
c1307a0
Merge branch 'develop' into xqj_delete
Feb 8, 2023
d7beacb
[fix]:resolve conflict
Feb 8, 2023
c01d77c
[fix]:remove unless code
Feb 8, 2023
e2f872b
[fix]:remove unless code
Feb 8, 2023
b32f62a
[test]:add IT
Feb 9, 2023
5be930f
[fix]:add license
Feb 9, 2023
452b0e1
[fix]:ci
Feb 9, 2023
83f5fe3
[fix]:ci
Feb 9, 2023
43b73bf
[fix]:ci
Feb 9, 2023
81c8bd9
[fix]:remove
Feb 9, 2023
b082769
[fix]:ci
Feb 9, 2023
f679429
[feat]:add sql
Feb 9, 2023
2e1dceb
[fix]:modify sql
Feb 9, 2023
0c7a9cd
[feat]:refactor parser_expr
Feb 9, 2023
6c36f38
[feat]:rm backtrace
jun0315 Feb 9, 2023
5fb29df
Merge branch 'develop' into xqj_delete
jun0315 Feb 9, 2023
c28d3a3
[fix]:ci
jun0315 Feb 9, 2023
d5eebe4
[fix]: conversation
jun0315 Feb 10, 2023
558a3f6
[fix]: conversation
jun0315 Feb 10, 2023
f70f3cd
feat:refactor delete
jun0315 Feb 10, 2023
3afdb2e
feat:refactor delete
jun0315 Feb 10, 2023
6f0ad69
fix:resolve conversation
jun0315 Feb 13, 2023
86789ca
fix:ut
jun0315 Feb 13, 2023
83b6f5c
Merge branch 'develop' into xqj_delete
jun0315 Feb 13, 2023
593555a
fix:ut
jun0315 Feb 13, 2023
d38d87c
fix:conversation
jun0315 Feb 14, 2023
9f0efaa
Merge branch 'develop' into xqj_delete
jun0315 Feb 14, 2023
83038e5
Merge branch 'develop' into xqj_delete
jun0315 Feb 14, 2023
d339ce1
fix:conversation
jun0315 Feb 14, 2023
6e8f17a
fix:conservation
jun0315 Feb 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ pub enum Error {
source: TableError,
},

#[snafu(display(
"Failed to delete value from table: {}, source: {}",
table_name,
source
))]
Delete {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},

#[snafu(display("Failed to start server, source: {}", source))]
StartServer {
#[snafu(backtrace)]
Expand Down Expand Up @@ -161,7 +172,10 @@ pub enum Error {
},

#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String, backtrace: Backtrace },
InvalidSql { msg: String },

#[snafu(display("Not support SQL, error: {}", msg))]
NotSupportSql { msg: String },

#[snafu(display("Failed to create schema when creating table, source: {}", source))]
CreateSchema {
Expand Down Expand Up @@ -343,6 +357,7 @@ impl ErrorExt for Error {
Error::DropTable { source, .. } => source.status_code(),

Error::Insert { source, .. } => source.status_code(),
Error::Delete { source, .. } => source.status_code(),

Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Expand All @@ -361,6 +376,7 @@ impl ErrorExt for Error {

Error::ColumnValuesNumberMismatch { .. }
| Error::InvalidSql { .. }
| Error::NotSupportSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::InvalidPrimaryKey { .. }
| Error::MissingTimestampColumn { .. }
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ impl Instance {
)?;
self.sql_handler.execute(request, query_ctx).await
}

QueryStatement::Sql(Statement::Delete(d)) => {
let request = SqlRequest::Delete(*d.clone());
Comment thread
evenyag marked this conversation as resolved.
Outdated
self.sql_handler.execute(request, query_ctx).await
}
QueryStatement::Sql(Statement::CreateDatabase(c)) => {
let request = CreateDatabaseRequest {
db_name: c.name.to_string(),
Expand Down
20 changes: 18 additions & 2 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ use query::query_engine::QueryEngineRef;
use query::sql::{describe_table, explain, show_databases, show_tables};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::delete::Delete;
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowTables};
use table::engine::TableEngineRef;
use table::engine::{EngineContext, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;

use crate::error::{self, ExecuteSqlSnafu, Result, TableNotFoundSnafu};
use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
use crate::instance::sql::table_idents_to_full_name;

mod alter;
mod create;
mod delete;
mod drop_table;
mod insert;

Expand All @@ -44,6 +47,7 @@ pub enum SqlRequest {
ShowTables(ShowTables),
DescribeTable(DescribeTable),
Explain(Box<Explain>),
Delete(Delete),
}

// Handler to execute SQL except query
Expand Down Expand Up @@ -77,6 +81,7 @@ impl SqlHandler {
SqlRequest::CreateDatabase(req) => self.create_database(req).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::Delete(stmt) => self.delete(query_ctx.clone(), stmt).await,
SqlRequest::ShowDatabases(stmt) => {
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
Expand Down Expand Up @@ -108,6 +113,17 @@ impl SqlHandler {
result
}

pub(crate) fn get_table(&self, table_ref: &TableReference) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_ref)
.with_context(|_| GetTableSnafu {
table_name: table_ref.to_string(),
})?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})
}

pub fn table_engine(&self) -> TableEngineRef {
self.table_engine.clone()
}
Expand Down
147 changes: 147 additions & 0 deletions src/datanode/src/sql/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_query::Output;
use datatypes::data_type::DataType;
use datatypes::prelude::VectorRef;
use datatypes::vectors::StringVector;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{BinaryOperator, Expr, Value};
use sql::statements::delete::Delete;
use sql::statements::sql_value_to_value;
use table::engine::TableReference;
use table::requests::DeleteRequest;
use table::TableRef;

use crate::error::{ColumnNotFoundSnafu, DeleteSnafu, InvalidSqlSnafu, NotSupportSqlSnafu, Result};
use crate::instance::sql::table_idents_to_full_name;
use crate::sql::SqlHandler;

impl SqlHandler {
pub(crate) async fn delete(&self, query_ctx: QueryContextRef, stmt: Delete) -> Result<Output> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(stmt.table_name(), query_ctx)?;
let table_ref = TableReference {
catalog: &catalog_name.to_string(),
schema: &schema_name.to_string(),
table: &table_name.to_string(),
};

let table = self.get_table(&table_ref)?;

let req = DeleteRequest {
key_column_values: parse_selection(stmt.selection(), &table)?,
};

let affected_rows = table.delete(req).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;

Ok(Output::AffectedRows(affected_rows))
}
}

/// parse selection, currently supported format is `tagkey1 = 'tagvalue1' and 'ts' = 'value'`.
/// (only uses =, and in the where clause and provides all columns needed by the key.)
fn parse_selection(
selection: &Option<Expr>,
table: &TableRef,
) -> Result<HashMap<String, VectorRef>> {
let mut key_column_values = HashMap::new();
if let Some(expr) = selection {
parse_expr(expr, &mut key_column_values, table)?;
}
Ok(key_column_values)
}

fn parse_expr(
expr: &Expr,
key_column_values: &mut HashMap<String, VectorRef>,
table: &TableRef,
) -> Result<()> {
// match BinaryOp
if let Expr::BinaryOp { left, op, right } = expr {
match (&**left, op, &**right) {
// match And operator
(Expr::BinaryOp { .. }, BinaryOperator::And, Expr::BinaryOp { .. }) => {
parse_expr(left, key_column_values, table)?;
parse_expr(right, key_column_values, table)?;
return Ok(());
}
// match Eq operator
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Value(value)) => {
key_column_values.insert(
column_name.to_string(),
value_to_vector(&column_name.to_string(), value, table)?,
);
return Ok(());
}
(Expr::Identifier(column_name), BinaryOperator::Eq, Expr::Identifier(value)) => {
key_column_values.insert(
column_name.to_string(),
Arc::new(StringVector::from(vec![value.to_string()])),
);
return Ok(());
}
_ => {}
}
}
NotSupportSqlSnafu {
msg: format!(
"Not support sql expr:{expr},correct format is tagkey1 = tagvalue1 and ts = value"
),
}
.fail()
}

/// parse value to vector
fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) -> Result<VectorRef> {
let data_type = table.schema().column_type_by_name(column_name);
match data_type {
Some(data_type) => {
let value = sql_value_to_value(column_name, &data_type, sql_value);
match value {
Ok(value) => {
let mut vec = data_type.create_mutable_vector(1);
if vec.push_value_ref(value.as_value_ref()).is_err() {
return InvalidSqlSnafu {
msg: format!(
"invalid sql, column name is {column_name}, value is {sql_value}",
),
}
.fail();
}
Ok(vec.to_vector())
}
_ => {
InvalidSqlSnafu {
msg: format!(
"invalid sql, column name is {column_name}, value is {sql_value}",
),
}
.fail()
}
}
}
None => ColumnNotFoundSnafu {
column_name,
table_name: table.table_info().name.clone(),
}
.fail(),
}
}
14 changes: 3 additions & 11 deletions src/datanode/src/sql/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use table::requests::*;

use crate::error::{
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, FindTableSnafu, InsertSnafu, ParseSqlSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, InsertSnafu, ParseSqlSnafu, ParseSqlValueSnafu, Result,
TableNotFoundSnafu,
};
use crate::sql::{SqlHandler, SqlRequest};

Expand All @@ -43,15 +43,7 @@ impl SqlHandler {
table: &req.table_name.to_string(),
};

let table = self
.catalog_manager
.table(table_ref.catalog, table_ref.schema, table_ref.table)
.context(FindTableSnafu {
table_name: table_ref.to_string(),
})?
.context(TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table = self.get_table(&table_ref)?;

let affected_rows = table.insert(req).await.with_context(|_| InsertSnafu {
table_name: table_ref.to_string(),
Expand Down
49 changes: 49 additions & 0 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,55 @@ async fn test_use_database() {
check_output_stream(output, expected).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_delete() {
Comment thread
jun0315 marked this conversation as resolved.
let instance = MockInstance::new("test_delete").await;

let output = execute_sql(
&instance,
r#"create table test_table(
host string,
ts timestamp,
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));

let output = execute_sql(
&instance,
r#"insert into test_table(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 77.7, 2048, 1655276558000),
('host3', 88.8, 3072, 1655276559000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(3)));

let output = execute_sql(
&instance,
"delete from test_table where host = host1 and ts = 1655276557000 ",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));

let output = execute_sql(&instance, "select * from test_table").await;
let expect = "\
+-------+---------------------+------+--------+
| host | ts | cpu | memory |
+-------+---------------------+------+--------+
| host2 | 2022-06-15T07:02:38 | 77.7 | 2048 |
| host3 | 2022-06-15T07:02:39 | 88.8 | 3072 |
+-------+---------------------+------+--------+\
"
.to_string();
check_output_stream(output, expect).await;
}

async fn execute_sql(instance: &MockInstance, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}
Expand Down
7 changes: 6 additions & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use column_schema::TIME_INDEX_KEY;
use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};

use crate::data_type::DataType;
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, Result};
pub use crate::schema::column_schema::{ColumnSchema, Metadata};
pub use crate::schema::constraint::ColumnDefaultConstraint;
Expand Down Expand Up @@ -87,6 +87,11 @@ impl Schema {
.map(|index| &self.column_schemas[*index])
}

pub fn column_type_by_name(&self, name: &str) -> Option<ConcreteDataType> {
self.column_schema_by_name(name)
.map(|schema| schema.data_type.clone())
}
Comment thread
jun0315 marked this conversation as resolved.
Outdated

/// Retrieve the column's name by index
/// # Panics
/// This method **may** panic if the index is out of range of column schemas.
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ impl Instance {
| Statement::Explain(_)
| Statement::Query(_)
| Statement::Insert(_)
| Statement::Delete(_)
| Statement::Alter(_)
| Statement::DropTable(_) => self.sql_handler.do_statement_query(stmt, query_ctx).await,
Statement::Use(db) => self.handle_use(db, query_ctx),
Expand Down Expand Up @@ -575,6 +576,9 @@ pub fn check_permission(
Statement::DescribeTable(stmt) => {
validate_param(stmt.name(), query_ctx)?;
}
Statement::Delete(delete) => {
validate_param(delete.table_name(), query_ctx)?;
}
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/query/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ where
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Explain(explain) => self.explain_to_plan(explain),
Statement::ShowTables(_)
| Statement::Delete(_)
| Statement::ShowDatabases(_)
| Statement::ShowCreateTable(_)
| Statement::DescribeTable(_)
Expand Down
Loading