34 changes: 1 addition & 33 deletions src/bin/cratesfyi.rs
@@ -2,7 +2,7 @@ use anyhow::{Context as _, Result, anyhow};
use clap::{Parser, Subcommand, ValueEnum};
use docs_rs::{
Config, Context, Index, PackageKind, RustwideBuilder,
db::{self, CrateId, Overrides, ReleaseId, add_path_into_database, types::version::Version},
db::{self, CrateId, Overrides, add_path_into_database, types::version::Version},
start_background_metrics_webserver, start_web_server,
utils::{
ConfigName, daemon::start_background_service_metric_collector, get_config,
@@ -532,16 +532,6 @@ enum DatabaseSubcommand {
/// Backfill GitHub/GitLab stats for crates.
BackfillRepositoryStats,

/// Recompress broken archive index files in storage.
RecompressArchiveIndexes {
#[arg(long)]
min_release_id: Option<ReleaseId>,
#[arg(long)]
max_release_id: Option<ReleaseId>,
#[arg(long)]
concurrency: Option<u8>,
},

/// Updates info for a crate from the registry's API
UpdateCrateRegistryFields {
#[arg(name = "CRATE")]
@@ -630,28 +620,6 @@ impl DatabaseSubcommand {
db::update_crate_data_in_database(&mut conn, &name, &registry_data).await
})?,

Self::RecompressArchiveIndexes {
min_release_id,
max_release_id,
concurrency,
} => ctx.runtime.block_on(async move {
let mut conn = ctx.pool.get_async().await?;

let (checked, recompressed) = ctx
.async_storage
.recompress_index_files_in_bucket(
&mut conn,
min_release_id,
max_release_id,
concurrency.map(Into::into),
)
.await?;

println!("{} index files checked", checked);
println!("{} index files recompressed", recompressed);
Ok::<_, anyhow::Error>(())
})?,

Self::AddDirectory { directory } => {
ctx.runtime
.block_on(add_path_into_database(
147 changes: 5 additions & 142 deletions src/storage/mod.rs
@@ -12,7 +12,7 @@ use self::{
use crate::{
Config, InstanceMetrics,
db::{
BuildId, Pool, ReleaseId,
BuildId, Pool,
file::{FileEntry, detect_mime},
mimes,
types::version::Version,
@@ -21,11 +21,11 @@ use crate::{
metrics::otel::AnyMeterProvider,
utils::spawn_blocking,
};
use anyhow::{anyhow, bail};
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use fn_error_context::context;
use futures_util::{TryStreamExt as _, stream::BoxStream};
use futures_util::stream::BoxStream;
use mime::Mime;
use opentelemetry::metrics::Counter;
use path_slash::PathExt;
@@ -38,18 +38,14 @@ use std::{
ops::RangeInclusive,
path::{Path, PathBuf},
str::FromStr,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
sync::Arc,
};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt},
runtime,
sync::RwLock,
};
use tracing::{error, info, info_span, instrument, trace, warn};
use tracing_futures::Instrument as _;
use tracing::{error, info_span, instrument, trace, warn};
use walkdir::WalkDir;

const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
@@ -887,139 +883,6 @@ impl AsyncStorage {
}
Ok(())
}

/// fix the broken zstd archives in our bucket
/// See https://github.com/rust-lang/docs.rs/pull/2988
/// returns the number of index files checked and the number recompressed.
///
/// Doesn't handle the local cache, when the remote files are fixed,
/// I'll just wipe it.
///
/// We intentionally start with the latest releases, I'll probably first
/// find a release ID to check up to and then let the command run in the
/// background.
///
/// so we start at release_id_max and go down to release_id_min.
pub async fn recompress_index_files_in_bucket(
&self,
conn: &mut sqlx::PgConnection,
min_release_id: Option<ReleaseId>,
max_release_id: Option<ReleaseId>,
concurrency: Option<usize>,
) -> Result<(u64, u64)> {
let recompressed = Arc::new(AtomicU64::new(0));
let checked = Arc::new(AtomicU64::new(0));

let StorageBackend::S3(raw_storage) = &self.backend else {
bail!("only works with S3 backend");
};

sqlx::query!(
r#"
SELECT
r.id,
c.name,
r.version as "version: Version",
r.release_time
FROM
crates AS c
INNER JOIN releases AS r ON r.crate_id = c.id
WHERE
r.archive_storage IS TRUE AND
r.id >= $1 AND
r.id <= $2
ORDER BY
r.id DESC
"#,
min_release_id.unwrap_or(ReleaseId(0)) as _,
max_release_id.unwrap_or(ReleaseId(i32::MAX)) as _
)
.fetch(conn)
.err_into::<anyhow::Error>()
.try_for_each_concurrent(concurrency.unwrap_or_else(num_cpus::get), |row| {
let recompressed = recompressed.clone();
let checked = checked.clone();

let release_span = tracing::info_span!(
"recompress_release",
id=row.id,
name=&row.name,
version=%row.version,
release_time=row.release_time.map(|rt| rt.to_rfc3339()),
);

async move {
trace!("handling release");

for path in &[
rustdoc_archive_path(&row.name, &row.version),
source_archive_path(&row.name, &row.version),
] {
let path = format!("{path}.index");
trace!(path, "checking path");

let compressed_stream = match raw_storage.get_stream(&path, None).await {
Ok(stream) => stream,
Err(err) => {
if matches!(err.downcast_ref(), Some(PathNotFoundError)) {
trace!(path, "path not found, skipping");
continue;
}
trace!(path, ?err, "error fetching stream");
return Err(err);
}
};

let alg = CompressionAlgorithm::default();

if compressed_stream.compression != Some(alg) {
trace!(path, "Archive index not compressed with zstd, skipping");
continue;
}

info!(path, "checking archive");
checked.fetch_add(1, Ordering::Relaxed);

// download the compressed raw blob first.
// Like this we can first check if it's worth recompressing & re-uploading.
let mut compressed_blob = compressed_stream.materialize(usize::MAX).await?;

if decompress(compressed_blob.content.as_slice(), alg, usize::MAX).is_ok() {
info!(path, "Archive can be decompressed, skipping");
continue;
}

warn!(path, "recompressing archive");
recompressed.fetch_add(1, Ordering::Relaxed);

let mut decompressed = Vec::new();
{
// old async-compression can read the broken zstd stream
let mut reader =
wrap_reader_for_decompression(compressed_blob.content.as_slice(), alg);

tokio::io::copy(&mut reader, &mut decompressed).await?;
}

let mut buf = Vec::with_capacity(decompressed.len());
compress_async(decompressed.as_slice(), &mut buf, alg).await?;
compressed_blob.content = buf;
compressed_blob.compression = Some(alg);

// `.store_inner` just uploads what it gets, without any compression logic
self.store_inner(vec![compressed_blob.into()]).await?;
}
Ok(())
}
.instrument(release_span)
})
.await?;

Ok((
checked.load(Ordering::Relaxed),
recompressed.load(Ordering::Relaxed),
))
}
}

impl std::fmt::Debug for AsyncStorage {