ARTEMIS-252 Added support to retry messages via JMX on JMS Queue interface

This commit is contained in:
Petter Nordlander 2015-10-11 19:34:23 +02:00 committed by Clebert Suconic
parent 7afe87996b
commit 989172596e
4 changed files with 131 additions and 0 deletions

View File

@ -227,6 +227,27 @@ public interface JMSQueueControl extends DestinationControl {
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
/**
* Retries the message corresponding to the given messageID to the original queue.
* This is appropriate on dead messages on Dead letter queues only.
*
* @param messageID
* @return {@code true} if the message was retried, {@code false} else
* @throws Exception
*/
@Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION)
boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID) throws Exception;
/**
* Retries all messages on a DLQ to their respective original queues.
* This is appropriate on dead messages on Dead letter queues only.
*
* @return the number of retried messages.
* @throws Exception
*/
@Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION)
int retryMessages() throws Exception;
/**
* Lists the message counter for this queue.
*/

View File

@ -275,6 +275,25 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
return coreQueueControl.changeMessagesPriority(filter, newPriority);
}
public boolean retryMessage(final String jmsMessageID) throws Exception {
// Figure out messageID from JMSMessageID.
final String filter = createFilterForJMSMessageID(jmsMessageID);
Map<String,Object>[] messages = coreQueueControl.listMessages(filter);
if ( messages.length != 1) { // if no messages. There should not be more than one, JMSMessageID should be unique.
return false;
}
final Map<String,Object> messageToRedeliver = messages[0];
Long messageID = (Long)messageToRedeliver.get("messageID");
return messageID != null && coreQueueControl.retryMessage(messageID);
}
public int retryMessages() throws Exception {
return coreQueueControl.retryMessages();
}
public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception {
return moveMessage(messageID, otherQueueName, false);
}

View File

@ -792,6 +792,84 @@ public class JMSQueueControlTest extends ManagementTestBase {
connection.close();
}
protected ActiveMQQueue createDLQ(final String deadLetterQueueName) throws Exception {
serverManager.createQueue(false, deadLetterQueueName, null, true, deadLetterQueueName);
return (ActiveMQQueue) ActiveMQJMSClient.createQueue(deadLetterQueueName);
}
protected ActiveMQQueue createTestQueueWithDLQ(final String queueName, final ActiveMQQueue dlq) throws Exception {
serverManager.createQueue(false,queueName,null,true,queueName);
ActiveMQQueue testQueue = (ActiveMQQueue) ActiveMQJMSClient.createQueue(queueName);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
addressSettings.setMaxDeliveryAttempts(1);
server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings);
return testQueue;
}
/**
* Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
* @throws Exception
*/
@Test
public void testRetryMessages() throws Exception {
ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
final int numMessagesToTest = 10;
JMSUtil.sendMessages(testQueue, numMessagesToTest);
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(testQueue);
for (int i = 0;i < numMessagesToTest;i++) {
Message msg = consumer.receive(500L);
}
session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
JMSQueueControl testQueueControl = createManagementControl(testQueue);
JMSQueueControl dlqQueueControl = createManagementControl(dlq);
Assert.assertEquals(0, getMessageCount(testQueueControl));
Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
dlqQueueControl.retryMessages();
Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl));
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
}
/**
* Test retrying a specific message on DLQ.
* Expected to be sent back to original queue.
* @throws Exception
*/
@Test
public void testRetryMessage() throws Exception {
ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
String messageID = JMSUtil.sendMessages(testQueue,1)[0];
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(testQueue);
consumer.receive(500L);
session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
JMSQueueControl testQueueControl = createManagementControl(testQueue);
JMSQueueControl dlqQueueControl = createManagementControl(dlq);
Assert.assertEquals(0, getMessageCount(testQueueControl));
Assert.assertEquals(1,getMessageCount(dlqQueueControl));
dlqQueueControl.retryMessage(messageID);
Assert.assertEquals(1, getMessageCount(testQueueControl));
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
}
@Test
public void testMoveMessage() throws Exception {
String otherQueueName = RandomUtil.randomString();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.jms.server.management;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
@ -171,6 +172,18 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
return (String) proxy.invokeOperation("listMessageCounterHistory");
}
public boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("retryMessage",messageID);
}
public int retryMessages() throws Exception {
return (Integer) proxy.invokeOperation("retryMessages");
}
public boolean retryMessage(final String messageID) throws Exception {
return (Boolean) proxy.invokeOperation("retryMessage",messageID);
}
@Override
public Map<String, Object>[] listScheduledMessages() throws Exception {
return null;