From e3b4b5f5523dcb96bf83aad20181bd0dd5b05e81 Mon Sep 17 00:00:00 2001 From: pcoates Date: Fri, 2 Aug 2019 16:43:55 +0100 Subject: [PATCH] BAEL-2688 Moved example code from spring-amqp-simple to spring-amqp project. Also updated for Spring Boot. --- .../springamqp/broadcast/BroadcastConfig.java | 52 +++++++++++++++++++ .../broadcast/BroadcastMessageApp.java | 51 ++++++++++++++++++ .../simple/HelloWorldMessageApp.java | 38 ++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastConfig.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastMessageApp.java create mode 100644 spring-amqp/src/main/java/com/baeldung/springamqp/simple/HelloWorldMessageApp.java diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastConfig.java b/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastConfig.java new file mode 100644 index 0000000000..12c5987cc4 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastConfig.java @@ -0,0 +1,52 @@ +package com.baeldung.springamqp.broadcast; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class BroadcastConfig { + + private static final boolean NON_DURABLE = false; + + public final static String FANOUT_QUEUE_1_NAME = "com.baeldung.spring-amqp-simple.fanout.queue1"; + public final static String FANOUT_QUEUE_2_NAME = "com.baeldung.spring-amqp-simple.fanout.queue2"; + public final static String FANOUT_EXCHANGE_NAME = "com.baeldung.spring-amqp-simple.fanout.exchange"; + + public final static String TOPIC_QUEUE_1_NAME = "com.baeldung.spring-amqp-simple.topic.queue1"; + public final static String TOPIC_QUEUE_2_NAME = "com.baeldung.spring-amqp-simple.topic.queue2"; + public final static String TOPIC_EXCHANGE_NAME = "com.baeldung.spring-amqp-simple.topic.exchange"; + public static final String BINDING_PATTERN_IMPORTANT = "*.important.*"; + public static final String BINDING_PATTERN_ERROR = "#.error"; + + @Bean + public Declarables topicBindings() { + Queue topicQueue1 = new Queue(TOPIC_QUEUE_1_NAME, NON_DURABLE); + Queue topicQueue2 = new Queue(TOPIC_QUEUE_2_NAME, NON_DURABLE); + + TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE_NAME, NON_DURABLE, false); + + return new Declarables(topicQueue1, topicQueue2, topicExchange, BindingBuilder + .bind(topicQueue1) + .to(topicExchange) + .with(BINDING_PATTERN_IMPORTANT), BindingBuilder + .bind(topicQueue2) + .to(topicExchange) + .with(BINDING_PATTERN_ERROR)); + } + + @Bean + public Declarables fanoutBindings() { + Queue fanoutQueue1 = new Queue(FANOUT_QUEUE_1_NAME, NON_DURABLE); + Queue fanoutQueue2 = new Queue(FANOUT_QUEUE_2_NAME, NON_DURABLE); + + FanoutExchange fanoutExchange = new FanoutExchange(FANOUT_EXCHANGE_NAME, NON_DURABLE, false); + + return new Declarables(fanoutQueue1, fanoutQueue2, fanoutExchange, BindingBuilder + .bind(fanoutQueue1) + .to(fanoutExchange), BindingBuilder + .bind(fanoutQueue2) + .to(fanoutExchange)); + } + +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastMessageApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastMessageApp.java new file mode 100644 index 0000000000..d90087ec5c --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/broadcast/BroadcastMessageApp.java @@ -0,0 +1,51 @@ +package com.baeldung.springamqp.broadcast; + +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import static com.baeldung.springamqp.broadcast.BroadcastConfig.*; + +@SpringBootApplication +public class BroadcastMessageApp { + + private static String ROUTING_KEY_USER_IMPORTANT_WARN = "user.important.warn"; + private static String ROUTING_KEY_USER_IMPORTANT_ERROR = "user.important.error"; + + public static void main(String[] args) { + SpringApplication.run(BroadcastMessageApp.class, args); + } + + @Bean + public ApplicationRunner runner(RabbitTemplate rabbitTemplate) { + String message = " payload is broadcast"; + return args -> { + rabbitTemplate.convertAndSend(BroadcastConfig.FANOUT_EXCHANGE_NAME, "", "fanout" + message); + rabbitTemplate.convertAndSend(BroadcastConfig.TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); + rabbitTemplate.convertAndSend(BroadcastConfig.TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); + }; + } + + @RabbitListener(queues = { FANOUT_QUEUE_1_NAME }) + public void receiveMessageFromFanout1(String message) { + System.out.println("Received fanout 1 message: " + message); + } + + @RabbitListener(queues = { FANOUT_QUEUE_2_NAME }) + public void receiveMessageFromFanout2(String message) { + System.out.println("Received fanout 2 message: " + message); + } + + @RabbitListener(queues = { TOPIC_QUEUE_1_NAME }) + public void receiveMessageFromTopic1(String message) { + System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); + } + + @RabbitListener(queues = { TOPIC_QUEUE_2_NAME }) + public void receiveMessageFromTopic2(String message) { + System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); + } +} diff --git a/spring-amqp/src/main/java/com/baeldung/springamqp/simple/HelloWorldMessageApp.java b/spring-amqp/src/main/java/com/baeldung/springamqp/simple/HelloWorldMessageApp.java new file mode 100644 index 0000000000..25dcdf29c1 --- /dev/null +++ b/spring-amqp/src/main/java/com/baeldung/springamqp/simple/HelloWorldMessageApp.java @@ -0,0 +1,38 @@ +package com.baeldung.springamqp.simple; + +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class HelloWorldMessageApp { + + private static final boolean NON_DURABLE = false; + private static final String MY_QUEUE_NAME = "myQueue"; + + public static void main(String[] args) { + SpringApplication.run(HelloWorldMessageApp.class, args); + } + + @Bean + public ApplicationRunner runner(RabbitTemplate template) { + return args -> { + template.convertAndSend("myQueue", "Hello, world!"); + }; + } + + @Bean + public Queue myQueue() { + return new Queue(MY_QUEUE_NAME, NON_DURABLE); + } + + @RabbitListener(queues = MY_QUEUE_NAME) + public void listen(String in) { + System.out.println("Message read from myQueue : " + in); + } + +}