From b666cb495b21576ea1a41708fde9ad039819ed8f Mon Sep 17 00:00:00 2001 From: brusdev Date: Tue, 10 Dec 2019 12:55:58 +0100 Subject: [PATCH] ARTEMIS-2572 The retryMessages remove all paged messages Add a paged message to the tail, when the QueueIterateAction doesn't handle it, to avoid removing unhandled paged message. Move the refRemoved calls from the QueueIterateActions to the iterQueue to fix the queue stats. --- .../artemis/core/server/impl/QueueImpl.java | 31 +----- .../tests/integration/paging/PagingTest.java | 103 ++++++++++++++++++ 2 files changed, 109 insertions(+), 25 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 3f28b3a49d..bf91e63da6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1929,16 +1929,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { - return actMessage(tx, ref, true); - } - - @Override - public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception { incDelivering(ref); acknowledge(tx, ref, ackReason, null); - if (fromMessageReferences) { - refRemoved(ref); - } return true; } }; @@ -1982,6 +1974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (filter1 == null || filter1.match(ref.getMessage())) { if (messageAction.actMessage(tx, ref)) { iter.remove(); + refRemoved(ref); } txCount++; count++; @@ -1998,7 +1991,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { List cancelled = scheduledDeliveryHandler.cancel(filter1); for (MessageReference messageReference : cancelled) { - messageAction.actMessage(tx, messageReference, false); + messageAction.actMessage(tx, messageReference); count++; txCount++; } @@ -2019,7 +2012,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (filter1 == null || filter1.match(reference.getMessage())) { count++; txCount++; - messageAction.actMessage(tx, reference, false); + if (!messageAction.actMessage(tx, reference)) { + addTail(reference, false); + } } else { addTail(reference, false); } @@ -2394,8 +2389,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (!ignored) { move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null); - refRemoved(ref); - //move(toAddress, tx, ref, false, rejectDuplicates); } return true; @@ -2459,7 +2452,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } else { move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false); } - refRemoved(ref); + return true; } @@ -3908,18 +3901,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { * @throws Exception */ public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception; - - /** - * - * @param tx the transaction which the message action should participate in - * @param ref the message reference which the action should act upon - * @param fromMessageReferences false if the queue's stats should *not* be updated (e.g. paged or scheduled refs) - * @return true if the action should result in the removal of the message from the queue; false otherwise - * @throws Exception - */ - public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception { - return actMessage(tx, ref); - } } /* For external use we need to use a synchronized version since the list is not thread safe */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 8e816d27a1..c020fbe760 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -578,6 +578,109 @@ public class PagingTest extends ActiveMQTestBase { Assert.assertEquals(numberOfMessages * 2, removedMessages); } + @Test + public void testQueueRetryMessages() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 500; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(PagingTest.ADDRESS, new SimpleString(PagingTest.ADDRESS + "Queue"), null, true); + session.createQueue(PagingTest.ADDRESS + "Original", PagingTest.ADDRESS + "QueueOriginal", null, true); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message = null; + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + + message = session.createMessage(true); + message.getBodyBuffer().writeBytes(body); + message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original"); + message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal"); + producer.send(message); + + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + producer.close(); + session.close(); + + session = sf.createSession(false, false, false); + producer = session.createProducer(PagingTest.ADDRESS); + producer.send(session.createMessage(true)); + session.rollback(); + producer.close(); + session.close(); + + session = sf.createSession(false, false, false); + producer = session.createProducer(PagingTest.ADDRESS); + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + + message = session.createMessage(true); + message.getBodyBuffer().writeBytes(body); + message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original"); + message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal"); + producer.send(message); + + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + producer.close(); + session.close(); + + Queue queue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "Queue")); + Queue originalQueue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "QueueOriginal")); + + Wait.assertEquals(numberOfMessages * 4, queue::getMessageCount); + Wait.assertEquals(0, originalQueue::getMessageCount); + + QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "Queue"); + QueueControl originalQueueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "QueueOriginal"); + queueControl.retryMessages(); + + Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount, 5000); + Wait.assertEquals(numberOfMessages * 2, originalQueue::getMessageCount, 5000); + } + @Test public void testEmptyAddress() throws Exception { if (storeType == StoreConfiguration.StoreType.FILE) {