ARTEMIS-2843 non-destructive LVQ not delivering msg to consumer

This commit is contained in:
Clebert Suconic 2020-07-10 12:12:09 -04:00
parent 7be77169c2
commit ccc0fa7100
3 changed files with 95 additions and 2 deletions

View File

@ -154,6 +154,21 @@ public class LastValueQueue extends QueueImpl {
replaceLVQMessage(ref, hr);
if (isNonDestructive() && hr.isDelivered()) {
hr.resetDelivered();
// --------------------------------------------------------------------------------
// If non Destructive, and if a reference was previously delivered
// we would not be able to receive this message again
// unless we reset the iterators
// The message is not removed, so we can't actually remove it
// a result of this operation is that previously delivered messages
// will probably be delivered again.
// if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
// operation on the iterator for a single message
resetAllIterators();
deliverAsync();
}
} else {
hr = new HolderReference(prop, ref);
@ -166,6 +181,18 @@ public class LastValueQueue extends QueueImpl {
}
}
@Override
public long getMessageCount() {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
} else {
return (long) pendingMetrics.getMessageCount() + getScheduledCount();
}
}
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
@ -301,12 +328,23 @@ public class LastValueQueue extends QueueImpl {
private final SimpleString prop;
private volatile boolean delivered = false;
private volatile MessageReference ref;
private long consumerID;
private boolean hasConsumerID = false;
public void resetDelivered() {
delivered = false;
}
public boolean isDelivered() {
return delivered;
}
HolderReference(final SimpleString prop, final MessageReference ref) {
this.prop = prop;
@ -324,6 +362,7 @@ public class LastValueQueue extends QueueImpl {
@Override
public void handled() {
delivered = true;
// We need to remove the entry from the map just before it gets delivered
ref.handled();
if (!ref.getQueue().isNonDestructive()) {

View File

@ -170,7 +170,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final PagingStore pagingStore;
private final PageSubscription pageSubscription;
protected final PageSubscription pageSubscription;
private ReferenceCounter refCountForConsumers;
@ -192,7 +192,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
private final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
protected final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
private final QueueMessageMetrics deliveringMetrics = new QueueMessageMetrics(this, "delivering");

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;
@ -48,6 +49,59 @@ public class LVQTest extends JMSTestBase {
return cf;
}
@Test
public void testLVQandNonDestructive() throws Exception {
ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();
fact.setConsumerWindowSize(0);
try (Connection connection = fact.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
// swapping these two lines makes the test either succeed for fail
// Queue queue = session.createQueue("random?last-value=true");
Queue queue = session.createQueue("random?last-value=true&non-destructive=true");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
TextMessage message = session.createTextMessage();
message.setText("Message 1");
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
producer.send(message);
TextMessage tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
tm.acknowledge();
Thread.sleep(1000);
assertEquals("Message 1", tm.getText());
message = session.createTextMessage();
message.setText("Message 2");
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "A");
producer.send(message);
tm = (TextMessage) consumer.receive(2000);
assertNotNull(tm);
assertEquals("Message 2", tm.getText());
// It is important to query here
// as we shouldn't rely on addHead after the consumer is closed
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
Wait.assertEquals(1, serverQueue::getMessageCount);
}
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
Wait.assertEquals(1, serverQueue::getMessageCount);
serverQueue.deleteMatchingReferences(null);
// This should be removed all
assertEquals(0, serverQueue.getMessageCount());
}
@Test
public void testLastValueQueueUsingAddressQueueParameters() throws Exception {
ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();