mirror of https://github.com/apache/activemq.git
AMQ-6661 - Fix Auto MQTT protocol detection
Protocol detection now takes into account the variable length header in the CONNECT packet.
This commit is contained in:
parent
4bf3152fa9
commit
808a4c5c17
|
@ -16,30 +16,32 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.protocol;
|
package org.apache.activemq.transport.protocol;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class MqttProtocolVerifier implements ProtocolVerifier {
|
public class MqttProtocolVerifier implements ProtocolVerifier {
|
||||||
|
|
||||||
/* (non-Javadoc)
|
|
||||||
* @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isProtocol(byte[] value) {
|
public boolean isProtocol(byte[] value) {
|
||||||
boolean mqtt311 = value[4] == 77 && // M
|
ByteBuffer buf = ByteBuffer.wrap(value);
|
||||||
value[5] == 81 && // Q
|
|
||||||
value[6] == 84 && // T
|
|
||||||
value[7] == 84; // T
|
|
||||||
|
|
||||||
boolean mqtt31 = value[4] == 77 && // M
|
if (!(buf.get() == 16 && validateRemainingLength(buf) && buf.get() == (byte) 0)) {
|
||||||
value[5] == 81 && // Q
|
return false;
|
||||||
value[6] == 73 && // I
|
}
|
||||||
value[7] == 115; // s
|
byte b = buf.get() ;
|
||||||
|
return ((b == 4 || b == 6) && (buf.get() == 77));
|
||||||
return mqtt311 || mqtt31;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean validateRemainingLength(ByteBuffer buffer) {
|
||||||
|
byte msb = (byte) 0b10000000;
|
||||||
|
for (byte i = 0; i < 4; i++) {
|
||||||
|
if ((buffer.get() & msb) != msb) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1961,4 +1961,31 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
|
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectWithLargePassword() throws Exception {
|
||||||
|
for (String version : Arrays.asList("3.1", "3.1.1")) {
|
||||||
|
String longString = new String(new char[65535]);
|
||||||
|
|
||||||
|
BlockingConnection connection = null;
|
||||||
|
try {
|
||||||
|
MQTT mqtt = createMQTTConnection("test-" + version, true);
|
||||||
|
mqtt.setUserName(longString);
|
||||||
|
mqtt.setPassword(longString);
|
||||||
|
mqtt.setConnectAttemptsMax(1);
|
||||||
|
mqtt.setVersion(version);
|
||||||
|
connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
final BlockingConnection con = connection;
|
||||||
|
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return con.isConnected();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
} finally {
|
||||||
|
if (connection != null && connection.isConnected()) connection.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue