Skip to content

Commit 3707844

Browse files
DracyrPer-Victor Persson
andauthored
Fix persistence workers never terminating (#401)
Hey! This PR fixes an issue where memory is never reclaimed and keeps rising, due to persistence workers never shutting down and freeing up the used memory. I'm running y-sweet as a long-running service, instead of via Plane or another way to spin up a process per "room". And while experimenting I also allowed images to be uploaded to the room state, which made the fact that there was something up with the memory usage very visible, and after adding a bit of logging/monitoring I could see that the number of persistence workers never decreased. In order to fix this, this PR adds a link between the GC Worker and the persistence worker. When the GC detects that there's no more references to the document it also notifies the persistence worker to shut down, and when it does the memory is also freed. Co-authored-by: Per-Victor Persson <pepe@netlight.com>
1 parent 3d0b9c2 commit 3707844

2 files changed

Lines changed: 20 additions & 1 deletion

File tree

crates/y-sweet-core/src/sync_kv.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct SyncKv {
1717
key: String,
1818
dirty: AtomicBool,
1919
dirty_callback: Box<dyn Fn() + Send + Sync>,
20+
shutdown: AtomicBool,
2021
}
2122

2223
impl SyncKv {
@@ -44,11 +45,12 @@ impl SyncKv {
4445
key,
4546
dirty: AtomicBool::new(false),
4647
dirty_callback: Box::new(callback),
48+
shutdown: AtomicBool::new(false),
4749
})
4850
}
4951

5052
fn mark_dirty(&self) {
51-
if !self.dirty.load(Ordering::Relaxed) {
53+
if !self.dirty.load(Ordering::Relaxed) && !self.shutdown.load(Ordering::SeqCst) {
5254
self.dirty.store(true, Ordering::Relaxed);
5355
(self.dirty_callback)();
5456
}
@@ -88,6 +90,16 @@ impl SyncKv {
8890
pub fn is_empty(&self) -> bool {
8991
self.data.lock().unwrap().is_empty()
9092
}
93+
94+
pub fn is_shutdown(&self) -> bool {
95+
self.shutdown.load(Ordering::SeqCst)
96+
}
97+
98+
pub fn shutdown(&self) {
99+
self.shutdown.store(true, Ordering::SeqCst);
100+
// Call the callback one last time to wake up the persistence worker
101+
(self.dirty_callback)();
102+
}
91103
}
92104

93105
impl<'d> DocOps<'d> for SyncKv {}

crates/y-sweet/src/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ impl Server {
204204

205205
if checkpoints_without_refs >= 2 {
206206
tracing::info!("GCing doc");
207+
if let Some(doc) = docs.get(&doc_id) {
208+
doc.sync_kv().shutdown();
209+
}
210+
207211
docs.remove(&doc_id);
208212
break;
209213
}
@@ -229,6 +233,9 @@ impl Server {
229233
let is_done = tokio::select! {
230234
v = recv.recv() => v.is_none(),
231235
_ = cancellation_token.cancelled() => true,
236+
_ = tokio::time::sleep(checkpoint_freq) => {
237+
sync_kv.is_shutdown()
238+
}
232239
};
233240

234241
tracing::info!("Received signal. done: {}", is_done);

0 commit comments

Comments
 (0)