Merge pull request #4832 from psevestre/master

Code for BAEL-1992
This commit is contained in:
Loredana Crusoveanu 2018-07-28 21:59:20 +03:00 committed by GitHub
commit e6c78e9a23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 185 additions and 0 deletions

4
mqtt/README.md Normal file
View File

@ -0,0 +1,4 @@
### Relevant Articles:
================================
- [MQTT Client in Java](http://www.baeldung.com/mqtt-client)

23
mqtt/pom.xml Normal file
View File

@ -0,0 +1,23 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.baeldung</groupId>
<artifactId>mqtt</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,49 @@
package com.baeldung.mqtt;
import java.util.Random;
import java.util.concurrent.Callable;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EngineTemperatureSensor implements Callable<Void> {
private static final Logger log = LoggerFactory.getLogger(EngineTemperatureSensor.class);
public static final String TOPIC = "engine/temperature";
private IMqttClient client;
private Random rnd = new Random();
public EngineTemperatureSensor(IMqttClient client) {
this.client = client;
}
@Override
public Void call() throws Exception {
if ( !client.isConnected()) {
log.info("[I31] Client not connected.");
return null;
}
MqttMessage msg = readEngineTemp();
msg.setQos(0);
msg.setRetained(true);
client.publish(TOPIC,msg);
return null;
}
/**
* This method simulates reading the engine temperature
* @return
*/
private MqttMessage readEngineTemp() {
double temp = 80 + rnd.nextDouble() * 20.0;
byte[] payload = String.format("T:%04.2f",temp).getBytes();
MqttMessage msg = new MqttMessage(payload);
return msg;
}
}

View File

@ -0,0 +1,109 @@
package com.baeldung.mqtt;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EngineTemperatureSensorLiveTest {
private static Logger log = LoggerFactory.getLogger(EngineTemperatureSensorLiveTest.class);
@Test
public void whenSendSingleMessage_thenSuccess() throws Exception {
String publisherId = UUID.randomUUID().toString();
MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);
String subscriberId = UUID.randomUUID().toString();
MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
subscriber.connect(options);
publisher.connect(options);
CountDownLatch receivedSignal = new CountDownLatch(1);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
byte[] payload = msg.getPayload();
log.info("[I46] Message received: topic={}, payload={}", topic, new String(payload));
receivedSignal.countDown();
});
Callable<Void> target = new EngineTemperatureSensor(publisher);
target.call();
receivedSignal.await(1, TimeUnit.MINUTES);
log.info("[I56] Success !");
}
@Test
public void whenSendMultipleMessages_thenSuccess() throws Exception {
String publisherId = UUID.randomUUID().toString();
MqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);
String subscriberId = UUID.randomUUID().toString();
MqttClient subscriber = new MqttClient("tcp://iot.eclipse.org:1883",subscriberId);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
subscriber.connect(options);
CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
byte[] payload = msg.getPayload();
log.info("[I82] Message received: topic={}, payload={}", topic, new String(payload));
receivedSignal.countDown();
});
Callable<Void> target = new EngineTemperatureSensor(publisher);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> {
try {
target.call();
}
catch(Exception ex) {
throw new RuntimeException(ex);
}
}, 1, 1, TimeUnit.SECONDS);
receivedSignal.await(1, TimeUnit.MINUTES);
executor.shutdown();
assertTrue(receivedSignal.getCount() == 0 , "Countdown should be zero");
log.info("[I105] Success !");
}
}