ARTEMIS-990 Dont require Perms on MQTT mngment Q

This commit is contained in:
Martyn Taylor 2017-03-10 10:14:19 +00:00
parent c1c0354d92
commit b33fea0d7f
3 changed files with 17 additions and 12 deletions

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@ -64,10 +65,6 @@ public class MQTTPublishManager {
synchronized void start() throws Exception { synchronized void start() throws Exception {
this.state = session.getSessionState(); this.state = session.getSessionState();
this.outboundStore = state.getOutboundStore(); this.outboundStore = state.getOutboundStore();
createManagementAddress();
createManagementQueue();
createManagementConsumer();
} }
synchronized void stop() throws Exception { synchronized void stop() throws Exception {
@ -79,7 +76,7 @@ public class MQTTPublishManager {
} }
void clean() throws Exception { void clean() throws Exception {
createManagementAddress(); SimpleString managementAddress = createManagementAddress();
Queue queue = session.getServer().locateQueue(managementAddress); Queue queue = session.getServer().locateQueue(managementAddress);
if (queue != null) { if (queue != null) {
queue.deleteQueue(); queue.deleteQueue();
@ -92,14 +89,14 @@ public class MQTTPublishManager {
managementConsumer.setStarted(true); managementConsumer.setStarted(true);
} }
private void createManagementAddress() { private SimpleString createManagementAddress() {
managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
} }
private void createManagementQueue() throws Exception { private void createManagementQueue() throws Exception {
Queue q = session.getServer().locateQueue(managementAddress); Queue q = session.getServer().locateQueue(managementAddress);
if (q == null) { if (q == null) {
session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES); session.getServer().createQueue(managementAddress, RoutingType.ANYCAST, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false);
} }
} }
@ -189,11 +186,20 @@ public class MQTTPublishManager {
session.getProtocolHandler().sendPubRel(messageId); session.getProtocolHandler().sendPubRel(messageId);
} }
private SimpleString getManagementAddress() throws Exception {
if (managementAddress == null) {
managementAddress = createManagementAddress();
createManagementQueue();
createManagementConsumer();
}
return managementAddress;
}
void handlePubRec(int messageId) throws Exception { void handlePubRec(int messageId) throws Exception {
try { try {
Pair<Long, Long> ref = outboundStore.publishReceived(messageId); Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
if (ref != null) { if (ref != null) {
Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
session.getServerSession().send(m, true); session.getServerSession().send(m, true);
session.getServerSession().acknowledge(ref.getB(), ref.getA()); session.getServerSession().acknowledge(ref.getB(), ref.getA());
} else { } else {

View File

@ -49,10 +49,9 @@ public class MQTTRetainMessageManager {
Queue queue = session.getServer().locateQueue(retainAddress); Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) { if (queue == null) {
queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true); queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
} }
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) { try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
synchronized (queue) { synchronized (queue) {
if (iterator.hasNext()) { if (iterator.hasNext()) {

View File

@ -1845,7 +1845,7 @@ public class MQTTTest extends MQTTTestSupport {
connection2.connect(); connection2.connect();
connection2.subscribe(mqttTopic); connection2.subscribe(mqttTopic);
Message message = connection2.receive(); Message message = connection2.receive(5000, TimeUnit.MILLISECONDS);
assertEquals(payload, new String(message.getPayload())); assertEquals(payload, new String(message.getPayload()));
} }