@@ -12,6 +12,7 @@ use mysql::binlog::events::{OptionalMetaExtractor, StatusVarVal};
1212use mysql:: binlog:: jsonb:: { self , JsonbToJsonError } ;
1313use mysql:: prelude:: Queryable ;
1414use mysql_async as mysql;
15+ use mysql_async:: binlog:: events:: TableMapEvent ;
1516use mysql_common:: binlog:: jsonb:: {
1617 Array , ComplexValue , Large , Object , OpaqueValue , Small , StorageFormat ,
1718} ;
@@ -48,6 +49,7 @@ use super::utils::get_mysql_version;
4849const DEFAULT_SERVER_ID : u32 = u32:: MAX - 55 ;
4950const MAX_POSITION_TIME : u64 = 10 ;
5051
/// Per-column metadata derived from a table map event, indexed by column position.
///
/// * `.0` - collation id of the column; populated (`Some`) for character and enum/set columns.
/// * `.1` - signedness flag of the column; populated (`Some`) for numeric columns. Consumers
///   in this file read `true` as "unsigned".
52+ type TableMetadata = ( Vec < Option < u16 > > , Vec < Option < bool > > ) ;
5153/// A connector that connects to a MySQL server and starts reading binlogs from a given position.
5254///
5355/// The server must be configured with `binlog_format` set to `row` and `binlog_row_image` set to
@@ -248,9 +250,12 @@ impl MySqlBinlogConnector {
248250 & self ,
249251 row_data : & BinlogRow ,
250252 tme : & binlog:: events:: TableMapEvent < ' static > ,
253+ collation_vec : & [ Option < u16 > ] ,
254+ signedness_vec : & [ Option < bool > ] ,
251255 ) -> mysql:: Result < Vec < DfValue > > {
252256 let mut row = vec ! [ DfValue :: Default ; tme. columns_count( ) as usize ] ;
253- let mut binlog_iter = binlog_row_to_noria_row ( row_data, tme) ?. into_iter ( ) ;
257+ let mut binlog_iter =
258+ binlog_row_to_noria_row ( row_data, tme, collation_vec, signedness_vec) ?. into_iter ( ) ;
254259
255260 for col in row_data. columns_ref ( ) {
256261 if let Some ( idx) = parse_column_index ( & col. name_str ( ) ) {
@@ -322,12 +327,9 @@ impl MySqlBinlogConnector {
322327 if self . enable_statement_logging {
323328 info ! ( target: "replicator_statement" , "{:?}" , wr_event) ;
324329 }
325- // Retrieve the corresponding TABLE_MAP_EVENT
326- let tme = self . reader . get_tme ( wr_event. table_id ( ) ) . ok_or_else ( || {
327- mysql_async:: Error :: Other ( Box :: new ( internal_err ! (
328- "TME not found for WRITE_ROWS_EVENT"
329- ) ) )
330- } ) ?;
330+ // Retrieve the corresponding TABLE_MAP_EVENT, charset and signedness metadata
331+ let ( tme, ( collation_vec, signedness_vec) ) =
332+ self . get_table_metadata ( wr_event. table_id ( ) ) ?;
331333
332334 if !self
333335 . table_filter
@@ -337,7 +339,6 @@ impl MySqlBinlogConnector {
337339 }
338340
339341 let mut inserted_rows = Vec :: new ( ) ;
340-
341342 for row in wr_event. rows ( tme) {
342343 let row = & row?;
343344 let row_data = row. 1 . as_ref ( ) . ok_or_else ( || {
@@ -346,7 +347,7 @@ impl MySqlBinlogConnector {
346347 ) ) )
347348 } ) ?;
348349 inserted_rows. push ( readyset_client:: TableOperation :: Insert (
349- self . map_partial_binlog_row_insert ( row_data, tme) ?,
350+ self . map_partial_binlog_row_insert ( row_data, tme, & collation_vec , & signedness_vec ) ?,
350351 ) ) ;
351352 }
352353
@@ -378,9 +379,11 @@ impl MySqlBinlogConnector {
378379 & self ,
379380 tme : & binlog:: events:: TableMapEvent < ' static > ,
380381 row : & ( Option < BinlogRow > , Option < BinlogRow > ) ,
382+ collation_vec : & [ Option < u16 > ] ,
383+ signedness_vec : & [ Option < bool > ] ,
381384 ) -> mysql:: Result < Vec < readyset_client:: TableOperation > > {
382- let ( before_image, after_image) = self . get_before_after_images ( tme , row ) ? ;
383-
385+ let ( before_image, after_image) =
386+ self . get_before_after_images ( tme , row , collation_vec , signedness_vec ) ? ;
384387 if before_image. len ( ) == tme. columns_count ( ) as usize {
385388 let after_image = if after_image. len ( ) == tme. columns_count ( ) as usize {
386389 after_image
@@ -430,6 +433,8 @@ impl MySqlBinlogConnector {
430433 & self ,
431434 tme : & binlog:: events:: TableMapEvent < ' static > ,
432435 row : & ( Option < BinlogRow > , Option < BinlogRow > ) ,
436+ collation_vec : & [ Option < u16 > ] ,
437+ signedness_vec : & [ Option < bool > ] ,
433438 ) -> mysql:: Result < ( Vec < DfValue > , Vec < DfValue > ) > {
434439 let before_image = binlog_row_to_noria_row (
435440 row. 0 . as_ref ( ) . ok_or_else ( || {
@@ -439,6 +444,8 @@ impl MySqlBinlogConnector {
439444 ) ) )
440445 } ) ?,
441446 tme,
447+ collation_vec,
448+ signedness_vec,
442449 ) ?;
443450
444451 let after_image = binlog_row_to_noria_row (
@@ -449,6 +456,8 @@ impl MySqlBinlogConnector {
449456 ) ) )
450457 } ) ?,
451458 tme,
459+ collation_vec,
460+ signedness_vec,
452461 ) ?;
453462
454463 Ok ( ( before_image, after_image) )
@@ -518,13 +527,9 @@ impl MySqlBinlogConnector {
518527 info ! ( target: "replicator_statement" , "{:?}" , ur_event) ;
519528 }
520529
521- // Retrieve the corresponding TABLE_MAP_EVENT
522- let tme = self . reader . get_tme ( ur_event. table_id ( ) ) . ok_or_else ( || {
523- mysql_async:: Error :: Other ( Box :: new ( internal_err ! (
524- "TME not found for UPDATE_ROWS_EVENT {:?}" ,
525- ur_event
526- ) ) )
527- } ) ?;
530+ // Retrieve the corresponding TABLE_MAP_EVENT, charset and signedness metadata
531+ let ( tme, ( collation_vec, signedness_vec) ) =
532+ self . get_table_metadata ( ur_event. table_id ( ) ) ?;
528533
529534 if !self
530535 . table_filter
@@ -534,10 +539,9 @@ impl MySqlBinlogConnector {
534539 }
535540
536541 let mut updated_rows = Vec :: with_capacity ( ur_event. rows ( tme) . count ( ) ) ;
537-
538542 // Process each row update
539543 for row in ur_event. rows ( tme) {
540- let row_ops = self . process_update_row ( tme, & row?) ?;
544+ let row_ops = self . process_update_row ( tme, & row?, & collation_vec , & signedness_vec ) ?;
541545 updated_rows. extend ( row_ops) ;
542546 }
543547
@@ -564,13 +568,9 @@ impl MySqlBinlogConnector {
564568 if self . enable_statement_logging {
565569 info ! ( target: "replicator_statement" , "{:?}" , dr_event) ;
566570 }
567- // Retrieve the corresponding TABLE_MAP_EVENT
568- let tme = self . reader . get_tme ( dr_event. table_id ( ) ) . ok_or_else ( || {
569- mysql_async:: Error :: Other ( Box :: new ( internal_err ! (
570- "TME not found for UPDATE_ROWS_EVENT {:?}" ,
571- dr_event
572- ) ) )
573- } ) ?;
571+ // Retrieve the corresponding TABLE_MAP_EVENT, charset and signedness metadata
572+ let ( tme, ( collation_vec, signedness_vec) ) =
573+ self . get_table_metadata ( dr_event. table_id ( ) ) ?;
574574
575575 if !self
576576 . table_filter
@@ -580,7 +580,6 @@ impl MySqlBinlogConnector {
580580 }
581581
582582 let mut deleted_rows = Vec :: new ( ) ;
583-
584583 for row in dr_event. rows ( tme) {
585584 // For each row in the event we produce a vector of ReadySet types that
586585 // represent that row
@@ -591,6 +590,8 @@ impl MySqlBinlogConnector {
591590 ) ) )
592591 } ) ?,
593592 tme,
593+ & collation_vec,
594+ & signedness_vec,
594595 ) ?;
595596
596597 // Partial Row, Before row image is the PKE
@@ -1166,6 +1167,68 @@ impl MySqlBinlogConnector {
11661167 }
11671168 }
11681169 }
1170+
1171+ /// Get the table map event, charset and signedness metadata for a table map event.
1172+ ///
1173+ /// # Arguments
1174+ /// * `table_id` - The table id
1175+ ///
1176+ /// # Returns
1177+ /// * `(&TableMapEvent<'static>, TableMetadata)` - The table map event, charset and signedness metadata
1178+ fn get_table_metadata (
1179+ & self ,
1180+ table_id : u64 ,
1181+ ) -> mysql:: Result < ( & TableMapEvent < ' static > , TableMetadata ) > {
1182+ let tme = self . reader . get_tme ( table_id) . ok_or_else ( || {
1183+ mysql_async:: Error :: Other ( Box :: new ( internal_err ! (
1184+ "Table map event not found for table id {}" ,
1185+ table_id
1186+ ) ) )
1187+ } ) ?;
1188+ /*
1189+ * TME iterators are only populated for the types that make sense.
1190+ * For example, for a table with col1 CHAR(1) collate utf8mb4_0900_ai_ci, col2 int, col3 unsigned int, col4 char(1) collate binary
1191+ * the iterators will be:
1192+ * charset_iter: [255, 63]
1193+ * enum_and_set_charset_iter: []
1194+ * signedness_iter: [true, false]
1195+ *
1196+ * For MRBR - where we receive the row event with only the list of columns that are set
1197+ * we need to advance the iterators even for columns that are not present in the event. In the event above, if we do an UPDATE table SET col4 = 'A'
1198+ * the row event will come as {@3: Value(Bytes("A"))}, making it difficult to retrive the charset 63 for col4.
1199+ */
1200+ let opt_meta_extractor = OptionalMetaExtractor :: new ( tme. iter_optional_meta ( ) ) . unwrap ( ) ;
1201+
1202+ let mut charset_iter = opt_meta_extractor. iter_charset ( ) ;
1203+ let mut enum_and_set_charset_iter = opt_meta_extractor. iter_enum_and_set_charset ( ) ;
1204+ let mut signedness_iter = opt_meta_extractor. iter_signedness ( ) ;
1205+ let mut collation_vec = vec ! [ None ; tme. columns_count( ) as usize ] ;
1206+ let mut signedness_vec = vec ! [ None ; tme. columns_count( ) as usize ] ;
1207+ for id in 0 ..tme. columns_count ( ) as usize {
1208+ let kind = tme
1209+ . get_column_type ( id)
1210+ . map_err ( |e| {
1211+ mysql_async:: Error :: Other ( Box :: new ( internal_err ! (
1212+ "Unable to get column type {}" ,
1213+ e
1214+ ) ) )
1215+ } ) ?
1216+ . unwrap ( ) ;
1217+ if kind. is_character_type ( ) {
1218+ collation_vec[ id] = Some ( charset_iter. next ( ) . transpose ( ) ?. unwrap_or_default ( ) ) ;
1219+ } else if kind. is_enum_or_set_type ( ) {
1220+ collation_vec[ id] = Some (
1221+ enum_and_set_charset_iter
1222+ . next ( )
1223+ . transpose ( ) ?
1224+ . unwrap_or_default ( ) ,
1225+ ) ;
1226+ } else if kind. is_numeric_type ( ) {
1227+ signedness_vec[ id] = Some ( signedness_iter. next ( ) . unwrap_or ( false ) ) ;
1228+ }
1229+ }
1230+ Ok ( ( tme, ( collation_vec, signedness_vec) ) )
1231+ }
11691232}
11701233
11711234fn binlog_val_to_noria_val (
@@ -1584,11 +1647,9 @@ fn parse_column_index(col_name: &str) -> Option<usize> {
15841647fn binlog_row_to_noria_row (
15851648 binlog_row : & BinlogRow ,
15861649 tme : & binlog:: events:: TableMapEvent < ' static > ,
1650+ collation_vec : & [ Option < u16 > ] ,
1651+ signedness_vec : & [ Option < bool > ] ,
15871652) -> mysql:: Result < Vec < DfValue > > {
1588- let opt_meta_extractor = OptionalMetaExtractor :: new ( tme. iter_optional_meta ( ) ) . unwrap ( ) ;
1589- let mut charset_iter = opt_meta_extractor. iter_charset ( ) ;
1590- let mut enum_and_set_charset_iter = opt_meta_extractor. iter_enum_and_set_charset ( ) ;
1591- let mut signedness_iter = opt_meta_extractor. iter_signedness ( ) ;
15921653 binlog_row
15931654 . columns ( )
15941655 . iter ( )
@@ -1608,21 +1669,9 @@ fn binlog_row_to_noria_row(
16081669 . unwrap ( ) ,
16091670 tme. get_column_metadata ( tme_idx) . unwrap ( ) ,
16101671 ) ;
1611- let collation = if kind. is_character_type ( ) {
1612- charset_iter. next ( ) . transpose ( ) ?. unwrap_or_default ( )
1613- } else if kind. is_enum_or_set_type ( ) {
1614- enum_and_set_charset_iter
1615- . next ( )
1616- . transpose ( ) ?
1617- . unwrap_or_default ( )
1618- } else {
1619- Default :: default ( )
1620- } ;
1621- let unsigned = if kind. is_numeric_type ( ) {
1622- signedness_iter. next ( ) . unwrap_or ( false )
1623- } else {
1624- false
1625- } ;
1672+ debug_assert ! ( !kind. is_character_type( ) || collation_vec[ tme_idx] . is_some( ) ) ;
1673+ let collation = collation_vec[ tme_idx] . unwrap_or ( 0 ) ;
1674+ let unsigned = signedness_vec[ tme_idx] . unwrap_or ( false ) ;
16261675 // `BLOB` columns have `BINARY_FLAG` set in snapshot, but not here. However, the
16271676 // collation should be correctly set to binary here, though it's not over there.
16281677 let binary = col. flags ( ) . contains ( ColumnFlags :: BINARY_FLAG )
0 commit comments