Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;

use common_error::prelude::*;
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::parquet;
use datatypes::prelude::ConcreteDataType;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
Expand Down Expand Up @@ -355,6 +356,38 @@ pub enum Error {
#[snafu(backtrace)]
source: query::error::Error,
},

#[snafu(display("Failed to copy data from table: {}, source: {}", table_name, source))]
CopyTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},

#[snafu(display("Failed to execute table scan, source: {}", source))]
TableScanExec {
#[snafu(backtrace)]
source: common_query::error::Error,
},

#[snafu(display("Failed to write parquet file, source: {}", source))]
WriteParquet {
source: parquet::errors::ParquetError,
backtrace: Backtrace,
},

#[snafu(display("Failed to poll stream, source: {}", source))]
PollStream {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},

#[snafu(display("Fail to write object into path: {}, source: {}", path, source))]
Comment thread
fengjiachun marked this conversation as resolved.
Outdated
WriteObject {
path: String,
backtrace: Backtrace,
source: object_store::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -417,7 +450,9 @@ impl ErrorExt for Error {
| MissingRequiredField { .. }
| IncorrectInternalState { .. } => StatusCode::Internal,

InitBackend { .. } => StatusCode::StorageUnavailable,
InitBackend { .. } | WriteParquet { .. } | PollStream { .. } | WriteObject { .. } => {
StatusCode::StorageUnavailable
}
OpenLogStore { source } => source.status_code(),
StartScriptManager { source } => source.status_code(),
OpenStorageEngine { source } => source.status_code(),
Expand All @@ -426,6 +461,8 @@ impl ErrorExt for Error {
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
BumpTableId { source, .. } => source.status_code(),
ColumnDefaultValue { source, .. } => source.status_code(),
CopyTable { source, .. } => source.status_code(),
TableScanExec { source, .. } => source.status_code(),
}
}

Expand Down
62 changes: 39 additions & 23 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::{CreateDatabaseRequest, DropTableRequest};
use table::requests::{CopyTableRequest, CreateDatabaseRequest, DropTableRequest};

use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::instance::Instance;
Expand All @@ -57,10 +57,10 @@ impl Instance {
.await
.context(ExecuteSqlSnafu)
}
QueryStatement::Sql(Statement::Insert(i)) => {
QueryStatement::Sql(Statement::Insert(insert)) => {
let requests = self
.sql_handler
.insert_to_requests(self.catalog_manager.clone(), *i, query_ctx.clone())
.insert_to_requests(self.catalog_manager.clone(), *insert, query_ctx.clone())
.await?;

match requests {
Expand All @@ -86,14 +86,14 @@ impl Instance {
}
}
}
QueryStatement::Sql(Statement::Delete(d)) => {
let request = SqlRequest::Delete(*d);
QueryStatement::Sql(Statement::Delete(delete)) => {
let request = SqlRequest::Delete(*delete);
self.sql_handler.execute(request, query_ctx).await
}
QueryStatement::Sql(Statement::CreateDatabase(c)) => {
QueryStatement::Sql(Statement::CreateDatabase(create_database)) => {
let request = CreateDatabaseRequest {
db_name: c.name.to_string(),
create_if_not_exists: c.if_not_exists,
db_name: create_database.name.to_string(),
create_if_not_exists: create_database.if_not_exists,
};

info!("Creating a new database: {}", request.db_name);
Expand All @@ -103,23 +103,23 @@ impl Instance {
.await
}

QueryStatement::Sql(Statement::CreateTable(c)) => {
QueryStatement::Sql(Statement::CreateTable(create_table)) => {
let table_id = self
.table_id_provider
.as_ref()
.context(TableIdProviderNotFoundSnafu)?
.next_table_id()
.await
.context(BumpTableIdSnafu)?;
let _engine_name = c.engine.clone();
let _engine_name = create_table.engine.clone();
// TODO(hl): Select table engine by engine_name

let name = c.name.clone();
let name = create_table.name.clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self
.sql_handler
.create_to_request(table_id, c, &table_ref)?;
let request =
self.sql_handler
.create_to_request(table_id, create_table, &table_ref)?;
let table_id = request.id;
info!("Creating table: {table_ref}, table id = {table_id}",);

Expand Down Expand Up @@ -148,27 +148,27 @@ impl Instance {
.execute(SqlRequest::DropTable(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowDatabases(stmt)) => {
QueryStatement::Sql(Statement::ShowDatabases(show_databases)) => {
self.sql_handler
.execute(SqlRequest::ShowDatabases(stmt), query_ctx)
.execute(SqlRequest::ShowDatabases(show_databases), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowTables(stmt)) => {
QueryStatement::Sql(Statement::ShowTables(show_tables)) => {
self.sql_handler
.execute(SqlRequest::ShowTables(stmt), query_ctx)
.execute(SqlRequest::ShowTables(show_tables), query_ctx)
.await
}
QueryStatement::Sql(Statement::Explain(stmt)) => {
QueryStatement::Sql(Statement::Explain(explain)) => {
self.sql_handler
.execute(SqlRequest::Explain(Box::new(stmt)), query_ctx)
.execute(SqlRequest::Explain(Box::new(explain)), query_ctx)
.await
}
QueryStatement::Sql(Statement::DescribeTable(stmt)) => {
QueryStatement::Sql(Statement::DescribeTable(describe_table)) => {
self.sql_handler
.execute(SqlRequest::DescribeTable(stmt), query_ctx)
.execute(SqlRequest::DescribeTable(describe_table), query_ctx)
.await
}
QueryStatement::Sql(Statement::ShowCreateTable(_stmt)) => {
QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => {
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
}
QueryStatement::Sql(Statement::Use(ref schema)) => {
Expand All @@ -182,6 +182,22 @@ impl Instance {

Ok(Output::RecordBatches(RecordBatches::empty()))
}
QueryStatement::Sql(Statement::Copy(copy_table)) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(copy_table.table_name(), query_ctx.clone())?;
let file_name = copy_table.file_name().to_string();

let req = CopyTableRequest {
catalog_name,
schema_name,
table_name,
file_name,
};

self.sql_handler
.execute(SqlRequest::CopyTable(req), query_ctx)
.await
}
}
}

Expand Down
27 changes: 14 additions & 13 deletions src/datanode/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSn
use crate::instance::sql::table_idents_to_full_name;

mod alter;
mod copy_table;
mod create;
mod delete;
mod drop_table;
Expand All @@ -48,6 +49,7 @@ pub enum SqlRequest {
DescribeTable(DescribeTable),
Explain(Box<Explain>),
Delete(Delete),
CopyTable(CopyTableRequest),
}

// Handler to execute SQL except query
Expand Down Expand Up @@ -81,31 +83,30 @@ impl SqlHandler {
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::Delete(stmt) => self.delete(query_ctx.clone(), stmt).await,
SqlRequest::ShowDatabases(stmt) => {
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await,
SqlRequest::CopyTable(req) => self.copy_table(req).await,
SqlRequest::ShowDatabases(req) => {
show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
SqlRequest::ShowTables(stmt) => {
show_tables(stmt, self.catalog_manager.clone(), query_ctx.clone())
SqlRequest::ShowTables(req) => {
show_tables(req, self.catalog_manager.clone(), query_ctx.clone())
.context(ExecuteSqlSnafu)
}
SqlRequest::DescribeTable(stmt) => {
SqlRequest::DescribeTable(req) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.name(), query_ctx.clone())?;
table_idents_to_full_name(req.name(), query_ctx.clone())?;
let table = self
.catalog_manager
.table(&catalog, &schema, &table)
.context(error::CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: stmt.name().to_string(),
table_name: req.name().to_string(),
})?;
describe_table(table).context(ExecuteSqlSnafu)
}
SqlRequest::Explain(stmt) => {
explain(stmt, self.query_engine.clone(), query_ctx.clone())
.await
.context(ExecuteSqlSnafu)
}
SqlRequest::Explain(req) => explain(req, self.query_engine.clone(), query_ctx.clone())
.await
.context(ExecuteSqlSnafu),
};
if let Err(e) = &result {
error!(e; "{query_ctx}");
Expand Down
Loading