diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 553521bd45..55fdfccd9b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -64,10 +65,6 @@ public class MQTTPublishManager { synchronized void start() throws Exception { this.state = session.getSessionState(); this.outboundStore = state.getOutboundStore(); - - createManagementAddress(); - createManagementQueue(); - createManagementConsumer(); } synchronized void stop() throws Exception { @@ -79,7 +76,7 @@ public class MQTTPublishManager { } void clean() throws Exception { - createManagementAddress(); + SimpleString managementAddress = createManagementAddress(); Queue queue = session.getServer().locateQueue(managementAddress); if (queue != null) { queue.deleteQueue(); @@ -92,14 +89,14 @@ public class MQTTPublishManager { managementConsumer.setStarted(true); } - private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); + private SimpleString createManagementAddress() { + return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); } private void createManagementQueue() throws Exception { Queue q = session.getServer().locateQueue(managementAddress); if (q == null) { - session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES); + session.getServer().createQueue(managementAddress, RoutingType.ANYCAST, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false); } } @@ -189,11 +186,20 @@ public class MQTTPublishManager { session.getProtocolHandler().sendPubRel(messageId); } + private SimpleString getManagementAddress() throws Exception { + if (managementAddress == null) { + managementAddress = createManagementAddress(); + createManagementQueue(); + createManagementConsumer(); + } + return managementAddress; + } + void handlePubRec(int messageId) throws Exception { try { Pair ref = outboundStore.publishReceived(messageId); if (ref != null) { - Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId); session.getServerSession().send(m, true); session.getServerSession().acknowledge(ref.getB(), ref.getA()); } else { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 0b52a0b0ed..d0a3c070c3 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -49,10 +49,9 @@ public class MQTTRetainMessageManager { Queue queue = session.getServer().locateQueue(retainAddress); if (queue == null) { - queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true); + queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false); } - try (LinkedListIterator iterator = queue.iterator()) { synchronized (queue) { if (iterator.hasNext()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 91db1d2a01..32062c0591 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1845,7 +1845,7 @@ public class MQTTTest extends MQTTTestSupport { connection2.connect(); connection2.subscribe(mqttTopic); - Message message = connection2.receive(); + Message message = connection2.receive(5000, TimeUnit.MILLISECONDS); assertEquals(payload, new String(message.getPayload())); }