diff --git a/spring-webflux-amqp/src/docker/docker-compose.yml b/spring-webflux-amqp/src/docker/docker-compose.yml deleted file mode 100755 index 03292aeb63..0000000000 --- a/spring-webflux-amqp/src/docker/docker-compose.yml +++ /dev/null @@ -1,23 +0,0 @@ -## -## Create a simple RabbitMQ environment with multiple clients -## -version: "3" - -services: - -## -## RabitMQ server -## - rabbitmq: - image: rabbitmq:3 - hostname: rabbit - environment: - RABBITMQ_ERLANG_COOKIE: test - ports: - - "5672:5672" - volumes: - - rabbitmq-data:/var/lib/rabbitmq - -volumes: - rabbitmq-data: - diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java new file mode 100644 index 0000000000..52f6d924fa --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java @@ -0,0 +1,307 @@ +package org.baeldung.spring.amqp; + +import java.time.Duration; +import java.util.Date; + +import javax.annotation.PostConstruct; + +import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.core.MessageListener; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@RestController +public class AmqpReactiveController { + + private static Logger log = LoggerFactory.getLogger(AmqpReactiveController.class); + + @Autowired + private AmqpTemplate amqpTemplate; + + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private DestinationsConfig destinationsConfig; + + @Autowired + private MessageListenerContainerFactory messageListenerContainerFactory; + + @PostConstruct + public void setupQueueDestinations() { + + log.info("[I48] Creating Destinations..."); + + destinationsConfig.getQueues() + .forEach((key, destination) -> { + + log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey()); + + Exchange ex = ExchangeBuilder.directExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + Queue q = QueueBuilder.durable(destination.getRoutingKey()) + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + + amqpAdmin.declareBinding(b); + + log.info("[I70] Binding successfully created."); + + }); + } + + @PostConstruct + public void setupTopicDestinations() { + + // For topic each consumer will have its own Queue, so no binding + destinationsConfig.getTopics() + .forEach((key, destination) -> { + + log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange()); + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + log.info("[I107] Topic Exchange successfully created."); + + }); + } + + @PostMapping(value = "/queue/{name}") + public Mono> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getQueues() + .get(name); + + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound() + .build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted() + .build(); + + }); + + } + + /** + * Receive messages for the given queue + * @param name + * @param errorHandler + * @return + */ + @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromQueue(@PathVariable String name) { + + DestinationInfo d = destinationsConfig.getQueues() + .get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound() + .build()); + } + + MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(d.getRoutingKey()); + + Flux f = Flux. create(emitter -> { + + log.info("[I168] Adding listener, queue={}", d.getRoutingKey()); + mlc.setupMessageListener((MessageListener) m -> { + + String qname = m.getMessageProperties() + .getConsumerQueue(); + + log.info("[I137] Message received, queue={}", qname); + + if (emitter.isCancelled()) { + log.info("[I166] cancelled, queue={}", qname); + mlc.stop(); + return; + } + + String payload = new String(m.getBody()); + emitter.next(payload); + + log.info("[I176] Message sent to client, queue={}", qname); + + }); + + emitter.onRequest(v -> { + log.info("[I171] Starting container, queue={}", d.getRoutingKey()); + mlc.start(); + }); + + emitter.onDispose(() -> { + log.info("[I176] onDispose: queue={}", d.getRoutingKey()); + mlc.stop(); + }); + + log.info("[I171] Container started, queue={}", d.getRoutingKey()); + + }); + + + return Flux.interval(Duration.ofSeconds(5)) + .map(v -> { + log.info("[I209] sending keepalive message..."); + return "No news is good news"; + }) + .mergeWith(f); + } + + /** + * send message to a given topic + * @param name + * @param payload + * @return + */ + @PostMapping(value = "/topic/{name}") + public Mono> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getTopics() + .get(name); + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound() + .build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted() + .build(); + + }); + } + + @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromTopic(@PathVariable String name) { + + DestinationInfo d = destinationsConfig.getTopics() + .get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound() + .build()); + } + + Queue topicQueue = createTopicQueue(d); + String qname = topicQueue.getName(); + + MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname); + + Flux f = Flux. create(emitter -> { + + log.info("[I168] Adding listener, queue={}", qname); + + mlc.setupMessageListener((MessageListener) m -> { + + log.info("[I137] Message received, queue={}", qname); + + if (emitter.isCancelled()) { + log.info("[I166] cancelled, queue={}", qname); + mlc.stop(); + return; + } + + String payload = new String(m.getBody()); + emitter.next(payload); + + log.info("[I176] Message sent to client, queue={}", qname); + + }); + + emitter.onRequest(v -> { + log.info("[I171] Starting container, queue={}", qname); + mlc.start(); + }); + + emitter.onDispose(() -> { + log.info("[I176] onDispose: queue={}", qname); + amqpAdmin.deleteQueue(qname); + mlc.stop(); + }); + + log.info("[I171] Container started, queue={}", qname); + + }); + + return Flux.interval(Duration.ofSeconds(5)) + .map(v -> { + log.info("[I209] sending keepalive message..."); + return "No news is good news"; + }) + .mergeWith(f); + + } + + private Queue createTopicQueue(DestinationInfo destination) { + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + Queue q = QueueBuilder.nonDurable() + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + + amqpAdmin.declareBinding(b); + + return q; + } + +} diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java new file mode 100644 index 0000000000..29b8d28a80 --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java @@ -0,0 +1,29 @@ +package org.baeldung.spring.amqp; + +import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MessageListenerContainerFactory { + + @Autowired + private ConnectionFactory connectionFactory; + + public MessageListenerContainerFactory() { + } + + public MessageListenerContainer createMessageListenerContainer(String queueName) { + + SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory); + + mlc.addQueueNames(queueName); + mlc.setAcknowledgeMode(AcknowledgeMode.AUTO); + + return mlc; + } + +}