Skip to content

Commit edbf0af

Browse files
committed
replicators: Add initial support for latin1 replication
Note that this only allows us to snapshot latin1 columns and read latin1 columns from the binlog. It simply causes the data to be decoded to utf8 before storage. This means that comparisons (cache lookup and ordering), retrieval, and functions operating on this data will not match upstream. This introduces a new `readyset_data::encoding::Encoding` type which maps from MySQL collation IDs to corresponding encoders. We are using `encoding_rs`, which implements the WHATWG Encoding Standard. This means it does not precisely match MySQL's behavior for all encodings (e.g. it uses a different replacement character than MySQL for invalid byte sequences for some encodings). We may want to replace it in the future, but it should be simple to swap it out within `readyset-data` to avoid updating all the callsites (of which more will soon be added). Release-Note-Core: As a first step toward supporting latin1 text columns in MySQL, we now allow replicating these columns but storing them re-encoded as utf8. Data will be retrieved as utf8, so comparisons and retrieval will not match upstream. Future work will improve support. Change-Id: I86c4fbf8da2361ebbfa8ad6224bbce8763ad8e2d Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9101 Reviewed-by: Johnathan Davis <jcd@readyset.io> Tested-by: Buildkite CI
1 parent b409385 commit edbf0af

File tree

9 files changed

+382
-13
lines changed

9 files changed

+382
-13
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ derive_more = { version = "1.0.0", features = [
125125
"display",
126126
] }
127127
diff = "0.1.13"
128+
encoding_rs = "0.8.35"
128129
enum-display-derive = "0.1.1"
129130
enum-kinds = "0.5.1"
130131
enum_dispatch = "0.3.13"

readyset-data/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ bit-vec = { workspace = true, features = ["serde"] }
1111
bytes = { workspace = true }
1212
chrono = { workspace = true, features = ["serde"] }
1313
chrono-tz = { workspace = true, features = ["serde"] }
14+
encoding_rs = { workspace = true }
1415
eui48 = { workspace = true }
1516
itertools = { workspace = true }
1617
lazy_static = { workspace = true }

readyset-data/src/encoding.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use std::fmt;
2+
3+
use encoding_rs::{UTF_8, WINDOWS_1252};
4+
use readyset_errors::ReadySetError;
5+
use readyset_errors::ReadySetResult;
6+
7+
/// Character encodings that string column data can be converted from or to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Encoding {
    /// UTF-8.
    ///
    /// MySQL's default `utf8mb4` and the deprecated `utf8mb3` (limited to the BMP) are both
    /// represented by this single variant.
    Utf8,
    /// MySQL `latin1` (CP1252/ISO-8859-1).
    Latin1,
    /// Raw binary data, which is never interpreted as text.
    Binary,
}
20+
21+
impl fmt::Display for Encoding {
22+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23+
match self {
24+
Encoding::Utf8 => write!(f, "utf8"),
25+
Encoding::Latin1 => write!(f, "latin1"),
26+
Encoding::Binary => write!(f, "binary"),
27+
}
28+
}
29+
}
30+
31+
impl Encoding {
32+
/// For reference, see [`mysql_common::collations::CollationId`]
33+
pub fn from_mysql_collation_id(collation_id: u16) -> Self {
34+
match collation_id {
35+
// ascii, utf8mb3, utf8mb4
36+
11 | 192..=247 | 255..=323 => Self::Utf8,
37+
// latin1
38+
5 | 8 | 15 | 31 | 47 | 48 | 49 | 94 => Self::Latin1,
39+
// binary
40+
63 => Self::Binary,
41+
42+
// Default to UTF-8 for other collations
43+
_ => Self::Utf8,
44+
}
45+
}
46+
47+
fn get_encoding_rs(&self) -> Option<&'static encoding_rs::Encoding> {
48+
match self {
49+
Self::Utf8 => Some(UTF_8),
50+
Self::Latin1 => Some(WINDOWS_1252),
51+
Self::Binary => None,
52+
}
53+
}
54+
55+
/// Decode bytes from this encoding to a UTF-8 String
56+
///
57+
/// For UTF-8 encoding, this validates that the bytes are valid UTF-8.
58+
/// For Binary encoding, this returns an error as binary data can't be converted to a String.
59+
pub fn decode(&self, bytes: &[u8]) -> ReadySetResult<String> {
60+
match self {
61+
Encoding::Binary => Err(ReadySetError::DecodingError {
62+
encoding: self.to_string(),
63+
message: "Cannot decode binary data to string".to_string(),
64+
}),
65+
_ => {
66+
if let Some(encoding) = self.get_encoding_rs() {
67+
let (cow, _encoding_used, had_errors) = encoding.decode(bytes);
68+
69+
if had_errors {
70+
return Err(ReadySetError::DecodingError {
71+
encoding: self.to_string(),
72+
message: "Some characters couldn't be decoded properly".to_string(),
73+
});
74+
}
75+
76+
Ok(cow.into_owned())
77+
} else {
78+
Err(ReadySetError::DecodingError {
79+
encoding: self.to_string(),
80+
message: "Unsupported encoding".to_string(),
81+
})
82+
}
83+
}
84+
}
85+
}
86+
87+
/// Encode a UTF-8 string to bytes in this encoding
88+
pub fn encode(&self, string: &str) -> ReadySetResult<Vec<u8>> {
89+
match self {
90+
Encoding::Binary => Err(ReadySetError::EncodingError {
91+
encoding: self.to_string(),
92+
message: "Cannot encode string to binary".to_string(),
93+
}),
94+
_ => {
95+
if let Some(encoding) = self.get_encoding_rs() {
96+
let (cow, _encoding_used, had_errors) = encoding.encode(string);
97+
98+
if had_errors {
99+
return Err(ReadySetError::EncodingError {
100+
encoding: self.to_string(),
101+
message: "Some characters couldn't be encoded properly".to_string(),
102+
});
103+
}
104+
105+
Ok(cow.into_owned())
106+
} else {
107+
Err(ReadySetError::EncodingError {
108+
encoding: self.to_string(),
109+
message: "Unsupported encoding".to_string(),
110+
})
111+
}
112+
}
113+
}
114+
}
115+
}
116+
117+
#[cfg(test)]
mod tests {
    use super::*;

    /// Decoding latin1 bytes yields the equivalent UTF-8 string.
    #[test]
    fn test_latin1_to_utf8() {
        // Pure ASCII is identical in latin1 and UTF-8
        let decoded = Encoding::Latin1.decode(b"Hello World").unwrap();
        assert_eq!(decoded, "Hello World");

        // Bytes 0xA0-0xFF in latin1 map to the Unicode code points 0xA0-0xFF, which take more
        // than one byte in UTF-8; e.g. 0xE9 is 'é', so these bytes spell "Hello é"
        let hello_e_acute = &[0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0xE9];
        let decoded = Encoding::Latin1.decode(hello_e_acute).unwrap();
        assert_eq!(decoded, "Hello é");

        // Every high-bit byte (0x80-0xFF) must decode to exactly one character
        let high_bytes: Vec<u8> = (0x80..=0xFF).collect();
        let decoded = Encoding::Latin1.decode(&high_bytes).unwrap();
        assert_eq!(decoded.chars().count(), 128);
    }

    /// Encoding UTF-8 strings to latin1 works for representable characters and fails otherwise.
    #[test]
    fn test_utf8_to_latin1() {
        // Pure ASCII passes through unchanged
        let encoded = Encoding::Latin1.encode("Hello World").unwrap();
        assert_eq!(encoded, b"Hello World");

        // 'é' is representable in latin1 as the single byte 0xE9
        let encoded = Encoding::Latin1.encode("Hello é").unwrap();
        assert_eq!(encoded, &[0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0xE9]);

        // An emoji is outside the latin1 range, so encoding must fail
        let outcome = Encoding::Latin1.encode("Hello 😊");
        assert!(outcome.is_err());
        match outcome.unwrap_err() {
            ReadySetError::EncodingError { encoding, .. } => assert_eq!(encoding, "latin1"),
            e => panic!("Unexpected error type: {:?}", e),
        }
    }
}

readyset-data/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use uuid::Uuid;
3434
mod array;
3535
mod collation;
3636
pub mod dialect;
37+
pub mod encoding;
3738
mod r#enum;
3839
mod float;
3940
mod integer;

readyset-e2e-tests/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ edition = "2021"
88
[dev-dependencies]
99
chrono = { workspace = true }
1010
futures = { workspace = true }
11+
itertools = { workspace = true }
1112
mysql_async = { workspace = true }
1213
mysql_common = { workspace = true }
14+
pretty_assertions = { workspace = true }
1315
proptest = { workspace = true }
1416
readyset-adapter = { path = "../readyset-adapter" }
1517
readyset-client-test-helpers = { path = "../readyset-client-test-helpers", features = [
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use mysql_async::prelude::Queryable;
2+
use pretty_assertions::assert_eq;
3+
use readyset_client_test_helpers::{
4+
mysql_helpers::{self, MySQLAdapter},
5+
sleep, TestBuilder,
6+
};
7+
use std::fmt::{Display, UpperHex};
8+
use test_utils::{serial, slow};
9+
10+
/// At present, this tests that snapshotting and streaming replication of a varchar column with the
/// specified character set results in the same utf8 encoded version of the data in Readyset. This
/// means that we connect with relevant session variables configured for utf8 (i.e. `SET NAMES
/// utf8mb4;`) so that MySQL will convert the data to utf8 before returning it to the client. This
/// causes Readyset to return matching data iff it decodes the replicated (e.g. latin1) data to utf8
/// before storing it.
///
/// In the future, this test can be extended for other boundary interfaces, e.g. also checking that
/// `HEX(text)` returns the same values on both MySQL and Readyset; or that connecting to Readyset
/// and requesting the original character set results in it being re-encoded from utf8 to the
/// original before being returned to the client.
///
/// * `charset` - MySQL character set name used for the `text` column.
/// * `hex_len` - minimum number of hex digits each value is zero-padded to before `UNHEX`.
/// * `range` - values to insert; each one becomes a row whose `id` is the value itself and whose
///   `text` is `UNHEX` of the value's upper-case hex representation.
#[cfg(test)]
async fn test_encoding_replication_inner<T>(
    charset: &str,
    hex_len: usize,
    range: impl IntoIterator<Item = T>,
) where
    T: UpperHex + Display,
{
    /// Reads every `(id, text)` row of `table` ordered by id, keeping `text` as raw bytes so
    /// that rows are compared byte for byte.
    async fn fetch_rows(conn: &mut mysql_async::Conn, table: &str) -> Vec<(i64, Vec<u8>)> {
        conn.query(format!("SELECT id, text FROM {table} ORDER BY id"))
            .await
            .unwrap()
    }

    readyset_tracing::init_test_logging();

    println!("starting");

    let upstream_opts = mysql_helpers::upstream_config().db_name(Some("encoding_test"));
    mysql_helpers::recreate_database("encoding_test").await;

    let mut upstream_conn = mysql_async::Conn::new(upstream_opts).await.unwrap();

    let create_snapshot_table = format!(
        r#"
        SET NAMES utf8mb4;
        DROP TABLE IF EXISTS encoding_snapshot;
        CREATE TABLE encoding_snapshot (
            id INT NOT NULL PRIMARY KEY,
            text VARCHAR(255) CHARACTER SET {}
        );
        "#,
        charset
    );
    upstream_conn
        .query_drop(create_snapshot_table)
        .await
        .unwrap();

    println!("created table");

    // One row per value in `range`: the id is the value and the text is the byte sequence
    // spelled by its zero-padded hex form. For a single-byte charset such as latin1 this
    // includes bytes 0x80-0xFF, which are not valid UTF-8 on their own.
    let insert_values: String = range
        .into_iter()
        .map(|i| format!("({i}, UNHEX('{i:0width$X}'))", width = hex_len))
        .collect::<Vec<String>>()
        .join(",");
    upstream_conn
        .query_drop(format!(
            "INSERT INTO encoding_snapshot (id, text) VALUES {insert_values}"
        ))
        .await
        .unwrap();

    println!("inserted data");

    // Read the data back from upstream; this is the reference both Readyset results are
    // compared against
    let my_rows = fetch_rows(&mut upstream_conn, "encoding_snapshot").await;

    println!("starting readyset");

    // Test snapshot replication
    let (rs_opts, _handle, shutdown_tx) = TestBuilder::default()
        .recreate_database(false)
        .fallback_db("encoding_test".to_string())
        .build::<MySQLAdapter>()
        .await;

    println!("started readyset");

    sleep().await;

    println!("slept");

    // Query the snapshotted table to verify every inserted row was replicated correctly
    let mut rs_conn = mysql_async::Conn::new(mysql_async::OptsBuilder::from_opts(rs_opts))
        .await
        .unwrap();

    println!("connected");

    let rs_snapshot_rows = fetch_rows(&mut rs_conn, "encoding_snapshot").await;

    assert_eq!(my_rows, rs_snapshot_rows);

    // Test streaming replication: the table is created and filled only after Readyset is
    // already running
    let create_streaming_table = format!(
        r#"
        DROP TABLE IF EXISTS encoding_streaming;
        CREATE TABLE encoding_streaming (
            id INT NOT NULL PRIMARY KEY,
            text VARCHAR(255) CHARACTER SET {}
        );
        "#,
        charset
    );
    upstream_conn
        .query_drop(create_streaming_table)
        .await
        .unwrap();

    println!("created streaming table");

    upstream_conn
        .query_drop(format!(
            "INSERT INTO encoding_streaming (id, text) VALUES {insert_values}"
        ))
        .await
        .unwrap();

    println!("inserted streaming data");

    sleep().await;

    println!("slept");

    let rs_streaming_rows = fetch_rows(&mut rs_conn, "encoding_streaming").await;

    // Both tables received identical inserts, so the upstream snapshot rows are also the
    // expected contents of the streaming table
    assert_eq!(my_rows, rs_streaming_rows);

    println!("shutting down");

    shutdown_tx.shutdown().await;

    println!("shutdown complete");
}
155+
156+
macro_rules! test_encoding_replication {
157+
($name:ident, $charset:expr, $hex_len:expr, $range:expr) => {
158+
#[tokio::test]
159+
#[serial(mysql)]
160+
#[slow]
161+
async fn $name() {
162+
test_encoding_replication_inner($charset, $hex_len, $range).await;
163+
}
164+
};
165+
}
166+
167+
test_encoding_replication!(ascii, "ascii", 2, 0..=127);
168+
test_encoding_replication!(test_latin1, "latin1", 2, 0..=255);
169+
test_encoding_replication!(test_utf8mb4, "utf8mb4", 2, 0..=127);
170+
test_encoding_replication!(test_utf8mb3, "utf8mb3", 2, 0..=127);

0 commit comments

Comments
 (0)