diff --git a/pom.xml b/pom.xml
index d42c3ac617..38f88a2784 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,7 @@
performance-tests
twilio
java-ee-8-security-api
+ spring-webflux-amqp
diff --git a/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java b/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java
similarity index 96%
rename from spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java
rename to spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java
index b54ac16125..11d9daf3bf 100644
--- a/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorTest.java
+++ b/spring-core/src/test/java/com/baeldung/dependson/processor/FileProcessorIntegrationTest.java
@@ -16,7 +16,7 @@ import com.baeldung.dependson.shared.File;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TestConfig.class)
-public class FileProcessorTest {
+public class FileProcessorIntegrationTest {
@Autowired
ApplicationContext context;
diff --git a/spring-webflux-amqp/.gitignore b/spring-webflux-amqp/.gitignore
new file mode 100755
index 0000000000..82eca336e3
--- /dev/null
+++ b/spring-webflux-amqp/.gitignore
@@ -0,0 +1,25 @@
+/target/
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/build/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
\ No newline at end of file
diff --git a/spring-webflux-amqp/pom.xml b/spring-webflux-amqp/pom.xml
new file mode 100755
index 0000000000..75fafd7b10
--- /dev/null
+++ b/spring-webflux-amqp/pom.xml
@@ -0,0 +1,92 @@
+
+
+ 4.0.0
+
+ org.baeldung.spring
+ spring-webflux-amqp
+ 1.0.0-SNAPSHOT
+ jar
+
+ spring-webflux-amqp
+ Spring WebFlux AMQP Sample
+
+
+ parent-boot-2
+ com.baeldung
+ 0.0.1-SNAPSHOT
+ ../parent-boot-2
+
+
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+
+ org.springframework.boot
+ spring-boot-starter-integration
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/libs-snapshot
+
+ true
+
+
+
+
+
+
+ spring-releases
+ Spring Releases
+ https://repo.spring.io/libs-release
+
+
+
+
+
diff --git a/spring-webflux-amqp/src/docker/docker-compose.yml b/spring-webflux-amqp/src/docker/docker-compose.yml
new file mode 100755
index 0000000000..03292aeb63
--- /dev/null
+++ b/spring-webflux-amqp/src/docker/docker-compose.yml
@@ -0,0 +1,23 @@
+##
+## Create a simple RabbitMQ environment with multiple clients
+##
+version: "3"
+
+services:
+
+##
+## RabitMQ server
+##
+ rabbitmq:
+ image: rabbitmq:3
+ hostname: rabbit
+ environment:
+ RABBITMQ_ERLANG_COOKIE: test
+ ports:
+ - "5672:5672"
+ volumes:
+ - rabbitmq-data:/var/lib/rabbitmq
+
+volumes:
+ rabbitmq-data:
+
diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java
new file mode 100755
index 0000000000..410b87c764
--- /dev/null
+++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/DestinationsConfig.java
@@ -0,0 +1,59 @@
+package org.baeldung.spring.amqp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("destinations")
+public class DestinationsConfig {
+
+
+ private Map queues = new HashMap<>();
+
+ private Map topics = new HashMap<>();
+
+
+
+ public Map getQueues() {
+ return queues;
+ }
+
+ public void setQueues(Map queues) {
+ this.queues = queues;
+ }
+
+ public Map getTopics() {
+ return topics;
+ }
+
+ public void setTopics(Map topics) {
+ this.topics = topics;
+ }
+
+ // DestinationInfo stores the Exchange name and routing key used
+ // by our producers when posting messages
+ static class DestinationInfo {
+
+ private String exchange;
+ private String routingKey;
+
+
+ public String getExchange() {
+ return exchange;
+ }
+ public void setExchange(String exchange) {
+ this.exchange = exchange;
+ }
+ public String getRoutingKey() {
+ return routingKey;
+ }
+ public void setRoutingKey(String routingKey) {
+ this.routingKey = routingKey;
+ }
+
+
+
+ }
+
+}
diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java
new file mode 100755
index 0000000000..eb3b858ddc
--- /dev/null
+++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java
@@ -0,0 +1,270 @@
+package org.baeldung.spring.amqp;
+
+import java.util.stream.Stream;
+
+import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.AmqpTemplate;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Exchange;
+import org.springframework.amqp.core.ExchangeBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.QueueBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+@SpringBootApplication
+@EnableConfigurationProperties(DestinationsConfig.class)
+@RestController
+public class SpringWebfluxAmqpApplication {
+
+ private static Logger log = LoggerFactory.getLogger(SpringWebfluxAmqpApplication.class);
+
+ @Autowired
+ private AmqpTemplate amqpTemplate;
+
+ @Autowired
+ private AmqpAdmin amqpAdmin;
+
+ @Autowired
+ private DestinationsConfig destinationsConfig;
+
+
+ public static void main(String[] args) {
+ SpringApplication.run(SpringWebfluxAmqpApplication.class, args);
+ }
+
+ @Bean
+ public CommandLineRunner setupQueueDestinations(AmqpAdmin amqpAdmin,DestinationsConfig destinationsConfig) {
+
+ return (args) -> {
+
+ log.info("[I48] Creating Destinations...");
+
+ destinationsConfig.getQueues()
+ .forEach((key, destination) -> {
+
+ log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey());
+
+ Exchange ex = ExchangeBuilder
+ .directExchange(destination.getExchange())
+ .durable(true)
+ .build();
+
+ amqpAdmin.declareExchange(ex);
+
+ Queue q = QueueBuilder
+ .durable(destination.getRoutingKey())
+ .build();
+
+ amqpAdmin.declareQueue(q);
+
+ Binding b = BindingBuilder.bind(q)
+ .to(ex)
+ .with(destination.getRoutingKey())
+ .noargs();
+ amqpAdmin.declareBinding(b);
+
+ log.info("[I70] Binding successfully created.");
+
+ });
+
+ };
+
+ }
+
+ @Bean
+ public CommandLineRunner setupTopicDestinations(AmqpAdmin amqpAdmin, DestinationsConfig destinationsConfig) {
+
+ return (args) -> {
+
+ // For topic each consumer will have its own Queue, so no binding
+ destinationsConfig.getTopics()
+ .forEach((key, destination) -> {
+
+ log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange());
+
+ Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
+ .durable(true)
+ .build();
+
+ amqpAdmin.declareExchange(ex);
+
+ log.info("[I107] Topic Exchange successfully created.");
+
+ });
+ };
+ }
+
+ @PostMapping(value = "/queue/{name}")
+ public Mono> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) {
+
+ // Lookup exchange details
+ final DestinationInfo d = destinationsConfig.getQueues()
+ .get(name);
+ if (d == null) {
+ // Destination not found.
+ return Mono.just(ResponseEntity.notFound().build());
+ }
+
+ return Mono.fromCallable(() -> {
+
+ log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey());
+ amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
+
+ return ResponseEntity.accepted().build();
+
+ });
+
+ }
+
+
+ /**
+ * Receive messages for the given queue
+ * @param name
+ * @return
+ */
+ @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux> receiveMessagesFromQueue(@PathVariable String name) {
+
+ final DestinationInfo d = destinationsConfig.getQueues().get(name);
+
+ if (d == null) {
+ return Flux.just(ResponseEntity.notFound().build());
+ }
+
+ Stream s = Stream.generate(() -> {
+ String queueName = d.getRoutingKey();
+
+ log.info("[I137] Polling {}", queueName);
+
+ Object payload = amqpTemplate.receiveAndConvert(queueName,5000);
+ if ( payload == null ) {
+ payload = "No news is good news...";
+ }
+
+ return payload.toString();
+ });
+
+
+ return Flux
+ .fromStream(s)
+ .subscribeOn(Schedulers.elastic());
+
+ }
+
+ /**
+ * send message to a given topic
+ * @param name
+ * @param payload
+ * @return
+ */
+ @PostMapping(value = "/topic/{name}")
+ public Mono> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) {
+
+ // Lookup exchange details
+ final DestinationInfo d = destinationsConfig.getTopics().get(name);
+ if (d == null) {
+ // Destination not found.
+ return Mono.just(ResponseEntity.notFound().build());
+ }
+
+ return Mono.fromCallable(() -> {
+
+ log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey());
+ amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
+
+ return ResponseEntity.accepted().build();
+
+ });
+ }
+
+
+ @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux> receiveMessagesFromTopic(@PathVariable String name) {
+
+ DestinationInfo d = destinationsConfig.getTopics().get(name);
+
+ if (d == null) {
+ return Flux.just(ResponseEntity.notFound().build());
+ }
+
+ final Queue topicQueue = createTopicQueue(d);
+
+ Stream s = Stream.generate(() -> {
+ String queueName = topicQueue.getName();
+
+ log.info("[I137] Polling {}", queueName);
+
+ try {
+ Object payload = amqpTemplate.receiveAndConvert(queueName,5000);
+ if ( payload == null ) {
+ payload = "No news is good news...";
+ }
+
+ return payload.toString();
+ }
+ catch(AmqpException ex) {
+ log.warn("[W247] Received an AMQP Exception: {}", ex.getMessage());
+ return null;
+ }
+ });
+
+
+ return Flux.fromStream(s)
+ .doOnCancel(() -> {
+ log.info("[I250] doOnCancel()");
+ amqpAdmin.deleteQueue(topicQueue.getName());
+ })
+ .subscribeOn(Schedulers.elastic());
+
+
+ }
+
+
+ private Queue createTopicQueue(DestinationInfo destination) {
+
+ Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange())
+ .durable(true)
+ .build();
+
+ amqpAdmin.declareExchange(ex);
+
+ // Create a durable queue
+ Queue q = QueueBuilder
+ .durable()
+ .build();
+
+ amqpAdmin.declareQueue(q);
+
+ Binding b = BindingBuilder.bind(q)
+ .to(ex)
+ .with(destination.getRoutingKey())
+ .noargs();
+
+ amqpAdmin.declareBinding(b);
+
+ return q;
+ }
+
+
+}
diff --git a/spring-webflux-amqp/src/main/resources/application.yml b/spring-webflux-amqp/src/main/resources/application.yml
new file mode 100755
index 0000000000..3f527ce4c5
--- /dev/null
+++ b/spring-webflux-amqp/src/main/resources/application.yml
@@ -0,0 +1,27 @@
+spring:
+ rabbitmq:
+ host: 192.168.99.100
+ port: 5672
+ username: guest
+ password: guest
+
+destinations:
+ queues:
+ NYSE:
+ exchange: nyse
+ routing-key: NYSE
+ IBOV:
+ exchange: ibov
+ routing-key: IBOV
+
+
+ topics:
+ weather:
+ exchange: alerts
+ routing-key: WEATHER
+
+
+
+
+
+
diff --git a/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java b/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java
new file mode 100755
index 0000000000..bda490c189
--- /dev/null
+++ b/spring-webflux-amqp/src/test/java/org/baeldung/spring/amqp/SpringWebfluxAmqpLiveTest.java
@@ -0,0 +1,26 @@
+package org.baeldung.spring.amqp;
+
+import org.junit.Test;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+
+public class SpringWebfluxAmqpLiveTest {
+
+ @Test
+ public void whenSendingAMessageToQueue_thenAcceptedReturnCode() {
+
+ WebTestClient client = WebTestClient.bindToServer()
+ .baseUrl("http://localhost:8080")
+ .build();
+
+ client.post()
+ .uri("/queue/NYSE")
+ .syncBody("Test Message")
+ .exchange()
+ .expectStatus().isAccepted();
+
+ }
+
+
+
+}