From 90972d7433d0953163138f70b9410a040f404c36 Mon Sep 17 00:00:00 2001 From: Per-Victor Persson Date: Fri, 28 Mar 2025 17:02:24 +0100 Subject: [PATCH] Fix persistence workers never terminating due to never receiving a shutdown signal --- crates/y-sweet-core/src/sync_kv.rs | 14 +++++++++++++- crates/y-sweet/src/server.rs | 7 +++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/crates/y-sweet-core/src/sync_kv.rs b/crates/y-sweet-core/src/sync_kv.rs index b91d43e17..7fd048473 100644 --- a/crates/y-sweet-core/src/sync_kv.rs +++ b/crates/y-sweet-core/src/sync_kv.rs @@ -17,6 +17,7 @@ pub struct SyncKv { key: String, dirty: AtomicBool, dirty_callback: Box, + shutdown: AtomicBool, } impl SyncKv { @@ -44,11 +45,12 @@ impl SyncKv { key, dirty: AtomicBool::new(false), dirty_callback: Box::new(callback), + shutdown: AtomicBool::new(false), }) } fn mark_dirty(&self) { - if !self.dirty.load(Ordering::Relaxed) { + if !self.dirty.load(Ordering::Relaxed) && !self.shutdown.load(Ordering::SeqCst) { self.dirty.store(true, Ordering::Relaxed); (self.dirty_callback)(); } @@ -88,6 +90,16 @@ impl SyncKv { pub fn is_empty(&self) -> bool { self.data.lock().unwrap().is_empty() } + + pub fn is_shutdown(&self) -> bool { + self.shutdown.load(Ordering::SeqCst) + } + + pub fn shutdown(&self) { + self.shutdown.store(true, Ordering::SeqCst); + // Call the callback one last time to wake up the persistence worker + (self.dirty_callback)(); + } } impl<'d> DocOps<'d> for SyncKv {} diff --git a/crates/y-sweet/src/server.rs b/crates/y-sweet/src/server.rs index d3a50d7e8..657d14cc1 100644 --- a/crates/y-sweet/src/server.rs +++ b/crates/y-sweet/src/server.rs @@ -204,6 +204,10 @@ impl Server { if checkpoints_without_refs >= 2 { tracing::info!("GCing doc"); + if let Some(doc) = docs.get(&doc_id) { + doc.sync_kv().shutdown(); + } + docs.remove(&doc_id); break; } @@ -229,6 +233,9 @@ impl Server { let is_done = tokio::select! { v = recv.recv() => v.is_none(), _ = cancellation_token.cancelled() => true, + _ = tokio::time::sleep(checkpoint_freq) => { + sync_kv.is_shutdown() + } }; tracing::info!("Received signal. done: {}", is_done);