From 21cd612098248971772de65f7789f12d74437878 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 3 Mar 2016 12:41:59 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5886 Ensure we reject protocol version that we don't currently support. (cherry picked from commit f3544f248ffdce8d4c4bb80698b0d69a7e7492d9) --- activemq-mqtt/pom.xml | 8 +- .../transport/mqtt/MQTTProtocolConverter.java | 15 ++++ .../mqtt/MQTTProtocolConverterTest.java | 79 +++++++++++++++++++ 3 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index 1c406081d7..016f1e86d1 100755 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -140,7 +140,11 @@ activemq-kahadb-store test - + + org.mockito + mockito-core + test + org.eclipse.paho org.eclipse.paho.client.mqttv3 @@ -311,7 +315,7 @@ - + diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index f57c745e58..5050b2f7e1 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -238,6 +238,21 @@ public class MQTTProtocolConverter { } this.connect = connect; + // The Server MUST respond to the CONNECT Packet with a CONNACK return code 0x01 + // (unacceptable protocol level) and then disconnect the Client if the Protocol Level + // is not supported by the Server [MQTT-3.1.2-2]. + if (connect.version() < 3 || connect.version() > 4) { + CONNACK ack = new CONNACK(); + ack.code(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION); + try { + getMQTTTransport().sendToMQTT(ack.encode()); + getMQTTTransport().onException(IOExceptionSupport.create("Unsupported or invalid protocol version", null)); + } catch (IOException e) { + getMQTTTransport().onException(IOExceptionSupport.create(e)); + } + return; + } + String clientId = ""; if (connect.clientId() != null) { clientId = connect.clientId().toString(); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java new file mode 100644 index 0000000000..bfe3149d5c --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverterTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.activemq.broker.BrokerService; +import org.fusesource.mqtt.codec.CONNACK; +import org.fusesource.mqtt.codec.CONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +/** + * Tests for various usage scenarios of the protocol converter + */ +public class MQTTProtocolConverterTest { + + private MQTTTransport transport; + private BrokerService broker; + + @Before + public void setUp() throws Exception { + transport = Mockito.mock(MQTTTransport.class); + broker = Mockito.mock(BrokerService.class); + } + + @Test + public void testConnectWithInvalidProtocolVersionToLow() throws IOException { + doTestConnectWithInvalidProtocolVersion(2); + } + + @Test + public void testConnectWithInvalidProtocolVersionToHigh() throws IOException { + doTestConnectWithInvalidProtocolVersion(5); + } + + private void doTestConnectWithInvalidProtocolVersion(int version) throws IOException { + MQTTProtocolConverter converter = new MQTTProtocolConverter(transport, broker); + + CONNECT connect = Mockito.mock(CONNECT.class); + + Mockito.when(connect.version()).thenReturn(version); + + converter.onMQTTConnect(connect); + ArgumentCaptor capturedException = ArgumentCaptor.forClass(IOException.class); + Mockito.verify(transport).onException(capturedException.capture()); + + assertTrue(capturedException.getValue().getMessage().contains("version")); + + ArgumentCaptor capturedFrame = ArgumentCaptor.forClass(MQTTFrame.class); + Mockito.verify(transport).sendToMQTT(capturedFrame.capture()); + + MQTTFrame response = capturedFrame.getValue(); + assertEquals(CONNACK.TYPE, response.messageType()); + + CONNACK connAck = new CONNACK().decode(response); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION, connAck.code()); + } +}