Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 149 additions & 176 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9b68af55c050a010f202fcccb22d58f080f0a868" }
greptime-proto = { git = "https://github.com/NiwakaDev/greptime-proto.git", rev = "ec402b6500f908a0acfab6c889225cd4dc2228a4" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"
Expand Down
2 changes: 1 addition & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ pub fn request_type(request: &Request) -> &'static str {
Request::Inserts(_) => "inserts",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::Delete(_) => "delete",
Request::RowInserts(_) => "row_inserts",
Request::RowDelete(_) => "row_delete",
Request::Deletes(_) => "deletes",
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequest,
AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr, DdlRequest, DeleteRequests,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest,
RequestHeader, TruncateTableExpr,
};
Expand Down Expand Up @@ -132,9 +132,9 @@ impl Database {
Ok(stream_inserter)
}

pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
pub async fn delete(&self, request: DeleteRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_DELETE);
self.handle(Request::Delete(request)).await
self.handle(Request::Deletes(request)).await
}

async fn handle(&self, request: Request) -> Result<u32> {
Expand Down
19 changes: 16 additions & 3 deletions src/common/grpc-expr/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ use table::requests::DeleteRequest;
use crate::error::{ColumnDataTypeSnafu, IllegalDeleteRequestSnafu, Result};
use crate::insert::add_values_to_builder;

pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteRequest> {
pub fn to_table_delete_request(
catalog_name: &str,
schema_name: &str,
request: GrpcDeleteRequest,
) -> Result<DeleteRequest> {
let row_count = request.row_count as usize;

let mut key_column_values = HashMap::with_capacity(request.key_columns.len());
Expand Down Expand Up @@ -52,7 +56,12 @@ pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteReque
);
}

Ok(DeleteRequest { key_column_values })
Ok(DeleteRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: request.table_name,
key_column_values,
})
}

#[cfg(test)]
Expand Down Expand Up @@ -94,8 +103,12 @@ mod tests {
row_count: 3,
};

let mut request = to_table_delete_request(grpc_request).unwrap();
let mut request =
to_table_delete_request("foo_catalog", "foo_schema", grpc_request).unwrap();

assert_eq!(request.catalog_name, "foo_catalog");
assert_eq!(request.schema_name, "foo_schema");
assert_eq!(request.table_name, "foo");
assert_eq!(
Arc::new(Int32Vector::from_slice(vec![1, 2, 3])) as VectorRef,
request.key_column_values.remove("id").unwrap()
Expand Down
92 changes: 64 additions & 28 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequests};
use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequests, InsertRequests};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_grpc_expr::insert::to_table_insert_request;
Expand Down Expand Up @@ -164,27 +164,38 @@ impl Instance {
Ok(Output::AffectedRows(affected_rows))
}

async fn handle_delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<Output> {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table_name = &request.table_name.clone();
let table_ref = TableReference::full(catalog, schema, table_name);

let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
async fn handle_deletes(
&self,
request: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let results = future::try_join_all(request.deletes.into_iter().map(|delete| {
let catalog_manager = self.catalog_manager.clone();
let catalog = ctx.current_catalog().to_string();
let schema = ctx.current_schema().to_string();
common_runtime::spawn_write(async move {
let table_name = delete.table_name.clone();
let table_ref = TableReference::full(&catalog, &schema, &table_name);
let table = catalog_manager
.table(&catalog, &schema, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;

let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(DeleteExprToRequestSnafu)?;
let request =
common_grpc_expr::delete::to_table_delete_request(&catalog, &schema, delete)
.context(DeleteExprToRequestSnafu)?;

let affected_rows = table.delete(request).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})?;
table.delete(request).await.with_context(|_| DeleteSnafu {
table_name: table_ref.to_string(),
})
})
}))
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<usize>>()?;
Ok(Output::AffectedRows(affected_rows))
}

Expand All @@ -211,7 +222,7 @@ impl GrpcQueryHandler for Instance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_inserts(requests, &ctx).await,
Request::Delete(request) => self.handle_delete(request, ctx).await,
Request::Deletes(request) => self.handle_deletes(request, ctx).await,
Request::Query(query_request) => {
let query = query_request
.query
Expand Down Expand Up @@ -310,8 +321,8 @@ mod test {
use api::v1::column::Values;
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, DropTableExpr, InsertRequest, InsertRequests,
QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr,
CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, InsertRequest,
InsertRequests, QueryRequest, RenameTable, SemanticType, TableId, TruncateTableExpr,
};
use common_catalog::consts::MITO_ENGINE;
use common_error::ext::ErrorExt;
Expand Down Expand Up @@ -903,7 +914,7 @@ mod test {
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));

let request = DeleteRequest {
let request1 = DeleteRequest {
table_name: "demo".to_string(),
region_number: 0,
key_columns: vec![
Expand All @@ -928,13 +939,39 @@ mod test {
],
row_count: 1,
};

let request = Request::Delete(request);
let request2 = DeleteRequest {
table_name: "demo".to_string(),
region_number: 0,
key_columns: vec![
Column {
column_name: "host".to_string(),
values: Some(Values {
string_values: vec!["host3".to_string()],
..Default::default()
}),
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672201026000],
..Default::default()
}),
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 1,
};
let request = Request::Deletes(DeleteRequests {
deletes: vec![request1, request2],
});
let output = instance
.do_query(request, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(2)));

let output = exec_selection(instance, "SELECT ts, host, cpu FROM demo").await;
let Output::Stream(stream) = output else {
Expand All @@ -946,7 +983,6 @@ mod test {
| ts | host | cpu |
+---------------------+-------+------+
| 2022-12-28T04:17:05 | host1 | 66.6 |
| 2022-12-28T04:17:06 | host3 | 88.8 |
+---------------------+-------+------+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
Expand Down
17 changes: 16 additions & 1 deletion src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to split delete request, source: {}", source))]
SplitDelete {
source: partition::error::Error,
location: Location,
},

#[snafu(display("Failed to create table info, source: {}", source))]
CreateTableInfo {
#[snafu(backtrace)]
Expand Down Expand Up @@ -409,6 +415,12 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display("Missing time index column: {}", source))]
MissingTimeIndexColumn {
location: Location,
source: table::error::Error,
},

#[snafu(display("Failed to start script manager, source: {}", source))]
StartScriptManager {
#[snafu(backtrace)]
Expand Down Expand Up @@ -644,6 +656,8 @@ impl ErrorExt for Error {
source.status_code()
}

Error::MissingTimeIndexColumn { source, .. } => source.status_code(),

Error::FindDatanode { .. }
| Error::CreateTableRoute { .. }
| Error::FindRegionRoute { .. }
Expand Down Expand Up @@ -693,7 +707,8 @@ impl ErrorExt for Error {
Error::DeserializePartition { source, .. }
| Error::FindTablePartitionRule { source, .. }
| Error::FindTableRoute { source, .. }
| Error::SplitInsert { source, .. } => source.status_code(),
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. } => source.status_code(),

Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,

Expand Down
35 changes: 12 additions & 23 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod deleter;
pub(crate) mod inserter;

use std::collections::HashMap;
Expand All @@ -21,7 +22,7 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest,
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests,
FlushTableExpr, InsertRequests, TruncateTableExpr,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -56,7 +57,6 @@ use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
Expand All @@ -66,9 +66,10 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, UnrecognizedTableOptionSnafu,
TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::DistTable;

Expand Down Expand Up @@ -626,27 +627,15 @@ impl DistInstance {

async fn handle_dist_delete(
&self,
request: DeleteRequest,
request: DeleteRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table_name = &request.table_name;
let table_ref = TableReference::full(catalog, schema, table_name);

let table = self
.catalog_manager
.table(catalog, schema, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;

let request = common_grpc_expr::delete::to_table_delete_request(request)
.context(ToTableDeleteRequestSnafu)?;

let affected_rows = table.delete(request).await.context(TableSnafu)?;
let deleter = DistDeleter::new(
ctx.current_catalog().to_string(),
ctx.current_schema().to_string(),
self.catalog_manager(),
);
let affected_rows = deleter.grpc_delete(request).await?;
Comment thread
killme2008 marked this conversation as resolved.
Ok(Output::AffectedRows(affected_rows))
}

Expand Down Expand Up @@ -676,11 +665,11 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::Delete(request) => self.handle_dist_delete(request, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu {
feat: "row insert/delete",
}
.fail(),
Request::Deletes(requests) => self.handle_dist_delete(requests, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
Expand Down
Loading