From be180e654735c424a9c8f94796093fcde9b78ec8 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 22 Feb 2023 17:12:01 +0800 Subject: [PATCH 1/2] refactor: remove grpc client with default catalog/schema --- Cargo.lock | 2 ++ benchmarks/Cargo.toml | 1 + benchmarks/src/bin/nyc-taxi.rs | 3 ++- src/client/examples/logical.rs | 3 ++- src/client/src/database.rs | 5 ++--- src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/table.rs | 2 +- tests-integration/tests/grpc.rs | 4 ++-- tests/runner/Cargo.toml | 1 + tests/runner/src/env.rs | 5 +++-- 10 files changed, 17 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2c15a6a99d8..9bcdb069573b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,7 @@ dependencies = [ "arrow", "clap 4.1.4", "client", + "common-catalog", "indicatif", "itertools", "parquet", @@ -7013,6 +7014,7 @@ dependencies = [ "async-trait", "client", "common-base", + "common-catalog", "common-error", "common-grpc", "common-query", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 579530784a00..06422150a72d 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] arrow.workspace = true clap = { version = "4.0", features = ["derive"] } +common-catalog = { path = "../src/common/catalog" } client = { path = "../src/client" } indicatif = "0.17.1" itertools = "0.10.5" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 368dde8ce324..e99ee1f5a4d8 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -28,6 +28,7 @@ use clap::Parser; use client::api::v1::column::Values; use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; use client::{Client, Database}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::task::JoinSet; @@ -422,7 +423,7 @@ fn main() { .unwrap() .block_on(async { let client = Client::with_urls(vec![&args.endpoint]); - let db = Database::with_client(client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); if !args.skip_write { do_write(&args, &db).await; diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 07debec67991..e3cd1f7747d1 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -14,6 +14,7 @@ use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId}; use client::{Client, Database}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use prost_09::Message; use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; use substrait_proto::protobuf::read_rel::{NamedTable, ReadType}; @@ -65,7 +66,7 @@ async fn run() { region_ids: vec![0], }; - let db = Database::with_client(client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); let result = db.create(create_table_expr).await.unwrap(); event!(Level::INFO, "create table result: {:#?}", result); diff --git a/src/client/src/database.rs b/src/client/src/database.rs index efc1eacfa88f..7e0505dd04e3 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -23,7 +23,6 @@ use api::v1::{ InsertRequest, QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::*; use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage}; use common_query::Output; @@ -56,8 +55,8 @@ impl Database { } } - pub fn with_client(client: Client) -> Self { - Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client) + pub fn set_catalog(&mut self, catalog: impl Into) { + self.catalog = catalog.into(); } pub fn set_schema(&mut self, schema: impl Into) { diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 0394c3d91463..8ab217984255 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -186,7 +186,7 @@ impl DistInstance { for datanode in table_route.find_leaders() { let client = self.datanode_clients.get_client(&datanode).await; - let client = Database::with_client(client); + let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let regions = table_route.find_leader_regions(&datanode); let mut create_expr_for_region = create_table.clone(); diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 65a18ef61594..a26418225709 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -258,7 +258,7 @@ impl DistTable { ); for datanode in leaders { let client = self.datanode_clients.get_client(&datanode).await; - let db = Database::with_client(client); + let db = Database::new(&expr.catalog_name, &expr.schema_name, client); debug!("Sending {:?} to {:?}", expr, db); let result = db .alter(expr.clone()) diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 15e2687b4284..70e8717e09cd 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -65,7 +65,7 @@ pub async fn test_auto_create_table(store_type: StorageType) { setup_grpc_server(store_type, "auto_create_table").await; let grpc_client = Client::with_urls(vec![addr]); - let db = Database::with_client(grpc_client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); insert_and_assert(&db).await; let _ = fe_grpc_server.shutdown().await; guard.remove_all().await; @@ -131,7 +131,7 @@ pub async fn test_insert_and_select(store_type: StorageType) { setup_grpc_server(store_type, "insert_and_select").await; let grpc_client = Client::with_urls(vec![addr]); - let db = Database::with_client(grpc_client); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); // create let expr = testing_create_expr(); diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 10d69895d5a6..a9077f2fa513 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true async-trait = "0.1" client = { path = "../../src/client" } common-base = { path = "../../src/common/base" } +common-catalog = { path = "../../src/common/catalog" } common-error = { path = "../../src/common/error" } common-grpc = { path = "../../src/common/grpc" } common-query = { path = "../../src/common/query" } diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 98b2142fc005..ebc2f271f7f2 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -20,6 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use client::{Client, Database as DB, Error as ClientError}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::snafu::ErrorCompat; use common_query::Output; @@ -110,7 +111,7 @@ impl Env { println!("Started, going to test. Log will be write to {SERVER_LOG_FILE}"); let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::with_client(client); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { server_process, @@ -180,7 +181,7 @@ impl Env { } let client = Client::with_urls(vec![SERVER_ADDR]); - let db = DB::with_client(client); + let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); GreptimeDB { server_process: frontend, From c450194c3b8f3a4890d2e9e5d067f4836818f822 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 22 Feb 2023 17:24:33 +0800 Subject: [PATCH 2/2] refactor: re-export consts in client module --- Cargo.lock | 2 -- benchmarks/Cargo.toml | 1 - benchmarks/src/bin/nyc-taxi.rs | 3 +-- src/client/src/lib.rs | 1 + src/servers/tests/grpc/mod.rs | 4 ++-- tests-integration/tests/grpc.rs | 4 ++-- tests/runner/Cargo.toml | 1 - tests/runner/src/env.rs | 5 +++-- 8 files changed, 9 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9bcdb069573b..c2c15a6a99d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,7 +703,6 @@ dependencies = [ "arrow", "clap 4.1.4", "client", - "common-catalog", "indicatif", "itertools", "parquet", @@ -7014,7 +7013,6 @@ dependencies = [ "async-trait", "client", "common-base", - "common-catalog", "common-error", "common-grpc", "common-query", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 06422150a72d..579530784a00 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,6 @@ license.workspace = true [dependencies] arrow.workspace = true clap = { version = "4.0", features = ["derive"] } -common-catalog = { path = "../src/common/catalog" } client = { path = "../src/client" } indicatif = "0.17.1" itertools = "0.10.5" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index e99ee1f5a4d8..62de8bc0eb06 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -27,8 +27,7 @@ use arrow::record_batch::RecordBatch; use clap::Parser; use client::api::v1::column::Values; use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; -use client::{Client, Database}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use tokio::task::JoinSet; diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index fbee1356d95a..2b8942e21ff3 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -18,6 +18,7 @@ mod error; pub mod load_balance; pub use api; +pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; pub use self::client::Client; pub use self::database::Database; diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index c2b943dfba11..e4a6606b63d1 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -19,7 +19,7 @@ use api::v1::auth_header::AuthScheme; use api::v1::Basic; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; -use client::{Client, Database}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::auth::UserProviderRef; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; @@ -126,7 +126,7 @@ async fn test_grpc_query() { assert!(re.is_ok()); let grpc_client = Client::with_urls(vec![re.unwrap().to_string()]); - let mut db = Database::with_client(grpc_client); + let mut db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); let re = db.sql("select * from numbers").await; assert!(re.is_err()); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 70e8717e09cd..77ac062ef904 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -17,8 +17,8 @@ use api::v1::{ column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId, }; -use client::{Client, Database}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::MIN_USER_TABLE_ID; use common_query::Output; use servers::server::Server; use tests_integration::test_util::{setup_grpc_server, StorageType}; diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index a9077f2fa513..10d69895d5a6 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true async-trait = "0.1" client = { path = "../../src/client" } common-base = { path = "../../src/common/base" } -common-catalog = { path = "../../src/common/catalog" } common-error = { path = "../../src/common/error" } common-grpc = { path = "../../src/common/grpc" } common-query = { path = "../../src/common/query" } diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index ebc2f271f7f2..840ec5a9b8e8 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -19,8 +19,9 @@ use std::process::Stdio; use std::time::Duration; use async_trait::async_trait; -use client::{Client, Database as DB, Error as ClientError}; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use client::{ + Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, +}; use common_error::ext::ErrorExt; use common_error::snafu::ErrorCompat; use common_query::Output;