From 40dde25b799f03be6135c640b2e16661d5b11c59 Mon Sep 17 00:00:00 2001 From: Philippe Date: Fri, 20 Jul 2018 18:37:02 -0300 Subject: [PATCH] BAEL-1992 --- mqtt/pom.xml | 23 ++++ .../mqtt/EngineTemperatureSensor.java | 49 ++++++++ .../mqtt/EngineTemperatureSensorLiveTest.java | 108 ++++++++++++++++++ 3 files changed, 180 insertions(+) create mode 100644 mqtt/pom.xml create mode 100644 mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java create mode 100644 mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java diff --git a/mqtt/pom.xml b/mqtt/pom.xml new file mode 100644 index 0000000000..346433aa69 --- /dev/null +++ b/mqtt/pom.xml @@ -0,0 +1,23 @@ + + 4.0.0 + org.baeldung + mqtt + 0.0.1-SNAPSHOT + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.0 + + + + + diff --git a/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java new file mode 100644 index 0000000000..98111edb94 --- /dev/null +++ b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java @@ -0,0 +1,49 @@ +package com.baeldung.mqtt; + +import java.util.Random; +import java.util.concurrent.Callable; + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensor implements Callable { + + private static final Logger log = LoggerFactory.getLogger(EngineTemperatureSensor.class); + public static final String TOPIC = "engine/temperature"; + + private IMqttClient client; + private Random rnd = new Random(); + + public EngineTemperatureSensor(IMqttClient client) { + this.client = client; + } + + @Override + public Void call() throws Exception { + + if ( !client.isConnected()) { + log.info("[I31] Client not connected."); + return null; + } + + MqttMessage msg = readEngineTemp(); + msg.setQos(0); + msg.setRetained(true); + client.publish(TOPIC,msg); + + return null; + } + + /** + * This method simulates reading the engine temperature + * @return + */ + private MqttMessage readEngineTemp() { + double temp = 80 + rnd.nextDouble() * 20.0; + byte[] payload = String.format("T:%04.2f",temp).getBytes(); + MqttMessage msg = new MqttMessage(payload); + return msg; + } +} \ No newline at end of file diff --git a/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java new file mode 100644 index 0000000000..94031b5415 --- /dev/null +++ b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java @@ -0,0 +1,108 @@ +package com.baeldung.mqtt; + + + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensorLiveTest { + + private static Logger log = LoggerFactory.getLogger(EngineTemperatureSensorLiveTest.class); + + @Test + public void whenSendSingleMessage_thenSuccess() throws Exception { + + String senderId = UUID.randomUUID().toString(); + MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); + + String receiverId = UUID.randomUUID().toString(); + MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + receiver.connect(options); + sender.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(1); + + receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(sender); + target.call(); + + receivedSignal.await(1, TimeUnit.MINUTES); + + log.info("[I51] Success !"); + } + + @Test + public void whenSendMultipleMessages_thenSuccess() throws Exception { + + String senderId = UUID.randomUUID().toString(); + MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId); + + String receiverId = UUID.randomUUID().toString(); + MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId); + + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + sender.connect(options); + receiver.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(10); + + receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload())); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(sender); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(() -> { + try { + target.call(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + }, 1, 1, TimeUnit.SECONDS); + + + receivedSignal.await(1, TimeUnit.DAYS); + executor.shutdown(); + + assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero"); + + log.info("[I51] Success !"); + } + + +}