Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::fmt::Debug;

use common_error::ext::{BoxedError, ErrorExt};
use common_error::prelude::{Snafu, StatusCode};
Expand All @@ -21,6 +22,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use snafu::{Backtrace, ErrorCompat};

use crate::DeregisterTableRequest;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -96,6 +99,9 @@ pub enum Error {
#[snafu(display("Table `{}` already exists", table))]
TableExists { table: String, backtrace: Backtrace },

#[snafu(display("Table `{}` not exist", table))]
Comment thread
MichaelScofield marked this conversation as resolved.
TableNotExist { table: String, backtrace: Backtrace },

#[snafu(display("Schema {} already exists", schema))]
SchemaExists {
schema: String,
Expand Down Expand Up @@ -142,6 +148,17 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display(
"Failed to deregister table, request: {:?}, source: {}",
request,
source
))]
DeregisterTable {
request: DeregisterTableRequest,
#[snafu(backtrace)]
source: table::error::Error,
},

#[snafu(display("Illegal catalog manager state: {}", msg))]
IllegalManagerState { backtrace: Backtrace, msg: String },

Expand Down Expand Up @@ -232,13 +249,16 @@ impl ErrorExt for Error {
Error::InvalidCatalogValue { source, .. } => source.status_code(),

Error::TableExists { .. } => StatusCode::TableAlreadyExists,
Error::TableNotExist { .. } => StatusCode::TableNotFound,
Error::SchemaExists { .. } => StatusCode::InvalidArguments,

Error::OpenSystemCatalog { source, .. }
| Error::CreateSystemCatalog { source, .. }
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. } => source.status_code(),
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. } => source.status_code(),

Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub struct RenameTableRequest {
pub table_id: TableId,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
pub schema: String,
Expand Down
37 changes: 30 additions & 7 deletions src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result,
SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu,
TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu,
self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use crate::system::{
Expand Down Expand Up @@ -419,11 +419,34 @@ impl CatalogManager for LocalCatalogManager {
.is_ok())
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
{
let started = *self.init_lock.lock().await;
ensure!(started, IllegalManagerStateSnafu { msg: "not started" });
}

{
let _ = self.register_lock.lock().await;

let catalog = &request.catalog;
let schema = &request.schema;
let table_name = &request.table_name;
Comment thread
MichaelScofield marked this conversation as resolved.
Outdated
let table_id = self
.catalogs
.table(catalog, schema, table_name)?
.context(error::TableNotExistSnafu {
table: format!("{catalog}.{schema}.{table_name}"),
})?
Comment thread
MichaelScofield marked this conversation as resolved.
Outdated
.table_info()
.ident
.table_id;

if !self.system.deregister_table(&request, table_id).await? {
return Ok(false);
}

self.catalogs.deregister_table(request).await
}
.fail()
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
Expand Down
8 changes: 6 additions & 2 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::sync::{Arc, RwLock};

use common_catalog::consts::MIN_USER_TABLE_ID;
use common_telemetry::error;
use snafu::OptionExt;
use snafu::{ensure, OptionExt};
use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
self, CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::schema::SchemaProvider;
use crate::{
Expand Down Expand Up @@ -250,6 +250,10 @@ impl CatalogProvider for MemoryCatalogProvider {
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
let mut schemas = self.schemas.write().unwrap();
ensure!(
!schemas.contains_key(&name),
error::SchemaExistsSnafu { schema: &name }
);
Ok(schemas.insert(name, schema))
}

Expand Down
109 changes: 90 additions & 19 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,20 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableInfoRef};
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest};
use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest};
use table::{Table, TableRef};

use crate::error::{
self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,
};
use crate::DeregisterTableRequest;

pub const ENTRY_TYPE_INDEX: usize = 0;
pub const KEY_INDEX: usize = 1;
pub const VALUE_INDEX: usize = 3;

pub struct SystemCatalogTable {
table_info: TableInfoRef,
pub table: TableRef,
}
pub struct SystemCatalogTable(TableRef);

#[async_trait::async_trait]
impl Table for SystemCatalogTable {
Expand All @@ -56,25 +54,29 @@ impl Table for SystemCatalogTable {
}

fn schema(&self) -> SchemaRef {
self.table_info.meta.schema.clone()
self.0.schema()
}

async fn scan(
&self,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
panic!("System catalog table does not support scan!")
self.0.scan(projection, filters, limit).await
}

/// Insert values into table.
async fn insert(&self, request: InsertRequest) -> table::error::Result<usize> {
self.table.insert(request).await
self.0.insert(request).await
}

fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
self.0.table_info()
}

async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
self.0.delete(request).await
}
}

Expand All @@ -95,10 +97,7 @@ impl SystemCatalogTable {
.await
.context(OpenSystemCatalogSnafu)?
{
Ok(Self {
table_info: table.table_info(),
table,
})
Ok(Self(table))
} else {
// system catalog table is not yet created, try to create
let request = CreateTableRequest {
Expand All @@ -118,8 +117,7 @@ impl SystemCatalogTable {
.create_table(&ctx, request)
.await
.context(CreateSystemCatalogSnafu)?;
let table_info = table.table_info();
Ok(Self { table, table_info })
Ok(Self(table))
}
}

Expand All @@ -128,7 +126,6 @@ impl SystemCatalogTable {
let full_projection = None;
let ctx = SessionContext::new();
let scan = self
.table
.scan(full_projection, &[], None)
.await
.context(error::SystemCatalogTableScanSnafu)?;
Expand Down Expand Up @@ -208,6 +205,29 @@ pub fn build_table_insert_request(
)
}

pub(crate) fn build_table_deletion_request(
request: &DeregisterTableRequest,
table_id: TableId,
) -> DeleteRequest {
let table_key = format_table_entry_key(&request.catalog, &request.schema, table_id);

let mut key_column_values = HashMap::with_capacity(3);
key_column_values.insert(
"entry_type".to_string(),
Comment thread
MichaelScofield marked this conversation as resolved.
Arc::new(UInt8Vector::from_slice(&[EntryType::Table as u8])) as _,
);
key_column_values.insert(
"key".to_string(),
Arc::new(BinaryVector::from_slice(&[table_key.as_bytes()])) as _,
);
// Timestamp in key part is intentionally left to 0
key_column_values.insert(
"timestamp".to_string(),
Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
);
DeleteRequest { key_column_values }
}

pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
let full_schema_name = format!("{catalog_name}.{schema_name}");
build_insert_request(
Expand Down Expand Up @@ -380,6 +400,8 @@ pub struct TableEntryValue {

#[cfg(test)]
mod tests {
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
Expand Down Expand Up @@ -500,4 +522,53 @@ mod tests {
assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name);
assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name);
}

#[tokio::test]
async fn test_system_catalog_table_records() {
let (_, table_engine) = prepare_table_engine().await;
let catalog_table = SystemCatalogTable::new(table_engine).await.unwrap();

let table_insertion = build_table_insert_request(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
"my_table".to_string(),
1,
);
let result = catalog_table.insert(table_insertion).await.unwrap();
assert_eq!(result, 1);

let records = catalog_table.records().await.unwrap();
let mut batches = RecordBatches::try_collect(records).await.unwrap().take();
assert_eq!(batches.len(), 1);
let batch = batches.remove(0);
assert_eq!(batch.num_rows(), 1);

let row = batch.rows().next().unwrap();
let Value::UInt8(entry_type) = row[0] else { unreachable!() };
let Value::Binary(key) = row[1].clone() else { unreachable!() };
let Value::Binary(value) = row[3].clone() else { unreachable!() };
let entry = decode_system_catalog(Some(entry_type), Some(&*key), Some(&*value)).unwrap();
let expected = Entry::Table(TableEntry {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
table_id: 1,
});
assert_eq!(entry, expected);

let table_deletion = build_table_deletion_request(
&DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
},
1,
);
let result = catalog_table.delete(table_deletion).await.unwrap();
assert_eq!(result, 1);

let records = catalog_table.records().await.unwrap();
let batches = RecordBatches::try_collect(records).await.unwrap().take();
assert_eq!(batches.len(), 0);
}
}
26 changes: 23 additions & 3 deletions src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ use table::metadata::{TableId, TableInfoRef};
use table::table::scan::SimpleTableScan;
use table::{Table, TableRef};

use crate::error::{Error, InsertCatalogRecordSnafu};
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};
use crate::error::{self, Error, InsertCatalogRecordSnafu, Result as CatalogResult};
use crate::system::{
build_schema_insert_request, build_table_deletion_request, build_table_insert_request,
SystemCatalogTable,
};
use crate::{
CatalogListRef, CatalogProvider, DeregisterTableRequest, SchemaProvider, SchemaProviderRef,
};

/// Tables holds all tables created by user.
pub struct Tables {
Expand Down Expand Up @@ -279,6 +284,21 @@ impl SystemCatalog {
.context(InsertCatalogRecordSnafu)
}

pub(crate) async fn deregister_table(
&self,
request: &DeregisterTableRequest,
table_id: TableId,
) -> CatalogResult<bool> {
self.information_schema
.system
.delete(build_table_deletion_request(request, table_id))
.await
.map(|x| x == 1)
.with_context(|_| error::DeregisterTableSnafu {
request: request.clone(),
})
}

pub async fn register_schema(
&self,
catalog: String,
Expand Down
Loading