From 8c33c9d4ab95dd07e7b201ca92c0b5326eb646f7 Mon Sep 17 00:00:00 2001 From: yma Date: Tue, 7 Jan 2025 17:24:31 +0800 Subject: [PATCH] Use Uni async way to do checksum validation for local downloads --- .../jaxrs/FoloContentAccessResource.java | 39 ++++- .../util/sidecar/services/ProxyService.java | 140 +++++++++++++++++- .../util/sidecar/services/ReportService.java | 5 +- 3 files changed, 170 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/commonjava/util/sidecar/jaxrs/FoloContentAccessResource.java b/src/main/java/org/commonjava/util/sidecar/jaxrs/FoloContentAccessResource.java index 78a48d1..a4ef1a9 100644 --- a/src/main/java/org/commonjava/util/sidecar/jaxrs/FoloContentAccessResource.java +++ b/src/main/java/org/commonjava/util/sidecar/jaxrs/FoloContentAccessResource.java @@ -41,6 +41,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.Optional; @@ -87,11 +88,39 @@ public Uni get( @Parameter( in = PATH, required = true ) @PathParam( " Optional download = archiveService.getLocally( path ); if ( download.isPresent() && download.get().isFile() ) { - InputStream inputStream = FileUtils.openInputStream( download.get() ); - final Response.ResponseBuilder builder = Response.ok( new TransferStreamingOutput( inputStream ) ); - logger.debug( "Download path: {} from historical archive.", path ); - publishTrackingEvent( path, id ); - return Uni.createFrom().item( builder.build() ); + Uni checksumValidation = + proxyService.validateChecksum( id, packageType, type, name, path, request ); + return checksumValidation.onItem().transform( result -> { + if ( result != null && result ) + { + try + { + InputStream inputStream = FileUtils.openInputStream( download.get() ); + final Response.ResponseBuilder builder = + Response.ok( new TransferStreamingOutput( inputStream ) ); + logger.debug( "Download path: {} from historical archive.", path ); + publishTrackingEvent( path, id ); + return Uni.createFrom().item( builder.build() ); + } + catch ( IOException e ) + { + logger.error( "IO error for local file, path {}.", path, e ); + } + } + else + { + try + { + logger.debug( "Checksum validation failed, download from proxy: {}.", path ); + return proxyService.doGet( id, packageType, type, name, path, request ); + } + catch ( Exception e ) + { + logger.error( "Error for proxy download, path {}.", path, e ); + } + } + return null; + } ).flatMap( response -> response ); } else { diff --git a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java index 09566d6..d604979 100644 --- a/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java +++ b/src/main/java/org/commonjava/util/sidecar/services/ProxyService.java @@ -21,6 +21,7 @@ import kotlin.Pair; import org.commonjava.util.sidecar.config.ProxyConfiguration; import org.commonjava.util.sidecar.interceptor.ExceptionHandler; +import org.commonjava.util.sidecar.model.dto.HistoricalEntryDTO; import org.commonjava.util.sidecar.util.OtelAdapter; import org.commonjava.util.sidecar.util.ProxyStreamingOutput; import org.commonjava.util.sidecar.util.UrlUtils; @@ -31,7 +32,11 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.ws.rs.core.Response; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.util.LinkedHashMap; +import java.util.Map; import static io.vertx.core.http.HttpMethod.HEAD; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; @@ -54,28 +59,34 @@ public class ProxyService @Inject OtelAdapter otel; + @Inject + ReportService reportService; + public Uni doHead( String trackingId, String packageType, String type, String name, String path, - HttpServerRequest request ) throws Exception + HttpServerRequest request ) + throws Exception { String contentPath = UrlUtils.buildUrl( FOLO_TRACK_REST_BASE_PATH, trackingId, packageType, type, name, path ); return doHead( contentPath, request ); } - public Uni doHead( String path, HttpServerRequest request ) throws Exception + public Uni doHead( String path, HttpServerRequest request ) + throws Exception { return normalizePathAnd( path, p -> classifier.classifyAnd( p, request, ( client, service ) -> wrapAsyncCall( - client.head( p, request ).call(), request.method() ) ) ); + client.head( p, request ).call(), request.method() ) ) ); } public Uni doGet( String trackingId, String packageType, String type, String name, String path, - HttpServerRequest request ) throws Exception + HttpServerRequest request ) + throws Exception { String contentPath = UrlUtils.buildUrl( FOLO_TRACK_REST_BASE_PATH, trackingId, packageType, type, name, path ); return doGet( contentPath, request ); } public Uni doGet( String path, HttpServerRequest request ) - throws Exception + throws Exception { return normalizePathAnd( path, p -> classifier.classifyAnd( p, request, ( client, service ) -> wrapAsyncCall( client.get( p, request ).call(), request.method() ) ) ); @@ -113,16 +124,87 @@ public Uni doPut( String path, InputStream is, HttpServerRequest reque public Uni doDelete( String path, HttpServerRequest request ) throws Exception { return normalizePathAnd( path, p -> classifier.classifyAnd( p, request, ( client, service ) -> wrapAsyncCall( - client.delete( p ).headersFrom( request ).call(), request.method() ) ) ); + client.delete( p ).headersFrom( request ).call(), request.method() ) ) ); } public Uni wrapAsyncCall( WebClientAdapter.CallAdapter asyncCall, HttpMethod method ) { - Uni ret = - asyncCall.enqueue().onItem().transform( ( resp ) -> convertProxyResp( resp, method ) ); + Uni ret = asyncCall.enqueue().onItem().transform( ( resp ) -> convertProxyResp( resp, method ) ); return ret.onFailure().recoverWithItem( this::handleProxyException ); } + public Uni validateChecksum( String trackingId, String packageType, String type, String name, String path, + HttpServerRequest request ) + { + Map localChecksums = getChecksums( path ); + Uni resultUni = Uni.createFrom().item( false ); + + for ( String checksumType : localChecksums.keySet() ) + { + String localChecksum = localChecksums.get( checksumType ); + if ( localChecksum == null ) + { + continue; + } + String checksumUrl = path + "." + checksumType; + resultUni = resultUni.onItem().call( () -> { + try + { + return downloadAndCompareChecksum( trackingId, packageType, type, name, checksumUrl, localChecksum, + request ).onItem().invoke( result -> { + if ( result != null && result ) + { + // This is just used to skip loop to avoid unnecessary checksum download + logger.debug( + "Found the valid checksum compare result, stopping further checks, remote path {}", + checksumUrl ); + throw new FoundValidChecksumException(); + } + } ); + } + catch ( Exception e ) + { + logger.error( "Checksum download compare error for path: {}", checksumUrl, e ); + } + return null; + } ); + } + return resultUni.onFailure().recoverWithItem( false ).onItem().transform( result -> { + // If catch FoundValidChecksumException,return true + return true; + } ); // If no valid checksum compare result found, return false + } + + private Uni downloadAndCompareChecksum( String trackingId, String packageType, String type, String name, + String checksumUrl, String localChecksum, + HttpServerRequest request ) + throws Exception + { + return doGet( trackingId, packageType, type, name, checksumUrl, request ).onItem().transform( response -> { + if ( response.getStatus() == Response.Status.OK.getStatusCode() ) + { + ProxyStreamingOutput streamingOutput = (ProxyStreamingOutput) response.getEntity(); + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) + { + streamingOutput.write( outputStream ); + String remoteChecksum = outputStream.toString(); + return localChecksum.equals( remoteChecksum ); + } + catch ( IOException e ) + { + logger.error( "Error to read remote checksum, path:{}.", checksumUrl, e ); + return null; + } + } + else + { + logger.error( "Failed to download remote checksum for {}: HTTP {}.", checksumUrl, + response.getStatus() ); + return null; + } + } ); + } + /** * Send status 500 with error message body. * @param t error @@ -162,4 +244,46 @@ private boolean isHeaderAllowed( Pair header String key = header.getFirst(); return !FORBIDDEN_HEADERS.contains( key.toLowerCase() ); } + + private Map getChecksums( String path ) + { + Map result = new LinkedHashMap<>(); + HistoricalEntryDTO entryDTO = reportService.getHistoricalContentMap().get( path ); + if ( entryDTO != null ) + { + result.put( ChecksumType.SHA1.getValue(), entryDTO.getSha1() ); + result.put( ChecksumType.SHA256.getValue(), entryDTO.getSha256() ); + result.put( ChecksumType.MD5.getValue(), entryDTO.getMd5() ); + } + + return result; + } + + enum ChecksumType + { + SHA1( "sha1" ), + SHA256( "sha256" ), + MD5( "md5" ); + + private final String value; + + ChecksumType( String value ) + { + this.value = value; + } + + public String getValue() + { + return value; + } + } + + class FoundValidChecksumException + extends RuntimeException + { + public FoundValidChecksumException() + { + super( "Found a valid checksum, stopping further checks." ); + } + } } \ No newline at end of file diff --git a/src/main/java/org/commonjava/util/sidecar/services/ReportService.java b/src/main/java/org/commonjava/util/sidecar/services/ReportService.java index cddef08..79cd7cc 100644 --- a/src/main/java/org/commonjava/util/sidecar/services/ReportService.java +++ b/src/main/java/org/commonjava/util/sidecar/services/ReportService.java @@ -145,7 +145,10 @@ public void storeTrackedDownload( JsonObject message ) Quarkus.asyncExit(); } } - } + public HashMap getHistoricalContentMap() + { + return historicalContentMap; + } }