|
3 | 3 | pub mod mysql; |
4 | 4 | pub mod psql; |
5 | 5 | pub mod query_logger; |
6 | | -use std::collections::HashMap; |
| 6 | +use std::collections::{HashMap, HashSet}; |
7 | 7 | use std::fs::remove_dir_all; |
8 | 8 | use std::future::Future; |
9 | 9 | use std::io; |
@@ -228,15 +228,11 @@ pub struct Options { |
228 | 228 | #[arg(long, env = "METRICS_ADDRESS", default_value = "0.0.0.0:6034")] |
229 | 229 | metrics_address: SocketAddr, |
230 | 230 |
|
231 | | - /// Allow database connections authenticated as this user. Defaults to the username in |
232 | | - /// --upstream-db-url if not set. Ignored if --allow-unauthenticated-connections is passed |
233 | | - #[arg(long, env = "ALLOWED_USERNAME", short = 'u', hide = true)] |
234 | | - username: Option<String>, |
235 | | - |
236 | | - /// Password to authenticate database connections with. Defaults to the password in |
237 | | - /// --upstream-db-url if not set. Ignored if --allow-unauthenticated-connections is passed |
238 | | - #[arg(long, env = "ALLOWED_PASSWORD", short = 'p', hide = true)] |
239 | | - password: Option<RedactedString>, |
| 231 | + /// Comma list of allowed usernames:passwords to authenticate database connections with. |
| 232 | + /// If not set, the username and password in --upstream-db-url will be used. |
| 233 | + /// If --allow-unauthenticated-connections is passed, this will be ignored. |
| 234 | + #[arg(long, env = "ALLOWED_USERS")] |
| 235 | + allowed_users: Option<RedactedString>, |
240 | 236 |
|
241 | 237 | /// Enable recording and exposing Prometheus metrics |
242 | 238 | #[arg(long, env = "PROMETHEUS_METRICS", default_value = "true", hide = true)] |
@@ -511,6 +507,111 @@ impl Options { |
511 | 507 | native_tls::TlsAcceptor::new(tls_identity)?, |
512 | 508 | )))) |
513 | 509 | } |
| 510 | + |
| 511 | + fn process_pair( |
| 512 | + &self, |
| 513 | + pair: &str, |
| 514 | + seen_users: &mut HashSet<String>, |
| 515 | + ) -> Result<(String, String), anyhow::Error> { |
| 516 | + let mut parts = pair.trim().splitn(2, ':'); |
| 517 | + match (parts.next(), parts.next()) { |
| 518 | + (Some(user), Some(pass)) => { |
| 519 | + let user = user.trim(); |
| 520 | + let pass = pass.trim(); |
| 521 | + if user.is_empty() || pass.is_empty() { |
| 522 | + return Err(anyhow::anyhow!( |
| 523 | + "Invalid user:password pair format. Expected format: user:password" |
| 524 | + )); |
| 525 | + } |
| 526 | + if !seen_users.insert(user.to_string()) { |
| 527 | + return Err(anyhow::anyhow!("Duplicate user found: {}", user)); |
| 528 | + } |
| 529 | + Ok((user.to_string(), pass.to_string())) |
| 530 | + } |
| 531 | + _ => Err(anyhow::anyhow!( |
| 532 | + "Invalid user:password pair format. Expected format: user:password" |
| 533 | + )), |
| 534 | + } |
| 535 | + } |
| 536 | + |
| 537 | + // Build list of allowed user to connect to Readyset |
| 538 | + fn build_allowed_users(&self) -> Result<HashMap<String, String>, anyhow::Error> { |
| 539 | + let upstream_config = self.server_worker_options.replicator_config.clone(); |
| 540 | + let upstream_url = upstream_config |
| 541 | + .upstream_db_url |
| 542 | + .as_ref() |
| 543 | + .and_then(|s| s.parse::<DatabaseURL>().ok()); |
| 544 | + let mut seen_users = std::collections::HashSet::new(); |
| 545 | + // Parse allowed users from comma-separated "user:pass" pairs |
| 546 | + let mut allowed_users = self |
| 547 | + .allowed_users |
| 548 | + .as_ref() |
| 549 | + .map(|s| { |
| 550 | + let mut users = HashMap::new(); |
| 551 | + let mut current = String::new(); |
| 552 | + let mut in_quotes = false; |
| 553 | + let mut quote_char = None; |
| 554 | + |
| 555 | + // Parse character by character |
| 556 | + for (i, c) in s.chars().enumerate() { |
| 557 | + match c { |
| 558 | + '\'' | '"' if !in_quotes => { |
| 559 | + in_quotes = true; |
| 560 | + quote_char = Some(c); |
| 561 | + } |
| 562 | + c if Some(c) == quote_char => { |
| 563 | + if let Some(next_c) = s.chars().nth(i + 1) { |
| 564 | + if next_c == c { |
| 565 | + // Handle escaped quote |
| 566 | + current.push(c); |
| 567 | + continue; // Skip next quote |
| 568 | + } |
| 569 | + } |
| 570 | + in_quotes = false; |
| 571 | + quote_char = None; |
| 572 | + } |
| 573 | + ',' if !in_quotes => { |
| 574 | + if !current.is_empty() { |
| 575 | + let (user, pass) = self.process_pair(¤t, &mut seen_users)?; |
| 576 | + users.insert(user, pass); |
| 577 | + current.clear(); |
| 578 | + } |
| 579 | + } |
| 580 | + _ => current.push(c), |
| 581 | + } |
| 582 | + } |
| 583 | + |
| 584 | + // Process the last pair if any |
| 585 | + if !current.is_empty() { |
| 586 | + let (user, pass) = self.process_pair(¤t, &mut seen_users)?; |
| 587 | + users.insert(user, pass); |
| 588 | + } |
| 589 | + |
| 590 | + if in_quotes { |
| 591 | + return Err(anyhow::anyhow!("Unclosed quote in input")); |
| 592 | + } |
| 593 | + |
| 594 | + Ok(users) |
| 595 | + }) |
| 596 | + .transpose()? |
| 597 | + .unwrap_or_default(); |
| 598 | + |
| 599 | + match ( |
| 600 | + upstream_url.as_ref().and_then(|url| url.user()), |
| 601 | + upstream_url.as_ref().and_then(|url| url.password()), |
| 602 | + ) { |
| 603 | + (Some(user), Some(pass)) => { |
| 604 | + if seen_users.insert(user.to_owned()) { |
| 605 | + allowed_users.insert(user.to_owned(), pass.to_owned()) |
| 606 | + } else { |
| 607 | + return Err(anyhow::anyhow!("Duplicate user found: {}", user)); |
| 608 | + } |
| 609 | + } |
| 610 | + _ => None, |
| 611 | + }; |
| 612 | + |
| 613 | + Ok(allowed_users) |
| 614 | + } |
514 | 615 | } |
515 | 616 |
|
516 | 617 | async fn connect_upstream<U>( |
@@ -650,54 +751,21 @@ where |
650 | 751 |
|
651 | 752 | return rt.block_on(async { self.cleanup(upstream_config, deployment_dir).await }); |
652 | 753 | } |
653 | | - |
654 | | - let users: &'static HashMap<String, String> = Box::leak(Box::new( |
655 | | - if !options.allow_unauthenticated_connections { |
656 | | - HashMap::from([{ |
657 | | - let upstream_url = upstream_config |
658 | | - .upstream_db_url |
659 | | - .as_ref() |
660 | | - .and_then(|s| s.parse::<DatabaseURL>().ok()); |
661 | | - |
662 | | - match ( |
663 | | - (options.username, options.password), |
664 | | - ( |
665 | | - upstream_url.as_ref().and_then(|url| url.user()), |
666 | | - upstream_url.as_ref().and_then(|url| url.password()), |
667 | | - ), |
668 | | - ) { |
669 | | - // --username and --password |
670 | | - ((Some(user), Some(pass)), _) => (user, pass.0), |
671 | | - // --password, username from url |
672 | | - ((None, Some(pass)), (Some(user), _)) => (user.to_owned(), pass.0), |
673 | | - // username and password from url |
674 | | - (_, (Some(user), Some(pass))) => (user.to_owned(), pass.to_owned()), |
675 | | - _ => { |
676 | | - if upstream_url.is_some() { |
677 | | - bail!( |
678 | | - "Failed to infer ReadySet username and password from \ |
679 | | - upstream DB URL. Please ensure they are present and \ |
680 | | - correctly formatted as follows: \ |
681 | | - <protocol>://<username>:<password>@<address>[:<port>][/<database>] \ |
682 | | - You can also configure ReadySet to accept credentials \ |
683 | | - different from those of your upstream database via \ |
684 | | - --username/-u and --password/-p, or use \ |
685 | | - --allow-unauthenticated-connections." |
686 | | - ) |
687 | | - } else { |
688 | | - bail!( |
689 | | - "Must specify --username/-u and --password/-p if one of \ |
690 | | - --allow-unauthenticated-connections or --upstream-db-url is not \ |
691 | | - passed" |
692 | | - ) |
693 | | - } |
694 | | - } |
695 | | - } |
696 | | - }]) |
697 | | - } else { |
698 | | - HashMap::new() |
699 | | - }, |
700 | | - )); |
| 754 | + let users = options.build_allowed_users()?; |
| 755 | + let users: &'static HashMap<String, String> = if !options.allow_unauthenticated_connections |
| 756 | + { |
| 757 | + if users.is_empty() { |
| 758 | + bail!( |
| 759 | + "Failed to build authentication map from \ |
| 760 | + upstream DB URL or --allowed-users. Please ensure they are present and \ |
| 761 | + correctly formatted as follows: --upstream-db-url <protocol>://<username>:<password>@<address>[:<port>][/<database>] \ |
| 762 | + or --allowed-users <username:password>[,<username:password>...]" |
| 763 | + ) |
| 764 | + } |
| 765 | + Box::leak(Box::new(users)) |
| 766 | + } else { |
| 767 | + Box::leak(Box::new(HashMap::new())) |
| 768 | + }; |
701 | 769 |
|
702 | 770 | info!(version = %VERSION_STR_ONELINE); |
703 | 771 |
|
@@ -1529,4 +1597,42 @@ mod tests { |
1529 | 1597 | ]); |
1530 | 1598 | assert_eq!(DeploymentMode::Standalone, opts.deployment_mode); |
1531 | 1599 | } |
| 1600 | + |
| 1601 | + #[test] |
| 1602 | + fn allowed_users() { |
| 1603 | + // test allowed-users with comma and colon in password |
| 1604 | + let opts = Options::parse_from(vec![ |
| 1605 | + "readyset", |
| 1606 | + "--allowed-users", |
| 1607 | + "user1:pass1,u:\'pwd,\',u2:\'pwd,:,\'", |
| 1608 | + "--upstream-db-url", |
| 1609 | + "mysql://root:password@mysql:3306/readyset", |
| 1610 | + ]); |
| 1611 | + let user_list = opts.build_allowed_users().unwrap(); |
| 1612 | + assert_eq!(user_list.len(), 4); |
| 1613 | + assert_eq!(user_list["user1"], "pass1"); |
| 1614 | + assert_eq!(user_list["u"], "pwd,"); |
| 1615 | + assert_eq!(user_list["u2"], "pwd,:,"); |
| 1616 | + assert_eq!(user_list["root"], "password"); |
| 1617 | + |
| 1618 | + // duplicate user |
| 1619 | + let opts = Options::parse_from(vec![ |
| 1620 | + "readyset", |
| 1621 | + "--allowed-users", |
| 1622 | + "user1:pass1,user1:pass2", |
| 1623 | + "--upstream-db-url", |
| 1624 | + "mysql://root:password@mysql:3306/readyset", |
| 1625 | + ]); |
| 1626 | + opts.build_allowed_users().unwrap_err(); |
| 1627 | + |
| 1628 | + // duplicate user between allowed-users and upstream-db-url |
| 1629 | + let opts = Options::parse_from(vec![ |
| 1630 | + "readyset", |
| 1631 | + "--allowed-users", |
| 1632 | + "user1:pass1,user2:pass2", |
| 1633 | + "--upstream-db-url", |
| 1634 | + "mysql://user1:pass1@mysql:3306/readyset", |
| 1635 | + ]); |
| 1636 | + opts.build_allowed_users().unwrap_err(); |
| 1637 | + } |
1532 | 1638 | } |
0 commit comments