Skip to content

Commit 2f12f3d

Browse files
authored
[BUG] Perform read-repair on collections that get written to but have a crash before manifest write. (#5476)
## Description of changes Sequence of events: - write to fragment - write to dirty log - crash writer - compactor picks up dirty log bit - tries to compact endlessly This patch performs read repair. The same loop can happen and require operator intervention when the dirty log gets written but the fragment does not. chroma-purge-dirty-log is the remedy for that. ## Test plan CI ## Migration plan N/A ## Observability plan N/A ## Documentation Changes N/A
1 parent 204379b commit 2f12f3d

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

rust/log-service/src/lib.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ impl LogServer {
599599
async fn _update_collection_log_offset(
600600
&self,
601601
request: Request<UpdateCollectionLogOffsetRequest>,
602+
active: tokio::sync::MutexGuard<'_, ActiveLog>,
602603
allow_rollback: bool,
603604
) -> Result<Response<UpdateCollectionLogOffsetResponse>, Status> {
604605
let request = request.into_inner();
@@ -611,12 +612,35 @@ impl LogServer {
611612
adjusted_log_offset
612613
);
613614
let storage_prefix = collection_id.storage_prefix_for_log();
614-
615-
let log_reader = LogReader::new(
616-
self.config.reader.clone(),
617-
Arc::clone(&self.storage),
618-
storage_prefix.clone(),
619-
);
615+
let key = LogKey { collection_id };
616+
let handle = self.open_logs.get_or_create_state(key);
617+
let mark_dirty = MarkDirty {
618+
collection_id,
619+
dirty_log: Arc::clone(&self.dirty_log),
620+
};
621+
// NOTE(rescrv): We use the writer and fall back to constructing a local reader in order
622+
// to force a read-repair of the collection when things partially fail.
623+
//
624+
// The writer will read the manifest, and try to read the next fragment. This adds
625+
// latency, but improves correctness.
626+
let log = get_log_from_handle_with_mutex_held(
627+
&handle,
628+
active,
629+
&self.config.writer,
630+
&self.storage,
631+
&storage_prefix,
632+
mark_dirty,
633+
)
634+
.await
635+
.map_err(|err| Status::unknown(err.to_string()))?;
636+
637+
let log_reader = log
638+
.reader(self.config.reader.clone())
639+
.unwrap_or(LogReader::new(
640+
self.config.reader.clone(),
641+
Arc::clone(&self.storage),
642+
storage_prefix.clone(),
643+
));
620644

621645
let res = log_reader.next_write_timestamp().await;
622646
if let Err(wal3::Error::UninitializedLog) = res {
@@ -1464,8 +1488,8 @@ impl LogServer {
14641488
// Grab a lock on the state for this key, so that a racing initialize won't do anything.
14651489
let key = LogKey { collection_id };
14661490
let handle = self.open_logs.get_or_create_state(key);
1467-
let mut _active = handle.active.lock().await;
1468-
self._update_collection_log_offset(Request::new(request), false)
1491+
let active = handle.active.lock().await;
1492+
self._update_collection_log_offset(Request::new(request), active, false)
14691493
.await
14701494
}
14711495

@@ -1482,8 +1506,8 @@ impl LogServer {
14821506
// Grab a lock on the state for this key, so that a racing initialize won't do anything.
14831507
let key = LogKey { collection_id };
14841508
let handle = self.open_logs.get_or_create_state(key);
1485-
let mut _active = handle.active.lock().await;
1486-
self._update_collection_log_offset(Request::new(request), true)
1509+
let active = handle.active.lock().await;
1510+
self._update_collection_log_offset(Request::new(request), active, true)
14871511
.await
14881512
}
14891513

0 commit comments

Comments
 (0)