Merge pull request #15655 from tomazfernandes/BAEL-7349
BAEL-7349 - Spring Cloud AWS 3.0 SQS - Introduction
This commit is contained in:
		
						commit
						8cbcfd8287
					
				| @ -31,6 +31,7 @@ | ||||
|         <!--        <module>spring-cloud-stream-starters</module>--> | ||||
|         <module>spring-cloud-connectors-heroku</module> | ||||
|         <module>spring-cloud-aws</module> | ||||
|         <module>spring-cloud-aws-v3</module> | ||||
|         <module>spring-cloud-consul</module> | ||||
|         <!--        <module>spring-cloud-zuul-eureka-integration</module>--> | ||||
|         <!--        <module>spring-cloud-contract</module>--> | ||||
|  | ||||
							
								
								
									
										3
									
								
								spring-cloud-modules/spring-cloud-aws-v3/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								spring-cloud-modules/spring-cloud-aws-v3/README.md
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,3 @@ | ||||
| # Spring Cloud AWS | ||||
| 
 | ||||
| TBD | ||||
							
								
								
									
										68
									
								
								spring-cloud-modules/spring-cloud-aws-v3/pom.xml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										68
									
								
								spring-cloud-modules/spring-cloud-aws-v3/pom.xml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,68 @@ | ||||
| <?xml version="1.0" encoding="UTF-8"?> | ||||
| <project xmlns="http://maven.apache.org/POM/4.0.0" | ||||
|          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||
|          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||
|     <modelVersion>4.0.0</modelVersion> | ||||
|     <groupId>com.baeldung.spring.cloud</groupId> | ||||
|     <artifactId>spring-cloud-aws-v3</artifactId> | ||||
|     <version>0.0.1-SNAPSHOT</version> | ||||
|     <name>spring-cloud-aws-v3</name> | ||||
|     <packaging>jar</packaging> | ||||
|     <description>Spring Cloud AWS Examples</description> | ||||
| 
 | ||||
|     <dependencyManagement> | ||||
|         <dependencies> | ||||
|             <dependency> | ||||
|                 <groupId>io.awspring.cloud</groupId> | ||||
|                 <artifactId>spring-cloud-aws</artifactId> | ||||
|                 <version>${spring-cloud-aws.version}</version> | ||||
|                 <type>pom</type> | ||||
|                 <scope>import</scope> | ||||
|             </dependency> | ||||
|         </dependencies> | ||||
|     </dependencyManagement> | ||||
| 
 | ||||
|     <dependencies> | ||||
|         <dependency> | ||||
|             <groupId>org.springframework.boot</groupId> | ||||
|             <artifactId>spring-boot-starter-web</artifactId> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>io.awspring.cloud</groupId> | ||||
|             <artifactId>spring-cloud-aws-starter</artifactId> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>io.awspring.cloud</groupId> | ||||
|             <artifactId>spring-cloud-aws-starter-sqs</artifactId> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>org.springframework.boot</groupId> | ||||
|             <artifactId>spring-boot-starter-test</artifactId> | ||||
|             <scope>test</scope> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>org.testcontainers</groupId> | ||||
|             <artifactId>localstack</artifactId> | ||||
|             <scope>test</scope> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>org.testcontainers</groupId> | ||||
|             <artifactId>junit-jupiter</artifactId> | ||||
|             <scope>test</scope> | ||||
|         </dependency> | ||||
|         <dependency> | ||||
|             <groupId>org.awaitility</groupId> | ||||
|             <artifactId>awaitility</artifactId> | ||||
|             <scope>test</scope> | ||||
|         </dependency> | ||||
| 
 | ||||
|     </dependencies> | ||||
| 
 | ||||
|     <properties> | ||||
|         <start-class>com.baeldung.spring.cloud.aws.SpringCloudAwsApplication</start-class> | ||||
|         <spring-cloud-aws.version>3.1.0</spring-cloud-aws.version> | ||||
|         <maven.compiler.source>17</maven.compiler.source> | ||||
|         <maven.compiler.target>17</maven.compiler.target> | ||||
|     </properties> | ||||
| 
 | ||||
| </project> | ||||
| @ -0,0 +1,17 @@ | ||||
| package com.baeldung.spring.cloud.aws; | ||||
| 
 | ||||
| import org.springframework.boot.SpringApplication; | ||||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
| import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||||
| 
 | ||||
| import com.baeldung.spring.cloud.aws.sqs.EventQueuesProperties; | ||||
| 
 | ||||
| @SpringBootApplication | ||||
| @EnableConfigurationProperties(EventQueuesProperties.class) | ||||
| public class SpringCloudAwsApplication { | ||||
| 
 | ||||
|     public static void main(String[] args) { | ||||
|         SpringApplication.run(SpringCloudAwsApplication.class, args); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,35 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| import org.springframework.boot.context.properties.ConfigurationProperties; | ||||
| 
 | ||||
| @ConfigurationProperties(prefix = "events.queues") | ||||
| public class EventQueuesProperties { | ||||
| 
 | ||||
|     private String userCreatedByNameQueue; | ||||
|     private String userCreatedRecordQueue; | ||||
|     private String userCreatedEventTypeQueue; | ||||
| 
 | ||||
|     public String getUserCreatedByNameQueue() { | ||||
|         return userCreatedByNameQueue; | ||||
|     } | ||||
| 
 | ||||
|     public void setUserCreatedByNameQueue(String userCreatedByNameQueue) { | ||||
|         this.userCreatedByNameQueue = userCreatedByNameQueue; | ||||
|     } | ||||
| 
 | ||||
|     public String getUserCreatedRecordQueue() { | ||||
|         return userCreatedRecordQueue; | ||||
|     } | ||||
| 
 | ||||
|     public void setUserCreatedRecordQueue(String userCreatedRecordQueue) { | ||||
|         this.userCreatedRecordQueue = userCreatedRecordQueue; | ||||
|     } | ||||
| 
 | ||||
|     public String getUserCreatedEventTypeQueue() { | ||||
|         return userCreatedEventTypeQueue; | ||||
|     } | ||||
| 
 | ||||
|     public void setUserCreatedEventTypeQueue(String userCreatedEventTypeQueue) { | ||||
|         this.userCreatedEventTypeQueue = userCreatedEventTypeQueue; | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,5 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| public record User(String id, String name, String email) { | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,5 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| public record UserCreatedEvent(String id, String username, String email) { | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,49 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| import static io.awspring.cloud.sqs.listener.SqsHeaders.MessageSystemAttributes.SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP; | ||||
| 
 | ||||
| import java.util.UUID; | ||||
| 
 | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.springframework.messaging.Message; | ||||
| import org.springframework.messaging.handler.annotation.Header; | ||||
| import org.springframework.stereotype.Component; | ||||
| 
 | ||||
| import io.awspring.cloud.sqs.annotation.SqsListener; | ||||
| 
 | ||||
| @Component | ||||
| public class UserEventListeners { | ||||
| 
 | ||||
|     private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class); | ||||
| 
 | ||||
|     public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType"; | ||||
| 
 | ||||
|     private final UserRepository userRepository; | ||||
| 
 | ||||
|     public UserEventListeners(UserRepository userRepository) { | ||||
|         this.userRepository = userRepository; | ||||
|     } | ||||
| 
 | ||||
|     @SqsListener("${events.queues.user-created-by-name-queue}") | ||||
|     public void receiveStringMessage(String username) { | ||||
|         logger.info("Received message: {}", username); | ||||
|         userRepository.save(new User(UUID.randomUUID() | ||||
|             .toString(), username, null)); | ||||
|     } | ||||
| 
 | ||||
|     @SqsListener(value = "${events.queues.user-created-record-queue}") | ||||
|     public void receiveRecordMessage(UserCreatedEvent event) { | ||||
|         logger.info("Received message: {}", event); | ||||
|         userRepository.save(new User(event.id(), event.username(), event.email())); | ||||
|     } | ||||
| 
 | ||||
|     @SqsListener("${events.queues.user-created-event-type-queue}") | ||||
|     public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType, | ||||
|         @Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) { | ||||
|         logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive); | ||||
|         UserCreatedEvent payload = message.getPayload(); | ||||
|         userRepository.save(new User(payload.id(), payload.username(), payload.email())); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,28 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| import java.util.Map; | ||||
| import java.util.Optional; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
| 
 | ||||
| import org.springframework.stereotype.Repository; | ||||
| 
 | ||||
| @Repository | ||||
| public class UserRepository { | ||||
| 
 | ||||
|     private final Map<String, User> persistedUsers = new ConcurrentHashMap<>(); | ||||
| 
 | ||||
|     public void save(User userToSave) { | ||||
|         persistedUsers.put(userToSave.id(), userToSave); | ||||
|     } | ||||
| 
 | ||||
|     public Optional<User> findById(String userId) { | ||||
|         return Optional.ofNullable(persistedUsers.get(userId)); | ||||
|     } | ||||
| 
 | ||||
|     public Optional<User> findByName(String name) { | ||||
|         return persistedUsers.values().stream() | ||||
|             .filter(user -> user.name().equals(name)) | ||||
|             .findFirst(); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,5 @@ | ||||
| events: | ||||
|   queues: | ||||
|     user-created-by-name-queue: user_created_by_name_queue | ||||
|     user-created-record-queue: user_created_record_queue | ||||
|     user-created-event-type-queue: user_created_event_type_queue | ||||
| @ -0,0 +1,32 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS; | ||||
| 
 | ||||
| import org.springframework.boot.test.context.SpringBootTest; | ||||
| import org.springframework.test.context.DynamicPropertyRegistry; | ||||
| import org.springframework.test.context.DynamicPropertySource; | ||||
| import org.testcontainers.containers.localstack.LocalStackContainer; | ||||
| import org.testcontainers.junit.jupiter.Container; | ||||
| import org.testcontainers.junit.jupiter.Testcontainers; | ||||
| import org.testcontainers.utility.DockerImageName; | ||||
| 
 | ||||
| @SpringBootTest | ||||
| @Testcontainers | ||||
| public class BaseSqsIntegrationTest { | ||||
| 
 | ||||
|     private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2"; | ||||
| 
 | ||||
|     @Container | ||||
|     static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION)); | ||||
| 
 | ||||
|     @DynamicPropertySource | ||||
|     static void overrideProperties(DynamicPropertyRegistry registry) { | ||||
|         registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion()); | ||||
|         registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey()); | ||||
|         registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey()); | ||||
|         registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS) | ||||
|             .toString()); | ||||
|         // ...other AWS services endpoints can be added here | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,84 @@ | ||||
| package com.baeldung.spring.cloud.aws.sqs; | ||||
| 
 | ||||
| import static com.baeldung.spring.cloud.aws.sqs.UserEventListeners.EVENT_TYPE_CUSTOM_HEADER; | ||||
| import static org.awaitility.Awaitility.await; | ||||
| 
 | ||||
| import java.time.Duration; | ||||
| import java.util.Map; | ||||
| import java.util.UUID; | ||||
| 
 | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| 
 | ||||
| import io.awspring.cloud.sqs.operations.SqsTemplate; | ||||
| 
 | ||||
| public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest { | ||||
| 
 | ||||
|     private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class); | ||||
| 
 | ||||
|     @Autowired | ||||
|     private SqsTemplate sqsTemplate; | ||||
| 
 | ||||
|     @Autowired | ||||
|     private UserRepository userRepository; | ||||
| 
 | ||||
|     @Autowired | ||||
|     private EventQueuesProperties eventQueuesProperties; | ||||
| 
 | ||||
|     @Test | ||||
|     void givenAStringPayload_whenSend_shouldReceive() { | ||||
|         // given | ||||
|         var userName = "Albert"; | ||||
| 
 | ||||
|         // when | ||||
|         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue()) | ||||
|             .payload(userName)); | ||||
|         logger.info("Message sent with payload {}", userName); | ||||
| 
 | ||||
|         // then | ||||
|         await().atMost(Duration.ofSeconds(3)) | ||||
|             .until(() -> userRepository.findByName(userName) | ||||
|                 .isPresent()); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     void givenARecordPayload_whenSend_shouldReceive() { | ||||
|         // given | ||||
|         String userId = UUID.randomUUID() | ||||
|             .toString(); | ||||
|         var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); | ||||
| 
 | ||||
|         // when | ||||
|         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue()) | ||||
|             .payload(payload)); | ||||
| 
 | ||||
|         // then | ||||
|         logger.info("Message sent with payload: {}", payload); | ||||
|         await().atMost(Duration.ofSeconds(3)) | ||||
|             .until(() -> userRepository.findById(userId) | ||||
|                 .isPresent()); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     void givenCustomHeaders_whenSend_shouldReceive() { | ||||
|         // given | ||||
|         String userId = UUID.randomUUID() | ||||
|             .toString(); | ||||
|         var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com"); | ||||
|         var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent"); | ||||
| 
 | ||||
|         // when | ||||
|         sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue()) | ||||
|             .payload(payload) | ||||
|             .headers(headers)); | ||||
| 
 | ||||
|         // then | ||||
|         logger.info("Sent message with payload {} and custom headers: {}", payload, headers); | ||||
|         await().atMost(Duration.ofSeconds(3)) | ||||
|             .until(() -> userRepository.findById(userId) | ||||
|                 .isPresent()); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user