From e608c9af2cda8e3ebd92e220223c17906c6efc02 Mon Sep 17 00:00:00 2001 From: Assen Sharlandjiev Date: Mon, 16 Sep 2019 10:38:20 +0300 Subject: [PATCH] fix for #ARTEMIS-2476 --- .../mqtt/MQTTRetainMessageManager.java | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index a7381ea8df..841c7c847b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -52,19 +52,12 @@ public class MQTTRetainMessageManager { queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false); } - try (LinkedListIterator iterator = queue.iterator()) { - synchronized (queue) { - if (iterator.hasNext()) { - MessageReference ref = iterator.next(); - iterator.remove(); - queue.acknowledge(tx, ref); - } + queue.deleteAllReferences(); - if (!reset) { - sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); - } - } + if (!reset) { + sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); } + } // SEND to Queue. @@ -76,18 +69,23 @@ public class MQTTRetainMessageManager { // Iterate over all matching retain queues and add the queue Transaction tx = session.getServerSession().newTransaction(); try { - synchronized (queue) { - for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) { - Queue retainedQueue = session.getServer().locateQueue(retainedQueueName); - try (LinkedListIterator i = retainedQueue.iterator()) { - if (i.hasNext()) { - Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); - sendToQueue(message, queue, tx); + for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) { + Queue retainedQueue = session.getServer().locateQueue(retainedQueueName); + try (LinkedListIterator i = retainedQueue.iterator()) { + if (i.hasNext()) { + MessageReference ref = i.next(); + while (i.hasNext()) { + ref = i.next(); + if (i.hasNext()) { + i.remove(); + } } + Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID()); + sendToQueue(message, queue, tx); } } } - } catch (Throwable t) { + } catch (Exception t) { tx.rollback(); throw t; }