Fix MQTT to return meaningful error codes on connect fail.
This commit is contained in:
Timothy Bish 2014-07-24 13:30:59 -04:00
parent e1cd19e86e
commit 619864dd42
3 changed files with 143 additions and 4 deletions

View File

@ -28,8 +28,10 @@ import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@ -292,7 +294,11 @@ public class MQTTProtocolConverter {
Throwable exception = ((ExceptionResponse) response).getException();
//let the client know
CONNACK ack = new CONNACK();
if (exception instanceof SecurityException) {
if (exception instanceof InvalidClientIDException) {
ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
} else if (exception instanceof SecurityException) {
ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
} else if (exception instanceof CredentialException) {
ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
} else {
ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);

View File

@ -25,9 +25,19 @@ import static org.junit.Assert.fail;
import java.net.ProtocolException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidClientIDException;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@ -90,7 +100,7 @@ public class MQTTAuthTests extends MQTTAuthTestSupport {
try {
connAck.decode(frame);
LOG.info("{}", connAck);
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
assertEquals(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED, connAck.code());
} catch (ProtocolException e) {
failed.set(true);
fail("Error decoding publish " + e.getMessage());
@ -171,4 +181,110 @@ public class MQTTAuthTests extends MQTTAuthTestSupport {
Message msg = connectionSub.receive(1, TimeUnit.SECONDS);
assertNull("Shouldn't receive the message", msg);
}
@Test(timeout = 60 * 1000)
public void testInvalidClientIdGetCorrectErrorCode() throws Exception {
MQTT mqttPub = createMQTTConnection("invalid", true);
final AtomicInteger errorCode = new AtomicInteger();
mqttPub.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received: {}", frame);
if (frame.messageType() == CONNACK.TYPE) {
CONNACK connAck = new CONNACK();
try {
connAck.decode(frame);
LOG.info("{}", connAck);
errorCode.set(connAck.code().ordinal());
assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED, connAck.code());
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent: {}", frame);
}
});
BlockingConnection connectionPub = mqttPub.blockingConnection();
try {
connectionPub.connect();
fail("Should not be able to connect.");
} catch (Exception e) {
}
assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal(), errorCode.get());
}
@Test(timeout = 60 * 1000)
public void testBadCredentialExceptionGetsCorrectErrorCode() throws Exception {
MQTT mqttPub = createMQTTConnection("bad-credential", true);
mqttPub.setUserName("admin");
mqttPub.setPassword("admin");
final AtomicInteger errorCode = new AtomicInteger();
mqttPub.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received: {}", frame);
if (frame.messageType() == CONNACK.TYPE) {
CONNACK connAck = new CONNACK();
try {
connAck.decode(frame);
LOG.info("{}", connAck);
errorCode.set(connAck.code().ordinal());
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent: {}", frame);
}
});
BlockingConnection connectionPub = mqttPub.blockingConnection();
try {
connectionPub.connect();
fail("Should not be able to connect.");
} catch (Exception e) {
}
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD.ordinal(), errorCode.get());
}
@Override
protected void createPlugins(List<BrokerPlugin> plugins) throws Exception {
BrokerPlugin failOnSpecificConditionsPlugin = new BrokerPlugin() {
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new BrokerFilter(broker) {
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
String clientId = info.getClientId();
if (clientId != null && !clientId.isEmpty()) {
if (clientId.equalsIgnoreCase("invalid")) {
LOG.info("Client ID was invalid");
throw new InvalidClientIDException("Bad client Id");
} else if (clientId.equalsIgnoreCase("bad-credential")) {
LOG.info("User Name was invalid");
throw new CredentialException("Unknwon User Name.");
}
}
}
};
}
};
plugins.add(failOnSpecificConditionsPlugin);
}
}

View File

@ -25,6 +25,7 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
@ -136,13 +137,14 @@ public class MQTTTestSupport {
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
addMQTTConnector();
addOpenWireConnector();
cf = new ActiveMQConnectionFactory(jmsUri);
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
createPlugins(plugins);
BrokerPlugin authenticationPlugin = configureAuthentication();
if (authenticationPlugin != null) {
plugins.add(configureAuthorization());
@ -173,6 +175,21 @@ public class MQTTTestSupport {
brokerService.setPopulateJMSXUserID(true);
}
/**
* Allows a subclass to add additional broker plugins during the broker startup
* process. This method should not add Authorization or Authentication plugins
* as those are handled by the configureAuthentication and configureAuthorization
* methods later.
*
* @param plugins
* The List object to add Plugins for installation into the new Broker.
*
* @throws Exception if an error occurs during the plugin creation process.
*/
protected void createPlugins(List<BrokerPlugin> plugins) throws Exception {
// NOOP
}
protected BrokerPlugin configureAuthentication() throws Exception {
return null;
}