Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
Expand Down Expand Up @@ -144,10 +145,11 @@ public void writeBlob(OperationPurpose purpose, String blobName, InputStream inp
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
final var condition = failIfAlreadyExists ? ConditionalOperation.IF_NONE_MATCH : ConditionalOperation.NONE;
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
} else {
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
}
}

Expand Down Expand Up @@ -536,6 +538,59 @@ String buildKey(String blobName) {
return keyPath + blobName;
}

/**
 * Enumeration of the mutually exclusive conditional operations supported by S3. A conditional operation makes a
 * write succeed or fail depending on the current state of the object at the target key.
 *
 * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-requests.html">S3-conditional-requests</a>
 */
sealed interface ConditionalOperation permits ConditionalOperation.IfMatch, ConditionalOperation.IfNoneMatch,
    ConditionalOperation.None {
    // Shared singletons for the two stateless operations; IfMatch carries an etag so it is created per-request.
    ConditionalOperation NONE = new None();
    ConditionalOperation IF_NONE_MATCH = new IfNoneMatch();

    static ConditionalOperation ifMatch(String etag) {
        return new IfMatch(etag);
    }

    /** Unconditional write: no precondition is applied. */
    record None() implements ConditionalOperation {}

    /** Write succeeds only if no object already exists at the key (rendered as {@code If-None-Match: *}). */
    record IfNoneMatch() implements ConditionalOperation {}

    /** Write succeeds only if the existing object's entity tag equals {@code etag}. */
    record IfMatch(String etag) implements ConditionalOperation {}
}

/**
 * Executes a single (non-multipart) PUT of the given blob, optionally applying one of the S3 conditional write
 * operations.
 *
 * @param purpose       the purpose of the operation, used for metrics attribution and to decide whether conditional
 *                      writes are supported for this request
 * @param s3BlobStore   the blob store supplying the client, bucket name, storage class, ACL and encryption settings
 * @param blobName      the full object key to write
 * @param contentLength the exact length in bytes of the request body
 * @param body          supplies the request body; invoked once, only after the request has been fully built
 * @param condition     the conditional operation to apply; ignored when the purpose does not support conditional writes
 */
static void putObject(
    OperationPurpose purpose,
    S3BlobStore s3BlobStore,
    String blobName,
    long contentLength,
    Supplier<RequestBody> body,
    ConditionalOperation condition
) {
    final var putRequestBuilder = PutObjectRequest.builder()
        .bucket(s3BlobStore.bucket())
        .key(blobName)
        .contentLength(contentLength)
        .storageClass(s3BlobStore.getStorageClass())
        .acl(s3BlobStore.getCannedACL());
    if (s3BlobStore.serverSideEncryption()) {
        putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
    }
    if (s3BlobStore.supportsConditionalWrites(purpose)) {
        switch (condition) {
            // use the record accessor (etag()) rather than direct field access, consistent with the rest of the file
            case ConditionalOperation.IfMatch ifMatch -> putRequestBuilder.ifMatch(ifMatch.etag());
            case ConditionalOperation.IfNoneMatch ignored -> putRequestBuilder.ifNoneMatch("*");
            case ConditionalOperation.None ignored -> {
                // unconditional write: no precondition headers added
            }
        }
    }
    S3BlobStore.configureRequestForMetrics(putRequestBuilder, s3BlobStore, Operation.PUT_OBJECT, purpose);
    final var putRequest = putRequestBuilder.build();
    try (var client = s3BlobStore.clientReference()) {
        client.client().putObject(putRequest, body.get());
    }
}

/**
* Uploads a blob using a single upload request
*/
Expand All @@ -545,33 +600,17 @@ void executeSingleUpload(
final String blobName,
final InputStream input,
final long blobSize,
final boolean failIfAlreadyExists
final ConditionalOperation condition
) throws IOException {
try (var clientReference = s3BlobStore.clientReference()) {
try {
// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
}
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
}

final var putRequestBuilder = PutObjectRequest.builder()
.bucket(s3BlobStore.bucket())
.key(blobName)
.contentLength(blobSize)
.storageClass(s3BlobStore.getStorageClass())
.acl(s3BlobStore.getCannedACL());
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
putRequestBuilder.ifNoneMatch("*");
}
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);

final var putRequest = putRequestBuilder.build();
clientReference.client().putObject(putRequest, RequestBody.fromInputStream(input, blobSize));
putObject(purpose, s3BlobStore, blobName, blobSize, () -> RequestBody.fromInputStream(input, blobSize), condition);
} catch (final SdkException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
}
Expand All @@ -590,7 +629,7 @@ private void executeMultipart(
final long partSize,
final long blobSize,
final PartOperation partOperation,
final boolean failIfAlreadyExists
final ConditionalOperation condition
) throws IOException {

ensureMultiPartUploadSize(blobSize);
Expand Down Expand Up @@ -661,8 +700,13 @@ private void executeMultipart(
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
if (s3BlobStore.supportsConditionalWrites(purpose)) {
switch (condition) {
case ConditionalOperation.IfMatch ifMatch -> completeMultipartUploadRequestBuilder.ifMatch(ifMatch.etag);
case ConditionalOperation.IfNoneMatch ignored -> completeMultipartUploadRequestBuilder.ifNoneMatch("*");
case ConditionalOperation.None ignored -> {
}
}
}

S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
Expand Down Expand Up @@ -690,7 +734,7 @@ void executeMultipartUpload(
final String blobName,
final InputStream input,
final long blobSize,
final boolean failIfAlreadyExists
final ConditionalOperation condition
) throws IOException {
executeMultipart(
purpose,
Expand All @@ -708,7 +752,7 @@ void executeMultipartUpload(
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
}
},
failIfAlreadyExists
condition
);
}

Expand Down Expand Up @@ -756,7 +800,7 @@ void executeMultipartCopy(
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
}
}),
false
ConditionalOperation.NONE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.s3.S3BlobContainer.ConditionalOperation;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -75,7 +76,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
randomCondition()
)
);
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
Expand All @@ -96,7 +97,7 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
blobName,
new ByteArrayInputStream(new byte[0]),
ByteSizeUnit.MB.toBytes(2),
randomBoolean()
randomCondition()
)
);
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
Expand Down Expand Up @@ -132,7 +133,7 @@ public void testExecuteSingleUpload() throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = randomBoolean();
final ConditionalOperation conditionalOperation = randomCondition();

final S3Client client = configureMockClient(blobStore);

Expand All @@ -142,7 +143,7 @@ public void testExecuteSingleUpload() throws IOException {
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());

final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);

final PutObjectRequest request = requestCaptor.getValue();
assertEquals(bucketName, request.bucket());
Expand All @@ -158,8 +159,19 @@ public void testExecuteSingleUpload() throws IOException {
);
}

if (failIfAlreadyExists) {
assertEquals("*", request.ifNoneMatch());
switch (conditionalOperation) {
case ConditionalOperation.IfMatch ifMatch -> {
assertEquals(ifMatch.etag(), request.ifMatch());
assertNull(request.ifNoneMatch());
}
case ConditionalOperation.IfNoneMatch ignored -> {
assertNull(request.ifMatch());
assertEquals("*", request.ifNoneMatch());
}
case ConditionalOperation.None ignored -> {
assertNull(request.ifMatch());
assertNull(request.ifNoneMatch());
}
}

final RequestBody requestBody = bodyCaptor.getValue();
Expand All @@ -185,7 +197,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
randomCondition()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
Expand All @@ -204,7 +216,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
randomCondition()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
Expand Down Expand Up @@ -255,7 +267,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
final ConditionalOperation conditionalOperation = doCopy ? ConditionalOperation.NONE : randomCondition();

final S3Client client = configureMockClient(blobStore);

Expand Down Expand Up @@ -305,7 +317,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
if (doCopy) {
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
} else {
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);
}

final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
Expand Down Expand Up @@ -372,8 +384,19 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
assertEquals(uploadId, compRequest.uploadId());

if (failIfAlreadyExists) {
assertEquals("*", compRequest.ifNoneMatch());
switch (conditionalOperation) {
case ConditionalOperation.IfMatch ifMatch -> {
assertEquals(ifMatch.etag(), compRequest.ifMatch());
assertNull(compRequest.ifNoneMatch());
}
case ConditionalOperation.IfNoneMatch ignored -> {
assertNull(compRequest.ifMatch());
assertEquals("*", compRequest.ifNoneMatch());
}
case ConditionalOperation.None ignored -> {
assertNull(compRequest.ifMatch());
assertNull(compRequest.ifNoneMatch());
}
}

final List<String> actualETags = compRequest.multipartUpload()
Expand Down Expand Up @@ -461,7 +484,7 @@ public void close() {}
blobName,
new ByteArrayInputStream(new byte[0]),
blobSize,
randomBoolean()
randomCondition()
);
});

Expand Down Expand Up @@ -557,6 +580,14 @@ public void close() {}
return client;
}

/** Picks one of the three conditional operations at random, generating a random etag for the if-match case. */
private ConditionalOperation randomCondition() {
    final int choice = between(0, 2);
    if (choice == 0) {
        return ConditionalOperation.NONE;
    }
    if (choice == 1) {
        return ConditionalOperation.IF_NONE_MATCH;
    }
    return ConditionalOperation.ifMatch(randomAlphanumericOfLength(128));
}

private static void closeMockClient(S3BlobStore blobStore) {
final var finalClientReference = blobStore.clientReference();
assertFalse(finalClientReference.decRef());
Expand Down