This commit is contained in:
Clebert Suconic 2021-01-27 20:41:33 -05:00
commit 2cf8d5c181
2 changed files with 32 additions and 5 deletions

View File

@ -3919,10 +3919,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return;
}
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
if (!isNonDestructive()) {
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@ -329,6 +328,30 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
}
@Test
public void testMessageCount() throws Exception {
sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(NON_DESTRUCTIVE_QUEUE_NAME));
assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount());
//Consume Once
receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount());
sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount());
//Consume Again as should be non-destructive
receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount());
QueueControl control = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + NON_DESTRUCTIVE_QUEUE_NAME);
control.removeAllMessages();
assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount());
}
private void receive(ConnectionSupplier consumerConnectionSupplier, String queueName, int i) throws JMSException {
try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {