Skip to content

Commit fe954b7

Browse files
MichaelScofieldwaynexia
authored andcommitted
refactor: system tables in new region server (#2344)
refactor: inverse the dependency between system tables and catalog manager
1 parent 3cab6de commit fe954b7

22 files changed

Lines changed: 183 additions & 1025 deletions

File tree

src/catalog/src/error.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use datatypes::prelude::ConcreteDataType;
2222
use snafu::{Location, Snafu};
2323
use tokio::task::JoinError;
2424

25-
use crate::DeregisterTableRequest;
26-
2725
#[derive(Debug, Snafu)]
2826
#[snafu(visibility(pub))]
2927
pub enum Error {
@@ -179,20 +177,6 @@ pub enum Error {
179177
source: table::error::Error,
180178
},
181179

182-
#[snafu(display(
183-
"Failed to deregister table, request: {:?}, source: {}",
184-
request,
185-
source
186-
))]
187-
DeregisterTable {
188-
request: DeregisterTableRequest,
189-
location: Location,
190-
source: table::error::Error,
191-
},
192-
193-
#[snafu(display("Illegal catalog manager state: {}", msg))]
194-
IllegalManagerState { location: Location, msg: String },
195-
196180
#[snafu(display("Failed to scan system catalog table, source: {}", source))]
197181
SystemCatalogTableScan {
198182
location: Location,
@@ -269,7 +253,6 @@ impl ErrorExt for Error {
269253
Error::InvalidKey { .. }
270254
| Error::SchemaNotFound { .. }
271255
| Error::TableNotFound { .. }
272-
| Error::IllegalManagerState { .. }
273256
| Error::CatalogNotFound { .. }
274257
| Error::InvalidEntryType { .. }
275258
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
@@ -302,7 +285,6 @@ impl ErrorExt for Error {
302285
| Error::InsertCatalogRecord { source, .. }
303286
| Error::OpenTable { source, .. }
304287
| Error::CreateTable { source, .. }
305-
| Error::DeregisterTable { source, .. }
306288
| Error::TableSchemaMismatch { source, .. } => source.status_code(),
307289

308290
Error::MetaSrv { source, .. } => source.status_code(),

src/catalog/src/lib.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,36 +44,40 @@ pub mod tables;
4444
pub trait CatalogManager: Send + Sync {
4545
fn as_any(&self) -> &dyn Any;
4646

47-
/// Starts a catalog manager.
48-
async fn start(&self) -> Result<()>;
49-
50-
/// Registers a catalog to catalog manager, returns whether the catalog exist before.
51-
async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool>;
47+
/// Register a local catalog.
48+
///
49+
/// # Returns
50+
///
51+
/// Whether the catalog is registered.
52+
fn register_local_catalog(&self, name: &str) -> Result<bool>;
5253

53-
/// Register a schema with catalog name and schema name. Retuens whether the
54-
/// schema registered.
54+
/// Register a local schema.
55+
///
56+
/// # Returns
57+
///
58+
/// Whether the schema is registered.
5559
///
5660
/// # Errors
5761
///
5862
/// This method will/should fail if catalog not exist
59-
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
63+
fn register_local_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;
6064

6165
/// Deregisters a database within given catalog/schema to catalog manager
62-
async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
66+
fn deregister_local_schema(&self, request: DeregisterSchemaRequest) -> Result<bool>;
6367

64-
/// Registers a table within given catalog/schema to catalog manager,
65-
/// returns whether the table registered.
68+
/// Registers a local table.
69+
///
70+
/// # Returns
71+
///
72+
/// Whether the table is registered.
6673
///
6774
/// # Errors
6875
///
6976
/// This method will/should fail if catalog or schema not exist
70-
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
77+
fn register_local_table(&self, request: RegisterTableRequest) -> Result<bool>;
7178

7279
/// Deregisters a table within given catalog/schema to catalog manager
73-
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()>;
74-
75-
/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
76-
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;
80+
fn deregister_local_table(&self, request: DeregisterTableRequest) -> Result<()>;
7781

7882
async fn catalog_names(&self) -> Result<Vec<String>>;
7983

@@ -160,7 +164,7 @@ pub struct RegisterSchemaRequest {
160164
pub schema: String,
161165
}
162166

163-
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
167+
pub(crate) async fn handle_system_table_request<'a, M: CatalogManager + ?Sized>(
164168
manager: &'a M,
165169
engine: TableEngineRef,
166170
sys_table_requests: &'a mut Vec<RegisterSystemTableRequest>,
@@ -185,15 +189,13 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
185189
table_name,
186190
),
187191
})?;
188-
let _ = manager
189-
.register_table(RegisterTableRequest {
190-
catalog: catalog_name.clone(),
191-
schema: schema_name.clone(),
192-
table_name: table_name.clone(),
193-
table_id,
194-
table: table.clone(),
195-
})
196-
.await?;
192+
manager.register_local_table(RegisterTableRequest {
193+
catalog: catalog_name.clone(),
194+
schema: schema_name.clone(),
195+
table_name: table_name.clone(),
196+
table_id,
197+
table: table.clone(),
198+
})?;
197199
info!("Created and registered system table: {table_name}");
198200
table
199201
};

src/catalog/src/local.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,4 @@
1515
pub mod manager;
1616
pub mod memory;
1717

18-
pub use manager::LocalCatalogManager;
1918
pub use memory::{new_memory_catalog_manager, MemoryCatalogManager};

0 commit comments

Comments
 (0)