This closes #648
This commit is contained in:
commit
771dab13bf
|
@ -62,11 +62,17 @@ public final class FilterConstants {
|
|||
*/
|
||||
public static final SimpleString ACTIVEMQ_SIZE = new SimpleString("AMQSize");
|
||||
|
||||
/**
|
||||
* Name of the ActiveMQ Artemis Address header
|
||||
*/
|
||||
public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
|
||||
|
||||
/**
|
||||
* All ActiveMQ Artemis headers are prepended by this prefix.
|
||||
*/
|
||||
public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ");
|
||||
|
||||
|
||||
private FilterConstants() {
|
||||
// Utility class
|
||||
}
|
||||
|
|
|
@ -90,16 +90,7 @@ public class MQTTPublishManager {
|
|||
}
|
||||
|
||||
private int generateMqttId(int qos) {
|
||||
if (qos == 1) {
|
||||
return session.getSessionState().generateId();
|
||||
}
|
||||
else {
|
||||
Integer mqttid = session.getSessionState().generateId();
|
||||
if (mqttid == null) {
|
||||
mqttid = (int) session.getServer().getStorageManager().generateID();
|
||||
}
|
||||
return mqttid;
|
||||
}
|
||||
return session.getSessionState().generateId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||
import org.apache.activemq.artemis.api.core.FilterConstants;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
|
@ -37,11 +38,27 @@ public class MQTTSubscriptionManager {
|
|||
|
||||
private MQTTLogger log = MQTTLogger.LOGGER;
|
||||
|
||||
// We filter out Artemis managment messages and notifications
|
||||
private SimpleString managementFilter;
|
||||
|
||||
public MQTTSubscriptionManager(MQTTSession session) {
|
||||
this.session = session;
|
||||
|
||||
consumers = new ConcurrentHashMap<>();
|
||||
consumerQoSLevels = new ConcurrentHashMap<>();
|
||||
|
||||
// Create filter string to ignore management messages
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("NOT ((");
|
||||
builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
|
||||
builder.append(" = '");
|
||||
builder.append(session.getServer().getConfiguration().getManagementAddress());
|
||||
builder.append("') OR (");
|
||||
builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
|
||||
builder.append(" = '");
|
||||
builder.append(session.getServer().getConfiguration().getManagementNotificationAddress());
|
||||
builder.append("'))");
|
||||
managementFilter = new SimpleString(builder.toString());
|
||||
}
|
||||
|
||||
synchronized void start() throws Exception {
|
||||
|
@ -85,8 +102,7 @@ public class MQTTSubscriptionManager {
|
|||
*/
|
||||
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
|
||||
long cid = session.getServer().getStorageManager().generateID();
|
||||
|
||||
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
|
||||
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1);
|
||||
consumer.setStarted(true);
|
||||
|
||||
consumers.put(topic, consumer);
|
||||
|
|
|
@ -169,6 +169,9 @@ public class FilterImpl implements Filter {
|
|||
else if (FilterConstants.ACTIVEMQ_SIZE.equals(fieldName)) {
|
||||
return msg.getEncodeSize();
|
||||
}
|
||||
else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
|
||||
return msg.getAddress();
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue