Skip to content

Commit 6f270be

Browse files
committed
feat: streaming insert from select query results
1 parent 5621325 commit 6f270be

File tree

6 files changed

+198
-94
lines changed

6 files changed

+198
-94
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/datanode/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ common-runtime = { path = "../common/runtime" }
2727
common-telemetry = { path = "../common/telemetry" }
2828
common-time = { path = "../common/time" }
2929
datafusion.workspace = true
30+
datafusion-expr.workspace = true
3031
datatypes = { path = "../datatypes" }
3132
futures = "0.3"
3233
hyper = { version = "0.14", features = ["full"] }

src/datanode/src/error.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::any::Any;
1616

1717
use common_error::prelude::*;
18+
use datatypes::prelude::ConcreteDataType;
1819
use storage::error::Error as StorageError;
1920
use table::error::Error as TableError;
2021

@@ -101,6 +102,18 @@ pub enum Error {
101102
))]
102103
ColumnValuesNumberMismatch { columns: usize, values: usize },
103104

105+
#[snafu(display(
106+
"Columns type mismatch, column: {}, expected type: {:?}, acutal: {:?}",
107+
column,
108+
expected,
109+
actual,
110+
))]
111+
ColumnTypeMismatch {
112+
column: String,
113+
expected: ConcreteDataType,
114+
actual: ConcreteDataType,
115+
},
116+
104117
#[snafu(display("Failed to parse sql value, source: {}", source))]
105118
ParseSqlValue {
106119
#[snafu(backtrace)]
@@ -378,6 +391,7 @@ impl ErrorExt for Error {
378391
| Error::VectorComputation { source } => source.status_code(),
379392

380393
Error::ColumnValuesNumberMismatch { .. }
394+
| Error::ColumnTypeMismatch { .. }
381395
| Error::InvalidSql { .. }
382396
| Error::NotSupportSql { .. }
383397
| Error::KeyColumnNotFound { .. }

src/datanode/src/instance/sql.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common_recordbatch::RecordBatches;
2121
use common_telemetry::logging::info;
2222
use common_telemetry::timer;
2323
use datatypes::schema::Schema;
24+
use futures::StreamExt;
2425
use query::parser::{QueryLanguageParser, QueryStatement};
2526
use servers::error as server_error;
2627
use servers::promql::PromqlHandler;
@@ -35,6 +36,7 @@ use table::requests::{CreateDatabaseRequest, DropTableRequest};
3536
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
3637
use crate::instance::Instance;
3738
use crate::metric;
39+
use crate::sql::insert::InsertRequests;
3840
use crate::sql::SqlRequest;
3941

4042
impl Instance {
@@ -56,11 +58,28 @@ impl Instance {
5658
.context(ExecuteSqlSnafu)
5759
}
5860
QueryStatement::Sql(Statement::Insert(i)) => {
59-
let request = self
61+
let requests = self
6062
.sql_handler
6163
.insert_to_request(self.catalog_manager.clone(), *i, query_ctx.clone())
6264
.await?;
63-
self.sql_handler.execute(request, query_ctx).await
65+
66+
match requests {
67+
InsertRequests::Request(request) => {
68+
self.sql_handler.execute(request, query_ctx.clone()).await
69+
}
70+
71+
InsertRequests::Stream(mut s) => {
72+
let mut output = None;
73+
while let Some(request) = s.next().await {
74+
output = Some(
75+
self.sql_handler
76+
.execute(request?, query_ctx.clone())
77+
.await?,
78+
)
79+
}
80+
Ok(output.unwrap())
81+
}
82+
}
6483
}
6584
QueryStatement::Sql(Statement::Delete(d)) => {
6685
let request = SqlRequest::Delete(*d);

src/datanode/src/sql.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ mod alter;
3434
mod create;
3535
mod delete;
3636
mod drop_table;
37-
mod insert;
37+
pub(crate) mod insert;
3838

3939
#[derive(Debug)]
4040
pub enum SqlRequest {
@@ -160,6 +160,7 @@ mod tests {
160160
use tempdir::TempDir;
161161

162162
use super::*;
163+
use crate::sql::insert::InsertRequests;
163164

164165
struct DemoTable;
165166

@@ -260,7 +261,7 @@ mod tests {
260261
.unwrap();
261262

262263
match request {
263-
SqlRequest::Insert(req) => {
264+
InsertRequests::Request(SqlRequest::Insert(req)) => {
264265
assert_eq!(req.table_name, "demo");
265266
let columns_values = req.columns_values;
266267
assert_eq!(4, columns_values.len());

0 commit comments

Comments
 (0)