diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 7f39be0134..769656f107 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -28,8 +28,10 @@ import java.util.zip.DataFormatException; import java.util.zip.Inflater; import javax.jms.Destination; +import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import javax.jms.Message; +import javax.security.auth.login.CredentialException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -292,7 +294,11 @@ public class MQTTProtocolConverter { Throwable exception = ((ExceptionResponse) response).getException(); //let the client know CONNACK ack = new CONNACK(); - if (exception instanceof SecurityException) { + if (exception instanceof InvalidClientIDException) { + ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); + } else if (exception instanceof SecurityException) { + ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); + } else if (exception instanceof CredentialException) { ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); } else { ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java index 8c832b7042..3cb856f1d5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTests.java @@ -25,9 +25,19 @@ import static org.junit.Assert.fail; import java.net.ProtocolException; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.InvalidClientIDException; +import javax.security.auth.login.CredentialException; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ConnectionInfo; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -90,7 +100,7 @@ public class MQTTAuthTests extends MQTTAuthTestSupport { try { connAck.decode(frame); LOG.info("{}", connAck); - assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, connAck.code()); } catch (ProtocolException e) { failed.set(true); fail("Error decoding publish " + e.getMessage()); @@ -171,4 +181,110 @@ public class MQTTAuthTests extends MQTTAuthTestSupport { Message msg = connectionSub.receive(1, TimeUnit.SECONDS); assertNull("Shouldn't receive the message", msg); } + + @Test(timeout = 60 * 1000) + public void testInvalidClientIdGetCorrectErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("invalid", true); + + final AtomicInteger errorCode = new AtomicInteger(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + errorCode.set(connAck.code().ordinal()); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED, connAck.code()); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal(), errorCode.get()); + } + + @Test(timeout = 60 * 1000) + public void testBadCredentialExceptionGetsCorrectErrorCode() throws Exception { + MQTT mqttPub = createMQTTConnection("bad-credential", true); + mqttPub.setUserName("admin"); + mqttPub.setPassword("admin"); + + final AtomicInteger errorCode = new AtomicInteger(); + + mqttPub.setTracer(new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client received: {}", frame); + if (frame.messageType() == CONNACK.TYPE) { + CONNACK connAck = new CONNACK(); + try { + connAck.decode(frame); + LOG.info("{}", connAck); + errorCode.set(connAck.code().ordinal()); + assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code()); + } catch (ProtocolException e) { + fail("Error decoding publish " + e.getMessage()); + } + } + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client sent: {}", frame); + } + }); + + BlockingConnection connectionPub = mqttPub.blockingConnection(); + try { + connectionPub.connect(); + fail("Should not be able to connect."); + } catch (Exception e) { + } + + assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.ordinal(), errorCode.get()); + } + + @Override + protected void createPlugins(List plugins) throws Exception { + BrokerPlugin failOnSpecificConditionsPlugin = new BrokerPlugin() { + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + @Override + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { + String clientId = info.getClientId(); + if (clientId != null && !clientId.isEmpty()) { + if (clientId.equalsIgnoreCase("invalid")) { + LOG.info("Client ID was invalid"); + throw new InvalidClientIDException("Bad client Id"); + } else if (clientId.equalsIgnoreCase("bad-credential")) { + LOG.info("User Name was invalid"); + throw new CredentialException("Unknwon User Name."); + } + } + } + }; + } + }; + + plugins.add(failOnSpecificConditionsPlugin); + } } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java index 4571a36e02..d2448616c4 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java @@ -25,6 +25,7 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.TimeUnit; import javax.jms.JMSException; @@ -136,13 +137,14 @@ public class MQTTTestSupport { sslContext.afterPropertiesSet(); brokerService.setSslContext(sslContext); - ArrayList plugins = new ArrayList(); - addMQTTConnector(); addOpenWireConnector(); cf = new ActiveMQConnectionFactory(jmsUri); + ArrayList plugins = new ArrayList(); + createPlugins(plugins); + BrokerPlugin authenticationPlugin = configureAuthentication(); if (authenticationPlugin != null) { plugins.add(configureAuthorization()); @@ -173,6 +175,21 @@ public class MQTTTestSupport { brokerService.setPopulateJMSXUserID(true); } + /** + * Allows a subclass to add additional broker plugins during the broker startup + * process. This method should not add Authorization or Authentication plugins + * as those are handled by the configureAuthentication and configureAuthorization + * methods later. + * + * @param plugins + * The List object to add Plugins for installation into the new Broker. + * + * @throws Exception if an error occurs during the plugin creation process. + */ + protected void createPlugins(List plugins) throws Exception { + // NOOP + } + protected BrokerPlugin configureAuthentication() throws Exception { return null; }