ARTEMIS-3890 - rework LVQ implementation to ensure all messages get delivered, replacement of lvq now tied to the deliver loop. Fix issue with duplicates - bug in LinkedListImpl`

This commit is contained in:
Gary Tully 2022-06-07 10:49:36 -05:00 committed by clebertsuconic
parent 91f1d37266
commit 4a4765c39c
11 changed files with 192 additions and 448 deletions

View File

@ -538,6 +538,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
current = current.prev;
current.iterCount++;
if (last == node) {
last = current;
}
} else {
current = null;
}

View File

@ -30,7 +30,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");
protected LinkedListImpl<E>[] levels;
protected final LinkedListImpl<E>[] levels;
private volatile int size;

View File

@ -17,16 +17,11 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -38,13 +33,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.jboss.logging.Logger;
/**
@ -59,25 +54,9 @@ import org.jboss.logging.Logger;
public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class);
private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
private final Map<SimpleString, MessageReference> map = new HashMap<>();
private final SimpleString lastValueKey;
// only use this within synchronized methods or synchronized(this) blocks
protected final LinkedList<MessageReference> nextDeliveries = new LinkedList<>();
/* in certain cases we need to redeliver a message */
@Override
protected MessageReference nextDelivery() {
return nextDeliveries.poll();
}
@Override
protected void repeatNextDelivery(MessageReference reference) {
// put the ref back onto the head of the list so that the next time poll() is called this ref is returned
nextDeliveries.addFirst(reference);
}
@Deprecated
public LastValueQueue(final long persistenceID,
@ -162,116 +141,49 @@ public class LastValueQueue extends QueueImpl {
@Override
public synchronized void addTail(final MessageReference ref, final boolean direct) {
if (scheduleIfPossible(ref)) {
return;
}
final SimpleString prop = ref.getLastValueProperty();
if (prop != null) {
HolderReference hr = map.get(prop);
if (hr != null) {
if (isNonDestructive() && hr.isInDelivery()) {
// if the ref is already being delivered we'll do the replace in the postAcknowledge
hr.setReplacementRef(ref);
} else {
// We need to overwrite the old ref with the new one and ack the old one
replaceLVQMessage(ref, hr);
if (isNonDestructive() && hr.isDelivered()) {
hr.resetDelivered();
// since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement
nextDeliveries.add(hr);
deliverAsync();
}
}
} else {
hr = new HolderReference(prop, ref);
map.put(prop, hr);
super.addTail(hr, isNonDestructive() ? false : direct);
}
} else {
if (!scheduleIfPossible(ref)) {
trackLastValue(ref);
super.addTail(ref, isNonDestructive() ? false : direct);
}
}
@Override
public void addHead(final MessageReference ref, boolean scheduling) {
if (scheduling) {
// track last value when scheduled message is actually enqueued
trackLastValue(ref);
} else if (isNonDestructive() == false) {
// for released messages from a consumer or tx that have been destroyed,
// use as a last value in the absence of any newer value, it may be stale
trackLastValueIfAbsent(ref);
}
super.addHead(ref, scheduling);
}
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
addHead(ref, scheduling);
}
private void trackLastValue(MessageReference ref) {
final SimpleString lastValueProperty = ref.getLastValueProperty();
if (lastValueProperty != null) {
map.put(lastValueProperty, ref);
}
}
private void trackLastValueIfAbsent(MessageReference ref) {
final SimpleString lastValueProperty = ref.getLastValueProperty();
if (lastValueProperty != null) {
map.putIfAbsent(lastValueProperty, ref);
}
}
@Override
public long getMessageCount() {
if (pageSubscription != null) {
// messageReferences will have depaged messages which we need to discount from the counter as they are
// counted on the pageSubscription as well
return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount();
} else {
return (long) pendingMetrics.getMessageCount() + getScheduledCount();
}
}
/** LVQ has to use regular addHead due to last value queues calculations */
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
this.addHead(ref, scheduling);
}
/** LVQ has to use regular addHead due to last value queues calculations */
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
this.addHead(refs, scheduling);
}
@Override
public synchronized void addHead(final MessageReference ref, boolean scheduling) {
// we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
return;
}
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
HolderReference hr = map.get(lastValueProp);
if (hr != null) {
if (scheduling) {
// We need to overwrite the old ref with the new one and ack the old one
replaceLVQMessage(ref, hr);
} else {
// We keep the current ref and ack the one we are returning
super.referenceHandled(ref);
try {
super.acknowledge(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
}
} else {
hr = new HolderReference(lastValueProp, ref);
map.put(lastValueProp, hr);
super.addHead(hr, scheduling);
}
} else {
super.addHead(ref, scheduling);
}
}
@Override
public void postAcknowledge(final MessageReference ref, AckReason reason) {
if (isNonDestructive()) {
if (ref instanceof HolderReference) {
HolderReference hr = (HolderReference) ref;
if (hr.getReplacementRef() != null) {
replaceLVQMessage(hr.getReplacementRef(), hr);
}
}
}
super.postAcknowledge(ref, reason);
// with LV - delivered messages can remain on the queue so the delivering count
// count must be discounted else we are accounting the same message more than once
return super.getMessageCount() - getDeliveringCount();
}
@Override
@ -284,21 +196,36 @@ public class LastValueQueue extends QueueImpl {
return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
}
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
referenceHandled(oldRef);
super.refRemoved(oldRef);
try {
oldRef.acknowledge(null, AckReason.REPLACED, null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
@Override
protected void pruneLastValues() {
// called with synchronized(this) from super.deliver()
try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (!currentLastValue(ref)) {
iter.remove();
try {
referenceHandled(ref);
super.refRemoved(ref);
ref.acknowledge(null, AckReason.REPLACED, null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
}
}
}
}
}
hr.setReference(ref);
addRefSize(ref);
refAdded(ref);
private boolean currentLastValue(final MessageReference ref) {
boolean currentLastValue = false;
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
MessageReference current = map.get(lastValueProp);
if (current == ref) {
currentLastValue = true;
}
}
return currentLastValue;
}
@Override
@ -327,16 +254,9 @@ public class LastValueQueue extends QueueImpl {
}
@Override
public synchronized void reload(final MessageReference ref) {
// repopulate LVQ map & reload proper HolderReferences
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
HolderReference hr = new HolderReference(lastValueProp, ref);
map.put(lastValueProp, hr);
super.reload(hr);
} else {
super.reload(ref);
}
public synchronized void reload(final MessageReference newRef) {
trackLastValue(newRef);
super.reload(newRef);
}
private synchronized void removeIfCurrent(MessageReference ref) {
@ -361,9 +281,6 @@ public class LastValueQueue extends QueueImpl {
};
}
@Override
public boolean isLastValue() {
return true;
@ -378,238 +295,6 @@ public class LastValueQueue extends QueueImpl {
return Collections.unmodifiableSet(map.keySet());
}
private static class HolderReference implements MessageReference {
private final SimpleString prop;
private volatile boolean delivered = false;
private volatile MessageReference ref;
private volatile MessageReference replacementRef;
private long consumerID;
private boolean hasConsumerID = false;
public MessageReference getReplacementRef() {
return replacementRef;
}
public void setReplacementRef(MessageReference replacementRef) {
this.replacementRef = replacementRef;
}
public void resetDelivered() {
delivered = false;
}
public boolean isDelivered() {
return delivered;
}
HolderReference(final SimpleString prop, final MessageReference ref) {
this.prop = prop;
this.ref = ref;
}
@Override
public void onDelivery(Consumer<? super MessageReference> callback) {
// HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
}
MessageReference getReference() {
return ref;
}
@Override
public void handled() {
delivered = true;
// We need to remove the entry from the map just before it gets delivered
ref.handled();
if (!ref.getQueue().isNonDestructive()) {
((LastValueQueue) ref.getQueue()).removeIfCurrent(this);
}
}
@Override
public void setInDelivery(boolean inDelivery) {
ref.setInDelivery(inDelivery);
}
@Override
public boolean isInDelivery() {
return ref.isInDelivery();
}
@Override
public Object getProtocolData() {
return ref.getProtocolData();
}
@Override
public void setProtocolData(Object data) {
ref.setProtocolData(data);
}
@Override
public void setAlreadyAcked() {
ref.setAlreadyAcked();
}
@Override
public boolean isAlreadyAcked() {
return ref.isAlreadyAcked();
}
void setReference(final MessageReference ref) {
this.ref = ref;
}
@Override
public MessageReference copy(final Queue queue) {
return ref.copy(queue);
}
@Override
public void decrementDeliveryCount() {
ref.decrementDeliveryCount();
}
@Override
public int getDeliveryCount() {
return ref.getDeliveryCount();
}
@Override
public Message getMessage() {
return ref.getMessage();
}
@Override
public long getMessageID() {
return ref.getMessageID();
}
@Override
public boolean isDurable() {
return getMessage().isDurable();
}
@Override
public SimpleString getLastValueProperty() {
return prop;
}
@Override
public Queue getQueue() {
return ref.getQueue();
}
@Override
public long getScheduledDeliveryTime() {
return ref.getScheduledDeliveryTime();
}
@Override
public void incrementDeliveryCount() {
ref.incrementDeliveryCount();
}
@Override
public void setDeliveryCount(final int deliveryCount) {
ref.setDeliveryCount(deliveryCount);
}
@Override
public void setScheduledDeliveryTime(final long scheduledDeliveryTime) {
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
@Override
public void acknowledge(Transaction tx) throws Exception {
ref.acknowledge(tx);
}
@Override
public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
ref.acknowledge(tx, consumer);
}
@Override
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
ref.acknowledge(tx, reason, consumer);
}
@Override
public void setPersistedCount(int count) {
ref.setPersistedCount(count);
}
@Override
public int getPersistedCount() {
return ref.getPersistedCount();
}
@Override
public boolean isPaged() {
return false;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#acknowledge(org.apache.activemq.artemis.core.server.MessageReference)
*/
@Override
public void acknowledge() throws Exception {
ref.getQueue().acknowledge(this);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.server.MessageReference#getMessageMemoryEstimate()
*/
@Override
public int getMessageMemoryEstimate() {
return ref.getMessage().getMemoryEstimate();
}
@Override
public void emptyConsumerID() {
this.hasConsumerID = false;
}
@Override
public void setConsumerId(long consumerID) {
this.hasConsumerID = true;
this.consumerID = consumerID;
}
@Override
public boolean hasConsumerId() {
return hasConsumerID;
}
@Override
public long getConsumerId() {
if (!this.hasConsumerID) {
throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first");
}
return this.consumerID;
}
@Override
public long getPersistentSize() throws ActiveMQException {
return ref.getPersistentSize();
}
@Override
public String toString() {
return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
}
}
@Override
public int hashCode() {
final int prime = 31;

View File

@ -49,7 +49,7 @@ public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsume
private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
private UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
private final UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
//-- START :: ResettableIterator Methods
// As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time.

View File

@ -194,7 +194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
// This is where messages are stored
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
private NodeStore<MessageReference> nodeStore;
@ -340,16 +340,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile long ringSize;
/* in certain cases we need to redeliver a message directly.
* it's useful for usecases last LastValueQueue */
protected MessageReference nextDelivery() {
return null;
}
protected void repeatNextDelivery(MessageReference reference) {
}
@Override
public boolean isSwept() {
return swept;
@ -2895,7 +2885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private synchronized void doInternalPoll() {
synchronized void doInternalPoll() {
int added = 0;
MessageReference ref;
@ -2974,27 +2964,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
consumers.reset();
while (true) {
if (handled == MAX_DELIVERIES_IN_LOOP) {
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
// long
if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) {
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
deliverAsync(true);
return false;
}
if (System.nanoTime() - timeout > 0) {
if (logger.isTraceEnabled()) {
logger.trace("delivery has been running for too long. Scheduling another delivery task now");
}
deliverAsync(true);
return false;
}
MessageReference ref;
Consumer handledconsumer = null;
synchronized (this) {
@ -3024,6 +3000,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumers.hasNext()) {
holder = consumers.next();
} else {
pruneLastValues();
break;
}
@ -3034,15 +3011,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
holder.iter = messageReferences.iterator();
}
// LVQ support
ref = nextDelivery();
boolean nextDelivery = false;
if (ref != null) {
nextDelivery = true;
}
if (ref == null && holder.iter.hasNext()) {
if (holder.iter.hasNext()) {
ref = holder.iter.next();
} else {
ref = null;
}
if (ref == null) {
@ -3092,18 +3064,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
if (nextDelivery) {
repeatNextDelivery(ref);
} else {
try {
holder.iter.repeat();
} catch (NoSuchElementException e) {
// this could happen if there was an exception on the queue handling
// and it returned BUSY because of that exception
//
// We will just log it as there's nothing else we can do now.
logger.warn(e.getMessage(), e);
}
try {
holder.iter.repeat();
} catch (NoSuchElementException e) {
// this could happen if there was an exception on the queue handling
// and it returned BUSY because of that exception
//
// We will just log it as there's nothing else we can do now.
logger.warn(e.getMessage(), e);
}
noDelivery++;
@ -3130,6 +3098,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Round robin'd all
if (noDelivery == this.consumers.size()) {
pruneLastValues();
if (handledconsumer != null) {
// this shouldn't really happen,
// however I'm keeping this as an assertion case future developers ever change the logic here on this class
@ -3144,6 +3114,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
noDelivery = 0;
}
}
if (handledconsumer != null) {
@ -3154,6 +3125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
// called with 'this' locked
protected void pruneLastValues() {
// interception point for LVQ
}
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
holder.iter.remove();
refRemoved(ref);

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Test;
public class JMSLVQTest extends JMSClientTestSupport {
@ -178,4 +179,35 @@ public class JMSLVQTest extends JMSClientTestSupport {
p.send(queue1, message2);
}
}
@Test
public void testNonDestructiveWithSelector() throws Exception {
final String MY_QUEUE = RandomUtil.randomString();
final boolean NON_DESTRUCTIVE = true;
server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST).setNonDestructive(NON_DESTRUCTIVE).setLastValue(true));
ConnectionSupplier connectionSupplier = CoreConnection;
Connection consumerConnection1 = connectionSupplier.createConnection();
Session consumerSession1 = consumerConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue myQueue = consumerSession1.createQueue(MY_QUEUE);
MessageConsumer consumer1 = consumerSession1.createConsumer(myQueue);
consumerConnection1.start();
Connection consumerConnection2 = connectionSupplier.createConnection();
Session consumerSession2 = consumerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
myQueue = consumerSession2.createQueue(MY_QUEUE);
MessageConsumer consumer2 = consumerSession2.createConsumer(myQueue, "foo='bar'");
Connection producerConnection = connectionSupplier.createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = producerSession.createProducer(myQueue);
for (int i = 0; i < 1000; i++) {
TextMessage m = producerSession.createTextMessage();
m.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "abc");
p.send(m);
assertNotNull(consumer1.receive(500));
}
}
}

View File

@ -620,6 +620,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
HashMap<String, Integer> dups = new HashMap<>();
List<Producer> producers = new ArrayList<>();
int receivedTally = 0;
try (Connection connection = connectionSupplier.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -641,6 +642,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
if (tm == null) {
break;
}
receivedTally++;
results.get(tm.getStringProperty("lastval")).add(tm.getText());
tm.acknowledge();
}
@ -669,6 +671,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
Assert.fail("Duplicate messages received " + sb);
}
Assert.assertEquals("Got all messages produced", MESSAGE_COUNT_PER_GROUP * GROUP_COUNT * PRODUCER_COUNT, receivedTally);
Wait.assertEquals((long) GROUP_COUNT, () -> server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 100, false);
}

View File

@ -87,9 +87,9 @@ public class LVQTest extends JMSTestBase {
assertNotNull(tm);
assertEquals("Message 2", tm.getText());
// It is important to query here
// as we shouldn't rely on addHead after the consumer is closed
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random");
// one message on the queue and one in delivery - the same message if it's an LVQ
// LVQ getMessageCount will discount!
Wait.assertEquals(1, serverQueue::getMessageCount);
}

View File

@ -157,7 +157,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase {
m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
Assert.assertEquals("m6", m.getBodyBuffer().readString());
m = consumer.receiveImmediate();
Assert.assertNull(m);
}

View File

@ -86,7 +86,7 @@ public class LVQTest extends ActiveMQTestBase {
ClientMessage m2 = createTextMessage(clientSession, "m2");
m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m2);
assertEquals(1, server.locateQueue(qName1).getMessageCount());
Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
clientSession.close();
server.stop();
@ -100,7 +100,8 @@ public class LVQTest extends ActiveMQTestBase {
ClientMessage m3 = createTextMessage(clientSession, "m3");
m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
producer.send(m3);
assertEquals(1, server.locateQueue(qName1).getMessageCount());
// wait b/c prune takes a deliver attempt which is async
Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount());
ClientConsumer consumer = clientSession.createConsumer(qName1);
clientSession.start();
@ -293,7 +294,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testMultipleMessagesInTx() throws Exception {
ClientProducer producer = clientSessionTxReceives.createProducer(address);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
SimpleString messageId1 = new SimpleString("SMID1");
SimpleString messageId2 = new SimpleString("SMID2");
ClientMessage m1 = createTextMessage(clientSession, "m1");
@ -308,6 +308,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m2);
producer.send(m3);
producer.send(m4);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
@ -392,6 +393,7 @@ public class LVQTest extends ActiveMQTestBase {
public void testMultipleMessagesInTxSend() throws Exception {
ClientProducer producer = clientSessionTxSends.createProducer(address);
ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
clientSessionTxSends.start();
SimpleString rh = new SimpleString("SMID1");
ClientMessage m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@ -412,11 +414,18 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m5);
producer.send(m6);
clientSessionTxSends.commit();
clientSessionTxSends.start();
for (int i = 1; i < 6; i++) {
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals("m" + i, m.getBodyBuffer().readString());
}
consumer.close();
consumer = clientSessionTxSends.createConsumer(qName1);
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m6");
Assert.assertEquals("m6", m.getBodyBuffer().readString());
}
@Test
@ -460,7 +469,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception {
ClientProducer producer = clientSessionTxSends.createProducer(address);
ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
SimpleString rh = new SimpleString("SMID1");
ClientMessage m1 = createTextMessage(clientSession, "m1");
m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
@ -488,6 +496,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(m6);
clientSessionTxSends.commit();
clientSessionTxSends.start();
ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
@ -705,7 +714,6 @@ public class LVQTest extends ActiveMQTestBase {
@Test
public void testLargeMessage() throws Exception {
ClientProducer producer = clientSessionTxReceives.createProducer(address);
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
SimpleString rh = new SimpleString("SMID1");
for (int i = 0; i < 50; i++) {
@ -715,6 +723,7 @@ public class LVQTest extends ActiveMQTestBase {
producer.send(message);
clientSession.commit();
}
ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
clientSessionTxReceives.start();
ClientMessage m = consumer.receive(1000);
Assert.assertNotNull(m);
@ -736,11 +745,11 @@ public class LVQTest extends ActiveMQTestBase {
Queue queue = server.locateQueue(qName1);
producer.send(m1);
long oldSize = queue.getPersistentSize();
Wait.assertEquals(123, () -> queue.getPersistentSize());
producer.send(m2);
// encoded size is a little larger than payload
Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024);
assertEquals(queue.getDeliveringSize(), 0);
assertNotEquals(queue.getPersistentSize(), oldSize);
assertTrue(queue.getPersistentSize() > 10 * 1024);
}
@Test

View File

@ -1032,6 +1032,41 @@ public class LinkedListTest extends ActiveMQTestBase {
}
@Test
public void testRemoveLastNudgeNoReplay() {
for (int i = 1; i < 3; i++) {
doTestRemoveLastNudgeNoReplay(i);
}
}
private void doTestRemoveLastNudgeNoReplay(int num) {
LinkedListIterator<Integer> iter = list.iterator();
for (int i = 0; i < num; i++) {
list.addTail(i);
}
// exhaust iterator
for (int i = 0; i < num; i++) {
assertTrue(iter.hasNext());
assertEquals(i, iter.next().intValue());
}
// remove last
LinkedListIterator<Integer> pruneIterator = list.iterator();
while (pruneIterator.hasNext()) {
int v = pruneIterator.next();
if (v == num - 1) {
pruneIterator.remove();
}
}
// ensure existing iterator does not reset or replay
assertFalse(iter.hasNext());
assertEquals(num - 1, list.size());
}
@Test
public void testGCNepotismPoll() {
final int count = 100;