diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java index e989f7ea06..0943336c0c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/protocol/MqttProtocolVerifier.java @@ -16,30 +16,32 @@ */ package org.apache.activemq.transport.protocol; +import java.nio.ByteBuffer; + /** * * */ public class MqttProtocolVerifier implements ProtocolVerifier { - /* (non-Javadoc) - * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[]) - */ @Override public boolean isProtocol(byte[] value) { - boolean mqtt311 = value[4] == 77 && // M - value[5] == 81 && // Q - value[6] == 84 && // T - value[7] == 84; // T + ByteBuffer buf = ByteBuffer.wrap(value); - boolean mqtt31 = value[4] == 77 && // M - value[5] == 81 && // Q - value[6] == 73 && // I - value[7] == 115; // s - - return mqtt311 || mqtt31; + if (!(buf.get() == 16 && validateRemainingLength(buf) && buf.get() == (byte) 0)) { + return false; + } + byte b = buf.get() ; + return ((b == 4 || b == 6) && (buf.get() == 77)); } - - + private boolean validateRemainingLength(ByteBuffer buffer) { + byte msb = (byte) 0b10000000; + for (byte i = 0; i < 4; i++) { + if ((buffer.get() & msb) != msb) { + return true; + } + } + return false; + } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 5e28b2a42e..791e7981ba 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -1961,4 +1961,31 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + + @Test + public void testConnectWithLargePassword() throws Exception { + for (String version : Arrays.asList("3.1", "3.1.1")) { + String longString = new String(new char[65535]); + + BlockingConnection connection = null; + try { + MQTT mqtt = createMQTTConnection("test-" + version, true); + mqtt.setUserName(longString); + mqtt.setPassword(longString); + mqtt.setConnectAttemptsMax(1); + mqtt.setVersion(version); + connection = mqtt.blockingConnection(); + connection.connect(); + final BlockingConnection con = connection; + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return con.isConnected(); + } + })); + } finally { + if (connection != null && connection.isConnected()) connection.disconnect(); + } + } + } }