diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 9f12b47246..4f3b8e9dc9 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -538,6 +538,10 @@ public class LinkedListImpl implements LinkedList { current = current.prev; current.iterCount++; + + if (last == node) { + last = current; + } } else { current = null; } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java index f1e50d3261..4d01441c9b 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java @@ -30,7 +30,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { private static final AtomicIntegerFieldUpdater SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size"); - protected LinkedListImpl[] levels; + protected final LinkedListImpl[] levels; private volatile int size; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index f01403fb90..50dc850c09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -17,16 +17,11 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -38,13 +33,13 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; /** @@ -59,25 +54,9 @@ import org.jboss.logging.Logger; public class LastValueQueue extends QueueImpl { private static final Logger logger = Logger.getLogger(LastValueQueue.class); - private final Map map = new ConcurrentHashMap<>(); + private final Map map = new HashMap<>(); private final SimpleString lastValueKey; - // only use this within synchronized methods or synchronized(this) blocks - protected final LinkedList nextDeliveries = new LinkedList<>(); - - - /* in certain cases we need to redeliver a message */ - @Override - protected MessageReference nextDelivery() { - return nextDeliveries.poll(); - } - - @Override - protected void repeatNextDelivery(MessageReference reference) { - // put the ref back onto the head of the list so that the next time poll() is called this ref is returned - nextDeliveries.addFirst(reference); - } - @Deprecated public LastValueQueue(final long persistenceID, @@ -162,116 +141,49 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addTail(final MessageReference ref, final boolean direct) { - if (scheduleIfPossible(ref)) { - return; - } - final SimpleString prop = ref.getLastValueProperty(); - - if (prop != null) { - HolderReference hr = map.get(prop); - - if (hr != null) { - if (isNonDestructive() && hr.isInDelivery()) { - // if the ref is already being delivered we'll do the replace in the postAcknowledge - hr.setReplacementRef(ref); - } else { - // We need to overwrite the old ref with the new one and ack the old one - replaceLVQMessage(ref, hr); - - if (isNonDestructive() && hr.isDelivered()) { - hr.resetDelivered(); - // since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement - nextDeliveries.add(hr); - deliverAsync(); - } - } - } else { - hr = new HolderReference(prop, ref); - - map.put(prop, hr); - - super.addTail(hr, isNonDestructive() ? false : direct); - } - } else { + if (!scheduleIfPossible(ref)) { + trackLastValue(ref); super.addTail(ref, isNonDestructive() ? false : direct); } } + @Override + public void addHead(final MessageReference ref, boolean scheduling) { + if (scheduling) { + // track last value when scheduled message is actually enqueued + trackLastValue(ref); + } else if (isNonDestructive() == false) { + // for released messages from a consumer or tx that have been destroyed, + // use as a last value in the absence of any newer value, it may be stale + trackLastValueIfAbsent(ref); + } + super.addHead(ref, scheduling); + } + + @Override + public void addSorted(final MessageReference ref, boolean scheduling) { + addHead(ref, scheduling); + } + + private void trackLastValue(MessageReference ref) { + final SimpleString lastValueProperty = ref.getLastValueProperty(); + if (lastValueProperty != null) { + map.put(lastValueProperty, ref); + } + } + + private void trackLastValueIfAbsent(MessageReference ref) { + final SimpleString lastValueProperty = ref.getLastValueProperty(); + if (lastValueProperty != null) { + map.putIfAbsent(lastValueProperty, ref); + } + } @Override public long getMessageCount() { - if (pageSubscription != null) { - // messageReferences will have depaged messages which we need to discount from the counter as they are - // counted on the pageSubscription as well - return (long) pendingMetrics.getMessageCount() + getScheduledCount() + pageSubscription.getMessageCount(); - } else { - return (long) pendingMetrics.getMessageCount() + getScheduledCount(); - } - } - - /** LVQ has to use regular addHead due to last value queues calculations */ - @Override - public void addSorted(MessageReference ref, boolean scheduling) { - this.addHead(ref, scheduling); - } - - /** LVQ has to use regular addHead due to last value queues calculations */ - @Override - public void addSorted(List refs, boolean scheduling) { - this.addHead(refs, scheduling); - } - - @Override - public synchronized void addHead(final MessageReference ref, boolean scheduling) { - // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay - if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { - return; - } - - SimpleString lastValueProp = ref.getLastValueProperty(); - - if (lastValueProp != null) { - HolderReference hr = map.get(lastValueProp); - - if (hr != null) { - if (scheduling) { - // We need to overwrite the old ref with the new one and ack the old one - - replaceLVQMessage(ref, hr); - } else { - // We keep the current ref and ack the one we are returning - - super.referenceHandled(ref); - - try { - super.acknowledge(ref); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); - } - } - } else { - hr = new HolderReference(lastValueProp, ref); - - map.put(lastValueProp, hr); - - super.addHead(hr, scheduling); - } - } else { - super.addHead(ref, scheduling); - } - } - - @Override - public void postAcknowledge(final MessageReference ref, AckReason reason) { - if (isNonDestructive()) { - if (ref instanceof HolderReference) { - HolderReference hr = (HolderReference) ref; - if (hr.getReplacementRef() != null) { - replaceLVQMessage(hr.getReplacementRef(), hr); - } - } - } - super.postAcknowledge(ref, reason); + // with LV - delivered messages can remain on the queue so the delivering count + // count must be discounted else we are accounting the same message more than once + return super.getMessageCount() - getDeliveringCount(); } @Override @@ -284,21 +196,36 @@ public class LastValueQueue extends QueueImpl { return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey); } - private void replaceLVQMessage(MessageReference ref, HolderReference hr) { - MessageReference oldRef = hr.getReference(); - - referenceHandled(oldRef); - super.refRemoved(oldRef); - - try { - oldRef.acknowledge(null, AckReason.REPLACED, null); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + @Override + protected void pruneLastValues() { + // called with synchronized(this) from super.deliver() + try (LinkedListIterator iter = messageReferences.iterator()) { + while (iter.hasNext()) { + MessageReference ref = iter.next(); + if (!currentLastValue(ref)) { + iter.remove(); + try { + referenceHandled(ref); + super.refRemoved(ref); + ref.acknowledge(null, AckReason.REPLACED, null); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + } + } + } } + } - hr.setReference(ref); - addRefSize(ref); - refAdded(ref); + private boolean currentLastValue(final MessageReference ref) { + boolean currentLastValue = false; + SimpleString lastValueProp = ref.getLastValueProperty(); + if (lastValueProp != null) { + MessageReference current = map.get(lastValueProp); + if (current == ref) { + currentLastValue = true; + } + } + return currentLastValue; } @Override @@ -327,16 +254,9 @@ public class LastValueQueue extends QueueImpl { } @Override - public synchronized void reload(final MessageReference ref) { - // repopulate LVQ map & reload proper HolderReferences - SimpleString lastValueProp = ref.getLastValueProperty(); - if (lastValueProp != null) { - HolderReference hr = new HolderReference(lastValueProp, ref); - map.put(lastValueProp, hr); - super.reload(hr); - } else { - super.reload(ref); - } + public synchronized void reload(final MessageReference newRef) { + trackLastValue(newRef); + super.reload(newRef); } private synchronized void removeIfCurrent(MessageReference ref) { @@ -361,9 +281,6 @@ public class LastValueQueue extends QueueImpl { }; } - - - @Override public boolean isLastValue() { return true; @@ -378,238 +295,6 @@ public class LastValueQueue extends QueueImpl { return Collections.unmodifiableSet(map.keySet()); } - private static class HolderReference implements MessageReference { - - private final SimpleString prop; - - private volatile boolean delivered = false; - - private volatile MessageReference ref; - - private volatile MessageReference replacementRef; - - private long consumerID; - - private boolean hasConsumerID = false; - - - public MessageReference getReplacementRef() { - return replacementRef; - } - - public void setReplacementRef(MessageReference replacementRef) { - this.replacementRef = replacementRef; - } - - public void resetDelivered() { - delivered = false; - } - - public boolean isDelivered() { - return delivered; - } - - HolderReference(final SimpleString prop, final MessageReference ref) { - this.prop = prop; - - this.ref = ref; - } - - @Override - public void onDelivery(Consumer callback) { - // HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables - } - - MessageReference getReference() { - return ref; - } - - @Override - public void handled() { - delivered = true; - // We need to remove the entry from the map just before it gets delivered - ref.handled(); - if (!ref.getQueue().isNonDestructive()) { - ((LastValueQueue) ref.getQueue()).removeIfCurrent(this); - } - } - - @Override - public void setInDelivery(boolean inDelivery) { - ref.setInDelivery(inDelivery); - } - - @Override - public boolean isInDelivery() { - return ref.isInDelivery(); - } - - @Override - public Object getProtocolData() { - return ref.getProtocolData(); - } - - @Override - public void setProtocolData(Object data) { - ref.setProtocolData(data); - } - - @Override - public void setAlreadyAcked() { - ref.setAlreadyAcked(); - } - - @Override - public boolean isAlreadyAcked() { - return ref.isAlreadyAcked(); - } - - void setReference(final MessageReference ref) { - this.ref = ref; - } - - @Override - public MessageReference copy(final Queue queue) { - return ref.copy(queue); - } - - @Override - public void decrementDeliveryCount() { - ref.decrementDeliveryCount(); - } - - @Override - public int getDeliveryCount() { - return ref.getDeliveryCount(); - } - - @Override - public Message getMessage() { - return ref.getMessage(); - } - - @Override - public long getMessageID() { - return ref.getMessageID(); - } - - @Override - public boolean isDurable() { - return getMessage().isDurable(); - } - - @Override - public SimpleString getLastValueProperty() { - return prop; - } - - @Override - public Queue getQueue() { - return ref.getQueue(); - } - - @Override - public long getScheduledDeliveryTime() { - return ref.getScheduledDeliveryTime(); - } - - @Override - public void incrementDeliveryCount() { - ref.incrementDeliveryCount(); - } - - @Override - public void setDeliveryCount(final int deliveryCount) { - ref.setDeliveryCount(deliveryCount); - } - - @Override - public void setScheduledDeliveryTime(final long scheduledDeliveryTime) { - ref.setScheduledDeliveryTime(scheduledDeliveryTime); - } - - @Override - public void acknowledge(Transaction tx) throws Exception { - ref.acknowledge(tx); - } - - @Override - public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception { - ref.acknowledge(tx, consumer); - } - - @Override - public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { - ref.acknowledge(tx, reason, consumer); - } - - @Override - public void setPersistedCount(int count) { - ref.setPersistedCount(count); - } - - @Override - public int getPersistedCount() { - return ref.getPersistedCount(); - } - - @Override - public boolean isPaged() { - return false; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#acknowledge(org.apache.activemq.artemis.core.server.MessageReference) - */ - @Override - public void acknowledge() throws Exception { - ref.getQueue().acknowledge(this); - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.server.MessageReference#getMessageMemoryEstimate() - */ - @Override - public int getMessageMemoryEstimate() { - return ref.getMessage().getMemoryEstimate(); - } - - @Override - public void emptyConsumerID() { - this.hasConsumerID = false; - } - - @Override - public void setConsumerId(long consumerID) { - this.hasConsumerID = true; - this.consumerID = consumerID; - } - - @Override - public boolean hasConsumerId() { - return hasConsumerID; - } - - @Override - public long getConsumerId() { - if (!this.hasConsumerID) { - throw new IllegalStateException("consumerID isn't specified: please check hasConsumerId first"); - } - return this.consumerID; - } - - @Override - public long getPersistentSize() throws ActiveMQException { - return ref.getPersistentSize(); - } - - @Override - public String toString() { - return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString(); - } - - } - @Override public int hashCode() { final int prime = 31; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java index 1afca46a6a..e3964a1d22 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java @@ -49,7 +49,7 @@ public class QueueConsumersImpl implements QueueConsume private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); - private UpdatableIterator iterator = new UpdatableIterator<>(consumers.resettableIterator()); + private final UpdatableIterator iterator = new UpdatableIterator<>(consumers.resettableIterator()); //-- START :: ResettableIterator Methods // As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 6987bcf4a3..8afa15f363 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -194,7 +194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final MpscUnboundedArrayQueue intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192); // This is where messages are stored - private final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator()); + protected final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator()); private NodeStore nodeStore; @@ -340,16 +340,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile long ringSize; - /* in certain cases we need to redeliver a message directly. - * it's useful for usecases last LastValueQueue */ - protected MessageReference nextDelivery() { - return null; - } - - protected void repeatNextDelivery(MessageReference reference) { - - } - @Override public boolean isSwept() { return swept; @@ -2895,7 +2885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - private synchronized void doInternalPoll() { + synchronized void doInternalPoll() { int added = 0; MessageReference ref; @@ -2974,27 +2964,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); consumers.reset(); while (true) { - if (handled == MAX_DELIVERIES_IN_LOOP) { - // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too - // long - + if (handled == MAX_DELIVERIES_IN_LOOP || System.nanoTime() - timeout > 0) { + // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long deliverAsync(true); - - return false; - } - - if (System.nanoTime() - timeout > 0) { - if (logger.isTraceEnabled()) { - logger.trace("delivery has been running for too long. Scheduling another delivery task now"); - } - - deliverAsync(true); - return false; } MessageReference ref; - Consumer handledconsumer = null; synchronized (this) { @@ -3024,6 +3000,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumers.hasNext()) { holder = consumers.next(); } else { + pruneLastValues(); break; } @@ -3034,15 +3011,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { holder.iter = messageReferences.iterator(); } - // LVQ support - ref = nextDelivery(); - boolean nextDelivery = false; - if (ref != null) { - nextDelivery = true; - } - - if (ref == null && holder.iter.hasNext()) { + if (holder.iter.hasNext()) { ref = holder.iter.next(); + } else { + ref = null; } if (ref == null) { @@ -3092,18 +3064,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { handled++; consumers.reset(); } else if (status == HandleStatus.BUSY) { - if (nextDelivery) { - repeatNextDelivery(ref); - } else { - try { - holder.iter.repeat(); - } catch (NoSuchElementException e) { - // this could happen if there was an exception on the queue handling - // and it returned BUSY because of that exception - // - // We will just log it as there's nothing else we can do now. - logger.warn(e.getMessage(), e); - } + try { + holder.iter.repeat(); + } catch (NoSuchElementException e) { + // this could happen if there was an exception on the queue handling + // and it returned BUSY because of that exception + // + // We will just log it as there's nothing else we can do now. + logger.warn(e.getMessage(), e); } noDelivery++; @@ -3130,6 +3098,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Round robin'd all if (noDelivery == this.consumers.size()) { + pruneLastValues(); + if (handledconsumer != null) { // this shouldn't really happen, // however I'm keeping this as an assertion case future developers ever change the logic here on this class @@ -3144,6 +3114,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { noDelivery = 0; } + } if (handledconsumer != null) { @@ -3154,6 +3125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } + // called with 'this' locked + protected void pruneLastValues() { + // interception point for LVQ + } + protected void removeMessageReference(ConsumerHolder holder, MessageReference ref) { holder.iter.remove(); refRemoved(ref); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java index 844e735bac..8b0e9732ff 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.junit.Test; public class JMSLVQTest extends JMSClientTestSupport { @@ -178,4 +179,35 @@ public class JMSLVQTest extends JMSClientTestSupport { p.send(queue1, message2); } } + + @Test + public void testNonDestructiveWithSelector() throws Exception { + final String MY_QUEUE = RandomUtil.randomString(); + final boolean NON_DESTRUCTIVE = true; + server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST).setNonDestructive(NON_DESTRUCTIVE).setLastValue(true)); + + ConnectionSupplier connectionSupplier = CoreConnection; + + Connection consumerConnection1 = connectionSupplier.createConnection(); + Session consumerSession1 = consumerConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue myQueue = consumerSession1.createQueue(MY_QUEUE); + MessageConsumer consumer1 = consumerSession1.createConsumer(myQueue); + consumerConnection1.start(); + + Connection consumerConnection2 = connectionSupplier.createConnection(); + Session consumerSession2 = consumerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + myQueue = consumerSession2.createQueue(MY_QUEUE); + MessageConsumer consumer2 = consumerSession2.createConsumer(myQueue, "foo='bar'"); + + Connection producerConnection = connectionSupplier.createConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = producerSession.createProducer(myQueue); + + for (int i = 0; i < 1000; i++) { + TextMessage m = producerSession.createTextMessage(); + m.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "abc"); + p.send(m); + assertNotNull(consumer1.receive(500)); + } + } } \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java index 609d7121a0..143cce2a23 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java @@ -620,6 +620,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { HashMap dups = new HashMap<>(); List producers = new ArrayList<>(); + int receivedTally = 0; try (Connection connection = connectionSupplier.createConnection()) { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -641,6 +642,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { if (tm == null) { break; } + receivedTally++; results.get(tm.getStringProperty("lastval")).add(tm.getText()); tm.acknowledge(); } @@ -669,6 +671,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport { Assert.fail("Duplicate messages received " + sb); } + Assert.assertEquals("Got all messages produced", MESSAGE_COUNT_PER_GROUP * GROUP_COUNT * PRODUCER_COUNT, receivedTally); Wait.assertEquals((long) GROUP_COUNT, () -> server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 100, false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java index f0f6900486..ba1d22d38a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/LVQTest.java @@ -87,9 +87,9 @@ public class LVQTest extends JMSTestBase { assertNotNull(tm); assertEquals("Message 2", tm.getText()); - // It is important to query here - // as we shouldn't rely on addHead after the consumer is closed org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue("random"); + // one message on the queue and one in delivery - the same message if it's an LVQ + // LVQ getMessageCount will discount! Wait.assertEquals(1, serverQueue::getMessageCount); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java index 46e27ad9a2..015f34fbb4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQRecoveryTest.java @@ -157,7 +157,7 @@ public class LVQRecoveryTest extends ActiveMQTestBase { m = consumer.receive(1000); Assert.assertNotNull(m); m.acknowledge(); - Assert.assertEquals(m.getBodyBuffer().readString(), "m6"); + Assert.assertEquals("m6", m.getBodyBuffer().readString()); m = consumer.receiveImmediate(); Assert.assertNull(m); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index b040f6bc7c..763f6ee3cd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -86,7 +86,7 @@ public class LVQTest extends ActiveMQTestBase { ClientMessage m2 = createTextMessage(clientSession, "m2"); m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); producer.send(m2); - assertEquals(1, server.locateQueue(qName1).getMessageCount()); + Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount()); clientSession.close(); server.stop(); @@ -100,7 +100,8 @@ public class LVQTest extends ActiveMQTestBase { ClientMessage m3 = createTextMessage(clientSession, "m3"); m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); producer.send(m3); - assertEquals(1, server.locateQueue(qName1).getMessageCount()); + // wait b/c prune takes a deliver attempt which is async + Wait.assertEquals(1, () -> server.locateQueue(qName1).getMessageCount()); ClientConsumer consumer = clientSession.createConsumer(qName1); clientSession.start(); @@ -293,7 +294,6 @@ public class LVQTest extends ActiveMQTestBase { @Test public void testMultipleMessagesInTx() throws Exception { ClientProducer producer = clientSessionTxReceives.createProducer(address); - ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); SimpleString messageId1 = new SimpleString("SMID1"); SimpleString messageId2 = new SimpleString("SMID2"); ClientMessage m1 = createTextMessage(clientSession, "m1"); @@ -308,6 +308,7 @@ public class LVQTest extends ActiveMQTestBase { producer.send(m2); producer.send(m3); producer.send(m4); + ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); clientSessionTxReceives.start(); ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); @@ -392,6 +393,7 @@ public class LVQTest extends ActiveMQTestBase { public void testMultipleMessagesInTxSend() throws Exception { ClientProducer producer = clientSessionTxSends.createProducer(address); ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1); + clientSessionTxSends.start(); SimpleString rh = new SimpleString("SMID1"); ClientMessage m1 = createTextMessage(clientSession, "m1"); m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); @@ -412,11 +414,18 @@ public class LVQTest extends ActiveMQTestBase { producer.send(m5); producer.send(m6); clientSessionTxSends.commit(); - clientSessionTxSends.start(); + for (int i = 1; i < 6; i++) { + ClientMessage m = consumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals("m" + i, m.getBodyBuffer().readString()); + } + consumer.close(); + consumer = clientSessionTxSends.createConsumer(qName1); ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); m.acknowledge(); - Assert.assertEquals(m.getBodyBuffer().readString(), "m6"); + Assert.assertEquals("m6", m.getBodyBuffer().readString()); } @Test @@ -460,7 +469,6 @@ public class LVQTest extends ActiveMQTestBase { @Test public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception { ClientProducer producer = clientSessionTxSends.createProducer(address); - ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1); SimpleString rh = new SimpleString("SMID1"); ClientMessage m1 = createTextMessage(clientSession, "m1"); m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); @@ -488,6 +496,7 @@ public class LVQTest extends ActiveMQTestBase { producer.send(m6); clientSessionTxSends.commit(); clientSessionTxSends.start(); + ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1); ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); m.acknowledge(); @@ -705,7 +714,6 @@ public class LVQTest extends ActiveMQTestBase { @Test public void testLargeMessage() throws Exception { ClientProducer producer = clientSessionTxReceives.createProducer(address); - ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); SimpleString rh = new SimpleString("SMID1"); for (int i = 0; i < 50; i++) { @@ -715,6 +723,7 @@ public class LVQTest extends ActiveMQTestBase { producer.send(message); clientSession.commit(); } + ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); clientSessionTxReceives.start(); ClientMessage m = consumer.receive(1000); Assert.assertNotNull(m); @@ -736,11 +745,11 @@ public class LVQTest extends ActiveMQTestBase { Queue queue = server.locateQueue(qName1); producer.send(m1); - long oldSize = queue.getPersistentSize(); + Wait.assertEquals(123, () -> queue.getPersistentSize()); producer.send(m2); + // encoded size is a little larger than payload + Wait.assertTrue(() -> queue.getPersistentSize() > 10 * 1024); assertEquals(queue.getDeliveringSize(), 0); - assertNotEquals(queue.getPersistentSize(), oldSize); - assertTrue(queue.getPersistentSize() > 10 * 1024); } @Test diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index 373619139c..47f3761f52 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -1032,6 +1032,41 @@ public class LinkedListTest extends ActiveMQTestBase { } + @Test + public void testRemoveLastNudgeNoReplay() { + for (int i = 1; i < 3; i++) { + doTestRemoveLastNudgeNoReplay(i); + } + } + + private void doTestRemoveLastNudgeNoReplay(int num) { + + LinkedListIterator iter = list.iterator(); + + for (int i = 0; i < num; i++) { + list.addTail(i); + } + + // exhaust iterator + for (int i = 0; i < num; i++) { + assertTrue(iter.hasNext()); + assertEquals(i, iter.next().intValue()); + } + + // remove last + LinkedListIterator pruneIterator = list.iterator(); + while (pruneIterator.hasNext()) { + int v = pruneIterator.next(); + if (v == num - 1) { + pruneIterator.remove(); + } + } + + // ensure existing iterator does not reset or replay + assertFalse(iter.hasNext()); + assertEquals(num - 1, list.size()); + } + @Test public void testGCNepotismPoll() { final int count = 100;