Skip to content

Commit 58eb5c0

Browse files
committed
psql-srv: Appropriately return EmptyQueryResponse message
When a PostgreSQL client sends an empty query over the wire protocol, the appropriate response is to send an EmptyQueryResponse message in reply. This is different from the CommandComplete message, which is the standard reply for regular queries. Our usage of our fork of tokio_postgres elides a copy during standard query processing by directly accessing the CommandComplete message buffer, which is populated by tokio_postgres when Readyset runs queries on an upstream PostgreSQL database. Unfortunately, tokio_postgres returns a CommandComplete message for an empty query run on the upstream, even though the upstream replied with an EmptyQueryResponse message. Fortunately, however, we don't have to forward all CommandComplete messages we process from tokio_postgres back to our client. Now, we selectively detect when tokio_postgres produces a CommandComplete message that is semantically equivalent to an EmptyQueryResponse, and then we send back an EmptyQueryResponse message to our client. This patch brings our behavior a little more into alignment with the PostgreSQL wire protocol, allowing us to be a little more compatible with applications that expect to receive an EmptyQueryResponse in reply to an empty query. Also add some flushes to related tests, as I experienced some flakiness there with the stream not being processed before the test functions exited. Fixes: REA-5385 Release-Note-Core: Improve PostgreSQL wire protocol compatibility by appropriately responding to an empty query with an EmptyQueryResponse message. Change-Id: Ib24aa57f9f38663946feefee52e6c862d1f9daf3 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8898 Reviewed-by: Jason Brown <jason.b@readyset.io> Tested-by: Buildkite CI
1 parent 9c0e0b4 commit 58eb5c0

File tree

3 files changed

+37
-31
lines changed

3 files changed

+37
-31
lines changed

psql-srv/src/codec/encoder.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const ID_BIND_COMPLETE: u8 = b'2';
2121
const ID_CLOSE_COMPLETE: u8 = b'3';
2222
const ID_COMMAND_COMPLETE: u8 = b'C';
2323
const ID_DATA_ROW: u8 = b'D';
24+
const ID_EMPTY_QUERY_RESPONSE: u8 = b'I';
2425
const ID_ERROR_RESPONSE: u8 = b'E';
2526
const ID_PARAMETER_DESCRIPTION: u8 = b't';
2627
const ID_PARAMETER_STATUS: u8 = b'S';
@@ -107,7 +108,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
107108
put_i32(LENGTH_PLACEHOLDER, dst);
108109
put_i32(AUTHENTICATION_CLEARTEXT_REQUIRED, dst);
109110
}
110-
111111
AuthenticationSasl {
112112
allow_channel_binding,
113113
} => {
@@ -120,37 +120,31 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
120120
put_str(SCRAM_SHA_256_AUTHENTICATION_METHOD, dst);
121121
put_u8(NUL_BYTE, dst);
122122
}
123-
124123
AuthenticationSaslContinue { sasl_data } => {
125124
put_u8(ID_AUTHENTICATION_REQUEST, dst);
126125
put_i32(LENGTH_PLACEHOLDER, dst);
127126
put_i32(AUTHENTICATION_SASL_CHALLENGE, dst);
128127
dst.extend_from_slice(&sasl_data);
129128
}
130-
131129
AuthenticationSaslFinal { sasl_data } => {
132130
put_u8(ID_AUTHENTICATION_REQUEST, dst);
133131
put_i32(LENGTH_PLACEHOLDER, dst);
134132
put_i32(AUTHENTICATION_SASL_COMPLETED, dst);
135133
dst.extend_from_slice(&sasl_data);
136134
}
137-
138135
AuthenticationOk => {
139136
put_u8(ID_AUTHENTICATION_REQUEST, dst);
140137
put_i32(LENGTH_PLACEHOLDER, dst);
141138
put_i32(AUTHENTICATION_OK_SUCCESS, dst);
142139
}
143-
144140
BindComplete => {
145141
put_u8(ID_BIND_COMPLETE, dst);
146142
put_i32(LENGTH_PLACEHOLDER, dst);
147143
}
148-
149144
CloseComplete => {
150145
put_u8(ID_CLOSE_COMPLETE, dst);
151146
put_i32(LENGTH_PLACEHOLDER, dst);
152147
}
153-
154148
CommandComplete { tag } => {
155149
put_u8(ID_COMMAND_COMPLETE, dst);
156150
put_i32(LENGTH_PLACEHOLDER, dst);
@@ -192,14 +186,12 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
192186
dst,
193187
);
194188
}
195-
196189
PassThroughCommandComplete(tag) => {
197190
put_u8(ID_COMMAND_COMPLETE, dst);
198191
let tag_str = std::str::from_utf8(&tag)?;
199192
put_i32(tag_str.len() as i32 + 4, dst);
200193
put_str(tag_str, dst);
201194
}
202-
203195
DataRow {
204196
values,
205197
explicit_transfer_formats,
@@ -229,12 +221,10 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
229221
// Update the value count field to match the number of values just serialized.
230222
set_i16(i16::try_from(n_values)?, dst, start_ofs + 5)?;
231223
}
232-
233224
NoData => {
234225
put_u8(ID_NO_DATA, dst);
235226
put_i32(LENGTH_PLACEHOLDER, dst);
236227
}
237-
238228
PassThroughSimpleRow(row) => {
239229
put_u8(ID_DATA_ROW, dst);
240230
// Put the length of this row in bytes. The length is equal to the length of the data,
@@ -246,7 +236,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
246236
// Put the data
247237
put_slice(row.body().buffer(), dst);
248238
}
249-
250239
PassThroughDataRow(row) => {
251240
// Note that this body is the same as that of the match arm for PassThroughSimpleRow,
252241
// but we need to duplicate it since `row` is a different type in each variant - they
@@ -261,7 +250,10 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
261250
// Put the data
262251
put_slice(row.body().buffer(), dst);
263252
}
264-
253+
EmptyQueryResponse => {
254+
put_u8(ID_EMPTY_QUERY_RESPONSE, dst);
255+
put_i32(4, dst);
256+
}
265257
ErrorResponse {
266258
severity,
267259
sqlstate,
@@ -354,7 +346,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
354346
}
355347
put_u8(ERROR_RESPONSE_TERMINATOR, dst);
356348
}
357-
358349
ParameterDescription {
359350
parameter_data_types,
360351
} => {
@@ -365,7 +356,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
365356
put_type(t, dst)?;
366357
}
367358
}
368-
369359
ParameterStatus {
370360
parameter_name,
371361
parameter_value,
@@ -375,18 +365,15 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
375365
put_str(&parameter_name, dst);
376366
put_str(&parameter_value, dst);
377367
}
378-
379368
ParseComplete => {
380369
put_u8(ID_PARSE_COMPLETE, dst);
381370
put_i32(LENGTH_PLACEHOLDER, dst);
382371
}
383-
384372
ReadyForQuery { status } => {
385373
put_u8(ID_READY_FOR_QUERY, dst);
386374
put_i32(LENGTH_PLACEHOLDER, dst);
387375
put_u8(status, dst);
388376
}
389-
390377
RowDescription { field_descriptions } => {
391378
put_u8(ID_ROW_DESCRIPTION, dst);
392379
put_i32(LENGTH_PLACEHOLDER, dst);
@@ -401,7 +388,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
401388
put_format(d.transfer_format, dst);
402389
}
403390
}
404-
405391
PassThroughRowDescription(field_descriptions) => {
406392
put_u8(ID_ROW_DESCRIPTION, dst);
407393
put_i32(LENGTH_PLACEHOLDER, dst);
@@ -416,7 +402,6 @@ fn encode(message: BackendMessage, dst: &mut BytesMut) -> Result<(), Error> {
416402
put_i16(d.format(), dst);
417403
}
418404
}
419-
420405
#[allow(clippy::unreachable)]
421406
SSLResponse { .. } => {
422407
unreachable!("SSLResponse is handled as a special case above.")

psql-srv/src/message/backend.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub enum BackendMessage {
5656
values: Vec<PsqlValue>,
5757
explicit_transfer_formats: Option<Arc<Vec<TransferFormat>>>,
5858
},
59+
EmptyQueryResponse,
5960
ErrorResponse {
6061
severity: ErrorSeverity,
6162
sqlstate: SqlState,

psql-srv/src/response.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ where
7171
// number of associated rows in the stream itself. Otherwise, we keep track of
7272
// them and send our own command complete message for our single result set.
7373
let mut n_rows = 0;
74-
let mut sent_command_complete = false;
74+
let mut sent_response = false;
7575
// We send a row description for each batch of rows, then the rows themselves, then
7676
// a command complete
7777
let mut sent_row_description = false;
@@ -113,6 +113,19 @@ where
113113
sink.feed(BackendMessage::PassThroughSimpleRow(r)).await?;
114114
}
115115
SimpleQueryMessage::CommandComplete(c) => {
116+
if c.rows == 0 && c.fields.is_none() && c.tag.is_empty() {
117+
// XXX JCD tokio_postgres does not provide the expected
118+
// EmptyQueryResponse message when processing empty
119+
// queries and instead gives a CommandComplete message with
120+
// no rows, no fields, and an empty tag. I believe
121+
// PostgreSQL always supplies a tag for a valid
122+
// CommandComplete message. Now that we've detected an
123+
// empty query response, send the appropriate message to
124+
// our client.
125+
sink.feed(BackendMessage::EmptyQueryResponse).await?;
126+
sent_response = true;
127+
continue;
128+
}
116129
if let Some(fields) = &c.fields {
117130
sink.feed(BackendMessage::PassThroughRowDescription(
118131
fields.to_vec(),
@@ -127,7 +140,7 @@ where
127140
// We may have sent a row description, but it was for this
128141
// batch, so reset it for the next batch
129142
sent_row_description = false;
130-
sent_command_complete = true;
143+
sent_response = true;
131144
}
132145
_ => {
133146
unimplemented!("Unhandled variant of SimpleQueryMessage added")
@@ -137,7 +150,7 @@ where
137150
}
138151
}
139152

140-
if !sent_command_complete {
153+
if !sent_response {
141154
trace!("Sending command complete: {:?}", n_rows);
142155
sink.feed(BackendMessage::CommandComplete {
143156
tag: CommandCompleteTag::Select(n_rows),
@@ -161,6 +174,7 @@ mod tests {
161174
use std::vec;
162175

163176
use smallvec::smallvec;
177+
use tokio_postgres::CommandCompleteContents;
164178
use tokio_test::block_on;
165179

166180
use super::*;
@@ -180,6 +194,7 @@ mod tests {
180194
});
181195
futures::pin_mut!(validating_sink);
182196
block_on(response.write(&mut validating_sink)).unwrap();
197+
block_on(validating_sink.flush()).unwrap();
183198
}
184199

185200
#[test]
@@ -197,6 +212,7 @@ mod tests {
197212
});
198213
futures::pin_mut!(validating_sink);
199214
block_on(response.write(&mut validating_sink)).unwrap();
215+
block_on(validating_sink.flush()).unwrap();
200216
}
201217

202218
#[test]
@@ -218,25 +234,27 @@ mod tests {
218234
});
219235
futures::pin_mut!(validating_sink);
220236
block_on(response.write(&mut validating_sink)).unwrap();
237+
block_on(validating_sink.flush()).unwrap();
221238
}
222239

223240
#[test]
224-
fn write_select_simple_empty() {
241+
fn write_empty_query() {
225242
let response = TestResponse::Stream {
226243
header: None,
227-
resultset: stream::iter(vec![]),
244+
resultset: stream::iter(vec![Ok(PsqlSrvRow::SimpleQueryMessage(
245+
SimpleQueryMessage::CommandComplete(CommandCompleteContents {
246+
fields: None,
247+
rows: 0,
248+
tag: "".into(),
249+
}),
250+
))]),
228251
result_transfer_formats: None,
229252
trailer: None,
230253
};
231254
let validating_sink = sink::unfold(0, |i, m: BackendMessage| {
232255
async move {
233256
match i {
234-
0 => assert!(matches!(
235-
m,
236-
BackendMessage::CommandComplete {
237-
tag: CommandCompleteTag::Select(0)
238-
}
239-
)),
257+
0 => assert!(matches!(m, BackendMessage::EmptyQueryResponse)),
240258
// No further messages are expected.
241259
_ => panic!(),
242260
}
@@ -245,6 +263,7 @@ mod tests {
245263
});
246264
futures::pin_mut!(validating_sink);
247265
block_on(response.write(&mut validating_sink)).unwrap();
266+
block_on(validating_sink.flush()).unwrap();
248267
}
249268

250269
#[test]
@@ -326,5 +345,6 @@ mod tests {
326345
});
327346
futures::pin_mut!(validating_sink);
328347
block_on(response.write(&mut validating_sink)).unwrap();
348+
block_on(validating_sink.flush()).unwrap();
329349
}
330350
}

0 commit comments

Comments
 (0)