diff --git a/mqtt/README.md b/mqtt/README.md new file mode 100644 index 0000000000..5a388aab4c --- /dev/null +++ b/mqtt/README.md @@ -0,0 +1,4 @@ +### Relevant Articles: +================================ + +- [MQTT Client in Java](http://www.baeldung.com/mqtt-client) diff --git a/mqtt/pom.xml b/mqtt/pom.xml new file mode 100644 index 0000000000..346433aa69 --- /dev/null +++ b/mqtt/pom.xml @@ -0,0 +1,23 @@ + + 4.0.0 + org.baeldung + mqtt + 0.0.1-SNAPSHOT + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.0 + + + + + diff --git a/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java new file mode 100644 index 0000000000..98111edb94 --- /dev/null +++ b/mqtt/src/main/java/com/baeldung/mqtt/EngineTemperatureSensor.java @@ -0,0 +1,49 @@ +package com.baeldung.mqtt; + +import java.util.Random; +import java.util.concurrent.Callable; + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensor implements Callable { + + private static final Logger log = LoggerFactory.getLogger(EngineTemperatureSensor.class); + public static final String TOPIC = "engine/temperature"; + + private IMqttClient client; + private Random rnd = new Random(); + + public EngineTemperatureSensor(IMqttClient client) { + this.client = client; + } + + @Override + public Void call() throws Exception { + + if ( !client.isConnected()) { + log.info("[I31] Client not connected."); + return null; + } + + MqttMessage msg = readEngineTemp(); + msg.setQos(0); + msg.setRetained(true); + client.publish(TOPIC,msg); + + return null; + } + + /** + * This method simulates reading the engine temperature + * @return + */ + private MqttMessage readEngineTemp() { + double temp = 80 + rnd.nextDouble() * 20.0; + byte[] payload = String.format("T:%04.2f",temp).getBytes(); + MqttMessage msg = new MqttMessage(payload); + return msg; + } +} \ No newline at end of file diff --git a/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java new file mode 100644 index 0000000000..b1c0002888 --- /dev/null +++ b/mqtt/src/test/java/com/baeldung/mqtt/EngineTemperatureSensorLiveTest.java @@ -0,0 +1,109 @@ +package com.baeldung.mqtt; + + + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EngineTemperatureSensorLiveTest { + + private static Logger log = LoggerFactory.getLogger(EngineTemperatureSensorLiveTest.class); + + @Test + public void whenSendSingleMessage_thenSuccess() throws Exception { + + String publisherId = UUID.randomUUID().toString(); + MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId); + + String subscriberId = UUID.randomUUID().toString(); + MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + subscriber.connect(options); + publisher.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(1); + + subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + byte[] payload = msg.getPayload(); + log.info("[I46] Message received: topic={}, payload={}", topic, new String(payload)); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(publisher); + target.call(); + + receivedSignal.await(1, TimeUnit.MINUTES); + + log.info("[I56] Success !"); + } + + @Test + public void whenSendMultipleMessages_thenSuccess() throws Exception { + + String publisherId = UUID.randomUUID().toString(); + MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId); + + String subscriberId = UUID.randomUUID().toString(); + MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId); + + + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + options.setConnectionTimeout(10); + + + publisher.connect(options); + subscriber.connect(options); + + CountDownLatch receivedSignal = new CountDownLatch(10); + + subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { + byte[] payload = msg.getPayload(); + log.info("[I82] Message received: topic={}, payload={}", topic, new String(payload)); + receivedSignal.countDown(); + }); + + + Callable target = new EngineTemperatureSensor(publisher); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(() -> { + try { + target.call(); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + }, 1, 1, TimeUnit.SECONDS); + + + receivedSignal.await(1, TimeUnit.MINUTES); + executor.shutdown(); + + assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero"); + + log.info("[I105] Success !"); + } + + +}