Skip to content

Commit dc97c03

Browse files
committed
readyset-mysql: Encode results according to @@character_set_results
This is accomplished by plumbing the character set information into the connector and using it when writing out results. All text/string types are encoded in the target character set before being written out to the MySQL protocol connection. Fixes: REA-5652 Release-Note-Core: Respect `@@character_set_results` when returning latin1 and cp850 text columns in MySQL. `SET NAMES` and `\C` also work, but do not yet affect collation in lookups. Change-Id: Ia0eb481f41dbfc0465bcf0d4ea10c6ac6e7d6d2e Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9376 Tested-by: Buildkite CI Reviewed-by: Marcelo Altmann <marcelo@readyset.io>
1 parent 49876ac commit dc97c03

File tree

4 files changed

+120
-50
lines changed

4 files changed

+120
-50
lines changed

readyset-adapter/src/backend.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ where
537537
/// Remote socket address of a connected client
538538
client_addr: SocketAddr,
539539
/// ReadySet connector used for reads, and writes when no upstream DB is present
540-
noria: NoriaConnector,
540+
pub noria: NoriaConnector,
541541
/// Optional connector to the upstream DB. Used for fallback reads and all writes if it exists
542542
upstream: Option<DB>,
543543
/// Map from username to password for all users allowed to connect to the db
@@ -2833,6 +2833,7 @@ where
28332833
}
28342834
if let Some(encoding) = set_results_encoding {
28352835
trace!(?encoding, "Setting results_encoding");
2836+
noria.set_results_encoding(encoding);
28362837
}
28372838

28382839
Ok(())

readyset-adapter/src/backend/noria_connector.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use readyset_client::{
1515
ColumnSchema, GraphvizOptions, ReadQuery, ReaderAddress, ReaderHandle, ReadySetHandle,
1616
SchemaType, Table, TableOperation, View, ViewCreateRequest, ViewQuery,
1717
};
18+
use readyset_data::encoding::Encoding;
1819
use readyset_data::{Collation, DfType, DfValue, Dialect};
1920
use readyset_errors::{
2021
internal_err, invariant_eq, table_err, unsupported, unsupported_err, ReadySetError,
@@ -300,6 +301,10 @@ pub struct NoriaConnector {
300301
/// supports a multi-element schema search path, the concept of "currently connected database"
301302
/// in MySQL can be thought of as a schema search path that only has one element.
302303
schema_search_path: Vec<SqlIdentifier>,
304+
305+
/// The encoding in which to return text results. Corresponds to `character_set_results` in
306+
/// MySQL and `SET NAMES` in both MySQL and Postgres.
307+
results_encoding: Encoding,
303308
}
304309

305310
mod request_handler {
@@ -417,6 +422,7 @@ impl NoriaConnector {
417422
dialect,
418423
parse_dialect,
419424
schema_search_path,
425+
results_encoding: Encoding::Utf8,
420426
}
421427
}
422428

@@ -957,6 +963,16 @@ impl NoriaConnector {
957963
self.schema_search_path.as_ref()
958964
}
959965

966+
/// Set the encoding for result sets
967+
pub fn set_results_encoding(&mut self, encoding: Encoding) {
968+
self.results_encoding = encoding;
969+
}
970+
971+
/// Returns the encoding for result sets
972+
pub fn results_encoding(&self) -> Encoding {
973+
self.results_encoding
974+
}
975+
960976
pub(crate) async fn resnapshot_table(
961977
&mut self,
962978
table: &mut Relation,

readyset-e2e-tests/tests/encoding_mysql.rs

Lines changed: 76 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ macro_rules! check_rows {
1818
}
1919

2020
const CHUNK_SIZE: usize = 1000;
21+
const CHARACTER_SETS: [&str; 4] = ["latin1", "cp850", "utf8mb3", "utf8mb4"];
2122

2223
/// Tests snapshotting replication of a varchar column with the specified character set.
2324
/// Verifies that the same utf8 encoded version of the data is stored in Readyset.
@@ -142,30 +143,46 @@ where
142143
}
143144
);
144145

145-
// Verify this chunk after updates
146-
let mut my_chunk: Vec<(i64, String, Vec<u8>, i32)> = upstream_conn
147-
.exec(
148-
"SELECT id, hex, text, counter FROM encoding_table WHERE id >= ? AND id <= ?",
149-
(first_id, last_id),
150-
)
151-
.await
152-
.unwrap();
153-
my_chunk.sort();
146+
// Verify this chunk after updates in all supported character sets
147+
for character_set in CHARACTER_SETS {
148+
upstream_conn
149+
.query_drop(format!(
150+
"SET @@session.character_set_results = {character_set}"
151+
))
152+
.await
153+
.unwrap();
154154

155-
let mut rs_chunk: Vec<(i64, String, Vec<u8>, i32)> = rs_conn
156-
.exec(
157-
"SELECT id, hex, text, counter FROM encoding_table WHERE id >= ? AND id <= ?",
158-
(first_id, last_id),
159-
)
160-
.await
161-
.unwrap();
162-
rs_chunk.sort();
155+
let mut my_chunk: Vec<(i64, String, Vec<u8>, i32)> = upstream_conn
156+
.exec(
157+
"SELECT id, hex, text, counter FROM encoding_table WHERE id >= ? AND id <= ?",
158+
(first_id, last_id),
159+
)
160+
.await
161+
.unwrap();
162+
my_chunk.sort();
163+
164+
rs_conn
165+
.query_drop(format!(
166+
"SET @@session.character_set_results = {character_set}"
167+
))
168+
.await
169+
.unwrap();
170+
171+
let mut rs_chunk: Vec<(i64, String, Vec<u8>, i32)> = rs_conn
172+
.exec(
173+
"SELECT id, hex, text, counter FROM encoding_table WHERE id >= ? AND id <= ?",
174+
(first_id, last_id),
175+
)
176+
.await
177+
.unwrap();
178+
rs_chunk.sort();
163179

164-
check_rows!(
165-
my_chunk,
166-
rs_chunk,
167-
"mysql (left) differed from readyset (right) after updates for snapshot update chunk {first_id}-{last_id}",
168-
);
180+
check_rows!(
181+
my_chunk,
182+
rs_chunk,
183+
"mysql (left) differed from readyset (right) after updates for snapshot update chunk {first_id}-{last_id} with character set {character_set}",
184+
);
185+
}
169186
}
170187

171188
shutdown_tx.shutdown().await;
@@ -247,27 +264,45 @@ where
247264
count == chunk.len()
248265
});
249266

250-
let my_streaming_rows_chunk: Vec<(i64, String, Vec<u8>)> = upstream_conn
251-
.exec(
252-
"SELECT id, hex, text FROM encoding_table WHERE id >= ? AND id <= ? ORDER BY id",
253-
(first_id, last_id),
254-
)
255-
.await
256-
.unwrap();
267+
for character_set in CHARACTER_SETS {
268+
upstream_conn
269+
.query_drop(format!(
270+
"SET @@session.character_set_results = {character_set}"
271+
))
272+
.await
273+
.unwrap();
257274

258-
let rs_streaming_rows_chunk: Vec<(i64, String, Vec<u8>)> = rs_conn
259-
.exec(
260-
"SELECT id, hex, text FROM encoding_table WHERE id >= ? AND id <= ? ORDER BY id",
261-
(first_id, last_id),
262-
)
263-
.await
264-
.unwrap();
275+
let mut my_chunk: Vec<(i64, String, Vec<u8>)> = upstream_conn
276+
.exec(
277+
"SELECT id, hex, text FROM encoding_table WHERE id >= ? AND id <= ?",
278+
(first_id, last_id),
279+
)
280+
.await
281+
.unwrap();
282+
my_chunk.sort();
265283

266-
check_rows!(
267-
my_streaming_rows_chunk,
268-
rs_streaming_rows_chunk,
269-
"mysql (left) differed from readyset (right) for streaming replication chunk {first_id}-{last_id}"
270-
);
284+
rs_conn
285+
.query_drop(format!(
286+
"SET @@session.character_set_results = {character_set}"
287+
))
288+
.await
289+
.unwrap();
290+
291+
let mut rs_chunk: Vec<(i64, String, Vec<u8>)> = rs_conn
292+
.exec(
293+
"SELECT id, hex, text FROM encoding_table WHERE id >= ? AND id <= ?",
294+
(first_id, last_id),
295+
)
296+
.await
297+
.unwrap();
298+
rs_chunk.sort();
299+
300+
check_rows!(
301+
my_chunk,
302+
rs_chunk,
303+
"mysql (left) differed from readyset (right) for streaming replication chunk {first_id}-{last_id} with character set {character_set}",
304+
);
305+
}
271306
}
272307

273308
shutdown_tx.shutdown().await;

readyset-mysql/src/backend.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use readyset_adapter::backend::{
2121
};
2222
use readyset_adapter::upstream_database::LazyUpstream;
2323
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
24+
use readyset_data::encoding::Encoding;
2425
use readyset_data::{DfType, DfValue, DfValueKind};
2526
use readyset_errors::{internal, ReadySetError};
2627
use readyset_util::redacted::{RedactedString, Sensitive};
@@ -58,6 +59,7 @@ async fn write_column<S: AsyncRead + AsyncWrite + Unpin>(
5859
c: &DfValue,
5960
cs: &mysql_srv::Column,
6061
ty: &DfType,
62+
encoding: Encoding,
6163
) -> Result<(), Error> {
6264
let conv_error = || ReadySetError::DfValueConversionError {
6365
src_type: format!("{:?}", DfValueKind::from(c)),
@@ -97,7 +99,8 @@ async fn write_column<S: AsyncRead + AsyncWrite + Unpin>(
9799
} else {
98100
t.as_str()
99101
};
100-
rw.write_col(s)
102+
let b = encoding.encode(s)?;
103+
rw.write_col(&*b)
101104
}
102105
}
103106
DfValue::TinyText(ref t) => {
@@ -109,7 +112,8 @@ async fn write_column<S: AsyncRead + AsyncWrite + Unpin>(
109112
} else {
110113
t.as_str()
111114
};
112-
rw.write_col(s)
115+
let b = encoding.encode(s)?;
116+
rw.write_col(&*b)
113117
}
114118
}
115119
ref dt @ (DfValue::Float(..) | DfValue::Double(..)) => match cs.coltype {
@@ -387,6 +391,7 @@ macro_rules! handle_error {
387391
async fn handle_readyset_result<S>(
388392
result: noria_connector::QueryResult<'_>,
389393
writer: QueryResultWriter<'_, S>,
394+
results_encoding: Encoding,
390395
) -> io::Result<()>
391396
where
392397
S: AsyncRead + AsyncWrite + Unpin,
@@ -440,7 +445,7 @@ where
440445
None => &DfType::Unknown,
441446
};
442447

443-
if let Err(e) = write_column(&mut rw, val, c, ty).await {
448+
if let Err(e) = write_column(&mut rw, val, c, ty, results_encoding).await {
444449
return handle_column_write_err(e, rw).await;
445450
}
446451
}
@@ -505,12 +510,15 @@ where
505510
async fn handle_execute_result<S>(
506511
result: Result<QueryResult<'_, LazyUpstream<MySqlUpstream>>, Error>,
507512
writer: QueryResultWriter<'_, S>,
513+
results_encoding: Encoding,
508514
) -> io::Result<()>
509515
where
510516
S: AsyncRead + AsyncWrite + Unpin,
511517
{
512518
match result {
513-
Ok(QueryResult::Noria(result)) => handle_readyset_result(result, writer).await,
519+
Ok(QueryResult::Noria(result)) => {
520+
handle_readyset_result(result, writer, results_encoding).await
521+
}
514522
Ok(QueryResult::Upstream(result)) => handle_upstream_result(result, writer).await,
515523
Ok(QueryResult::UpstreamBufferedInMemory(..)) => handle_error!(
516524
Error::ReadySet(readyset_errors::unsupported_err!(
@@ -531,13 +539,16 @@ where
531539
async fn handle_query_result<S>(
532540
result: Result<QueryResult<'_, LazyUpstream<MySqlUpstream>>, Error>,
533541
writer: QueryResultWriter<'_, S>,
542+
results_encoding: Encoding,
534543
) -> QueryResultsResponse
535544
where
536545
S: AsyncRead + AsyncWrite + Unpin,
537546
{
538547
match result {
539548
Ok(QueryResult::Parser(command)) => QueryResultsResponse::Command(command),
540-
res => QueryResultsResponse::IoResult(handle_execute_result(res, writer).await),
549+
res => QueryResultsResponse::IoResult(
550+
handle_execute_result(res, writer, results_encoding).await,
551+
),
541552
}
542553
}
543554

@@ -680,6 +691,8 @@ where
680691
info!(target: "client_statement", "Execute: {{id: {id}, params: {:?}}}", value_params)
681692
}
682693

694+
let results_encoding = self.noria.noria.results_encoding();
695+
683696
match self.execute(id, &value_params, ()).await {
684697
Ok(QueryResult::Noria(noria_connector::QueryResult::Select { mut rows, schema })) => {
685698
let CachedSchema {
@@ -714,15 +727,17 @@ where
714727
while let Some(row) = rows.next() {
715728
for (c, ty, val) in izip!(mysql_schema.iter(), column_types.iter(), row.iter())
716729
{
717-
if let Err(e) = write_column(&mut rw, val, c, ty).await {
730+
if let Err(e) = write_column(&mut rw, val, c, ty, results_encoding).await {
718731
return handle_column_write_err(e, rw).await;
719732
};
720733
}
721734
rw.end_row().await?;
722735
}
723736
rw.finish().await
724737
}
725-
execute_result => handle_execute_result(execute_result, results).await,
738+
execute_result => {
739+
handle_execute_result(execute_result, results, results_encoding).await
740+
}
726741
}
727742
}
728743

@@ -744,6 +759,8 @@ where
744759
"charset" => charset.to_string(),
745760
)
746761
.increment(1);
762+
let encoding = readyset_data::encoding::Encoding::from_mysql_collation_id(charset);
763+
self.noria.noria.set_results_encoding(encoding);
747764
Ok(())
748765
}
749766

@@ -812,8 +829,9 @@ where
812829
info!(target: "client_statement", "Query: {query}");
813830
}
814831

832+
let results_encoding = self.noria.noria.results_encoding();
815833
let query_result = self.query(query).await;
816-
handle_query_result(query_result, results).await
834+
handle_query_result(query_result, results, results_encoding).await
817835
}
818836

819837
fn password_for_username(&self, username: &str) -> Option<Vec<u8>> {

0 commit comments

Comments
 (0)