Skip to content

Commit d3be028

Browse files
committed
readyset: Add --parsing-preset CLI option
We plumb it through to everywhere that was previously using a hardcoded `Default::default()`. It can also be set with the `PARSING_PRESET` environment variable. Addresses: REA-5047 Release-Note-Core: Add a `--parsing-preset` CLI option and `PARSING_PRESET` environment variable which can be used to configure how we handle mismatches between nom-sql and sqlparser-rs parsing. It defaults to `prefer-nom` which parses with both nom-sql and sqlparser-rs, logs a warning if they don't agree, and uses the AST produced by nom-sql. To disable parsing with sqlparser-rs, use `only-nom`. To test experimental query support for syntax not supported by nom, use `prefer-sqlparser` or `only-sqlparser`. Change-Id: I6a6a6964f5168268cc6983c8e8df79c64e7ed36a Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9683 Reviewed-by: Mohamed Yasser Abdelhamed Abdeen <mohamed@readyset.io> Tested-by: Buildkite CI
1 parent 20596be commit d3be028

File tree

18 files changed

+181
-45
lines changed

18 files changed

+181
-45
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

readyset-adapter/src/backend.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ pub struct BackendBuilder {
280280
client_addr: SocketAddr,
281281
slowlog: bool,
282282
dialect: Dialect,
283+
parsing_preset: ParsingPreset,
283284
users: HashMap<String, String>,
284285
require_authentication: bool,
285286
query_log_sender: Option<UnboundedSender<QueryExecutionEvent>>,
@@ -301,6 +302,7 @@ impl Default for BackendBuilder {
301302
client_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
302303
slowlog: false,
303304
dialect: Dialect::MySQL,
305+
parsing_preset: ParsingPreset::for_prod(),
304306
users: Default::default(),
305307
require_authentication: true,
306308
query_log_sender: None,
@@ -363,6 +365,7 @@ impl BackendBuilder {
363365
settings: BackendSettings {
364366
slowlog: self.slowlog,
365367
dialect: self.dialect,
368+
parsing_preset: self.parsing_preset,
366369
require_authentication: self.require_authentication,
367370
unsupported_set_mode: self.unsupported_set_mode,
368371
migration_mode: self.migration_mode,
@@ -396,6 +399,11 @@ impl BackendBuilder {
396399
self
397400
}
398401

402+
pub fn parsing_preset(mut self, parsing_preset: ParsingPreset) -> Self {
403+
self.parsing_preset = parsing_preset;
404+
self
405+
}
406+
399407
pub fn query_log_sender(
400408
mut self,
401409
query_log_sender: Option<UnboundedSender<QueryExecutionEvent>>,
@@ -609,6 +617,8 @@ where
609617
struct BackendSettings {
610618
/// SQL dialect to use when parsing queries from clients
611619
dialect: Dialect,
620+
/// Parsing mode that determines which parser(s) to use and how to handle conflicts
621+
parsing_preset: ParsingPreset,
612622
slowlog: bool,
613623
require_authentication: bool,
614624
/// How to behave when receiving unsupported `SET` statements
@@ -3191,7 +3201,7 @@ where
31913201
fn parse_query(&mut self, query: &str) -> ReadySetResult<SqlQuery> {
31923202
trace!(%query, "Parsing query");
31933203
match readyset_sql_parsing::parse_query_with_config(
3194-
ParsingPreset::for_prod(),
3204+
self.settings.parsing_preset,
31953205
self.settings.dialect,
31963206
query,
31973207
) {

readyset-client/src/recipe/changelist.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use readyset_sql::ast::{
4343
CreateTableStatement, CreateViewStatement, DropTableStatement, DropViewStatement,
4444
NonReplicatedRelation, Relation, SelectStatement, SqlIdentifier, SqlQuery, TableKey,
4545
};
46-
use readyset_sql_parsing::{parse_query_with_config, ParsingPreset};
46+
use readyset_sql_parsing::{parse_query_with_config, ParsingConfig, ParsingPreset};
4747
use readyset_sql_passes::adapter_rewrites::{self, AdapterRewriteParams};
4848
use serde::{Deserialize, Serialize};
4949
use test_strategy::Arbitrary;
@@ -138,23 +138,27 @@ impl ChangeList {
138138
}
139139
}
140140

141-
/// Parse a `ChangeList` from the given vector of SQL strings, using the given [`Dialect`]
142-
/// for expression evaluation semantics.
141+
/// Parse a `ChangeList` from the given vector of SQL strings, using the given [`Dialect`] for
142+
/// expression evaluation semantics (but it will be parsed with the canonical MySQL dialect).
143143
///
144144
/// This method processes each string in the vector individually and constructs a list of
145145
/// changes. If any query fails to parse, the method returns an error.
146146
pub fn from_strings(queries: Vec<impl AsRef<str>>, dialect: Dialect) -> ReadySetResult<Self> {
147-
let mut changes = Vec::new();
147+
Self::from_strings_with_config(queries, dialect, ParsingPreset::for_prod().into_config())
148+
}
148149

150+
pub fn from_strings_with_config(
151+
queries: Vec<impl AsRef<str>>,
152+
dialect: Dialect,
153+
parsing_config: ParsingConfig,
154+
) -> ReadySetResult<Self> {
155+
let mut changes = Vec::new();
149156
for query_str in queries {
150-
let parsed = parse_query_with_config(
151-
ParsingPreset::BothPreferNom,
152-
Dialect::DEFAULT_MYSQL.into(),
153-
&query_str,
154-
)
155-
.map_err(|_| ReadySetError::UnparseableQuery {
156-
query: query_str.as_ref().to_string(),
157-
})?;
157+
let parsed =
158+
parse_query_with_config(parsing_config, Dialect::DEFAULT_MYSQL.into(), &query_str)
159+
.map_err(|_| ReadySetError::UnparseableQuery {
160+
query: query_str.as_ref().to_string(),
161+
})?;
158162

159163
match parsed {
160164
SqlQuery::CreateTable(statement) => changes.push(Change::CreateTable {
@@ -449,6 +453,7 @@ impl Change {
449453
pub fn from_cache_ddl_request(
450454
ddl_req: &CacheDDLRequest,
451455
adapter_rewrite_params: AdapterRewriteParams,
456+
parsing_preset: ParsingPreset,
452457
) -> ReadySetResult<Self> {
453458
macro_rules! mk_error {
454459
($str:expr) => {
@@ -458,7 +463,7 @@ impl Change {
458463
};
459464
}
460465
match parse_query_with_config(
461-
ParsingPreset::for_prod(),
466+
parsing_preset.into_config().log_on_mismatch(true),
462467
ddl_req.dialect.into(),
463468
&ddl_req.unparsed_stmt,
464469
) {

readyset-server/src/builder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use readyset_client::consensus::{
1010
Authority, LocalAuthority, LocalAuthorityStore, NodeTypeSchedulingRestriction,
1111
WorkerSchedulingConfig,
1212
};
13+
use readyset_sql_parsing::ParsingPreset;
1314
use readyset_telemetry_reporter::TelemetrySender;
1415
use readyset_util::shutdown::{self, ShutdownSender};
1516

@@ -27,6 +28,9 @@ pub struct Builder {
2728
external_addr: SocketAddr,
2829
leader_eligible: bool,
2930
domain_scheduling_config: WorkerSchedulingConfig,
31+
/// The parsing preset to use for SQL parsing. Public so we can use it when setting up replication
32+
/// separately in tests.
33+
pub parsing_preset: ParsingPreset,
3034
/// The telemetry sender
3135
pub telemetry: TelemetrySender,
3236
wait_for_failpoint: bool,
@@ -44,6 +48,7 @@ impl Default for Builder {
4448
memory_check_frequency: None,
4549
leader_eligible: true,
4650
domain_scheduling_config: Default::default(),
51+
parsing_preset: ParsingPreset::for_prod(),
4752
telemetry: TelemetrySender::new_no_op(),
4853
wait_for_failpoint: false,
4954
unquery: true,
@@ -131,6 +136,7 @@ impl Builder {
131136
builder.set_abort_on_task_failure(false);
132137
builder.set_full_materialization(true);
133138
builder.set_post_lookup(true);
139+
builder.set_parsing_preset(ParsingPreset::BothPanicOnMismatch);
134140
builder
135141
}
136142

@@ -374,6 +380,10 @@ impl Builder {
374380
self.config.replicator_statement_logging = value;
375381
}
376382

383+
pub fn set_parsing_preset(&mut self, value: ParsingPreset) {
384+
self.parsing_preset = value;
385+
}
386+
377387
/// Start a server instance and return a handle to it. This method also returns a
378388
/// [`ShutdownSender`] that should be used to shut down the server when it is no longer needed.
379389
pub fn start(
@@ -387,6 +397,7 @@ impl Builder {
387397
memory_limit,
388398
memory_check_frequency,
389399
domain_scheduling_config,
400+
parsing_preset,
390401
leader_eligible,
391402
telemetry,
392403
wait_for_failpoint,
@@ -403,6 +414,7 @@ impl Builder {
403414
memory_limit,
404415
memory_check_frequency,
405416
domain_scheduling_config,
417+
parsing_preset,
406418
leader_eligible,
407419
telemetry,
408420
wait_for_failpoint,
@@ -426,6 +438,7 @@ impl Builder {
426438
memory_limit,
427439
memory_check_frequency,
428440
domain_scheduling_config,
441+
parsing_preset,
429442
leader_eligible,
430443
telemetry,
431444
wait_for_failpoint,
@@ -443,6 +456,7 @@ impl Builder {
443456
memory_limit,
444457
memory_check_frequency,
445458
domain_scheduling_config,
459+
parsing_preset,
446460
leader_eligible,
447461
readers,
448462
reader_addr,

readyset-server/src/controller/inner.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use readyset_client::{GraphvizOptions, ViewCreateRequest, WorkerDescriptor};
3030
use readyset_errors::{internal_err, ReadySetError, ReadySetResult};
3131
use readyset_sql::ast::Relation;
3232
use readyset_sql::Dialect;
33+
use readyset_sql_parsing::ParsingPreset;
3334
use readyset_telemetry_reporter::TelemetrySender;
3435
#[cfg(feature = "failure_injection")]
3536
use readyset_util::failpoints;
@@ -94,6 +95,9 @@ pub struct Leader {
9495
/// A client to the current authority.
9596
pub(super) authority: Arc<Authority>,
9697

98+
/// Parsing mode for the controller (used in extend_recipe and the replicator)
99+
parsing_preset: ParsingPreset,
100+
97101
/// A map of currently running migrations.
98102
///
99103
/// Requests to `/extend_recipe` run for at least [`EXTEND_RECIPE_MAX_SYNC_TIME`], after which
@@ -206,6 +210,7 @@ impl Leader {
206210
let replicator_restart_timeout = self.replicator_config.replicator_restart_timeout;
207211
let config = self.replicator_config.clone();
208212
let replicator_statement_logging = self.replicator_statement_logging;
213+
let parsing_preset = self.parsing_preset;
209214

210215
// The replication task ideally won't panic, but if it does and we arent replicating, that
211216
// will mean the data we return, will be more and more stale, and the transaction logs on
@@ -230,6 +235,7 @@ impl Leader {
230235
telemetry_sender.clone(),
231236
server_startup,
232237
replicator_statement_logging,
238+
parsing_preset,
233239
)
234240
.await
235241
{
@@ -1050,6 +1056,7 @@ impl Leader {
10501056
replicator_config: UpstreamConfig,
10511057
worker_request_timeout: Duration,
10521058
background_recovery_interval: Duration,
1059+
parsing_mode: ParsingPreset,
10531060
replicator_tx: UnboundedSender<ReplicatorMessage>,
10541061
controller_tx: UnboundedSender<ControllerMessage>,
10551062
) -> Self {
@@ -1069,6 +1076,7 @@ impl Leader {
10691076
worker_request_timeout,
10701077
background_recovery_interval,
10711078
background_recovery_running: Arc::new(AtomicBool::new(false)),
1079+
parsing_preset: parsing_mode,
10721080
running_migrations: Default::default(),
10731081
background_task_failed,
10741082
running_recovery: None,

readyset-server/src/controller/mod.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use readyset_client::ControllerDescriptor;
2323
use readyset_data::Dialect;
2424
use readyset_errors::{internal, internal_err, ReadySetError, ReadySetResult};
2525
use readyset_sql::ast::Relation;
26+
use readyset_sql_parsing::ParsingPreset;
2627
use readyset_telemetry_reporter::TelemetrySender;
2728
#[cfg(feature = "failure_injection")]
2829
use readyset_util::failpoints;
@@ -377,6 +378,10 @@ pub struct Controller {
377378
/// Whether we are the leader and ready to handle requests.
378379
leader_ready: Arc<AtomicBool>,
379380

381+
/// Parsing mode for the controller, used in extend_recipe. Not part of the serialized
382+
/// [`Config`] so it can be changed on restart.
383+
parsing_preset: ParsingPreset,
384+
380385
/// Whether we are in maintenance mode
381386
maintenance_mode: Arc<AtomicBool>,
382387

@@ -409,6 +414,7 @@ impl Controller {
409414
worker_descriptor: WorkerDescriptor,
410415
telemetry_sender: TelemetrySender,
411416
config: Config,
417+
parsing_preset: ParsingPreset,
412418
shutdown_rx: ShutdownReceiver,
413419
) -> Self {
414420
// If we don't have an upstream, we allow permissive writes to base tables.
@@ -425,6 +431,7 @@ impl Controller {
425431
our_descriptor,
426432
worker_descriptor,
427433
config,
434+
parsing_preset,
428435
leader_ready: Arc::new(AtomicBool::new(false)),
429436
maintenance_mode: Arc::new(AtomicBool::new(false)),
430437
controller_channel: ControllerChannel::new(),
@@ -534,6 +541,7 @@ impl Controller {
534541
self.config.replicator_config.clone(),
535542
self.config.worker_request_timeout,
536543
self.config.background_recovery_interval,
544+
self.parsing_preset,
537545
self.replicator_channel.sender(),
538546
self.controller_channel.sender(),
539547
);
@@ -855,7 +863,11 @@ impl Controller {
855863
ddl_reqs
856864
.iter()
857865
.filter(|req| matches!(
858-
Change::from_cache_ddl_request(req, adapter_rewrite_params),
866+
Change::from_cache_ddl_request(
867+
req,
868+
adapter_rewrite_params,
869+
self.parsing_preset,
870+
),
859871
Ok(Change::Drop { .. })
860872
))
861873
.all(|req| req.dialect == ddl_reqs[0].dialect),
@@ -866,7 +878,11 @@ impl Controller {
866878
ddl_reqs
867879
.iter()
868880
.filter(|req| matches!(
869-
Change::from_cache_ddl_request(req, adapter_rewrite_params),
881+
Change::from_cache_ddl_request(
882+
req,
883+
adapter_rewrite_params,
884+
self.parsing_preset,
885+
),
870886
Ok(Change::Drop { .. })
871887
))
872888
.all(|req| req.schema_search_path.is_empty()),
@@ -877,7 +893,11 @@ impl Controller {
877893
ddl_reqs
878894
.iter()
879895
.filter(|req| matches!(
880-
Change::from_cache_ddl_request(req, adapter_rewrite_params),
896+
Change::from_cache_ddl_request(
897+
req,
898+
adapter_rewrite_params,
899+
self.parsing_preset,
900+
),
881901
Ok(Change::CreateCache(_))
882902
))
883903
.all(|req| req.dialect == ddl_reqs[0].dialect),
@@ -891,7 +911,11 @@ impl Controller {
891911
let mut last_was_drop = false;
892912

893913
for ddl_req in ddl_reqs {
894-
match Change::from_cache_ddl_request(&ddl_req, adapter_rewrite_params) {
914+
match Change::from_cache_ddl_request(
915+
&ddl_req,
916+
adapter_rewrite_params,
917+
self.parsing_preset,
918+
) {
895919
Ok(change) => {
896920
let is_drop = matches!(change, Change::Drop { .. });
897921

readyset-server/src/main.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use readyset_server::metrics::{
1616
};
1717
use readyset_server::PrometheusBuilder;
1818
use readyset_server::{resolve_addr, Builder, NoriaMetricsRecorder, WorkerOptions};
19+
use readyset_sql_parsing::ParsingPreset;
1920
use readyset_telemetry_reporter::{TelemetryEvent, TelemetryInitializer};
2021
use readyset_version::*;
2122
use tracing::{error, info};
@@ -147,6 +148,17 @@ struct Options {
147148
/// impact startup.
148149
#[arg(long, hide = true)]
149150
wait_for_failpoint: bool,
151+
152+
/// Parsing mode that determines which parser(s) to use and how to handle conflicts.
153+
/// Options: only-nom, only-sqlparser, prefer-nom, prefer-sqlparser, panic
154+
#[arg(
155+
long,
156+
env = "PARSING_PRESET",
157+
value_enum,
158+
default_value = "both-prefer-nom",
159+
hide = true
160+
)]
161+
parsing_preset: ParsingPreset,
150162
}
151163

152164
fn main() -> anyhow::Result<()> {
@@ -247,6 +259,7 @@ fn main() -> anyhow::Result<()> {
247259
builder.set_telemetry_sender(telemetry_sender.clone());
248260
builder.set_wait_for_failpoint(opts.wait_for_failpoint);
249261
builder.set_replicator_statement_logging(opts.tracing.statement_logging);
262+
builder.set_parsing_preset(opts.parsing_preset);
250263

251264
if opts.cannot_become_leader {
252265
builder.cannot_become_leader();

0 commit comments

Comments
 (0)