Skip to content

Commit 90972d7

Browse files
author
Per-Victor Persson
committed
Fix persistence workers never terminating due to never receiving a shutdown signal
1 parent 3d0b9c2 commit 90972d7

2 files changed

Lines changed: 20 additions & 1 deletion

File tree

crates/y-sweet-core/src/sync_kv.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct SyncKv {
1717
key: String,
1818
dirty: AtomicBool,
1919
dirty_callback: Box<dyn Fn() + Send + Sync>,
20+
shutdown: AtomicBool,
2021
}
2122

2223
impl SyncKv {
@@ -44,11 +45,12 @@ impl SyncKv {
4445
key,
4546
dirty: AtomicBool::new(false),
4647
dirty_callback: Box::new(callback),
48+
shutdown: AtomicBool::new(false),
4749
})
4850
}
4951

5052
fn mark_dirty(&self) {
51-
if !self.dirty.load(Ordering::Relaxed) {
53+
if !self.dirty.load(Ordering::Relaxed) && !self.shutdown.load(Ordering::SeqCst) {
5254
self.dirty.store(true, Ordering::Relaxed);
5355
(self.dirty_callback)();
5456
}
@@ -88,6 +90,16 @@ impl SyncKv {
8890
pub fn is_empty(&self) -> bool {
8991
self.data.lock().unwrap().is_empty()
9092
}
93+
94+
pub fn is_shutdown(&self) -> bool {
95+
self.shutdown.load(Ordering::SeqCst)
96+
}
97+
98+
pub fn shutdown(&self) {
99+
self.shutdown.store(true, Ordering::SeqCst);
100+
// Call the callback one last time to wake up the persistence worker
101+
(self.dirty_callback)();
102+
}
91103
}
92104

93105
impl<'d> DocOps<'d> for SyncKv {}

crates/y-sweet/src/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ impl Server {
204204

205205
if checkpoints_without_refs >= 2 {
206206
tracing::info!("GCing doc");
207+
if let Some(doc) = docs.get(&doc_id) {
208+
doc.sync_kv().shutdown();
209+
}
210+
207211
docs.remove(&doc_id);
208212
break;
209213
}
@@ -229,6 +233,9 @@ impl Server {
229233
let is_done = tokio::select! {
230234
v = recv.recv() => v.is_none(),
231235
_ = cancellation_token.cancelled() => true,
236+
_ = tokio::time::sleep(checkpoint_freq) => {
237+
sync_kv.is_shutdown()
238+
}
232239
};
233240

234241
tracing::info!("Received signal. done: {}", is_done);

0 commit comments

Comments
 (0)