Skip to content

Commit 9161796

Browse files
fengjiachunv0y4g3rfengys1996
authored
feat: export the data from a table to parquet files (#1000)
* feat: copy table parser * feat: copy table * chore: minor fix * chore: give stmt a clearer name * chore: unified naming * chore: minor change * chore: add a todo * chore: end up with an empty file when the table is empty * feat: format with copy table * feat: with options * chore: by cr * chore: default 5M rows per segment * Update src/datanode/src/sql/copy_table.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * Update src/datanode/src/sql/copy_table.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * Update src/datanode/src/error.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> --------- Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>
1 parent 68b2319 commit 9161796

File tree

14 files changed

+463
-42
lines changed

14 files changed

+463
-42
lines changed

src/datanode/src/error.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::any::Any;
1616

1717
use common_error::prelude::*;
1818
use common_recordbatch::error::Error as RecordBatchError;
19+
use datafusion::parquet;
1920
use datatypes::prelude::ConcreteDataType;
2021
use storage::error::Error as StorageError;
2122
use table::error::Error as TableError;
@@ -355,6 +356,38 @@ pub enum Error {
355356
#[snafu(backtrace)]
356357
source: query::error::Error,
357358
},
359+
360+
#[snafu(display("Failed to copy data from table: {}, source: {}", table_name, source))]
361+
CopyTable {
362+
table_name: String,
363+
#[snafu(backtrace)]
364+
source: TableError,
365+
},
366+
367+
#[snafu(display("Failed to execute table scan, source: {}", source))]
368+
TableScanExec {
369+
#[snafu(backtrace)]
370+
source: common_query::error::Error,
371+
},
372+
373+
#[snafu(display("Failed to write parquet file, source: {}", source))]
374+
WriteParquet {
375+
source: parquet::errors::ParquetError,
376+
backtrace: Backtrace,
377+
},
378+
379+
#[snafu(display("Failed to poll stream, source: {}", source))]
380+
PollStream {
381+
source: datatypes::arrow::error::ArrowError,
382+
backtrace: Backtrace,
383+
},
384+
385+
#[snafu(display("Failed to write object into path: {}, source: {}", path, source))]
386+
WriteObject {
387+
path: String,
388+
backtrace: Backtrace,
389+
source: object_store::Error,
390+
},
358391
}
359392

360393
pub type Result<T> = std::result::Result<T, Error>;
@@ -417,7 +450,9 @@ impl ErrorExt for Error {
417450
| MissingRequiredField { .. }
418451
| IncorrectInternalState { .. } => StatusCode::Internal,
419452

420-
InitBackend { .. } => StatusCode::StorageUnavailable,
453+
InitBackend { .. } | WriteParquet { .. } | PollStream { .. } | WriteObject { .. } => {
454+
StatusCode::StorageUnavailable
455+
}
421456
OpenLogStore { source } => source.status_code(),
422457
StartScriptManager { source } => source.status_code(),
423458
OpenStorageEngine { source } => source.status_code(),
@@ -426,6 +461,8 @@ impl ErrorExt for Error {
426461
TableIdProviderNotFound { .. } => StatusCode::Unsupported,
427462
BumpTableId { source, .. } => source.status_code(),
428463
ColumnDefaultValue { source, .. } => source.status_code(),
464+
CopyTable { source, .. } => source.status_code(),
465+
TableScanExec { source, .. } => source.status_code(),
429466
}
430467
}
431468

src/datanode/src/instance/sql.rs

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use snafu::prelude::*;
3131
use sql::ast::ObjectName;
3232
use sql::statements::statement::Statement;
3333
use table::engine::TableReference;
34-
use table::requests::{CreateDatabaseRequest, DropTableRequest};
34+
use table::requests::{CopyTableRequest, CreateDatabaseRequest, DropTableRequest};
3535

3636
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
3737
use crate::instance::Instance;
@@ -57,10 +57,10 @@ impl Instance {
5757
.await
5858
.context(ExecuteSqlSnafu)
5959
}
60-
QueryStatement::Sql(Statement::Insert(i)) => {
60+
QueryStatement::Sql(Statement::Insert(insert)) => {
6161
let requests = self
6262
.sql_handler
63-
.insert_to_requests(self.catalog_manager.clone(), *i, query_ctx.clone())
63+
.insert_to_requests(self.catalog_manager.clone(), *insert, query_ctx.clone())
6464
.await?;
6565

6666
match requests {
@@ -86,14 +86,14 @@ impl Instance {
8686
}
8787
}
8888
}
89-
QueryStatement::Sql(Statement::Delete(d)) => {
90-
let request = SqlRequest::Delete(*d);
89+
QueryStatement::Sql(Statement::Delete(delete)) => {
90+
let request = SqlRequest::Delete(*delete);
9191
self.sql_handler.execute(request, query_ctx).await
9292
}
93-
QueryStatement::Sql(Statement::CreateDatabase(c)) => {
93+
QueryStatement::Sql(Statement::CreateDatabase(create_database)) => {
9494
let request = CreateDatabaseRequest {
95-
db_name: c.name.to_string(),
96-
create_if_not_exists: c.if_not_exists,
95+
db_name: create_database.name.to_string(),
96+
create_if_not_exists: create_database.if_not_exists,
9797
};
9898

9999
info!("Creating a new database: {}", request.db_name);
@@ -103,23 +103,23 @@ impl Instance {
103103
.await
104104
}
105105

106-
QueryStatement::Sql(Statement::CreateTable(c)) => {
106+
QueryStatement::Sql(Statement::CreateTable(create_table)) => {
107107
let table_id = self
108108
.table_id_provider
109109
.as_ref()
110110
.context(TableIdProviderNotFoundSnafu)?
111111
.next_table_id()
112112
.await
113113
.context(BumpTableIdSnafu)?;
114-
let _engine_name = c.engine.clone();
114+
let _engine_name = create_table.engine.clone();
115115
// TODO(hl): Select table engine by engine_name
116116

117-
let name = c.name.clone();
117+
let name = create_table.name.clone();
118118
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
119119
let table_ref = TableReference::full(&catalog, &schema, &table);
120-
let request = self
121-
.sql_handler
122-
.create_to_request(table_id, c, &table_ref)?;
120+
let request =
121+
self.sql_handler
122+
.create_to_request(table_id, create_table, &table_ref)?;
123123
let table_id = request.id;
124124
info!("Creating table: {table_ref}, table id = {table_id}",);
125125

@@ -148,27 +148,27 @@ impl Instance {
148148
.execute(SqlRequest::DropTable(req), query_ctx)
149149
.await
150150
}
151-
QueryStatement::Sql(Statement::ShowDatabases(stmt)) => {
151+
QueryStatement::Sql(Statement::ShowDatabases(show_databases)) => {
152152
self.sql_handler
153-
.execute(SqlRequest::ShowDatabases(stmt), query_ctx)
153+
.execute(SqlRequest::ShowDatabases(show_databases), query_ctx)
154154
.await
155155
}
156-
QueryStatement::Sql(Statement::ShowTables(stmt)) => {
156+
QueryStatement::Sql(Statement::ShowTables(show_tables)) => {
157157
self.sql_handler
158-
.execute(SqlRequest::ShowTables(stmt), query_ctx)
158+
.execute(SqlRequest::ShowTables(show_tables), query_ctx)
159159
.await
160160
}
161-
QueryStatement::Sql(Statement::Explain(stmt)) => {
161+
QueryStatement::Sql(Statement::Explain(explain)) => {
162162
self.sql_handler
163-
.execute(SqlRequest::Explain(Box::new(stmt)), query_ctx)
163+
.execute(SqlRequest::Explain(Box::new(explain)), query_ctx)
164164
.await
165165
}
166-
QueryStatement::Sql(Statement::DescribeTable(stmt)) => {
166+
QueryStatement::Sql(Statement::DescribeTable(describe_table)) => {
167167
self.sql_handler
168-
.execute(SqlRequest::DescribeTable(stmt), query_ctx)
168+
.execute(SqlRequest::DescribeTable(describe_table), query_ctx)
169169
.await
170170
}
171-
QueryStatement::Sql(Statement::ShowCreateTable(_stmt)) => {
171+
QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => {
172172
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
173173
}
174174
QueryStatement::Sql(Statement::Use(ref schema)) => {
@@ -182,6 +182,22 @@ impl Instance {
182182

183183
Ok(Output::RecordBatches(RecordBatches::empty()))
184184
}
185+
QueryStatement::Sql(Statement::Copy(copy_table)) => {
186+
let (catalog_name, schema_name, table_name) =
187+
table_idents_to_full_name(copy_table.table_name(), query_ctx.clone())?;
188+
let file_name = copy_table.file_name().to_string();
189+
190+
let req = CopyTableRequest {
191+
catalog_name,
192+
schema_name,
193+
table_name,
194+
file_name,
195+
};
196+
197+
self.sql_handler
198+
.execute(SqlRequest::CopyTable(req), query_ctx)
199+
.await
200+
}
185201
}
186202
}
187203

src/datanode/src/sql.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSn
3131
use crate::instance::sql::table_idents_to_full_name;
3232

3333
mod alter;
34+
mod copy_table;
3435
mod create;
3536
mod delete;
3637
mod drop_table;
@@ -48,6 +49,7 @@ pub enum SqlRequest {
4849
DescribeTable(DescribeTable),
4950
Explain(Box<Explain>),
5051
Delete(Delete),
52+
CopyTable(CopyTableRequest),
5153
}
5254

5355
// Handler that executes SQL statements other than queries
@@ -81,31 +83,30 @@ impl SqlHandler {
8183
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
8284
SqlRequest::Alter(req) => self.alter(req).await,
8385
SqlRequest::DropTable(req) => self.drop_table(req).await,
84-
SqlRequest::Delete(stmt) => self.delete(query_ctx.clone(), stmt).await,
85-
SqlRequest::ShowDatabases(stmt) => {
86-
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
86+
SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await,
87+
SqlRequest::CopyTable(req) => self.copy_table(req).await,
88+
SqlRequest::ShowDatabases(req) => {
89+
show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
8790
}
88-
SqlRequest::ShowTables(stmt) => {
89-
show_tables(stmt, self.catalog_manager.clone(), query_ctx.clone())
91+
SqlRequest::ShowTables(req) => {
92+
show_tables(req, self.catalog_manager.clone(), query_ctx.clone())
9093
.context(ExecuteSqlSnafu)
9194
}
92-
SqlRequest::DescribeTable(stmt) => {
95+
SqlRequest::DescribeTable(req) => {
9396
let (catalog, schema, table) =
94-
table_idents_to_full_name(stmt.name(), query_ctx.clone())?;
97+
table_idents_to_full_name(req.name(), query_ctx.clone())?;
9598
let table = self
9699
.catalog_manager
97100
.table(&catalog, &schema, &table)
98101
.context(error::CatalogSnafu)?
99102
.with_context(|| TableNotFoundSnafu {
100-
table_name: stmt.name().to_string(),
103+
table_name: req.name().to_string(),
101104
})?;
102105
describe_table(table).context(ExecuteSqlSnafu)
103106
}
104-
SqlRequest::Explain(stmt) => {
105-
explain(stmt, self.query_engine.clone(), query_ctx.clone())
106-
.await
107-
.context(ExecuteSqlSnafu)
108-
}
107+
SqlRequest::Explain(req) => explain(req, self.query_engine.clone(), query_ctx.clone())
108+
.await
109+
.context(ExecuteSqlSnafu),
109110
};
110111
if let Err(e) = &result {
111112
error!(e; "{query_ctx}");

0 commit comments

Comments
 (0)