ARTEMIS-641 filter out management notifications in MQTT

This commit is contained in:
Martyn Taylor 2016-07-20 13:30:34 +01:00 committed by jbertram
parent bed73f57b3
commit e341b54c49
1 changed files with 18 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -37,11 +38,27 @@ public class MQTTSubscriptionManager {
private MQTTLogger log = MQTTLogger.LOGGER;
// We filter out Artemis managment messages and notifications
private SimpleString managementFilter;
public MQTTSubscriptionManager(MQTTSession session) {
this.session = session;
consumers = new ConcurrentHashMap<>();
consumerQoSLevels = new ConcurrentHashMap<>();
// Create filter string to ignore management messages
StringBuilder builder = new StringBuilder();
builder.append("NOT ((");
builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
builder.append(" = '");
builder.append(session.getServer().getConfiguration().getManagementAddress());
builder.append("') OR (");
builder.append(FilterConstants.ACTIVEMQ_ADDRESS);
builder.append(" = '");
builder.append(session.getServer().getConfiguration().getManagementNotificationAddress());
builder.append("'))");
managementFilter = new SimpleString(builder.toString());
}
synchronized void start() throws Exception {
@ -85,8 +102,7 @@ public class MQTTSubscriptionManager {
*/
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
long cid = session.getServer().getStorageManager().generateID();
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1);
consumer.setStarted(true);
consumers.put(topic, consumer);