From 61eb379741ab3faebdffa4d7347507e8bd11d890 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 20 Aug 2019 17:05:33 -0400 Subject: [PATCH] ARTEMIS-2458 Fix AMQP Transaction Rollback Ordering by using a sorted add --- .../utils/collections/LinkedListImpl.java | 62 ++++++++ .../utils/collections/PriorityLinkedList.java | 2 + .../collections/PriorityLinkedListImpl.java | 17 ++- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../protocol/mqtt/MQTTPublishManager.java | 2 +- .../activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/ServerConsumer.java | 2 +- .../core/server/cluster/impl/BridgeImpl.java | 2 +- .../server/impl/MessageReferenceImpl.java | 23 +++ .../artemis/core/server/impl/QueueImpl.java | 27 +++- .../core/server/impl/ServerConsumerImpl.java | 4 +- .../core/server/impl/ServerSessionImpl.java | 2 +- .../impl/ScheduledDeliveryHandlerTest.java | 2 +- .../integration/cli/DummyServerConsumer.java | 2 +- .../integration/client/JMSOrderTest.java | 132 ++++++++++++++++++ .../unit/core/postoffice/impl/FakeQueue.java | 2 +- .../tests/unit/util/LinkedListTest.java | 73 +++++++++- 17 files changed, 343 insertions(+), 16 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index cb202583b9..7426947689 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.utils.collections; import java.lang.reflect.Array; +import java.util.Comparator; import java.util.NoSuchElementException; import java.util.Objects; @@ -43,8 +44,15 @@ public class LinkedListImpl implements LinkedList { private int nextIndex; + private final Comparator comparator; + public LinkedListImpl() { + this(null); + } + + public LinkedListImpl(Comparator comparator) { iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE); + this.comparator = comparator; } @Override @@ -84,6 +92,60 @@ public class LinkedListImpl implements LinkedList { } } + public void addSorted(E e) { + if (comparator == null) { + throw new NullPointerException("comparator=null"); + } + if (size == 0) { + addHead(e); + } else { + if (comparator.compare(head.next.val(), e) < 0) { + addHead(e); + return; + } + + // in our usage, most of the times we will just add to the end + // as the QueueImpl cancellations in AMQP will return the buffer back to the queue, in the order they were consumed. + // There is an exception to that case, when there are more messages on the queue. + // This would be an optimization for our usage. + // avoiding scanning the entire List just to add at the end, so we compare the end first. + if (comparator.compare(tail.val(), e) >= 0) { + addTail(e); + return; + } + + Node fetching = head.next; + while (fetching.next != null) { + int compareNext = comparator.compare(fetching.next.val(), e); + if (compareNext <= 0) { + addAfter(fetching, e); + return; + } + fetching = fetching.next; + } + + // this shouldn't happen as the tail was compared before iterating + // the only possibilities for this to happen are: + // - there is a bug on the comparator + // - This method is buggy + // - The list wasn't properly synchronized as this list does't support concurrent access + // + // Also I'm not bothering about creating a Logger ID for this, because the only reason for this code to exist + // is because my OCD level is not letting this out. + throw new IllegalStateException("Cannot find a suitable place for your element, There's a mismatch in the comparator or there was concurrent adccess on the queue"); + } + } + + private void addAfter(Node node, E e) { + Node newNode = Node.with(e); + Node nextNode = node.next; + node.next = newNode; + newNode.prev = node; + newNode.next = nextNode; + nextNode.prev = newNode; + size++; + } + @Override public E poll() { Node ret = head.next; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java index 19e58c27b8..9437f55550 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java @@ -27,6 +27,8 @@ public interface PriorityLinkedList { void addTail(T t, int priority); + void addSorted(T t, int priority); + T poll(); void clear(); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java index 00cf0464e9..bde946151d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.utils.collections; import java.lang.reflect.Array; +import java.util.Comparator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -40,10 +41,15 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { private int lastPriority = -1; public PriorityLinkedListImpl(final int priorities) { + this(priorities, null); + } + + + public PriorityLinkedListImpl(final int priorities, Comparator comparator) { levels = (LinkedListImpl[]) Array.newInstance(LinkedListImpl.class, priorities); for (int i = 0; i < priorities; i++) { - levels[i] = new LinkedListImpl<>(); + levels[i] = new LinkedListImpl<>(comparator); } } @@ -80,6 +86,15 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { exclusiveIncrementSize(1); } + @Override + public void addSorted(T t, int priority) { + checkHighest(priority); + + levels[priority].addSorted(t); + + exclusiveIncrementSize(1); + } + @Override public T poll() { T t = null; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f850cc185e..dc249ff00a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -405,7 +405,7 @@ public class AMQPSessionCallback implements SessionCallback { public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { OperationContext oldContext = recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); + ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true); ((ServerConsumer) brokerConsumer).getQueue().forceDelivery(); } finally { resetContext(oldContext); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index abcfe3f3d5..b473a87c1e 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -131,7 +131,7 @@ public class MQTTPublishManager { sendServerMessage(mqttid, message, deliveryCount, qos); } else { // Client must have disconnected and it's Subscription QoS cleared - consumer.individualCancel(message.getMessageID(), false); + consumer.individualCancel(message.getMessageID(), false, true); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 2d7f373fde..bab38d6e0c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -172,7 +172,8 @@ public interface Queue extends Bindable,CriticalComponent { void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck); - void cancel(MessageReference reference, long timeBase) throws Exception; + /** @param sorted it should use the messageID as a reference to where to add it in the queue */ + void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception; void deliverAsync(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java index 4d3591954b..f1f8b1e4c9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java @@ -98,7 +98,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo { void reject(long messageID) throws Exception; - void individualCancel(long messageID, boolean failed) throws Exception; + void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception; void forceDelivery(long sequence); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 7d5bbe8ac7..1024118543 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -354,7 +354,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled refqueue = ref.getQueue(); try { - refqueue.cancel(ref, timeBase); + refqueue.cancel(ref, timeBase, false); } catch (Exception e) { // There isn't much we can do besides log an error ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 97bb7f6868..459b703d20 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.util.Comparator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Consumer; @@ -33,6 +34,28 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl; */ public class MessageReferenceImpl extends LinkedListImpl.Node implements MessageReference, Runnable { + private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID(); + + public static Comparator getIDComparator() { + return idComparator; + } + + private static class MessageReferenceComparatorByID implements Comparator { + + @Override + public int compare(MessageReference o1, MessageReference o2) { + long value = o2.getMessage().getMessageID() - o1.getMessage().getMessageID(); + if (value > 0) { + return 1; + } else if (value < 0) { + return -1; + } else { + return 0; + } + } + } + + private static final AtomicIntegerFieldUpdater DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater .newUpdater(MessageReferenceImpl.class, "deliveryCount"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 1c133ef7d6..e19d9ef0ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final MpscUnboundedArrayQueue intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192); // This is where messages are stored - private final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES); + private final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator()); // The quantity of pagedReferences on messageReferences priority list private final AtomicInteger pagedReferences = new AtomicInteger(0); @@ -1631,11 +1631,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception { + public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception { Pair redeliveryResult = checkRedelivery(reference, timeBase, false); if (redeliveryResult.getA()) { if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) { - internalAddHead(reference); + if (sorted) { + internalAddSorted(reference); + } else { + internalAddHead(reference); + } } resetAllIterators(); @@ -2469,6 +2473,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { messageReferences.addHead(ref, priority); } + /** + * The caller of this method requires synchronized on the queue. + * I'm not going to add synchronized to this method just for a precaution, + * as I'm not 100% sure this won't cause any extra runtime. + * + * @param ref + */ + private void internalAddSorted(final MessageReference ref) { + queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); + pendingMetrics.incrementMetrics(ref); + refAdded(ref); + + int priority = getPriority(ref); + + messageReferences.addSorted(ref, priority); + } + private int getPriority(MessageReference ref) { try { return ref.getMessage().getPriority(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 37dd74a87b..54cf9a28ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -992,7 +992,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public synchronized void individualCancel(final long messageID, boolean failed) throws Exception { + public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception { if (browseOnly) { return; } @@ -1007,7 +1007,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.decrementDeliveryCount(); } - ref.getQueue().cancel(ref, System.currentTimeMillis()); + ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 90443026cc..296382b30f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1146,7 +1146,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { ServerConsumer consumer = locateConsumer(consumerID); if (consumer != null) { - consumer.individualCancel(messageID, failed); + consumer.individualCancel(messageID, failed, false); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index ad4cbb3169..ea15264b3c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1108,7 +1108,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public void cancel(MessageReference reference, long timeBase) throws Exception { + public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java index 58bf2d3d5a..ee7bdbbb99 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java @@ -150,7 +150,7 @@ public class DummyServerConsumer implements ServerConsumer { } @Override - public void individualCancel(long messageID, boolean failed) throws Exception { + public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception { } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java new file mode 100644 index 0000000000..dcc5a40c5d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory; + +@RunWith(value = Parameterized.class) +public class JMSOrderTest extends JMSTestBase { + + String protocol; + + ConnectionFactory protocolCF; + + public JMSOrderTest(String protocol) { + this.protocol = protocol; + } + + @Before + public void setupCF() { + protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616"); + } + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection getParameters() { + return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}}); + } + + protected void sendToAmqQueue(int count) throws Exception { + Connection activemqConnection = protocolCF.createConnection(); + Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue amqTestQueue = amqSession.createQueue(name.getMethodName()); + sendMessages(activemqConnection, amqTestQueue, count); + activemqConnection.close(); + } + + public void sendMessages(Connection connection, Destination destination, int count) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(destination); + + for (int i = 1; i <= count; i++) { + TextMessage message = session.createTextMessage(); + message.setText("TextMessage: " + i); + message.setIntProperty("nr", i); + p.send(message); + } + + session.close(); + + } + + @Test(timeout = 60000) + public void testReceiveSomeThenRollback() throws Exception { + Connection connection = protocolCF.createConnection(); + try { + connection.start(); + + int totalCount = 5; + int consumeBeforeRollback = 2; + + sendToAmqQueue(totalCount); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 1; i <= consumeBeforeRollback; i++) { + Message message = consumer.receive(3000); + assertNotNull(message); + assertEquals("Unexpected message number", i, message.getIntProperty("nr")); + } + + session.rollback(); + + // Consume again.. the previously consumed messages should get delivered + // again after the rollback and then the remainder should follow + List messageNumbers = new ArrayList<>(); + for (int i = 1; i <= totalCount; i++) { + Message message = consumer.receive(3000); + assertNotNull("Failed to receive message: " + i, message); + int msgNum = message.getIntProperty("nr"); + System.out.println("Received " + msgNum); + messageNumbers.add(msgNum); + } + + session.commit(); + + assertEquals("Unexpected size of list", totalCount, messageNumbers.size()); + for (int i = 0; i < messageNumbers.size(); i++) { + assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i)); + } + } finally { + connection.close(); + } + + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 290aa15e62..612d621476 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -351,7 +351,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public void cancel(final MessageReference reference, final long timeBase) throws Exception { + public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception { // no-op } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index e7c3eba29e..5d8f21b898 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -17,14 +17,18 @@ package org.apache.activemq.artemis.tests.unit.util; import java.lang.ref.WeakReference; +import java.util.Comparator; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -37,7 +41,74 @@ public class LinkedListTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - list = new LinkedListImpl<>(); + list = new LinkedListImpl<>(integerComparator); + } + + Comparator integerComparator = new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + if (o1.intValue() == o2.intValue()) { + return 0; + } + if (o2.intValue() > o1.intValue()) { + return 1; + } else { + return -1; + } + } + }; + + @Test + public void addSorted() { + + list.addSorted(1); + list.addSorted(3); + list.addSorted(2); + list.addSorted(0); + validateOrder(null); + Assert.assertEquals(4, list.size()); + + } + + + @Test + public void randomSorted() { + + HashSet values = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + + int value = RandomUtil.randomInt(); + if (!values.contains(value)) { + values.add(value); + list.addSorted(value); + } + } + + Assert.assertEquals(values.size(), list.size()); + + validateOrder(values); + + Assert.assertEquals(0, values.size()); + + } + + private void validateOrder(HashSet values) { + Integer previous = null; + LinkedListIterator integerIterator = list.iterator(); + while (integerIterator.hasNext()) { + + Integer value = integerIterator.next(); + if (previous != null) { + Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0); + Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue()); + } + + if (values != null) { + values.remove(value); + } + previous = value; + } + integerIterator.close(); } @Test