diff --git a/spring-5-webflux-2/README.md b/spring-5-webflux-2/README.md index a0fafd903e..5232d86643 100644 --- a/spring-5-webflux-2/README.md +++ b/spring-5-webflux-2/README.md @@ -7,3 +7,4 @@ This module contains articles about Spring 5 WebFlux - [Comparison Between Mono’s doOnNext() and doOnSuccess()](https://www.baeldung.com/mono-doonnext-doonsuccess) - [How to Access the First Element of a Flux](https://www.baeldung.com/java-flux-first-element) - [Using zipWhen() With Mono](https://www.baeldung.com/java-mono-zipwhen) +- [Upload Multiple Files Using WebFlux](https://www.baeldung.com/spring-webflux-upload-multiple-files) diff --git a/spring-5-webflux-2/pom.xml b/spring-5-webflux-2/pom.xml index efb99e06d5..5422ed55c4 100644 --- a/spring-5-webflux-2/pom.xml +++ b/spring-5-webflux-2/pom.xml @@ -88,6 +88,20 @@ mockwebserver 4.12.0 + + org.springframework.boot + spring-boot-starter-data-r2dbc + + + com.h2database + h2 + runtime + + + io.r2dbc + r2dbc-h2 + runtime + @@ -96,6 +110,14 @@ org.springframework.boot spring-boot-maven-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 16 + 16 + + diff --git a/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecord.java b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecord.java new file mode 100644 index 0000000000..5c0e265e13 --- /dev/null +++ b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecord.java @@ -0,0 +1,35 @@ +package com.baeldung.webflux.filerecord; + +import org.springframework.data.annotation.Id; + +import java.util.List; + +public class FileRecord { + @Id + private int id; + + private List filenames; + + public FileRecord(List filenames) { + this.filenames = filenames; + } + + public FileRecord() { + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public List getFilenames() { + return filenames; + } + + public void setFilenames(List filenames) { + this.filenames = filenames; + } +} diff --git a/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordApplication.java b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordApplication.java new file mode 100644 index 0000000000..9c20e10040 --- /dev/null +++ b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordApplication.java @@ -0,0 +1,28 @@ +package com.baeldung.webflux.filerecord; + +import io.r2dbc.spi.ConnectionFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.core.io.ClassPathResource; +import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer; +import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator; + +@SpringBootApplication +public class FileRecordApplication { + + public static void main(String[] args) { + SpringApplication.run(FileRecordApplication.class, args); + } + + @Bean + ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) { + + ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer(); + initializer.setConnectionFactory(connectionFactory); + initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("schema.sql"))); + + return initializer; + } + +} diff --git a/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordController.java b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordController.java new file mode 100644 index 0000000000..943cec9a2d --- /dev/null +++ b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordController.java @@ -0,0 +1,45 @@ +package com.baeldung.webflux.filerecord; + +import org.springframework.http.codec.multipart.FilePart; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.file.Paths; + +@RestController +public class FileRecordController { + + private final FileRecordService fileRecordService; + + public FileRecordController(FileRecordService fileRecordService) { + this.fileRecordService = fileRecordService; + } + + @PostMapping("/upload-files") + public Mono uploadFileWithoutEntity(@RequestPart("files") Flux filePartFlux) { + return filePartFlux.flatMap(file -> file.transferTo(Paths.get(file.filename()))) + .then(Mono.just("OK")) + .onErrorResume(error -> Mono.just("Error uploading files")); + } + + @PostMapping("/upload-files-entity") + public Mono uploadFileWithEntity(@RequestPart("files") Flux filePartFlux) { + FileRecord fileRecord = new FileRecord(); + + return filePartFlux.flatMap(filePart -> filePart.transferTo(Paths.get(filePart.filename())) + .then(Mono.just(filePart.filename()))) + .collectList() + .flatMap(filenames -> { + fileRecord.setFilenames(filenames); + return fileRecordService.save(fileRecord); + }) + .onErrorResume(error -> Mono.error(error)); + } + + @GetMapping("/files/{id}") + public Mono geFilesById(@PathVariable("id") int id) { + return fileRecordService.findById(id) + .onErrorResume(error -> Mono.error(error)); + } +} diff --git a/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordRepository.java b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordRepository.java new file mode 100644 index 0000000000..b4cd4df627 --- /dev/null +++ b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordRepository.java @@ -0,0 +1,8 @@ +package com.baeldung.webflux.filerecord; + +import org.springframework.data.r2dbc.repository.R2dbcRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface FileRecordRepository extends R2dbcRepository { +} diff --git a/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordService.java b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordService.java new file mode 100644 index 0000000000..6e74f3577b --- /dev/null +++ b/spring-5-webflux-2/src/main/java/com/baeldung/webflux/filerecord/FileRecordService.java @@ -0,0 +1,23 @@ +package com.baeldung.webflux.filerecord; + +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +public class FileRecordService { + + private FileRecordRepository fileRecordRepository; + + public FileRecordService(FileRecordRepository fileRecordRepository) { + this.fileRecordRepository = fileRecordRepository; + } + + public Mono save(FileRecord fileRecord) { + return fileRecordRepository.save(fileRecord); + } + + public Mono findById(int id) { + return fileRecordRepository.findById(id); + } + +} diff --git a/spring-5-webflux-2/src/main/resources/application.properties b/spring-5-webflux-2/src/main/resources/application.properties new file mode 100644 index 0000000000..5e52669cdb --- /dev/null +++ b/spring-5-webflux-2/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.r2dbc.url=r2dbc:h2:file:///./testdb \ No newline at end of file diff --git a/spring-5-webflux-2/src/main/resources/schema.sql b/spring-5-webflux-2/src/main/resources/schema.sql new file mode 100644 index 0000000000..76a70c8ff6 --- /dev/null +++ b/spring-5-webflux-2/src/main/resources/schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS file_record ( + id INT NOT NULL AUTO_INCREMENT, + filenames VARCHAR(255), + PRIMARY KEY (id) +); \ No newline at end of file diff --git a/spring-5-webflux-2/src/test/java/com/baeldung/webflux/filerecord/FileRecordControllerIntegrationTest.java b/spring-5-webflux-2/src/test/java/com/baeldung/webflux/filerecord/FileRecordControllerIntegrationTest.java new file mode 100644 index 0000000000..c47d53632b --- /dev/null +++ b/spring-5-webflux-2/src/test/java/com/baeldung/webflux/filerecord/FileRecordControllerIntegrationTest.java @@ -0,0 +1,98 @@ +package com.baeldung.webflux.filerecord; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.http.MediaType; +import org.springframework.mock.web.MockMultipartFile; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.util.LinkedMultiValueMap; +import reactor.core.publisher.Mono; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = FileRecordController.class) +@ContextConfiguration(classes = { FileRecordController.class }) +@ComponentScan("com.baeldung.webflux.filerecord") +@AutoConfigureWebTestClient +class FileRecordControllerIntegrationTest { + + @Autowired + private WebTestClient webTestClient; + + @MockBean + private FileRecordService fileRecordService; + + @Test + public void givenUploadFilesWithEntity_whenRequestIsValid_thenReturnCreated() throws Exception { + + FileRecord fileRecord = new FileRecord(); + MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + List files = List.of(firstFile, secondFile); + fileRecord.setFilenames(files.stream() + .map(MockMultipartFile::getOriginalFilename) + .toList()); + + Mono fileRecordMono = Mono.just(fileRecord); + when(fileRecordService.save(any(FileRecord.class))).thenReturn(fileRecordMono); + + webTestClient.post() + .uri("/upload-files-entity") + .body(Mono.just(fileRecord), FileRecord.class) + .exchange() + .expectBody(FileRecord.class); + + } + + @Test + public void givenUploadFiles_whenRequestIsValid_thenReturnOk() throws Exception { + + MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + LinkedMultiValueMap multipartData = new LinkedMultiValueMap<>(); + multipartData.add("file", firstFile); + multipartData.add("file", secondFile); + + webTestClient.post() + .uri("/upload-files") + .bodyValue(multipartData) + .exchange() + .expectStatus() + .isOk(); + } + + @Test + public void givenUploadFilesWithEntity_whenRequestFileIsFindById_thenReturnOk() throws Exception { + + FileRecord fileRecord = new FileRecord(); + MockMultipartFile firstFile = new MockMultipartFile("file", "baeldungdata.txt", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + MockMultipartFile secondFile = new MockMultipartFile("file", "baeldungdata.pdf", MediaType.TEXT_PLAIN_VALUE, "Test file content".getBytes()); + List files = List.of(firstFile, secondFile); + fileRecord.setId(1); + fileRecord.setFilenames(files.stream() + .map(MockMultipartFile::getOriginalFilename) + .toList()); + + Mono fileRecordMono = Mono.just(fileRecord); + + when(fileRecordService.findById(1)).thenReturn(fileRecordMono); + + webTestClient.get() + .uri("/files/{id}", 1) + .exchange() + .expectStatus() + .isOk(); + } + +} \ No newline at end of file diff --git a/spring-cloud-modules/pom.xml b/spring-cloud-modules/pom.xml index 729dd8eaf1..ccc8590b0b 100644 --- a/spring-cloud-modules/pom.xml +++ b/spring-cloud-modules/pom.xml @@ -31,6 +31,7 @@ spring-cloud-connectors-heroku spring-cloud-aws + spring-cloud-aws-v3 spring-cloud-consul diff --git a/spring-cloud-modules/spring-cloud-aws-v3/README.md b/spring-cloud-modules/spring-cloud-aws-v3/README.md new file mode 100644 index 0000000000..d63b0eaa5d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/README.md @@ -0,0 +1,3 @@ +# Spring Cloud AWS + +TBD \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-aws-v3/pom.xml b/spring-cloud-modules/spring-cloud-aws-v3/pom.xml new file mode 100644 index 0000000000..7c020ae8a3 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + com.baeldung.spring.cloud + spring-cloud-aws-v3 + 0.0.1-SNAPSHOT + spring-cloud-aws-v3 + jar + Spring Cloud AWS Examples + + + + + io.awspring.cloud + spring-cloud-aws + ${spring-cloud-aws.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-web + + + io.awspring.cloud + spring-cloud-aws-starter + + + io.awspring.cloud + spring-cloud-aws-starter-sqs + + + org.springframework.boot + spring-boot-starter-test + test + + + org.testcontainers + localstack + test + + + org.testcontainers + junit-jupiter + test + + + org.awaitility + awaitility + test + + + + + + com.baeldung.spring.cloud.aws.SpringCloudAwsApplication + 3.1.0 + 17 + 17 + + + \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java new file mode 100644 index 0000000000..ece8a72cbb --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java @@ -0,0 +1,17 @@ +package com.baeldung.spring.cloud.aws; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +import com.baeldung.spring.cloud.aws.sqs.EventQueuesProperties; + +@SpringBootApplication +@EnableConfigurationProperties(EventQueuesProperties.class) +public class SpringCloudAwsApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringCloudAwsApplication.class, args); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java new file mode 100644 index 0000000000..d3631c39fa --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java @@ -0,0 +1,35 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "events.queues") +public class EventQueuesProperties { + + private String userCreatedByNameQueue; + private String userCreatedRecordQueue; + private String userCreatedEventTypeQueue; + + public String getUserCreatedByNameQueue() { + return userCreatedByNameQueue; + } + + public void setUserCreatedByNameQueue(String userCreatedByNameQueue) { + this.userCreatedByNameQueue = userCreatedByNameQueue; + } + + public String getUserCreatedRecordQueue() { + return userCreatedRecordQueue; + } + + public void setUserCreatedRecordQueue(String userCreatedRecordQueue) { + this.userCreatedRecordQueue = userCreatedRecordQueue; + } + + public String getUserCreatedEventTypeQueue() { + return userCreatedEventTypeQueue; + } + + public void setUserCreatedEventTypeQueue(String userCreatedEventTypeQueue) { + this.userCreatedEventTypeQueue = userCreatedEventTypeQueue; + } +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java new file mode 100644 index 0000000000..43b538f24d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java @@ -0,0 +1,5 @@ +package com.baeldung.spring.cloud.aws.sqs; + +public record User(String id, String name, String email) { + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java new file mode 100644 index 0000000000..242ddfa20d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java @@ -0,0 +1,5 @@ +package com.baeldung.spring.cloud.aws.sqs; + +public record UserCreatedEvent(String id, String username, String email) { + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java new file mode 100644 index 0000000000..50cff6e9bb --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java @@ -0,0 +1,49 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import io.awspring.cloud.sqs.annotation.SqsListener; + +@Component +public class UserEventListeners { + + private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class); + + public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType"; + + private final UserRepository userRepository; + + public UserEventListeners(UserRepository userRepository) { + this.userRepository = userRepository; + } + + @SqsListener("${events.queues.user-created-by-name-queue}") + public void receiveStringMessage(String username) { + logger.info("Received message: {}", username); + userRepository.save(new User(UUID.randomUUID() + .toString(), username, null)); + } + + @SqsListener(value = "${events.queues.user-created-record-queue}") + public void receiveRecordMessage(UserCreatedEvent event) { + logger.info("Received message: {}", event); + userRepository.save(new User(event.id(), event.username(), event.email())); + } + + @SqsListener("${events.queues.user-created-event-type-queue}") + public void customHeaderMessage(Message message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType, + @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) { + logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive); + UserCreatedEvent payload = message.getPayload(); + userRepository.save(new User(payload.id(), payload.username(), payload.email())); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java new file mode 100644 index 0000000000..c42352dc53 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java @@ -0,0 +1,28 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.stereotype.Repository; + +@Repository +public class UserRepository { + + private final Map persistedUsers = new ConcurrentHashMap<>(); + + public void save(User userToSave) { + persistedUsers.put(userToSave.id(), userToSave); + } + + public Optional findById(String userId) { + return Optional.ofNullable(persistedUsers.get(userId)); + } + + public Optional findByName(String name) { + return persistedUsers.values().stream() + .filter(user -> user.name().equals(name)) + .findFirst(); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml b/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml new file mode 100644 index 0000000000..68a7ba512c --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml @@ -0,0 +1,5 @@ +events: + queues: + user-created-by-name-queue: user_created_by_name_queue + user-created-record-queue: user_created_record_queue + user-created-event-type-queue: user_created_event_type_queue diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java new file mode 100644 index 0000000000..6fd4dedbb0 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java @@ -0,0 +1,32 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest +@Testcontainers +public class BaseSqsIntegrationTest { + + private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2"; + + @Container + static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION)); + + @DynamicPropertySource + static void overrideProperties(DynamicPropertyRegistry registry) { + registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion()); + registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey()); + registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey()); + registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS) + .toString()); + // ...other AWS services endpoints can be added here + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java new file mode 100644 index 0000000000..9248c77385 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java @@ -0,0 +1,84 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static com.baeldung.spring.cloud.aws.sqs.UserEventListeners.EVENT_TYPE_CUSTOM_HEADER; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.Map; +import java.util.UUID; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import io.awspring.cloud.sqs.operations.SqsTemplate; + +public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class); + + @Autowired + private SqsTemplate sqsTemplate; + + @Autowired + private UserRepository userRepository; + + @Autowired + private EventQueuesProperties eventQueuesProperties; + + @Test + void givenAStringPayload_whenSend_shouldReceive() { + // given + var userName = "Albert"; + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue()) + .payload(userName)); + logger.info("Message sent with payload {}", userName); + + // then + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findByName(userName) + .isPresent()); + } + + @Test + void givenARecordPayload_whenSend_shouldReceive() { + // given + String userId = UUID.randomUUID() + .toString(); + var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue()) + .payload(payload)); + + // then + logger.info("Message sent with payload: {}", payload); + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findById(userId) + .isPresent()); + } + + @Test + void givenCustomHeaders_whenSend_shouldReceive() { + // given + String userId = UUID.randomUUID() + .toString(); + var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); + var headers = Map. of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent"); + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue()) + .payload(payload) + .headers(headers)); + + // then + logger.info("Sent message with payload {} and custom headers: {}", payload, headers); + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findById(userId) + .isPresent()); + } + +}