BAEL-427 Examples for topic and fanout exchanges. (#1648)
* BAEL-427 Examples for topic and fanout exchanges. * BAEL-427 Separating code for the new article from the old one
This commit is contained in:
parent
f674f12eac
commit
709857b9aa
|
@ -2,7 +2,7 @@ package com.baeldung.springamqpsimple;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
|
@ -10,6 +10,7 @@ public class MessageConsumer {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
|
||||||
|
|
||||||
|
@RabbitListener(queues = {SpringAmqpConfig.queueName})
|
||||||
public void receiveMessage(String message) {
|
public void receiveMessage(String message) {
|
||||||
logger.info("Received Message: " + message);
|
logger.info("Received Message: " + message);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
package com.baeldung.springamqpsimple;
|
package com.baeldung.springamqpsimple;
|
||||||
|
|
||||||
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.core.Binding;
|
||||||
|
import org.springframework.amqp.core.BindingBuilder;
|
||||||
|
import org.springframework.amqp.core.DirectExchange;
|
||||||
|
import org.springframework.amqp.core.Exchange;
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||||
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
|
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
|
||||||
|
@ -32,7 +36,7 @@ public class SpringAmqpConfig {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
|
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
|
||||||
MessageListenerAdapter listenerAdapter) {
|
MessageListenerAdapter listenerAdapter) {
|
||||||
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
|
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
|
||||||
container.setConnectionFactory(connectionFactory);
|
container.setConnectionFactory(connectionFactory);
|
||||||
container.setQueueNames(queueName);
|
container.setQueueNames(queueName);
|
||||||
|
@ -44,5 +48,4 @@ public class SpringAmqpConfig {
|
||||||
MessageListenerAdapter listenerAdapter(MessageConsumer messageReceiver) {
|
MessageListenerAdapter listenerAdapter(MessageConsumer messageReceiver) {
|
||||||
return new MessageListenerAdapter(messageReceiver, "receiveMessage");
|
return new MessageListenerAdapter(messageReceiver, "receiveMessage");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
package com.baeldung.springamqpsimple.broadcast;
|
||||||
|
|
||||||
|
import org.springframework.amqp.core.BindingBuilder;
|
||||||
|
import org.springframework.amqp.core.Declarable;
|
||||||
|
import org.springframework.amqp.core.DirectExchange;
|
||||||
|
import org.springframework.amqp.core.FanoutExchange;
|
||||||
|
import org.springframework.amqp.core.Queue;
|
||||||
|
import org.springframework.amqp.core.TopicExchange;
|
||||||
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||||
|
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.Profile;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@Profile("!test")
|
||||||
|
public class BroadcastConfig {
|
||||||
|
|
||||||
|
public final static String fanoutQueue1Name = "com.baeldung.spring-amqp-simple.fanout.queue1";
|
||||||
|
public final static String fanoutQueue2Name = "com.baeldung.spring-amqp-simple.fanout.queue2";
|
||||||
|
public final static String fanoutExchangeName = "com.baeldung.spring-amqp-simple.fanout.exchange";
|
||||||
|
|
||||||
|
public final static String topicQueue1Name = "com.baeldung.spring-amqp-simple.topic.queue1";
|
||||||
|
public final static String topicQueue2Name = "com.baeldung.spring-amqp-simple.topic.queue2";
|
||||||
|
public final static String topicExchangeName = "com.baeldung.spring-amql-simple.topic.exchange";
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public List<Declarable> topicBindings() {
|
||||||
|
Queue topicQueue1 = new Queue(topicQueue1Name, false);
|
||||||
|
Queue topicQueue2 = new Queue(topicQueue2Name, false);
|
||||||
|
|
||||||
|
TopicExchange topicExchange = new TopicExchange(topicExchangeName);
|
||||||
|
|
||||||
|
return Arrays.asList(
|
||||||
|
topicQueue1,
|
||||||
|
topicQueue2,
|
||||||
|
topicExchange,
|
||||||
|
BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.important.*"),
|
||||||
|
BindingBuilder.bind(topicQueue2).to(topicExchange).with("user.#")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public List<Declarable> fanoutBindings() {
|
||||||
|
Queue fanoutQueue1 = new Queue(fanoutQueue1Name, false);
|
||||||
|
Queue fanoutQueue2 = new Queue(fanoutQueue2Name, false);
|
||||||
|
|
||||||
|
FanoutExchange fanoutExchange = new FanoutExchange(fanoutExchangeName);
|
||||||
|
|
||||||
|
return Arrays.asList(
|
||||||
|
fanoutQueue1,
|
||||||
|
fanoutQueue2,
|
||||||
|
fanoutExchange,
|
||||||
|
BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
|
||||||
|
BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public SimpleRabbitListenerContainerFactory container(ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) {
|
||||||
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||||
|
configurer.configure(factory, connectionFactory);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
package com.baeldung.springamqpsimple.broadcast;
|
||||||
|
|
||||||
|
import com.baeldung.springamqpsimple.MessageConsumer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BroadcastMessageConsumers {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
|
||||||
|
|
||||||
|
@RabbitListener(queues = {BroadcastConfig.fanoutQueue1Name})
|
||||||
|
public void receiveMessageFromFanout1(String message) {
|
||||||
|
logger.info("Received fanout 1 message: " + message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = {BroadcastConfig.fanoutQueue2Name})
|
||||||
|
public void receiveMessageFromFanout2(String message) {
|
||||||
|
logger.info("Received fanout 2 message: " + message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = {BroadcastConfig.topicQueue1Name})
|
||||||
|
public void receiveMessageFromTopic1(String message) {
|
||||||
|
logger.info("Received topic 1 message: " + message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@RabbitListener(queues = {BroadcastConfig.topicQueue2Name})
|
||||||
|
public void receiveMessageFromTopic2(String message) {
|
||||||
|
logger.info("Received topic 2 message: " + message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package com.baeldung.springamqpsimple.broadcast;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.stereotype.Controller;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMethod;
|
||||||
|
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||||
|
|
||||||
|
@Controller
|
||||||
|
public class BroadcastMessageController {
|
||||||
|
|
||||||
|
private final BroadcastMessageProducer messageProducer;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public BroadcastMessageController(BroadcastMessageProducer messageProducer) {
|
||||||
|
this.messageProducer = messageProducer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@RequestMapping(value="/broadcast", method= RequestMethod.POST)
|
||||||
|
@ResponseStatus(value= HttpStatus.CREATED)
|
||||||
|
public void sendMessage(@RequestBody String message) {
|
||||||
|
messageProducer.sendMessages(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.baeldung.springamqpsimple.broadcast;
|
||||||
|
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BroadcastMessageProducer {
|
||||||
|
|
||||||
|
private final RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public BroadcastMessageProducer(RabbitTemplate rabbitTemplate) {
|
||||||
|
this.rabbitTemplate = rabbitTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessages(String message) {
|
||||||
|
rabbitTemplate.convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
|
||||||
|
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
|
||||||
|
rabbitTemplate.convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
spring:
|
spring:
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
username: baeldung
|
username: guest
|
||||||
password: baeldung
|
password: guest
|
||||||
|
host: 10.10.10.105
|
|
@ -0,0 +1,48 @@
|
||||||
|
package broadcast;
|
||||||
|
|
||||||
|
import com.baeldung.springamqpsimple.broadcast.BroadcastConfig;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
|
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
|
||||||
|
|
||||||
|
@RunWith(SpringRunner.class)
|
||||||
|
@ActiveProfiles("test")
|
||||||
|
@SpringBootTest(webEnvironment = RANDOM_PORT)
|
||||||
|
public class BroadcastMessageControllerIntegrationTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TestRestTemplate restTemplate;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenPostingMessage_thenMessageIsCreated() {
|
||||||
|
final String message = "Hello World!";
|
||||||
|
ResponseEntity<Void> responseEntity = restTemplate.postForEntity("/broadcast", message, Void.class);
|
||||||
|
|
||||||
|
assertEquals(HttpStatus.CREATED, responseEntity.getStatusCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenPostingMessage_thenMessageIsSentToBroker() {
|
||||||
|
final String message = "Hello World!";
|
||||||
|
restTemplate.postForEntity("/broadcast", message, Void.class);
|
||||||
|
|
||||||
|
verify(rabbitTemplate).convertAndSend(BroadcastConfig.fanoutExchangeName, "", message);
|
||||||
|
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.not-important.info", message);
|
||||||
|
verify(rabbitTemplate).convertAndSend(BroadcastConfig.topicExchangeName, "user.important.error", message);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue