This closes #1500
This commit is contained in:
commit
022d5ba853
|
@ -246,6 +246,14 @@ public interface QueueControl {
|
|||
int removeMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transactions during the operation to avoid OutOfMemory") int flushLimit,
|
||||
@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
|
||||
|
||||
/**
|
||||
* Removes all the message from the queue.
|
||||
*
|
||||
* @return the number of removed messages
|
||||
*/
|
||||
@Operation(desc = "Remove all the messages from the Queue (and returns the number of removed messages)", impact = MBeanOperationInfo.ACTION)
|
||||
int removeAllMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Expires all the message corresponding to the specified filter.
|
||||
* <br>
|
||||
|
|
|
@ -594,6 +594,11 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int removeAllMessages() throws Exception {
|
||||
return removeMessages(FLUSH_LIMIT, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expireMessage(final long messageID) throws Exception {
|
||||
checkStarted();
|
||||
|
|
|
@ -1270,6 +1270,29 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveAllMessages() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
session.createQueue(address, queue, null, false);
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
|
||||
// send on queue
|
||||
producer.send(session.createMessage(false));
|
||||
producer.send(session.createMessage(false));
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(2, getMessageCount(queueControl));
|
||||
|
||||
// removed matching messages to otherQueue
|
||||
int removedMatchedMessagesCount = queueControl.removeAllMessages();
|
||||
Assert.assertEquals(2, removedMatchedMessagesCount);
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveMessagesWithEmptyFilter() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
|
|
@ -307,6 +307,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
|
|||
return (Integer) proxy.invokeOperation(Integer.class, "removeMessages", limit, filter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int removeAllMessages() throws Exception {
|
||||
return (Integer) proxy.invokeOperation( "removeAllMessages");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeMessage(final long messageID) throws Exception {
|
||||
return (Boolean) proxy.invokeOperation("removeMessage", messageID);
|
||||
|
|
Loading…
Reference in New Issue