5 changes: 5 additions & 0 deletions clippy.toml
@@ -14,3 +14,8 @@ reason = """
[[disallowed-types]]
path = "semver::Version"
reason = "use our own custom db::types::version::Version so you can use it with sqlx"

[[disallowed-types]]
path = "axum_extra::headers::IfNoneMatch"
reason = "use our own custom web::headers::IfNoneMatch for sane behaviour with missing headers"

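The new lint points callers at a project-local web::headers::IfNoneMatch instead of the axum_extra type. That wrapper is not part of this diff; the following is only a rough sketch of what it could look like, assuming the point is to treat a missing If-None-Match header as "no precondition" rather than failing extraction (the type path comes from the lint reason above, everything else here is hypothetical):

// Hypothetical sketch, not the actual docs.rs implementation.
use axum_extra::headers::{self, HeaderMapExt};
use http::HeaderMap;

#[derive(Debug, Default)]
pub(crate) struct IfNoneMatch(Option<headers::IfNoneMatch>);

impl IfNoneMatch {
    /// Read the header from a request's headers; absence is not an error.
    pub(crate) fn from_headers(headers: &HeaderMap) -> Self {
        Self(headers.typed_get())
    }

    /// True when the client's cached copy (per If-None-Match) is still fresh
    /// for the given ETag, i.e. a 304 Not Modified can be returned.
    pub(crate) fn matches(&self, etag: &headers::ETag) -> bool {
        match &self.0 {
            // precondition_passes() returns false when the ETag matches,
            // meaning the cached representation is still current.
            Some(header) => !header.precondition_passes(etag),
            // no header sent: never short-circuit to 304
            None => false,
        }
    }
}
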
5 changes: 4 additions & 1 deletion src/storage/database.rs
@@ -1,5 +1,5 @@
use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
use crate::{InstanceMetrics, db::Pool, error::Result};
use crate::{InstanceMetrics, db::Pool, error::Result, web::headers::compute_etag};
use chrono::{DateTime, Utc};
use futures_util::stream::{Stream, TryStreamExt};
use sqlx::Acquire;
@@ -123,13 +123,16 @@ impl DatabaseBackend {
});
let content = result.content.unwrap_or_default();
let content_len = content.len();

let etag = compute_etag(&content);
Ok(StreamingBlob {
path: result.path,
mime: result
.mime
.parse()
.unwrap_or(mime::APPLICATION_OCTET_STREAM),
date_updated: result.date_updated,
etag: Some(etag),
content: Box::new(io::Cursor::new(content)),
content_length: content_len,
compression,
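The database backend now computes the ETag from the blob bytes it already holds in memory. compute_etag itself is not shown in this diff; a plausible sketch, assuming it is an MD5 digest rendered as the quoted hex string that headers::ETag expects (the backend test further down notes that the db backend "just does MD5"):

// Hypothetical sketch of web::headers::compute_etag; the real helper is not
// part of this diff. Assumes MD5 plus the standard quoted-string ETag format.
use axum_extra::headers;

pub(crate) fn compute_etag(content: impl AsRef<[u8]>) -> headers::ETag {
    let digest = md5::compute(content.as_ref());
    // ETags are opaque quoted strings, e.g. "9e107d9d372bb6826bd81d3542a419d6"
    format!("\"{digest:x}\"")
        .parse()
        .expect("a quoted hex string is always a valid ETag")
}

Taking impl AsRef<[u8]> would let the same helper hash both raw blob bytes (as here) and the derived strings used for ranged S3 reads later in this diff.
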
66 changes: 48 additions & 18 deletions src/storage/mod.rs
@@ -22,6 +22,7 @@ use crate::{
utils::spawn_blocking,
};
use anyhow::anyhow;
use axum_extra::headers;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use fn_error_context::context;
@@ -57,7 +58,7 @@ type FileRange = RangeInclusive<u64>;
pub(crate) struct PathNotFoundError;

/// represents a blob to be uploaded to storage.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct BlobUpload {
pub(crate) path: String,
pub(crate) mime: Mime,
@@ -76,11 +77,12 @@ impl From<Blob> for BlobUpload {
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Blob {
pub(crate) path: String,
pub(crate) mime: Mime,
pub(crate) date_updated: DateTime<Utc>,
pub(crate) etag: Option<headers::ETag>,
pub(crate) content: Vec<u8>,
pub(crate) compression: Option<CompressionAlgorithm>,
}
@@ -89,6 +91,7 @@ pub(crate) struct StreamingBlob {
pub(crate) path: String,
pub(crate) mime: Mime,
pub(crate) date_updated: DateTime<Utc>,
pub(crate) etag: Option<headers::ETag>,
pub(crate) compression: Option<CompressionAlgorithm>,
pub(crate) content_length: usize,
pub(crate) content: Box<dyn AsyncBufRead + Unpin + Send>,
@@ -100,6 +103,7 @@ impl std::fmt::Debug for StreamingBlob {
.field("path", &self.path)
.field("mime", &self.mime)
.field("date_updated", &self.date_updated)
.field("etag", &self.etag)
.field("compression", &self.compression)
.finish()
}
@@ -134,6 +138,7 @@ impl StreamingBlob {
);

self.compression = None;
// not touching the etag, it should represent the original content
Ok(self)
}

@@ -148,12 +153,27 @@ impl StreamingBlob {
path: self.path,
mime: self.mime,
date_updated: self.date_updated,
etag: self.etag, // downloading doesn't change the etag
content: content.into_inner(),
compression: self.compression,
})
}
}

impl From<Blob> for StreamingBlob {
fn from(value: Blob) -> Self {
Self {
path: value.path,
mime: value.mime,
date_updated: value.date_updated,
etag: value.etag,
compression: value.compression,
content_length: value.content.len(),
content: Box::new(io::Cursor::new(value.content)),
}
}
}

pub fn get_file_list<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = Result<PathBuf>>> {
let path = path.as_ref().to_path_buf();
if path.is_file() {
@@ -583,6 +603,7 @@ impl AsyncStorage {
path: format!("{archive_path}/{path}"),
mime: detect_mime(path),
date_updated: stream.date_updated,
etag: stream.etag,
content: stream.content,
content_length: stream.content_length,
compression: None,
@@ -756,7 +777,6 @@ impl AsyncStorage {
mime,
content,
compression: Some(alg),
// this field is ignored by the backend
});
}
Ok((blobs, file_paths))
@@ -1135,7 +1155,7 @@ pub(crate) fn source_archive_path(name: &str, version: &Version) -> String {
#[cfg(test)]
mod test {
use super::*;
use crate::test::TestEnvironment;
use crate::{test::TestEnvironment, web::headers::compute_etag};
use std::env;
use test_case::test_case;

@@ -1151,6 +1171,7 @@ mod test {
mime: mime::APPLICATION_OCTET_STREAM,
date_updated: Utc::now(),
compression: alg,
etag: Some(compute_etag(&content)),
content_length: content.len(),
content: Box::new(io::Cursor::new(content)),
}
@@ -1292,6 +1313,7 @@ mod test {
path: "some_path.db".into(),
mime: mime::APPLICATION_OCTET_STREAM,
date_updated: Utc::now(),
etag: None,
compression: Some(alg),
content_length: compressed_index_content.len(),
content: Box::new(io::Cursor::new(compressed_index_content)),
@@ -1494,9 +1516,8 @@ mod test {
/// This is the preferred way to test whether backends work.
#[cfg(test)]
mod backend_tests {
use crate::test::TestEnvironment;

use super::*;
use crate::{test::TestEnvironment, web::headers::compute_etag};

fn get_file_info(files: &[FileEntry], path: impl AsRef<Path>) -> Option<&FileEntry> {
let path = path.as_ref();
@@ -1560,6 +1581,9 @@ mod backend_tests {
let found = storage.get(path, usize::MAX)?;
assert_eq!(blob.mime, found.mime);
assert_eq!(blob.content, found.content);
// while our db backend just does MD5,
// it seems like minio does it too :)
assert_eq!(found.etag, Some(compute_etag(&blob.content)));

// default visibility is private
assert!(!storage.get_public_access(path)?);
@@ -1593,20 +1617,26 @@ mod backend_tests {
content: b"test content\n".to_vec(),
};

let full_etag = compute_etag(&blob.content);

storage.store_blobs(vec![blob.clone()])?;

assert_eq!(
blob.content[0..=4],
storage
.get_range("foo/bar.txt", usize::MAX, 0..=4, None)?
.content
);
assert_eq!(
blob.content[5..=12],
storage
.get_range("foo/bar.txt", usize::MAX, 5..=12, None)?
.content
);
let mut etags = Vec::new();

for range in [0..=4, 5..=12] {
let partial_blob = storage.get_range("foo/bar.txt", usize::MAX, range.clone(), None)?;
let range = (*range.start() as usize)..=(*range.end() as usize);
assert_eq!(blob.content[range], partial_blob.content);

etags.push(partial_blob.etag.unwrap());
}
if let [etag1, etag2] = &etags[..] {
assert_ne!(etag1, etag2);
assert_ne!(etag1, &full_etag);
assert_ne!(etag2, &full_etag);
} else {
panic!("expected two etags");
}

for path in &["bar.txt", "baz.txt", "foo/baz.txt"] {
assert!(
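With Blob and StreamingBlob now carrying an optional ETag, a web handler can answer conditional requests without re-sending the body. A rough sketch of that flow, reusing the hypothetical IfNoneMatch wrapper sketched earlier (the real docs.rs handlers are not part of this diff):

// Illustration only; builds on the hypothetical wrapper sketched above.
use axum::http::{HeaderMap, StatusCode};

fn conditional_status(blob: &Blob, request_headers: &HeaderMap) -> StatusCode {
    let if_none_match = IfNoneMatch::from_headers(request_headers);
    match &blob.etag {
        // the client already holds exactly this content: skip the body
        Some(etag) if if_none_match.matches(etag) => StatusCode::NOT_MODIFIED,
        // otherwise serve the blob and echo the ETag so it can be cached
        _ => StatusCode::OK,
    }
}
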
48 changes: 45 additions & 3 deletions src/storage/s3.rs
@@ -1,5 +1,5 @@
use super::{BlobUpload, FileRange, StorageMetrics, StreamingBlob};
use crate::{Config, InstanceMetrics};
use crate::{Config, InstanceMetrics, web::headers::compute_etag};
use anyhow::{Context as _, Error};
use async_stream::try_stream;
use aws_config::BehaviorVersion;
@@ -10,14 +10,15 @@ use aws_sdk_s3::{
types::{Delete, ObjectIdentifier, Tag, Tagging},
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use axum_extra::headers;
use chrono::Utc;
use futures_util::{
future::TryFutureExt,
pin_mut,
stream::{FuturesUnordered, Stream, StreamExt},
};
use std::sync::Arc;
use tracing::{error, warn};
use tracing::{error, instrument, warn};

const PUBLIC_ACCESS_TAG: &str = "static-cloudfront-access";
const PUBLIC_ACCESS_VALUE: &str = "allow";
@@ -180,6 +181,7 @@ impl S3Backend {
.map(|_| ())
}

#[instrument(skip(self))]
pub(super) async fn get_stream(
&self,
path: &str,
@@ -190,7 +192,11 @@ impl S3Backend {
.get_object()
.bucket(&self.bucket)
.key(path)
.set_range(range.map(|r| format!("bytes={}-{}", r.start(), r.end())))
.set_range(
range
.as_ref()
.map(|r| format!("bytes={}-{}", r.start(), r.end())),
)
.send()
.await
.convert_errors()?;
@@ -204,6 +210,41 @@ impl S3Backend {

let compression = res.content_encoding.as_ref().and_then(|s| s.parse().ok());

let etag = if let Some(s3_etag) = res.e_tag
&& !s3_etag.is_empty()
{
if let Some(range) = range {
// we can generate a unique etag for a range of the remote object too,
// by just concatenating the original etag with the range start and end.
//
// About edge cases:
// When the etag of the full archive changes after a rebuild,
// all derived etags for files inside the archive will also change.
//
// This could lead to _changed_ ETags even when the single file inside the
// archive is actually unchanged.
//
// The AWS implementation (and minio) is to just use an MD5 hash of the file
// as the ETag.
Some(compute_etag(format!(
"{}-{}-{}",
s3_etag,
range.start(),
range.end()
)))
} else {
match s3_etag.parse::<headers::ETag>() {
Ok(etag) => Some(etag),
Err(err) => {
error!(?err, s3_etag, "Failed to parse ETag from S3");
None
}
}
}
} else {
None
};

Ok(StreamingBlob {
path: path.into(),
mime: res
Expand All @@ -213,6 +254,7 @@ impl S3Backend {
.parse()
.unwrap_or(mime::APPLICATION_OCTET_STREAM),
date_updated,
etag,
content_length: res
.content_length
.and_then(|length| length.try_into().ok())
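To make the range handling above concrete: for a ranged read, the returned ETag is a hash over the full-object ETag plus the byte range, so different ranges of the same object get distinct but stable tags, and all of them roll over when the underlying archive is rebuilt. Using the hypothetical compute_etag sketch from earlier:

// Illustration only; the ETag value is made up and compute_etag is the
// hypothetical sketch shown earlier in this writeup.
#[test]
fn derived_range_etags_are_distinct() {
    let s3_etag = String::from("\"5d41402abc4b2a76b9719d911017c592\"");
    let first = compute_etag(format!("{}-{}-{}", s3_etag, 0, 4));
    let second = compute_etag(format!("{}-{}-{}", s3_etag, 5, 12));
    // distinct ranges yield distinct ETags; both change whenever s3_etag changes
    assert_ne!(first, second);
}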