Skip to content

Commit 5e641aa

Browse files
forward COM_PING to backend connection
When the connection is idle in some proxies/connection pool's, it's common to refresh the connection by sending a COM_PING(MySQL) or select 1 query(Postgres). Currently we are not forwarding the COM_PING to the backend connection, which can cause the connection to be closed by the backend server. Fixes: REA-4887 Fixes: #1387 Release-Note-Core: Fixes an issue that we misse to forward COM_PING to backend connection to keep the connection alive. Change-Id: I92178a37d58f8336a669c2f259ca9a478b856973 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8112 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 64a7ff1 commit 5e641aa

File tree

7 files changed

+46
-0
lines changed

7 files changed

+46
-0
lines changed

mysql-srv/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@
4848
//! }
4949
//! async fn on_close(&mut self, _: DeallocateId) {}
5050
//!
51+
//! async fn on_ping(&mut self) -> io::Result<()> {
52+
//! Ok(())
53+
//! }
54+
//!
5155
//! async fn on_reset(&mut self) -> io::Result<()> {
5256
//! Ok(())
5357
//! }
@@ -293,6 +297,9 @@ pub trait MySqlShim<W: AsyncWrite + Unpin + Send> {
293297
results: QueryResultWriter<'_, W>,
294298
) -> QueryResultsResponse;
295299

300+
/// Called when the client issue a ping command.
301+
async fn on_ping(&mut self) -> io::Result<()>;
302+
296303
/// Called when the client issues a reset command
297304
async fn on_reset(&mut self) -> io::Result<()>;
298305

@@ -818,6 +825,7 @@ impl<B: MySqlShim<W> + Send, R: AsyncRead + Unpin, W: AsyncWrite + Unpin + Send>
818825
.await?;
819826
}
820827
Command::Ping => {
828+
self.shim.on_ping().await?;
821829
writers::write_ok_packet(&mut self.writer, 0, 0, StatusFlags::empty()).await?;
822830
self.writer.flush().await?;
823831
}

mysql-srv/tests/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ where
9191

9292
async fn on_close(&mut self, _: DeallocateId) {}
9393

94+
async fn on_ping(&mut self) -> io::Result<()> {
95+
Ok(())
96+
}
97+
9498
async fn on_reset(&mut self) -> io::Result<()> {
9599
Ok(())
96100
}

readyset-adapter/src/backend.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,14 @@ where
859859
.unwrap_or_else(|| DB::DEFAULT_DB_VERSION.to_string())
860860
}
861861

862+
/// Send ping on the upstream connection, if it exists
863+
pub async fn ping(&mut self) -> Result<(), DB::Error> {
864+
if let Some(upstream) = &mut self.upstream {
865+
upstream.ping().await
866+
} else {
867+
Ok(())
868+
}
869+
}
862870
/// Reset the current upstream connection
863871
pub async fn reset(&mut self) -> Result<(), DB::Error> {
864872
if let Some(upstream) = &mut self.upstream {

readyset-adapter/src/upstream_database.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ pub trait UpstreamDatabase: Sized + Send {
110110
database: &str,
111111
) -> Result<(), Self::Error>;
112112

113+
/// Ping the upstream connection to see if it is still alive
114+
async fn ping(&mut self) -> Result<(), Self::Error>;
115+
113116
/// Reset the connection to the upstream database
114117
async fn reset(&mut self) -> Result<(), Self::Error>;
115118

@@ -280,6 +283,14 @@ where
280283
.await
281284
}
282285

286+
async fn ping(&mut self) -> Result<(), Self::Error> {
287+
if let Some(u) = &mut self.upstream {
288+
u.ping().await
289+
} else {
290+
Ok(())
291+
}
292+
}
293+
283294
async fn reset(&mut self) -> Result<(), Self::Error> {
284295
self.upstream().await?.reset().await
285296
}

readyset-mysql/src/backend.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,11 @@ where
746746
let _ = self.noria.remove_statement(statement_id).await;
747747
}
748748

749+
async fn on_ping(&mut self) -> std::io::Result<()> {
750+
self.ping()
751+
.await
752+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
753+
}
749754
async fn on_reset(&mut self) -> io::Result<()> {
750755
let _ = match self.reset().await {
751756
Ok(()) => Ok(()),

readyset-mysql/src/upstream.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ impl UpstreamDatabase for MySqlUpstream {
257257
Ok(self.conn.ping().await.is_ok())
258258
}
259259

260+
async fn ping(&mut self) -> Result<(), Self::Error> {
261+
self.conn.ping().await.map_err(Error::MySql)?;
262+
Ok(())
263+
}
264+
260265
async fn change_user(
261266
&mut self,
262267
user: &str,

readyset-psql/src/upstream.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ impl UpstreamDatabase for PostgreSqlUpstream {
239239
Ok(!self.client.simple_query("select 1").await?.is_empty())
240240
}
241241

242+
async fn ping(&mut self) -> Result<(), Self::Error> {
243+
self.client.simple_query("select 1").await?;
244+
Ok(())
245+
}
246+
242247
async fn change_user(
243248
&mut self,
244249
_user: &str,

0 commit comments

Comments
 (0)