diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index affed10b0255e..b1480e96a7e2f 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -84,6 +84,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
@@ -144,10 +145,11 @@ public void writeBlob(OperationPurpose purpose, String blobName, InputStream inp
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
+ final var condition = failIfAlreadyExists ? ConditionalOperation.IF_NONE_MATCH : ConditionalOperation.NONE;
if (blobSize <= getLargeBlobThresholdInBytes()) {
- executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
+ executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
} else {
- executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
+ executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, condition);
}
}
@@ -536,6 +538,59 @@ String buildKey(String blobName) {
return keyPath + blobName;
}
+ /**
+ * Enumeration of mutually exlusive conditional operations supported by S3.
+ *
+ * @see S3-conditional-requests
+ */
+ sealed interface ConditionalOperation permits ConditionalOperation.IfMatch, ConditionalOperation.IfNoneMatch,
+ ConditionalOperation.None {
+ ConditionalOperation NONE = new None();
+ ConditionalOperation IF_NONE_MATCH = new IfNoneMatch();
+
+ static ConditionalOperation ifMatch(String etag) {
+ return new IfMatch(etag);
+ }
+
+ record None() implements ConditionalOperation {}
+
+ record IfNoneMatch() implements ConditionalOperation {}
+
+ record IfMatch(String etag) implements ConditionalOperation {}
+ }
+
+ static void putObject(
+ OperationPurpose purpose,
+ S3BlobStore s3BlobStore,
+ String blobName,
+ long contentLength,
+ Supplier body,
+ ConditionalOperation condition
+ ) {
+ final var putRequestBuilder = PutObjectRequest.builder()
+ .bucket(s3BlobStore.bucket())
+ .key(blobName)
+ .contentLength(contentLength)
+ .storageClass(s3BlobStore.getStorageClass())
+ .acl(s3BlobStore.getCannedACL());
+ if (s3BlobStore.serverSideEncryption()) {
+ putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+ }
+ if (s3BlobStore.supportsConditionalWrites(purpose)) {
+ switch (condition) {
+ case ConditionalOperation.IfMatch ifMatch -> putRequestBuilder.ifMatch(ifMatch.etag);
+ case ConditionalOperation.IfNoneMatch ignored -> putRequestBuilder.ifNoneMatch("*");
+ case ConditionalOperation.None ignored -> {
+ }
+ }
+ }
+ S3BlobStore.configureRequestForMetrics(putRequestBuilder, s3BlobStore, Operation.PUT_OBJECT, purpose);
+ final var putRequest = putRequestBuilder.build();
+ try (var client = s3BlobStore.clientReference()) {
+ client.client().putObject(putRequest, body.get());
+ }
+ }
+
/**
* Uploads a blob using a single upload request
*/
@@ -545,9 +600,9 @@ void executeSingleUpload(
final String blobName,
final InputStream input,
final long blobSize,
- final boolean failIfAlreadyExists
+ final ConditionalOperation condition
) throws IOException {
- try (var clientReference = s3BlobStore.clientReference()) {
+ try {
// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE);
@@ -555,23 +610,7 @@ void executeSingleUpload(
if (blobSize > s3BlobStore.bufferSizeInBytes()) {
throw new IllegalArgumentException("Upload request size [" + blobSize + "] can't be larger than buffer size");
}
-
- final var putRequestBuilder = PutObjectRequest.builder()
- .bucket(s3BlobStore.bucket())
- .key(blobName)
- .contentLength(blobSize)
- .storageClass(s3BlobStore.getStorageClass())
- .acl(s3BlobStore.getCannedACL());
- if (s3BlobStore.serverSideEncryption()) {
- putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
- }
- if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
- putRequestBuilder.ifNoneMatch("*");
- }
- S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
-
- final var putRequest = putRequestBuilder.build();
- clientReference.client().putObject(putRequest, RequestBody.fromInputStream(input, blobSize));
+ putObject(purpose, s3BlobStore, blobName, blobSize, () -> RequestBody.fromInputStream(input, blobSize), condition);
} catch (final SdkException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
}
@@ -590,7 +629,7 @@ private void executeMultipart(
final long partSize,
final long blobSize,
final PartOperation partOperation,
- final boolean failIfAlreadyExists
+ final ConditionalOperation condition
) throws IOException {
ensureMultiPartUploadSize(blobSize);
@@ -661,8 +700,13 @@ private void executeMultipart(
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));
- if (failIfAlreadyExists && s3BlobStore.supportsConditionalWrites(purpose)) {
- completeMultipartUploadRequestBuilder.ifNoneMatch("*");
+ if (s3BlobStore.supportsConditionalWrites(purpose)) {
+ switch (condition) {
+ case ConditionalOperation.IfMatch ifMatch -> completeMultipartUploadRequestBuilder.ifMatch(ifMatch.etag);
+ case ConditionalOperation.IfNoneMatch ignored -> completeMultipartUploadRequestBuilder.ifNoneMatch("*");
+ case ConditionalOperation.None ignored -> {
+ }
+ }
}
S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
@@ -690,7 +734,7 @@ void executeMultipartUpload(
final String blobName,
final InputStream input,
final long blobSize,
- final boolean failIfAlreadyExists
+ final ConditionalOperation condition
) throws IOException {
executeMultipart(
purpose,
@@ -708,7 +752,7 @@ void executeMultipartUpload(
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
}
},
- failIfAlreadyExists
+ condition
);
}
@@ -756,7 +800,7 @@ void executeMultipartCopy(
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
}
}),
- false
+ ConditionalOperation.NONE
);
}
diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java
index 662c2137b79e2..bb2816b52eeb4 100644
--- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java
+++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java
@@ -52,6 +52,7 @@
import java.util.stream.IntStream;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
+import static org.elasticsearch.repositories.s3.S3BlobContainer.ConditionalOperation;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
@@ -75,7 +76,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
- randomBoolean()
+ randomCondition()
)
);
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
@@ -96,7 +97,7 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
blobName,
new ByteArrayInputStream(new byte[0]),
ByteSizeUnit.MB.toBytes(2),
- randomBoolean()
+ randomCondition()
)
);
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -132,7 +133,7 @@ public void testExecuteSingleUpload() throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}
- final boolean failIfAlreadyExists = randomBoolean();
+ final ConditionalOperation conditionalOperation = randomCondition();
final S3Client client = configureMockClient(blobStore);
@@ -142,7 +143,7 @@ public void testExecuteSingleUpload() throws IOException {
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
- blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
+ blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);
final PutObjectRequest request = requestCaptor.getValue();
assertEquals(bucketName, request.bucket());
@@ -158,8 +159,19 @@ public void testExecuteSingleUpload() throws IOException {
);
}
- if (failIfAlreadyExists) {
- assertEquals("*", request.ifNoneMatch());
+ switch (conditionalOperation) {
+ case ConditionalOperation.IfMatch ifMatch -> {
+ assertEquals(ifMatch.etag(), request.ifMatch());
+ assertNull(request.ifNoneMatch());
+ }
+ case ConditionalOperation.IfNoneMatch ignored -> {
+ assertNull(request.ifMatch());
+ assertEquals("*", request.ifNoneMatch());
+ }
+ case ConditionalOperation.None ignored -> {
+ assertNull(request.ifMatch());
+ assertNull(request.ifNoneMatch());
+ }
}
final RequestBody requestBody = bodyCaptor.getValue();
@@ -185,7 +197,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
- randomBoolean()
+ randomCondition()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
@@ -204,7 +216,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
- randomBoolean()
+ randomCondition()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
@@ -255,7 +267,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}
- final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
+ final ConditionalOperation conditionalOperation = doCopy ? ConditionalOperation.NONE : randomCondition();
final S3Client client = configureMockClient(blobStore);
@@ -305,7 +317,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
if (doCopy) {
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
} else {
- blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
+ blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, conditionalOperation);
}
final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -372,8 +384,19 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
assertEquals(uploadId, compRequest.uploadId());
- if (failIfAlreadyExists) {
- assertEquals("*", compRequest.ifNoneMatch());
+ switch (conditionalOperation) {
+ case ConditionalOperation.IfMatch ifMatch -> {
+ assertEquals(ifMatch.etag(), compRequest.ifMatch());
+ assertNull(compRequest.ifNoneMatch());
+ }
+ case ConditionalOperation.IfNoneMatch ignored -> {
+ assertNull(compRequest.ifMatch());
+ assertEquals("*", compRequest.ifNoneMatch());
+ }
+ case ConditionalOperation.None ignored -> {
+ assertNull(compRequest.ifMatch());
+ assertNull(compRequest.ifNoneMatch());
+ }
}
final List actualETags = compRequest.multipartUpload()
@@ -461,7 +484,7 @@ public void close() {}
blobName,
new ByteArrayInputStream(new byte[0]),
blobSize,
- randomBoolean()
+ randomCondition()
);
});
@@ -557,6 +580,14 @@ public void close() {}
return client;
}
+ private ConditionalOperation randomCondition() {
+ return switch (between(0, 2)) {
+ case 0 -> ConditionalOperation.NONE;
+ case 1 -> ConditionalOperation.IF_NONE_MATCH;
+ default -> ConditionalOperation.ifMatch(randomAlphanumericOfLength(128));
+ };
+ }
+
private static void closeMockClient(S3BlobStore blobStore) {
final var finalClientReference = blobStore.clientReference();
assertFalse(finalClientReference.decRef());