Support MQTT 3.1 silent subscription fail
This commit is contained in:
Dejan Bosanac 2015-04-20 18:15:20 +02:00
parent 4a821186a4
commit a4fbe70872
3 changed files with 36 additions and 2 deletions

View File

@ -87,6 +87,8 @@ public class MQTTProtocolConverter {
private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
public static final int V3_1 = 3;
public static final int V3_1_1 = 4;
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
@ -119,6 +121,8 @@ public class MQTTProtocolConverter {
private final MQTTPacketIdGenerator packetIdGenerator; private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics; private boolean publishDollarTopics;
public int version;
private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
/* /*
@ -246,6 +250,8 @@ public class MQTTProtocolConverter {
passswd = connect.password().toString(); passswd = connect.password().toString();
} }
version = connect.version();
configureInactivityMonitor(connect.keepAlive()); configureInactivityMonitor(connect.keepAlive());
connectionInfo.setConnectionId(connectionId); connectionInfo.setConnectionId(connectionId);

View File

@ -204,10 +204,16 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
if (response.isException()) { if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException(); final Throwable throwable = ((ExceptionResponse) response).getException();
LOG.warn("Error subscribing to {}", topicName, throwable); LOG.warn("Error subscribing to {}", topicName, throwable);
// version 3.1 don't supports silent fail
// version 3.1.1 send "error" qos
if (protocol.version == protocol.V3_1_1) {
qos[0] = SUBSCRIBE_ERROR; qos[0] = SUBSCRIBE_ERROR;
} else { } else {
qos[0] = (byte) qoS.ordinal(); qos[0] = (byte) qoS.ordinal();
} }
} else {
qos[0] = (byte) qoS.ordinal();
}
} }
}); });

View File

@ -138,8 +138,9 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo"); mqtt.setClientId("foo");
mqtt.setKeepAlive((short) 2); mqtt.setKeepAlive((short) 2);
mqtt.setVersion("3.1.1");
final BlockingConnection connection = mqtt.blockingConnection(); BlockingConnection connection = mqtt.blockingConnection();
connection.connect(); connection.connect();
final String NAMED = "named"; final String NAMED = "named";
@ -163,7 +164,28 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
assertEquals(ANONYMOUS, new String(msg.getPayload())); assertEquals(ANONYMOUS, new String(msg.getPayload()));
msg.ack(); msg.ack();
//delete retained message
connection.publish(ANONYMOUS, "".getBytes(), QoS.AT_MOST_ONCE, true);
connection.disconnect(); connection.disconnect();
// Test 3.1 functionality
mqtt.setVersion("3.1");
connection = mqtt.blockingConnection();
connection.connect();
qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE) });
assertEquals(QoS.AT_MOST_ONCE.ordinal(), qos[0]);
MQTT mqttPub = createMQTTConnection("pub", true);
mqttPub.setUserName("admin");
mqttPub.setPassword("admin");
BlockingConnection connectionPub = mqttPub.blockingConnection();
connectionPub.connect();
connectionPub.publish(NAMED, NAMED.getBytes(), QoS.AT_MOST_ONCE, true);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNull(msg);
} }
@Test(timeout = 60 * 1000) @Test(timeout = 60 * 1000)