diff --git a/src/storage/database.rs b/src/storage/database.rs
index 5c3eb06a7..4c29d1715 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -1,4 +1,4 @@
-use super::{Blob, FileRange, StorageMetrics, StreamingBlob};
+use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
 use crate::{InstanceMetrics, db::Pool, error::Result};
 use chrono::{DateTime, Utc};
 use futures_util::stream::{Stream, TryStreamExt};
@@ -136,7 +136,7 @@ impl DatabaseBackend {
         })
     }
 
-    pub(super) async fn store_batch(&self, batch: Vec<Blob>) -> Result<()> {
+    pub(super) async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
         let mut conn = self.pool.get_async().await?;
         let mut trans = conn.begin().await?;
         for blob in batch {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 56eb7eb28..a316d63a5 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -60,6 +60,26 @@ type FileRange = RangeInclusive<u64>;
 #[error("path not found")]
 pub(crate) struct PathNotFoundError;
 
+/// represents a blob to be uploaded to storage.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub(crate) struct BlobUpload {
+    pub(crate) path: String,
+    pub(crate) mime: Mime,
+    pub(crate) content: Vec<u8>,
+    pub(crate) compression: Option<CompressionAlgorithm>,
+}
+
+impl From<Blob> for BlobUpload {
+    fn from(value: Blob) -> Self {
+        Self {
+            path: value.path,
+            mime: value.mime,
+            content: value.content,
+            compression: value.compression,
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub(crate) struct Blob {
     pub(crate) path: String,
@@ -683,19 +703,17 @@ impl AsyncStorage {
         };
 
         self.store_inner(vec![
-            Blob {
+            BlobUpload {
                 path: archive_path.to_string(),
                 mime: mimes::APPLICATION_ZIP.clone(),
                 content: zip_content,
                 compression: None,
-                date_updated: Utc::now(),
             },
-            Blob {
+            BlobUpload {
                 path: remote_index_path,
                 mime: mime::APPLICATION_OCTET_STREAM,
                 content: compressed_index_content,
                 compression: Some(alg),
-                date_updated: Utc::now(),
             },
         ])
         .await?;
@@ -717,7 +735,7 @@ impl AsyncStorage {
         let root_dir = root_dir.to_owned();
         move || {
             let mut file_paths = Vec::new();
-            let mut blobs: Vec<Blob> = Vec::new();
+            let mut blobs: Vec<BlobUpload> = Vec::new();
             for file_path in get_file_list(&root_dir) {
                 let file_path = file_path?;
 
@@ -740,13 +758,12 @@ impl AsyncStorage {
                 let mime = file_info.mime();
                 file_paths.push(file_info);
 
-                blobs.push(Blob {
+                blobs.push(BlobUpload {
                     path: bucket_path,
                     mime,
                     content,
                     compression: Some(alg),
                     // this field is ignored by the backend
-                    date_updated: Utc::now(),
                 });
             }
             Ok((blobs, file_paths))
@@ -759,7 +776,7 @@
     }
 
     #[cfg(test)]
-    pub(crate) async fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
+    pub(crate) async fn store_blobs(&self, blobs: Vec<BlobUpload>) -> Result<()> {
         self.store_inner(blobs).await
     }
 
@@ -775,13 +792,11 @@ impl AsyncStorage {
         let content = content.into();
         let mime = detect_mime(&path).to_owned();
 
-        self.store_inner(vec![Blob {
+        self.store_inner(vec![BlobUpload {
             path,
             mime,
             content,
             compression: None,
-            // this field is ignored by the backend
-            date_updated: Utc::now(),
         }])
         .await?;
 
@@ -802,13 +817,11 @@ impl AsyncStorage {
         let content = compress(&*content, alg)?;
         let mime = detect_mime(&path).to_owned();
 
-        self.store_inner(vec![Blob {
+        self.store_inner(vec![BlobUpload {
             path,
             mime,
             content,
             compression: Some(alg),
-            // this field is ignored by the backend
-            date_updated: Utc::now(),
         }])
         .await?;
 
@@ -829,20 +842,18 @@ impl AsyncStorage {
 
         let mime = detect_mime(&target_path).to_owned();
 
-        self.store_inner(vec![Blob {
+        self.store_inner(vec![BlobUpload {
             path: target_path,
             mime,
             content,
             compression: Some(alg),
-            // this field is ignored by the backend
-            date_updated: Utc::now(),
         }])
         .await?;
 
         Ok(alg)
     }
 
-    async fn store_inner(&self, batch: Vec<Blob>) -> Result<()> {
+    async fn store_inner(&self, batch: Vec<BlobUpload>) -> Result<()> {
         match &self.backend {
             StorageBackend::Database(db) => db.store_batch(batch).await,
             StorageBackend::S3(s3) => s3.store_batch(batch).await,
@@ -996,7 +1007,7 @@ impl AsyncStorage {
             compressed_blob.compression = Some(alg);
 
             // `.store_inner` just uploads what it gets, without any compression logic
-            self.store_inner(vec![compressed_blob]).await?;
+            self.store_inner(vec![compressed_blob.into()]).await?;
         }
         Ok(())
     }
@@ -1140,7 +1151,7 @@ impl Storage {
     }
 
     #[cfg(test)]
-    pub(crate) fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
+    pub(crate) fn store_blobs(&self, blobs: Vec<BlobUpload>) -> Result<()> {
         self.runtime.block_on(self.inner.store_blobs(blobs))
     }
 
@@ -1634,10 +1645,9 @@ mod backend_tests {
     fn test_exists(storage: &Storage) -> Result<()> {
         assert!(!storage.exists("path/to/file.txt").unwrap());
 
-        let blob = Blob {
+        let blob = BlobUpload {
             path: "path/to/file.txt".into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             content: "Hello world!".into(),
             compression: None,
         };
@@ -1650,10 +1660,9 @@ mod backend_tests {
     fn test_set_public(storage: &Storage) -> Result<()> {
         let path: &str = "foo/bar.txt";
 
-        storage.store_blobs(vec![Blob {
+        storage.store_blobs(vec![BlobUpload {
             path: path.into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             compression: None,
             content: b"test content\n".to_vec(),
         }])?;
@@ -1679,10 +1688,9 @@ mod backend_tests {
     fn test_get_object(storage: &Storage) -> Result<()> {
         let path: &str = "foo/bar.txt";
 
-        let blob = Blob {
+        let blob = BlobUpload {
             path: path.into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             compression: None,
             content: b"test content\n".to_vec(),
         };
@@ -1718,10 +1726,9 @@ mod backend_tests {
     }
 
     fn test_get_range(storage: &Storage) -> Result<()> {
-        let blob = Blob {
+        let blob = BlobUpload {
             path: "foo/bar.txt".into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             compression: None,
             content: b"test content\n".to_vec(),
         };
@@ -1760,10 +1767,9 @@ mod backend_tests {
         storage.store_blobs(
             FILENAMES
                 .iter()
-                .map(|&filename| Blob {
+                .map(|&filename| BlobUpload {
                     path: filename.into(),
                     mime: mime::TEXT_PLAIN,
-                    date_updated: Utc::now(),
                     compression: None,
                     content: b"test content\n".to_vec(),
                 })
@@ -1803,17 +1809,15 @@ mod backend_tests {
    fn test_get_too_big(storage: &Storage) -> Result<()> {
         const MAX_SIZE: usize = 1024;
 
-        let small_blob = Blob {
+        let small_blob = BlobUpload {
             path: "small-blob.bin".into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             content: vec![0; MAX_SIZE],
             compression: None,
         };
-        let big_blob = Blob {
+        let big_blob = BlobUpload {
             path: "big-blob.bin".into(),
             mime: mime::TEXT_PLAIN,
-            date_updated: Utc::now(),
             content: vec![0; MAX_SIZE * 2],
             compression: None,
         };
@@ -1851,10 +1855,9 @@ mod backend_tests {
 
         let blobs = NAMES
             .iter()
-            .map(|&path| Blob {
+            .map(|&path| BlobUpload {
                 path: path.into(),
                 mime: mime::TEXT_PLAIN,
-                date_updated: Utc::now(),
                 compression: None,
                 content: b"Hello world!\n".to_vec(),
             })
@@ -2009,15 +2012,13 @@ mod backend_tests {
     }
 
     fn test_batched_uploads(storage: &Storage) -> Result<()> {
-        let now = Utc::now();
         let uploads: Vec<_> = (0..=100)
             .map(|i| {
                 let content = format!("const IDX: usize = {i};").as_bytes().to_vec();
-                Blob {
+                BlobUpload {
                     mime: mimes::TEXT_RUST.clone(),
                     content,
                     path: format!("{i}.rs"),
-                    date_updated: now,
                     compression: None,
                 }
             })
@@ -2075,12 +2076,11 @@ mod backend_tests {
         storage.store_blobs(
             start
                 .iter()
-                .map(|path| Blob {
+                .map(|path| BlobUpload {
                     path: (*path).to_string(),
                     content: b"foo\n".to_vec(),
                     compression: None,
                     mime: mime::TEXT_PLAIN,
-                    date_updated: Utc::now(),
                 })
                 .collect(),
         )?;
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 364cd5d6c..c67a8ee01 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -1,4 +1,4 @@
-use super::{Blob, FileRange, StorageMetrics, StreamingBlob};
+use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
 use crate::{Config, InstanceMetrics};
 use anyhow::{Context as _, Error};
 use async_stream::try_stream;
@@ -222,7 +222,7 @@ impl S3Backend {
         })
     }
 
-    pub(super) async fn store_batch(&self, mut batch: Vec<Blob>) -> Result<(), Error> {
+    pub(super) async fn store_batch(&self, mut batch: Vec<BlobUpload>) -> Result<(), Error> {
         // Attempt to upload the batch 3 times
         for _ in 0..3 {
             let mut futures = FuturesUnordered::new();
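Not part of the patch, but a minimal sketch of how the new type reads at a call site. It assumes a `Storage` handle named `storage` (as in the test harness above) and only uses items visible in this diff:

    // Uploads no longer carry `date_updated`; the backends set that themselves.
    let upload = BlobUpload {
        path: "foo/bar.txt".into(),
        mime: mime::TEXT_PLAIN,
        content: b"test content\n".to_vec(),
        compression: None,
    };
    // `store_blobs` is the `#[cfg(test)]` helper whose signature changes above.
    storage.store_blobs(vec![upload])?;

    // A `Blob` read back from storage still carries `date_updated`; re-uploading one
    // goes through the new `From<Blob> for BlobUpload` impl, as in
    // `self.store_inner(vec![compressed_blob.into()]).await?`.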