[JAVA-14456] Updated sdk to latest version + made use of toPublisher() new method on AsyncResponseTransformer (#12793)

Co-authored-by: panagiotiskakos <panagiotis.kakos@libra-is.com>
This commit is contained in:
panos-kakos 2022-10-10 20:13:14 +01:00 committed by GitHub
parent 5a68e4459d
commit d6918ce11d
2 changed files with 33 additions and 81 deletions

View File

@ -91,7 +91,7 @@
<properties> <properties>
<spring.version>2.2.1.RELEASE</spring.version> <spring.version>2.2.1.RELEASE</spring.version>
<awssdk.version>2.10.27</awssdk.version> <awssdk.version>2.17.283</awssdk.version>
</properties> </properties>
</project> </project>

View File

@ -3,17 +3,11 @@
*/ */
package com.baeldung.aws.reactive.s3; package com.baeldung.aws.reactive.s3;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.http.ResponseEntity.BodyBuilder;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -22,11 +16,7 @@ import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@ -41,7 +31,6 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@Slf4j @Slf4j
public class DownloadResource { public class DownloadResource {
private final S3AsyncClient s3client; private final S3AsyncClient s3client;
private final S3ClientConfigurarionProperties s3config; private final S3ClientConfigurarionProperties s3config;
@ -50,28 +39,30 @@ public class DownloadResource {
this.s3config = s3config; this.s3config = s3config;
} }
@GetMapping(path = "/{filekey}")
@GetMapping(path="/{filekey}")
public Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) { public Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
GetObjectRequest request = GetObjectRequest.builder() GetObjectRequest request = GetObjectRequest.builder()
.bucket(s3config.getBucket()) .bucket(s3config.getBucket())
.key(filekey) .key(filekey)
.build(); .build();
return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider())) return Mono.fromFuture(s3client.getObject(request, AsyncResponseTransformer.toPublisher()))
.map( (response) -> { .map(response -> {
checkResult(response.sdkResponse); checkResult(response.response());
String filename = getMetadataItem(response.sdkResponse,"filename",filekey); String filename = getMetadataItem(response.response(), "filename", filekey);
log.info("[I65] filename={}, length={}",filename, response.sdkResponse.contentLength() ); log.info("[I65] filename={}, length={}", filename, response.response()
.contentLength());
return ResponseEntity.ok() return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType()) .header(HttpHeaders.CONTENT_TYPE, response.response()
.header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength())) .contentType())
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"") .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.response()
.body(response.flux); .contentLength()))
}); .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
.body(Flux.from(response));
});
} }
/** /**
@ -82,63 +73,24 @@ public class DownloadResource {
* @return * @return
*/ */
private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) { private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) {
for( Entry<String, String> entry : sdkResponse.metadata().entrySet()) { for (Entry<String, String> entry : sdkResponse.metadata()
if ( entry.getKey().equalsIgnoreCase(key)) { .entrySet()) {
if (entry.getKey()
.equalsIgnoreCase(key)) {
return entry.getValue(); return entry.getValue();
} }
} }
return defaultValue; return defaultValue;
} }
// Helper used to check return codes from an API call // Helper used to check return codes from an API call
private static void checkResult(GetObjectResponse response) { private static void checkResult(GetObjectResponse response) {
SdkHttpResponse sdkResponse = response.sdkHttpResponse(); SdkHttpResponse sdkResponse = response.sdkHttpResponse();
if ( sdkResponse != null && sdkResponse.isSuccessful()) { if (sdkResponse != null && sdkResponse.isSuccessful()) {
return; return;
} }
throw new DownloadFailedException(response); throw new DownloadFailedException(response);
} }
static class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {
private FluxResponse response;
@Override
public CompletableFuture<FluxResponse> prepare() {
response = new FluxResponse();
return response.cf;
}
@Override
public void onResponse(GetObjectResponse sdkResponse) {
this.response.sdkResponse = sdkResponse;
}
@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
response.flux = Flux.from(publisher);
response.cf.complete(response);
}
@Override
public void exceptionOccurred(Throwable error) {
response.cf.completeExceptionally(error);
}
}
/**
* Holds the API response and stream
* @author Philippe
*/
static class FluxResponse {
final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
GetObjectResponse sdkResponse;
Flux<ByteBuffer> flux;
}
} }