Skip to content

Commit 9ad6c45

Browse files
test: Sqlness tests for distribute mode (#979)
* test: Sqlness tests for distribute mode * ci * fix: resolve PR comments * fix: resolve PR comments
1 parent 7fe417e commit 9ad6c45

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+650
-209
lines changed

Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/datanode/src/error.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ impl ErrorExt for Error {
335335
source.status_code()
336336
}
337337
Error::DecodeLogicalPlan { source } => source.status_code(),
338-
Error::NewCatalog { source } => source.status_code(),
338+
Error::NewCatalog { source } | Error::RegisterSchema { source } => source.status_code(),
339339
Error::FindTable { source, .. } => source.status_code(),
340340
Error::CreateTable { source, .. }
341341
| Error::GetTable { source, .. }
@@ -379,7 +379,6 @@ impl ErrorExt for Error {
379379
| Error::CreateDir { .. }
380380
| Error::InsertSystemCatalog { .. }
381381
| Error::RenameTable { .. }
382-
| Error::RegisterSchema { .. }
383382
| Error::Catalog { .. }
384383
| Error::MissingRequiredField { .. }
385384
| Error::IncorrectInternalState { .. } => StatusCode::Internal,

src/datanode/src/sql/alter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl SqlHandler {
7272
alter_table: AlterTable,
7373
table_ref: TableReference,
7474
) -> Result<AlterTableRequest> {
75-
let alter_kind = match alter_table.alter_operation() {
75+
let alter_kind = match &alter_table.alter_operation() {
7676
AlterTableOperation::AddConstraint(table_constraint) => {
7777
return error::InvalidSqlSnafu {
7878
msg: format!("unsupported table constraint {table_constraint}"),

src/frontend/src/catalog.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ impl CatalogManager for FrontendCatalogManager {
8282
Ok(true)
8383
}
8484

85-
async fn deregister_table(&self, _request: DeregisterTableRequest) -> CatalogResult<bool> {
85+
async fn deregister_table(&self, request: DeregisterTableRequest) -> CatalogResult<bool> {
86+
let table_name = TableName::new(request.catalog, request.schema, request.table_name);
87+
self.partition_manager
88+
.table_routes()
89+
.invalidate_table_route(&table_name)
90+
.await;
8691
Ok(true)
8792
}
8893

src/frontend/src/error.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub enum Error {
114114
backtrace: Backtrace,
115115
},
116116

117-
#[snafu(display("Table not found: {}", table_name))]
117+
#[snafu(display("Table `{}` not exist", table_name))]
118118
TableNotFound {
119119
table_name: String,
120120
backtrace: Backtrace,
@@ -214,6 +214,9 @@ pub enum Error {
214214
backtrace: Backtrace,
215215
},
216216

217+
#[snafu(display("Schema {} already exists", name))]
218+
SchemaExists { name: String, backtrace: Backtrace },
219+
217220
#[snafu(display("Table occurs error, source: {}", source))]
218221
Table {
219222
#[snafu(backtrace)]
@@ -392,8 +395,9 @@ impl ErrorExt for Error {
392395
Error::StartMetaClient { source } | Error::RequestMeta { source } => {
393396
source.status_code()
394397
}
395-
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
396-
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
398+
Error::CatalogNotFound { .. }
399+
| Error::SchemaNotFound { .. }
400+
| Error::SchemaExists { .. } => StatusCode::InvalidArguments,
397401

398402
Error::BuildCreateExprOnInsertion { source }
399403
| Error::ToTableInsertRequest { source }

src/frontend/src/instance.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl Instance {
396396
| Statement::Alter(_)
397397
| Statement::DropTable(_) => self.sql_handler.do_statement_query(stmt, query_ctx).await,
398398
Statement::Use(db) => self.handle_use(db, query_ctx),
399-
_ => NotSupportedSnafu {
399+
Statement::ShowCreateTable(_) => NotSupportedSnafu {
400400
feat: format!("{stmt:?}"),
401401
}
402402
.fail(),

src/frontend/src/instance/distributed.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ use datatypes::schema::{RawSchema, Schema};
3838
use meta_client::client::MetaClient;
3939
use meta_client::rpc::router::DeleteRequest as MetaDeleteRequest;
4040
use meta_client::rpc::{
41-
CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse,
42-
TableName,
41+
CompareAndPutRequest, CreateRequest as MetaCreateRequest, Partition as MetaPartition,
42+
RouteResponse, TableName,
4343
};
4444
use partition::partition::{PartitionBound, PartitionDef};
4545
use query::parser::QueryStatement;
@@ -60,8 +60,9 @@ use crate::datanode::DatanodeClients;
6060
use crate::error::{
6161
self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu,
6262
ColumnDataTypeSnafu, DeserializePartitionSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
63-
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu,
64-
TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu,
63+
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
64+
StartMetaClientSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu,
65+
ToTableInsertRequestSnafu,
6566
};
6667
use crate::expr_factory;
6768
use crate::instance::parse_stmt;
@@ -105,6 +106,26 @@ impl DistInstance {
105106
&create_table.table_name,
106107
);
107108

109+
if self
110+
.catalog_manager
111+
.table(
112+
&table_name.catalog_name,
113+
&table_name.schema_name,
114+
&table_name.table_name,
115+
)
116+
.context(CatalogSnafu)?
117+
.is_some()
118+
{
119+
return if create_table.create_if_not_exists {
120+
Ok(Output::AffectedRows(0))
121+
} else {
122+
TableAlreadyExistSnafu {
123+
table: table_name.to_string(),
124+
}
125+
.fail()
126+
};
127+
}
128+
108129
let mut table_info = create_table_info(create_table)?;
109130

110131
let response = self
@@ -157,7 +178,7 @@ impl DistInstance {
157178
.register_table(request)
158179
.await
159180
.context(CatalogSnafu)?,
160-
error::TableAlreadyExistSnafu {
181+
TableAlreadyExistSnafu {
161182
table: table_name.to_string()
162183
}
163184
);
@@ -266,6 +287,10 @@ impl DistInstance {
266287
let create_expr = &mut expr_factory::create_to_expr(&stmt, query_ctx)?;
267288
Ok(self.create_table(create_expr, stmt.partitions).await?)
268289
}
290+
Statement::Alter(alter_table) => {
291+
let expr = grpc::to_alter_expr(alter_table, query_ctx)?;
292+
return self.handle_alter_table(expr).await;
293+
}
269294
Statement::DropTable(stmt) => {
270295
let (catalog, schema, table) =
271296
table_idents_to_full_name(stmt.table_name(), query_ctx)
@@ -358,10 +383,19 @@ impl DistInstance {
358383
.store_client()
359384
.context(StartMetaClientSnafu)?;
360385

361-
let request = PutRequest::default()
386+
let request = CompareAndPutRequest::new()
362387
.with_key(key.to_string())
363388
.with_value(value.as_bytes().context(CatalogEntrySerdeSnafu)?);
364-
client.put(request.into()).await.context(RequestMetaSnafu)?;
389+
let response = client
390+
.compare_and_put(request.into())
391+
.await
392+
.context(RequestMetaSnafu)?;
393+
ensure!(
394+
response.success,
395+
SchemaExistsSnafu {
396+
name: key.schema_name
397+
}
398+
);
365399

366400
Ok(Output::AffectedRows(1))
367401
}

src/frontend/src/instance/distributed/grpc.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use alter_expr::Kind;
1516
use api::v1::ddl_request::Expr as DdlExpr;
1617
use api::v1::greptime_request::Request;
18+
use api::v1::{alter_expr, AddColumn, AddColumns, AlterExpr, DropColumn, DropColumns, RenameTable};
1719
use async_trait::async_trait;
20+
use common_error::prelude::BoxedError;
1821
use common_query::Output;
22+
use datanode::instance::sql::table_idents_to_full_name;
1923
use meta_client::rpc::TableName;
2024
use servers::query_handler::grpc::GrpcQueryHandler;
2125
use session::context::QueryContextRef;
22-
use snafu::OptionExt;
26+
use snafu::{OptionExt, ResultExt};
27+
use sql::statements::alter::{AlterTable, AlterTableOperation};
28+
use sql::statements::sql_column_def_to_grpc_column_def;
2329

24-
use crate::error::{self, Result};
30+
use crate::error::{self, ExternalSnafu, Result};
2531
use crate::instance::distributed::DistInstance;
2632

2733
#[async_trait]
@@ -56,3 +62,47 @@ impl GrpcQueryHandler for DistInstance {
5662
}
5763
}
5864
}
65+
66+
pub(crate) fn to_alter_expr(
67+
alter_table: AlterTable,
68+
query_ctx: QueryContextRef,
69+
) -> Result<AlterExpr> {
70+
let (catalog_name, schema_name, table_name) =
71+
table_idents_to_full_name(alter_table.table_name(), query_ctx)
72+
.map_err(BoxedError::new)
73+
.context(ExternalSnafu)?;
74+
75+
let kind = match alter_table.alter_operation() {
76+
AlterTableOperation::AddConstraint(_) => {
77+
return error::NotSupportedSnafu {
78+
feat: "ADD CONSTRAINT",
79+
}
80+
.fail();
81+
}
82+
AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns {
83+
add_columns: vec![AddColumn {
84+
column_def: Some(
85+
sql_column_def_to_grpc_column_def(column_def)
86+
.map_err(BoxedError::new)
87+
.context(ExternalSnafu)?,
88+
),
89+
is_key: false,
90+
}],
91+
}),
92+
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
93+
drop_columns: vec![DropColumn {
94+
name: name.value.to_string(),
95+
}],
96+
}),
97+
AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
98+
new_table_name: new_table_name.to_string(),
99+
}),
100+
};
101+
102+
Ok(AlterExpr {
103+
catalog_name,
104+
schema_name,
105+
table_name,
106+
kind: Some(kind),
107+
})
108+
}

src/partition/src/manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ impl PartitionRuleManager {
4646
Self { table_routes }
4747
}
4848

49+
pub fn table_routes(&self) -> &TableRoutes {
50+
self.table_routes.as_ref()
51+
}
52+
4953
/// Find table route of given table name.
5054
pub async fn find_table_route(&self, table: &TableName) -> Result<Arc<TableRoute>> {
5155
self.table_routes.get_route(table).await

src/partition/src/route.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::error::{self, Result};
2424

2525
pub struct TableRoutes {
2626
meta_client: Arc<MetaClient>,
27+
// TODO(LFC): Use table id as cache key, then remove all the manually invoked cache invalidations.
2728
cache: Cache<TableName, Arc<TableRoute>>,
2829
}
2930

@@ -72,4 +73,8 @@ impl TableRoutes {
7273
pub async fn insert_table_route(&self, table_name: TableName, table_route: Arc<TableRoute>) {
7374
self.cache.insert(table_name, table_route).await
7475
}
76+
77+
pub async fn invalidate_table_route(&self, table_name: &TableName) {
78+
self.cache.invalidate(table_name).await
79+
}
7580
}

0 commit comments

Comments
 (0)