8484import java .util .List ;
8585import java .util .Map ;
8686import java .util .concurrent .atomic .AtomicLong ;
87+ import java .util .function .Supplier ;
8788import java .util .stream .Collectors ;
8889
8990import static org .elasticsearch .common .blobstore .support .BlobContainerUtils .getRegisterUsingConsistentRead ;
@@ -144,10 +145,11 @@ public void writeBlob(OperationPurpose purpose, String blobName, InputStream inp
144145 throws IOException {
145146 assert BlobContainer .assertPurposeConsistency (purpose , blobName );
146147 assert inputStream .markSupported () : "No mark support on inputStream breaks the S3 SDK's ability to retry requests" ;
148+ final var condition = failIfAlreadyExists ? ConditionalOperation .IF_NONE_MATCH : ConditionalOperation .NONE ;
147149 if (blobSize <= getLargeBlobThresholdInBytes ()) {
148- executeSingleUpload (purpose , blobStore , buildKey (blobName ), inputStream , blobSize , failIfAlreadyExists );
150+ executeSingleUpload (purpose , blobStore , buildKey (blobName ), inputStream , blobSize , condition );
149151 } else {
150- executeMultipartUpload (purpose , blobStore , buildKey (blobName ), inputStream , blobSize , failIfAlreadyExists );
152+ executeMultipartUpload (purpose , blobStore , buildKey (blobName ), inputStream , blobSize , condition );
151153 }
152154 }
153155
@@ -536,6 +538,59 @@ String buildKey(String blobName) {
536538 return keyPath + blobName ;
537539 }
538540
541+ /**
542+ * Enumeration of mutually exlusive conditional operations supported by S3.
543+ *
544+ * @see <a href=https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-requests.html>S3-conditional-requests</a>
545+ */
546+ sealed interface ConditionalOperation permits ConditionalOperation .IfMatch , ConditionalOperation .IfNoneMatch ,
547+ ConditionalOperation .None {
548+ ConditionalOperation NONE = new None ();
549+ ConditionalOperation IF_NONE_MATCH = new IfNoneMatch ();
550+
551+ static ConditionalOperation ifMatch (String etag ) {
552+ return new IfMatch (etag );
553+ }
554+
555+ record None () implements ConditionalOperation {}
556+
557+ record IfNoneMatch () implements ConditionalOperation {}
558+
559+ record IfMatch (String etag ) implements ConditionalOperation {}
560+ }
561+
562+ static void putObject (
563+ OperationPurpose purpose ,
564+ S3BlobStore s3BlobStore ,
565+ String blobName ,
566+ long contentLength ,
567+ Supplier <RequestBody > body ,
568+ ConditionalOperation condition
569+ ) {
570+ final var putRequestBuilder = PutObjectRequest .builder ()
571+ .bucket (s3BlobStore .bucket ())
572+ .key (blobName )
573+ .contentLength (contentLength )
574+ .storageClass (s3BlobStore .getStorageClass ())
575+ .acl (s3BlobStore .getCannedACL ());
576+ if (s3BlobStore .serverSideEncryption ()) {
577+ putRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
578+ }
579+ if (s3BlobStore .supportsConditionalWrites (purpose )) {
580+ switch (condition ) {
581+ case ConditionalOperation .IfMatch ifMatch -> putRequestBuilder .ifMatch (ifMatch .etag );
582+ case ConditionalOperation .IfNoneMatch ignored -> putRequestBuilder .ifNoneMatch ("*" );
583+ case ConditionalOperation .None ignored -> {
584+ }
585+ }
586+ }
587+ S3BlobStore .configureRequestForMetrics (putRequestBuilder , s3BlobStore , Operation .PUT_OBJECT , purpose );
588+ final var putRequest = putRequestBuilder .build ();
589+ try (var client = s3BlobStore .clientReference ()) {
590+ client .client ().putObject (putRequest , body .get ());
591+ }
592+ }
593+
539594 /**
540595 * Uploads a blob using a single upload request
541596 */
@@ -545,33 +600,17 @@ void executeSingleUpload(
545600 final String blobName ,
546601 final InputStream input ,
547602 final long blobSize ,
548- final boolean failIfAlreadyExists
603+ final ConditionalOperation condition
549604 ) throws IOException {
550- try ( var clientReference = s3BlobStore . clientReference ()) {
605+ try {
551606 // Extra safety checks
552607 if (blobSize > MAX_FILE_SIZE .getBytes ()) {
553608 throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than " + MAX_FILE_SIZE );
554609 }
555610 if (blobSize > s3BlobStore .bufferSizeInBytes ()) {
556611 throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than buffer size" );
557612 }
558-
559- final var putRequestBuilder = PutObjectRequest .builder ()
560- .bucket (s3BlobStore .bucket ())
561- .key (blobName )
562- .contentLength (blobSize )
563- .storageClass (s3BlobStore .getStorageClass ())
564- .acl (s3BlobStore .getCannedACL ());
565- if (s3BlobStore .serverSideEncryption ()) {
566- putRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
567- }
568- if (failIfAlreadyExists && s3BlobStore .supportsConditionalWrites (purpose )) {
569- putRequestBuilder .ifNoneMatch ("*" );
570- }
571- S3BlobStore .configureRequestForMetrics (putRequestBuilder , blobStore , Operation .PUT_OBJECT , purpose );
572-
573- final var putRequest = putRequestBuilder .build ();
574- clientReference .client ().putObject (putRequest , RequestBody .fromInputStream (input , blobSize ));
613+ putObject (purpose , s3BlobStore , blobName , blobSize , () -> RequestBody .fromInputStream (input , blobSize ), condition );
575614 } catch (final SdkException e ) {
576615 throw new IOException ("Unable to upload object [" + blobName + "] using a single upload" , e );
577616 }
@@ -590,7 +629,7 @@ private void executeMultipart(
590629 final long partSize ,
591630 final long blobSize ,
592631 final PartOperation partOperation ,
593- final boolean failIfAlreadyExists
632+ final ConditionalOperation condition
594633 ) throws IOException {
595634
596635 ensureMultiPartUploadSize (blobSize );
@@ -661,8 +700,13 @@ private void executeMultipart(
661700 .uploadId (uploadId )
662701 .multipartUpload (b -> b .parts (parts ));
663702
664- if (failIfAlreadyExists && s3BlobStore .supportsConditionalWrites (purpose )) {
665- completeMultipartUploadRequestBuilder .ifNoneMatch ("*" );
703+ if (s3BlobStore .supportsConditionalWrites (purpose )) {
704+ switch (condition ) {
705+ case ConditionalOperation .IfMatch ifMatch -> completeMultipartUploadRequestBuilder .ifMatch (ifMatch .etag );
706+ case ConditionalOperation .IfNoneMatch ignored -> completeMultipartUploadRequestBuilder .ifNoneMatch ("*" );
707+ case ConditionalOperation .None ignored -> {
708+ }
709+ }
666710 }
667711
668712 S3BlobStore .configureRequestForMetrics (completeMultipartUploadRequestBuilder , blobStore , operation , purpose );
@@ -690,7 +734,7 @@ void executeMultipartUpload(
690734 final String blobName ,
691735 final InputStream input ,
692736 final long blobSize ,
693- final boolean failIfAlreadyExists
737+ final ConditionalOperation condition
694738 ) throws IOException {
695739 executeMultipart (
696740 purpose ,
@@ -708,7 +752,7 @@ void executeMultipartUpload(
708752 return CompletedPart .builder ().partNumber (partNum ).eTag (uploadResponse .eTag ()).build ();
709753 }
710754 },
711- failIfAlreadyExists
755+ condition
712756 );
713757 }
714758
@@ -756,7 +800,7 @@ void executeMultipartCopy(
756800 return CompletedPart .builder ().partNumber (partNum ).eTag (uploadPartCopyResponse .copyPartResult ().eTag ()).build ();
757801 }
758802 }),
759- false
803+ ConditionalOperation . NONE
760804 );
761805 }
762806
0 commit comments