diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 0bcf57257f..6c9e6fb9fb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -524,7 +524,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } protected Object getMessageAnnotation(String annotation) { - return getMessageAnnotation(Symbol.getSymbol(annotation)); + Object messageAnnotation = getMessageAnnotation(Symbol.getSymbol(AMQPMessageSupport.toAnnotationName(annotation))); + if (messageAnnotation == null) { + messageAnnotation = getMessageAnnotation(Symbol.getSymbol(annotation)); + } + return messageAnnotation; } protected Object getMessageAnnotation(Symbol annotation) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index eb04963c4d..628e43e1f0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -361,6 +361,10 @@ public interface Queue extends Bindable,CriticalComponent { int retryMessages(Filter filter) throws Exception; + default int retryMessages(Filter filter, Integer expectedHits) throws Exception { + return retryMessages(filter); + } + void addRedistributor(long delay); void cancelRedistributor() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index ef0abf2293..2379d5e648 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2140,6 +2140,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { QueueIterateAction messageAction) throws Exception { int count = 0; int txCount = 0; + Integer expectedHits = messageAction.expectedHits(); // This is to avoid scheduling depaging while iterQueue is happening // this should minimize the use of the paged executor. depagePending = true; @@ -2170,6 +2171,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } txCount++; count++; + if (expectedHits != null && count >= expectedHits.intValue()) { + break; + } } } @@ -2611,10 +2615,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int retryMessages(Filter filter) throws Exception { + return retryMessages(filter, null); + } + + @Override + public int retryMessages(Filter filter, Integer expectedHits) throws Exception { final HashMap queues = new HashMap<>(); return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { + @Override + public Integer expectedHits() { + return expectedHits; + } + @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -4164,6 +4178,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { */ abstract class QueueIterateAction { + public Integer expectedHits() { + return null; + } + /** * * @param tx the transaction which the message action should participate in diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 30b3f7a408..38d324f6ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -128,6 +128,52 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testRetryExpiry() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(1); + message.setText("Test-Message"); + message.setDurable(true); + message.setApplicationProperty("key1", "Value1"); + sender.send(message); + + message = new AmqpMessage(); + message.setTimeToLive(1); + message.setBytes(new byte[500 * 1024]); + sender.send(message); + sender.close(); + + final Queue dlqView = getProxyToQueue(getDeadLetterAddress()); + + Wait.assertEquals(2, dlqView::getMessageCount); + Assert.assertEquals(2, dlqView.retryMessages(null)); + Wait.assertEquals(0, dlqView::getMessageCount); + Wait.assertEquals(2, queueView::getMessageCount); + + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + // Now try and get the message + receiver.flow(2); + for (int i = 0; i < 2; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } + connection.close(); + Wait.assertEquals(0, queueView::getMessageCount); + Wait.assertEquals(0, dlqView::getMessageCount); + } + /** This test is validating a broker feature where the message copy through the DLQ will receive an annotation. * It is also testing filter on that annotation. */ @Test(timeout = 60000)