Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ serde = { version = "1.0", features = ["derive"] }
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.28"
tokio = { version = "1", features = ["full"] }
tonic = "0.8"

[profile.release]
debug = true
2 changes: 1 addition & 1 deletion src/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
prost = "0.11"
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.8"
tonic.workspace = true

[build-dependencies]
tonic-build = "0.8"
1 change: 1 addition & 0 deletions src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn build_table_regional_prefix(
}

/// Table global info has only one key across all datanodes so it does not have `node_id` field.
#[derive(Clone)]
pub struct TableGlobalKey {
pub catalog_name: String,
pub schema_name: String,
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ enum_dispatch = "0.3"
parking_lot = "0.12"
rand = "0.8"
snafu.workspace = true
tonic = "0.8"
tonic.workspace = true

[dev-dependencies]
datanode = { path = "../datanode" }
Expand Down
24 changes: 12 additions & 12 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::error::{
MissingTimestampColumnSnafu, Result,
};

/// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = if expr.catalog_name.is_empty() {
None
} else {
Expand All @@ -39,8 +39,9 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
} else {
Some(expr.schema_name)
};
match expr.kind {
Some(Kind::AddColumns(add_columns)) => {
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
match kind {
Kind::AddColumns(add_columns) => {
let add_column_requests = add_columns
.add_columns
.into_iter()
Expand Down Expand Up @@ -72,9 +73,9 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
Ok(request)
}
Some(Kind::DropColumns(DropColumns { drop_columns })) => {
Kind::DropColumns(DropColumns { drop_columns }) => {
let alter_kind = AlterKind::DropColumns {
names: drop_columns.into_iter().map(|c| c.name).collect(),
};
Expand All @@ -85,19 +86,18 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
Ok(request)
}
Some(Kind::RenameTable(RenameTable { new_table_name })) => {
Kind::RenameTable(RenameTable { new_table_name }) => {
let alter_kind = AlterKind::RenameTable { new_table_name };
let request = AlterTableRequest {
catalog_name,
schema_name,
table_name: expr.table_name,
alter_kind,
};
Ok(Some(request))
Ok(request)
}
None => Ok(None),
}
}

Expand Down Expand Up @@ -218,7 +218,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap().unwrap();
let alter_request = alter_expr_to_request(expr).unwrap();
assert_eq!(None, alter_request.catalog_name);
assert_eq!(None, alter_request.schema_name);
assert_eq!("monitor".to_string(), alter_request.table_name);
Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap().unwrap();
let alter_request = alter_expr_to_request(expr).unwrap();
assert_eq!(Some("test_catalog".to_string()), alter_request.catalog_name);
assert_eq!(Some("test_schema".to_string()), alter_request.schema_name);
assert_eq!("monitor".to_string(), alter_request.table_name);
Expand Down
27 changes: 10 additions & 17 deletions src/common/grpc-expr/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};

use crate::error::{
ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu,
IllegalInsertDataSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result,
ColumnDataTypeSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu,
InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
Expand Down Expand Up @@ -281,10 +281,7 @@ pub fn build_create_expr_from_insertion(
Ok(expr)
}

pub fn to_table_insert_request(
request: GrpcInsertRequest,
schema: SchemaRef,
) -> Result<InsertRequest> {
pub fn to_table_insert_request(request: GrpcInsertRequest) -> Result<InsertRequest> {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
Expand All @@ -295,19 +292,17 @@ pub fn to_table_insert_request(
column_name,
values,
null_mask,
datatype,
..
} in request.columns
{
let Some(values) = values else { continue };

let vector_builder = &mut schema
.column_schema_by_name(&column_name)
.context(ColumnNotFoundSnafu {
column_name: &column_name,
table_name,
})?
.data_type
.create_mutable_vector(row_count);
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
.context(ColumnDataTypeSnafu)?
.into();

let vector_builder = &mut datatype.create_mutable_vector(row_count);

add_values_to_builder(vector_builder, values, row_count, null_mask)?;

Expand Down Expand Up @@ -620,8 +615,6 @@ mod tests {

#[test]
fn test_to_table_insert_request() {
let table: Arc<dyn Table> = Arc::new(DemoTable {});

let (columns, row_count) = mock_insert_batch();
let request = GrpcInsertRequest {
schema_name: "public".to_string(),
Expand All @@ -630,7 +623,7 @@ mod tests {
row_count,
region_number: 0,
};
let insert_req = to_table_insert_request(request, table.schema()).unwrap();
let insert_req = to_table_insert_request(request).unwrap();

assert_eq!("greptime", insert_req.catalog_name);
assert_eq!("public", insert_req.schema_name);
Expand Down
4 changes: 2 additions & 2 deletions src/common/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ flatbuffers = "22"
futures = "0.3"
prost = "0.11"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tonic = "0.8"
tokio.workspace = true
tonic.workspace = true
tower = "0.4"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ storage = { path = "../storage" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
tokio.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tonic.workspace = true
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;

mod flight;
pub mod flight;
mod grpc;
mod script;
mod sql;
Expand Down
8 changes: 4 additions & 4 deletions src/datanode/src/instance/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ impl Instance {
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?;

let request = common_grpc_expr::insert::to_table_insert_request(request, table.schema())
.context(InsertDataSnafu)?;
let request =
common_grpc_expr::insert::to_table_insert_request(request).context(InsertDataSnafu)?;

let affected_rows = table
.insert(request)
Expand All @@ -182,7 +182,7 @@ impl Instance {
}
}

fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
pub fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
match output {
Output::Stream(stream) => {
let stream = FlightRecordBatchStream::new(stream);
Expand Down Expand Up @@ -273,7 +273,7 @@ mod test {
});

let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(1)));
assert!(matches!(output, RpcOutput::AffectedRows(0)));

let ticket = Request::new(Ticket {
ticket: ObjectExpr {
Expand Down
2 changes: 0 additions & 2 deletions src/datanode/src/server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ impl Instance {

pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?;
let Some(request) = request else { return Ok(Output::AffectedRows(0)) };

self.sql_handler()
.execute(SqlRequest::Alter(request), QueryContext::arc())
.await
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/sql/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl SqlHandler {
.context(InsertSystemCatalogSnafu)?;
info!("Successfully created table: {:?}", table_name);
// TODO(hl): maybe support create multiple tables
Ok(Output::AffectedRows(1))
Ok(Output::AffectedRows(0))
}

/// Converts [CreateTable] to [SqlRequest::CreateTable].
Expand Down
12 changes: 6 additions & 6 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_create_database_and_insert_query() {
)"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));

let output = execute_sql(
&instance,
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn test_issue477_same_table_name_in_different_databases() {
)"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));

let output = execute_sql(
&instance,
Expand All @@ -100,7 +100,7 @@ async fn test_issue477_same_table_name_in_different_databases() {
)"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));

// Insert different data into a.demo and b.demo
let output = execute_sql(
Expand Down Expand Up @@ -351,7 +351,7 @@ pub async fn test_execute_create() {
) engine=mito with(regions=1);"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));
}

async fn check_output_stream(output: Output, expected: String) {
Expand Down Expand Up @@ -458,7 +458,7 @@ async fn test_insert_with_default_value_for_type(type_name: &str) {
) engine=mito with(regions=1);"#,
);
let output = execute_sql(&instance, &create_sql).await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));

// Insert with ts.
let output = execute_sql(
Expand Down Expand Up @@ -508,7 +508,7 @@ async fn test_use_database() {
"db1",
)
.await;
assert!(matches!(output, Output::AffectedRows(1)));
assert!(matches!(output, Output::AffectedRows(0)));

let output = execute_sql_in_db(&instance, "show tables", "db1").await;
let expected = "\
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license.workspace = true

[dependencies]
anymap = "1.0.0-beta.2"
arrow-flight.workspace = true
api = { path = "../api" }
async-stream.workspace = true
async-trait = "0.1"
Expand All @@ -21,7 +22,6 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
Expand All @@ -45,12 +45,12 @@ sql = { path = "../sql" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
tokio.workspace = true
tonic.workspace = true

[dev-dependencies]
datanode = { path = "../datanode" }
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"] }
tempdir = "0.3"
tonic = "0.8"
tower = "0.4"
Loading