diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 77b45ab767..626916f551 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -62,10 +62,6 @@ public class MQTTPublishManager { synchronized void start() throws Exception { this.state = session.getSessionState(); this.outboundStore = state.getOutboundStore(); - - createManagementAddress(); - createManagementQueue(); - createManagementConsumer(); } synchronized void stop() throws Exception { @@ -77,7 +73,7 @@ public class MQTTPublishManager { } void clean() throws Exception { - createManagementAddress(); + SimpleString managementAddress = createManagementAddress(); Queue queue = session.getServer().locateQueue(managementAddress); if (queue != null) { queue.deleteQueue(); @@ -90,14 +86,14 @@ public class MQTTPublishManager { managementConsumer.setStarted(true); } - private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); + private SimpleString createManagementAddress() { + return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId()); } private void createManagementQueue() throws Exception { Queue q = session.getServer().locateQueue(managementAddress); if (q == null) { - session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES); + session.getServer().createQueue(managementAddress, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false); } } @@ -183,11 +179,20 @@ public class MQTTPublishManager { session.getProtocolHandler().sendPubRel(messageId); } + private SimpleString getManagementAddress() throws Exception { + if (managementAddress == null) { + managementAddress = createManagementAddress(); + createManagementQueue(); + createManagementConsumer(); + } + return managementAddress; + } + void handlePubRec(int messageId) throws Exception { try { Pair ref = outboundStore.publishReceived(messageId); if (ref != null) { - ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + ServerMessage m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId); session.getServerSession().send(m, true); session.getServerSession().acknowledge(ref.getB(), ref.getA()); } else { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 70db040e0b..7acc3b4967 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -49,7 +49,7 @@ public class MQTTRetainMessageManager { Queue queue = session.getServer().locateQueue(retainAddress); if (queue == null) { - queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true); + queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false); } try (LinkedListIterator iterator = queue.iterator()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 7cd1bf14ff..c211260075 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTTException; @@ -53,7 +54,6 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.vertx.java.core.impl.ConcurrentHashSet; /** * QT @@ -1711,7 +1711,7 @@ public class MQTTTest extends MQTTTestSupport { connection2.connect(); connection2.subscribe(mqttTopic); - Message message = connection2.receive(); + Message message = connection2.receive(5000, TimeUnit.MILLISECONDS); assertEquals(payload, new String(message.getPayload())); }