ARTEMIS-252 added jmx operations to retry messages

This commit is contained in:
Petter Nordlander 2015-10-10 16:58:04 +02:00 committed by Clebert Suconic
parent 78410bcbfe
commit 7afe87996b
4 changed files with 198 additions and 0 deletions

View File

@ -216,6 +216,28 @@ public interface QueueControl {
@Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION) @Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION)
boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception; boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
/**
* Retries the message corresponding to the given messageID to the original queue.
* This is appropriate on dead messages on Dead letter queues only.
*
* @param messageID
* @return {@code true} if the message was retried, {@code false} else
* @throws Exception
*/
@Operation(desc = "Retry the message corresponding to the given messageID to the original queue", impact = MBeanOperationInfo.ACTION)
boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
/**
* Retries all messages on a DLQ to their respective original queues.
* This is appropriate on dead messages on Dead letter queues only.
*
* @return the number of retried messages.
* @throws Exception
*/
@Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact = MBeanOperationInfo.ACTION)
int retryMessages() throws Exception;
/** /**
* Moves the message corresponding to the specified message ID to the specified other queue. * Moves the message corresponding to the specified message ID to the specified other queue.
* *

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -551,6 +552,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
} }
public boolean retryMessage(final long messageID) throws Exception {
checkStarted();
clearIO();
try {
MessageReference message = queue.getReference(messageID);
if ( message == null ) {
return false;
}
else {
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
if (originalAddress != null) {
return queue.moveReference(messageID, new SimpleString(originalAddress));
}
}
}
finally {
blockOnIO();
}
return false;
}
public int retryMessages() throws Exception {
checkStarted();
clearIO();
int retriedMessages = 0;
try {
Iterator<MessageReference> messageIterator = queue.totalIterator();
while (messageIterator.hasNext()) {
MessageReference message = messageIterator.next();
// Will only try messages with Message.HDR_ORIGINAL_ADDRESS set.
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
final long messageID = message.getMessage().getMessageID();
if ( originalAddress != null) {
if ( queue.moveReference(messageID, new SimpleString(originalAddress))) {
retriedMessages++;
}
}
}
}
finally {
blockOnIO();
}
return retriedMessages;
}
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception { public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception {
return moveMessage(messageID, otherQueueName, false); return moveMessage(messageID, otherQueueName, false);
} }

View File

@ -747,6 +747,118 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
/**
* Test retry - get a message from DLQ and put on original queue.
*/
@Test
public void testRetryMessage() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final SimpleString dlq = new SimpleString("DLQ1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
session.createQueue(dla, dlq, null, false);
session.createQueue(adName, qName, null, false);
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
producer.send(createTextMessage(session, sampleText));
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
QueueControl queueControl = createManagementControl(dla, dlq);
Assert.assertEquals(1, getMessageCount(queueControl));
final long messageID = getFirstMessageId(queueControl);
// Retry the message - i.e. it should go from DLQ to original Queue.
Assert.assertTrue(queueControl.retryMessage(messageID));
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(queueControl));
// .. and that the message is now on the original queue once more.
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), "Put me on DLQ!");
clientConsumer.close();
}
/**
* Test retry multiple messages from DLQ to original queue.
*/
@Test
public void testRetryMultipleMessages() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final SimpleString dlq = new SimpleString("DLQ1");
final String sampleText = "Put me on DLQ";
final int numMessagesToTest = 10;
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
session.createQueue(dla, dlq, null, false);
session.createQueue(adName, qName, null, false);
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
for (int i = 0; i < numMessagesToTest; i++) {
producer.send(createTextMessage(session, sampleText));
}
session.start();
// Read and rollback all messages to DLQ
ClientConsumer clientConsumer = session.createConsumer(qName);
for (int i = 0; i < numMessagesToTest; i++) {
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
session.rollback();
}
Assert.assertNull(clientConsumer.receiveImmediate());
QueueControl dlqQueueControl = createManagementControl(dla, dlq);
Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
// Retry all messages - i.e. they should go from DLQ to original Queue.
Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(dlqQueueControl));
// .. and that the messages is now on the original queue once more.
for (int i = 0; i < numMessagesToTest; i++) {
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
}
clientConsumer.close();
}
/** /**
* <ol> * <ol>
* <li>send a message to queue</li> * <li>send a message to queue</li>
@ -1930,4 +2042,10 @@ public class QueueControlTest extends ManagementTestBase {
return queueControl; return queueControl;
} }
protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
Map<String, Object>[] messages = queueControl.listMessages(null);
long messageID = (Long) messages[0].get("messageID");
return messageID;
}
} }

View File

@ -229,6 +229,14 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Boolean) proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates); return (Boolean) proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
} }
public boolean retryMessage(final long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("retryMessage", messageID);
}
public int retryMessages() throws Exception {
return (Integer) proxy.invokeOperation("retryMessages");
}
public int removeMessages(final String filter) throws Exception { public int removeMessages(final String filter) throws Exception {
return (Integer) proxy.invokeOperation("removeMessages", filter); return (Integer) proxy.invokeOperation("removeMessages", filter);
} }