BAEL-1992
This commit is contained in:
parent
031bce6a26
commit
40dde25b79
|
@ -0,0 +1,23 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<groupId>org.baeldung</groupId>
|
||||||
|
<artifactId>mqtt</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>com.baeldung</groupId>
|
||||||
|
<artifactId>parent-modules</artifactId>
|
||||||
|
<version>1.0.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||||
|
<version>1.2.0</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,49 @@
|
||||||
|
package com.baeldung.mqtt;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.IMqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class EngineTemperatureSensor implements Callable<Void> {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(EngineTemperatureSensor.class);
|
||||||
|
public static final String TOPIC = "engine/temperature";
|
||||||
|
|
||||||
|
private IMqttClient client;
|
||||||
|
private Random rnd = new Random();
|
||||||
|
|
||||||
|
public EngineTemperatureSensor(IMqttClient client) {
|
||||||
|
this.client = client;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
|
||||||
|
if ( !client.isConnected()) {
|
||||||
|
log.info("[I31] Client not connected.");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttMessage msg = readEngineTemp();
|
||||||
|
msg.setQos(0);
|
||||||
|
msg.setRetained(true);
|
||||||
|
client.publish(TOPIC,msg);
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method simulates reading the engine temperature
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private MqttMessage readEngineTemp() {
|
||||||
|
double temp = 80 + rnd.nextDouble() * 20.0;
|
||||||
|
byte[] payload = String.format("T:%04.2f",temp).getBytes();
|
||||||
|
MqttMessage msg = new MqttMessage(payload);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,108 @@
|
||||||
|
package com.baeldung.mqtt;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class EngineTemperatureSensorLiveTest {
|
||||||
|
|
||||||
|
private static Logger log = LoggerFactory.getLogger(EngineTemperatureSensorLiveTest.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendSingleMessage_thenSuccess() throws Exception {
|
||||||
|
|
||||||
|
String senderId = UUID.randomUUID().toString();
|
||||||
|
MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId);
|
||||||
|
|
||||||
|
String receiverId = UUID.randomUUID().toString();
|
||||||
|
MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId);
|
||||||
|
|
||||||
|
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setAutomaticReconnect(true);
|
||||||
|
options.setCleanSession(true);
|
||||||
|
options.setConnectionTimeout(10);
|
||||||
|
|
||||||
|
|
||||||
|
receiver.connect(options);
|
||||||
|
sender.connect(options);
|
||||||
|
|
||||||
|
CountDownLatch receivedSignal = new CountDownLatch(1);
|
||||||
|
|
||||||
|
receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
|
||||||
|
log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload()));
|
||||||
|
receivedSignal.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
Callable<Void> target = new EngineTemperatureSensor(sender);
|
||||||
|
target.call();
|
||||||
|
|
||||||
|
receivedSignal.await(1, TimeUnit.MINUTES);
|
||||||
|
|
||||||
|
log.info("[I51] Success !");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenSendMultipleMessages_thenSuccess() throws Exception {
|
||||||
|
|
||||||
|
String senderId = UUID.randomUUID().toString();
|
||||||
|
MqttClient sender = new MqttClient("tcp://iot.eclipse.org:1883",senderId);
|
||||||
|
|
||||||
|
String receiverId = UUID.randomUUID().toString();
|
||||||
|
MqttClient receiver = new MqttClient("tcp://iot.eclipse.org:1883",receiverId);
|
||||||
|
|
||||||
|
|
||||||
|
MqttConnectOptions options = new MqttConnectOptions();
|
||||||
|
options.setAutomaticReconnect(true);
|
||||||
|
options.setCleanSession(true);
|
||||||
|
options.setConnectionTimeout(10);
|
||||||
|
|
||||||
|
|
||||||
|
sender.connect(options);
|
||||||
|
receiver.connect(options);
|
||||||
|
|
||||||
|
CountDownLatch receivedSignal = new CountDownLatch(10);
|
||||||
|
|
||||||
|
receiver.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
|
||||||
|
log.info("[I41] Message received: topic={}, payload={}", topic, new String(msg.getPayload()));
|
||||||
|
receivedSignal.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
Callable<Void> target = new EngineTemperatureSensor(sender);
|
||||||
|
|
||||||
|
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
executor.scheduleAtFixedRate(() -> {
|
||||||
|
try {
|
||||||
|
target.call();
|
||||||
|
}
|
||||||
|
catch(Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}, 1, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
|
||||||
|
receivedSignal.await(1, TimeUnit.DAYS);
|
||||||
|
executor.shutdown();
|
||||||
|
|
||||||
|
assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero");
|
||||||
|
|
||||||
|
log.info("[I51] Success !");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue