diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java index 0fbd35f6f8..27aa9b4889 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java @@ -62,11 +62,17 @@ public final class FilterConstants { */ public static final SimpleString ACTIVEMQ_SIZE = new SimpleString("AMQSize"); + /** + * Name of the ActiveMQ Artemis Address header + */ + public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress"); + /** * All ActiveMQ Artemis headers are prepended by this prefix. */ public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ"); + private FilterConstants() { // Utility class } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index fc61dd9fa9..c985c0f3fd 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -90,16 +90,7 @@ public class MQTTPublishManager { } private int generateMqttId(int qos) { - if (qos == 1) { - return session.getSessionState().generateId(); - } - else { - Integer mqttid = session.getSessionState().generateId(); - if (mqttid == null) { - mqttid = (int) session.getServer().getStorageManager().generateID(); - } - return mqttid; - } + return session.getSessionState().generateId(); } /** diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 954a1bd1f8..cbe64a6a53 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -37,11 +38,27 @@ public class MQTTSubscriptionManager { private MQTTLogger log = MQTTLogger.LOGGER; + // We filter out Artemis managment messages and notifications + private SimpleString managementFilter; + public MQTTSubscriptionManager(MQTTSession session) { this.session = session; consumers = new ConcurrentHashMap<>(); consumerQoSLevels = new ConcurrentHashMap<>(); + + // Create filter string to ignore management messages + StringBuilder builder = new StringBuilder(); + builder.append("NOT (("); + builder.append(FilterConstants.ACTIVEMQ_ADDRESS); + builder.append(" = '"); + builder.append(session.getServer().getConfiguration().getManagementAddress()); + builder.append("') OR ("); + builder.append(FilterConstants.ACTIVEMQ_ADDRESS); + builder.append(" = '"); + builder.append(session.getServer().getConfiguration().getManagementNotificationAddress()); + builder.append("'))"); + managementFilter = new SimpleString(builder.toString()); } synchronized void start() throws Exception { @@ -85,8 +102,7 @@ public class MQTTSubscriptionManager { */ private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception { long cid = session.getServer().getStorageManager().generateID(); - - ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1); + ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1); consumer.setStarted(true); consumers.put(topic, consumer); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 77bf7c5eb8..f807a183d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -169,6 +169,9 @@ public class FilterImpl implements Filter { else if (FilterConstants.ACTIVEMQ_SIZE.equals(fieldName)) { return msg.getEncodeSize(); } + else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) { + return msg.getAddress(); + } else { return null; }