diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index affed10b0255e..b1480e96a7e2f 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -84,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead; @@ -144,10 +145,11 @@ public void writeBlob(OperationPurpose purpose, String blobName, InputStream inp throws IOException { assert BlobContainer.assertPurposeConsistency(purpose, blobName); assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; + final var condition = failIfAlreadyExists ? ConditionalOperation.IF_NONE_MATCH : ConditionalOperation.NONE; if (blobSize <= getLargeBlobThresholdInBytes()) { - executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); + executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition); } else { - executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); + executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition); } } @@ -536,6 +538,59 @@ String buildKey(String blobName) { return keyPath + blobName; } + /** + * Enumeration of mutually exlusive conditional operations supported by S3. + * + * @see S3-conditional-requests + */ + sealed interface ConditionalOperation permits ConditionalOperation.IfMatch, ConditionalOperation.IfNoneMatch, + ConditionalOperation.None { + ConditionalOperation NONE = new None(); + ConditionalOperation IF_NONE_MATCH = new IfNoneMatch(); + + static ConditionalOperation ifMatch(String etag) { + return new IfMatch(etag); + } + + record None() implements ConditionalOperation {} + + record IfNoneMatch() implements ConditionalOperation {} + + record IfMatch(String etag) implements ConditionalOperation {} + } + + static void putObject( + OperationPurpose purpose, + S3BlobStore s3BlobStore, + String blobName, + long contentLength, + Supplier body, + ConditionalOperation condition + ) { + final var putRequestBuilder = PutObjectRequest.builder() + .bucket(s3BlobStore.bucket()) + .key(blobName) + .contentLength(contentLength) + .storageClass(s3BlobStore.getStorageClass()) + .acl(s3BlobStore.getCannedACL()); + if (s3BlobStore.serverSideEncryption()) { + putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); + } + if (s3BlobStore.supportsConditionalWrites(purpose)) { + switch (condition) { + case ConditionalOperation.IfMatch ifMatch -> putRequestBuilder.ifMatch(ifMatch.etag); + case ConditionalOperation.IfNoneMatch ignored -> putRequestBuilder.ifNoneMatch("*"); + case ConditionalOperation.None ignored -> { + } + } + } + S3BlobStore.configureRequestForMetrics(putRequestBuilder, s3BlobStore, Operation.PUT_OBJECT, purpose); + final var putRequest = putRequestBuilder.build(); + try (var client = s3BlobStore.clientReference()) { + client.client().putObject(putRequest, body.get()); + } + } + /** * Uploads a blob using a single upload request */ @@ -545,9 +600,9 @@ void executeSingleUpload( final String blobName, final InputStream input, final long blobSize, - final boolean failIfAlreadyExists + final ConditionalOperation condition ) throws IOException { - try (var clientReference = s3BlobStore.clientReference()) { + try { // Extra safety checks if (blobSize > MAX_FILE_SIZE.getBytes()) { throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE); @@ -555,23 +610,7 @@ void executeSingleUpload( if (blobSize > s3BlobStore.bufferSizeInBytes()) { throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size"); } - - final var putRequestBuilder = PutObjectRequest.builder() - .bucket(s3BlobStore.bucket()) - .key(blobName) - .contentLength(blobSize) - .storageClass(s3BlobStore.getStorageClass()) - .acl(s3BlobStore.getCannedACL()); - if (s3BlobStore.serverSideEncryption()) { - putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); - } - if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) { - putRequestBuilder.ifNoneMatch("*"); - } - S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose); - - final var putRequest = putRequestBuilder.build(); - clientReference.client().putObject(putRequest, RequestBody.fromInputStream(input, blobSize)); + putObject(purpose, s3BlobStore, blobName, blobSize, () -> RequestBody.fromInputStream(input, blobSize), condition); } catch (final SdkException e) { throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e); } @@ -590,7 +629,7 @@ private void executeMultipart( final long partSize, final long blobSize, final PartOperation partOperation, - final boolean failIfAlreadyExists + final ConditionalOperation condition ) throws IOException { ensureMultiPartUploadSize(blobSize); @@ -661,8 +700,13 @@ private void executeMultipart( .uploadId(uploadId) .multipartUpload(b -> b.parts(parts)); - if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) { - completeMultipartUploadRequestBuilder.ifNoneMatch("*"); + if (s3BlobStore.supportsConditionalWrites(purpose)) { + switch (condition) { + case ConditionalOperation.IfMatch ifMatch -> completeMultipartUploadRequestBuilder.ifMatch(ifMatch.etag); + case ConditionalOperation.IfNoneMatch ignored -> completeMultipartUploadRequestBuilder.ifNoneMatch("*"); + case ConditionalOperation.None ignored -> { + } + } } S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose); @@ -690,7 +734,7 @@ void executeMultipartUpload( final String blobName, final InputStream input, final long blobSize, - final boolean failIfAlreadyExists + final ConditionalOperation condition ) throws IOException { executeMultipart( purpose, @@ -708,7 +752,7 @@ void executeMultipartUpload( return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build(); } }, - failIfAlreadyExists + condition ); } @@ -756,7 +800,7 @@ void executeMultipartCopy( return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build(); } }), - false + ConditionalOperation.NONE ); } diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java index 662c2137b79e2..bb2816b52eeb4 100644 --- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java @@ -52,6 +52,7 @@ import java.util.stream.IntStream; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; +import static org.elasticsearch.repositories.s3.S3BlobContainer.ConditionalOperation; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.ArgumentMatchers.any; @@ -75,7 +76,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() { randomAlphaOfLengthBetween(1, 10), null, blobSize, - randomBoolean() + randomCondition() ) ); assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage()); @@ -96,7 +97,7 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() { blobName, new ByteArrayInputStream(new byte[0]), ByteSizeUnit.MB.toBytes(2), - randomBoolean() + randomCondition() ) ); assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage()); @@ -132,7 +133,7 @@ public void testExecuteSingleUpload() throws IOException { when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); } - final boolean failIfAlreadyExists = randomBoolean(); + final ConditionalOperation conditionalOperation = randomCondition(); final S3Client client = configureMockClient(blobStore); @@ -142,7 +143,7 @@ public void testExecuteSingleUpload() throws IOException { when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build()); final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]); - blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists); + blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation); final PutObjectRequest request = requestCaptor.getValue(); assertEquals(bucketName, request.bucket()); @@ -158,8 +159,19 @@ public void testExecuteSingleUpload() throws IOException { ); } - if (failIfAlreadyExists) { - assertEquals("*", request.ifNoneMatch()); + switch (conditionalOperation) { + case ConditionalOperation.IfMatch ifMatch -> { + assertEquals(ifMatch.etag(), request.ifMatch()); + assertNull(request.ifNoneMatch()); + } + case ConditionalOperation.IfNoneMatch ignored -> { + assertNull(request.ifMatch()); + assertEquals("*", request.ifNoneMatch()); + } + case ConditionalOperation.None ignored -> { + assertNull(request.ifMatch()); + assertNull(request.ifNoneMatch()); + } } final RequestBody requestBody = bodyCaptor.getValue(); @@ -185,7 +197,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() { randomAlphaOfLengthBetween(1, 10), null, blobSize, - randomBoolean() + randomCondition() ) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage()); @@ -204,7 +216,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() { randomAlphaOfLengthBetween(1, 10), null, blobSize, - randomBoolean() + randomCondition() ) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage()); @@ -255,7 +267,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException { when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); } - final boolean failIfAlreadyExists = doCopy ? false : randomBoolean(); + final ConditionalOperation conditionalOperation = doCopy ? ConditionalOperation.NONE : randomCondition(); final S3Client client = configureMockClient(blobStore); @@ -305,7 +317,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException { if (doCopy) { blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize); } else { - blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists); + blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation); } final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue(); @@ -372,8 +384,19 @@ void testExecuteMultipart(boolean doCopy) throws IOException { assertEquals(blobPath.buildAsString() + blobName, compRequest.key()); assertEquals(uploadId, compRequest.uploadId()); - if (failIfAlreadyExists) { - assertEquals("*", compRequest.ifNoneMatch()); + switch (conditionalOperation) { + case ConditionalOperation.IfMatch ifMatch -> { + assertEquals(ifMatch.etag(), compRequest.ifMatch()); + assertNull(compRequest.ifNoneMatch()); + } + case ConditionalOperation.IfNoneMatch ignored -> { + assertNull(compRequest.ifMatch()); + assertEquals("*", compRequest.ifNoneMatch()); + } + case ConditionalOperation.None ignored -> { + assertNull(compRequest.ifMatch()); + assertNull(compRequest.ifNoneMatch()); + } } final List actualETags = compRequest.multipartUpload() @@ -461,7 +484,7 @@ public void close() {} blobName, new ByteArrayInputStream(new byte[0]), blobSize, - randomBoolean() + randomCondition() ); }); @@ -557,6 +580,14 @@ public void close() {} return client; } + private ConditionalOperation randomCondition() { + return switch (between(0, 2)) { + case 0 -> ConditionalOperation.NONE; + case 1 -> ConditionalOperation.IF_NONE_MATCH; + default -> ConditionalOperation.ifMatch(randomAlphanumericOfLength(128)); + }; + } + private static void closeMockClient(S3BlobStore blobStore) { final var finalClientReference = blobStore.clientReference(); assertFalse(finalClientReference.decRef());