diff --git a/crates/diesel_utils/src/connection.rs b/crates/diesel_utils/src/connection.rs index 9025baf2fa..88a709fc64 100644 --- a/crates/diesel_utils/src/connection.rs +++ b/crates/diesel_utils/src/connection.rs @@ -1,8 +1,12 @@ +use crate::schema_setup::MIGRATIONS; use deadpool::Runtime; -use diesel::result::{ - ConnectionError, - ConnectionResult, - Error::{self as DieselError, QueryBuilderError}, +use diesel::{ + connection::SimpleConnection, + result::{ + ConnectionError, + ConnectionResult, + Error::{self as DieselError, QueryBuilderError}, + }, }; use diesel_async::{ AsyncConnection, @@ -14,6 +18,7 @@ use diesel_async::{ }, scoped_futures::ScopedBoxFuture, }; +use diesel_migrations::MigrationHarness; use futures_util::{FutureExt, future::BoxFuture}; use lemmy_utils::{ error::{LemmyError, LemmyResult}, @@ -37,7 +42,7 @@ use std::{ sync::Arc, time::Duration, }; -use tracing::error; +use tracing::{debug, error}; pub type ActualDbPool = Pool; @@ -179,7 +184,27 @@ pub fn build_db_pool() -> LemmyResult { })) .build()?; - crate::schema_setup::run(crate::schema_setup::Options::default().run(), &db_url)?; + let mut inner_harness = crate::schema_setup::MigrationHarnessWrapper::new(&db_url)?; + let mut harness = + TimedHarnessWithOutput::write_to_tracing(&mut inner_harness, tracing::Level::DEBUG); + + // If possible, skip getting a lock and recreating the "r" schema, so lemmy_server processes in a + // horizontally scaled setup can start without causing locks + if harness.need_schema_setup()? 
{ + // Block concurrent attempts to run migrations until `conn` is closed (otherwise, in a + // horizontally scaled setup, it's possible for multiple processes to try running the same + // migration, which would throw a unique violation error) + debug!("Waiting for lock..."); + harness.conn.batch_execute("SELECT pg_advisory_lock(0);")?; + + debug!("Running Database migrations (This may take a long time)..."); + harness + .run_pending_migrations(MIGRATIONS) + .map_err(anyhow::Error::from_boxed)?; + harness.run_replaceable_schema()?; + + debug!("Database migrations complete."); + } Ok(pool) } diff --git a/crates/diesel_utils/src/main.rs b/crates/diesel_utils/src/main.rs index b8c914c499..e71d85a198 100644 --- a/crates/diesel_utils/src/main.rs +++ b/crates/diesel_utils/src/main.rs @@ -1,14 +1,20 @@ -/// Very minimal wrapper around `lemmy_diesel_utils::run` to allow running migrations without +use diesel_migrations::MigrationHarness; + +/// Very minimal wrapper to allow running migrations without /// compiling everything. 
fn main() -> anyhow::Result<()> { if std::env::args().len() > 1 { anyhow::bail!("To set parameters for running migrations, use the lemmy_server command."); } - lemmy_diesel_utils::schema_setup::run( - lemmy_diesel_utils::schema_setup::Options::default().run(), + // todo: set the application_name + let mut harness = lemmy_diesel_utils::schema_setup::MigrationHarnessWrapper::new( &std::env::var("LEMMY_DATABASE_URL")?, )?; + harness + .run_pending_migrations(MIGRATIONS) + .map_err(lemmy_diesel_utils::schema_setup::convert_err)?; + lemmy_diesel_utils::schema_setup::run_replaceable_schema(&mut harness.conn)?; Ok(()) } diff --git a/crates/diesel_utils/src/schema_setup/mod.rs b/crates/diesel_utils/src/schema_setup/mod.rs index 5edf142b21..51319127d5 100644 --- a/crates/diesel_utils/src/schema_setup/mod.rs +++ b/crates/diesel_utils/src/schema_setup/mod.rs @@ -1,6 +1,5 @@ mod diff_check; -use anyhow::{Context, anyhow}; -use chrono::TimeDelta; +use anyhow::Context; use diesel::{ BoolExpressionMethods, Connection, @@ -16,8 +15,8 @@ use diesel::{ update, }; use diesel_migrations::MigrationHarness; -use std::time::Instant; -use tracing::debug; + +// `?` can't convert `diesel::migration::Result` to some other types because of https://github.com/dtolnay/anyhow/issues/66 diesel::table! { pg_namespace (nspname) { @@ -32,11 +31,8 @@ diesel::table! { } } -fn migrations() -> diesel_migrations::EmbeddedMigrations { - // Using `const` here is required by the borrow checker - const MIGRATIONS: diesel_migrations::EmbeddedMigrations = diesel_migrations::embed_migrations!(); - MIGRATIONS -} +pub const MIGRATIONS: diesel_migrations::EmbeddedMigrations = + diesel_migrations::embed_migrations!(); /// This SQL code sets up the `r` schema, which contains things that can be safely dropped and /// replaced instead of being changed using migrations. 
It may not create or modify things outside @@ -52,73 +48,32 @@ fn replaceable_schema() -> String { const REPLACEABLE_SCHEMA_PATH: &str = "crates/diesel_utils/replaceable_schema"; -struct MigrationHarnessWrapper<'a> { - conn: &'a mut PgConnection, - #[cfg(test)] - enable_diff_check: bool, - options: &'a Options, -} - -impl MigrationHarnessWrapper<'_> { - fn run_migration_inner( - &mut self, - migration: &dyn Migration, - ) -> diesel::migration::Result> { - let start_time = Instant::now(); - - let result = self.conn.run_migration(migration); - - let duration = TimeDelta::from_std(start_time.elapsed()) - .map(|d| d.to_string()) - .unwrap_or_default(); - let name = migration.name(); - self.options.print(&format!("{duration} run {name}")); - - result - } +pub struct MigrationHarnessWrapper { + // Migrations don't support async connection, and non-async migration execution is okay + pub conn: PgConnection, } -impl MigrationHarness for MigrationHarnessWrapper<'_> { +impl MigrationHarness for MigrationHarnessWrapper { fn run_migration( &mut self, migration: &dyn Migration, ) -> diesel::migration::Result> { - #[cfg(test)] - if self.enable_diff_check { - let before = diff_check::get_dump(); - - self.run_migration_inner(migration)?; - self.revert_migration(migration)?; - - let after = diff_check::get_dump(); - - diff_check::check_dump_diff( - [&after, &before], - &format!( - "These changes need to be applied in migrations/{}/down.sql:", - migration.name() - ), - ); - } + // Drop `r` schema, so migrations don't need to be made to work both with and without things in + // it existing + self.revert_replaceable_schema()?; - self.run_migration_inner(migration) + self.conn.run_migration(migration) } fn revert_migration( &mut self, migration: &dyn Migration, ) -> diesel::migration::Result> { - let start_time = Instant::now(); - - let result = self.conn.revert_migration(migration); + // Drop `r` schema, so migrations don't need to be made to work both with and without things in + // 
it existing + self.revert_replaceable_schema()?; - let duration = TimeDelta::from_std(start_time.elapsed()) - .map(|d| d.to_string()) - .unwrap_or_default(); - let name = migration.name(); - self.options.print(&format!("{duration} revert {name}")); - - result + self.conn.revert_migration(migration) } fn applied_migrations(&mut self) -> diesel::migration::Result>> { @@ -126,214 +81,79 @@ impl MigrationHarness for MigrationHarnessWrapper<'_> { } } -#[derive(Default, Clone, Copy)] -pub struct Options { - #[cfg(test)] - enable_diff_check: bool, - revert: bool, - run: bool, - print_output: bool, - limit: Option, -} - -impl Options { - #[cfg(test)] - fn enable_diff_check(mut self) -> Self { - self.enable_diff_check = true; - self - } - - pub fn run(mut self) -> Self { - self.run = true; - self - } - - pub fn revert(mut self) -> Self { - self.revert = true; - self +impl MigrationHarnessWrapper { + pub fn new(db_url: &str) -> anyhow::Result { + Ok(MigrationHarnessWrapper { + conn: PgConnection::establish(db_url)?, + }) } - pub fn limit(mut self, limit: u64) -> Self { - self.limit = Some(limit); - self - } - - /// If print_output is true, use println!. - /// Otherwise, use debug! - pub fn print_output(mut self) -> Self { - self.print_output = true; - self + pub fn need_schema_setup(&mut self) -> anyhow::Result { + Ok( + self + .conn + .has_pending_migration(MIGRATIONS) + .map_err(anyhow::Error::from_boxed)? 
+ || !self.replaceable_schema_is_up_to_date()?, + ) } - fn print(&self, text: &str) { - if self.print_output { - println!("{text}"); - } else { - debug!("{text}"); - } - } -} - -/// Checked by tests -#[derive(PartialEq, Eq, Debug)] -pub enum Branch { - EarlyReturn, - ReplaceableSchemaRebuilt, - ReplaceableSchemaNotRebuilt, -} - -pub fn run(options: Options, db_url: &str) -> anyhow::Result { - // Migrations don't support async connection, and this function doesn't need to be async - let conn = &mut PgConnection::establish(db_url)?; - - // If possible, skip getting a lock and recreating the "r" schema, so - // lemmy_server processes in a horizontally scaled setup can start without causing locks - if !options.revert - && options.run - && options.limit.is_none() - && !conn - .has_pending_migration(migrations()) - .map_err(convert_err)? - { - // The condition above implies that the migration that creates the previously_run_sql table was - // already run + fn replaceable_schema_is_up_to_date(&mut self) -> anyhow::Result { + // Assumes that the migration that creates the previously_run_sql table was already run. This + // assumption is true if has_pending_migration already returned false. let sql_unchanged = exists( previously_run_sql::table.filter(previously_run_sql::content.eq(replaceable_schema())), ); let schema_exists = exists(pg_namespace::table.find("r")); - if select(sql_unchanged.and(schema_exists)).get_result(conn)? { - return Ok(Branch::EarlyReturn); - } + Ok(select(sql_unchanged.and(schema_exists)).get_result(&mut self.conn)?) 
} - // Block concurrent attempts to run migrations until `conn` is closed, and disable the - // trigger that prevents the Diesel CLI from running migrations - options.print("Waiting for lock..."); - conn.batch_execute("SELECT pg_advisory_lock(0);")?; - options.print("Running Database migrations (This may take a long time)..."); - - // Drop `r` schema, so migrations don't need to be made to work both with and without things in - // it existing - revert_replaceable_schema(conn)?; - - run_selected_migrations(conn, &options).map_err(convert_err)?; - - // Only run replaceable_schema if newest migration was applied - let output = if (options.run && options.limit.is_none()) - || !conn - .has_pending_migration(migrations()) - .map_err(convert_err)? - { - #[cfg(test)] - if options.enable_diff_check { - let before = diff_check::get_dump(); - - run_replaceable_schema(conn)?; - revert_replaceable_schema(conn)?; - - let after = diff_check::get_dump(); - - diff_check::check_dump_diff( - [&before, &after], - "The code in crates/diesel_utils/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:", - ); - - diff_check::deferr_constraint_check(&after); - } - - run_replaceable_schema(conn)?; - - Branch::ReplaceableSchemaRebuilt - } else { - Branch::ReplaceableSchemaNotRebuilt - }; - - options.print("Database migrations complete."); - - Ok(output) -} - -fn run_replaceable_schema(conn: &mut PgConnection) -> anyhow::Result<()> { - conn.transaction(|conn| { - conn - .batch_execute(&replaceable_schema()) - .with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; - - let num_rows_updated = update(previously_run_sql::table) - .set(previously_run_sql::content.eq(replaceable_schema())) - .execute(conn)?; + /// this shouldn't be run in the same transaction as the next stuff, since [todo finish + /// explanation] + fn revert_replaceable_schema(&mut self) -> 
anyhow::Result<()> { + self + .conn + .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;") + .with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; - debug_assert_eq!(num_rows_updated, 1); + // Value in `previously_run_sql` table is not set here because the table might not exist, + // and that's fine because the existence of the `r` schema is also checked Ok(()) - }) -} - -fn revert_replaceable_schema(conn: &mut PgConnection) -> anyhow::Result<()> { - conn - .batch_execute("DROP SCHEMA IF EXISTS r CASCADE;") - .with_context(|| format!("Failed to revert SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; + } - // Value in `previously_run_sql` table is not set here because the table might not exist, - // and that's fine because the existence of the `r` schema is also checked + pub fn run_replaceable_schema(&mut self) -> anyhow::Result<()> { + self.revert_replaceable_schema()?; - Ok(()) -} + self.conn.transaction(|conn| { + conn + .batch_execute(&replaceable_schema()) + .with_context(|| format!("Failed to run SQL files in {REPLACEABLE_SCHEMA_PATH}"))?; -fn run_selected_migrations( - conn: &mut PgConnection, - options: &Options, -) -> diesel::migration::Result<()> { - let mut wrapper = MigrationHarnessWrapper { - conn, - options, - #[cfg(test)] - enable_diff_check: options.enable_diff_check, - }; + let num_rows_updated = update(previously_run_sql::table) + .set(previously_run_sql::content.eq(replaceable_schema())) + .execute(conn)?; - if options.revert { - if let Some(limit) = options.limit { - for _ in 0..limit { - wrapper.revert_last_migration(migrations())?; - } - } else { - wrapper.revert_all_migrations(migrations())?; - } - } + debug_assert_eq!(num_rows_updated, 1); - if options.run { - if let Some(limit) = options.limit { - for _ in 0..limit { - wrapper.run_next_migration(migrations())?; - } - } else { - wrapper.run_pending_migrations(migrations())?; - } + Ok(()) + }) } - - Ok(()) -} - -/// Makes `diesel::migration::Result` work with 
`anyhow` and `LemmyError` -fn convert_err(e: Box) -> anyhow::Error { - anyhow!(e) } #[cfg(test)] #[allow(clippy::indexing_slicing, clippy::unwrap_used)] mod tests { - use super::{ - Branch::{EarlyReturn, ReplaceableSchemaNotRebuilt, ReplaceableSchemaRebuilt}, - *, - }; + use super::*; + use anyhow::anyhow; use diesel::{ dsl::{not, sql}, sql_types, }; use diesel_ltree::Ltree; - use lemmy_utils::{error::LemmyResult, settings::SETTINGS}; + use lemmy_utils::{error::LemmyErrorExt2, settings::SETTINGS}; use serial_test::serial; // The number of migrations that should be run to set up some test data. // Currently, this includes migrations until @@ -383,74 +203,108 @@ mod tests { #[test] #[serial] - fn test_schema_setup() -> LemmyResult<()> { - let o = Options::default(); - let db_url = SETTINGS.get_database_url(); - let conn = &mut PgConnection::establish(&db_url)?; + // todo: maybe add comments for need_schema_setup asserts + fn test_schema_setup() -> diesel::migration::Result<()> { + let db_url = SETTINGS.get_database_url_with_options().into_anyhow()?; + let mut harness = crate::schema_setup::MigrationHarnessWrapper::new(&db_url)?; // Start with consistent state by dropping everything - conn.batch_execute("DROP OWNED BY CURRENT_USER;")?; + harness.conn.batch_execute("DROP OWNED BY CURRENT_USER;")?; + + assert!(harness.need_schema_setup()?); // Run initial migrations to prepare basic tables - assert_eq!( - run(o.run().limit(INITIAL_MIGRATIONS_COUNT), &db_url)?, - ReplaceableSchemaNotRebuilt - ); + harness.run_pending_migrations_in_range( + MIGRATIONS, + diesel_migrations::Range::NumberOfMigrations(INITIAL_MIGRATIONS_COUNT), + )?; + + assert!(harness.need_schema_setup()?); // Insert the test data - insert_test_data(conn)?; + insert_test_data(&mut harness.conn)?; // Run all migrations, and make sure that changes can be correctly reverted - assert_eq!( - run(o.run().enable_diff_check(), &db_url)?, - ReplaceableSchemaRebuilt + for migration in 
harness.pending_migrations(MIGRATIONS)? { + let before = diff_check::get_dump(); + + harness.run_migration(&migration)?; + harness.revert_migration(&migration)?; + + let after = diff_check::get_dump(); + + diff_check::check_dump_diff( + [&after, &before], + &format!( + "These changes need to be applied in migrations/{}/down.sql:", + migration.name() + ), + ); + + harness.run_migration(&migration)?; + } + + assert!(harness.need_schema_setup()?); + + // Make sure that replaceable schema can be correctly reverted + let before = diff_check::get_dump(); + + harness.run_replaceable_schema()?; + harness.revert_replaceable_schema()?; + + let after = diff_check::get_dump(); + + diff_check::check_dump_diff( + [&before, &after], + "The code in crates/diesel_utils/replaceable_schema incorrectly created or modified things outside of the `r` schema, causing these changes to be left behind after dropping the schema:", ); + assert!(harness.need_schema_setup()?); + harness.run_replaceable_schema()?; + assert!(!harness.need_schema_setup()?); + // Check the test data we inserted before after running migrations - check_test_data(conn)?; + check_test_data(&mut harness.conn)?; // Check the current schema assert_eq!( - get_foreign_keys_with_missing_indexes(conn)?, + get_foreign_keys_with_missing_indexes(&mut harness.conn)?, Vec::::new(), "each foreign key needs an index so that deleting the referenced row does not scan the whole referencing table" ); + diff_check::deferr_constraint_check(&after); - // Check for early return - assert_eq!(run(o.run(), &db_url)?, EarlyReturn); - - // Test `limit` - assert_eq!( - run(o.revert().limit(1), &db_url)?, - ReplaceableSchemaNotRebuilt - ); - assert_eq!( - conn - .pending_migrations(migrations()) - .map_err(convert_err)? 
- .len(), - 1 - ); - assert_eq!(run(o.run().limit(1), &db_url)?, ReplaceableSchemaRebuilt); + // Todo: maybe clean up (this used to be for testing the limit option) + harness.revert_last_migration(MIGRATIONS)?; + assert!(harness.need_schema_setup()?); + harness.run_next_migration(MIGRATIONS)?; + harness.run_replaceable_schema()?; + assert!(!harness.need_schema_setup()?); // Get a new connection, workaround for error `cache lookup failed for function 26633` // on `migrations/2025-10-15-114811-0000_merge-modlog-tables/down.sql`. - let conn = &mut PgConnection::establish(&db_url)?; + harness.conn = PgConnection::establish(&db_url)?; - // This should throw an error saying to use lemmy_server instead of diesel CLI - conn.batch_execute("DROP OWNED BY CURRENT_USER;")?; + // This should throw an error saying to use lemmy_server instead of diesel CLI, since + // application_name isn't set to lemmy + harness + .conn + .batch_execute("DROP OWNED BY CURRENT_USER; SET application_name=reddit;")?; assert!(matches!( - conn.run_pending_migrations(migrations()), + harness.run_pending_migrations(MIGRATIONS), Err(e) if e.to_string().contains("lemmy_server") )); // Diesel CLI's way of running migrations shouldn't break the custom migration runner - assert_eq!(run(o.run(), &db_url)?, ReplaceableSchemaRebuilt); + harness.conn.batch_execute("SET application_name=lemmy;")?; + harness.run_pending_migrations(MIGRATIONS)?; + harness.run_replaceable_schema()?; + assert!(!harness.need_schema_setup()?); Ok(()) } - fn insert_test_data(conn: &mut PgConnection) -> LemmyResult<()> { + fn insert_test_data(conn: &mut PgConnection) -> anyhow::Result<()> { // Users conn.batch_execute(&format!( "INSERT INTO user_ (id, name, actor_id, preferred_username, password_encrypted, email, public_key) \ @@ -536,7 +390,7 @@ mod tests { Ok(()) } - fn check_test_data(conn: &mut PgConnection) -> LemmyResult<()> { + fn check_test_data(conn: &mut PgConnection) -> anyhow::Result<()> { use 
lemmy_db_schema_file::schema::{comment, community, notification, person, post}; // Check users @@ -657,7 +511,7 @@ mod tests { const FOREIGN_KEY: &str = "f"; - fn get_foreign_keys_with_missing_indexes(conn: &mut PgConnection) -> LemmyResult> { + fn get_foreign_keys_with_missing_indexes(conn: &mut PgConnection) -> anyhow::Result> { diesel::table! { pg_constraint (table_oid, name, kind, column_numbers) { #[sql_name = "conrelid"] diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index cbeb9e2c1f..98dc7150d6 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -25,7 +25,7 @@ use lemmy_apub_objects::objects::{community::FETCH_COMMUNITY_COLLECTIONS, instan use lemmy_apub_send::{Opts, SendManager}; use lemmy_db_schema::source::secret::Secret; use lemmy_db_views_site::SiteView; -use lemmy_diesel_utils::connection::build_db_pool; +use lemmy_diesel_utils::{connection::build_db_pool, schema_setup::convert_err}; use lemmy_routes::{ feeds, middleware::{ @@ -144,17 +144,22 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { number, }) = args.subcommand { - let mut options = match subcommand { - MigrationSubcommand::Run => lemmy_diesel_utils::schema_setup::Options::default().run(), - MigrationSubcommand::Revert => lemmy_diesel_utils::schema_setup::Options::default().revert(), + let mut inner_harness = lemmy_diesel_utils::schema_setup::MigrationHarnessWrapper::new( + &SETTINGS.get_database_url_with_options()?, + )?; + let mut harness = TimedHarnessWithOutput::write_to_stdout(&mut inner_harness); + + let range = if all { + diesel_migrations::Range::All + } else { + diesel_migrations::Range::NumberOfMigrations(number) + }; + + match subcommand { + MigrationSubcommand::Run => harness.run_pending_migrations_in_range(MIGRATIONS, range), + MigrationSubcommand::Revert => harness.revert_last_migrations_in_range(MIGRATIONS, range), } - .print_output(); - - if !all { - options = options.limit(number); - } - - 
lemmy_diesel_utils::schema_setup::run(options, &SETTINGS.get_database_url_with_options()?)?; + .map_err(convert_err)?; #[cfg(debug_assertions)] if all && subcommand == MigrationSubcommand::Run { diff --git a/crates/utils/src/settings/mod.rs b/crates/utils/src/settings/mod.rs index 82c838e983..973b6979b3 100644 --- a/crates/utils/src/settings/mod.rs +++ b/crates/utils/src/settings/mod.rs @@ -10,8 +10,11 @@ pub mod structs; static DEFAULT_CONFIG_FILE: &str = "config/config.hjson"; -/// Some connection options to speed up queries -const CONNECTION_OPTIONS: [&str; 1] = ["geqo_threshold=12"]; +const CONNECTION_OPTIONS: [&str; 2] = [ + "geqo_threshold=12", // speed up queries + "application_name=lemmy", /* disable the trigger that prevents the Diesel CLI from running + * migrations */ +]; #[allow(clippy::expect_used)] pub static SETTINGS: LazyLock = LazyLock::new(|| { diff --git a/migrations/2025-08-01-000017_forbid_diesel_cli/up.sql b/migrations/2025-08-01-000017_forbid_diesel_cli/up.sql index a2f8391566..f99ca6b758 100644 --- a/migrations/2025-08-01-000017_forbid_diesel_cli/up.sql +++ b/migrations/2025-08-01-000017_forbid_diesel_cli/up.sql @@ -16,13 +16,9 @@ CREATE FUNCTION forbid_diesel_cli () LANGUAGE plpgsql AS $$ BEGIN - IF NOT EXISTS ( - SELECT - FROM - pg_locks - WHERE (locktype, pid, objid) = ('advisory', pg_backend_pid(), 0)) THEN -RAISE 'migrations must be managed using lemmy_server instead of diesel CLI'; -END IF; + IF current_setting('application_name', TRUE) IS DISTINCT FROM 'lemmy' THEN + RAISE 'migrations must be managed using lemmy_server instead of diesel CLI'; + END IF; RETURN NULL; END; $$;