diff --git a/spring-cloud-modules/pom.xml b/spring-cloud-modules/pom.xml index 729dd8eaf1..ccc8590b0b 100644 --- a/spring-cloud-modules/pom.xml +++ b/spring-cloud-modules/pom.xml @@ -31,6 +31,7 @@ spring-cloud-connectors-heroku spring-cloud-aws + spring-cloud-aws-v3 spring-cloud-consul diff --git a/spring-cloud-modules/spring-cloud-aws-v3/README.md b/spring-cloud-modules/spring-cloud-aws-v3/README.md new file mode 100644 index 0000000000..d63b0eaa5d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/README.md @@ -0,0 +1,3 @@ +# Spring Cloud AWS + +TBD \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-aws-v3/pom.xml b/spring-cloud-modules/spring-cloud-aws-v3/pom.xml new file mode 100644 index 0000000000..7c020ae8a3 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + com.baeldung.spring.cloud + spring-cloud-aws-v3 + 0.0.1-SNAPSHOT + spring-cloud-aws-v3 + jar + Spring Cloud AWS Examples + + + + + io.awspring.cloud + spring-cloud-aws + ${spring-cloud-aws.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter-web + + + io.awspring.cloud + spring-cloud-aws-starter + + + io.awspring.cloud + spring-cloud-aws-starter-sqs + + + org.springframework.boot + spring-boot-starter-test + test + + + org.testcontainers + localstack + test + + + org.testcontainers + junit-jupiter + test + + + org.awaitility + awaitility + test + + + + + + com.baeldung.spring.cloud.aws.SpringCloudAwsApplication + 3.1.0 + 17 + 17 + + + \ No newline at end of file diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java new file mode 100644 index 0000000000..ece8a72cbb --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/SpringCloudAwsApplication.java @@ -0,0 +1,17 @@ +package com.baeldung.spring.cloud.aws; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +import com.baeldung.spring.cloud.aws.sqs.EventQueuesProperties; + +@SpringBootApplication +@EnableConfigurationProperties(EventQueuesProperties.class) +public class SpringCloudAwsApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringCloudAwsApplication.class, args); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java new file mode 100644 index 0000000000..d3631c39fa --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/EventQueuesProperties.java @@ -0,0 +1,35 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "events.queues") +public class EventQueuesProperties { + + private String userCreatedByNameQueue; + private String userCreatedRecordQueue; + private String userCreatedEventTypeQueue; + + public String getUserCreatedByNameQueue() { + return userCreatedByNameQueue; + } + + public void setUserCreatedByNameQueue(String userCreatedByNameQueue) { + this.userCreatedByNameQueue = userCreatedByNameQueue; + } + + public String getUserCreatedRecordQueue() { + return userCreatedRecordQueue; + } + + public void setUserCreatedRecordQueue(String userCreatedRecordQueue) { + this.userCreatedRecordQueue = userCreatedRecordQueue; + } + + public String getUserCreatedEventTypeQueue() { + return userCreatedEventTypeQueue; + } + + public void setUserCreatedEventTypeQueue(String userCreatedEventTypeQueue) { + this.userCreatedEventTypeQueue = userCreatedEventTypeQueue; + } +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java new file mode 100644 index 0000000000..43b538f24d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/User.java @@ -0,0 +1,5 @@ +package com.baeldung.spring.cloud.aws.sqs; + +public record User(String id, String name, String email) { + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java new file mode 100644 index 0000000000..242ddfa20d --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserCreatedEvent.java @@ -0,0 +1,5 @@ +package com.baeldung.spring.cloud.aws.sqs; + +public record UserCreatedEvent(String id, String username, String email) { + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java new file mode 100644 index 0000000000..50cff6e9bb --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserEventListeners.java @@ -0,0 +1,49 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP; + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import io.awspring.cloud.sqs.annotation.SqsListener; + +@Component +public class UserEventListeners { + + private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class); + + public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType"; + + private final UserRepository userRepository; + + public UserEventListeners(UserRepository userRepository) { + this.userRepository = userRepository; + } + + @SqsListener("${events.queues.user-created-by-name-queue}") + public void receiveStringMessage(String username) { + logger.info("Received message: {}", username); + userRepository.save(new User(UUID.randomUUID() + .toString(), username, null)); + } + + @SqsListener(value = "${events.queues.user-created-record-queue}") + public void receiveRecordMessage(UserCreatedEvent event) { + logger.info("Received message: {}", event); + userRepository.save(new User(event.id(), event.username(), event.email())); + } + + @SqsListener("${events.queues.user-created-event-type-queue}") + public void customHeaderMessage(Message message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType, + @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) { + logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive); + UserCreatedEvent payload = message.getPayload(); + userRepository.save(new User(payload.id(), payload.username(), payload.email())); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java new file mode 100644 index 0000000000..c42352dc53 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/java/com/baeldung/spring/cloud/aws/sqs/UserRepository.java @@ -0,0 +1,28 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.stereotype.Repository; + +@Repository +public class UserRepository { + + private final Map persistedUsers = new ConcurrentHashMap<>(); + + public void save(User userToSave) { + persistedUsers.put(userToSave.id(), userToSave); + } + + public Optional findById(String userId) { + return Optional.ofNullable(persistedUsers.get(userId)); + } + + public Optional findByName(String name) { + return persistedUsers.values().stream() + .filter(user -> user.name().equals(name)) + .findFirst(); + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml b/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml new file mode 100644 index 0000000000..68a7ba512c --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/main/resources/application.yaml @@ -0,0 +1,5 @@ +events: + queues: + user-created-by-name-queue: user_created_by_name_queue + user-created-record-queue: user_created_record_queue + user-created-event-type-queue: user_created_event_type_queue diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java new file mode 100644 index 0000000000..6fd4dedbb0 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/BaseSqsIntegrationTest.java @@ -0,0 +1,32 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; + +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@SpringBootTest +@Testcontainers +public class BaseSqsIntegrationTest { + + private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2"; + + @Container + static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION)); + + @DynamicPropertySource + static void overrideProperties(DynamicPropertyRegistry registry) { + registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion()); + registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey()); + registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey()); + registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS) + .toString()); + // ...other AWS services endpoints can be added here + } + +} diff --git a/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java new file mode 100644 index 0000000000..9248c77385 --- /dev/null +++ b/spring-cloud-modules/spring-cloud-aws-v3/src/test/java/com/baeldung/spring/cloud/aws/sqs/SpringCloudAwsSQSLiveTest.java @@ -0,0 +1,84 @@ +package com.baeldung.spring.cloud.aws.sqs; + +import static com.baeldung.spring.cloud.aws.sqs.UserEventListeners.EVENT_TYPE_CUSTOM_HEADER; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.Map; +import java.util.UUID; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import io.awspring.cloud.sqs.operations.SqsTemplate; + +public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class); + + @Autowired + private SqsTemplate sqsTemplate; + + @Autowired + private UserRepository userRepository; + + @Autowired + private EventQueuesProperties eventQueuesProperties; + + @Test + void givenAStringPayload_whenSend_shouldReceive() { + // given + var userName = "Albert"; + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue()) + .payload(userName)); + logger.info("Message sent with payload {}", userName); + + // then + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findByName(userName) + .isPresent()); + } + + @Test + void givenARecordPayload_whenSend_shouldReceive() { + // given + String userId = UUID.randomUUID() + .toString(); + var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue()) + .payload(payload)); + + // then + logger.info("Message sent with payload: {}", payload); + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findById(userId) + .isPresent()); + } + + @Test + void givenCustomHeaders_whenSend_shouldReceive() { + // given + String userId = UUID.randomUUID() + .toString(); + var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); + var headers = Map. of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent"); + + // when + sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue()) + .payload(payload) + .headers(headers)); + + // then + logger.info("Sent message with payload {} and custom headers: {}", payload, headers); + await().atMost(Duration.ofSeconds(3)) + .until(() -> userRepository.findById(userId) + .isPresent()); + } + +}