Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 32 additions & 55 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
metrics::otel::AnyMeterProvider,
utils::spawn_blocking,
};
use anyhow::anyhow;
use axum_extra::headers;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
Expand All @@ -40,13 +39,11 @@ use std::{
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt},
runtime,
sync::Mutex,
time::sleep,
};
use tracing::{debug, error, info_span, instrument, trace, warn};
use walkdir::WalkDir;
Expand Down Expand Up @@ -383,19 +380,29 @@ impl AsyncStorage {
latest_build_id: Option<BuildId>,
path: &str,
) -> Result<bool> {
match self
.find_in_archive_index(archive_path, latest_build_id, path)
.await
{
Ok(file_info) => Ok(file_info.is_some()),
Err(err) => {
if err.downcast_ref::<PathNotFoundError>().is_some() {
Ok(false)
} else {
Err(err)
for attempt in 0..2 {
match self
.find_in_archive_index(archive_path, latest_build_id, path)
.await
{
Ok(file_info) => return Ok(file_info.is_some()),
Err(err) if err.downcast_ref::<PathNotFoundError>().is_some() => {
return Ok(false);
}
Err(err) if attempt == 0 => {
warn!(
?err,
"error fetching range from archive, purging local index cache and retrying once"
);
self.purge_archive_index_cache(archive_path, latest_build_id)
.await?;

continue;
}
Err(err) => return Err(err),
}
}
unreachable!("exists_in_archive retry loop exited unexpectedly");
}

/// get, decompress and materialize an object from store
Expand Down Expand Up @@ -557,54 +564,24 @@ impl AsyncStorage {
}

let lock = self.local_index_cache_lock(&local_index_path);
let write_guard = lock.lock().await;

// At this point we know the index is missing or broken.
// Try to become the "downloader" without queueing as a writer.
if let Ok(write_guard) = lock.try_lock() {
// Double-check: maybe someone fixed it between our first failure and now.
if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await {
return Ok(res);
}

let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");

// We are the repairer: download fresh index into place.
self.download_archive_index(&local_index_path, &remote_index_path)
.await?;

// Write lock is dropped here (end of scope), so others can proceed.
drop(write_guard);

// Final attempt: if this still fails, bubble the error.
return archive_index::find_in_file(local_index_path, path_in_archive).await;
// Double-check: maybe someone fixed it between our first failure and now.
if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await {
return Ok(res);
}

// Someone else is already downloading/repairing. Don't queue on write(); just wait
// a bit and poll the fast path until it becomes readable or we give up.
const STEP_MS: u64 = 10;
const ATTEMPTS: u64 = 50; // = 500ms total wait
const TOTAL_WAIT_MS: u64 = STEP_MS * ATTEMPTS;
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");

let mut last_err = None;

for _ in 0..ATTEMPTS {
sleep(Duration::from_millis(STEP_MS)).await;
// We are the repairer: download fresh index into place.
self.download_archive_index(&local_index_path, &remote_index_path)
.await?;

match archive_index::find_in_file(local_index_path.clone(), path_in_archive).await {
Ok(res) => return Ok(res),
Err(err) => {
// keep waiting; repair may still be in progress
last_err = Some(err);
}
}
}
// Write lock is dropped here (end of scope), so others can proceed.
drop(write_guard);

// Still not usable after waiting: return the last error we saw.
Err(last_err
.unwrap_or_else(|| anyhow!("archive index unavailable after repair wait"))
.context(format!(
"no archive index after waiting for {TOTAL_WAIT_MS}ms"
)))
// Final attempt: if this still fails, bubble the error.
archive_index::find_in_file(local_index_path, path_in_archive).await
}

#[instrument]
Expand Down
Loading