Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId};
use client::{Client, Database};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -422,7 +422,7 @@ fn main() {
.unwrap()
.block_on(async {
let client = Client::with_urls(vec![&args.endpoint]);
let db = Database::with_client(client);
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

if !args.skip_write {
do_write(&args, &db).await;
Expand Down
3 changes: 2 additions & 1 deletion src/client/examples/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use prost_09::Message;
use substrait_proto::protobuf::plan_rel::RelType as PlanRelType;
use substrait_proto::protobuf::read_rel::{NamedTable, ReadType};
Expand Down Expand Up @@ -65,7 +66,7 @@ async fn run() {
region_ids: vec![0],
};

let db = Database::with_client(client);
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let result = db.create(create_table_expr).await.unwrap();
event!(Level::INFO, "create table result: {:#?}", result);

Expand Down
5 changes: 2 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use api::v1::{
InsertRequest, QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::*;
use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage};
use common_query::Output;
Expand Down Expand Up @@ -56,8 +55,8 @@ impl Database {
}
}

pub fn with_client(client: Client) -> Self {
Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client)
pub fn set_catalog(&mut self, catalog: impl Into<String>) {
self.catalog = catalog.into();
}

pub fn set_schema(&mut self, schema: impl Into<String>) {
Expand Down
1 change: 1 addition & 0 deletions src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod error;
pub mod load_balance;

pub use api;
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

pub use self::client::Client;
pub use self::database::Database;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl DistInstance {

for datanode in table_route.find_leaders() {
let client = self.datanode_clients.get_client(&datanode).await;
let client = Database::with_client(client);
let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client);

let regions = table_route.find_leader_regions(&datanode);
let mut create_expr_for_region = create_table.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl DistTable {
);
for datanode in leaders {
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::with_client(client);
let db = Database::new(&expr.catalog_name, &expr.schema_name, client);
debug!("Sending {:?} to {:?}", expr, db);
let result = db
.alter(expr.clone())
Expand Down
4 changes: 2 additions & 2 deletions src/servers/tests/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::auth_header::AuthScheme;
use api::v1::Basic;
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait;
use client::{Client, Database};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::auth::UserProviderRef;
use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu};
Expand Down Expand Up @@ -126,7 +126,7 @@ async fn test_grpc_query() {
assert!(re.is_ok());

let grpc_client = Client::with_urls(vec![re.unwrap().to_string()]);
let mut db = Database::with_client(grpc_client);
let mut db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);

let re = db.sql("select * from numbers").await;
assert!(re.is_err());
Expand Down
8 changes: 4 additions & 4 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use api::v1::{
column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest, TableId,
};
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_query::Output;
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
Expand Down Expand Up @@ -65,7 +65,7 @@ pub async fn test_auto_create_table(store_type: StorageType) {
setup_grpc_server(store_type, "auto_create_table").await;

let grpc_client = Client::with_urls(vec![addr]);
let db = Database::with_client(grpc_client);
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
Expand Down Expand Up @@ -131,7 +131,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
setup_grpc_server(store_type, "insert_and_select").await;

let grpc_client = Client::with_urls(vec![addr]);
let db = Database::with_client(grpc_client);
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);

// create
let expr = testing_create_expr();
Expand Down
8 changes: 5 additions & 3 deletions tests/runner/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::process::Stdio;
use std::time::Duration;

use async_trait::async_trait;
use client::{Client, Database as DB, Error as ClientError};
use client::{
Client, Database as DB, Error as ClientError, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use common_error::ext::ErrorExt;
use common_error::snafu::ErrorCompat;
use common_query::Output;
Expand Down Expand Up @@ -110,7 +112,7 @@ impl Env {
println!("Started, going to test. Log will be write to {SERVER_LOG_FILE}");

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::with_client(client);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_process,
Expand Down Expand Up @@ -180,7 +182,7 @@ impl Env {
}

let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::with_client(client);
let db = DB::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

GreptimeDB {
server_process: frontend,
Expand Down