[BAEL-3562] AWS S3 with Java - Reactive Support (#8309)
* [BAEL-3164] Add spring-boot-jdbi module * [BAEL-3164] Remove extra files * [BAEL-3164] Update springboot main dependency * Reset bad commit * [BAEL-3562] Added basic code * [BAEL-3562] Some refatoring * [BAEL-3562] More refatoring * [BAEL-3562] LiveTests
This commit is contained in:
parent
8de37745a1
commit
2235656358
Binary file not shown.
After Width: | Height: | Size: 22 KiB |
|
@ -0,0 +1,29 @@
|
|||
participant "Client 1" as C1
|
||||
participant "Client 2" as C2
|
||||
participant "Reactive Web App" as RWS
|
||||
participant "Backend" as S3
|
||||
C1 -> RWS: POST
|
||||
activate C1
|
||||
activate RWS
|
||||
RWS -> S3: Async POST
|
||||
deactivate RWS
|
||||
C2 -> RWS: POST
|
||||
activate C2
|
||||
activate RWS
|
||||
RWS -> S3: Async POST
|
||||
deactivate RWS
|
||||
S3 --> RWS: Async Result
|
||||
activate RWS
|
||||
RWS -->C2: Result
|
||||
deactivate RWS
|
||||
deactivate C2
|
||||
// First file EOF
|
||||
S3 --> RWS: Async Result
|
||||
activate RWS
|
||||
RWS -->C1: Result
|
||||
deactivate RWS
|
||||
deactivate C1
|
||||
|
||||
|
||||
|
||||
|
Binary file not shown.
After Width: | Height: | Size: 31 KiB |
|
@ -0,0 +1,28 @@
|
|||
participant Client 1
|
||||
participant Client 2
|
||||
participant Controller
|
||||
participant Backend
|
||||
Client 1-> Controller: POST Data
|
||||
activate Client 1
|
||||
activate Controller
|
||||
Controller -> Backend: Save Data
|
||||
activate Backend
|
||||
note left of Controller #yellow: Controller blocked\nuntil result received
|
||||
Backend --> Controller: Result
|
||||
deactivate Backend
|
||||
Controller --> Client 1: Result
|
||||
deactivate Client 1
|
||||
deactivate Controller
|
||||
// 2nd Upload
|
||||
Client 2-> Controller: POST Data
|
||||
activate Client 2
|
||||
activate Controller
|
||||
Controller -> Backend: Save Data
|
||||
activate Backend
|
||||
note left of Controller #yellow: Controller blocket\nuntil result received
|
||||
Backend --> Controller: Result
|
||||
deactivate Backend
|
||||
Controller --> Client 2: Result
|
||||
deactivate Controller
|
||||
deactivate Client 2
|
||||
|
|
@ -0,0 +1,105 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>com.baeldung</groupId>
|
||||
<artifactId>parent-modules</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>aws-reactive</artifactId>
|
||||
<version>0.0.1-SNAPSHOT</version>
|
||||
<name>aws-reactive</name>
|
||||
<description>AWS Reactive Sample</description>
|
||||
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<!-- Import dependency management from Spring Boot -->
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>2.2.1.RELEASE</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>software.amazon.awssdk</groupId>
|
||||
<artifactId>bom</artifactId>
|
||||
<version>2.10.27</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>software.amazon.awssdk</groupId>
|
||||
<artifactId>s3</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<artifactId>netty-nio-client</artifactId>
|
||||
<groupId>software.amazon.awssdk</groupId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.junit.vintage</groupId>
|
||||
<artifactId>junit-vintage-engine</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-devtools</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.http.SdkHttpResponse;
|
||||
|
||||
@AllArgsConstructor
|
||||
public class DownloadFailedException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private int statusCode;
|
||||
private Optional<String> statusText;
|
||||
|
||||
public DownloadFailedException(SdkResponse response) {
|
||||
|
||||
SdkHttpResponse httpResponse = response.sdkHttpResponse();
|
||||
if (httpResponse != null) {
|
||||
this.statusCode = httpResponse.statusCode();
|
||||
this.statusText = httpResponse.statusText();
|
||||
} else {
|
||||
this.statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
|
||||
this.statusText = Optional.of("UNKNOWN");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.ResponseEntity.BodyBuilder;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import software.amazon.awssdk.core.ResponseBytes;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
|
||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
|
||||
import software.amazon.awssdk.http.SdkHttpResponse;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
||||
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
|
||||
|
||||
/**
|
||||
* @author Philippe
|
||||
*
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/inbox")
|
||||
@Slf4j
|
||||
public class DownloadResource {
|
||||
|
||||
|
||||
private final S3AsyncClient s3client;
|
||||
private final S3ClientConfigurarionProperties s3config;
|
||||
|
||||
public DownloadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
|
||||
this.s3client = s3client;
|
||||
this.s3config = s3config;
|
||||
}
|
||||
|
||||
|
||||
@GetMapping(path="/{filekey}")
|
||||
public Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
|
||||
|
||||
GetObjectRequest request = GetObjectRequest.builder()
|
||||
.bucket(s3config.getBucket())
|
||||
.key(filekey)
|
||||
.build();
|
||||
|
||||
return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
|
||||
.map( (response) -> {
|
||||
checkResult(response.sdkResponse);
|
||||
String filename = getMetadataItem(response.sdkResponse,"filename",filekey);
|
||||
|
||||
log.info("[I65] filename={}, length={}",filename, response.sdkResponse.contentLength() );
|
||||
|
||||
return ResponseEntity.ok()
|
||||
.header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
|
||||
.header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
|
||||
.body(response.flux);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Lookup a metadata key in a case-insensitive way.
|
||||
* @param sdkResponse
|
||||
* @param key
|
||||
* @param defaultValue
|
||||
* @return
|
||||
*/
|
||||
private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) {
|
||||
for( Entry<String, String> entry : sdkResponse.metadata().entrySet()) {
|
||||
if ( entry.getKey().equalsIgnoreCase(key)) {
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
|
||||
// Helper used to check return codes from an API call
|
||||
private static void checkResult(GetObjectResponse response) {
|
||||
SdkHttpResponse sdkResponse = response.sdkHttpResponse();
|
||||
if ( sdkResponse != null && sdkResponse.isSuccessful()) {
|
||||
return;
|
||||
}
|
||||
|
||||
throw new DownloadFailedException(response);
|
||||
}
|
||||
|
||||
|
||||
static class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {
|
||||
|
||||
private FluxResponse response;
|
||||
|
||||
@Override
|
||||
public CompletableFuture<FluxResponse> prepare() {
|
||||
response = new FluxResponse();
|
||||
return response.cf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(GetObjectResponse sdkResponse) {
|
||||
this.response.sdkResponse = sdkResponse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStream(SdkPublisher<ByteBuffer> publisher) {
|
||||
response.flux = Flux.from(publisher);
|
||||
response.cf.complete(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionOccurred(Throwable error) {
|
||||
response.cf.completeExceptionally(error);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds the API response and stream
|
||||
* @author Philippe
|
||||
*/
|
||||
static class FluxResponse {
|
||||
|
||||
final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
|
||||
GetObjectResponse sdkResponse;
|
||||
Flux<ByteBuffer> flux;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ReactiveS3Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ReactiveS3Application.class, args);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import lombok.Data;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
|
||||
@ConfigurationProperties(prefix = "aws.s3")
|
||||
@Data
|
||||
public class S3ClientConfigurarionProperties {
|
||||
|
||||
private Region region = Region.US_EAST_1;
|
||||
private URI endpoint = null;
|
||||
|
||||
private String accessKeyId;
|
||||
private String secretAccessKey;
|
||||
|
||||
// Bucket name we'll be using as our backend storage
|
||||
private String bucket;
|
||||
|
||||
// AWS S3 requires that file parts must have at least 5MB, except
|
||||
// for the last part. This may change for other S3-compatible services, so let't
|
||||
// define a configuration property for that
|
||||
private int multipartMinPartSize = 5*1024*1024;
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
|
||||
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
|
||||
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
|
||||
import software.amazon.awssdk.services.s3.S3Configuration;
|
||||
import software.amazon.awssdk.utils.StringUtils;
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(S3ClientConfigurarionProperties.class)
|
||||
public class S3ClientConfiguration {
|
||||
|
||||
@Bean
|
||||
public S3AsyncClient s3client(S3ClientConfigurarionProperties s3props, AwsCredentialsProvider credentialsProvider) {
|
||||
|
||||
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
|
||||
.writeTimeout(Duration.ZERO)
|
||||
.maxConcurrency(64)
|
||||
.build();
|
||||
|
||||
S3Configuration serviceConfiguration = S3Configuration.builder()
|
||||
.checksumValidationEnabled(false)
|
||||
.chunkedEncodingEnabled(true)
|
||||
.build();
|
||||
|
||||
S3AsyncClientBuilder b = S3AsyncClient.builder()
|
||||
.httpClient(httpClient)
|
||||
.region(s3props.getRegion())
|
||||
.credentialsProvider(credentialsProvider)
|
||||
.serviceConfiguration(serviceConfiguration);
|
||||
|
||||
if (s3props.getEndpoint() != null) {
|
||||
b = b.endpointOverride(s3props.getEndpoint());
|
||||
}
|
||||
|
||||
return b.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public AwsCredentialsProvider awsCredentialsProvider(S3ClientConfigurarionProperties s3props) {
|
||||
|
||||
if (StringUtils.isBlank(s3props.getAccessKeyId())) {
|
||||
// Return default provider
|
||||
return DefaultCredentialsProvider.create();
|
||||
}
|
||||
else {
|
||||
// Return custom credentials provider
|
||||
return () -> {
|
||||
AwsCredentials creds = AwsBasicCredentials.create(s3props.getAccessKeyId(), s3props.getSecretAccessKey());
|
||||
return creds;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.http.SdkHttpResponse;
|
||||
|
||||
@AllArgsConstructor
|
||||
public class UploadFailedException extends RuntimeException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private int statusCode;
|
||||
private Optional<String> statusText;
|
||||
|
||||
public UploadFailedException(SdkResponse response) {
|
||||
|
||||
SdkHttpResponse httpResponse = response.sdkHttpResponse();
|
||||
if (httpResponse != null) {
|
||||
this.statusCode = httpResponse.statusCode();
|
||||
this.statusText = httpResponse.statusText();
|
||||
} else {
|
||||
this.statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
|
||||
this.statusText = Optional.of("UNKNOWN");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,308 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.codec.multipart.FilePart;
|
||||
import org.springframework.http.codec.multipart.Part;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.PutMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestHeader;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.core.async.AsyncRequestBody;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
||||
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
|
||||
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
|
||||
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload.Builder;
|
||||
import software.amazon.awssdk.services.s3.model.CompletedPart;
|
||||
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
|
||||
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
|
||||
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
|
||||
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
|
||||
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
|
||||
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
|
||||
|
||||
/**
|
||||
* @author Philippe
|
||||
*
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/inbox")
|
||||
@Slf4j
|
||||
public class UploadResource {
|
||||
|
||||
private final S3AsyncClient s3client;
|
||||
private final S3ClientConfigurarionProperties s3config;
|
||||
|
||||
public UploadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
|
||||
this.s3client = s3client;
|
||||
this.s3config = s3config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Standard file upload.
|
||||
*/
|
||||
@PostMapping
|
||||
public Mono<ResponseEntity<UploadResult>> uploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<ByteBuffer> body) {
|
||||
|
||||
long length = headers.getContentLength();
|
||||
if (length < 0) {
|
||||
throw new UploadFailedException(HttpStatus.BAD_REQUEST.value(), Optional.of("required header missing: Content-Length"));
|
||||
}
|
||||
|
||||
String fileKey = UUID.randomUUID().toString();
|
||||
Map<String, String> metadata = new HashMap<String, String>();
|
||||
MediaType mediaType = headers.getContentType();
|
||||
|
||||
if (mediaType == null) {
|
||||
mediaType = MediaType.APPLICATION_OCTET_STREAM;
|
||||
}
|
||||
|
||||
log.info("[I95] uploadHandler: mediaType{}, length={}", mediaType, length);
|
||||
CompletableFuture<PutObjectResponse> future = s3client
|
||||
.putObject(PutObjectRequest.builder()
|
||||
.bucket(s3config.getBucket())
|
||||
.contentLength(length)
|
||||
.key(fileKey.toString())
|
||||
.contentType(mediaType.toString())
|
||||
.metadata(metadata)
|
||||
.build(),
|
||||
AsyncRequestBody.fromPublisher(body));
|
||||
|
||||
return Mono.fromFuture(future)
|
||||
.map((response) -> {
|
||||
checkResult(response);
|
||||
return ResponseEntity
|
||||
.status(HttpStatus.CREATED)
|
||||
.body(new UploadResult(HttpStatus.CREATED, new String[] {fileKey}));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Multipart file upload
|
||||
* @param bucket
|
||||
* @param parts
|
||||
* @param headers
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE, method = {RequestMethod.POST, RequestMethod.PUT})
|
||||
public Mono<ResponseEntity<UploadResult>> multipartUploadHandler(@RequestHeader HttpHeaders headers, @RequestBody Flux<Part> parts ) {
|
||||
|
||||
return parts
|
||||
.ofType(FilePart.class) // We'll ignore other data for now
|
||||
.flatMap((part) -> saveFile(headers, s3config.getBucket(), part))
|
||||
.collect(Collectors.toList())
|
||||
.map((keys) -> ResponseEntity.status(HttpStatus.CREATED)
|
||||
.body(new UploadResult(HttpStatus.CREATED,keys)));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Save file using a multipart upload. This method does not require any temporary
|
||||
* storage at the REST service
|
||||
* @param headers
|
||||
* @param bucket Bucket name
|
||||
* @param part Uploaded file
|
||||
* @return
|
||||
*/
|
||||
protected Mono<String> saveFile(HttpHeaders headers,String bucket, FilePart part) {
|
||||
|
||||
// Generate a filekey for this upload
|
||||
String filekey = UUID.randomUUID().toString();
|
||||
|
||||
log.info("[I137] saveFile: filekey={}, filename={}", filekey, part.filename());
|
||||
|
||||
// Gather metadata
|
||||
Map<String, String> metadata = new HashMap<String, String>();
|
||||
String filename = part.filename();
|
||||
if ( filename == null ) {
|
||||
filename = filekey;
|
||||
}
|
||||
|
||||
metadata.put("filename", filename);
|
||||
|
||||
MediaType mt = part.headers().getContentType();
|
||||
if ( mt == null ) {
|
||||
mt = MediaType.APPLICATION_OCTET_STREAM;
|
||||
}
|
||||
|
||||
// Create multipart upload request
|
||||
CompletableFuture<CreateMultipartUploadResponse> uploadRequest = s3client
|
||||
.createMultipartUpload(CreateMultipartUploadRequest.builder()
|
||||
.contentType(mt.toString())
|
||||
.key(filekey)
|
||||
.metadata(metadata)
|
||||
.bucket(bucket)
|
||||
.build());
|
||||
|
||||
// This variable will hold the upload state that we must keep
|
||||
// around until all uploads complete
|
||||
final UploadState uploadState = new UploadState(bucket,filekey);
|
||||
|
||||
return Mono
|
||||
.fromFuture(uploadRequest)
|
||||
.flatMapMany((response) -> {
|
||||
checkResult(response);
|
||||
uploadState.uploadId = response.uploadId();
|
||||
log.info("[I183] uploadId={}", response.uploadId());
|
||||
return part.content();
|
||||
})
|
||||
.bufferUntil((buffer) -> {
|
||||
uploadState.buffered += buffer.readableByteCount();
|
||||
if ( uploadState.buffered >= s3config.getMultipartMinPartSize() ) {
|
||||
log.info("[I173] bufferUntil: returning true, bufferedBytes={}, partCounter={}, uploadId={}", uploadState.buffered, uploadState.partCounter, uploadState.uploadId);
|
||||
uploadState.buffered = 0;
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.map((buffers) -> concatBuffers(buffers))
|
||||
.flatMap((buffer) -> uploadPart(uploadState,buffer))
|
||||
.onBackpressureBuffer()
|
||||
.reduce(uploadState,(state,completedPart) -> {
|
||||
log.info("[I188] completed: partNumber={}, etag={}", completedPart.partNumber(), completedPart.eTag());
|
||||
state.completedParts.put(completedPart.partNumber(), completedPart);
|
||||
return state;
|
||||
})
|
||||
.flatMap((state) -> completeUpload(state))
|
||||
.map((response) -> {
|
||||
checkResult(response);
|
||||
return uploadState.filekey;
|
||||
});
|
||||
}
|
||||
|
||||
private static ByteBuffer concatBuffers(List<DataBuffer> buffers) {
|
||||
log.info("[I198] creating BytBuffer from {} chunks", buffers.size());
|
||||
|
||||
int partSize = 0;
|
||||
for( DataBuffer b : buffers) {
|
||||
partSize += b.readableByteCount();
|
||||
}
|
||||
|
||||
ByteBuffer partData = ByteBuffer.allocate(partSize);
|
||||
buffers.forEach((buffer) -> {
|
||||
partData.put(buffer.asByteBuffer());
|
||||
});
|
||||
|
||||
// Reset read pointer to first byte
|
||||
partData.rewind();
|
||||
|
||||
log.info("[I208] partData: size={}", partData.capacity());
|
||||
return partData;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a single file part to the requested bucket
|
||||
* @param uploadState
|
||||
* @param buffer
|
||||
* @return
|
||||
*/
|
||||
private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer) {
|
||||
final int partNumber = ++uploadState.partCounter;
|
||||
log.info("[I218] uploadPart: partNumber={}, contentLength={}",partNumber, buffer.capacity());
|
||||
|
||||
CompletableFuture<UploadPartResponse> request = s3client.uploadPart(UploadPartRequest.builder()
|
||||
.bucket(uploadState.bucket)
|
||||
.key(uploadState.filekey)
|
||||
.partNumber(partNumber)
|
||||
.uploadId(uploadState.uploadId)
|
||||
.contentLength((long) buffer.capacity())
|
||||
.build(),
|
||||
AsyncRequestBody.fromPublisher(Mono.just(buffer)));
|
||||
|
||||
return Mono
|
||||
.fromFuture(request)
|
||||
.map((uploadPartResult) -> {
|
||||
checkResult(uploadPartResult);
|
||||
log.info("[I230] uploadPart complete: part={}, etag={}",partNumber,uploadPartResult.eTag());
|
||||
return CompletedPart.builder()
|
||||
.eTag(uploadPartResult.eTag())
|
||||
.partNumber(partNumber)
|
||||
.build();
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<CompleteMultipartUploadResponse> completeUpload(UploadState state) {
|
||||
log.info("[I202] completeUpload: bucket={}, filekey={}, completedParts.size={}", state.bucket, state.filekey, state.completedParts.size());
|
||||
|
||||
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
|
||||
.parts(state.completedParts.values())
|
||||
.build();
|
||||
|
||||
return Mono.fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
|
||||
.bucket(state.bucket)
|
||||
.uploadId(state.uploadId)
|
||||
.multipartUpload(multipartUpload)
|
||||
.key(state.filekey)
|
||||
.build()));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check result from an API call.
|
||||
* @param result Result from an API call
|
||||
*/
|
||||
private static void checkResult(SdkResponse result) {
|
||||
if (result.sdkHttpResponse() == null || !result.sdkHttpResponse().isSuccessful()) {
|
||||
throw new UploadFailedException(result);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Holds upload state during a multipart upload
|
||||
*/
|
||||
static class UploadState {
|
||||
final String bucket;
|
||||
final String filekey;
|
||||
|
||||
String uploadId;
|
||||
int partCounter;
|
||||
Map<Integer, CompletedPart> completedParts = new HashMap<>();
|
||||
int buffered = 0;
|
||||
|
||||
UploadState(String bucket, String filekey) {
|
||||
this.bucket = bucket;
|
||||
this.filekey = filekey;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class UploadResult {
|
||||
HttpStatus status;
|
||||
String[] keys;
|
||||
|
||||
public UploadResult() {}
|
||||
|
||||
public UploadResult(HttpStatus status, List<String> keys) {
|
||||
this.status = status;
|
||||
this.keys = keys == null ? new String[] {}: keys.toArray(new String[] {});
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
|
||||
#
|
||||
# Minio profile
|
||||
#
|
||||
aws:
|
||||
s3:
|
||||
region: sa-east-1
|
||||
endpoint: http://localhost:9000
|
||||
accessKeyId: 8KLF8U60JER4AP23H0A6
|
||||
secretAccessKey: vX4uM7e7nNGPqjcXycVVhceNR7NQkiMQkR9Hoctf
|
||||
bucket: bucket1
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
|
||||
#
|
||||
# Configurações de acesso ao Minio
|
||||
#
|
||||
aws:
|
||||
s3:
|
||||
region: sa-east-1
|
||||
# When using AWS, the library will use one of the available
|
||||
# credential sources described in the documentation.
|
||||
# accessKeyId: ****
|
||||
# secretAccessKey: ****
|
||||
bucket: dev1.token.com.br
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
package com.baeldung.aws.reactive.s3;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.http.ContentDisposition;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ResourceUtils;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
|
||||
@ActiveProfiles("minio")
|
||||
class ReactiveS3ApplicationLiveTest {
|
||||
|
||||
@Autowired
|
||||
private TestRestTemplate restTemplate;
|
||||
|
||||
@LocalServerPort
|
||||
private int serverPort;
|
||||
|
||||
|
||||
@Test
|
||||
void whenUploadSingleFile_thenSuccess() throws Exception {
|
||||
|
||||
String url = "http://localhost:" + serverPort + "/inbox";
|
||||
byte[] data = Files.readAllBytes(Paths.get("src/test/resources/testimage1.png"));
|
||||
UploadResult result = restTemplate.postForObject(url, data , UploadResult.class);
|
||||
|
||||
assertEquals("Expected CREATED (202)", result.getStatus(), HttpStatus.CREATED );
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void whenUploadMultipleFiles_thenSuccess() throws Exception {
|
||||
|
||||
|
||||
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
|
||||
addFileEntity("f1", body, new File("src/test/resources/testimage1.png"));
|
||||
addFileEntity("f2", body, new File("src/test/resources/testimage2.png"));
|
||||
|
||||
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body);
|
||||
String url = "http://localhost:" + serverPort + "/inbox";
|
||||
|
||||
ResponseEntity<UploadResult> result = restTemplate.postForEntity(url, requestEntity, UploadResult.class);
|
||||
|
||||
assertEquals("Http Code",HttpStatus.CREATED, result.getStatusCode() );
|
||||
assertEquals("File keys",2, result.getBody().getKeys().length);
|
||||
|
||||
}
|
||||
|
||||
private void addFileEntity(String name, MultiValueMap<String, Object> body, File file) throws Exception {
|
||||
|
||||
byte[] data = Files.readAllBytes(file.toPath());
|
||||
MultiValueMap<String, String> headers = new LinkedMultiValueMap<>();
|
||||
ContentDisposition contentDispositionHeader = ContentDisposition.builder("form-data")
|
||||
.name(name)
|
||||
.filename(file.getName())
|
||||
.build();
|
||||
|
||||
headers.add(HttpHeaders.CONTENT_DISPOSITION, contentDispositionHeader.toString());
|
||||
|
||||
HttpEntity<byte[]> fileEntity = new HttpEntity<>(data, headers);
|
||||
body.add(name, fileEntity);
|
||||
}
|
||||
|
||||
|
||||
}
|
Binary file not shown.
After Width: | Height: | Size: 4.2 KiB |
Binary file not shown.
After Width: | Height: | Size: 24 KiB |
Loading…
Reference in New Issue