145 changes: 89 additions & 56 deletions crates/lib/docs_rs_storage/src/archive_index.rs
@@ -41,7 +41,7 @@ pub(crate) const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
/// dummy size we assume in case of errors
const DUMMY_FILE_SIZE: u64 = 1024 * 1024; // 1 MiB
/// self-repair attempts
const FIND_ATTEMPTS: usize = 5;
const REPAIR_ATTEMPTS: usize = 5;

#[derive(Debug)]
struct Metrics {
@@ -54,7 +54,7 @@ struct Metrics {
evicted_entry_size: Histogram<u64>,

// local cache misses / downloads & bytes
// includes & doesn't differentiate retries / repairs for now
// includes retries / repairs
downloads: Counter<u64>,
downloaded_bytes: Counter<u64>,
downloaded_entry_size: Histogram<u64>,
@@ -207,7 +207,7 @@ impl Cache {
/// create a new archive index cache.
///
/// Also starts a background task that will backfill the in-memory cache management based
/// on the local files that are already.
/// on the local files that are already present.
pub(crate) async fn new(
config: Arc<ArchiveIndexCacheConfig>,
meter_provider: &AnyMeterProvider,
@@ -259,11 +259,9 @@ impl Cache {
// the specified duration has passed since the last get or insert.
// We don't set TTL (time to live), which would be just time-after-insert.
.time_to_idle(config.ttl)
// we weigh each cache entry by the file size of the sqlite database.
// The max size of the cache for all of docs.rs is 500 GiB at the time of writing.
// In KiB, this would be around 500k, which makes KiB the right unit.
// Anything bigger (like MiB) would mean that we count smaller dbs than 1 MiB as if
// they were 1 MiB big.
// We weigh each cache entry by the file size of the SQLite database.
// The configured capacity is in MiB, but using KiB as moka's weight unit
// avoids counting every index smaller than 1 MiB as if it were 1 MiB.
.weigher(|_key: &PathBuf, entry: &Arc<Entry>| -> u32 { entry.file_size_kib })
// max capacity
// not entries, but _weighted entries_.
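To make the weigher comment concrete, here is a minimal, self-contained sketch of the same moka pattern; the `IndexEntry` type, the one-hour TTI, and the 10 GiB budget are illustrative assumptions, not values from this PR:

```rust
use std::{sync::Arc, time::Duration};
use moka::future::Cache;

// Hypothetical stand-in for this crate's `Entry`; only the weight field matters here.
struct IndexEntry {
    file_size_kib: u32,
}

#[tokio::main]
async fn main() {
    let cache: Cache<String, Arc<IndexEntry>> = Cache::builder()
        // expire entries a fixed duration after their *last use* (TTI),
        // not a fixed duration after insert (TTL)
        .time_to_idle(Duration::from_secs(60 * 60))
        // weigh each entry by its file size in KiB ...
        .weigher(|_key, entry: &Arc<IndexEntry>| entry.file_size_kib)
        // ... so max_capacity is a size budget in KiB (here ~10 GiB),
        // not a number of entries
        .max_capacity(10 * 1024 * 1024)
        .build();

    cache
        .insert(
            "crate.zip.index".to_string(),
            Arc::new(IndexEntry { file_size_kib: 2048 }),
        )
        .await;
    assert!(cache.get("crate.zip.index").await.is_some());
}
```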
@@ -275,7 +273,7 @@
let path = path.to_path_buf();
let metrics = metrics_for_eviction.clone();
// The spawned task means file deletion is deferred. See the
// "benign race with the eviction listener" comment in `find_inner`
// "benign race with the eviction listener" comment in `find_index_inner`
// for why this is acceptable.
tokio::spawn(async move {
let reason = format!("{reason:?}");
@@ -356,8 +354,8 @@ impl Cache {
///
/// Should be needed only once after server startup.
///
/// While this is running, our `find_inner` & `download_archive_index` logic will just
/// fill it itself.
/// While this is running, `find_index_inner` and `download_archive_index` backfill
/// entries on demand for requested indexes.
///
/// Concurrency is intentionally kept low so we don't put
/// too much I/O pressure on the disk.
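A hedged sketch of that bounded-concurrency shape, assuming hypothetical names (`register_local_index`, a limit of 4) rather than this PR's actual implementation:

```rust
use std::path::{Path, PathBuf};

use futures_util::{stream, StreamExt};

// Hypothetical helper: stat one already-downloaded index file and
// register it with the in-memory cache bookkeeping.
async fn register_local_index(_path: &Path) {
    // stub for illustration
}

// Walk the local index files with a small concurrency bound so the
// background backfill doesn't compete with request-path disk i/o.
async fn backfill(paths: Vec<PathBuf>) {
    stream::iter(paths)
        .for_each_concurrent(4, |path| async move {
            register_local_index(&path).await;
        })
        .await;
}
```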
@@ -438,11 +436,53 @@ impl Cache {
Ok(())
}

pub(crate) async fn find_index(
async fn retry_with_purge<T, F, Fut>(
&self,
archive_path: &str,
latest_build_id: Option<BuildId>,
downloader: &impl Downloader,
mut action: F,
) -> Result<(T, usize)>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T>>,
{
for attempt in 1..=REPAIR_ATTEMPTS {
match action().await {
Ok(value) => return Ok((value, attempt)),
Err(err) if attempt < REPAIR_ATTEMPTS => {
warn!(
?err,
%attempt,
"archive index operation failed, purging local cache and retrying"
);
self.purge(archive_path, latest_build_id).await?;
}
Err(err) => return Err(err),
}
}

unreachable!("archive index retry loop exited unexpectedly");
}
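Returning the successful attempt number alongside the value lets `find` below tag its `find_calls` metric with an accurate `attempt` label while reusing this single retry loop.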

pub(crate) async fn find_index<D: Downloader + Sync>(
&self,
archive_path: &str,
latest_build_id: Option<BuildId>,
downloader: &D,
) -> Result<Index> {
let (index, _) = self
.retry_with_purge(archive_path, latest_build_id, || {
self.find_index_inner(archive_path, latest_build_id, downloader)
})
.await?;
Ok(index)
}

async fn find_index_inner<D: Downloader + Sync>(
&self,
archive_path: &str,
latest_build_id: Option<BuildId>,
downloader: &D,
) -> Result<Index> {
let local_index_path = self.local_index_path(archive_path, latest_build_id);

@@ -539,63 +579,53 @@ impl Cache {
}
})?;

// Final attempt: if this still fails, bubble the error.
// Final open for this retry attempt. If it fails, the caller's retry loop
// purges the cache entry and tries again.
Index::open(local_index_path).await
}

/// Find the file metadata needed to fetch a certain path inside a remote archive.
/// Will try to use a local cache of the index file, and otherwise download it
/// from storage.
#[instrument(skip(self, downloader))]
pub(crate) async fn find(
pub(crate) async fn find<D: Downloader + Sync>(
&self,
archive_path: &str,
latest_build_id: Option<BuildId>,
path_in_archive: &str,
downloader: &impl Downloader,
downloader: &D,
) -> Result<Option<FileInfo>> {
for attempt in 1..=FIND_ATTEMPTS {
let result = async {
let result = self
.retry_with_purge(archive_path, latest_build_id, || async {
let mut index = self
.find_index(archive_path, latest_build_id, downloader)
.find_index_inner(archive_path, latest_build_id, downloader)
.await?;
index.find(path_in_archive).await
}
})
.await;

match result {
Ok(file_info) => {
self.metrics.find_calls.add(
1,
&[
KeyValue::new("attempt", attempt.to_string()),
KeyValue::new("outcome", "success"),
],
);
return Ok(file_info);
}
Err(err) if attempt < FIND_ATTEMPTS => {
warn!(
?err,
%attempt,
"error in archive index lookup, purging local cache and retrying"
);
self.purge(archive_path, latest_build_id).await?;
}
Err(err) => {
self.metrics.find_calls.add(
1,
&[
KeyValue::new("attempt", attempt.to_string()),
KeyValue::new("outcome", "error"),
],
);
return Err(err);
}
match result {
Ok((file_info, attempt)) => {
self.metrics.find_calls.add(
1,
&[
KeyValue::new("attempt", attempt.to_string()),
KeyValue::new("outcome", "success"),
],
);
return Ok(file_info);
}
Err(err) => {
self.metrics.find_calls.add(
1,
&[
KeyValue::new("attempt", REPAIR_ATTEMPTS.to_string()),
KeyValue::new("outcome", "error"),
],
);
return Err(err);
}
}

unreachable!("find retry loop exited unexpectedly");
}

#[instrument(skip(self, downloader))]
@@ -678,7 +708,7 @@ async fn sqlite_create<P: AsRef<Path>>(path: P) -> Result<sqlx::SqliteConnection
.map_err(Into::into)
}

/// open existing SQLite database, return a configured connection poll
/// open existing SQLite database, return a configured connection pool
/// to connect to the DB.
/// Will error when the database doesn't exist at that path.
async fn sqlite_open<P: AsRef<Path>>(path: P) -> Result<sqlx::SqliteConnection> {
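The body is collapsed in this diff. As a hedged sketch, assuming sqlx's standard connect options (the real function may configure more), a "fail if the file is missing" open looks roughly like:

```rust
use std::path::Path;

use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions, SqliteConnection};

async fn open_existing(path: &Path) -> Result<SqliteConnection, sqlx::Error> {
    SqliteConnectOptions::new()
        .filename(path)
        // never create an empty database on open: a missing index file
        // should surface as an error, not a silently blank DB
        .create_if_missing(false)
        .connect()
        .await
}
```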
@@ -1341,7 +1371,7 @@ mod tests {
}

#[tokio::test]
async fn find_retries_once_then_errors() -> Result<()> {
async fn find_retries_then_errors() -> Result<()> {
let cache = test_cache().await?;
const LATEST_BUILD_ID: Option<BuildId> = Some(BuildId(7));
const ARCHIVE_NAME: &str = "test.zip";
@@ -1368,7 +1398,10 @@ mod tests {
.message(),
"file is not a database"
);
assert_eq!(downloader.download_count(&remote_index_path), FIND_ATTEMPTS);
assert_eq!(
downloader.download_count(&remote_index_path),
REPAIR_ATTEMPTS
);

Ok(())
}
@@ -1388,14 +1421,14 @@ mod tests {
let downloader = FlakyDownloader::new(
remote_index_path,
create_index_bytes(1).await?,
FIND_ATTEMPTS - 1,
REPAIR_ATTEMPTS - 1,
);

let result = cache
.find(ARCHIVE_NAME, LATEST_BUILD_ID, FILE_IN_ARCHIVE, &downloader)
.await?;
assert!(result.is_some());
assert_eq!(downloader.fetch_count(), FIND_ATTEMPTS);
assert_eq!(downloader.fetch_count(), REPAIR_ATTEMPTS);

Ok(())
}