diff --git a/aws-reactive/images/rective-upload.png b/aws-reactive/images/rective-upload.png
new file mode 100644
index 0000000000..ebe5a7d69c
Binary files /dev/null and b/aws-reactive/images/rective-upload.png differ
diff --git a/aws-reactive/images/rective-upload.txt b/aws-reactive/images/rective-upload.txt
new file mode 100644
index 0000000000..fb5177544e
--- /dev/null
+++ b/aws-reactive/images/rective-upload.txt
@@ -0,0 +1,29 @@
+participant "Client 1" as C1
+participant "Client 2" as C2
+participant "Reactive Web App" as RWS
+participant "Backend" as S3
+C1 -> RWS: POST
+activate C1
+activate RWS
+RWS -> S3: Async POST
+deactivate RWS
+C2 -> RWS: POST
+activate C2
+activate RWS
+RWS -> S3: Async POST
+deactivate RWS
+S3 --> RWS: Async Result
+activate RWS
+RWS -->C2: Result
+deactivate RWS
+deactivate C2
+// First file EOF
+S3 --> RWS: Async Result
+activate RWS
+RWS -->C1: Result
+deactivate RWS
+deactivate C1
+
+
+
+
diff --git a/aws-reactive/images/thread-per-client.png b/aws-reactive/images/thread-per-client.png
new file mode 100644
index 0000000000..dc75839aa4
Binary files /dev/null and b/aws-reactive/images/thread-per-client.png differ
diff --git a/aws-reactive/images/thread-per-client.txt b/aws-reactive/images/thread-per-client.txt
new file mode 100644
index 0000000000..a7009e318c
--- /dev/null
+++ b/aws-reactive/images/thread-per-client.txt
@@ -0,0 +1,28 @@
+participant Client 1
+participant Client 2
+participant Controller
+participant Backend
+Client 1-> Controller: POST Data
+activate Client 1
+activate Controller
+Controller -> Backend: Save Data
+activate Backend
+note left of Controller #yellow: Controller blocked\nuntil result received
+Backend --> Controller: Result
+deactivate Backend
+Controller --> Client 1: Result
+deactivate Client 1
+deactivate Controller
+// 2nd Upload
+Client 2-> Controller: POST Data
+activate Client 2
+activate Controller
+Controller -> Backend: Save Data
+activate Backend
+note left of Controller #yellow: Controller blocket\nuntil result received
+Backend --> Controller: Result
+deactivate Backend
+Controller --> Client 2: Result
+deactivate Controller
+deactivate Client 2
+
diff --git a/aws-reactive/pom.xml b/aws-reactive/pom.xml
new file mode 100644
index 0000000000..b3fcb24902
--- /dev/null
+++ b/aws-reactive/pom.xml
@@ -0,0 +1,105 @@
+
+
+ 4.0.0
+
+
+ com.baeldung
+ parent-modules
+ 1.0.0-SNAPSHOT
+
+
+ aws-reactive
+ 0.0.1-SNAPSHOT
+ aws-reactive
+ AWS Reactive Sample
+
+
+ 1.8
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.2.1.RELEASE
+ pom
+ import
+
+
+
+ software.amazon.awssdk
+ bom
+ 2.10.27
+ pom
+ import
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+ software.amazon.awssdk
+ s3
+ compile
+
+
+
+ netty-nio-client
+ software.amazon.awssdk
+ compile
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+
+
+
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.springframework.boot
+ spring-boot-devtools
+ runtime
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadFailedException.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadFailedException.java
new file mode 100644
index 0000000000..a88e1ab010
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadFailedException.java
@@ -0,0 +1,32 @@
+package com.baeldung.aws.reactive.s3;
+
+import java.util.Optional;
+
+import org.springframework.http.HttpStatus;
+
+import lombok.AllArgsConstructor;
+import software.amazon.awssdk.core.SdkResponse;
+import software.amazon.awssdk.http.SdkHttpResponse;
+
+@AllArgsConstructor
+public class DownloadFailedException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ private int statusCode;
+ private Optional statusText;
+
+ public DownloadFailedException(SdkResponse response) {
+
+ SdkHttpResponse httpResponse = response.sdkHttpResponse();
+ if (httpResponse != null) {
+ this.statusCode = httpResponse.statusCode();
+ this.statusText = httpResponse.statusText();
+ } else {
+ this.statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
+ this.statusText = Optional.of("UNKNOWN");
+ }
+
+ }
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadResource.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadResource.java
new file mode 100644
index 0000000000..838ada1685
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/DownloadResource.java
@@ -0,0 +1,144 @@
+/**
+ *
+ */
+package com.baeldung.aws.reactive.s3;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.ResponseEntity.BodyBuilder;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.SdkResponse;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.core.async.SdkPublisher;
+import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+/**
+ * @author Philippe
+ *
+ */
+@RestController
+@RequestMapping("/inbox")
+@Slf4j
+public class DownloadResource {
+
+
+ private final S3AsyncClient s3client;
+ private final S3ClientConfigurarionProperties s3config;
+
+ public DownloadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
+ this.s3client = s3client;
+ this.s3config = s3config;
+ }
+
+
+ @GetMapping(path="/{filekey}")
+ public Mono>> downloadFile(@PathVariable("filekey") String filekey) {
+
+ GetObjectRequest request = GetObjectRequest.builder()
+ .bucket(s3config.getBucket())
+ .key(filekey)
+ .build();
+
+ return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
+ .map( (response) -> {
+ checkResult(response.sdkResponse);
+ String filename = getMetadataItem(response.sdkResponse,"filename",filekey);
+
+ log.info("[I65] filename={}, length={}",filename, response.sdkResponse.contentLength() );
+
+ return ResponseEntity.ok()
+ .header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
+ .header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
+ .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
+ .body(response.flux);
+ });
+ }
+
+ /**
+ * Lookup a metadata key in a case-insensitive way.
+ * @param sdkResponse
+ * @param key
+ * @param defaultValue
+ * @return
+ */
+ private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) {
+ for( Entry entry : sdkResponse.metadata().entrySet()) {
+ if ( entry.getKey().equalsIgnoreCase(key)) {
+ return entry.getValue();
+ }
+ }
+ return defaultValue;
+ }
+
+
+ // Helper used to check return codes from an API call
+ private static void checkResult(GetObjectResponse response) {
+ SdkHttpResponse sdkResponse = response.sdkHttpResponse();
+ if ( sdkResponse != null && sdkResponse.isSuccessful()) {
+ return;
+ }
+
+ throw new DownloadFailedException(response);
+ }
+
+
+ static class FluxResponseProvider implements AsyncResponseTransformer {
+
+ private FluxResponse response;
+
+ @Override
+ public CompletableFuture prepare() {
+ response = new FluxResponse();
+ return response.cf;
+ }
+
+ @Override
+ public void onResponse(GetObjectResponse sdkResponse) {
+ this.response.sdkResponse = sdkResponse;
+ }
+
+ @Override
+ public void onStream(SdkPublisher publisher) {
+ response.flux = Flux.from(publisher);
+ response.cf.complete(response);
+ }
+
+ @Override
+ public void exceptionOccurred(Throwable error) {
+ response.cf.completeExceptionally(error);
+ }
+
+ }
+
+ /**
+ * Holds the API response and stream
+ * @author Philippe
+ */
+ static class FluxResponse {
+
+ final CompletableFuture cf = new CompletableFuture<>();
+ GetObjectResponse sdkResponse;
+ Flux flux;
+ }
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/ReactiveS3Application.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/ReactiveS3Application.java
new file mode 100644
index 0000000000..b90c085fc9
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/ReactiveS3Application.java
@@ -0,0 +1,13 @@
+package com.baeldung.aws.reactive.s3;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ReactiveS3Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ReactiveS3Application.class, args);
+ }
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfigurarionProperties.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfigurarionProperties.java
new file mode 100644
index 0000000000..b30bc1e5fa
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfigurarionProperties.java
@@ -0,0 +1,28 @@
+package com.baeldung.aws.reactive.s3;
+
+import java.net.URI;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import lombok.Data;
+import software.amazon.awssdk.regions.Region;
+
+@ConfigurationProperties(prefix = "aws.s3")
+@Data
+public class S3ClientConfigurarionProperties {
+
+ private Region region = Region.US_EAST_1;
+ private URI endpoint = null;
+
+ private String accessKeyId;
+ private String secretAccessKey;
+
+ // Bucket name we'll be using as our backend storage
+ private String bucket;
+
+ // AWS S3 requires that file parts must have at least 5MB, except
+ // for the last part. This may change for other S3-compatible services, so let't
+ // define a configuration property for that
+ private int multipartMinPartSize = 5*1024*1024;
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfiguration.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfiguration.java
new file mode 100644
index 0000000000..906ea088a1
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/S3ClientConfiguration.java
@@ -0,0 +1,65 @@
+package com.baeldung.aws.reactive.s3;
+
+import java.time.Duration;
+
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
+import software.amazon.awssdk.utils.StringUtils;
+
+@Configuration
+@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
+public class S3ClientConfiguration {
+
+ @Bean
+ public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, AwsCredentialsProvider credentialsProvider) {
+
+ SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
+ .writeTimeout(Duration.ZERO)
+ .maxConcurrency(64)
+ .build();
+
+ S3Configuration serviceConfiguration = S3Configuration.builder()
+ .checksumValidationEnabled(false)
+ .chunkedEncodingEnabled(true)
+ .build();
+
+ S3AsyncClientBuilder b = S3AsyncClient.builder()
+ .httpClient(httpClient)
+ .region(s3props.getRegion())
+ .credentialsProvider(credentialsProvider)
+ .serviceConfiguration(serviceConfiguration);
+
+ if (s3props.getEndpoint() != null) {
+ b = b.endpointOverride(s3props.getEndpoint());
+ }
+
+ return b.build();
+ }
+
+ @Bean
+ public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
+
+ if (StringUtils.isBlank(s3props.getAccessKeyId())) {
+ // Return default provider
+ return DefaultCredentialsProvider.create();
+ }
+ else {
+ // Return custom credentials provider
+ return () -> {
+ AwsCredentials creds = AwsBasicCredentials.create(s3props.getAccessKeyId(), s3props.getSecretAccessKey());
+ return creds;
+ };
+ }
+ }
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadFailedException.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadFailedException.java
new file mode 100644
index 0000000000..0cfebc85d2
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadFailedException.java
@@ -0,0 +1,32 @@
+package com.baeldung.aws.reactive.s3;
+
+import java.util.Optional;
+
+import org.springframework.http.HttpStatus;
+
+import lombok.AllArgsConstructor;
+import software.amazon.awssdk.core.SdkResponse;
+import software.amazon.awssdk.http.SdkHttpResponse;
+
+@AllArgsConstructor
+public class UploadFailedException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ private int statusCode;
+ private Optional statusText;
+
+ public UploadFailedException(SdkResponse response) {
+
+ SdkHttpResponse httpResponse = response.sdkHttpResponse();
+ if (httpResponse != null) {
+ this.statusCode = httpResponse.statusCode();
+ this.statusText = httpResponse.statusText();
+ } else {
+ this.statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
+ this.statusText = Optional.of("UNKNOWN");
+ }
+
+ }
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResource.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResource.java
new file mode 100644
index 0000000000..fa7bf6a471
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResource.java
@@ -0,0 +1,308 @@
+/**
+ *
+ */
+package com.baeldung.aws.reactive.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.codec.multipart.FilePart;
+import org.springframework.http.codec.multipart.Part;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.core.SdkResponse;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload.Builder;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
+/**
+ * @author Philippe
+ *
+ */
+@RestController
+@RequestMapping("/inbox")
+@Slf4j
+public class UploadResource {
+
+ private final S3AsyncClient s3client;
+ private final S3ClientConfigurarionProperties s3config;
+
+ public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
+ this.s3client = s3client;
+ this.s3config = s3config;
+ }
+
+ /**
+ * Standard file upload.
+ */
+ @PostMapping
+ public Mono> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux body) {
+
+ long length = headers.getContentLength();
+ if (length < 0) {
+ throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
+ }
+
+ String fileKey = UUID.randomUUID().toString();
+ Map metadata = new HashMap();
+ MediaType mediaType = headers.getContentType();
+
+ if (mediaType == null) {
+ mediaType = MediaType.APPLICATION_OCTET_STREAM;
+ }
+
+ log.info("[I95] uploadHandler: mediaType{}, length={}", mediaType, length);
+ CompletableFuture future = s3client
+ .putObject(PutObjectRequest.builder()
+ .bucket(s3config.getBucket())
+ .contentLength(length)
+ .key(fileKey.toString())
+ .contentType(mediaType.toString())
+ .metadata(metadata)
+ .build(),
+ AsyncRequestBody.fromPublisher(body));
+
+ return Mono.fromFuture(future)
+ .map((response) -> {
+ checkResult(response);
+ return ResponseEntity
+ .status(HttpStatus.CREATED)
+ .body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
+ });
+ }
+
+
+ /**
+ * Multipart file upload
+ * @param bucket
+ * @param parts
+ * @param headers
+ * @return
+ */
+ @RequestMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE, method = {RequestMethod.POST, RequestMethod.PUT})
+ public Mono> multipartUploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux parts ) {
+
+ return parts
+ .ofType(FilePart.class) // We'll ignore other data for now
+ .flatMap((part) -> saveFile(headers, s3config.getBucket(), part))
+ .collect(Collectors.toList())
+ .map((keys) -> ResponseEntity.status(HttpStatus.CREATED)
+ .body(new UploadResult(HttpStatus.CREATED,keys)));
+ }
+
+
+ /**
+ * Save file using a multipart upload. This method does not require any temporary
+ * storage at the REST service
+ * @param headers
+ * @param bucket Bucket name
+ * @param part Uploaded file
+ * @return
+ */
+ protected Mono saveFile(HttpHeaders headers,String bucket, FilePart part) {
+
+ // Generate a filekey for this upload
+ String filekey = UUID.randomUUID().toString();
+
+ log.info("[I137] saveFile: filekey={}, filename={}", filekey, part.filename());
+
+ // Gather metadata
+ Map metadata = new HashMap();
+ String filename = part.filename();
+ if ( filename == null ) {
+ filename = filekey;
+ }
+
+ metadata.put("filename", filename);
+
+ MediaType mt = part.headers().getContentType();
+ if ( mt == null ) {
+ mt = MediaType.APPLICATION_OCTET_STREAM;
+ }
+
+ // Create multipart upload request
+ CompletableFuture uploadRequest = s3client
+ .createMultipartUpload(CreateMultipartUploadRequest.builder()
+ .contentType(mt.toString())
+ .key(filekey)
+ .metadata(metadata)
+ .bucket(bucket)
+ .build());
+
+ // This variable will hold the upload state that we must keep
+ // around until all uploads complete
+ final UploadState uploadState = new UploadState(bucket,filekey);
+
+ return Mono
+ .fromFuture(uploadRequest)
+ .flatMapMany((response) -> {
+ checkResult(response);
+ uploadState.uploadId = response.uploadId();
+ log.info("[I183] uploadId={}", response.uploadId());
+ return part.content();
+ })
+ .bufferUntil((buffer) -> {
+ uploadState.buffered += buffer.readableByteCount();
+ if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
+ log.info("[I173] bufferUntil: returning true, bufferedBytes={}, partCounter={}, uploadId={}", uploadState.buffered, uploadState.partCounter, uploadState.uploadId);
+ uploadState.buffered = 0;
+ return true;
+ }
+ else {
+ return false;
+ }
+ })
+ .map((buffers) -> concatBuffers(buffers))
+ .flatMap((buffer) -> uploadPart(uploadState,buffer))
+ .onBackpressureBuffer()
+ .reduce(uploadState,(state,completedPart) -> {
+ log.info("[I188] completed: partNumber={}, etag={}", completedPart.partNumber(), completedPart.eTag());
+ state.completedParts.put(completedPart.partNumber(), completedPart);
+ return state;
+ })
+ .flatMap((state) -> completeUpload(state))
+ .map((response) -> {
+ checkResult(response);
+ return uploadState.filekey;
+ });
+ }
+
+ private static ByteBuffer concatBuffers(List buffers) {
+ log.info("[I198] creating BytBuffer from {} chunks", buffers.size());
+
+ int partSize = 0;
+ for( DataBuffer b : buffers) {
+ partSize += b.readableByteCount();
+ }
+
+ ByteBuffer partData = ByteBuffer.allocate(partSize);
+ buffers.forEach((buffer) -> {
+ partData.put(buffer.asByteBuffer());
+ });
+
+ // Reset read pointer to first byte
+ partData.rewind();
+
+ log.info("[I208] partData: size={}", partData.capacity());
+ return partData;
+
+ }
+
+ /**
+ * Upload a single file part to the requested bucket
+ * @param uploadState
+ * @param buffer
+ * @return
+ */
+ private Mono uploadPart(UploadState uploadState, ByteBuffer buffer) {
+ final int partNumber = ++uploadState.partCounter;
+ log.info("[I218] uploadPart: partNumber={}, contentLength={}",partNumber, buffer.capacity());
+
+ CompletableFuture request = s3client.uploadPart(UploadPartRequest.builder()
+ .bucket(uploadState.bucket)
+ .key(uploadState.filekey)
+ .partNumber(partNumber)
+ .uploadId(uploadState.uploadId)
+ .contentLength((long) buffer.capacity())
+ .build(),
+ AsyncRequestBody.fromPublisher(Mono.just(buffer)));
+
+ return Mono
+ .fromFuture(request)
+ .map((uploadPartResult) -> {
+ checkResult(uploadPartResult);
+ log.info("[I230] uploadPart complete: part={}, etag={}",partNumber,uploadPartResult.eTag());
+ return CompletedPart.builder()
+ .eTag(uploadPartResult.eTag())
+ .partNumber(partNumber)
+ .build();
+ });
+ }
+
+ private Mono completeUpload(UploadState state) {
+ log.info("[I202] completeUpload: bucket={}, filekey={}, completedParts.size={}", state.bucket, state.filekey, state.completedParts.size());
+
+ CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
+ .parts(state.completedParts.values())
+ .build();
+
+ return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
+ .bucket(state.bucket)
+ .uploadId(state.uploadId)
+ .multipartUpload(multipartUpload)
+ .key(state.filekey)
+ .build()));
+ }
+
+
+ /**
+ * check result from an API call.
+ * @param result Result from an API call
+ */
+ private static void checkResult(SdkResponse result) {
+ if (result.sdkHttpResponse() == null || !result.sdkHttpResponse().isSuccessful()) {
+ throw new UploadFailedException(result);
+ }
+ }
+
+
+ /**
+ * Holds upload state during a multipart upload
+ */
+ static class UploadState {
+ final String bucket;
+ final String filekey;
+
+ String uploadId;
+ int partCounter;
+ Map completedParts = new HashMap<>();
+ int buffered = 0;
+
+ UploadState(String bucket, String filekey) {
+ this.bucket = bucket;
+ this.filekey = filekey;
+ }
+ }
+
+}
diff --git a/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResult.java b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResult.java
new file mode 100644
index 0000000000..642ad426a5
--- /dev/null
+++ b/aws-reactive/src/main/java/com/baeldung/aws/reactive/s3/UploadResult.java
@@ -0,0 +1,25 @@
+package com.baeldung.aws.reactive.s3;
+
+import java.util.List;
+
+import org.springframework.http.HttpStatus;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+@Builder
+public class UploadResult {
+ HttpStatus status;
+ String[] keys;
+
+ public UploadResult() {}
+
+ public UploadResult(HttpStatus status, List keys) {
+ this.status = status;
+ this.keys = keys == null ? new String[] {}: keys.toArray(new String[] {});
+
+ }
+}
diff --git a/aws-reactive/src/main/resources/application-minio.yml b/aws-reactive/src/main/resources/application-minio.yml
new file mode 100644
index 0000000000..93bc2ff18b
--- /dev/null
+++ b/aws-reactive/src/main/resources/application-minio.yml
@@ -0,0 +1,15 @@
+
+#
+# Minio profile
+#
+aws:
+ s3:
+ region: sa-east-1
+ endpoint: http://localhost:9000
+ accessKeyId: 8KLF8U60JER4AP23H0A6
+ secretAccessKey: vX4uM7e7nNGPqjcXycVVhceNR7NQkiMQkR9Hoctf
+ bucket: bucket1
+
+
+
+
diff --git a/aws-reactive/src/main/resources/application.yml b/aws-reactive/src/main/resources/application.yml
new file mode 100644
index 0000000000..957ebf82c3
--- /dev/null
+++ b/aws-reactive/src/main/resources/application.yml
@@ -0,0 +1,16 @@
+
+#
+# Configurações de acesso ao Minio
+#
+aws:
+ s3:
+ region: sa-east-1
+# When using AWS, the library will use one of the available
+# credential sources described in the documentation.
+# accessKeyId: ****
+# secretAccessKey: ****
+ bucket: dev1.token.com.br
+
+
+
+
diff --git a/aws-reactive/src/test/java/com/baeldung/aws/reactive/s3/ReactiveS3ApplicationLiveTest.java b/aws-reactive/src/test/java/com/baeldung/aws/reactive/s3/ReactiveS3ApplicationLiveTest.java
new file mode 100644
index 0000000000..9e5720225f
--- /dev/null
+++ b/aws-reactive/src/test/java/com/baeldung/aws/reactive/s3/ReactiveS3ApplicationLiveTest.java
@@ -0,0 +1,85 @@
+package com.baeldung.aws.reactive.s3;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.core.io.Resource;
+import org.springframework.http.ContentDisposition;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.ResourceUtils;
+
+import static org.junit.Assert.*;
+
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
+@ActiveProfiles("minio")
+class ReactiveS3ApplicationLiveTest {
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+ @LocalServerPort
+ private int serverPort;
+
+
+ @Test
+ void whenUploadSingleFile_thenSuccess() throws Exception {
+
+ String url = "http://localhost:" + serverPort + "/inbox";
+ byte[] data = Files.readAllBytes(Paths.get("src/test/resources/testimage1.png"));
+ UploadResult result = restTemplate.postForObject(url, data , UploadResult.class);
+
+ assertEquals("Expected CREATED (202)", result.getStatus(), HttpStatus.CREATED );
+
+ }
+
+ @Test
+ void whenUploadMultipleFiles_thenSuccess() throws Exception {
+
+
+ MultiValueMap body = new LinkedMultiValueMap<>();
+ addFileEntity("f1", body, new File("src/test/resources/testimage1.png"));
+ addFileEntity("f2", body, new File("src/test/resources/testimage2.png"));
+
+ HttpEntity> requestEntity = new HttpEntity<>(body);
+ String url = "http://localhost:" + serverPort + "/inbox";
+
+ ResponseEntity result = restTemplate.postForEntity(url, requestEntity, UploadResult.class);
+
+ assertEquals("Http Code",HttpStatus.CREATED, result.getStatusCode() );
+ assertEquals("File keys",2, result.getBody().getKeys().length);
+
+ }
+
+ private void addFileEntity(String name, MultiValueMap body, File file) throws Exception {
+
+ byte[] data = Files.readAllBytes(file.toPath());
+ MultiValueMap headers = new LinkedMultiValueMap<>();
+ ContentDisposition contentDispositionHeader = ContentDisposition.builder("form-data")
+ .name(name)
+ .filename(file.getName())
+ .build();
+
+ headers.add(HttpHeaders.CONTENT_DISPOSITION, contentDispositionHeader.toString());
+
+ HttpEntity fileEntity = new HttpEntity<>(data, headers);
+ body.add(name, fileEntity);
+ }
+
+
+}
diff --git a/aws-reactive/src/test/resources/testimage1.png b/aws-reactive/src/test/resources/testimage1.png
new file mode 100644
index 0000000000..c61a9b677f
Binary files /dev/null and b/aws-reactive/src/test/resources/testimage1.png differ
diff --git a/aws-reactive/src/test/resources/testimage2.png b/aws-reactive/src/test/resources/testimage2.png
new file mode 100644
index 0000000000..8c4f119ae6
Binary files /dev/null and b/aws-reactive/src/test/resources/testimage2.png differ