Skip to content

Commit 981f459

Browse files
readyset-mysql: Infer MySQL implicit defaults
MySQL allows users to omit columns from an INSERT statement, if the column is defined as NOT NULL and has no default value it will error out. However, if the user disable STRICT_TRANS_TABLES, MySQL will infer the default value for the column. This is fine for snapshot and replication with full row image, as we will receive the inferred default value. However, this is not the case for minimal row-based replication, where we receive only the columns that were specified in the INSERT statement and are responsible for inferring the default based on the column type, which we were defaulting to None. If a table has not PK or UK, DELETE/UPDATE operations will require a full match on all columns to find the row to operate on. This will cause replication to fail, as we and MySQL will have different values. This commit also adjusts the mysql-time crate to not display a negative value for zero time. Closes: REA-5555 Closes: #1482 Release-Note-Core: Fixed a MySQL issue where INSERT statements with omitted columns could cause replication to fail under certain conditions. Change-Id: Id00f4d6dc9b67368fad5453489f1768c4095ce7f Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9140 Reviewed-by: Michael Zink <michael.z@readyset.io> Tested-by: Buildkite CI
1 parent 5f84889 commit 981f459

File tree

4 files changed

+214
-65
lines changed

4 files changed

+214
-65
lines changed

mysql-time/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,11 @@ impl Default for MySqlTime {
301301

302302
impl fmt::Display for MySqlTime {
303303
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304-
let sign = if self.is_positive() { "" } else { "-" };
304+
let sign = if self.is_positive() || self.nanos == 0 {
305+
""
306+
} else {
307+
"-"
308+
};
305309
let h = self.hour();
306310
let m = self.minutes();
307311
let s = self.seconds();

readyset-data/src/lib.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,77 @@ impl DfValue {
877877
_ => self,
878878
}
879879
}
880+
881+
/// Returns an implicit default value for a column that is not null and has no default.
882+
/// MySQL has [implicit defaults], and in case you disable sql_mode STRICT_TRANS_TABLES
883+
/// it will let you either omit the columns that has no default and is defined as NOT NULL,
884+
/// or allow you to set it to DEFAULT.
885+
///
886+
/// [implicit defaults]: https://dev.mysql.com/doc/refman/8.4/en/data-type-defaults.html
887+
///
888+
/// # Parameters
889+
///
890+
/// * `collation`: The collation of the column.
891+
/// * `dftype`: The type of the column.
892+
/// * `dialect`: The dialect of the database.
893+
///
894+
/// # Returns
895+
///
896+
/// The implicit default value for the column.
897+
pub fn implicit_default(collation: Collation, dftype: DfType, dialect: Dialect) -> Self {
898+
match dialect {
899+
Dialect::DEFAULT_MYSQL => {
900+
// Yay, MySQL has implicit defaults if you omit a column that is not null and has no default
901+
if matches!(dftype, DfType::Blob) {
902+
// TODO: Fix this to return ByteArray(Vec::new().into())
903+
return DfValue::from_str_and_collation("", collation);
904+
} else if dftype.is_any_text() {
905+
return DfValue::from_str_and_collation("", collation);
906+
} else if dftype.is_any_int() || dftype.is_bool() {
907+
return DfValue::Int(0);
908+
} else if dftype.is_any_float() {
909+
return DfValue::Float(0.0);
910+
} else if matches!(dftype, DfType::Numeric { .. }) {
911+
return DfValue::Numeric(Arc::new(Decimal::new(0, 2)));
912+
} else if matches!(dftype, DfType::Bit { .. }) {
913+
return DfValue::from_str_and_collation("\0", Collation::Utf8);
914+
} else if dftype.is_any_temporal() {
915+
match dftype {
916+
DfType::Timestamp { subsecond_digits }
917+
| DfType::DateTime { subsecond_digits } => {
918+
let mut dt = TimestampTz::zero();
919+
dt.set_subsecond_digits(subsecond_digits as u8);
920+
return DfValue::TimestampTz(dt);
921+
}
922+
DfType::Time { .. } => {
923+
return DfValue::Time(MySqlTime::default());
924+
}
925+
DfType::Date { .. } => {
926+
let mut dt = TimestampTz::zero();
927+
dt.set_date_only();
928+
return DfValue::TimestampTz(dt);
929+
}
930+
_ => {}
931+
}
932+
} else if matches!(dftype, DfType::Enum { .. }) {
933+
if let DfType::Enum { variants, .. } = dftype {
934+
// Safety, I don't think this can happen
935+
if variants.is_empty() {
936+
return DfValue::from_str_and_collation("", collation);
937+
}
938+
return DfValue::from_str_and_collation(
939+
variants[0].as_str(),
940+
Collation::Utf8,
941+
);
942+
}
943+
} else if matches!(dftype, DfType::Json { .. }) {
944+
return DfValue::from_str_and_collation("null", Collation::Utf8);
945+
}
946+
DfValue::None
947+
}
948+
Dialect::DEFAULT_POSTGRESQL => DfValue::None,
949+
}
950+
}
880951
}
881952

882953
impl PartialEq for DfValue {

readyset-mysql/tests/integration.rs

Lines changed: 121 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2522,72 +2522,77 @@ async fn test_case_expr_then_expr() {
25222522
shutdown_tx.shutdown().await;
25232523
}
25242524

2525-
async fn populate_all_data_types(direct_mysql: &mut mysql_async::Conn) {
2525+
async fn populate_all_data_types(
2526+
direct_mysql: &mut mysql_async::Conn,
2527+
table_name: &str,
2528+
add_sample_data: bool,
2529+
) {
25262530
// Drop existing table if any
25272531
direct_mysql
2528-
.query_drop("DROP TABLE IF EXISTS all_data_types CASCADE;")
2532+
.query_drop(format!("DROP TABLE IF EXISTS {} CASCADE;", table_name))
25292533
.await
25302534
.unwrap();
25312535
// Create table
25322536
direct_mysql
25332537
.query_drop(
2534-
"CREATE TABLE all_data_types (
2538+
format!("CREATE TABLE {} (
25352539
-- Numeric Data Types (Signed and Unsigned Integers)
2536-
col_tinyint TINYINT, -- Signed (-128 to 127)
2537-
col_tinyint_unsigned TINYINT UNSIGNED, -- Unsigned (0 to 255)
2538-
2539-
col_smallint SMALLINT, -- Signed (-32,768 to 32,767)
2540-
col_smallint_unsigned SMALLINT UNSIGNED, -- Unsigned (0 to 65,535)
2541-
2542-
col_mediumint MEDIUMINT, -- Signed (-8,388,608 to 8,388,607)
2543-
col_mediumint_unsigned MEDIUMINT UNSIGNED, -- Unsigned (0 to 16,777,215)
2544-
2545-
col_int INT, -- Signed (-2,147,483,648 to 2,147,483,647)
2546-
col_int_unsigned INT UNSIGNED, -- Unsigned (0 to 4,294,967,295)
2547-
2548-
col_bigint BIGINT, -- Signed (-2^63 to 2^63-1)
2549-
col_bigint_unsigned BIGINT UNSIGNED, -- Unsigned (0 to 2^64-1)
2550-
2551-
col_decimal DECIMAL(12,2), -- Fixed-point exact decimal (e.g., 99999999.99)
2552-
col_numeric NUMERIC(10,2), -- Synonym for DECIMAL
2553-
2554-
col_float FLOAT, -- 32-bit floating point (approximate value)
2555-
col_double DOUBLE, -- 64-bit floating point (approximate value)
2556-
col_real REAL, -- Synonym for DOUBLE
2557-
2558-
col_bit BIT(8), -- Bit field (e.g., 8-bit binary)
2559-
col_boolean BOOLEAN, -- Alias for TINYINT(1) (0 = false, 1 = true)
2560-
2540+
col_tinyint TINYINT NOT NULL, -- Signed (-128 to 127)
2541+
col_tinyint_unsigned TINYINT UNSIGNED NOT NULL, -- Unsigned (0 to 255)
2542+
2543+
col_smallint SMALLINT NOT NULL, -- Signed (-32,768 to 32,767)
2544+
col_smallint_unsigned SMALLINT UNSIGNED NOT NULL, -- Unsigned (0 to 65,535)
2545+
2546+
col_mediumint MEDIUMINT NOT NULL, -- Signed (-8,388,608 to 8,388,607)
2547+
col_mediumint_unsigned MEDIUMINT UNSIGNED NOT NULL, -- Unsigned (0 to 16,777,215)
2548+
2549+
col_int INT NOT NULL, -- Signed (-2,147,483,648 to 2,147,483,647)
2550+
col_int_unsigned INT UNSIGNED NOT NULL, -- Unsigned (0 to 4,294,967,295)
2551+
2552+
col_bigint BIGINT NOT NULL, -- Signed (-2^63 to 2^63-1)
2553+
col_bigint_unsigned BIGINT UNSIGNED NOT NULL, -- Unsigned (0 to 2^64-1)
2554+
2555+
col_decimal DECIMAL(12,2) NOT NULL, -- Fixed-point exact decimal (e.g., 99999999.99)
2556+
col_numeric NUMERIC(10,2) NOT NULL, -- Synonym for DECIMAL
2557+
2558+
col_float FLOAT NOT NULL, -- 32-bit floating point (approximate value)
2559+
col_double DOUBLE NOT NULL, -- 64-bit floating point (approximate value)
2560+
col_real REAL NOT NULL, -- Synonym for DOUBLE
2561+
2562+
col_bit BIT(8) NOT NULL, -- Bit field (e.g., 8-bit binary)
2563+
col_boolean BOOLEAN NOT NULL, -- Alias for TINYINT(1) (0 = false, 1 = true)
2564+
25612565
-- Date and Time Data Types
2562-
col_date DATE,
2563-
col_datetime DATETIME(2),
2564-
col_timestamp TIMESTAMP,
2565-
col_time TIME,
2566+
col_date DATE NOT NULL,
2567+
col_datetime DATETIME(2) NOT NULL,
2568+
col_timestamp TIMESTAMP NOT NULL,
2569+
col_time TIME NOT NULL,
25662570
25672571
-- String Data Types
2568-
col_char CHAR(10),
2569-
col_varchar VARCHAR(255),
2570-
2571-
col_text TEXT,
2572-
col_tinytext TINYTEXT,
2573-
col_mediumtext MEDIUMTEXT,
2574-
col_longtext LONGTEXT,
2575-
2576-
col_blob BLOB,
2577-
col_tinyblob TINYBLOB,
2578-
col_mediumblob MEDIUMBLOB,
2579-
col_longblob LONGBLOB,
2580-
2581-
col_enum ENUM('small', 'medium', 'large'),
2582-
col_json JSON
2583-
);",
2572+
col_char CHAR(10) NOT NULL,
2573+
col_varchar VARCHAR(255) NOT NULL,
2574+
2575+
col_text TEXT NOT NULL,
2576+
col_tinytext TINYTEXT NOT NULL,
2577+
col_mediumtext MEDIUMTEXT NOT NULL,
2578+
col_longtext LONGTEXT NOT NULL,
2579+
2580+
col_blob BLOB NOT NULL,
2581+
col_tinyblob TINYBLOB NOT NULL,
2582+
col_mediumblob MEDIUMBLOB NOT NULL,
2583+
col_longblob LONGBLOB NOT NULL,
2584+
2585+
col_enum ENUM('small', 'medium', 'large') NOT NULL,
2586+
col_json JSON NOT NULL
2587+
);", table_name)
25842588
)
25852589
.await
25862590
.unwrap();
2587-
// Insert test data
2588-
direct_mysql
2589-
.query_drop(
2590-
"INSERT INTO all_data_types (
2591+
if add_sample_data {
2592+
// Insert test data
2593+
direct_mysql
2594+
.query_drop(format!(
2595+
"INSERT INTO {} (
25912596
col_tinyint, col_tinyint_unsigned,
25922597
col_smallint, col_smallint_unsigned,
25932598
col_mediumint, col_mediumint_unsigned,
@@ -2643,23 +2648,29 @@ CURRENT_TIMESTAMP,
26432648
'long blob contents here',
26442649
26452650
'medium',
2646-
'{\"name\": \"Alice\", \"roles\": [\"admin\", \"editor\"], \"active\": true}'
2651+
'{{\"name\": \"Alice\", \"roles\": [\"admin\", \"editor\"], \"active\": true}}'
26472652
);",
2648-
)
2649-
.await
2650-
.unwrap();
2653+
table_name
2654+
))
2655+
.await
2656+
.unwrap();
2657+
}
26512658
}
26522659

26532660
async fn test_column_definition_verify(
26542661
direct_mysql: &mut mysql_async::Conn,
26552662
rs_conn: &mut mysql_async::Conn,
2663+
table_name: &str,
26562664
) {
26572665
// Verify results
26582666
let direct_rows: Vec<Row> = direct_mysql
2659-
.query("SELECT * FROM all_data_types")
2667+
.query(format!("SELECT * FROM {}", table_name))
2668+
.await
2669+
.unwrap();
2670+
let rs_rows: Vec<Row> = rs_conn
2671+
.query(format!("SELECT * FROM {}", table_name))
26602672
.await
26612673
.unwrap();
2662-
let rs_rows: Vec<Row> = rs_conn.query("SELECT * FROM all_data_types").await.unwrap();
26632674

26642675
let direct_columns = direct_rows[0].columns();
26652676
let rs_columns = rs_rows[0].columns();
@@ -2711,7 +2722,7 @@ async fn test_column_definition_verify(
27112722
async fn test_column_definition_upstream_readyset_snapshot() {
27122723
readyset_tracing::init_test_logging();
27132724
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2714-
populate_all_data_types(&mut direct_mysql).await;
2725+
populate_all_data_types(&mut direct_mysql, "all_data_types", true).await;
27152726

27162727
// Setup ReadySet connection after table creation
27172728
let (rs_opts, _rs_handle, tx) = TestBuilder::default()
@@ -2720,7 +2731,7 @@ async fn test_column_definition_upstream_readyset_snapshot() {
27202731
.await;
27212732
let mut conn = mysql_async::Conn::new(rs_opts).await.unwrap();
27222733

2723-
test_column_definition_verify(&mut direct_mysql, &mut conn).await;
2734+
test_column_definition_verify(&mut direct_mysql, &mut conn, "all_data_types").await;
27242735
tx.shutdown().await;
27252736
}
27262737

@@ -2737,8 +2748,8 @@ async fn test_column_definition_upstream_readyset_replication() {
27372748
.await;
27382749
let mut conn = mysql_async::Conn::new(rs_opts).await.unwrap();
27392750

2740-
populate_all_data_types(&mut direct_mysql).await;
2741-
test_column_definition_verify(&mut direct_mysql, &mut conn).await;
2751+
populate_all_data_types(&mut direct_mysql, "all_data_types", true).await;
2752+
test_column_definition_verify(&mut direct_mysql, &mut conn, "all_data_types").await;
27422753
tx.shutdown().await;
27432754
}
27442755

@@ -2790,3 +2801,51 @@ async fn text_citext_default_coercion_minimal_row_base_replication() {
27902801
assert_eq!(rs_row[1], direct_row[1]);
27912802
tx.shutdown().await;
27922803
}
2804+
2805+
#[tokio::test(flavor = "multi_thread")]
2806+
#[serial(mysql)]
2807+
async fn test_default_value_not_null_for_replication() {
2808+
readyset_tracing::init_test_logging();
2809+
let (rs_opts, _rs_handle, tx) = TestBuilder::default()
2810+
.recreate_database(false)
2811+
.fallback(true)
2812+
.build::<MySQLAdapter>()
2813+
.await;
2814+
sleep().await;
2815+
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2816+
direct_mysql
2817+
.query_drop("SET SESSION binlog_row_image = MINIMAL; SET SESSION sql_mode = '';")
2818+
.await
2819+
.unwrap();
2820+
populate_all_data_types(&mut direct_mysql, "all_data_types_not_null", false).await;
2821+
direct_mysql
2822+
.query_drop("INSERT INTO all_data_types_not_null () VALUES ();")
2823+
.await
2824+
.unwrap();
2825+
2826+
let mut conn = mysql_async::Conn::new(rs_opts).await.unwrap();
2827+
conn.query_drop("CREATE CACHE FROM SELECT * FROM all_data_types_not_null")
2828+
.await
2829+
.unwrap();
2830+
let rs_row: Row = conn
2831+
.query_first("SELECT * FROM all_data_types_not_null")
2832+
.await
2833+
.unwrap()
2834+
.unwrap();
2835+
let direct_row: Row = direct_mysql
2836+
.query_first("SELECT * FROM all_data_types_not_null")
2837+
.await
2838+
.unwrap()
2839+
.unwrap();
2840+
assert_eq!(rs_row.len(), direct_row.len());
2841+
let row_length = rs_row.len();
2842+
for i in 0..row_length {
2843+
assert_eq!(
2844+
rs_row[i],
2845+
direct_row[i],
2846+
"Value mismatch for column: {}",
2847+
String::from_utf8_lossy(rs_row.columns()[i].name_ref())
2848+
);
2849+
}
2850+
tx.shutdown().await;
2851+
}

readyset-server/src/controller/mir_to_flow.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,8 @@ fn make_base_node(
368368
.map(|cs| DfColumn::from_spec(cs.clone(), mig.dialect, |ty| custom_types.get(&ty).cloned()))
369369
.collect::<Result<Vec<_>, _>>()?;
370370

371-
// note that this defaults to a "None" (= NULL) default value for columns that do not have one
372-
// specified; we don't currently handle a "NOT NULL" SQL constraint for defaults
371+
// note that this defaults to a "None" (= NULL) default value for columns that don't have a default value
372+
// or we cannot infer a default value from the SQL type
373373
let default_values = column_specs
374374
.iter()
375375
.map(|cs| {
@@ -387,6 +387,21 @@ fn make_base_node(
387387
internal_err!("Failed to convert SQL type to DfType: {}", e)
388388
})?;
389389
return df.coerce_to(&dftype_to, &df.infer_dataflow_type());
390+
} else if let ColumnConstraint::NotNull { .. } = *c {
391+
let collation = cs.get_collation().map(|collation| {
392+
Collation::from_mysql_collation(collation).unwrap_or(Collation::Utf8)
393+
});
394+
let dftype_to =
395+
DfType::from_sql_type(&cs.sql_type, mig.dialect, |_| None, collation)
396+
.map_err(|e| {
397+
internal_err!("Failed to convert SQL type to DfType: {}", e)
398+
})?;
399+
400+
return Ok(DfValue::implicit_default(
401+
collation.unwrap_or(Collation::Utf8),
402+
dftype_to,
403+
mig.dialect,
404+
));
390405
}
391406
}
392407
Ok(DfValue::None)

0 commit comments

Comments
 (0)