Skip to content

Commit 7fca3e6

Browse files
committed
feat: truncate after
1 parent 0621529 commit 7fca3e6

File tree

1 file changed

+107
-7
lines changed

1 file changed

+107
-7
lines changed

duva/src/adapters/op_logs/disk_based.rs

Lines changed: 107 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,42 @@ impl Segment {
142142
.ok()
143143
.map(|found_index| self.lookups[found_index].byte_offset)
144144
}
145+
146+
fn truncate(&mut self, log_index: u64) -> Result<()> {
147+
// Finds the position of the last entry to keep, which is the entry with the given log_index or the one just before it.
148+
let Some(last_entry_pos) =
149+
self.lookups.iter().rposition(|lookup| lookup.log_index <= log_index)
150+
else {
151+
// No entry with log_index <= specified one, so truncate the entire segment.
152+
self.lookups.clear();
153+
self.end_index = self.start_index;
154+
self.size = 0;
155+
self.writer.get_ref().set_len(0)?;
156+
self.writer.seek(SeekFrom::Start(0))?;
157+
return Ok(());
158+
};
159+
160+
// Determine the new size of the file. It's the offset of the entry *after* the one we're keeping.
161+
let new_size = if let Some(next_lookup) = self.lookups.get(last_entry_pos + 1) {
162+
next_lookup.byte_offset
163+
} else {
164+
// We are keeping all entries up to the end of the segment.
165+
self.size
166+
};
167+
168+
// Truncate lookups vector to keep only the entries up to last_entry_pos.
169+
self.lookups.truncate(last_entry_pos + 1);
170+
171+
// Update metadata.
172+
self.end_index = self.lookups.last().map_or(self.start_index, |l| l.log_index);
173+
self.size = new_size;
174+
175+
// Physically truncate the file.
176+
self.writer.get_ref().set_len(new_size as u64)?;
177+
self.writer.seek(SeekFrom::Start(new_size as u64))?;
178+
179+
Ok(())
180+
}
145181
}
146182

147183
impl FileOpLogs {
@@ -392,14 +428,49 @@ impl TWriteAheadLog for FileOpLogs {
392428
}
393429

394430
fn truncate_after(&mut self, log_index: u64) {
395-
// Remove segments after the truncation point
396-
self.segments.retain(|segment| segment.end_index <= log_index);
397-
398-
// If active segment needs truncation
431+
// Discard any sealed segments that are entirely after the truncation point.
432+
self.segments.retain(|segment| {
433+
if segment.start_index > log_index {
434+
let _ = std::fs::remove_file(&segment.path);
435+
false
436+
} else {
437+
true
438+
}
439+
});
440+
441+
// Check if the last sealed segment needs to be truncated and promoted.
442+
if let Some(last_segment) = self.segments.last() {
443+
if last_segment.end_index > log_index {
444+
let mut new_active = self.segments.pop().unwrap();
445+
if new_active.truncate(log_index).is_ok() {
446+
// This truncated segment becomes the new active one.
447+
// The old active segment is now obsolete.
448+
let _ = std::fs::remove_file(&self.active_segment.path);
449+
self.active_segment = new_active;
450+
}
451+
// After promoting a sealed segment, the job is done.
452+
return;
453+
}
454+
}
399455

400-
if self.active_segment.start_index <= log_index && self.active_segment.end_index > log_index
401-
{
402-
// TODO: Implement truncation of active segment
456+
// If no sealed segment was promoted, the active segment might need truncation.
457+
if self.active_segment.end_index > log_index {
458+
if self.active_segment.start_index > log_index {
459+
// The active segment is entirely after the truncation point.
460+
// It should be replaced with a new, empty segment.
461+
let _ = std::fs::remove_file(&self.active_segment.path);
462+
let new_segment_idx = self.segments.len();
463+
let new_path = self.path.join(format!("segment_{}.oplog", new_segment_idx));
464+
let mut new_active = Segment::new(new_path).expect("Failed to create new segment");
465+
// The start_index of the new segment should be consistent.
466+
// If the previous segment (now the last in self.segments) exists,
467+
// its end_index + 1 should be the start of the new active segment.
468+
new_active.start_index = self.segments.last().map_or(0, |s| s.end_index + 1);
469+
self.active_segment = new_active;
470+
} else {
471+
// The truncation point is within the active segment.
472+
let _ = self.active_segment.truncate(log_index);
473+
}
403474
}
404475
}
405476
}
@@ -1008,4 +1079,33 @@ mod tests {
10081079

10091080
Ok(())
10101081
}
1082+
1083+
#[test]
1084+
fn test_truncate_after() -> Result<()> {
1085+
let dir = TempDir::new()?;
1086+
let path = dir.path();
1087+
let mut op_logs = FileOpLogs::new(path)?;
1088+
1089+
let ops = create_ops(0, 20, 1); // ops 0-19
1090+
op_logs.write_many(ops)?;
1091+
op_logs.rotate_segment()?;
1092+
let ops2 = create_ops(20, 20, 1); // ops 20-39
1093+
op_logs.write_many(ops2)?;
1094+
1095+
// Truncate in the middle of the active segment
1096+
op_logs.truncate_after(25);
1097+
assert_eq!(op_logs.read_at(25).unwrap().log_index, 25);
1098+
assert!(op_logs.read_at(26).is_none());
1099+
assert_eq!(op_logs.active_segment.end_index, 25);
1100+
1101+
// Truncate in the sealed segment
1102+
op_logs.truncate_after(10);
1103+
assert_eq!(op_logs.read_at(10).unwrap().log_index, 10);
1104+
assert!(op_logs.read_at(11).is_none());
1105+
assert!(op_logs.read_at(20).is_none());
1106+
assert_eq!(op_logs.active_segment.end_index, 10);
1107+
assert_eq!(op_logs.segments.len(), 0);
1108+
1109+
Ok(())
1110+
}
10111111
}

0 commit comments

Comments
 (0)