Merge branch 'eugenp:master' into master
This commit is contained in:
		
						commit
						a73a408c6c
					
				| @ -7,3 +7,4 @@ This module contains articles about Spring 5 WebFlux | |||||||
| - [Comparison Between Mono’s doOnNext() and doOnSuccess()](https://www.baeldung.com/mono-doonnext-doonsuccess) | - [Comparison Between Mono’s doOnNext() and doOnSuccess()](https://www.baeldung.com/mono-doonnext-doonsuccess) | ||||||
| - [How to Access the First Element of a Flux](https://www.baeldung.com/java-flux-first-element) | - [How to Access the First Element of a Flux](https://www.baeldung.com/java-flux-first-element) | ||||||
| - [Using zipWhen() With Mono](https://www.baeldung.com/java-mono-zipwhen) | - [Using zipWhen() With Mono](https://www.baeldung.com/java-mono-zipwhen) | ||||||
|  | - [Upload Multiple Files Using WebFlux](https://www.baeldung.com/spring-webflux-upload-multiple-files) | ||||||
|  | |||||||
| @ -88,6 +88,20 @@ | |||||||
|             <artifactId>mockwebserver</artifactId> |             <artifactId>mockwebserver</artifactId> | ||||||
|             <version>4.12.0</version> <!-- this can be removed when we migrate spring-boot-dependencies to the latest version --> |             <version>4.12.0</version> <!-- this can be removed when we migrate spring-boot-dependencies to the latest version --> | ||||||
|         </dependency> |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.boot</groupId> | ||||||
|  |             <artifactId>spring-boot-starter-data-r2dbc</artifactId> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>com.h2database</groupId> | ||||||
|  |             <artifactId>h2</artifactId> | ||||||
|  |             <scope>runtime</scope> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>io.r2dbc</groupId> | ||||||
|  |             <artifactId>r2dbc-h2</artifactId> | ||||||
|  |             <scope>runtime</scope> | ||||||
|  |         </dependency> | ||||||
|     </dependencies> |     </dependencies> | ||||||
| 
 | 
 | ||||||
|     <build> |     <build> | ||||||
| @ -96,6 +110,14 @@ | |||||||
|                 <groupId>org.springframework.boot</groupId> |                 <groupId>org.springframework.boot</groupId> | ||||||
|                 <artifactId>spring-boot-maven-plugin</artifactId> |                 <artifactId>spring-boot-maven-plugin</artifactId> | ||||||
|             </plugin> |             </plugin> | ||||||
|  |             <plugin> | ||||||
|  |                 <groupId>org.apache.maven.plugins</groupId> | ||||||
|  |                 <artifactId>maven-compiler-plugin</artifactId> | ||||||
|  |                 <configuration> | ||||||
|  |                     <source>16</source> | ||||||
|  |                     <target>16</target> | ||||||
|  |                 </configuration> | ||||||
|  |             </plugin> | ||||||
|         </plugins> |         </plugins> | ||||||
|     </build> |     </build> | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -0,0 +1,35 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import org.springframework.data.annotation.Id; | ||||||
|  | 
 | ||||||
|  | import java.util.List; | ||||||
|  | 
 | ||||||
|  | public class FileRecord { | ||||||
|  |     @Id | ||||||
|  |     private int id; | ||||||
|  | 
 | ||||||
|  |     private List<String> filenames; | ||||||
|  | 
 | ||||||
|  |     public FileRecord(List<String> filenames) { | ||||||
|  |         this.filenames = filenames; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public FileRecord() { | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public int getId() { | ||||||
|  |         return id; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setId(int id) { | ||||||
|  |         this.id = id; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public List<String> getFilenames() { | ||||||
|  |         return filenames; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setFilenames(List<String> filenames) { | ||||||
|  |         this.filenames = filenames; | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -0,0 +1,28 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import io.r2dbc.spi.ConnectionFactory; | ||||||
|  | import org.springframework.boot.SpringApplication; | ||||||
|  | import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||||
|  | import org.springframework.context.annotation.Bean; | ||||||
|  | import org.springframework.core.io.ClassPathResource; | ||||||
|  | import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer; | ||||||
|  | import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator; | ||||||
|  | 
 | ||||||
|  | @SpringBootApplication | ||||||
|  | public class FileRecordApplication { | ||||||
|  | 
 | ||||||
|  |     public static void main(String[] args) { | ||||||
|  |         SpringApplication.run(FileRecordApplication.class, args); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) { | ||||||
|  | 
 | ||||||
|  |         ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer(); | ||||||
|  |         initializer.setConnectionFactory(connectionFactory); | ||||||
|  |         initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("schema.sql"))); | ||||||
|  | 
 | ||||||
|  |         return initializer; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,45 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import org.springframework.http.codec.multipart.FilePart; | ||||||
|  | import org.springframework.web.bind.annotation.*; | ||||||
|  | import reactor.core.publisher.Flux; | ||||||
|  | import reactor.core.publisher.Mono; | ||||||
|  | 
 | ||||||
|  | import java.nio.file.Paths; | ||||||
|  | 
 | ||||||
|  | @RestController | ||||||
|  | public class FileRecordController { | ||||||
|  | 
 | ||||||
|  |     private final FileRecordService fileRecordService; | ||||||
|  | 
 | ||||||
|  |     public FileRecordController(FileRecordService fileRecordService) { | ||||||
|  |         this.fileRecordService = fileRecordService; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @PostMapping("/upload-files") | ||||||
|  |     public Mono<String> uploadFileWithoutEntity(@RequestPart("files") Flux<FilePart> filePartFlux) { | ||||||
|  |         return filePartFlux.flatMap(file -> file.transferTo(Paths.get(file.filename()))) | ||||||
|  |             .then(Mono.just("OK")) | ||||||
|  |             .onErrorResume(error -> Mono.just("Error uploading files")); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @PostMapping("/upload-files-entity") | ||||||
|  |     public Mono<FileRecord> uploadFileWithEntity(@RequestPart("files") Flux<FilePart> filePartFlux) { | ||||||
|  |         FileRecord fileRecord = new FileRecord(); | ||||||
|  | 
 | ||||||
|  |         return filePartFlux.flatMap(filePart -> filePart.transferTo(Paths.get(filePart.filename())) | ||||||
|  |             .then(Mono.just(filePart.filename()))) | ||||||
|  |             .collectList() | ||||||
|  |             .flatMap(filenames -> { | ||||||
|  |                 fileRecord.setFilenames(filenames); | ||||||
|  |                 return fileRecordService.save(fileRecord); | ||||||
|  |             }) | ||||||
|  |             .onErrorResume(error -> Mono.error(error)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @GetMapping("/files/{id}") | ||||||
|  |     public Mono<FileRecord> geFilesById(@PathVariable("id") int id) { | ||||||
|  |         return fileRecordService.findById(id) | ||||||
|  |             .onErrorResume(error -> Mono.error(error)); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -0,0 +1,8 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import org.springframework.data.r2dbc.repository.R2dbcRepository; | ||||||
|  | import org.springframework.stereotype.Repository; | ||||||
|  | 
 | ||||||
|  | @Repository | ||||||
|  | public interface FileRecordRepository extends R2dbcRepository<FileRecord, Integer> { | ||||||
|  | } | ||||||
| @ -0,0 +1,23 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import org.springframework.stereotype.Service; | ||||||
|  | import reactor.core.publisher.Mono; | ||||||
|  | 
 | ||||||
|  | @Service | ||||||
|  | public class FileRecordService { | ||||||
|  | 
 | ||||||
|  |     private FileRecordRepository fileRecordRepository; | ||||||
|  | 
 | ||||||
|  |     public FileRecordService(FileRecordRepository fileRecordRepository) { | ||||||
|  |         this.fileRecordRepository = fileRecordRepository; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public Mono<FileRecord> save(FileRecord fileRecord) { | ||||||
|  |         return fileRecordRepository.save(fileRecord); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public Mono<FileRecord> findById(int id) { | ||||||
|  |         return fileRecordRepository.findById(id); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1 @@ | |||||||
|  | spring.r2dbc.url=r2dbc:h2:file:///./testdb | ||||||
							
								
								
									
										5
									
								
								spring-5-webflux-2/src/main/resources/schema.sql
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								spring-5-webflux-2/src/main/resources/schema.sql
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,5 @@ | |||||||
|  | CREATE TABLE IF NOT EXISTS file_record (  | ||||||
|  |   id INT NOT NULL AUTO_INCREMENT,  | ||||||
|  |   filenames VARCHAR(255),  | ||||||
|  |   PRIMARY KEY (id)  | ||||||
|  | ); | ||||||
| @ -0,0 +1,98 @@ | |||||||
|  | package com.baeldung.webflux.filerecord; | ||||||
|  | 
 | ||||||
|  | import org.junit.jupiter.api.Test; | ||||||
|  | import org.junit.jupiter.api.extension.ExtendWith; | ||||||
|  | import org.springframework.beans.factory.annotation.Autowired; | ||||||
|  | import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; | ||||||
|  | import org.springframework.boot.test.context.SpringBootTest; | ||||||
|  | import org.springframework.boot.test.mock.mockito.MockBean; | ||||||
|  | import org.springframework.context.annotation.ComponentScan; | ||||||
|  | import org.springframework.http.MediaType; | ||||||
|  | import org.springframework.mock.web.MockMultipartFile; | ||||||
|  | import org.springframework.test.context.ContextConfiguration; | ||||||
|  | import org.springframework.test.context.junit.jupiter.SpringExtension; | ||||||
|  | import org.springframework.test.web.reactive.server.WebTestClient; | ||||||
|  | import org.springframework.util.LinkedMultiValueMap; | ||||||
|  | import reactor.core.publisher.Mono; | ||||||
|  | 
 | ||||||
|  | import java.util.List; | ||||||
|  | 
 | ||||||
|  | import static org.mockito.ArgumentMatchers.any; | ||||||
|  | import static org.mockito.Mockito.when; | ||||||
|  | 
 | ||||||
|  | @ExtendWith(SpringExtension.class) | ||||||
|  | @SpringBootTest(classes = FileRecordController.class) | ||||||
|  | @ContextConfiguration(classes = { FileRecordController.class }) | ||||||
|  | @ComponentScan("com.baeldung.webflux.filerecord") | ||||||
|  | @AutoConfigureWebTestClient | ||||||
|  | class FileRecordControllerIntegrationTest { | ||||||
|  | 
 | ||||||
|  |     @Autowired | ||||||
|  |     private WebTestClient webTestClient; | ||||||
|  | 
 | ||||||
|  |     @MockBean | ||||||
|  |     private FileRecordService fileRecordService; | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void givenUploadFilesWithEntity_whenRequestIsValid_thenReturnCreated() throws Exception { | ||||||
|  | 
 | ||||||
|  |         FileRecord fileRecord = new FileRecord(); | ||||||
|  |         MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         List<MockMultipartFile> files = List.of(firstFile, secondFile); | ||||||
|  |         fileRecord.setFilenames(files.stream() | ||||||
|  |             .map(MockMultipartFile::getOriginalFilename) | ||||||
|  |             .toList()); | ||||||
|  | 
 | ||||||
|  |         Mono<FileRecord> fileRecordMono = Mono.just(fileRecord); | ||||||
|  |         when(fileRecordService.save(any(FileRecord.class))).thenReturn(fileRecordMono); | ||||||
|  | 
 | ||||||
|  |         webTestClient.post() | ||||||
|  |             .uri("/upload-files-entity") | ||||||
|  |             .body(Mono.just(fileRecord), FileRecord.class) | ||||||
|  |             .exchange() | ||||||
|  |             .expectBody(FileRecord.class); | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void givenUploadFiles_whenRequestIsValid_thenReturnOk() throws Exception { | ||||||
|  | 
 | ||||||
|  |         MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         LinkedMultiValueMap<String, MockMultipartFile> multipartData = new LinkedMultiValueMap<>(); | ||||||
|  |         multipartData.add("file", firstFile); | ||||||
|  |         multipartData.add("file", secondFile); | ||||||
|  | 
 | ||||||
|  |         webTestClient.post() | ||||||
|  |             .uri("/upload-files") | ||||||
|  |             .bodyValue(multipartData) | ||||||
|  |             .exchange() | ||||||
|  |             .expectStatus() | ||||||
|  |             .isOk(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void givenUploadFilesWithEntity_whenRequestFileIsFindById_thenReturnOk() throws Exception { | ||||||
|  | 
 | ||||||
|  |         FileRecord fileRecord = new FileRecord(); | ||||||
|  |         MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); | ||||||
|  |         List<MockMultipartFile> files = List.of(firstFile, secondFile); | ||||||
|  |         fileRecord.setId(1); | ||||||
|  |         fileRecord.setFilenames(files.stream() | ||||||
|  |             .map(MockMultipartFile::getOriginalFilename) | ||||||
|  |             .toList()); | ||||||
|  | 
 | ||||||
|  |         Mono<FileRecord> fileRecordMono = Mono.just(fileRecord); | ||||||
|  | 
 | ||||||
|  |         when(fileRecordService.findById(1)).thenReturn(fileRecordMono); | ||||||
|  | 
 | ||||||
|  |         webTestClient.get() | ||||||
|  |             .uri("/files/{id}", 1) | ||||||
|  |             .exchange() | ||||||
|  |             .expectStatus() | ||||||
|  |             .isOk(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -31,6 +31,7 @@ | |||||||
|         <!--        <module>spring-cloud-stream-starters</module>--> |         <!--        <module>spring-cloud-stream-starters</module>--> | ||||||
|         <module>spring-cloud-connectors-heroku</module> |         <module>spring-cloud-connectors-heroku</module> | ||||||
|         <module>spring-cloud-aws</module> |         <module>spring-cloud-aws</module> | ||||||
|  |         <module>spring-cloud-aws-v3</module> | ||||||
|         <module>spring-cloud-consul</module> |         <module>spring-cloud-consul</module> | ||||||
|         <!--        <module>spring-cloud-zuul-eureka-integration</module>--> |         <!--        <module>spring-cloud-zuul-eureka-integration</module>--> | ||||||
|         <!--        <module>spring-cloud-contract</module>--> |         <!--        <module>spring-cloud-contract</module>--> | ||||||
|  | |||||||
							
								
								
									
										3
									
								
								spring-cloud-modules/spring-cloud-aws-v3/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								spring-cloud-modules/spring-cloud-aws-v3/README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,3 @@ | |||||||
|  | # Spring Cloud AWS | ||||||
|  | 
 | ||||||
|  | TBD | ||||||
							
								
								
									
										68
									
								
								spring-cloud-modules/spring-cloud-aws-v3/pom.xml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								spring-cloud-modules/spring-cloud-aws-v3/pom.xml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,68 @@ | |||||||
|  | <?xml version="1.0" encoding="UTF-8"?> | ||||||
|  | <project xmlns="http://maven.apache.org/POM/4.0.0" | ||||||
|  |          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||||
|  |          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||||
|  |     <modelVersion>4.0.0</modelVersion> | ||||||
|  |     <groupId>com.baeldung.spring.cloud</groupId> | ||||||
|  |     <artifactId>spring-cloud-aws-v3</artifactId> | ||||||
|  |     <version>0.0.1-SNAPSHOT</version> | ||||||
|  |     <name>spring-cloud-aws-v3</name> | ||||||
|  |     <packaging>jar</packaging> | ||||||
|  |     <description>Spring Cloud AWS Examples</description> | ||||||
|  | 
 | ||||||
|  |     <dependencyManagement> | ||||||
|  |         <dependencies> | ||||||
|  |             <dependency> | ||||||
|  |                 <groupId>io.awspring.cloud</groupId> | ||||||
|  |                 <artifactId>spring-cloud-aws</artifactId> | ||||||
|  |                 <version>${spring-cloud-aws.version}</version> | ||||||
|  |                 <type>pom</type> | ||||||
|  |                 <scope>import</scope> | ||||||
|  |             </dependency> | ||||||
|  |         </dependencies> | ||||||
|  |     </dependencyManagement> | ||||||
|  | 
 | ||||||
|  |     <dependencies> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.boot</groupId> | ||||||
|  |             <artifactId>spring-boot-starter-web</artifactId> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>io.awspring.cloud</groupId> | ||||||
|  |             <artifactId>spring-cloud-aws-starter</artifactId> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>io.awspring.cloud</groupId> | ||||||
|  |             <artifactId>spring-cloud-aws-starter-sqs</artifactId> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.boot</groupId> | ||||||
|  |             <artifactId>spring-boot-starter-test</artifactId> | ||||||
|  |             <scope>test</scope> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.testcontainers</groupId> | ||||||
|  |             <artifactId>localstack</artifactId> | ||||||
|  |             <scope>test</scope> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.testcontainers</groupId> | ||||||
|  |             <artifactId>junit-jupiter</artifactId> | ||||||
|  |             <scope>test</scope> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.awaitility</groupId> | ||||||
|  |             <artifactId>awaitility</artifactId> | ||||||
|  |             <scope>test</scope> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |     </dependencies> | ||||||
|  | 
 | ||||||
|  |     <properties> | ||||||
|  |         <start-class>com.baeldung.spring.cloud.aws.SpringCloudAwsApplication</start-class> | ||||||
|  |         <spring-cloud-aws.version>3.1.0</spring-cloud-aws.version> | ||||||
|  |         <maven.compiler.source>17</maven.compiler.source> | ||||||
|  |         <maven.compiler.target>17</maven.compiler.target> | ||||||
|  |     </properties> | ||||||
|  | 
 | ||||||
|  | </project> | ||||||
| @ -0,0 +1,17 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws; | ||||||
|  | 
 | ||||||
|  | import org.springframework.boot.SpringApplication; | ||||||
|  | import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||||
|  | import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||||||
|  | 
 | ||||||
|  | import com.baeldung.spring.cloud.aws.sqs.EventQueuesProperties; | ||||||
|  | 
 | ||||||
|  | @SpringBootApplication | ||||||
|  | @EnableConfigurationProperties(EventQueuesProperties.class) | ||||||
|  | public class SpringCloudAwsApplication { | ||||||
|  | 
 | ||||||
|  |     public static void main(String[] args) { | ||||||
|  |         SpringApplication.run(SpringCloudAwsApplication.class, args); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,35 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | import org.springframework.boot.context.properties.ConfigurationProperties; | ||||||
|  | 
 | ||||||
|  | @ConfigurationProperties(prefix = "events.queues") | ||||||
|  | public class EventQueuesProperties { | ||||||
|  | 
 | ||||||
|  |     private String userCreatedByNameQueue; | ||||||
|  |     private String userCreatedRecordQueue; | ||||||
|  |     private String userCreatedEventTypeQueue; | ||||||
|  | 
 | ||||||
|  |     public String getUserCreatedByNameQueue() { | ||||||
|  |         return userCreatedByNameQueue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setUserCreatedByNameQueue(String userCreatedByNameQueue) { | ||||||
|  |         this.userCreatedByNameQueue = userCreatedByNameQueue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public String getUserCreatedRecordQueue() { | ||||||
|  |         return userCreatedRecordQueue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setUserCreatedRecordQueue(String userCreatedRecordQueue) { | ||||||
|  |         this.userCreatedRecordQueue = userCreatedRecordQueue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public String getUserCreatedEventTypeQueue() { | ||||||
|  |         return userCreatedEventTypeQueue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public void setUserCreatedEventTypeQueue(String userCreatedEventTypeQueue) { | ||||||
|  |         this.userCreatedEventTypeQueue = userCreatedEventTypeQueue; | ||||||
|  |     } | ||||||
|  | } | ||||||
| @ -0,0 +1,5 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | public record User(String id, String name, String email) { | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,5 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | public record UserCreatedEvent(String id, String username, String email) { | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,49 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | import static io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP; | ||||||
|  | 
 | ||||||
|  | import java.util.UUID; | ||||||
|  | 
 | ||||||
|  | import org.slf4j.Logger; | ||||||
|  | import org.slf4j.LoggerFactory; | ||||||
|  | import org.springframework.messaging.Message; | ||||||
|  | import org.springframework.messaging.handler.annotation.Header; | ||||||
|  | import org.springframework.stereotype.Component; | ||||||
|  | 
 | ||||||
|  | import io.awspring.cloud.sqs.annotation.SqsListener; | ||||||
|  | 
 | ||||||
|  | @Component | ||||||
|  | public class UserEventListeners { | ||||||
|  | 
 | ||||||
|  |     private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class); | ||||||
|  | 
 | ||||||
|  |     public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType"; | ||||||
|  | 
 | ||||||
|  |     private final UserRepository userRepository; | ||||||
|  | 
 | ||||||
|  |     public UserEventListeners(UserRepository userRepository) { | ||||||
|  |         this.userRepository = userRepository; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @SqsListener("${events.queues.user-created-by-name-queue}") | ||||||
|  |     public void receiveStringMessage(String username) { | ||||||
|  |         logger.info("Received message: {}", username); | ||||||
|  |         userRepository.save(new User(UUID.randomUUID() | ||||||
|  |             .toString(), username, null)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @SqsListener(value = "${events.queues.user-created-record-queue}") | ||||||
|  |     public void receiveRecordMessage(UserCreatedEvent event) { | ||||||
|  |         logger.info("Received message: {}", event); | ||||||
|  |         userRepository.save(new User(event.id(), event.username(), event.email())); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @SqsListener("${events.queues.user-created-event-type-queue}") | ||||||
|  |     public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType, | ||||||
|  |         @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) { | ||||||
|  |         logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive); | ||||||
|  |         UserCreatedEvent payload = message.getPayload(); | ||||||
|  |         userRepository.save(new User(payload.id(), payload.username(), payload.email())); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,28 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | import java.util.Map; | ||||||
|  | import java.util.Optional; | ||||||
|  | import java.util.concurrent.ConcurrentHashMap; | ||||||
|  | 
 | ||||||
|  | import org.springframework.stereotype.Repository; | ||||||
|  | 
 | ||||||
|  | @Repository | ||||||
|  | public class UserRepository { | ||||||
|  | 
 | ||||||
|  |     private final Map<String, User> persistedUsers = new ConcurrentHashMap<>(); | ||||||
|  | 
 | ||||||
|  |     public void save(User userToSave) { | ||||||
|  |         persistedUsers.put(userToSave.id(), userToSave); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public Optional<User> findById(String userId) { | ||||||
|  |         return Optional.ofNullable(persistedUsers.get(userId)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     public Optional<User> findByName(String name) { | ||||||
|  |         return persistedUsers.values().stream() | ||||||
|  |             .filter(user -> user.name().equals(name)) | ||||||
|  |             .findFirst(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,5 @@ | |||||||
|  | events: | ||||||
|  |   queues: | ||||||
|  |     user-created-by-name-queue: user_created_by_name_queue | ||||||
|  |     user-created-record-queue: user_created_record_queue | ||||||
|  |     user-created-event-type-queue: user_created_event_type_queue | ||||||
| @ -0,0 +1,32 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; | ||||||
|  | 
 | ||||||
|  | import org.springframework.boot.test.context.SpringBootTest; | ||||||
|  | import org.springframework.test.context.DynamicPropertyRegistry; | ||||||
|  | import org.springframework.test.context.DynamicPropertySource; | ||||||
|  | import org.testcontainers.containers.localstack.LocalStackContainer; | ||||||
|  | import org.testcontainers.junit.jupiter.Container; | ||||||
|  | import org.testcontainers.junit.jupiter.Testcontainers; | ||||||
|  | import org.testcontainers.utility.DockerImageName; | ||||||
|  | 
 | ||||||
|  | @SpringBootTest | ||||||
|  | @Testcontainers | ||||||
|  | public class BaseSqsIntegrationTest { | ||||||
|  | 
 | ||||||
|  |     private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2"; | ||||||
|  | 
 | ||||||
|  |     @Container | ||||||
|  |     static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION)); | ||||||
|  | 
 | ||||||
|  |     @DynamicPropertySource | ||||||
|  |     static void overrideProperties(DynamicPropertyRegistry registry) { | ||||||
|  |         registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion()); | ||||||
|  |         registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey()); | ||||||
|  |         registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey()); | ||||||
|  |         registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS) | ||||||
|  |             .toString()); | ||||||
|  |         // ...other AWS services endpoints can be added here | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
| @ -0,0 +1,84 @@ | |||||||
|  | package com.baeldung.spring.cloud.aws.sqs; | ||||||
|  | 
 | ||||||
|  | import static com.baeldung.spring.cloud.aws.sqs.UserEventListeners.EVENT_TYPE_CUSTOM_HEADER; | ||||||
|  | import static org.awaitility.Awaitility.await; | ||||||
|  | 
 | ||||||
|  | import java.time.Duration; | ||||||
|  | import java.util.Map; | ||||||
|  | import java.util.UUID; | ||||||
|  | 
 | ||||||
|  | import org.junit.jupiter.api.Test; | ||||||
|  | import org.slf4j.Logger; | ||||||
|  | import org.slf4j.LoggerFactory; | ||||||
|  | import org.springframework.beans.factory.annotation.Autowired; | ||||||
|  | 
 | ||||||
|  | import io.awspring.cloud.sqs.operations.SqsTemplate; | ||||||
|  | 
 | ||||||
|  | public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest { | ||||||
|  | 
 | ||||||
|  |     private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class); | ||||||
|  | 
 | ||||||
|  |     @Autowired | ||||||
|  |     private SqsTemplate sqsTemplate; | ||||||
|  | 
 | ||||||
|  |     @Autowired | ||||||
|  |     private UserRepository userRepository; | ||||||
|  | 
 | ||||||
|  |     @Autowired | ||||||
|  |     private EventQueuesProperties eventQueuesProperties; | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     void givenAStringPayload_whenSend_shouldReceive() { | ||||||
|  |         // given | ||||||
|  |         var userName = "Albert"; | ||||||
|  | 
 | ||||||
|  |         // when | ||||||
|  |         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue()) | ||||||
|  |             .payload(userName)); | ||||||
|  |         logger.info("Message sent with payload {}", userName); | ||||||
|  | 
 | ||||||
|  |         // then | ||||||
|  |         await().atMost(Duration.ofSeconds(3)) | ||||||
|  |             .until(() -> userRepository.findByName(userName) | ||||||
|  |                 .isPresent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     void givenARecordPayload_whenSend_shouldReceive() { | ||||||
|  |         // given | ||||||
|  |         String userId = UUID.randomUUID() | ||||||
|  |             .toString(); | ||||||
|  |         var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); | ||||||
|  | 
 | ||||||
|  |         // when | ||||||
|  |         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue()) | ||||||
|  |             .payload(payload)); | ||||||
|  | 
 | ||||||
|  |         // then | ||||||
|  |         logger.info("Message sent with payload: {}", payload); | ||||||
|  |         await().atMost(Duration.ofSeconds(3)) | ||||||
|  |             .until(() -> userRepository.findById(userId) | ||||||
|  |                 .isPresent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     void givenCustomHeaders_whenSend_shouldReceive() { | ||||||
|  |         // given | ||||||
|  |         String userId = UUID.randomUUID() | ||||||
|  |             .toString(); | ||||||
|  |         var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); | ||||||
|  |         var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent"); | ||||||
|  | 
 | ||||||
|  |         // when | ||||||
|  |         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue()) | ||||||
|  |             .payload(payload) | ||||||
|  |             .headers(headers)); | ||||||
|  | 
 | ||||||
|  |         // then | ||||||
|  |         logger.info("Sent message with payload {} and custom headers: {}", payload, headers); | ||||||
|  |         await().atMost(Duration.ofSeconds(3)) | ||||||
|  |             .until(() -> userRepository.findById(userId) | ||||||
|  |                 .isPresent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user