From 172642e190d47c76a14a40b7053a6df9524c746f Mon Sep 17 00:00:00 2001 From: Philippe Date: Tue, 26 Jun 2018 01:10:00 -0300 Subject: [PATCH 1/4] BAEL-1474 Take2 --- .../src/docker/docker-compose.yml | 23 -- .../spring/amqp/AmqpReactiveController.java | 307 ++++++++++++++++++ .../amqp/MessageListenerContainerFactory.java | 29 ++ 3 files changed, 336 insertions(+), 23 deletions(-) delete mode 100755 spring-webflux-amqp/src/docker/docker-compose.yml create mode 100644 spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java create mode 100644 spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java diff --git a/spring-webflux-amqp/src/docker/docker-compose.yml b/spring-webflux-amqp/src/docker/docker-compose.yml deleted file mode 100755 index 03292aeb63..0000000000 --- a/spring-webflux-amqp/src/docker/docker-compose.yml +++ /dev/null @@ -1,23 +0,0 @@ -## -## Create a simple RabbitMQ environment with multiple clients -## -version: "3" - -services: - -## -## RabitMQ server -## - rabbitmq: - image: rabbitmq:3 - hostname: rabbit - environment: - RABBITMQ_ERLANG_COOKIE: test - ports: - - "5672:5672" - volumes: - - rabbitmq-data:/var/lib/rabbitmq - -volumes: - rabbitmq-data: - diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java new file mode 100644 index 0000000000..52f6d924fa --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/AmqpReactiveController.java @@ -0,0 +1,307 @@ +package org.baeldung.spring.amqp; + +import java.time.Duration; +import java.util.Date; + +import javax.annotation.PostConstruct; + +import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Exchange; +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.core.MessageListener; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +@RestController +public class AmqpReactiveController { + + private static Logger log = LoggerFactory.getLogger(AmqpReactiveController.class); + + @Autowired + private AmqpTemplate amqpTemplate; + + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private DestinationsConfig destinationsConfig; + + @Autowired + private MessageListenerContainerFactory messageListenerContainerFactory; + + @PostConstruct + public void setupQueueDestinations() { + + log.info("[I48] Creating Destinations..."); + + destinationsConfig.getQueues() + .forEach((key, destination) -> { + + log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey()); + + Exchange ex = ExchangeBuilder.directExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + Queue q = QueueBuilder.durable(destination.getRoutingKey()) + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + + amqpAdmin.declareBinding(b); + + log.info("[I70] Binding successfully created."); + + }); + } + + @PostConstruct + public void setupTopicDestinations() { + + // For topic each consumer will have its own Queue, so no binding + destinationsConfig.getTopics() + .forEach((key, destination) -> { + + log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange()); + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + log.info("[I107] Topic Exchange successfully created."); + + }); + } + + @PostMapping(value = "/queue/{name}") + public Mono> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getQueues() + .get(name); + + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound() + .build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted() + .build(); + + }); + + } + + /** + * Receive messages for the given queue + * @param name + * @param errorHandler + * @return + */ + @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromQueue(@PathVariable String name) { + + DestinationInfo d = destinationsConfig.getQueues() + .get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound() + .build()); + } + + MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(d.getRoutingKey()); + + Flux f = Flux. create(emitter -> { + + log.info("[I168] Adding listener, queue={}", d.getRoutingKey()); + mlc.setupMessageListener((MessageListener) m -> { + + String qname = m.getMessageProperties() + .getConsumerQueue(); + + log.info("[I137] Message received, queue={}", qname); + + if (emitter.isCancelled()) { + log.info("[I166] cancelled, queue={}", qname); + mlc.stop(); + return; + } + + String payload = new String(m.getBody()); + emitter.next(payload); + + log.info("[I176] Message sent to client, queue={}", qname); + + }); + + emitter.onRequest(v -> { + log.info("[I171] Starting container, queue={}", d.getRoutingKey()); + mlc.start(); + }); + + emitter.onDispose(() -> { + log.info("[I176] onDispose: queue={}", d.getRoutingKey()); + mlc.stop(); + }); + + log.info("[I171] Container started, queue={}", d.getRoutingKey()); + + }); + + + return Flux.interval(Duration.ofSeconds(5)) + .map(v -> { + log.info("[I209] sending keepalive message..."); + return "No news is good news"; + }) + .mergeWith(f); + } + + /** + * send message to a given topic + * @param name + * @param payload + * @return + */ + @PostMapping(value = "/topic/{name}") + public Mono> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) { + + // Lookup exchange details + final DestinationInfo d = destinationsConfig.getTopics() + .get(name); + if (d == null) { + // Destination not found. + return Mono.just(ResponseEntity.notFound() + .build()); + } + + return Mono.fromCallable(() -> { + + log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey()); + amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); + + return ResponseEntity.accepted() + .build(); + + }); + } + + @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux receiveMessagesFromTopic(@PathVariable String name) { + + DestinationInfo d = destinationsConfig.getTopics() + .get(name); + + if (d == null) { + return Flux.just(ResponseEntity.notFound() + .build()); + } + + Queue topicQueue = createTopicQueue(d); + String qname = topicQueue.getName(); + + MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname); + + Flux f = Flux. create(emitter -> { + + log.info("[I168] Adding listener, queue={}", qname); + + mlc.setupMessageListener((MessageListener) m -> { + + log.info("[I137] Message received, queue={}", qname); + + if (emitter.isCancelled()) { + log.info("[I166] cancelled, queue={}", qname); + mlc.stop(); + return; + } + + String payload = new String(m.getBody()); + emitter.next(payload); + + log.info("[I176] Message sent to client, queue={}", qname); + + }); + + emitter.onRequest(v -> { + log.info("[I171] Starting container, queue={}", qname); + mlc.start(); + }); + + emitter.onDispose(() -> { + log.info("[I176] onDispose: queue={}", qname); + amqpAdmin.deleteQueue(qname); + mlc.stop(); + }); + + log.info("[I171] Container started, queue={}", qname); + + }); + + return Flux.interval(Duration.ofSeconds(5)) + .map(v -> { + log.info("[I209] sending keepalive message..."); + return "No news is good news"; + }) + .mergeWith(f); + + } + + private Queue createTopicQueue(DestinationInfo destination) { + + Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) + .durable(true) + .build(); + + amqpAdmin.declareExchange(ex); + + Queue q = QueueBuilder.nonDurable() + .build(); + + amqpAdmin.declareQueue(q); + + Binding b = BindingBuilder.bind(q) + .to(ex) + .with(destination.getRoutingKey()) + .noargs(); + + amqpAdmin.declareBinding(b); + + return q; + } + +} diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java new file mode 100644 index 0000000000..29b8d28a80 --- /dev/null +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/MessageListenerContainerFactory.java @@ -0,0 +1,29 @@ +package org.baeldung.spring.amqp; + +import org.springframework.amqp.core.AcknowledgeMode; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class MessageListenerContainerFactory { + + @Autowired + private ConnectionFactory connectionFactory; + + public MessageListenerContainerFactory() { + } + + public MessageListenerContainer createMessageListenerContainer(String queueName) { + + SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory); + + mlc.addQueueNames(queueName); + mlc.setAcknowledgeMode(AcknowledgeMode.AUTO); + + return mlc; + } + +} From 652cf1e49c2be1a74b38b20319a374cc13f72678 Mon Sep 17 00:00:00 2001 From: Philippe Date: Sat, 30 Jun 2018 00:14:05 +0200 Subject: [PATCH 2/4] Remove extra code --- .../amqp/SpringWebfluxAmqpApplication.java | 255 ------------------ 1 file changed, 255 deletions(-) diff --git a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java index eb3b858ddc..30614e7ee6 100755 --- a/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java +++ b/spring-webflux-amqp/src/main/java/org/baeldung/spring/amqp/SpringWebfluxAmqpApplication.java @@ -1,270 +1,15 @@ package org.baeldung.spring.amqp; -import java.util.stream.Stream; - -import org.baeldung.spring.amqp.DestinationsConfig.DestinationInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.amqp.AmqpException; -import org.springframework.amqp.core.AmqpAdmin; -import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.Exchange; -import org.springframework.amqp.core.ExchangeBuilder; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.QueueBuilder; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; @SpringBootApplication @EnableConfigurationProperties(DestinationsConfig.class) -@RestController public class SpringWebfluxAmqpApplication { - private static Logger log = LoggerFactory.getLogger(SpringWebfluxAmqpApplication.class); - - @Autowired - private AmqpTemplate amqpTemplate; - - @Autowired - private AmqpAdmin amqpAdmin; - - @Autowired - private DestinationsConfig destinationsConfig; - - public static void main(String[] args) { SpringApplication.run(SpringWebfluxAmqpApplication.class, args); } - - @Bean - public CommandLineRunner setupQueueDestinations(AmqpAdmin amqpAdmin,DestinationsConfig destinationsConfig) { - - return (args) -> { - - log.info("[I48] Creating Destinations..."); - - destinationsConfig.getQueues() - .forEach((key, destination) -> { - - log.info("[I54] Creating directExchange: key={}, name={}, routingKey={}", key, destination.getExchange(), destination.getRoutingKey()); - - Exchange ex = ExchangeBuilder - .directExchange(destination.getExchange()) - .durable(true) - .build(); - - amqpAdmin.declareExchange(ex); - - Queue q = QueueBuilder - .durable(destination.getRoutingKey()) - .build(); - - amqpAdmin.declareQueue(q); - - Binding b = BindingBuilder.bind(q) - .to(ex) - .with(destination.getRoutingKey()) - .noargs(); - amqpAdmin.declareBinding(b); - - log.info("[I70] Binding successfully created."); - - }); - - }; - - } - - @Bean - public CommandLineRunner setupTopicDestinations(AmqpAdmin amqpAdmin, DestinationsConfig destinationsConfig) { - - return (args) -> { - - // For topic each consumer will have its own Queue, so no binding - destinationsConfig.getTopics() - .forEach((key, destination) -> { - - log.info("[I98] Creating TopicExchange: name={}, exchange={}", key, destination.getExchange()); - - Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) - .durable(true) - .build(); - - amqpAdmin.declareExchange(ex); - - log.info("[I107] Topic Exchange successfully created."); - - }); - }; - } - - @PostMapping(value = "/queue/{name}") - public Mono> sendMessageToQueue(@PathVariable String name, @RequestBody String payload) { - - // Lookup exchange details - final DestinationInfo d = destinationsConfig.getQueues() - .get(name); - if (d == null) { - // Destination not found. - return Mono.just(ResponseEntity.notFound().build()); - } - - return Mono.fromCallable(() -> { - - log.info("[I51] sendMessageToQueue: queue={}, routingKey={}", d.getExchange(), d.getRoutingKey()); - amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); - - return ResponseEntity.accepted().build(); - - }); - - } - - - /** - * Receive messages for the given queue - * @param name - * @return - */ - @GetMapping(value = "/queue/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public Flux receiveMessagesFromQueue(@PathVariable String name) { - - final DestinationInfo d = destinationsConfig.getQueues().get(name); - - if (d == null) { - return Flux.just(ResponseEntity.notFound().build()); - } - - Stream s = Stream.generate(() -> { - String queueName = d.getRoutingKey(); - - log.info("[I137] Polling {}", queueName); - - Object payload = amqpTemplate.receiveAndConvert(queueName,5000); - if ( payload == null ) { - payload = "No news is good news..."; - } - - return payload.toString(); - }); - - - return Flux - .fromStream(s) - .subscribeOn(Schedulers.elastic()); - - } - - /** - * send message to a given topic - * @param name - * @param payload - * @return - */ - @PostMapping(value = "/topic/{name}") - public Mono> sendMessageToTopic(@PathVariable String name, @RequestBody String payload) { - - // Lookup exchange details - final DestinationInfo d = destinationsConfig.getTopics().get(name); - if (d == null) { - // Destination not found. - return Mono.just(ResponseEntity.notFound().build()); - } - - return Mono.fromCallable(() -> { - - log.info("[I51] sendMessageToTopic: topic={}, routingKey={}", d.getExchange(), d.getRoutingKey()); - amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload); - - return ResponseEntity.accepted().build(); - - }); - } - - - @GetMapping(value = "/topic/{name}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public Flux receiveMessagesFromTopic(@PathVariable String name) { - - DestinationInfo d = destinationsConfig.getTopics().get(name); - - if (d == null) { - return Flux.just(ResponseEntity.notFound().build()); - } - - final Queue topicQueue = createTopicQueue(d); - - Stream s = Stream.generate(() -> { - String queueName = topicQueue.getName(); - - log.info("[I137] Polling {}", queueName); - - try { - Object payload = amqpTemplate.receiveAndConvert(queueName,5000); - if ( payload == null ) { - payload = "No news is good news..."; - } - - return payload.toString(); - } - catch(AmqpException ex) { - log.warn("[W247] Received an AMQP Exception: {}", ex.getMessage()); - return null; - } - }); - - - return Flux.fromStream(s) - .doOnCancel(() -> { - log.info("[I250] doOnCancel()"); - amqpAdmin.deleteQueue(topicQueue.getName()); - }) - .subscribeOn(Schedulers.elastic()); - - - } - - - private Queue createTopicQueue(DestinationInfo destination) { - - Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()) - .durable(true) - .build(); - - amqpAdmin.declareExchange(ex); - - // Create a durable queue - Queue q = QueueBuilder - .durable() - .build(); - - amqpAdmin.declareQueue(q); - - Binding b = BindingBuilder.bind(q) - .to(ex) - .with(destination.getRoutingKey()) - .noargs(); - - amqpAdmin.declareBinding(b); - - return q; - } - } From 40dde25b799f03be6135c640b2e16661d5b11c59 Mon Sep 17 00:00:00 2001 From: Philippe Date: Fri, 20 Jul 2018 18:37:02 -0300 Subject: [PATCH 3/4] BAEL-1992 --- mqtt/pom.xml | 23 ++++ .../mqtt/EngineTemperatureSensor.java | 49 ++++++++ .../mqtt/EngineTemperatureSensorLiveTest.java | 108 ++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 mqtt/pom.xml create mode 100644 mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java create mode 100644 mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java diff --git a/mqtt/pom.xml b/mqtt/pom.xml new file mode 100644 index 0000000000..346433aa69 --- /dev/null +++ b/mqtt/pom.xml @@ -0,0 +1,23 @@ + + 4.0.0 + org.baeldung + mqtt + 0.0.1-SNAPSHOT + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.0 + + + + + diff --git a/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java new file mode 100644 index 0000000000..98111edb94 --- /dev/null +++ b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java @@ -0,0 +1,49 @@ +package com.baeldung.mqtt; + +import java.util.Random; +import java.util.concurrent.Callable; + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensor implements Callable { + + private static final Logger log = LoggerFactory.getLogger(EngineTemperatureSensor.class); + public static final String TOPIC = "engine/temperature"; + + private IMqttClient client; + private Random rnd = new Random(); + + public EngineTemperatureSensor(IMqttClient client) { + this.client = client; + } + + @Override + public Void call() throws Exception { + + if ( !client.isConnected()) { + log.info("[I31] Client not connected."); + return null; + } + + MqttMessage msg = readEngineTemp(); + msg.setQos(0); + msg.setRetained(true); + client.publish(TOPIC,msg); + + return null; + } + + /** + * This method simulates reading the engine temperature + * @return + */ + private MqttMessage readEngineTemp() { + double temp = 80 + rnd.nextDouble() * 20.0; + byte[] payload = String.format("T:%04.2f",temp).getBytes(); + MqttMessage msg = new MqttMessage(payload); + return msg; + } +} \ No newline at end of file diff --git a/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java new file mode 100644 index 0000000000..94031b5415 --- /dev/null +++ b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java @@ -0,0 +1,108 @@ +package com.baeldung.mqtt; + + + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensorLiveTest { + + private static Logger log = LoggerFactory.getLogger(EngineTemperatureSensorLiveTest.class); + + @Test + public void whenSendSingleMessage_thenSuccess() throws Exception { + + String senderId = UUID.randomUUID().toString(); + MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); + + String receiverId = UUID.randomUUID().toString(); + MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + receiver.connect(options); + sender.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(1); + + receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(sender); + target.call(); + + receivedSignal.await(1, TimeUnit.MINUTES); + + log.info("[I51] Success !"); + } + + @Test + public void whenSendMultipleMessages_thenSuccess() throws Exception { + + String senderId = UUID.randomUUID().toString(); + MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); + + String receiverId = UUID.randomUUID().toString(); + MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + sender.connect(options); + receiver.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(10); + + receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(sender); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(() -> { + try { + target.call(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + }, 1, 1, TimeUnit.SECONDS); + + + receivedSignal.await(1, TimeUnit.DAYS); + executor.shutdown(); + + assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero"); + + log.info("[I51] Success !"); + } + + +} From a1d1d6b16f5e44e4fb7f3a693cc3c89d9cb47a34 Mon Sep 17 00:00:00 2001 From: Philippe Date: Sat, 28 Jul 2018 13:37:39 -0300 Subject: [PATCH 4/4] [refs#BAEL-1992] Minor refactoring --- mqtt/README.md | 4 ++ .../mqtt/EngineTemperatureSensorLiveTest.java | 45 ++++++++++--------- 2 files changed, 27 insertions(+), 22 deletions(-) create mode 100644 mqtt/README.md diff --git a/mqtt/README.md b/mqtt/README.md new file mode 100644 index 0000000000..5a388aab4c --- /dev/null +++ b/mqtt/README.md @@ -0,0 +1,4 @@ +### Relevant Articles: +================================ + +- [MQTT Client in Java](http://www.baeldung.com/mqtt-client) diff --git a/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java index 94031b5415..b1c0002888 100644 --- a/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java +++ b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java @@ -24,12 +24,11 @@ public class EngineTemperatureSensorLiveTest { @Test public void whenSendSingleMessage_thenSuccess() throws Exception { - String senderId = UUID.randomUUID().toString(); - MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); - - String receiverId = UUID.randomUUID().toString(); - MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + String publisherId = UUID.randomUUID().toString(); + MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId); + String subscriberId = UUID.randomUUID().toString(); + MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId); MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); @@ -37,33 +36,34 @@ public class EngineTemperatureSensorLiveTest { options.setConnectionTimeout(10); - receiver.connect(options); - sender.connect(options); + subscriber.connect(options); + publisher.connect(options); CountDownLatch receivedSignal = new CountDownLatch(1); - receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { - log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + byte[] payload = msg.getPayload(); + log.info("[I46] Message received: topic={}, payload={}", topic, new String(payload)); receivedSignal.countDown(); }); - Callable target = new EngineTemperatureSensor(sender); + Callable target = new EngineTemperatureSensor(publisher); target.call(); receivedSignal.await(1, TimeUnit.MINUTES); - log.info("[I51] Success !"); + log.info("[I56] Success !"); } @Test public void whenSendMultipleMessages_thenSuccess() throws Exception { - String senderId = UUID.randomUUID().toString(); - MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); + String publisherId = UUID.randomUUID().toString(); + MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId); - String receiverId = UUID.randomUUID().toString(); - MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + String subscriberId = UUID.randomUUID().toString(); + MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId); MqttConnectOptions options = new MqttConnectOptions(); @@ -72,18 +72,19 @@ public class EngineTemperatureSensorLiveTest { options.setConnectionTimeout(10); - sender.connect(options); - receiver.connect(options); + publisher.connect(options); + subscriber.connect(options); CountDownLatch receivedSignal = new CountDownLatch(10); - receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { - log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + byte[] payload = msg.getPayload(); + log.info("[I82] Message received: topic={}, payload={}", topic, new String(payload)); receivedSignal.countDown(); }); - Callable target = new EngineTemperatureSensor(sender); + Callable target = new EngineTemperatureSensor(publisher); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(() -> { @@ -96,12 +97,12 @@ public class EngineTemperatureSensorLiveTest { }, 1, 1, TimeUnit.SECONDS); - receivedSignal.await(1, TimeUnit.DAYS); + receivedSignal.await(1, TimeUnit.MINUTES); executor.shutdown(); assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero"); - log.info("[I51] Success !"); + log.info("[I105] Success !"); }