Skip to content

Commit 2d185a6

Browse files
replicators: Fix DATETIME microsecond precision
This commit fixes the microsecond precision of DATETIME columns in MySQL by setting the correct fractional seconds precision in the TimestampTZ object. This fixes a discrepancy between the precision of the DATETIME in Readyset and MySQL. Fixes: REA-4469 Fixes: #1309 Release-Note-Core: Fixes the microsecond precision of DATETIME columns in MySQL that sometimes were not being correctly represented in Readyset. Change-Id: Ifc3bb58b16a87423a0e4079dffa34ed28fafaa35 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7656 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent 891998c commit 2d185a6

File tree

5 files changed

+264
-39
lines changed

5 files changed

+264
-39
lines changed

mysql-srv/src/value/encode.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ impl ToMySqlValue for TimestampTz {
523523
// are all 0, length is 4 and no other field is sent.
524524
// if microseconds is 0, length is 7 and micro_seconds is not sent.
525525
// otherwise the length is 11
526+
// TODO(marce): Currently TimestampTZ will produce NULL for zero/invalid dates.
526527
let us = ts.nanosecond() / 1_000;
527528
let packet_len = if us != 0 {
528529
11

readyset-data/src/timestamp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl TimestampTz {
143143

144144
/// Set the desired precision when displaying subseconds.
145145
#[inline(always)]
146-
fn set_subsecond_digits(&mut self, count: u8) {
146+
pub fn set_subsecond_digits(&mut self, count: u8) {
147147
self.extra[2] = ((count << TimestampTz::SUBSECOND_DIGITS_BITS.trailing_zeros())
148148
& TimestampTz::SUBSECOND_DIGITS_BITS)
149149
| (self.extra[2] & !TimestampTz::SUBSECOND_DIGITS_BITS);

readyset-mysql/tests/integration.rs

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
use std::env;
2+
use std::panic::AssertUnwindSafe;
13
use std::sync::Arc;
24
use std::time::Duration;
35

46
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
7+
use mysql::Row;
58
use mysql_async::prelude::Queryable;
69
use mysql_async::OptsBuilder;
710
use readyset_adapter::backend::noria_connector::ReadBehavior;
@@ -15,11 +18,13 @@ use readyset_client_test_helpers::{sleep, TestBuilder};
1518
use readyset_errors::ReadySetError;
1619
use readyset_server::Handle;
1720
use readyset_telemetry_reporter::{TelemetryEvent, TelemetryInitializer, TelemetryReporter};
21+
use readyset_util::eventually;
1822
use readyset_util::shutdown::ShutdownSender;
1923
use regex::Regex;
24+
use serial_test::serial;
2025
use test_utils::skip_flaky_finder;
2126

22-
async fn setup_with_mysql() -> (mysql_async::Opts, Handle, ShutdownSender) {
27+
async fn setup_with_mysql(recreate_db: bool) -> (mysql_async::Opts, Handle, ShutdownSender) {
2328
readyset_tracing::init_test_logging();
2429
let mut users = std::collections::HashMap::new();
2530
users.insert("root".to_string(), "noria".to_string());
@@ -30,6 +35,7 @@ async fn setup_with_mysql() -> (mysql_async::Opts, Handle, ShutdownSender) {
3035
.users(users),
3136
)
3237
.fallback(true)
38+
.recreate_database(recreate_db)
3339
.build::<MySQLAdapter>()
3440
.await
3541
}
@@ -51,6 +57,14 @@ async fn setup_telemetry() -> (TelemetryReporter, mysql_async::Opts, Handle, Shu
5157
(reporter, opts, handle, shutdown_tx)
5258
}
5359

60+
fn mysql_url() -> String {
61+
format!(
62+
"mysql://root:noria@{}:{}/noria",
63+
env::var("MYSQL_HOST").unwrap_or_else(|_| "127.0.0.1".into()),
64+
env::var("MYSQL_TCP_PORT").unwrap_or_else(|_| "3306".into()),
65+
)
66+
}
67+
5468
#[tokio::test(flavor = "multi_thread")]
5569
async fn duplicate_join_key() {
5670
// This used to trigger a bug involving weak indexes. See issue #179 for more info.
@@ -1842,8 +1856,9 @@ async fn show_caches_with_always() {
18421856
}
18431857

18441858
#[tokio::test(flavor = "multi_thread")]
1859+
#[serial]
18451860
async fn show_readyset_status() {
1846-
let (opts, _handle, shutdown_tx) = setup_with_mysql().await;
1861+
let (opts, _handle, shutdown_tx) = setup_with_mysql(true).await;
18471862
let mut conn = mysql_async::Conn::new(opts).await.unwrap();
18481863
let mut ret: Vec<mysql::Row> = conn.query("SHOW READYSET STATUS;").await.unwrap();
18491864

@@ -2107,3 +2122,154 @@ async fn test_proxied_queries_telemetry() {
21072122

21082123
shutdown_tx.shutdown().await;
21092124
}
2125+
2126+
#[tokio::test(flavor = "multi_thread")]
2127+
#[serial]
2128+
async fn datetime_nanosecond_precision_text_protocol() {
2129+
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2130+
direct_mysql.query_drop("DROP TABLE IF EXISTS dt_nano_text_protocol CASCADE;
2131+
CREATE TABLE dt_nano_text_protocol (col1 DATETIME, col2 DATETIME(2), col3 DATETIME(4), col4 DATETIME(6));
2132+
INSERT INTO dt_nano_text_protocol VALUES ('2021-01-01 00:00:00', '2021-01-01 00:00:00.00', '2021-01-01 00:00:00.0000', '2021-01-01 00:00:00.000000');
2133+
INSERT INTO dt_nano_text_protocol VALUES ('2021-01-01 00:00:00', '2021-01-01 00:00:00.01', '2021-01-01 00:00:00.0001', '2021-01-01 00:00:00.000001');")
2134+
.await
2135+
.unwrap();
2136+
let (opts, _handle, shutdown_tx) = setup_with_mysql(false).await;
2137+
let mut conn = mysql_async::Conn::new(opts).await.unwrap();
2138+
sleep().await;
2139+
2140+
conn.query_drop("CREATE CACHE FROM SELECT * FROM dt_nano_text_protocol")
2141+
.await
2142+
.unwrap();
2143+
sleep().await;
2144+
2145+
let my_rows: Vec<(String, String, String, String)> = direct_mysql
2146+
.query("SELECT * FROM dt_nano_text_protocol")
2147+
.await
2148+
.unwrap();
2149+
let rs_rows: Vec<(String, String, String, String)> = conn
2150+
.query("SELECT * FROM dt_nano_text_protocol")
2151+
.await
2152+
.unwrap();
2153+
assert_eq!(rs_rows, my_rows);
2154+
conn.query_drop("INSERT INTO dt_nano_text_protocol VALUES ('2021-01-02 00:00:00', '2021-01-02 00:00:00.00', '2021-01-02 00:00:00.0000', '2021-01-02 00:00:00.000000');").await.unwrap();
2155+
conn.query_drop("INSERT INTO dt_nano_text_protocol VALUES ('2021-01-02 00:00:00', '2021-01-02 00:00:00.01', '2021-01-02 00:00:00.0001', '2021-01-02 00:00:00.000001');").await.unwrap();
2156+
2157+
sleep().await;
2158+
2159+
eventually!(run_test: {
2160+
let my_rows: Vec<(String, String, String, String)> = direct_mysql
2161+
.query("SELECT * FROM dt_nano_text_protocol")
2162+
.await
2163+
.unwrap();
2164+
2165+
let rs_rows: Vec<(String, String, String, String)> = conn
2166+
.query("SELECT * FROM dt_nano_text_protocol")
2167+
.await
2168+
.unwrap();
2169+
AssertUnwindSafe(move || (rs_rows, my_rows))
2170+
},then_assert: |results| {
2171+
let (rs_rows, my_rows) = results();
2172+
assert_eq!(rs_rows, my_rows)
2173+
});
2174+
2175+
shutdown_tx.shutdown().await;
2176+
}
2177+
2178+
#[tokio::test(flavor = "multi_thread")]
2179+
#[serial]
2180+
async fn datetime_nanosecond_precision_binary_protocol() {
2181+
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2182+
direct_mysql.query_drop("DROP TABLE IF EXISTS dt_nano_bin_protocol CASCADE;
2183+
CREATE TABLE dt_nano_bin_protocol (ID INT PRIMARY KEY, col1 DATETIME, col2 DATETIME(2), col3 DATETIME(4), col4 DATETIME(6));
2184+
INSERT INTO dt_nano_bin_protocol VALUES (1, '2021-01-01 00:00:00', '2021-01-01 00:00:00.00', '2021-01-01 00:00:00.0000', '2021-01-01 00:00:00.000000');
2185+
INSERT INTO dt_nano_bin_protocol VALUES (2, '2021-01-01 00:00:00', '2021-01-01 00:00:00.01', '2021-01-01 00:00:00.0001', '2021-01-01 00:00:00.000001');")
2186+
.await
2187+
.unwrap();
2188+
let (opts, _handle, shutdown_tx) = setup_with_mysql(false).await;
2189+
let mut conn = mysql_async::Conn::new(opts).await.unwrap();
2190+
sleep().await;
2191+
2192+
conn.query_drop("CREATE CACHE FROM SELECT * FROM dt_nano_bin_protocol WHERE ID = ?")
2193+
.await
2194+
.unwrap();
2195+
sleep().await;
2196+
2197+
for id in 1..=2 {
2198+
let my_rows: Row = direct_mysql
2199+
.exec_first("SELECT * FROM dt_nano_bin_protocol WHERE ID = ?", (id,))
2200+
.await
2201+
.unwrap()
2202+
.unwrap();
2203+
let rs_rows: Row = conn
2204+
.exec_first("SELECT * FROM dt_nano_bin_protocol WHERE ID = ?", (id,))
2205+
.await
2206+
.unwrap()
2207+
.unwrap();
2208+
assert_eq!(rs_rows.unwrap_raw(), my_rows.unwrap_raw())
2209+
}
2210+
2211+
conn.query_drop("INSERT INTO dt_nano_bin_protocol VALUES (3, '2021-01-02 00:00:00', '2021-01-02 00:00:00.00', '2021-01-02 00:00:00.0000', '2021-01-02 00:00:00.000000');").await.unwrap();
2212+
conn.query_drop("INSERT INTO dt_nano_bin_protocol VALUES (4, '2021-01-02 00:00:00', '2021-01-02 00:00:00.01', '2021-01-02 00:00:00.0001', '2021-01-02 00:00:00.000001');").await.unwrap();
2213+
2214+
sleep().await;
2215+
2216+
for id in 3..=4 {
2217+
eventually!(run_test: {
2218+
2219+
let my_rows: Row = direct_mysql
2220+
.exec_first("SELECT * FROM dt_nano_bin_protocol WHERE ID = ?", (id,))
2221+
.await
2222+
.unwrap()
2223+
.unwrap();
2224+
let rs_rows: Row = conn
2225+
.exec_first("SELECT * FROM dt_nano_bin_protocol WHERE ID = ?", (id,))
2226+
.await
2227+
.unwrap()
2228+
.unwrap();
2229+
AssertUnwindSafe(move || (rs_rows, my_rows))
2230+
},then_assert: |results| {
2231+
let (rs_rows, my_rows) = results();
2232+
assert_eq!(rs_rows.unwrap_raw(), my_rows.unwrap_raw())
2233+
});
2234+
}
2235+
shutdown_tx.shutdown().await;
2236+
}
2237+
2238+
#[tokio::test(flavor = "multi_thread")]
2239+
#[serial]
2240+
async fn datetime_binary_protocol() {
2241+
let (opts, _handle, shutdown_tx) = setup_with_mysql(false).await;
2242+
let mut conn = mysql_async::Conn::new(opts).await.unwrap();
2243+
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2244+
direct_mysql.query_drop("SET sql_mode='';").await.unwrap();
2245+
direct_mysql
2246+
.query_drop("DROP TABLE IF EXISTS dt_bin_protocol CASCADE;")
2247+
.await
2248+
.unwrap();
2249+
direct_mysql.query_drop("CREATE TABLE dt_bin_protocol (ID INT PRIMARY KEY, col1 DATETIME(6), col2 DATETIME(6), col3 DATETIME(6), col4 DATETIME(6));").await.unwrap();
2250+
direct_mysql.query_drop("INSERT INTO dt_bin_protocol VALUES (1, '0000-00-00 00:00:00.000000', '2021-01-01 00:00:00.000000', '2021-01-01 00:00:01.0000000', '2021-01-01 00:00:01.000001');")
2251+
.await
2252+
.unwrap();
2253+
sleep().await;
2254+
2255+
conn.query_drop("CREATE CACHE FROM SELECT * FROM dt_bin_protocol WHERE ID = ?")
2256+
.await
2257+
.unwrap();
2258+
sleep().await;
2259+
let rs_rows: Row = conn
2260+
.exec_first("SELECT * FROM dt_bin_protocol WHERE ID = 1", ())
2261+
.await
2262+
.unwrap()
2263+
.unwrap();
2264+
assert_eq!(
2265+
rs_rows.unwrap_raw(),
2266+
vec![
2267+
Some(mysql::Value::Int(1)),
2268+
Some(mysql::Value::NULL),
2269+
Some(mysql::Value::Date(2021, 1, 1, 0, 0, 0, 0)),
2270+
Some(mysql::Value::Date(2021, 1, 1, 0, 0, 1, 0)),
2271+
Some(mysql::Value::Date(2021, 1, 1, 0, 0, 1, 1))
2272+
]
2273+
);
2274+
shutdown_tx.shutdown().await;
2275+
}

replicators/src/mysql_connector/connector.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -853,18 +853,16 @@ fn binlog_val_to_noria_val(
853853
// Not all values are coerced to the value expected by ReadySet directly
854854

855855
use mysql_common::constants::ColumnType;
856-
857-
let buf = match val {
858-
mysql_common::value::Value::Bytes(b) => b,
859-
_ => {
860-
return val.try_into().map_err(|e| {
861-
mysql_async::Error::Other(Box::new(internal_err!("Unable to coerce value {}", e)))
862-
})
863-
}
864-
};
865-
866856
match (col_kind, meta) {
867857
(ColumnType::MYSQL_TYPE_TIMESTAMP2, &[0]) => {
858+
let buf = match val {
859+
mysql_common::value::Value::Bytes(b) => b,
860+
_ => {
861+
return Err(mysql_async::Error::Other(Box::new(internal_err!(
862+
"Expected a byte array for timestamp"
863+
))));
864+
}
865+
};
868866
//https://github.com/blackbeam/rust_mysql_common/blob/408effed435c059d80a9e708bcfa5d974527f476/src/binlog/value.rs#L144
869867
// When meta is 0, `mysql_common` encodes this value as number of seconds (since UNIX
870868
// EPOCH)
@@ -881,6 +879,14 @@ fn binlog_val_to_noria_val(
881879
Ok(time.into())
882880
}
883881
(ColumnType::MYSQL_TYPE_TIMESTAMP2, _) => {
882+
let buf = match val {
883+
mysql_common::value::Value::Bytes(b) => b,
884+
_ => {
885+
return Err(mysql_async::Error::Other(Box::new(internal_err!(
886+
"Expected a byte array for timestamp"
887+
))));
888+
}
889+
};
884890
// When meta is anything else, `mysql_common` encodes this value as number of
885891
// seconds.microseconds (since UNIX EPOCH)
886892
let s = String::from_utf8_lossy(buf);
@@ -894,6 +900,14 @@ fn binlog_val_to_noria_val(
894900
Ok(time.into())
895901
}
896902
(ColumnType::MYSQL_TYPE_STRING, meta) => {
903+
let buf = match val {
904+
mysql_common::value::Value::Bytes(b) => b,
905+
_ => {
906+
return Err(mysql_async::Error::Other(Box::new(internal_err!(
907+
"Expected a byte array for timestamp"
908+
))));
909+
}
910+
};
897911
match mysql_pad_collation_column(
898912
buf,
899913
col_kind,
@@ -904,6 +918,28 @@ fn binlog_val_to_noria_val(
904918
Err(e) => Err(mysql_async::Error::Other(Box::new(internal_err!("{e}")))),
905919
}
906920
}
921+
(ColumnType::MYSQL_TYPE_DATETIME2, meta) => {
922+
//meta[0] is the fractional seconds precision
923+
let df_val: DfValue = val
924+
.try_into()
925+
.map_err(|e| {
926+
mysql_async::Error::Other(Box::new(internal_err!(
927+
"Unable to coerce value {}",
928+
e
929+
)))
930+
})
931+
.and_then(|val| match val {
932+
DfValue::TimestampTz(mut ts) => {
933+
ts.set_subsecond_digits(meta[0]);
934+
Ok(DfValue::TimestampTz(ts))
935+
}
936+
DfValue::None => Ok(DfValue::None), //NULL
937+
_ => Err(mysql_async::Error::Other(Box::new(internal_err!(
938+
"Expected a timestamp"
939+
)))),
940+
})?;
941+
Ok(df_val)
942+
}
907943
_ => Ok(val.try_into().map_err(|e| {
908944
mysql_async::Error::Other(Box::new(internal_err!("Unable to coerce value {}", e)))
909945
})?),

0 commit comments

Comments
 (0)