This closes #664 Add management filter to Queue not Consumer MQTT
This commit is contained in:
commit
edcecccab3
|
@ -92,7 +92,7 @@ public class MQTTSubscriptionManager {
|
||||||
|
|
||||||
Queue q = session.getServer().locateQueue(queue);
|
Queue q = session.getServer().locateQueue(queue);
|
||||||
if (q == null) {
|
if (q == null) {
|
||||||
session.getServerSession().createQueue(new SimpleString(address), queue, null, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
|
session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
|
||||||
}
|
}
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ public class MQTTSubscriptionManager {
|
||||||
*/
|
*/
|
||||||
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
|
private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
|
||||||
long cid = session.getServer().getStorageManager().generateID();
|
long cid = session.getServer().getStorageManager().generateID();
|
||||||
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1);
|
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
|
||||||
consumer.setStarted(true);
|
consumer.setStarted(true);
|
||||||
|
|
||||||
consumers.put(topic, consumer);
|
consumers.put(topic, consumer);
|
||||||
|
|
Loading…
Reference in New Issue