diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs
index f5bdd5a2e..53519366b 100644
--- a/src/bin/cratesfyi.rs
+++ b/src/bin/cratesfyi.rs
@@ -2,7 +2,7 @@ use anyhow::{Context as _, Result, anyhow};
 use clap::{Parser, Subcommand, ValueEnum};
 use docs_rs::{
     Config, Context, Index, PackageKind, RustwideBuilder,
-    db::{self, CrateId, Overrides, ReleaseId, add_path_into_database, types::version::Version},
+    db::{self, CrateId, Overrides, add_path_into_database, types::version::Version},
     start_background_metrics_webserver, start_web_server,
     utils::{
         ConfigName, daemon::start_background_service_metric_collector, get_config,
@@ -532,16 +532,6 @@ enum DatabaseSubcommand {
     /// Backfill GitHub/GitLab stats for crates.
     BackfillRepositoryStats,
 
-    /// Recompress broken archive index files in storage.
-    RecompressArchiveIndexes {
-        #[arg(long)]
-        min_release_id: Option<ReleaseId>,
-        #[arg(long)]
-        max_release_id: Option<ReleaseId>,
-        #[arg(long)]
-        concurrency: Option<usize>,
-    },
-
     /// Updates info for a crate from the registry's API
     UpdateCrateRegistryFields {
         #[arg(name = "CRATE")]
@@ -630,28 +620,6 @@ impl DatabaseSubcommand {
                 db::update_crate_data_in_database(&mut conn, &name, &registry_data).await
             })?,
 
-            Self::RecompressArchiveIndexes {
-                min_release_id,
-                max_release_id,
-                concurrency,
-            } => ctx.runtime.block_on(async move {
-                let mut conn = ctx.pool.get_async().await?;
-
-                let (checked, recompressed) = ctx
-                    .async_storage
-                    .recompress_index_files_in_bucket(
-                        &mut conn,
-                        min_release_id,
-                        max_release_id,
-                        concurrency.map(Into::into),
-                    )
-                    .await?;
-
-                println!("{} index files checked", checked);
-                println!("{} index files recompressed", recompressed);
-                Ok::<_, anyhow::Error>(())
-            })?,
-
             Self::AddDirectory { directory } => {
                 ctx.runtime
                     .block_on(add_path_into_database(
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index a316d63a5..cf29ab682 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -12,7 +12,7 @@ use self::{
 use crate::{
     Config, InstanceMetrics,
     db::{
-        BuildId, Pool, ReleaseId,
+        BuildId, Pool,
         file::{FileEntry, detect_mime},
         mimes,
         types::version::Version,
@@ -21,11 +21,11 @@ use crate::{
     metrics::otel::AnyMeterProvider,
     utils::spawn_blocking,
 };
-use anyhow::{anyhow, bail};
+use anyhow::anyhow;
 use chrono::{DateTime, Utc};
 use dashmap::DashMap;
 use fn_error_context::context;
-use futures_util::{TryStreamExt as _, stream::BoxStream};
+use futures_util::stream::BoxStream;
 use mime::Mime;
 use opentelemetry::metrics::Counter;
 use path_slash::PathExt;
@@ -38,18 +38,14 @@ use std::{
     ops::RangeInclusive,
     path::{Path, PathBuf},
     str::FromStr,
-    sync::{
-        Arc,
-        atomic::{AtomicU64, Ordering},
-    },
+    sync::Arc,
 };
 use tokio::{
     io::{AsyncBufRead, AsyncBufReadExt},
     runtime,
     sync::RwLock,
 };
-use tracing::{error, info, info_span, instrument, trace, warn};
-use tracing_futures::Instrument as _;
+use tracing::{error, info_span, instrument, trace, warn};
 use walkdir::WalkDir;
 
 const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
@@ -887,139 +883,6 @@ impl AsyncStorage {
         }
         Ok(())
     }
-
-    /// fix the broken zstd archives in our bucket
-    /// See https://github.com/rust-lang/docs.rs/pull/2988
-    /// returns the number of files recompressed.
-    ///
-    /// Doesn't handle the local cache, when the remove files are fixed,
-    /// I'll just wipe it.
-    ///
-    /// We intentionally start with the latest releases, I'll probably first
-    /// find a release ID to check up to and then let the command run in the
-    /// background.
-    ///
-    /// so we start at release_id_max and go down to release_id_min.
-    pub async fn recompress_index_files_in_bucket(
-        &self,
-        conn: &mut sqlx::PgConnection,
-        min_release_id: Option<ReleaseId>,
-        max_release_id: Option<ReleaseId>,
-        concurrency: Option<usize>,
-    ) -> Result<(u64, u64)> {
-        let recompressed = Arc::new(AtomicU64::new(0));
-        let checked = Arc::new(AtomicU64::new(0));
-
-        let StorageBackend::S3(raw_storage) = &self.backend else {
-            bail!("only works with S3 backend");
-        };
-
-        sqlx::query!(
-            r#"
-            SELECT
-                r.id,
-                c.name,
-                r.version as "version: Version",
-                r.release_time
-            FROM
-                crates AS c
-                INNER JOIN releases AS r ON r.crate_id = c.id
-            WHERE
-                r.archive_storage IS TRUE AND
-                r.id >= $1 AND
-                r.id <= $2
-            ORDER BY
-                r.id DESC
-            "#,
-            min_release_id.unwrap_or(ReleaseId(0)) as _,
-            max_release_id.unwrap_or(ReleaseId(i32::MAX)) as _
-        )
-        .fetch(conn)
-        .err_into::<anyhow::Error>()
-        .try_for_each_concurrent(concurrency.unwrap_or_else(num_cpus::get), |row| {
-            let recompressed = recompressed.clone();
-            let checked = checked.clone();
-
-            let release_span = tracing::info_span!(
-                "recompress_release",
-                id=row.id,
-                name=&row.name,
-                version=%row.version,
-                release_time=row.release_time.map(|rt| rt.to_rfc3339()),
-            );
-
-            async move {
-                trace!("handling release");
-
-                for path in &[
-                    rustdoc_archive_path(&row.name, &row.version),
-                    source_archive_path(&row.name, &row.version),
-                ] {
-                    let path = format!("{path}.index");
-                    trace!(path, "checking path");
-
-                    let compressed_stream = match raw_storage.get_stream(&path, None).await {
-                        Ok(stream) => stream,
-                        Err(err) => {
-                            if matches!(err.downcast_ref(), Some(PathNotFoundError)) {
-                                trace!(path, "path not found, skipping");
-                                continue;
-                            }
-                            trace!(path, ?err, "error fetching stream");
-                            return Err(err);
-                        }
-                    };
-
-                    let alg = CompressionAlgorithm::default();
-
-                    if compressed_stream.compression != Some(alg) {
-                        trace!(path, "Archive index not compressed with zstd, skipping");
-                        continue;
-                    }
-
-                    info!(path, "checking archive");
-                    checked.fetch_add(1, Ordering::Relaxed);
-
-                    // download the compressed raw blob first.
-                    // Like this we can first check if it's worth recompressing & re-uploading.
-                    let mut compressed_blob = compressed_stream.materialize(usize::MAX).await?;
-
-                    if decompress(compressed_blob.content.as_slice(), alg, usize::MAX).is_ok() {
-                        info!(path, "Archive can be decompressed, skipping");
-                        continue;
-                    }
-
-                    warn!(path, "recompressing archive");
-                    recompressed.fetch_add(1, Ordering::Relaxed);
-
-                    let mut decompressed = Vec::new();
-                    {
-                        // old async-compression can read the broken zstd stream
-                        let mut reader =
-                            wrap_reader_for_decompression(compressed_blob.content.as_slice(), alg);
-
-                        tokio::io::copy(&mut reader, &mut decompressed).await?;
-                    }
-
-                    let mut buf = Vec::with_capacity(decompressed.len());
-                    compress_async(decompressed.as_slice(), &mut buf, alg).await?;
-                    compressed_blob.content = buf;
-                    compressed_blob.compression = Some(alg);
-
-                    // `.store_inner` just uploads what it gets, without any compression logic
-                    self.store_inner(vec![compressed_blob.into()]).await?;
-                }
-                Ok(())
-            }
-            .instrument(release_span)
-        })
-        .await?;
-
-        Ok((
-            checked.load(Ordering::Relaxed),
-            recompressed.load(Ordering::Relaxed),
-        ))
-    }
 }
 
 impl std::fmt::Debug for AsyncStorage {
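
For context, the removed method boiled down to one check-and-recompress step per archive index: fetch the zstd blob, skip it if it still decodes, otherwise decode it with a lenient reader, re-encode it, and upload the result. Below is a minimal standalone sketch of that step, assuming the `zstd` crate in place of the module's internal `decompress`/`compress_async` helpers; the `recompress_if_broken` name and the caller-supplied `recovered` plaintext are illustrative only, not part of the codebase.

    use anyhow::Result;

    /// `compressed` is the blob as stored; `recovered` is the plaintext obtained
    /// from whatever lenient decoder can still read the broken stream (an older
    /// async-compression version in the removed code). Returns the re-encoded
    /// bytes when the blob needs fixing, or `None` when it already decodes.
    fn recompress_if_broken(compressed: &[u8], recovered: &[u8]) -> Result<Option<Vec<u8>>> {
        // Cheap check first: if the stored bytes still decode, skip the re-upload.
        if zstd::stream::decode_all(compressed).is_ok() {
            return Ok(None);
        }
        // Re-encode the recovered plaintext; level 0 selects zstd's default level.
        let fixed = zstd::stream::encode_all(recovered, 0)?;
        Ok(Some(fixed))
    }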