fix for #ARTEMIS-2476

This commit is contained in:
Assen Sharlandjiev 2019-09-16 10:38:20 +03:00 committed by Michael Pearce
parent 3868a6541c
commit e608c9af2c
1 changed files with 17 additions and 19 deletions

View File

@ -52,19 +52,12 @@ public class MQTTRetainMessageManager {
queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false); queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
} }
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) { queue.deleteAllReferences();
synchronized (queue) {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
iterator.remove();
queue.acknowledge(tx, ref);
}
if (!reset) { if (!reset) {
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx); sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), queue, tx);
}
}
} }
} }
// SEND to Queue. // SEND to Queue.
@ -76,18 +69,23 @@ public class MQTTRetainMessageManager {
// Iterate over all matching retain queues and add the queue // Iterate over all matching retain queues and add the queue
Transaction tx = session.getServerSession().newTransaction(); Transaction tx = session.getServerSession().newTransaction();
try { try {
synchronized (queue) { for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) {
for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames()) { Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
Queue retainedQueue = session.getServer().locateQueue(retainedQueueName); try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) {
try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) { if (i.hasNext()) {
if (i.hasNext()) { MessageReference ref = i.next();
Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); while (i.hasNext()) {
sendToQueue(message, queue, tx); ref = i.next();
if (i.hasNext()) {
i.remove();
}
} }
Message message = ref.getMessage().copy(session.getServer().getStorageManager().generateID());
sendToQueue(message, queue, tx);
} }
} }
} }
} catch (Throwable t) { } catch (Exception t) {
tx.rollback(); tx.rollback();
throw t; throw t;
} }