From 804fd0d21b6d728b805ca37331ee4d2db37cd51b Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Tue, 1 Aug 2023 18:42:04 -0700 Subject: [PATCH 1/8] Improved logging --- src/client.rs | 10 +++++----- src/config.rs | 39 +++++++++++++++++++++++++++++++++++---- src/pool.rs | 4 +++- src/server.rs | 2 +- 4 files changed, 44 insertions(+), 11 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7d5e9798..4bb4a97a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -531,7 +531,7 @@ where error_response( &mut write, &format!( - "No pool configured for database: {:?}, user: {:?}", + "No pool configured for database: {:?}, user: {:?} (in startup)", pool_name, username ), ) @@ -1086,7 +1086,7 @@ where self.last_server_stats = Some(server.stats()); debug!( - "Client {:?} talking to server {:?}", + "Client {:?} talking to server {}", self.addr, server.address() ); @@ -1241,7 +1241,7 @@ where let _ = query_router.infer(&ast); } } - debug!("Sending query to server"); + debug!("Sending query to server (in Query mode)"); self.send_and_receive_loop( code, @@ -1354,7 +1354,7 @@ where // Sync // Frontend (client) is asking for the query result now. 'S' => { - debug!("Sending query to server"); + debug!("Sending query to server (in Sync mode)"); match plugin_output { Some(PluginOutput::Deny(error)) => { @@ -1499,7 +1499,7 @@ where error_response( &mut self.write, &format!( - "No pool configured for database: {}, user: {}", + "No pool configured for database: {}, user: {} (in get_pool)", self.pool_name, self.username ), ) diff --git a/src/config.rs b/src/config.rs index 9228b9bb..6e592a5e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -763,15 +763,22 @@ pub struct Plugins { pub prewarmer: Option, } +pub trait Plugin { + fn is_enabled(&self) -> bool; +} + impl std::fmt::Display for Plugins { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn is_enabled(arg: Option<&T>) -> bool { + if arg.is_some() { arg.unwrap().is_enabled() } else { false } + } write!( f, "interceptor: {}, table_access: {}, query_logger: {}, prewarmer: {}", - self.intercept.is_some(), - self.table_access.is_some(), - self.query_logger.is_some(), - self.prewarmer.is_some(), + is_enabled(self.intercept.as_ref()), + is_enabled(self.table_access.as_ref()), + is_enabled(self.query_logger.as_ref()), + is_enabled(self.prewarmer.as_ref()), ) } } @@ -782,23 +789,47 @@ pub struct Intercept { pub queries: BTreeMap, } +impl Plugin for Intercept { + fn is_enabled(&self) -> bool { + self.enabled + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)] pub struct TableAccess { pub enabled: bool, pub tables: Vec, } +impl Plugin for TableAccess { + fn is_enabled(&self) -> bool { + self.enabled + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)] pub struct QueryLogger { pub enabled: bool, } +impl Plugin for QueryLogger { + fn is_enabled(&self) -> bool { + self.enabled + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)] pub struct Prewarmer { pub enabled: bool, pub queries: Vec, } +impl Plugin for Prewarmer { + fn is_enabled(&self) -> bool { + self.enabled + } +} + impl Intercept { pub fn substitute(&mut self, db: &str, user: &str) { for (_, query) in self.queries.iter_mut() { diff --git a/src/pool.rs b/src/pool.rs index b9293521..1ebb02e0 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -789,7 +789,7 @@ impl ConnectionPool { match guard[address.shard].get(address) { Some(_) => true, None => { - debug!("{:?} is ok", address); + debug!("{} is ok", address); false } } @@ -1000,8 +1000,10 @@ impl ManageConnection for ServerPool { .await { Ok(mut conn) => { + // println!(">>>> self.plugins: {:?}", self.plugins); if let Some(ref plugins) = self.plugins { if let Some(ref prewarmer) = plugins.prewarmer { + // println!(">>>> prewarmer: {:?}", prewarmer); let mut prewarmer = prewarmer::Prewarmer { enabled: prewarmer.enabled, server: &mut conn, diff --git a/src/server.rs b/src/server.rs index afa1c09d..3b064ed4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1115,7 +1115,7 @@ impl Server { /// It will use the simple query protocol. /// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`. pub async fn query(&mut self, query: &str) -> Result<(), Error> { - debug!("Running `{}` on server {:?}", query, self.address); + debug!("Running `{}` on server {}", query, self.address); let query = simple_query(query); From 142438826b20f0b348d2cc8b1e6fad7af0ee651c Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Wed, 2 Aug 2023 13:17:10 -0700 Subject: [PATCH 2/8] Improved logging for more `Address` usages --- src/mirrors.rs | 14 +++++++------- src/pool.rs | 22 ++++++++++------------ src/server.rs | 8 ++++---- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/mirrors.rs b/src/mirrors.rs index 0f2b02c0..57518be5 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -62,7 +62,7 @@ impl MirroredClient { Ok(server) => server, Err(err) => { error!( - "Failed to get connection from pool, Discarding message {:?}, {:?}", + "Failed to get connection from pool, Discarding message {:?}, {}", err, address.clone() ); @@ -73,17 +73,17 @@ impl MirroredClient { tokio::select! { // Exit channel events _ = self.disconnect_rx.recv() => { - info!("Got mirror exit signal, exiting {:?}", address.clone()); + info!("Got mirror exit signal, exiting {}", address.clone()); break; } // Incoming data from server (we read to clear the socket buffer and discard the data) recv_result = server.recv() => { match recv_result { - Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()), + Ok(message) => trace!("Received from mirror: {} {}", String::from_utf8_lossy(&message[..]), address.clone()), Err(err) => { server.mark_bad(); - error!("Failed to receive from mirror {:?} {:?}", err, address.clone()); + error!("Failed to receive from mirror {:?} {}", err, address.clone()); } } } @@ -93,15 +93,15 @@ impl MirroredClient { match message { Some(bytes) => { match server.send(&BytesMut::from(&bytes[..])).await { - Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()), + Ok(_) => trace!("Sent to mirror: {} {}", String::from_utf8_lossy(&bytes[..]), address.clone()), Err(err) => { server.mark_bad(); - error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()) + error!("Failed to send to mirror, Discarding message {:?}, {}", err, address.clone()) } } } None => { - info!("Mirror channel closed, exiting {:?}", address.clone()); + info!("Mirror channel closed, exiting {}", address.clone()); break; }, } diff --git a/src/pool.rs b/src/pool.rs index 1ebb02e0..bb42176d 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -337,7 +337,7 @@ impl ConnectionPool { } } - debug!("Hash obtained for {:?}", address); + debug!("Hash obtained for {}", address); { let mut pool_auth_hash = pool_auth_hash.write(); @@ -631,7 +631,7 @@ impl ConnectionPool { if self.try_unban(&address).await { force_healthcheck = true; } else { - debug!("Address {:?} is banned", address); + debug!("Address {} is banned", address); continue; } } @@ -644,7 +644,7 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { error!( - "Connection checkout error for instance {:?}, error: {:?}", + "Connection checkout error for instance {}, error: {:?}", address, err ); self.ban(address, BanReason::FailedCheckout, Some(client_stats)); @@ -730,7 +730,7 @@ impl ConnectionPool { // Health check failed. Err(err) => { error!( - "Failed health check on instance {:?}, error: {:?}", + "Failed health check on instance {}, error: {:?}", address, err ); } @@ -739,7 +739,7 @@ impl ConnectionPool { // Health check timed out. Err(err) => { error!( - "Health check timeout on instance {:?}, error: {:?}", + "Health check timeout on instance {}, error: {:?}", address, err ); } @@ -761,7 +761,7 @@ impl ConnectionPool { return; } - error!("Banning instance {:?}, reason: {:?}", address, reason); + error!("Banning instance {}, reason: {:?}", address, reason); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.write(); @@ -839,14 +839,14 @@ impl ConnectionPool { drop(read_guard); if exceeded_ban_time { - warn!("Unbanning {:?}", address); + warn!("Unbanning {}", address); let mut write_guard = self.banlist.write(); write_guard[address.shard].remove(address); drop(write_guard); true } else { - debug!("{:?} is banned", address); + debug!("{} is banned", address); false } } @@ -920,7 +920,7 @@ impl ConnectionPool { return 0; } let busy = provisioned - idle; - debug!("{:?} has {:?} busy connections", address, busy); + debug!("{} has {:?} busy connections", address, busy); return busy; } } @@ -978,7 +978,7 @@ impl ManageConnection for ServerPool { /// Attempts to create a new connection. async fn connect(&self) -> Result { - info!("Creating a new server connection {:?}", self.address); + info!("Creating a new server connection {}", self.address); let stats = Arc::new(ServerStats::new( self.address.clone(), @@ -1000,10 +1000,8 @@ impl ManageConnection for ServerPool { .await { Ok(mut conn) => { - // println!(">>>> self.plugins: {:?}", self.plugins); if let Some(ref plugins) = self.plugins { if let Some(ref prewarmer) = plugins.prewarmer { - // println!(">>>> prewarmer: {:?}", prewarmer); let mut prewarmer = prewarmer::Prewarmer { enabled: prewarmer.enabled, server: &mut conn, diff --git a/src/server.rs b/src/server.rs index 3b064ed4..a51d4965 100644 --- a/src/server.rs +++ b/src/server.rs @@ -761,7 +761,7 @@ impl Server { } Err(err) => { error!( - "Terminating server {:?} because of: {:?}", + "Terminating server {} because of: {:?}", self.address, err ); self.bad = true; @@ -779,7 +779,7 @@ impl Server { Ok(message) => message, Err(err) => { error!( - "Terminating server {:?} because of: {:?}", + "Terminating server {} because of: {:?}", self.address, err ); self.bad = true; @@ -1093,7 +1093,7 @@ impl Server { /// Indicate that this server connection cannot be re-used and must be discarded. pub fn mark_bad(&mut self) { - error!("Server {:?} marked bad", self.address); + error!("Server {} marked bad", self.address); self.bad = true; } @@ -1346,7 +1346,7 @@ impl Drop for Server { }; info!( - "{} {:?}, session duration: {}", + "{} {}, session duration: {}", message, self.address, crate::format_duration(&duration) From a1926fa52febe26585fc2d70b5ef786c036df20b Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Wed, 2 Aug 2023 13:26:04 -0700 Subject: [PATCH 3/8] Fixed lint issues. --- src/config.rs | 6 +++++- src/server.rs | 10 ++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/config.rs b/src/config.rs index 6e592a5e..53bc9256 100644 --- a/src/config.rs +++ b/src/config.rs @@ -770,7 +770,11 @@ pub trait Plugin { impl std::fmt::Display for Plugins { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn is_enabled(arg: Option<&T>) -> bool { - if arg.is_some() { arg.unwrap().is_enabled() } else { false } + if arg.is_some() { + arg.unwrap().is_enabled() + } else { + false + } } write!( f, diff --git a/src/server.rs b/src/server.rs index a51d4965..4c703a82 100644 --- a/src/server.rs +++ b/src/server.rs @@ -760,10 +760,7 @@ impl Server { Ok(()) } Err(err) => { - error!( - "Terminating server {} because of: {:?}", - self.address, err - ); + error!("Terminating server {} because of: {:?}", self.address, err); self.bad = true; Err(err) } @@ -778,10 +775,7 @@ impl Server { let mut message = match read_message(&mut self.stream).await { Ok(message) => message, Err(err) => { - error!( - "Terminating server {} because of: {:?}", - self.address, err - ); + error!("Terminating server {} because of: {:?}", self.address, err); self.bad = true; return Err(err); } From b9ff2be57caae2fd85b6d2a348144d616d00a42a Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Fri, 11 Aug 2023 09:57:13 -0700 Subject: [PATCH 4/8] Reverted the `Address` logging changes. --- src/client.rs | 2 +- src/mirrors.rs | 14 +++++++------- src/pool.rs | 22 +++++++++++----------- src/server.rs | 16 +++++++++++----- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4bb4a97a..a8f0d746 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1086,7 +1086,7 @@ where self.last_server_stats = Some(server.stats()); debug!( - "Client {:?} talking to server {}", + "Client {:?} talking to server {:?}", self.addr, server.address() ); diff --git a/src/mirrors.rs b/src/mirrors.rs index 57518be5..0f2b02c0 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -62,7 +62,7 @@ impl MirroredClient { Ok(server) => server, Err(err) => { error!( - "Failed to get connection from pool, Discarding message {:?}, {}", + "Failed to get connection from pool, Discarding message {:?}, {:?}", err, address.clone() ); @@ -73,17 +73,17 @@ impl MirroredClient { tokio::select! { // Exit channel events _ = self.disconnect_rx.recv() => { - info!("Got mirror exit signal, exiting {}", address.clone()); + info!("Got mirror exit signal, exiting {:?}", address.clone()); break; } // Incoming data from server (we read to clear the socket buffer and discard the data) recv_result = server.recv() => { match recv_result { - Ok(message) => trace!("Received from mirror: {} {}", String::from_utf8_lossy(&message[..]), address.clone()), + Ok(message) => trace!("Received from mirror: {} {:?}", String::from_utf8_lossy(&message[..]), address.clone()), Err(err) => { server.mark_bad(); - error!("Failed to receive from mirror {:?} {}", err, address.clone()); + error!("Failed to receive from mirror {:?} {:?}", err, address.clone()); } } } @@ -93,15 +93,15 @@ impl MirroredClient { match message { Some(bytes) => { match server.send(&BytesMut::from(&bytes[..])).await { - Ok(_) => trace!("Sent to mirror: {} {}", String::from_utf8_lossy(&bytes[..]), address.clone()), + Ok(_) => trace!("Sent to mirror: {} {:?}", String::from_utf8_lossy(&bytes[..]), address.clone()), Err(err) => { server.mark_bad(); - error!("Failed to send to mirror, Discarding message {:?}, {}", err, address.clone()) + error!("Failed to send to mirror, Discarding message {:?}, {:?}", err, address.clone()) } } } None => { - info!("Mirror channel closed, exiting {}", address.clone()); + info!("Mirror channel closed, exiting {:?}", address.clone()); break; }, } diff --git a/src/pool.rs b/src/pool.rs index bb42176d..b9293521 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -337,7 +337,7 @@ impl ConnectionPool { } } - debug!("Hash obtained for {}", address); + debug!("Hash obtained for {:?}", address); { let mut pool_auth_hash = pool_auth_hash.write(); @@ -631,7 +631,7 @@ impl ConnectionPool { if self.try_unban(&address).await { force_healthcheck = true; } else { - debug!("Address {} is banned", address); + debug!("Address {:?} is banned", address); continue; } } @@ -644,7 +644,7 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { error!( - "Connection checkout error for instance {}, error: {:?}", + "Connection checkout error for instance {:?}, error: {:?}", address, err ); self.ban(address, BanReason::FailedCheckout, Some(client_stats)); @@ -730,7 +730,7 @@ impl ConnectionPool { // Health check failed. Err(err) => { error!( - "Failed health check on instance {}, error: {:?}", + "Failed health check on instance {:?}, error: {:?}", address, err ); } @@ -739,7 +739,7 @@ impl ConnectionPool { // Health check timed out. Err(err) => { error!( - "Health check timeout on instance {}, error: {:?}", + "Health check timeout on instance {:?}, error: {:?}", address, err ); } @@ -761,7 +761,7 @@ impl ConnectionPool { return; } - error!("Banning instance {}, reason: {:?}", address, reason); + error!("Banning instance {:?}, reason: {:?}", address, reason); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.write(); @@ -789,7 +789,7 @@ impl ConnectionPool { match guard[address.shard].get(address) { Some(_) => true, None => { - debug!("{} is ok", address); + debug!("{:?} is ok", address); false } } @@ -839,14 +839,14 @@ impl ConnectionPool { drop(read_guard); if exceeded_ban_time { - warn!("Unbanning {}", address); + warn!("Unbanning {:?}", address); let mut write_guard = self.banlist.write(); write_guard[address.shard].remove(address); drop(write_guard); true } else { - debug!("{} is banned", address); + debug!("{:?} is banned", address); false } } @@ -920,7 +920,7 @@ impl ConnectionPool { return 0; } let busy = provisioned - idle; - debug!("{} has {:?} busy connections", address, busy); + debug!("{:?} has {:?} busy connections", address, busy); return busy; } } @@ -978,7 +978,7 @@ impl ManageConnection for ServerPool { /// Attempts to create a new connection. async fn connect(&self) -> Result { - info!("Creating a new server connection {}", self.address); + info!("Creating a new server connection {:?}", self.address); let stats = Arc::new(ServerStats::new( self.address.clone(), diff --git a/src/server.rs b/src/server.rs index 4c703a82..afa1c09d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -760,7 +760,10 @@ impl Server { Ok(()) } Err(err) => { - error!("Terminating server {} because of: {:?}", self.address, err); + error!( + "Terminating server {:?} because of: {:?}", + self.address, err + ); self.bad = true; Err(err) } @@ -775,7 +778,10 @@ impl Server { let mut message = match read_message(&mut self.stream).await { Ok(message) => message, Err(err) => { - error!("Terminating server {} because of: {:?}", self.address, err); + error!( + "Terminating server {:?} because of: {:?}", + self.address, err + ); self.bad = true; return Err(err); } @@ -1087,7 +1093,7 @@ impl Server { /// Indicate that this server connection cannot be re-used and must be discarded. pub fn mark_bad(&mut self) { - error!("Server {} marked bad", self.address); + error!("Server {:?} marked bad", self.address); self.bad = true; } @@ -1109,7 +1115,7 @@ impl Server { /// It will use the simple query protocol. /// Result will not be returned, so this is useful for things like `SET` or `ROLLBACK`. pub async fn query(&mut self, query: &str) -> Result<(), Error> { - debug!("Running `{}` on server {}", query, self.address); + debug!("Running `{}` on server {:?}", query, self.address); let query = simple_query(query); @@ -1340,7 +1346,7 @@ impl Drop for Server { }; info!( - "{} {}, session duration: {}", + "{} {:?}, session duration: {}", message, self.address, crate::format_duration(&duration) From dcabbbb0b56be5fc04411be8a1839b7e53a1fbd4 Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Tue, 3 Oct 2023 19:32:41 +0000 Subject: [PATCH 5/8] Applied the PR comment by @levkk. --- src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index a8f0d746..d5dc8be4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -531,7 +531,7 @@ where error_response( &mut write, &format!( - "No pool configured for database: {:?}, user: {:?} (in startup)", + "No pool configured for database: {:?}, user: {:?}", pool_name, username ), ) @@ -1499,7 +1499,7 @@ where error_response( &mut self.write, &format!( - "No pool configured for database: {}, user: {} (in get_pool)", + "No pool configured for database: {}, user: {}", self.pool_name, self.username ), ) From 2cb16d2a07904575d7bd381b40192fa660c212ca Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Tue, 3 Oct 2023 19:34:07 +0000 Subject: [PATCH 6/8] Applied the PR comment by @levkk. --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index d5dc8be4..153ab97a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1354,7 +1354,7 @@ where // Sync // Frontend (client) is asking for the query result now. 'S' => { - debug!("Sending query to server (in Sync mode)"); + debug!("Sending query to server with extended protocol"); match plugin_output { Some(PluginOutput::Deny(error)) => { From dee386f28d4dec69d52c86c34ebe884ade30086d Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Tue, 3 Oct 2023 19:36:44 +0000 Subject: [PATCH 7/8] Applied the PR comment by @levkk. --- src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 153ab97a..7d5e9798 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1241,7 +1241,7 @@ where let _ = query_router.infer(&ast); } } - debug!("Sending query to server (in Query mode)"); + debug!("Sending query to server"); self.send_and_receive_loop( code, @@ -1354,7 +1354,7 @@ where // Sync // Frontend (client) is asking for the query result now. 'S' => { - debug!("Sending query to server with extended protocol"); + debug!("Sending query to server"); match plugin_output { Some(PluginOutput::Deny(error)) => { From c23a5d1516d720454ec5326f3a5fe1641efcdb04 Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Tue, 3 Oct 2023 19:38:57 +0000 Subject: [PATCH 8/8] Applied the PR comment by @levkk. --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 53bc9256..8863b92b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -770,8 +770,8 @@ pub trait Plugin { impl std::fmt::Display for Plugins { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn is_enabled(arg: Option<&T>) -> bool { - if arg.is_some() { - arg.unwrap().is_enabled() + if let Some(ref arg) = arg { + arg.is_enabled() } else { false }