4 changes: 2 additions & 2 deletions src/storage/database.rs
@@ -1,4 +1,4 @@
-use super::{Blob, FileRange, StorageMetrics, StreamingBlob};
+use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
use crate::{InstanceMetrics, db::Pool, error::Result};
use chrono::{DateTime, Utc};
use futures_util::stream::{Stream, TryStreamExt};
@@ -136,7 +136,7 @@ impl DatabaseBackend {
})
}

-pub(super) async fn store_batch(&self, batch: Vec<Blob>) -> Result<()> {
+pub(super) async fn store_batch(&self, batch: Vec<BlobUpload>) -> Result<()> {
let mut conn = self.pool.get_async().await?;
let mut trans = conn.begin().await?;
for blob in batch {
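For orientation, a hedged caller-side sketch of the new signature (not part of the diff — `db` is an assumed `&DatabaseBackend` handle, and the path, MIME type, and content are illustrative):

// Build a batch of BlobUpload values and hand them to the database backend.
// Note there is no date_updated field to fill in: the old Blob field was
// ignored on upload, which is what this PR encodes in the type.
let batch = vec![BlobUpload {
    path: "crate/0.1.0/index.html".into(), // illustrative path
    mime: mime::TEXT_HTML,
    content: b"<html>...</html>".to_vec(),
    compression: None,
}];
db.store_batch(batch).await?;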
82 changes: 41 additions & 41 deletions src/storage/mod.rs
@@ -60,6 +60,26 @@ type FileRange = RangeInclusive<u64>;
#[error("path not found")]
pub(crate) struct PathNotFoundError;

+/// represents a blob to be uploaded to storage.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub(crate) struct BlobUpload {
+pub(crate) path: String,
+pub(crate) mime: Mime,
+pub(crate) content: Vec<u8>,
+pub(crate) compression: Option<CompressionAlgorithm>,
+}
+
+impl From<Blob> for BlobUpload {
+fn from(value: Blob) -> Self {
+Self {
+path: value.path,
+mime: value.mime,
+content: value.content,
+compression: value.compression,
+}
+}
+}
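A hedged aside (not in the diff): with this impl, call sites still holding a full `Blob` convert with `.into()`, dropping the backend-ignored `date_updated` field in the process — the `compressed_blob.into()` change further down does exactly this.

// Sketch, assuming `some_blob: Blob` already exists:
let upload: BlobUpload = some_blob.into();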

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Blob {
pub(crate) path: String,
@@ -683,19 +703,17 @@ impl AsyncStorage {
};

self.store_inner(vec![
-Blob {
+BlobUpload {
path: archive_path.to_string(),
mime: mimes::APPLICATION_ZIP.clone(),
content: zip_content,
compression: None,
-date_updated: Utc::now(),
},
-Blob {
+BlobUpload {
path: remote_index_path,
mime: mime::APPLICATION_OCTET_STREAM,
content: compressed_index_content,
compression: Some(alg),
-date_updated: Utc::now(),
},
])
.await?;
@@ -717,7 +735,7 @@
let root_dir = root_dir.to_owned();
move || {
let mut file_paths = Vec::new();
-let mut blobs: Vec<Blob> = Vec::new();
+let mut blobs: Vec<BlobUpload> = Vec::new();
for file_path in get_file_list(&root_dir) {
let file_path = file_path?;

@@ -740,13 +758,12 @@
let mime = file_info.mime();
file_paths.push(file_info);

-blobs.push(Blob {
+blobs.push(BlobUpload {
path: bucket_path,
mime,
content,
compression: Some(alg),
-// this field is ignored by the backend
-date_updated: Utc::now(),
});
}
Ok((blobs, file_paths))
@@ -759,7 +776,7 @@
}

#[cfg(test)]
-pub(crate) async fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
+pub(crate) async fn store_blobs(&self, blobs: Vec<BlobUpload>) -> Result<()> {
self.store_inner(blobs).await
}

@@ -775,13 +792,11 @@
let content = content.into();
let mime = detect_mime(&path).to_owned();

-self.store_inner(vec![Blob {
+self.store_inner(vec![BlobUpload {
path,
mime,
content,
compression: None,
-// this field is ignored by the backend
-date_updated: Utc::now(),
}])
.await?;

@@ -802,13 +817,11 @@
let content = compress(&*content, alg)?;
let mime = detect_mime(&path).to_owned();

-self.store_inner(vec![Blob {
+self.store_inner(vec![BlobUpload {
path,
mime,
content,
compression: Some(alg),
-// this field is ignored by the backend
-date_updated: Utc::now(),
}])
.await?;

@@ -829,20 +842,18 @@

let mime = detect_mime(&target_path).to_owned();

-self.store_inner(vec![Blob {
+self.store_inner(vec![BlobUpload {
path: target_path,
mime,
content,
compression: Some(alg),
-// this field is ignored by the backend
-date_updated: Utc::now(),
}])
.await?;

Ok(alg)
}

-async fn store_inner(&self, batch: Vec<Blob>) -> Result<()> {
+async fn store_inner(&self, batch: Vec<BlobUpload>) -> Result<()> {
match &self.backend {
StorageBackend::Database(db) => db.store_batch(batch).await,
StorageBackend::S3(s3) => s3.store_batch(batch).await,
@@ -996,7 +1007,7 @@ impl AsyncStorage {
compressed_blob.compression = Some(alg);

// `.store_inner` just uploads what it gets, without any compression logic
-self.store_inner(vec![compressed_blob]).await?;
+self.store_inner(vec![compressed_blob.into()]).await?;
}
Ok(())
}
@@ -1140,7 +1151,7 @@ impl Storage {
}

#[cfg(test)]
-pub(crate) fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
+pub(crate) fn store_blobs(&self, blobs: Vec<BlobUpload>) -> Result<()> {
self.runtime.block_on(self.inner.store_blobs(blobs))
}

@@ -1634,10 +1645,9 @@ mod backend_tests {

fn test_exists(storage: &Storage) -> Result<()> {
assert!(!storage.exists("path/to/file.txt").unwrap());
-let blob = Blob {
+let blob = BlobUpload {
path: "path/to/file.txt".into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
content: "Hello world!".into(),
compression: None,
};
@@ -1650,10 +1660,9 @@
fn test_set_public(storage: &Storage) -> Result<()> {
let path: &str = "foo/bar.txt";

-storage.store_blobs(vec![Blob {
+storage.store_blobs(vec![BlobUpload {
path: path.into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
compression: None,
content: b"test content\n".to_vec(),
}])?;
@@ -1679,10 +1688,9 @@

fn test_get_object(storage: &Storage) -> Result<()> {
let path: &str = "foo/bar.txt";
-let blob = Blob {
+let blob = BlobUpload {
path: path.into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
compression: None,
content: b"test content\n".to_vec(),
};
@@ -1718,10 +1726,9 @@
}

fn test_get_range(storage: &Storage) -> Result<()> {
-let blob = Blob {
+let blob = BlobUpload {
path: "foo/bar.txt".into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
compression: None,
content: b"test content\n".to_vec(),
};
@@ -1760,10 +1767,9 @@
storage.store_blobs(
FILENAMES
.iter()
-.map(|&filename| Blob {
+.map(|&filename| BlobUpload {
path: filename.into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
compression: None,
content: b"test content\n".to_vec(),
})
@@ -1803,17 +1809,15 @@
fn test_get_too_big(storage: &Storage) -> Result<()> {
const MAX_SIZE: usize = 1024;

-let small_blob = Blob {
+let small_blob = BlobUpload {
path: "small-blob.bin".into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
content: vec![0; MAX_SIZE],
compression: None,
};
-let big_blob = Blob {
+let big_blob = BlobUpload {
path: "big-blob.bin".into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
content: vec![0; MAX_SIZE * 2],
compression: None,
};
@@ -1851,10 +1855,9 @@

let blobs = NAMES
.iter()
-.map(|&path| Blob {
+.map(|&path| BlobUpload {
path: path.into(),
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
compression: None,
content: b"Hello world!\n".to_vec(),
})
@@ -2009,15 +2012,13 @@
}

fn test_batched_uploads(storage: &Storage) -> Result<()> {
-let now = Utc::now();
let uploads: Vec<_> = (0..=100)
.map(|i| {
let content = format!("const IDX: usize = {i};").as_bytes().to_vec();
-Blob {
+BlobUpload {
mime: mimes::TEXT_RUST.clone(),
content,
path: format!("{i}.rs"),
-date_updated: now,
compression: None,
}
})
@@ -2075,12 +2076,11 @@
storage.store_blobs(
start
.iter()
-.map(|path| Blob {
+.map(|path| BlobUpload {
path: (*path).to_string(),
content: b"foo\n".to_vec(),
compression: None,
mime: mime::TEXT_PLAIN,
-date_updated: Utc::now(),
})
.collect(),
)?;
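Stepping back, every upload path in mod.rs now follows the same shape: compress where applicable, build a `BlobUpload`, pass it to `store_inner`. A hedged sketch of that shape inside an async, `Result`-returning `AsyncStorage` method (illustrative path and content; assumes the module's `compress`/`detect_mime` helpers seen in the hunks above, and that `CompressionAlgorithm` implements `Default` and `Copy`):

let alg = CompressionAlgorithm::default();
let path = "sources/foo/lib.rs".to_string();      // illustrative path
let raw: Vec<u8> = b"fn main() {}".to_vec();      // illustrative content
let upload = BlobUpload {
    mime: detect_mime(&path).to_owned(),          // MIME from the file name
    content: compress(raw.as_slice(), alg)?,      // compress before upload
    path,
    compression: Some(alg),                       // record the algorithm used
};
self.store_inner(vec![upload]).await?;            // dispatches to DB or S3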
4 changes: 2 additions & 2 deletions src/storage/s3.rs
@@ -1,4 +1,4 @@
-use super::{Blob, FileRange, StorageMetrics, StreamingBlob};
+use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
use crate::{Config, InstanceMetrics};
use anyhow::{Context as _, Error};
use async_stream::try_stream;
@@ -222,7 +222,7 @@ impl S3Backend {
})
}

-pub(super) async fn store_batch(&self, mut batch: Vec<Blob>) -> Result<(), Error> {
+pub(super) async fn store_batch(&self, mut batch: Vec<BlobUpload>) -> Result<(), Error> {
// Attempt to upload the batch 3 times
for _ in 0..3 {
let mut futures = FuturesUnordered::new();
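The truncated body above retries the batch up to three times using a `FuturesUnordered` of concurrent uploads. A simplified, self-contained sketch of that pattern (`try_upload` is a hypothetical stand-in for the real per-object S3 call, and the retry bookkeeping is an assumption about the elided code):

use futures_util::stream::{FuturesUnordered, StreamExt};

// Hypothetical stand-in for uploading one object to S3.
async fn try_upload(blob: &BlobUpload) -> Result<(), anyhow::Error> {
    // ... PUT the object ...
    Ok(())
}

async fn store_batch_sketch(mut batch: Vec<BlobUpload>) -> Result<(), anyhow::Error> {
    // Attempt the batch up to 3 times, keeping only failures for the next pass.
    for _ in 0..3 {
        let mut futures: FuturesUnordered<_> = batch
            .drain(..)
            .map(|blob| async move {
                match try_upload(&blob).await {
                    Ok(()) => None,       // uploaded, nothing to retry
                    Err(_) => Some(blob), // hand the blob back for a retry
                }
            })
            .collect();
        while let Some(maybe_failed) = futures.next().await {
            if let Some(blob) = maybe_failed {
                batch.push(blob);
            }
        }
        if batch.is_empty() {
            return Ok(());
        }
    }
    anyhow::bail!("failed to upload batch after 3 attempts")
}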