Code for BAEL-1474 (#4472)

* Code for BAEL-1474

* Renamed test to conform to PMD rules
This commit is contained in:
psevestre 2018-06-13 02:12:05 -03:00 committed by maibin
parent 2a07b03b69
commit b59da11c66
9 changed files with 524 additions and 1 deletions

View File

@ -263,6 +263,7 @@
<module>performance-tests</module>
<module>twilio</module>
<module>java-ee-8-security-api</module>
<module>spring-webflux-amqp</module>
</modules>
<dependencies>

View File

@ -16,7 +16,7 @@ import com.baeldung.dependson.shared.File;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class FileProcessorTest {
public class FileProcessorIntegrationTest {
@Autowired
ApplicationContext context;

25
spring-webflux-amqp/.gitignore vendored Executable file
View File

@ -0,0 +1,25 @@
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/build/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

92
spring-webflux-amqp/pom.xml Executable file
View File

@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.baeldung.spring</groupId>
<artifactId>spring-webflux-amqp</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-webflux-amqp</name>
<description>Spring WebFlux AMQP Sample</description>
<parent>
<artifactId>parent-boot-2</artifactId>
<groupId>com.baeldung</groupId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../parent-boot-2</relativePath>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release</url>
</pluginRepository>
</pluginRepositories>
</project>

View File

@ -0,0 +1,23 @@
##
## Create a simple RabbitMQ environment with multiple clients
##
version: "3"
services:
##
## RabitMQ server
##
rabbitmq:
image: rabbitmq:3
hostname: rabbit
environment:
RABBITMQ_ERLANG_COOKIE: test
ports:
- "5672:5672"
volumes:
- rabbitmq-data:/var/lib/rabbitmq
volumes:
rabbitmq-data:

View File

@ -0,0 +1,59 @@
package org.baeldung.spring.amqp;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("destinations")
public class DestinationsConfig {
private Map<String,DestinationInfo> queues = new HashMap<>();
private Map<String,DestinationInfo> topics = new HashMap<>();
public Map<String, DestinationInfo> getQueues() {
return queues;
}
public void setQueues(Map<String, DestinationInfo> queues) {
this.queues = queues;
}
public Map<String, DestinationInfo> getTopics() {
return topics;
}
public void setTopics(Map<String, DestinationInfo> topics) {
this.topics = topics;
}
// DestinationInfo stores the Exchange name and routing key used
// by our producers when posting messages
static class DestinationInfo {
private String exchange;
private String routingKey;
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
}
}

View File

@ -0,0 +1,270 @@
package org.baeldung.spring.amqp;
import java.util.stream.Stream;
import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
private static Logger log = LoggerFactory.getLogger(SpringWebfluxAmqpApplication.class);
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DestinationsConfig destinationsConfig;
public static void main(String[] args) {
SpringApplication.run(SpringWebfluxAmqpApplication.class, args);
}
@Bean
public CommandLineRunner setupQueueDestinations(AmqpAdmin amqpAdmin,DestinationsConfig destinationsConfig) {
return (args) -> {
log.info("[I48] Creating Destinations...");
destinationsConfig.getQueues()
.forEach((key, destination) -> {
log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey());
Exchange ex = ExchangeBuilder
.directExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
Queue q = QueueBuilder
.durable(destination.getRoutingKey())
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
log.info("[I70] Binding successfully created.");
});
};
}
@Bean
public CommandLineRunner setupTopicDestinations(AmqpAdmin amqpAdmin, DestinationsConfig destinationsConfig) {
return (args) -> {
// For topic each consumer will have its own Queue, so no binding
destinationsConfig.getTopics()
.forEach((key, destination) -> {
log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange());
Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
log.info("[I107] Topic Exchange successfully created.");
});
};
}
@PostMapping(value = "/queue/{name}")
public Mono<ResponseEntity<?>> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details
final DestinationInfo d = destinationsConfig.getQueues()
.get(name);
if (d == null) {
// Destination not found.
return Mono.just(ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey());
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted().build();
});
}
/**
* Receive messages for the given queue
* @param name
* @return
*/
@GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {
final DestinationInfo d = destinationsConfig.getQueues().get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound().build());
}
Stream<String> s = Stream.generate(() -> {
String queueName = d.getRoutingKey();
log.info("[I137] Polling {}", queueName);
Object payload = amqpTemplate.receiveAndConvert(queueName,5000);
if ( payload == null ) {
payload = "No news is good news...";
}
return payload.toString();
});
return Flux
.fromStream(s)
.subscribeOn(Schedulers.elastic());
}
/**
* send message to a given topic
* @param name
* @param payload
* @return
*/
@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) {
// Lookup exchange details
final DestinationInfo d = destinationsConfig.getTopics().get(name);
if (d == null) {
// Destination not found.
return Mono.just(ResponseEntity.notFound().build());
}
return Mono.fromCallable(() -> {
log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey());
amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
return ResponseEntity.accepted().build();
});
}
@GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
DestinationInfo d = destinationsConfig.getTopics().get(name);
if (d == null) {
return Flux.just(ResponseEntity.notFound().build());
}
final Queue topicQueue = createTopicQueue(d);
Stream<String> s = Stream.generate(() -> {
String queueName = topicQueue.getName();
log.info("[I137] Polling {}", queueName);
try {
Object payload = amqpTemplate.receiveAndConvert(queueName,5000);
if ( payload == null ) {
payload = "No news is good news...";
}
return payload.toString();
}
catch(AmqpException ex) {
log.warn("[W247] Received an AMQP Exception: {}", ex.getMessage());
return null;
}
});
return Flux.fromStream(s)
.doOnCancel(() -> {
log.info("[I250] doOnCancel()");
amqpAdmin.deleteQueue(topicQueue.getName());
})
.subscribeOn(Schedulers.elastic());
}
private Queue createTopicQueue(DestinationInfo destination) {
Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
.durable(true)
.build();
amqpAdmin.declareExchange(ex);
// Create a durable queue
Queue q = QueueBuilder
.durable()
.build();
amqpAdmin.declareQueue(q);
Binding b = BindingBuilder.bind(q)
.to(ex)
.with(destination.getRoutingKey())
.noargs();
amqpAdmin.declareBinding(b);
return q;
}
}

View File

@ -0,0 +1,27 @@
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
destinations:
queues:
NYSE:
exchange: nyse
routing-key: NYSE
IBOV:
exchange: ibov
routing-key: IBOV
topics:
weather:
exchange: alerts
routing-key: WEATHER

View File

@ -0,0 +1,26 @@
package org.baeldung.spring.amqp;
import org.junit.Test;
import org.springframework.test.web.reactive.server.WebTestClient;
public class SpringWebfluxAmqpLiveTest {
@Test
public void whenSendingAMessageToQueue_thenAcceptedReturnCode() {
WebTestClient client = WebTestClient.bindToServer()
.baseUrl("http://localhost:8080")
.build();
client.post()
.uri("/queue/NYSE")
.syncBody("Test Message")
.exchange()
.expectStatus().isAccepted();
}
}