This closes #193
This commit is contained in:
commit
8848c9681c
|
@ -216,6 +216,28 @@ public interface QueueControl {
|
|||
@Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION)
|
||||
boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
|
||||
|
||||
|
||||
/**
|
||||
* Retries the message corresponding to the given messageID to the original queue.
|
||||
* This is appropriate on dead messages on Dead letter queues only.
|
||||
*
|
||||
* @param messageID
|
||||
* @return {@code true} if the message was retried, {@code false} else
|
||||
* @throws Exception
|
||||
*/
|
||||
@Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION)
|
||||
boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
|
||||
|
||||
/**
|
||||
* Retries all messages on a DLQ to their respective original queues.
|
||||
* This is appropriate on dead messages on Dead letter queues only.
|
||||
*
|
||||
* @return the number of retried messages.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION)
|
||||
int retryMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Moves the message corresponding to the specified message ID to the specified other queue.
|
||||
*
|
||||
|
|
|
@ -227,6 +227,27 @@ public interface JMSQueueControl extends DestinationControl {
|
|||
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
|
||||
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
|
||||
|
||||
/**
|
||||
* Retries the message corresponding to the given messageID to the original queue.
|
||||
* This is appropriate on dead messages on Dead letter queues only.
|
||||
*
|
||||
* @param messageID
|
||||
* @return {@code true} if the message was retried, {@code false} else
|
||||
* @throws Exception
|
||||
*/
|
||||
@Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION)
|
||||
boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID) throws Exception;
|
||||
|
||||
/**
|
||||
* Retries all messages on a DLQ to their respective original queues.
|
||||
* This is appropriate on dead messages on Dead letter queues only.
|
||||
*
|
||||
* @return the number of retried messages.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION)
|
||||
int retryMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Lists the message counter for this queue.
|
||||
*/
|
||||
|
|
|
@ -275,6 +275,25 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
return coreQueueControl.changeMessagesPriority(filter, newPriority);
|
||||
}
|
||||
|
||||
public boolean retryMessage(final String jmsMessageID) throws Exception {
|
||||
|
||||
// Figure out messageID from JMSMessageID.
|
||||
final String filter = createFilterForJMSMessageID(jmsMessageID);
|
||||
Map<String,Object>[] messages = coreQueueControl.listMessages(filter);
|
||||
if ( messages.length != 1) { // if no messages. There should not be more than one, JMSMessageID should be unique.
|
||||
return false;
|
||||
}
|
||||
|
||||
final Map<String,Object> messageToRedeliver = messages[0];
|
||||
Long messageID = (Long)messageToRedeliver.get("messageID");
|
||||
return messageID != null && coreQueueControl.retryMessage(messageID);
|
||||
}
|
||||
|
||||
public int retryMessages() throws Exception {
|
||||
return coreQueueControl.retryMessages();
|
||||
}
|
||||
|
||||
|
||||
public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception {
|
||||
return moveMessage(messageID, otherQueueName, false);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -551,6 +552,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean retryMessage(final long messageID) throws Exception {
|
||||
|
||||
checkStarted();
|
||||
clearIO();
|
||||
|
||||
try {
|
||||
MessageReference message = queue.getReference(messageID);
|
||||
if ( message == null ) {
|
||||
return false;
|
||||
}
|
||||
else {
|
||||
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
|
||||
if (originalAddress != null) {
|
||||
return queue.moveReference(messageID, new SimpleString(originalAddress));
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public int retryMessages() throws Exception {
|
||||
checkStarted();
|
||||
clearIO();
|
||||
|
||||
int retriedMessages = 0;
|
||||
try {
|
||||
Iterator<MessageReference> messageIterator = queue.totalIterator();
|
||||
while (messageIterator.hasNext()) {
|
||||
MessageReference message = messageIterator.next();
|
||||
// Will only try messages with Message.HDR_ORIGINAL_ADDRESS set.
|
||||
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
|
||||
final long messageID = message.getMessage().getMessageID();
|
||||
if ( originalAddress != null) {
|
||||
if ( queue.moveReference(messageID, new SimpleString(originalAddress))) {
|
||||
retriedMessages++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
blockOnIO();
|
||||
}
|
||||
return retriedMessages;
|
||||
}
|
||||
|
||||
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception {
|
||||
return moveMessage(messageID, otherQueueName, false);
|
||||
}
|
||||
|
|
|
@ -792,6 +792,84 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
protected ActiveMQQueue createDLQ(final String deadLetterQueueName) throws Exception {
|
||||
serverManager.createQueue(false, deadLetterQueueName, null, true, deadLetterQueueName);
|
||||
return (ActiveMQQueue) ActiveMQJMSClient.createQueue(deadLetterQueueName);
|
||||
}
|
||||
|
||||
protected ActiveMQQueue createTestQueueWithDLQ(final String queueName, final ActiveMQQueue dlq) throws Exception {
|
||||
serverManager.createQueue(false,queueName,null,true,queueName);
|
||||
ActiveMQQueue testQueue = (ActiveMQQueue) ActiveMQJMSClient.createQueue(queueName);
|
||||
AddressSettings addressSettings = new AddressSettings();
|
||||
addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
|
||||
addressSettings.setMaxDeliveryAttempts(1);
|
||||
server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings);
|
||||
return testQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRetryMessages() throws Exception {
|
||||
ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
|
||||
ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
|
||||
|
||||
final int numMessagesToTest = 10;
|
||||
JMSUtil.sendMessages(testQueue, numMessagesToTest);
|
||||
|
||||
Connection connection = createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(testQueue);
|
||||
for (int i = 0;i < numMessagesToTest;i++) {
|
||||
Message msg = consumer.receive(500L);
|
||||
}
|
||||
session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
|
||||
|
||||
JMSQueueControl testQueueControl = createManagementControl(testQueue);
|
||||
JMSQueueControl dlqQueueControl = createManagementControl(dlq);
|
||||
Assert.assertEquals(0, getMessageCount(testQueueControl));
|
||||
Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
|
||||
|
||||
dlqQueueControl.retryMessages();
|
||||
|
||||
Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl));
|
||||
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retrying a specific message on DLQ.
|
||||
* Expected to be sent back to original queue.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRetryMessage() throws Exception {
|
||||
ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
|
||||
ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
|
||||
String messageID = JMSUtil.sendMessages(testQueue,1)[0];
|
||||
|
||||
Connection connection = createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(testQueue);
|
||||
consumer.receive(500L);
|
||||
session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
|
||||
|
||||
JMSQueueControl testQueueControl = createManagementControl(testQueue);
|
||||
JMSQueueControl dlqQueueControl = createManagementControl(dlq);
|
||||
Assert.assertEquals(0, getMessageCount(testQueueControl));
|
||||
Assert.assertEquals(1,getMessageCount(dlqQueueControl));
|
||||
|
||||
dlqQueueControl.retryMessage(messageID);
|
||||
|
||||
Assert.assertEquals(1, getMessageCount(testQueueControl));
|
||||
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveMessage() throws Exception {
|
||||
String otherQueueName = RandomUtil.randomString();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.tests.integration.jms.server.management;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.management.Parameter;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
|
||||
|
@ -171,6 +172,18 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
|
|||
return (String) proxy.invokeOperation("listMessageCounterHistory");
|
||||
}
|
||||
|
||||
public boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception {
|
||||
return (Boolean) proxy.invokeOperation("retryMessage",messageID);
|
||||
}
|
||||
|
||||
public int retryMessages() throws Exception {
|
||||
return (Integer) proxy.invokeOperation("retryMessages");
|
||||
}
|
||||
|
||||
public boolean retryMessage(final String messageID) throws Exception {
|
||||
return (Boolean) proxy.invokeOperation("retryMessage",messageID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object>[] listScheduledMessages() throws Exception {
|
||||
return null;
|
||||
|
|
|
@ -747,6 +747,118 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retry - get a message from DLQ and put on original queue.
|
||||
*/
|
||||
@Test
|
||||
public void testRetryMessage() throws Exception {
|
||||
final SimpleString dla = new SimpleString("DLA");
|
||||
final SimpleString qName = new SimpleString("q1");
|
||||
final SimpleString adName = new SimpleString("ad1");
|
||||
final SimpleString dlq = new SimpleString("DLQ1");
|
||||
final String sampleText = "Put me on DLQ";
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
|
||||
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
|
||||
|
||||
session.createQueue(dla, dlq, null, false);
|
||||
session.createQueue(adName, qName, null, false);
|
||||
|
||||
// Send message to queue.
|
||||
ClientProducer producer = session.createProducer(adName);
|
||||
producer.send(createTextMessage(session, sampleText));
|
||||
session.start();
|
||||
|
||||
ClientConsumer clientConsumer = session.createConsumer(qName);
|
||||
ClientMessage clientMessage = clientConsumer.receive(500);
|
||||
clientMessage.acknowledge();
|
||||
Assert.assertNotNull(clientMessage);
|
||||
|
||||
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
|
||||
|
||||
// force a rollback to DLQ
|
||||
session.rollback();
|
||||
clientMessage = clientConsumer.receiveImmediate();
|
||||
Assert.assertNull(clientMessage);
|
||||
|
||||
QueueControl queueControl = createManagementControl(dla, dlq);
|
||||
Assert.assertEquals(1, getMessageCount(queueControl));
|
||||
final long messageID = getFirstMessageId(queueControl);
|
||||
|
||||
// Retry the message - i.e. it should go from DLQ to original Queue.
|
||||
Assert.assertTrue(queueControl.retryMessage(messageID));
|
||||
|
||||
// Assert DLQ is empty...
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
// .. and that the message is now on the original queue once more.
|
||||
clientMessage = clientConsumer.receive(500);
|
||||
clientMessage.acknowledge();
|
||||
Assert.assertNotNull(clientMessage);
|
||||
|
||||
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), "Put me on DLQ!");
|
||||
|
||||
clientConsumer.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test retry multiple messages from DLQ to original queue.
|
||||
*/
|
||||
@Test
|
||||
public void testRetryMultipleMessages() throws Exception {
|
||||
final SimpleString dla = new SimpleString("DLA");
|
||||
final SimpleString qName = new SimpleString("q1");
|
||||
final SimpleString adName = new SimpleString("ad1");
|
||||
final SimpleString dlq = new SimpleString("DLQ1");
|
||||
final String sampleText = "Put me on DLQ";
|
||||
final int numMessagesToTest = 10;
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
|
||||
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
|
||||
|
||||
session.createQueue(dla, dlq, null, false);
|
||||
session.createQueue(adName, qName, null, false);
|
||||
|
||||
// Send message to queue.
|
||||
ClientProducer producer = session.createProducer(adName);
|
||||
for (int i = 0; i < numMessagesToTest; i++) {
|
||||
producer.send(createTextMessage(session, sampleText));
|
||||
}
|
||||
|
||||
session.start();
|
||||
|
||||
// Read and rollback all messages to DLQ
|
||||
ClientConsumer clientConsumer = session.createConsumer(qName);
|
||||
for (int i = 0; i < numMessagesToTest; i++) {
|
||||
ClientMessage clientMessage = clientConsumer.receive(500);
|
||||
clientMessage.acknowledge();
|
||||
Assert.assertNotNull(clientMessage);
|
||||
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
|
||||
session.rollback();
|
||||
}
|
||||
|
||||
Assert.assertNull(clientConsumer.receiveImmediate());
|
||||
|
||||
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
|
||||
Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
|
||||
|
||||
// Retry all messages - i.e. they should go from DLQ to original Queue.
|
||||
Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());
|
||||
|
||||
// Assert DLQ is empty...
|
||||
Assert.assertEquals(0, getMessageCount(dlqQueueControl));
|
||||
|
||||
// .. and that the messages is now on the original queue once more.
|
||||
for (int i = 0; i < numMessagesToTest; i++) {
|
||||
ClientMessage clientMessage = clientConsumer.receive(500);
|
||||
clientMessage.acknowledge();
|
||||
Assert.assertNotNull(clientMessage);
|
||||
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
|
||||
}
|
||||
|
||||
clientConsumer.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* <ol>
|
||||
* <li>send a message to queue</li>
|
||||
|
@ -1930,4 +2042,10 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
|
||||
return queueControl;
|
||||
}
|
||||
|
||||
protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
|
||||
Map<String, Object>[] messages = queueControl.listMessages(null);
|
||||
long messageID = (Long) messages[0].get("messageID");
|
||||
return messageID;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,6 +229,14 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
return (Boolean) proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
|
||||
}
|
||||
|
||||
public boolean retryMessage(final long messageID) throws Exception {
|
||||
return (Boolean) proxy.invokeOperation("retryMessage", messageID);
|
||||
}
|
||||
|
||||
public int retryMessages() throws Exception {
|
||||
return (Integer) proxy.invokeOperation("retryMessages");
|
||||
}
|
||||
|
||||
public int removeMessages(final String filter) throws Exception {
|
||||
return (Integer) proxy.invokeOperation("removeMessages", filter);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue