Skip to content

Commit c55bf5b

Browse files
committed
replicators: Support CREATE TABLE ... AS SELECT on MySQL
While inspecting the MySQL statement replication handling for `CREATE TABLE ... LIKE` statements, I went looking for other kinds of `CREATE TABLE` statements that would show up in the binlog *without* the `UpdatedDbNames` status variable. An AI suggested that `CREATE TABLE ... AS SELECT` was one such. But when I tested it, it turned out MySQL actually logs it as the resulting regular `CREATE TABLE` (i.e. with columns etc., and without the `AS SELECT` clause), followed by row events for the selected rows to insert them into the new table. However, it also appends a `START TRANSACTION` option to the `CREATE TABLE` statement, which we were not parsing. The combination (no `UpdatedDbNames` and failed to parse) meant we were silently ignoring the `CREATE TABLE` statement. First, this CL fixes the parsing for both nom-sql and sqlparser (albeit with the somewhat hacky solution of producing the AST as a space-separated key = START, value = TRANSACTION pair; we are ignoring it anyway). Second, we now parse all statements in MySQL replication and attempt to handle them, regardless of whether `UpdatedDbNames` is set. However, we still check the status variable to decide whether to error out if parsing/handling the event fails. The main caveat is we don't support it on Postgres, and I don't yet know what that would require. Addresses: REA-4434, REA-3859 Release-Note-Core: Support replicating MySQL tables created with `CREATE TABLE ... AS SELECT`. Change-Id: I36eaeb8c2823d256da150d7f67b112266a6a6964 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10969 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 5f5815d commit c55bf5b

File tree

6 files changed

+173
-128
lines changed

6 files changed

+173
-128
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
statement ok
2+
CREATE TABLE foo (x INT)
3+
4+
statement ok
5+
INSERT INTO foo VALUES (1), (2), (3);
6+
7+
query I rowsort
8+
SELECT * FROM foo
9+
----
10+
1
11+
2
12+
3
13+
14+
statement ok
15+
CREATE TABLE bar AS SELECT * FROM foo;
16+
17+
query I rowsort
18+
SELECT * FROM bar
19+
----
20+
1
21+
2
22+
3

nom-sql/src/create_table_options.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ fn create_option(
6464
key: "KEY_BLOCK_SIZE".to_string(),
6565
value: v.display(dialect).to_string(),
6666
}),
67+
map(
68+
create_option_spaced_pair(tag_no_case("start"), tag_no_case("transaction")),
69+
|_| CreateTableOption::Other {
70+
key: "START TRANSACTION".to_string(),
71+
value: "".to_string(),
72+
},
73+
),
6774
))(i)
6875
}
6976
}

readyset-client/src/recipe/changelist.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use std::collections::HashMap;
3636
use std::hash::{Hash, Hasher};
3737

3838
use dataflow_expression::Dialect;
39+
use itertools::Itertools;
3940
use readyset_data::DfType;
4041
use readyset_errors::{internal, unsupported, ReadySetError, ReadySetResult};
4142
use readyset_sql::ast::{
@@ -172,12 +173,24 @@ impl ChangeList {
172173
dialect: Dialect,
173174
parsing_config: ParsingConfig,
174175
) -> ReadySetResult<Self> {
175-
let mut changes = Vec::new();
176-
for query_str in queries {
177-
let parsed =
178-
parse_query_with_config(parsing_config, Dialect::DEFAULT_MYSQL.into(), &query_str)?;
176+
Self::from_queries(
177+
queries
178+
.into_iter()
179+
.map(|query_str| {
180+
parse_query_with_config(parsing_config, dialect.into(), &query_str)
181+
})
182+
.try_collect::<_, Vec<_>, _>()?,
183+
dialect,
184+
)
185+
}
179186

180-
match parsed {
187+
pub fn from_queries(
188+
queries: impl IntoIterator<Item = SqlQuery>,
189+
dialect: Dialect,
190+
) -> ReadySetResult<Self> {
191+
let mut changes = Vec::new();
192+
for query in queries {
193+
match query {
181194
SqlQuery::CreateTable(statement) => changes.push(Change::CreateTable {
182195
statement,
183196
pg_meta: None,
@@ -236,7 +249,7 @@ impl ChangeList {
236249
}
237250
_ => unsupported!(
238251
"Only DDL statements supported in ChangeList (got {})",
239-
parsed.query_type()
252+
query.query_type()
240253
),
241254
}
242255
}

readyset-sql-parsing/tests/parity.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,3 +1202,8 @@ fn create_table_like_parenthesized() {
12021202
check_parse_mysql!("CREATE TABLE a (LIKE b)");
12031203
check_parse_mysql!("CREATE TABLE IF NOT EXISTS a (LIKE b)");
12041204
}
1205+
1206+
#[test]
1207+
fn create_table_start_transaction() {
1208+
check_parse_mysql!("CREATE TABLE a (x int) START TRANSACTION");
1209+
}

readyset-sql/src/ast/create.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,6 @@ impl TryFromDialect<sqlparser::ast::CreateTable> for CreateTableStatement {
394394
value: value.to_string(),
395395
}),
396396
},
397-
398397
SqlOption::NamedParenthesizedList(NamedParenthesizedList {
399398
key, name, ..
400399
}) => {
@@ -406,9 +405,19 @@ impl TryFromDialect<sqlparser::ast::CreateTable> for CreateTableStatement {
406405
SqlOption::Comment(
407406
CommentDef::WithEq(comment) | CommentDef::WithoutEq(comment),
408407
) => options.push(CreateTableOption::Comment(comment)),
409-
e => {
410-
return failed!("Unsupported table option {e}");
408+
SqlOption::Ident(ident) => options.push(CreateTableOption::Other {
409+
key: ident.to_string(),
410+
value: "".to_string(),
411+
}),
412+
SqlOption::Clustered(table_options_clustered) => {
413+
todo!("{table_options_clustered}")
411414
}
415+
SqlOption::Partition {
416+
column_name,
417+
range_direction,
418+
for_values,
419+
} => todo!("{column_name:?}, {range_direction:?}, {for_values:?}"),
420+
SqlOption::TableSpace(tablespace_option) => todo!("{tablespace_option:?}"),
412421
}
413422
}
414423
}

replicators/src/mysql_connector/connector.rs

Lines changed: 108 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use readyset_decimal::Decimal;
2727
use readyset_sql_parsing::{parse_query_with_config, ParsingConfig, ParsingPreset};
2828
use serde_json::Map;
2929
use tokio::time::timeout;
30-
use tracing::{error, info, warn};
30+
use tracing::{debug, error, info, warn};
3131

3232
use database_utils::UpstreamConfig;
3333
use readyset_client::metrics::recorded;
@@ -840,7 +840,7 @@ impl MySqlBinlogConnector {
840840
info!(target: "replicator_statement", "{:?}", q_event);
841841
}
842842

843-
let schema = match q_event
843+
let (expect_ddl, schema) = match q_event
844844
.status_vars()
845845
.get_status_var(binlog::consts::StatusVarKey::UpdatedDbNames)
846846
.as_ref()
@@ -851,121 +851,19 @@ impl MySqlBinlogConnector {
851851
// for example `DROP TABLE db1.tbl, db2.table;` Will have `db1` and
852852
// `db2` listed, however we only need the schema to filter out
853853
// `CREATE TABLE` and `ALTER TABLE` and those always change only one DB.
854-
names.first().unwrap().as_str().to_string()
854+
(true, names.first().unwrap().as_str().to_string())
855855
}
856-
// Even if the query does not affect the schema, it may still require a table action.
857-
_ => return self.try_non_ddl_action_from_query(q_event, is_last),
856+
// If MySQL couldn't determine affected schemas at binlogging time, default to the
857+
// effective `USE`d schema. The event still might require handling, but we shouldn't error if it's not DDL.
858+
_ => (false, q_event.schema().to_string()),
858859
};
859860

860-
let changes = match ChangeList::from_strings_with_config(
861-
vec![q_event.query()],
862-
Dialect::DEFAULT_MYSQL,
861+
match parse_query_with_config(
863862
self.parsing_config,
863+
Dialect::DEFAULT_MYSQL.into(),
864+
q_event.query(),
864865
) {
865-
Ok(mut changelist) => {
866-
// During replication of DDL, we don't necessarily have the default charset/collation as part of the
867-
// query event.
868-
let charset = q_event
869-
.status_vars()
870-
.get_status_var(binlog::consts::StatusVarKey::Charset);
871-
872-
if let Some(charset) = charset {
873-
let default_server_charset = match charset.get_value().unwrap() {
874-
StatusVarVal::Charset {
875-
charset_client: _,
876-
collation_connection: _,
877-
collation_server,
878-
} => collation_server,
879-
_ => unreachable!(),
880-
};
881-
882-
for change in changelist.changes_mut() {
883-
match change {
884-
Change::CreateTable { statement, .. } => {
885-
let default_table_collation = statement.get_collation();
886-
if default_table_collation.is_none() {
887-
let collation = Collation::resolve(CollationId::from(
888-
default_server_charset,
889-
));
890-
let options = match statement.options.as_mut() {
891-
Ok(opts) => opts,
892-
Err(_) => &mut Vec::new(),
893-
};
894-
options.push(CreateTableOption::Collate(CollationName {
895-
name: SqlIdentifier::from(collation.collation()),
896-
quote_style: None,
897-
}));
898-
}
899-
statement.propagate_default_charset(readyset_sql::Dialect::MySQL);
900-
statement.rewrite_binary_collation_columns();
901-
if let Ok(body) = &statement.body {
902-
self.table_schemas
903-
.insert(statement.table.clone(), body.clone());
904-
}
905-
}
906-
Change::Drop { name, .. } => {
907-
self.table_schemas.remove(name);
908-
}
909-
Change::AlterTable(AlterTableStatement { table, .. }) => {
910-
self.table_schemas.remove(table);
911-
}
912-
_ => {}
913-
}
914-
}
915-
}
916-
changelist.changes
917-
}
918-
Err(error) => {
919-
match readyset_sql_parsing::parse_query_with_config(
920-
self.parsing_config,
921-
readyset_sql::Dialect::MySQL,
922-
q_event.query(),
923-
) {
924-
Ok(SqlQuery::Insert(insert)) => {
925-
self.drop_and_add_non_replicated_table(insert.table, &schema)
926-
.await
927-
}
928-
Ok(SqlQuery::Update(update)) => {
929-
self.drop_and_add_non_replicated_table(update.table, &schema)
930-
.await
931-
}
932-
Ok(SqlQuery::Delete(delete)) => {
933-
self.drop_and_add_non_replicated_table(delete.table, &schema)
934-
.await
935-
}
936-
Ok(SqlQuery::StartTransaction(_)) => {
937-
return Err(ReadySetError::SkipEvent);
938-
}
939-
_ => {
940-
warn!(%error, "Error extending recipe, DDL statement will not be used");
941-
counter!(recorded::REPLICATOR_FAILURE).increment(1u64);
942-
return Err(ReadySetError::SkipEvent);
943-
}
944-
}
945-
}
946-
};
947-
948-
Ok(ReplicationAction::DdlChange { schema, changes })
949-
}
950-
951-
/// Attempt to produce a non-DDL [`ReplicationAction`] from the give query.
952-
///
953-
/// `COMMIT` queries are issued for writes on non-transactional storage engines such as MyISAM.
954-
/// We report the position after the `COMMIT` query if necessary.
955-
///
956-
/// `TRUNCATE` statements and `CREATE TABLE new_table LIKE existing_table` statements are also
957-
/// parsed and handled here, because we fall into this path when the binlog event doesn't have
958-
/// the [`binlog::consts::StatusVarKey::UpdatedDbNames`] status variable.
959-
///
960-
/// TODO: Transactions begin with `BEGIN` queries, but we do not currently support those.
961-
fn try_non_ddl_action_from_query(
962-
&mut self,
963-
q_event: binlog::events::QueryEvent<'_>,
964-
is_last: bool,
965-
) -> ReadySetResult<ReplicationAction> {
966-
use readyset_sql::Dialect;
967-
968-
match parse_query_with_config(self.parsing_config, Dialect::MySQL, q_event.query()) {
866+
Ok(SqlQuery::StartTransaction(_)) => Err(ReadySetError::SkipEvent),
969867
Ok(SqlQuery::Commit(_)) if self.report_position_elapsed() || is_last => {
970868
Ok(ReplicationAction::LogPosition)
971869
}
@@ -980,17 +878,12 @@ impl MySqlBinlogConnector {
980878
actions: vec![TableOperation::Truncate],
981879
})
982880
}
983-
Ok(SqlQuery::CreateTable(mut create)) => {
881+
Ok(SqlQuery::CreateTable(mut create)) if create.like.is_some() => {
984882
if let Some(like) = &mut create.like {
985883
let relation = like.as_relation_mut();
986884
if relation.schema.is_none() {
987885
relation.schema = Some(q_event.schema().into())
988886
}
989-
} else {
990-
return Err(ReadySetError::ReplicationFailed(format!(
991-
"Unexpected CREATE TABLE without LIKE should have been handled earlier: {}",
992-
create.table.display_unquoted()
993-
)));
994887
}
995888
if create.table.schema.is_none() {
996889
create.table.schema = Some(q_event.schema().into())
@@ -1000,7 +893,103 @@ impl MySqlBinlogConnector {
1000893
changes: create.into_changes(),
1001894
})
1002895
}
1003-
_ => Err(ReadySetError::SkipEvent),
896+
Ok(SqlQuery::Insert(insert)) => {
897+
let changes = self
898+
.drop_and_add_non_replicated_table(insert.table, &schema)
899+
.await;
900+
Ok(ReplicationAction::DdlChange { schema, changes })
901+
}
902+
Ok(SqlQuery::Update(update)) => {
903+
let changes = self
904+
.drop_and_add_non_replicated_table(update.table, &schema)
905+
.await;
906+
Ok(ReplicationAction::DdlChange { schema, changes })
907+
}
908+
Ok(SqlQuery::Delete(delete)) => {
909+
let changes = self
910+
.drop_and_add_non_replicated_table(delete.table, &schema)
911+
.await;
912+
Ok(ReplicationAction::DdlChange { schema, changes })
913+
}
914+
Ok(query) => {
915+
match ChangeList::from_queries(vec![query], Dialect::DEFAULT_MYSQL) {
916+
Ok(mut changelist) => {
917+
// During replication of DDL, we don't necessarily have the default charset/collation as part of the
918+
// query event.
919+
let charset = q_event
920+
.status_vars()
921+
.get_status_var(binlog::consts::StatusVarKey::Charset);
922+
923+
if let Some(charset) = charset {
924+
let default_server_charset = match charset.get_value().unwrap() {
925+
StatusVarVal::Charset {
926+
charset_client: _,
927+
collation_connection: _,
928+
collation_server,
929+
} => collation_server,
930+
_ => unreachable!(),
931+
};
932+
933+
for change in changelist.changes_mut() {
934+
match change {
935+
Change::CreateTable { statement, .. } => {
936+
let default_table_collation = statement.get_collation();
937+
if default_table_collation.is_none() {
938+
let collation = Collation::resolve(CollationId::from(
939+
default_server_charset,
940+
));
941+
let options = match statement.options.as_mut() {
942+
Ok(opts) => opts,
943+
Err(_) => &mut Vec::new(),
944+
};
945+
options.push(CreateTableOption::Collate(
946+
CollationName {
947+
name: SqlIdentifier::from(
948+
collation.collation(),
949+
),
950+
quote_style: None,
951+
},
952+
));
953+
}
954+
statement.propagate_default_charset(
955+
readyset_sql::Dialect::MySQL,
956+
);
957+
statement.rewrite_binary_collation_columns();
958+
if let Ok(body) = &statement.body {
959+
self.table_schemas
960+
.insert(statement.table.clone(), body.clone());
961+
}
962+
}
963+
Change::Drop { name, .. } => {
964+
self.table_schemas.remove(name);
965+
}
966+
Change::AlterTable(AlterTableStatement { table, .. }) => {
967+
self.table_schemas.remove(table);
968+
}
969+
_ => {}
970+
}
971+
}
972+
}
973+
Ok(ReplicationAction::DdlChange {
974+
schema,
975+
changes: changelist.changes,
976+
})
977+
}
978+
Err(error) if expect_ddl => {
979+
warn!(%error, "Error extending recipe, DDL statement will not be used");
980+
counter!(recorded::REPLICATOR_FAILURE).increment(1u64);
981+
Err(ReadySetError::SkipEvent)
982+
}
983+
Err(error) => {
984+
debug!(%error, "Skipping ChangeList error when not expecting DDL");
985+
Err(ReadySetError::SkipEvent)
986+
}
987+
}
988+
}
989+
Err(err) => {
990+
debug!(%err, raw_query = %q_event.query(), "Unable to parse statement event");
991+
Err(ReadySetError::SkipEvent)
992+
}
1004993
}
1005994
}
1006995

0 commit comments

Comments
 (0)