diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 1260169d00..cb362eca5a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,11 +36,12 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.utils.LinkedListIterator; @@ -558,16 +558,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - MessageReference message = queue.getReference(messageID); - if ( message == null ) { - return false; - } - else { - final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS); - if (originalAddress != null) { - return queue.moveReference(messageID, new SimpleString(originalAddress)); + Filter singleMessageFilter = new Filter() { + @Override + public boolean match(ServerMessage message) { + return message.getMessageID() == messageID; } - } + + @Override + public SimpleString getFilterString() { + return new SimpleString("custom filter for MESSAGEID= messageID"); + } + }; + + queue.retryMessages(singleMessageFilter); } finally { blockOnIO(); @@ -580,25 +583,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { checkStarted(); clearIO(); - int retriedMessages = 0; try { - Iterator messageIterator = queue.totalIterator(); - while (messageIterator.hasNext()) { - MessageReference message = messageIterator.next(); - // Will only try messages with Message.HDR_ORIGINAL_ADDRESS set. - final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS); - final long messageID = message.getMessage().getMessageID(); - if ( originalAddress != null) { - if ( queue.moveReference(messageID, new SimpleString(originalAddress))) { - retriedMessages++; - } - } - } + return queue.retryMessages(null); } finally { blockOnIO(); } - return retriedMessages; } public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index ba12a55757..f5a19a851b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -164,6 +164,8 @@ public interface Queue extends Bindable { SimpleString toAddress, boolean rejectDuplicates) throws Exception; + int retryMessages(Filter filter) throws Exception; + void addRedistributor(long delay); void cancelRedistributor() throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2ac5c8aa41..95a7ba6927 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -1525,6 +1526,50 @@ public class QueueImpl implements Queue { }); } + public int retryMessages(Filter filter) throws Exception { + + final HashMap queues = new HashMap<>(); + + return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { + @Override + public void actMessage(Transaction tx, MessageReference ref) throws Exception { + + SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS); + SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE); + + if (originalMessageAddress != null) { + + incDelivering(); + + Long targetQueue = null; + if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { + targetQueue = queues.get(originalMessageQueue); + if (targetQueue == null) { + Binding binding = postOffice.getBinding(originalMessageQueue); + + if (binding != null && binding instanceof LocalQueueBinding) { + targetQueue = ((LocalQueueBinding)binding).getID(); + queues.put(originalMessageQueue, targetQueue); + } + } + } + + if (targetQueue != null) { + move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue()); + } + else { + move(originalMessageAddress, tx, ref, false, false); + + } + + + } + } + }); + + + } + public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { LinkedListIterator iter = iterator(); @@ -2057,11 +2102,20 @@ public class QueueImpl implements Queue { final Transaction tx, final MessageReference ref, final boolean expiry, - final boolean rejectDuplicate) throws Exception { + final boolean rejectDuplicate, + final long ... queueIDs) throws Exception { ServerMessage copyMessage = makeCopy(ref, expiry); copyMessage.setAddress(toAddress); + if (queueIDs != null && queueIDs.length > 0) { + ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length); + for (long id : queueIDs) { + buffer.putLong(id); + } + copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array()); + } + postOffice.route(copyMessage, null, tx, false, rejectDuplicate); acknowledge(tx, ref); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index d42c89df64..b18be4ac27 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -907,6 +907,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public int retryMessages(Filter filter) throws Exception { + return 0; + } + @Override public int getConsumerCount() { return 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java index b5183ca047..ac9b28eee0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java @@ -16,6 +16,20 @@ */ package org.apache.activemq.artemis.tests.integration.jms.server.management; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.Notification; +import javax.naming.Context; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -39,6 +53,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; +import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.jms.server.management.JMSNotificationType; import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; @@ -51,20 +66,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.Notification; -import javax.naming.Context; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * A QueueControlTest *
@@ -808,6 +809,16 @@ public class JMSQueueControlTest extends ManagementTestBase { return testQueue; } + protected ActiveMQTopic createTestTopicWithDLQ(final String queueName, final ActiveMQQueue dlq) throws Exception { + serverManager.createTopic(false, queueName); + ActiveMQTopic testQueue = (ActiveMQTopic) ActiveMQJMSClient.createTopic(queueName); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress())); + addressSettings.setMaxDeliveryAttempts(1); + server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings); + return testQueue; + } + /** * Test retrying all messages put on DLQ - i.e. they should appear on the original queue. * @throws Exception @@ -834,10 +845,64 @@ public class JMSQueueControlTest extends ManagementTestBase { Assert.assertEquals(0, getMessageCount(testQueueControl)); Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl)); + Assert.assertEquals(10,getMessageCount(dlqQueueControl)); + dlqQueueControl.retryMessages(); Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl)); Assert.assertEquals(0,getMessageCount(dlqQueueControl)); + + connection.close(); + } + + /** + * Test retrying all messages put on DLQ - i.e. they should appear on the original queue. + * @throws Exception + */ + @Test + public void testRetryMessagesOnTopic() throws Exception { + ActiveMQQueue dlq = createDLQ(RandomUtil.randomString()); + ActiveMQTopic testTopic = createTestTopicWithDLQ(RandomUtil.randomString(), dlq); + + Connection connectionConsume = createConnection(); + connectionConsume.setClientID("ID"); + Session sessionConsume = connectionConsume.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer cons1 = sessionConsume.createDurableSubscriber(testTopic, "sub1"); + MessageConsumer cons2 = sessionConsume.createDurableSubscriber(testTopic, "sub2"); + + + final int numMessagesToTest = 10; + JMSUtil.sendMessages(testTopic, numMessagesToTest); + + + connectionConsume.start(); + for (int i = 0; i < numMessagesToTest; i++) { + Assert.assertNotNull(cons1.receive(500)); + } + sessionConsume.commit(); + + Assert.assertNull(cons1.receiveNoWait()); + + connectionConsume.start(); + for (int i = 0; i < numMessagesToTest; i++) { + cons2.receive(500); + } + sessionConsume.rollback(); + Assert.assertNull(cons2.receiveNoWait()); + + JMSQueueControl dlqQueueControl = createManagementControl(dlq); + dlqQueueControl.retryMessages(); + + Assert.assertNull("Retry is sending back to cons1 even though it succeeded", cons1.receiveNoWait()); + + for (int i = 0; i < numMessagesToTest; i++) { + Assert.assertNotNull(cons2.receive(500)); + } + sessionConsume.commit(); + Assert.assertNull(cons1.receiveNoWait()); + + connectionConsume.close(); + } /** diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 970e89d260..4aefc9a55f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -49,6 +49,11 @@ public class FakeQueue implements Queue { } + @Override + public int retryMessages(Filter filter) throws Exception { + return 0; + } + @Override public void setConsumersRefCount(ReferenceCounter referenceCounter) {