Skip to content

Make infer role configurable and fix double parse bug #533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down Expand Up @@ -134,6 +138,7 @@ database = "shard2"
pool_mode = "session"
default_role = "primary"
query_parser_enabled = true
query_parser_read_write_splitting = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

Expand Down
4 changes: 4 additions & 0 deletions examples/docker/pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
4 changes: 4 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ default_role = "any"
# we'll direct it to the primary.
query_parser_enabled = true

# If the query parser is enabled and this setting is enabled, we'll attempt to
# infer the role from the query itself.
query_parser_read_write_splitting = true

# If the query parser is enabled and this setting is enabled, the primary will be part of the pool of databases used for
# load balancing of read queries. Otherwise, the primary will only be used for write
# queries. The primary can always be explicitly selected with our custom protocol.
Expand Down
112 changes: 76 additions & 36 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,9 @@ where
let mut prepared_statement = None;
let mut will_prepare = false;

let client_identifier =
ClientIdentifier::new(&self.application_name, &self.username, &self.pool_name);

// Our custom protocol loop.
// We expect the client to either start a transaction with regular queries
// or issue commands for our sharding and server selection protocol.
Expand Down Expand Up @@ -812,6 +815,21 @@ where
message_result = read_message(&mut self.read) => message_result?
};

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;
query_router.update_pool_settings(pool.settings.clone());

let mut initial_parsed_ast = None;

match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
Expand Down Expand Up @@ -841,24 +859,34 @@ where

'Q' => {
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
let plugin_result = query_router.execute_plugins(&ast).await;
match query_router.parse(&message) {
Ok(ast) => {
let plugin_result = query_router.execute_plugins(&ast).await;

match plugin_result {
Ok(PluginOutput::Deny(error)) => {
error_response(&mut self.write, &error).await?;
continue;
}
match plugin_result {
Ok(PluginOutput::Deny(error)) => {
error_response(&mut self.write, &error).await?;
continue;
}

Ok(PluginOutput::Intercept(result)) => {
write_all(&mut self.write, result).await?;
continue;
}
Ok(PluginOutput::Intercept(result)) => {
write_all(&mut self.write, result).await?;
continue;
}

_ => (),
};
_ => (),
};

let _ = query_router.infer(&ast);

let _ = query_router.infer(&ast);
initial_parsed_ast = Some(ast);
}
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
}
}
}
}
Expand All @@ -872,13 +900,21 @@ where
self.buffer.put(&message[..]);

if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}
match query_router.parse(&message) {
Ok(ast) => {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}

let _ = query_router.infer(&ast);
}
let _ = query_router.infer(&ast);
}
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
}
};
}

continue;
Expand Down Expand Up @@ -922,13 +958,6 @@ where
_ => (),
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

// Check on plugin results.
match plugin_output {
Some(PluginOutput::Deny(error)) => {
Expand All @@ -941,11 +970,6 @@ where
_ => (),
};

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool = self.get_pool().await?;

// Check if the pool is paused and wait until it's resumed.
if pool.wait_paused().await {
// Refresh pool information, something might have changed.
Expand Down Expand Up @@ -1165,6 +1189,9 @@ where
None => {
trace!("Waiting for message inside transaction or in session mode");

// This is not an initial message so discard the initial_parsed_ast
initial_parsed_ast.take();

match tokio::time::timeout(
idle_client_timeout_duration,
read_message(&mut self.read),
Expand Down Expand Up @@ -1221,7 +1248,22 @@ where
// Query
'Q' => {
if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
// We don't want to parse again if we already parsed it as the initial message
let ast = match initial_parsed_ast {
Some(_) => Some(initial_parsed_ast.take().unwrap()),
None => match query_router.parse(&message) {
Ok(ast) => Some(ast),
Err(error) => {
warn!(
"Query parsing error: {} (client: {})",
error, client_identifier
);
None
}
},
};

if let Some(ast) = ast {
let plugin_result = query_router.execute_plugins(&ast).await;

match plugin_result {
Expand All @@ -1237,8 +1279,6 @@ where

_ => (),
};

let _ = query_router.infer(&ast);
}
}
debug!("Sending query to server");
Expand Down Expand Up @@ -1290,7 +1330,7 @@ where
}

if query_router.query_parser_enabled() {
if let Ok(ast) = QueryRouter::parse(&message) {
if let Ok(ast) = query_router.parse(&message) {
if let Ok(output) = query_router.execute_plugins(&ast).await {
plugin_output = Some(output);
}
Expand Down
39 changes: 39 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ pub struct Pool {
#[serde(default)] // False
pub query_parser_enabled: bool,

pub query_parser_max_length: Option<usize>,

#[serde(default)] // False
pub query_parser_read_write_splitting: bool,

#[serde(default)] // False
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -627,6 +632,18 @@ impl Pool {
}
}

if self.query_parser_read_write_splitting && !self.query_parser_enabled {
error!(
"query_parser_read_write_splitting is only valid when query_parser_enabled is true"
);
return Err(Error::BadConfig);
}

if self.plugins.is_some() && !self.query_parser_enabled {
error!("plugins are only valid when query_parser_enabled is true");
return Err(Error::BadConfig);
}

self.automatic_sharding_key = match &self.automatic_sharding_key {
Some(key) => {
// No quotes in the key so we don't have to compare quoted
Expand Down Expand Up @@ -663,6 +680,8 @@ impl Default for Pool {
users: BTreeMap::default(),
default_role: String::from("any"),
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: false,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -914,6 +933,17 @@ impl From<&Config> for std::collections::HashMap<String, String> {
format!("pools.{}.query_parser_enabled", pool_name),
pool.query_parser_enabled.to_string(),
),
(
format!("pools.{}.query_parser_max_length", pool_name),
match pool.query_parser_max_length {
Some(max_length) => max_length.to_string(),
None => String::from("unlimited"),
},
),
(
format!("pools.{}.query_parser_read_write_splitting", pool_name),
pool.query_parser_read_write_splitting.to_string(),
),
(
format!("pools.{}.default_role", pool_name),
pool.default_role.clone(),
Expand Down Expand Up @@ -1096,6 +1126,15 @@ impl Config {
"[pool: {}] Query router: {}",
pool_name, pool_config.query_parser_enabled
);

info!(
"[pool: {}] Query parser max length: {:?}",
pool_name, pool_config.query_parser_max_length
);
info!(
"[pool: {}] Infer role from query: {}",
pool_name, pool_config.query_parser_read_write_splitting
);
info!(
"[pool: {}] Number of shards: {}",
pool_name,
Expand Down
11 changes: 11 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ pub struct PoolSettings {
// Enable/disable query parser.
pub query_parser_enabled: bool,

// Maximum length of a query the parser will parse; `None` means unlimited.
pub query_parser_max_length: Option<usize>,

// Infer the role from the query itself (only valid when the query parser is enabled).
pub query_parser_read_write_splitting: bool,

// Read from the primary as well or not.
pub primary_reads_enabled: bool,

Expand Down Expand Up @@ -157,6 +163,8 @@ impl Default for PoolSettings {
db: String::default(),
default_role: None,
query_parser_enabled: false,
query_parser_max_length: None,
query_parser_read_write_splitting: false,
primary_reads_enabled: true,
sharding_function: ShardingFunction::PgBigintHash,
automatic_sharding_key: None,
Expand Down Expand Up @@ -456,6 +464,9 @@ impl ConnectionPool {
_ => unreachable!(),
},
query_parser_enabled: pool_config.query_parser_enabled,
query_parser_max_length: pool_config.query_parser_max_length,
query_parser_read_write_splitting: pool_config
.query_parser_read_write_splitting,
primary_reads_enabled: pool_config.primary_reads_enabled,
sharding_function: pool_config.sharding_function,
automatic_sharding_key: pool_config.automatic_sharding_key.clone(),
Expand Down
Loading