[MQTT] Try to improve error handling
This commit is contained in:
parent
6272514820
commit
227bbdea2f
|
@ -351,7 +351,7 @@ class Monitor extends BeanModel {
|
|||
}
|
||||
} else if (this.type === "mqtt") {
|
||||
try {
|
||||
bean.msg = await mqttAsync(this.url, this.mqttTopic, this.mqttSuccessMessage, {
|
||||
bean.msg = await mqttAsync(this.hostname, this.mqttTopic, this.mqttSuccessMessage, {
|
||||
mqttPort: this.port,
|
||||
mqttUsername: this.mqttUsername,
|
||||
mqttPassword: this.mqttPassword,
|
||||
|
|
|
@ -12,7 +12,6 @@ const fs = require("fs");
|
|||
const nodeJsUtil = require("util");
|
||||
const mqtt = require("mqtt");
|
||||
|
||||
|
||||
// From ping-lite
|
||||
exports.WIN = /^win/.test(process.platform);
|
||||
exports.LIN = /^linux/.test(process.platform);
|
||||
|
@ -94,36 +93,46 @@ exports.pingAsync = function (hostname, ipv6 = false) {
|
|||
exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const { port, username, password, interval = 20 } = options;
|
||||
try {
|
||||
// Adds MQTT protocol to the hostname if not already present
|
||||
if (!/^(?:http|mqtt)s?:\/\//.test(hostname)) {
|
||||
hostname = "mqtt://" + hostname;
|
||||
}
|
||||
let client = mqtt.connect(hostname, {
|
||||
port,
|
||||
username,
|
||||
password
|
||||
});
|
||||
client.on("connect", () => {
|
||||
client.subscribe(topic);
|
||||
});
|
||||
client.on("message", (messageTopic, message) => {
|
||||
if (messageTopic == topic) {
|
||||
if (message.toString() === okMessage) {
|
||||
client.end();
|
||||
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
|
||||
} else {
|
||||
client.end();
|
||||
reject(new Error(`Error; Topic: ${messageTopic}; Message: ${message.toString()}`));
|
||||
}
|
||||
}
|
||||
});
|
||||
setTimeout(() => {
|
||||
client.end();
|
||||
}, interval * 1000);
|
||||
} catch (error) {
|
||||
reject(new Error(error));
|
||||
|
||||
// Adds MQTT protocol to the hostname if not already present
|
||||
if (!/^(?:http|mqtt)s?:\/\//.test(hostname)) {
|
||||
hostname = "mqtt://" + hostname;
|
||||
}
|
||||
|
||||
debug("MQTT connecting");
|
||||
|
||||
let client = mqtt.connect(hostname, {
|
||||
port,
|
||||
username,
|
||||
password
|
||||
});
|
||||
|
||||
client.on("connect", () => {
|
||||
debug("MQTT subscribe topic");
|
||||
client.subscribe(topic);
|
||||
});
|
||||
|
||||
client.on("error", (error) => {
|
||||
client.end();
|
||||
reject(error);
|
||||
});
|
||||
|
||||
client.on("message", (messageTopic, message) => {
|
||||
if (messageTopic == topic) {
|
||||
if (message.toString() === okMessage) {
|
||||
client.end();
|
||||
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
|
||||
} else {
|
||||
client.end();
|
||||
reject(new Error(`Error; Topic: ${messageTopic}; Message: ${message.toString()}`));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
client.end();
|
||||
}, interval * 1000);
|
||||
|
||||
});
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue