diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 8d211ca826..7d325ece03 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -216,6 +216,28 @@ public interface QueueControl { @Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION) boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception; + + /** + * Retries the message corresponding to the given messageID to the original queue. + * This is appropriate on dead messages on Dead letter queues only. + * + * @param messageID + * @return {@code true} if the message was retried, {@code false} else + * @throws Exception + */ + @Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION) + boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception; + + /** + * Retries all messages on a DLQ to their respective original queues. + * This is appropriate on dead messages on Dead letter queues only. + * + * @return the number of retried messages. + * @throws Exception + */ + @Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION) + int retryMessages() throws Exception; + /** * Moves the message corresponding to the specified message ID to the specified other queue. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 7d4aaaf2fd..1260169d00 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -551,6 +552,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } + public boolean retryMessage(final long messageID) throws Exception { + + checkStarted(); + clearIO(); + + try { + MessageReference message = queue.getReference(messageID); + if ( message == null ) { + return false; + } + else { + final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS); + if (originalAddress != null) { + return queue.moveReference(messageID, new SimpleString(originalAddress)); + } + } + } + finally { + blockOnIO(); + } + + return false; + } + + public int retryMessages() throws Exception { + checkStarted(); + clearIO(); + + int retriedMessages = 0; + try { + Iterator messageIterator = queue.totalIterator(); + while (messageIterator.hasNext()) { + MessageReference message = messageIterator.next(); + // Will only try messages with Message.HDR_ORIGINAL_ADDRESS set. + final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS); + final long messageID = message.getMessage().getMessageID(); + if ( originalAddress != null) { + if ( queue.moveReference(messageID, new SimpleString(originalAddress))) { + retriedMessages++; + } + } + } + } + finally { + blockOnIO(); + } + return retriedMessages; + } + public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception { return moveMessage(messageID, otherQueueName, false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 0de4047de7..b968a03d52 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -747,6 +747,118 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + /** + * Test retry - get a message from DLQ and put on original queue. + */ + @Test + public void testRetryMessage() throws Exception { + final SimpleString dla = new SimpleString("DLA"); + final SimpleString qName = new SimpleString("q1"); + final SimpleString adName = new SimpleString("ad1"); + final SimpleString dlq = new SimpleString("DLQ1"); + final String sampleText = "Put me on DLQ"; + + AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla); + server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings); + + session.createQueue(dla, dlq, null, false); + session.createQueue(adName, qName, null, false); + + // Send message to queue. + ClientProducer producer = session.createProducer(adName); + producer.send(createTextMessage(session, sampleText)); + session.start(); + + ClientConsumer clientConsumer = session.createConsumer(qName); + ClientMessage clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText); + + // force a rollback to DLQ + session.rollback(); + clientMessage = clientConsumer.receiveImmediate(); + Assert.assertNull(clientMessage); + + QueueControl queueControl = createManagementControl(dla, dlq); + Assert.assertEquals(1, getMessageCount(queueControl)); + final long messageID = getFirstMessageId(queueControl); + + // Retry the message - i.e. it should go from DLQ to original Queue. + Assert.assertTrue(queueControl.retryMessage(messageID)); + + // Assert DLQ is empty... + Assert.assertEquals(0, getMessageCount(queueControl)); + + // .. and that the message is now on the original queue once more. + clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), "Put me on DLQ!"); + + clientConsumer.close(); + } + + /** + * Test retry multiple messages from DLQ to original queue. + */ + @Test + public void testRetryMultipleMessages() throws Exception { + final SimpleString dla = new SimpleString("DLA"); + final SimpleString qName = new SimpleString("q1"); + final SimpleString adName = new SimpleString("ad1"); + final SimpleString dlq = new SimpleString("DLQ1"); + final String sampleText = "Put me on DLQ"; + final int numMessagesToTest = 10; + + AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla); + server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings); + + session.createQueue(dla, dlq, null, false); + session.createQueue(adName, qName, null, false); + + // Send message to queue. + ClientProducer producer = session.createProducer(adName); + for (int i = 0; i < numMessagesToTest; i++) { + producer.send(createTextMessage(session, sampleText)); + } + + session.start(); + + // Read and rollback all messages to DLQ + ClientConsumer clientConsumer = session.createConsumer(qName); + for (int i = 0; i < numMessagesToTest; i++) { + ClientMessage clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText); + session.rollback(); + } + + Assert.assertNull(clientConsumer.receiveImmediate()); + + QueueControl dlqQueueControl = createManagementControl(dla, dlq); + Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl)); + + // Retry all messages - i.e. they should go from DLQ to original Queue. + Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages()); + + // Assert DLQ is empty... + Assert.assertEquals(0, getMessageCount(dlqQueueControl)); + + // .. and that the messages is now on the original queue once more. + for (int i = 0; i < numMessagesToTest; i++) { + ClientMessage clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText); + } + + clientConsumer.close(); + } + /** *
    *
  1. send a message to queue
  2. @@ -1930,4 +2042,10 @@ public class QueueControlTest extends ManagementTestBase { return queueControl; } + + protected long getFirstMessageId(final QueueControl queueControl) throws Exception { + Map[] messages = queueControl.listMessages(null); + long messageID = (Long) messages[0].get("messageID"); + return messageID; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index d296819cd4..2b75f12c6f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -229,6 +229,14 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Boolean) proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates); } + public boolean retryMessage(final long messageID) throws Exception { + return (Boolean) proxy.invokeOperation("retryMessage", messageID); + } + + public int retryMessages() throws Exception { + return (Integer) proxy.invokeOperation("retryMessages"); + } + public int removeMessages(final String filter) throws Exception { return (Integer) proxy.invokeOperation("removeMessages", filter); }