Skip to content

Commit fc819f9

Browse files
readyset-dataflow: apply default before coercion
When MySQL row based replication is enabled, if the type we are trying to coerce to is a text with case-insensitive collation (col_ty.is_citext()), we will attempt to coerce the value. DfValue::Default is not coerceble, and should have been converted to the column default value before attempting to coerce. Closes: REA-5521 Closes: #1471 Release-Note-Core: Fixed a MySQL replication issue when Minimal Row Image is enabled. If a case-insensitive text column was omitted during an insert, we will try to coerce its value to the corresponding type, which will fail if we attempt to coerce before translating the value to the column default. Change-Id: Ic95d3ad3ad53abcd61b13a06847b507841ef4705 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9077 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent 7e2c8ed commit fc819f9

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

readyset-dataflow/src/node/special/base.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,9 @@ fn apply_table_op_coercions(
500500
) -> ReadySetResult<()> {
501501
let coerce_row = |row: &mut Vec<DfValue>| {
502502
for (idx, (val, col)) in row.iter_mut().zip(columns).enumerate() {
503-
val.maybe_coerce_for_table_op(col.ty())?;
503+
// Apply the default must happen before coercion
504504
val.maybe_apply_default(defaults, idx);
505+
val.maybe_coerce_for_table_op(col.ty())?;
505506
}
506507
Ok(())
507508
};

readyset-mysql/tests/integration.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2734,3 +2734,53 @@ async fn test_column_definition_upstream_readyset_replication() {
27342734
test_column_definition_verify(&mut direct_mysql, &mut conn).await;
27352735
tx.shutdown().await;
27362736
}
2737+
2738+
#[tokio::test(flavor = "multi_thread")]
2739+
#[serial(mysql)]
2740+
async fn text_citext_default_coercion_minimal_row_base_replication() {
2741+
readyset_tracing::init_test_logging();
2742+
let mut direct_mysql = mysql_async::Conn::from_url(mysql_url()).await.unwrap();
2743+
2744+
direct_mysql
2745+
.query_drop("DROP TABLE IF EXISTS text_citext_default_coercion CASCADE;")
2746+
.await
2747+
.unwrap();
2748+
direct_mysql
2749+
.query_drop("CREATE TABLE text_citext_default_coercion (id INT PRIMARY KEY, col1 TEXT);")
2750+
.await
2751+
.unwrap();
2752+
2753+
let (rs_opts, _rs_handle, tx) = TestBuilder::default()
2754+
.recreate_database(false)
2755+
.fallback(true)
2756+
.build::<MySQLAdapter>()
2757+
.await;
2758+
sleep().await;
2759+
let mut conn = mysql_async::Conn::new(rs_opts).await.unwrap();
2760+
direct_mysql
2761+
.query_drop("SET SESSION binlog_row_image = MINIMAL;")
2762+
.await
2763+
.unwrap();
2764+
direct_mysql
2765+
.query_drop("INSERT INTO text_citext_default_coercion (id) VALUES (1);")
2766+
.await
2767+
.unwrap();
2768+
sleep().await;
2769+
conn.query_drop("CREATE CACHE FROM SELECT * FROM text_citext_default_coercion WHERE id = ?")
2770+
.await
2771+
.unwrap();
2772+
let rs_row: Row = conn
2773+
.query_first("SELECT * FROM text_citext_default_coercion WHERE id = 1")
2774+
.await
2775+
.unwrap()
2776+
.unwrap();
2777+
let direct_row: Row = direct_mysql
2778+
.query_first("SELECT * FROM text_citext_default_coercion WHERE id = 1")
2779+
.await
2780+
.unwrap()
2781+
.unwrap();
2782+
assert_eq!(rs_row.len(), 2);
2783+
assert_eq!(rs_row[0], direct_row[0]);
2784+
assert_eq!(rs_row[1], direct_row[1]);
2785+
tx.shutdown().await;
2786+
}

0 commit comments

Comments
 (0)