From 429f9b2feb28a4537e53cc84ff5adc214236fba4 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 5 Dec 2025 19:48:13 +0100 Subject: [PATCH 1/2] also retry index download on exists-in-archive, change locking --- src/storage/mod.rs | 87 +++++++++++++++++----------------------------- 1 file changed, 32 insertions(+), 55 deletions(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 264e07b1b..32c74d461 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -21,7 +21,6 @@ use crate::{ metrics::otel::AnyMeterProvider, utils::spawn_blocking, }; -use anyhow::anyhow; use axum_extra::headers; use chrono::{DateTime, Utc}; use dashmap::DashMap; @@ -40,13 +39,11 @@ use std::{ path::{Path, PathBuf}, str::FromStr, sync::Arc, - time::Duration, }; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt}, runtime, sync::Mutex, - time::sleep, }; use tracing::{debug, error, info_span, instrument, trace, warn}; use walkdir::WalkDir; @@ -383,19 +380,29 @@ impl AsyncStorage { latest_build_id: Option, path: &str, ) -> Result { - match self - .find_in_archive_index(archive_path, latest_build_id, path) - .await - { - Ok(file_info) => Ok(file_info.is_some()), - Err(err) => { - if err.downcast_ref::().is_some() { - Ok(false) - } else { - Err(err) + for attempt in 0..2 { + match self + .find_in_archive_index(archive_path, latest_build_id, path) + .await + { + Ok(file_info) => return Ok(file_info.is_some()), + Err(err) if err.downcast_ref::().is_some() => { + return Ok(false); + } + Err(err) if attempt == 0 => { + warn!( + ?err, + "error fetching range from archive, purging local index cache and retrying once" + ); + self.purge_archive_index_cache(archive_path, latest_build_id) + .await?; + + continue; } + Err(err) => return Err(err), } } + unreachable!("exists_in_archive retry loop exited unexpectedly"); } /// get, decompress and materialize an object from store @@ -557,54 +564,24 @@ impl AsyncStorage { } let lock = self.local_index_cache_lock(&local_index_path); + let write_guard = lock.lock().await; - // At this point we know the index is missing or broken. - // Try to become the "downloader" without queueing as a writer. - if let Ok(write_guard) = lock.try_lock() { - // Double-check: maybe someone fixed it between our first failure and now. - if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await { - return Ok(res); - } - - let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}"); - - // We are the repairer: download fresh index into place. - self.download_archive_index(&local_index_path, &remote_index_path) - .await?; - - // Write lock is dropped here (end of scope), so others can proceed. - drop(write_guard); - - // Final attempt: if this still fails, bubble the error. - return archive_index::find_in_file(local_index_path, path_in_archive).await; + // Double-check: maybe someone fixed it between our first failure and now. + if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await { + return Ok(res); } - // Someone else is already downloading/repairing. Don't queue on write(); just wait - // a bit and poll the fast path until it becomes readable or we give up. - const STEP_MS: u64 = 10; - const ATTEMPTS: u64 = 50; // = 500ms total wait - const TOTAL_WAIT_MS: u64 = STEP_MS * ATTEMPTS; + let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}"); - let mut last_err = None; - - for _ in 0..ATTEMPTS { - sleep(Duration::from_millis(STEP_MS)).await; + // We are the repairer: download fresh index into place. + self.download_archive_index(&local_index_path, &remote_index_path) + .await?; - match archive_index::find_in_file(local_index_path.clone(), path_in_archive).await { - Ok(res) => return Ok(res), - Err(err) => { - // keep waiting; repair may still be in progress - last_err = Some(err); - } - } - } + // Write lock is dropped here (end of scope), so others can proceed. + drop(write_guard); - // Still not usable after waiting: return the last error we saw. - Err(last_err - .unwrap_or_else(|| anyhow!("archive index unavailable after repair wait")) - .context(format!( - "no archive index after waiting for {TOTAL_WAIT_MS}ms" - ))) + // Final attempt: if this still fails, bubble the error. + return archive_index::find_in_file(local_index_path, path_in_archive).await; } #[instrument] From 9e2539525f7a5ab10f19024598029256cf176566 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 7 Dec 2025 09:51:14 +0100 Subject: [PATCH 2/2] remove unnecessary `return` --- src/storage/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 32c74d461..bc515475a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -581,7 +581,7 @@ impl AsyncStorage { drop(write_guard); // Final attempt: if this still fails, bubble the error. - return archive_index::find_in_file(local_index_path, path_in_archive).await; + archive_index::find_in_file(local_index_path, path_in_archive).await } #[instrument]