From b59da11c6661d79f0ad76ebed6bf1be3f7f55f2c Mon Sep 17 00:00:00 2001 From: psevestre Date: Wed, 13 Jun 2018 02:12:05 -0300 Subject: [PATCH] Code for BAEL-1474 (#4472) * Code for BAEL-1474 * Renamed test to conform to PMD rules --- pom.xml | 1 + ...java => FileProcessorIntegrationTest.java} | 2 +- spring-webflux-amqp/.gitignore | 25 ++ spring-webflux-amqp/pom.xml | 92 ++++++ .../src/docker/docker-compose.yml | 23 ++ .../spring/amqp/DestinationsConfig.java | 59 ++++ .../amqp/SpringWebfluxAmqpApplication.java | 270 ++++++++++++++++++ .../src/main/resources/application.yml | 27 ++ .../amqp/SpringWebfluxAmqpLiveTest.java | 26 ++ 9 files changed, 524 insertions(+), 1 deletion(-) rename spring-core/src/test/java/com/baeldung/dependson/processor/{FileProcessorTest.java => FileProcessorIntegrationTest.java} (96%) create mode 100755 spring-webflux-amqp/.gitignore create mode 100755 spring-webflux-amqp/pom.xml create mode 100755 spring-webflux-amqp/src/docker/docker-compose.yml create mode 100755 spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java create mode 100755 spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java create mode 100755 spring-webflux-amqp/src/main/resources/application.yml create mode 100755 spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java diff --git a/pom.xml b/pom.xml index d42c3ac617..38f88a2784 100644 --- a/pom.xml +++ b/pom.xml @@ -263,6 +263,7 @@ performance-tests twilio java-ee-8-security-api + spring-webflux-amqp diff --git a/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java b/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java similarity index 96% rename from spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java rename to spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java index b54ac16125..11d9daf3bf 100644 --- a/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java +++ b/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java @@ -16,7 +16,7 @@ import com.baeldung.dependson.shared.File; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TestConfig.class) -public class FileProcessorTest { +public class FileProcessorIntegrationTest { @Autowired ApplicationContext context; diff --git a/spring-webflux-amqp/.gitignore b/spring-webflux-amqp/.gitignore new file mode 100755 index 0000000000..82eca336e3 --- /dev/null +++ b/spring-webflux-amqp/.gitignore @@ -0,0 +1,25 @@ +/target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/build/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ \ No newline at end of file diff --git a/spring-webflux-amqp/pom.xml b/spring-webflux-amqp/pom.xml new file mode 100755 index 0000000000..75fafd7b10 --- /dev/null +++ b/spring-webflux-amqp/pom.xml @@ -0,0 +1,92 @@ + + + 4.0.0 + + org.baeldung.spring + spring-webflux-amqp + 1.0.0-SNAPSHOT + jar + + spring-webflux-amqp + Spring WebFlux AMQP Sample + + + parent-boot-2 + com.baeldung + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + + org.springframework.boot + spring-boot-starter-test + test + + + + io.projectreactor + reactor-test + test + + + + org.springframework.boot + spring-boot-starter-integration + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/libs-snapshot + + true + + + + + + + spring-releases + Spring Releases + https://repo.spring.io/libs-release + + + + + diff --git a/spring-webflux-amqp/src/docker/docker-compose.yml b/spring-webflux-amqp/src/docker/docker-compose.yml new file mode 100755 index 0000000000..03292aeb63 --- /dev/null +++ b/spring-webflux-amqp/src/docker/docker-compose.yml @@ -0,0 +1,23 @@ +## +## Create a simple RabbitMQ environment with multiple clients +## +version: "3" + +services: + +## +## RabitMQ server +## + rabbitmq: + image: rabbitmq:3 + hostname: rabbit + environment: + RABBITMQ_ERLANG_COOKIE: test + ports: + - "5672:5672" + volumes: + - rabbitmq-data:/var/lib/rabbitmq + +volumes: + rabbitmq-data: + diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java new file mode 100755 index 0000000000..410b87c764 --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java @@ -0,0 +1,59 @@ +package org.baeldung.spring.amqp; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("destinations") +public class DestinationsConfig { + + + private Map queues = new HashMap<>(); + + private Map topics = new HashMap<>(); + + + + public Map getQueues() { + return queues; + } + + public void setQueues(Map queues) { + this.queues = queues; + } + + public Map getTopics() { + return topics; + } + + public void setTopics(Map topics) { + this.topics = topics; + } + + // DestinationInfo stores the Exchange name and routing key used + // by our producers when posting messages + static class DestinationInfo { + + private String exchange; + private String routingKey; + + + public String getExchange() { + return exchange; + } + public void setExchange(String exchange) { + this.exchange = exchange; + } + public String getRoutingKey() { + return routingKey; + } + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + + + } + +} diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java new file mode 100755 index 0000000000..eb3b858ddc --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java @@ -0,0 +1,270 @@ +package org.baeldung.spring.amqp; + +import java.util.stream.Stream; + +import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@SpringBootApplication +@EnableConfigurationProperties(DestinationsConfig.class) +@RestController +public class SpringWebfluxAmqpApplication { + + private static Logger log = LoggerFactory.getLogger(SpringWebfluxAmqpApplication.class); + + @Autowired + private AmqpTemplate amqpTemplate; + + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private DestinationsConfig destinationsConfig; + + + public static void main(String[] args) { + SpringApplication.run(SpringWebfluxAmqpApplication.class, args); + } + + @Bean + public CommandLineRunner setupQueueDestinations(AmqpAdmin amqpAdmin,DestinationsConfig destinationsConfig) { + + return (args) -> { + + log.info("[I48] Creating Destinations..."); + + destinationsConfig.getQueues() + .forEach((key, destination) -> { + + log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey()); + + Exchange ex = ExchangeBuilder + .directExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + Queue q = QueueBuilder + .durable(destination.getRoutingKey()) + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + amqpAdmin.declareBinding(b); + + log.info("[I70] Binding successfully created."); + + }); + + }; + + } + + @Bean + public CommandLineRunner setupTopicDestinations(AmqpAdmin amqpAdmin, DestinationsConfig destinationsConfig) { + + return (args) -> { + + // For topic each consumer will have its own Queue, so no binding + destinationsConfig.getTopics() + .forEach((key, destination) -> { + + log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange()); + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + log.info("[I107] Topic Exchange successfully created."); + + }); + }; + } + + @PostMapping(value = "/queue/{name}") + public Mono> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getQueues() + .get(name); + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound().build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted().build(); + + }); + + } + + + /** + * Receive messages for the given queue + * @param name + * @return + */ + @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromQueue(@PathVariable String name) { + + final DestinationInfo d = destinationsConfig.getQueues().get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound().build()); + } + + Stream s = Stream.generate(() -> { + String queueName = d.getRoutingKey(); + + log.info("[I137] Polling {}", queueName); + + Object payload = amqpTemplate.receiveAndConvert(queueName,5000); + if ( payload == null ) { + payload = "No news is good news..."; + } + + return payload.toString(); + }); + + + return Flux + .fromStream(s) + .subscribeOn(Schedulers.elastic()); + + } + + /** + * send message to a given topic + * @param name + * @param payload + * @return + */ + @PostMapping(value = "/topic/{name}") + public Mono> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getTopics().get(name); + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound().build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted().build(); + + }); + } + + + @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromTopic(@PathVariable String name) { + + DestinationInfo d = destinationsConfig.getTopics().get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound().build()); + } + + final Queue topicQueue = createTopicQueue(d); + + Stream s = Stream.generate(() -> { + String queueName = topicQueue.getName(); + + log.info("[I137] Polling {}", queueName); + + try { + Object payload = amqpTemplate.receiveAndConvert(queueName,5000); + if ( payload == null ) { + payload = "No news is good news..."; + } + + return payload.toString(); + } + catch(AmqpException ex) { + log.warn("[W247] Received an AMQP Exception: {}", ex.getMessage()); + return null; + } + }); + + + return Flux.fromStream(s) + .doOnCancel(() -> { + log.info("[I250] doOnCancel()"); + amqpAdmin.deleteQueue(topicQueue.getName()); + }) + .subscribeOn(Schedulers.elastic()); + + + } + + + private Queue createTopicQueue(DestinationInfo destination) { + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + // Create a durable queue + Queue q = QueueBuilder + .durable() + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + + amqpAdmin.declareBinding(b); + + return q; + } + + +} diff --git a/spring-webflux-amqp/src/main/resources/application.yml b/spring-webflux-amqp/src/main/resources/application.yml new file mode 100755 index 0000000000..3f527ce4c5 --- /dev/null +++ b/spring-webflux-amqp/src/main/resources/application.yml @@ -0,0 +1,27 @@ +spring: + rabbitmq: + host: 192.168.99.100 + port: 5672 + username: guest + password: guest + +destinations: + queues: + NYSE: + exchange: nyse + routing-key: NYSE + IBOV: + exchange: ibov + routing-key: IBOV + + + topics: + weather: + exchange: alerts + routing-key: WEATHER + + + + + + diff --git a/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java b/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java new file mode 100755 index 0000000000..bda490c189 --- /dev/null +++ b/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java @@ -0,0 +1,26 @@ +package org.baeldung.spring.amqp; + +import org.junit.Test; +import org.springframework.test.web.reactive.server.WebTestClient; + + +public class SpringWebfluxAmqpLiveTest { + + @Test + public void whenSendingAMessageToQueue_thenAcceptedReturnCode() { + + WebTestClient client = WebTestClient.bindToServer() + .baseUrl("http://localhost:8080") + .build(); + + client.post() + .uri("/queue/NYSE") + .syncBody("Test Message") + .exchange() + .expectStatus().isAccepted(); + + } + + + +}