diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 1f2032fcc0..f51699b18d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -45,6 +45,7 @@ import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.logs.AuditLogger; @@ -241,8 +242,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { */ String password = connect.payload().passwordInBytes() == null ? null : new String(connect.payload().passwordInBytes(), CharsetUtil.UTF_8); String username = connect.payload().userName(); - Pair validationData = validateUser(username, password); - if (!validationData.getA()) { + Pair validationData = null; + try { + validationData = validateUser(username, password); + if (!validationData.getA()) { + return; + } + } catch (InvalidClientIdException e) { + handleInvalidClientId(); return; } @@ -487,15 +494,19 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { session.getConnection().setClientIdAssignedByBroker(true); } else { // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null - if (session.getVersion() == MQTTVersion.MQTT_5) { - session.getProtocolHandler().sendConnack(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID); - } else { - session.getProtocolHandler().sendConnack(MQTTReasonCodes.IDENTIFIER_REJECTED_3); - } - disconnect(true); - return false; + return handleInvalidClientId(); } } return true; } + + private boolean handleInvalidClientId() { + if (session.getVersion() == MQTTVersion.MQTT_5) { + session.getProtocolHandler().sendConnack(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID); + } else { + session.getProtocolHandler().sendConnack(MQTTReasonCodes.IDENTIFIER_REJECTED_3); + } + disconnect(true); + return false; + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/exceptions/InvalidClientIdException.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/exceptions/InvalidClientIdException.java new file mode 100644 index 0000000000..4ce4cfc7b9 --- /dev/null +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/exceptions/InvalidClientIdException.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.mqtt.exceptions; + +public class InvalidClientIdException extends RuntimeException { + +} diff --git a/docs/user-manual/en/mqtt.md b/docs/user-manual/en/mqtt.md index 6f1f280877..56ed2114b1 100644 --- a/docs/user-manual/en/mqtt.md +++ b/docs/user-manual/en/mqtt.md @@ -100,6 +100,20 @@ UTF-8 strings and print them up to 256 characters. Payload logging is limited to avoid filling the logs with potentially hundreds of megabytes of unhelpful information. +## Custom Client ID Handling + +The client ID used by an MQTT application is very important as it uniquely +identifies the application. In some situations broker administrators may want +to perform extra validation or even modify incoming client IDs to support +specific use-cases. This is possible by implementing a custom security manager +as demonstrated in the `security-manager` example shipped with the broker. + +The simplest implementation is a "wrapper" just like the `security-manager` +example uses. In the `authenticate` method you can modify the client ID using +`setClientId()` on the `org.apache.activemq.artemis.spi.core.protocol.RemotingConnection` +that is passed in. If you perform some custom validation of the client ID you +can reject the client ID by throwing a `org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException`. + ## Wildcard subscriptions MQTT addresses are hierarchical much like a file system, and they use a special diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java index bf8a352d20..0207f49b52 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTSecurityManagerTest.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException; import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.Role; @@ -33,11 +34,14 @@ import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.MQTTException; +import org.fusesource.mqtt.codec.CONNACK; import org.junit.Test; public class MQTTSecurityManagerTest extends MQTTTestSupport { private String clientID = "new-" + RandomUtil.randomString(); + private boolean rejectClientId = false; @Override public boolean isSecurityEnabled() { @@ -53,6 +57,9 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { String password, RemotingConnection remotingConnection, String securityDomain) { + if (rejectClientId) { + throw new InvalidClientIdException(); + } remotingConnection.setClientID(clientID); return new Subject(); } @@ -148,4 +155,25 @@ public class MQTTSecurityManagerTest extends MQTTTestSupport { if (connection1 != null && connection1.isConnected()) connection1.disconnect(); } } + + @Test(timeout = 30000) + public void testSecurityManagerRejectClientID() throws Exception { + rejectClientId = true; + BlockingConnection connection = null; + try { + MQTT mqtt = createMQTTConnection(RandomUtil.randomString(), true); + mqtt.setUserName(fullUser); + mqtt.setPassword(fullPass); + mqtt.setConnectAttemptsMax(1); + connection = mqtt.blockingConnection(); + try { + connection.connect(); + fail("Should have thrown exception"); + } catch (MQTTException e) { + assertEquals(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED, e.connack.code()); + } + } finally { + if (connection != null && connection.isConnected()) connection.disconnect(); + } + } }