Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
145 changes: 83 additions & 62 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::any::Any;

use common_error::prelude::*;
use common_recordbatch::error::Error as RecordBatchError;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
use table::error::Error as TableError;

Expand Down Expand Up @@ -101,12 +103,33 @@ pub enum Error {
))]
ColumnValuesNumberMismatch { columns: usize, values: usize },

#[snafu(display(
"Column type mismatch, column: {}, expected type: {:?}, actual: {:?}",
column,
expected,
actual,
))]
ColumnTypeMismatch {
column: String,
expected: ConcreteDataType,
actual: ConcreteDataType,
},

#[snafu(display("Failed to collect record batch, source: {}", source))]
CollectRecords {
#[snafu(backtrace)]
source: RecordBatchError,
},

#[snafu(display("Failed to parse sql value, source: {}", source))]
ParseSqlValue {
#[snafu(backtrace)]
source: sql::error::Error,
},

#[snafu(display("Missing insert body"))]
MissingInsertBody { backtrace: Backtrace },

#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
Insert {
table_name: String,
Expand Down Expand Up @@ -338,73 +361,71 @@ pub type Result<T> = std::result::Result<T, Error>;

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Error::ExecuteSql { source } | Error::DescribeStatement { source } => {
ExecuteSql { source } | DescribeStatement { source } => source.status_code(),
DecodeLogicalPlan { source } => source.status_code(),
NewCatalog { source } | RegisterSchema { source } => source.status_code(),
FindTable { source, .. } => source.status_code(),
CreateTable { source, .. } | GetTable { source, .. } | AlterTable { source, .. } => {
source.status_code()
}
Error::DecodeLogicalPlan { source } => source.status_code(),
Error::NewCatalog { source } | Error::RegisterSchema { source } => source.status_code(),
Error::FindTable { source, .. } => source.status_code(),
Error::CreateTable { source, .. }
| Error::GetTable { source, .. }
| Error::AlterTable { source, .. } => source.status_code(),
Error::DropTable { source, .. } => source.status_code(),

Error::Insert { source, .. } => source.status_code(),
Error::Delete { source, .. } => source.status_code(),

Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

Error::ParseSqlValue { source, .. } | Error::ParseSql { source, .. } => {
source.status_code()
}

Error::AlterExprToRequest { source, .. }
| Error::CreateExprToRequest { source }
| Error::InsertData { source } => source.status_code(),

Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => {
source.status_code()
}

Error::ColumnValuesNumberMismatch { .. }
| Error::InvalidSql { .. }
| Error::NotSupportSql { .. }
| Error::KeyColumnNotFound { .. }
| Error::IllegalPrimaryKeysDef { .. }
| Error::MissingTimestampColumn { .. }
| Error::CatalogNotFound { .. }
| Error::SchemaNotFound { .. }
| Error::ConstraintNotSupported { .. }
| Error::SchemaExists { .. }
| Error::ParseTimestamp { .. }
| Error::DatabaseNotFound { .. } => StatusCode::InvalidArguments,
DropTable { source, .. } => source.status_code(),

Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),

TableNotFound { .. } => StatusCode::TableNotFound,
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

ParseSqlValue { source, .. } | ParseSql { source, .. } => source.status_code(),

AlterExprToRequest { source, .. }
| CreateExprToRequest { source }
| InsertData { source } => source.status_code(),

ConvertSchema { source, .. } | VectorComputation { source } => source.status_code(),

ColumnValuesNumberMismatch { .. }
| ColumnTypeMismatch { .. }
| InvalidSql { .. }
| NotSupportSql { .. }
| KeyColumnNotFound { .. }
| IllegalPrimaryKeysDef { .. }
| MissingTimestampColumn { .. }
| CatalogNotFound { .. }
| SchemaNotFound { .. }
| ConstraintNotSupported { .. }
| SchemaExists { .. }
| ParseTimestamp { .. }
| MissingInsertBody { .. }
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,

// TODO(yingwen): Further categorize http error.
Error::StartServer { .. }
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. }
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::RenameTable { .. }
| Error::Catalog { .. }
| Error::MissingRequiredField { .. }
| Error::IncorrectInternalState { .. } => StatusCode::Internal,

Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
StartServer { .. }
| ParseAddr { .. }
| TcpBind { .. }
| StartGrpc { .. }
| CreateDir { .. }
| InsertSystemCatalog { .. }
| RenameTable { .. }
| Catalog { .. }
| MissingRequiredField { .. }
| IncorrectInternalState { .. } => StatusCode::Internal,

InitBackend { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source } => source.status_code(),
StartScriptManager { source } => source.status_code(),
OpenStorageEngine { source } => source.status_code(),
RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
MetaClientInit { source, .. } => source.status_code(),
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
BumpTableId { source, .. } => source.status_code(),
ColumnDefaultValue { source, .. } => source.status_code(),
}
}

Expand Down
38 changes: 29 additions & 9 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use datatypes::schema::Schema;
use futures::StreamExt;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
Expand All @@ -35,6 +36,7 @@ use table::requests::{CreateDatabaseRequest, DropTableRequest};
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::instance::Instance;
use crate::metric;
use crate::sql::insert::InsertRequests;
use crate::sql::SqlRequest;

impl Instance {
Expand All @@ -56,15 +58,33 @@ impl Instance {
.context(ExecuteSqlSnafu)
}
QueryStatement::Sql(Statement::Insert(i)) => {
let (catalog, schema, table) =
table_idents_to_full_name(i.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self.sql_handler.insert_to_request(
self.catalog_manager.clone(),
*i,
table_ref,
)?;
self.sql_handler.execute(request, query_ctx).await
let requests = self
.sql_handler
.insert_to_requests(self.catalog_manager.clone(), *i, query_ctx.clone())
.await?;

match requests {
InsertRequests::Request(request) => {
self.sql_handler.execute(request, query_ctx.clone()).await
}

InsertRequests::Stream(mut s) => {
let mut rows = 0;
while let Some(request) = s.next().await {
match self
.sql_handler
.execute(request?, query_ctx.clone())
.await?
{
Output::AffectedRows(n) => {
rows += n;
}
_ => unreachable!(),
}
}
Ok(Output::AffectedRows(rows))
}
}
}
QueryStatement::Sql(Statement::Delete(d)) => {
let request = SqlRequest::Delete(*d);
Expand Down
70 changes: 66 additions & 4 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod alter;
mod create;
mod delete;
mod drop_table;
mod insert;
pub(crate) mod insert;

#[derive(Debug)]
pub enum SqlRequest {
Expand Down Expand Up @@ -142,24 +142,27 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use futures::StreamExt;
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use session::context::QueryContext;
use sql::statements::statement::Statement;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableReference;
use table::error::Result as TableResult;
use table::metadata::TableInfoRef;
use table::Table;
use tempdir::TempDir;

use super::*;
use crate::error::Error;
use crate::sql::insert::InsertRequests;

struct DemoTable;

Expand Down Expand Up @@ -255,11 +258,12 @@ mod tests {
}
};
let request = sql_handler
.insert_to_request(catalog_list.clone(), *stmt, TableReference::bare("demo"))
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();

match request {
SqlRequest::Insert(req) => {
InsertRequests::Request(SqlRequest::Insert(req)) => {
assert_eq!(req.table_name, "demo");
let columns_values = req.columns_values;
assert_eq!(4, columns_values.len());
Expand Down Expand Up @@ -294,5 +298,63 @@ mod tests {
panic!("Not supposed to reach here")
}
}

// test insert into select

// type mismatch
let sql = "insert into demo(ts) select number from numbers limit 3";

let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,
_ => {
unreachable!()
}
};
let request = sql_handler
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();

match request {
InsertRequests::Stream(mut stream) => {
assert!(matches!(
stream.next().await.unwrap().unwrap_err(),
Error::ColumnTypeMismatch { .. }
));
}
_ => unreachable!(),
}

let sql = "insert into demo(cpu) select cast(number as double) from numbers limit 3";
let stmt = match QueryLanguageParser::parse_sql(sql).unwrap() {
QueryStatement::Sql(Statement::Insert(i)) => i,
_ => {
unreachable!()
}
};
let request = sql_handler
.insert_to_requests(catalog_list.clone(), *stmt, QueryContext::arc())
.await
.unwrap();

match request {
InsertRequests::Stream(mut stream) => {
let mut times = 0;
while let Some(Ok(SqlRequest::Insert(req))) = stream.next().await {
times += 1;
assert_eq!(req.table_name, "demo");
let columns_values = req.columns_values;
assert_eq!(1, columns_values.len());

let memories = &columns_values["cpu"];
assert_eq!(3, memories.len());
assert_eq!(Value::from(0.0f64), memories.get(0));
assert_eq!(Value::from(1.0f64), memories.get(1));
assert_eq!(Value::from(2.0f64), memories.get(2));
}
assert_eq!(1, times);
}
_ => unreachable!(),
}
}
}
Loading