diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 8d1c98eada..5e36d333d2 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -232,17 +232,20 @@ public class LinkedListImpl implements LinkedList { } if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan. - if (localLastAdd.prev != null && localLastAdd.prev.val() != null) { - if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) { - logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val()); - addAfter(localLastAdd.prev, e); + if (logger.isDebugEnabled()) { + logger.debug("localLastAdd Value = {}, we are adding {}", localLastAdd.val(), e); + } + + int compareLastAdd = comparator.compare(localLastAdd.val(), e); + + if (compareLastAdd > 0) { + if (scanRight(localLastAdd, e)) { return; } } - if (localLastAdd.next != null && localLastAdd.next.val() != null) { - if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) { - logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val()); - addAfter(localLastAdd, e); + + if (compareLastAdd < 0) { + if (scanLeft(localLastAdd, e)) { return; } } @@ -264,6 +267,31 @@ public class LinkedListImpl implements LinkedList { } } + protected boolean scanRight(Node position, E e) { + Node fetching = position.next; + while (fetching != null) { + if (comparator.compare(fetching.val(), e) < 0) { + addAfter(position, e); + return true; + } + position = fetching; + fetching = fetching.next; + } + return false; // unlikely to happen, using this just to be safe + } + + protected boolean scanLeft(Node position, E e) { + Node fetching = position.prev; + while (fetching != null) { + if (comparator.compare(fetching.val(), e) > 0) { + addAfter(fetching, e); + return true; + } + fetching = fetching.prev; + } + return false; // unlikely to happen, using this just to be safe + } + protected boolean addSortedScan(E e) { logger.trace("addSortedScan {}...", e); Node fetching = head.next; diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java index e4641f9696..82c22f4e44 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java @@ -77,8 +77,12 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase { Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); Queue destination = consumerSession.createQueue(queueName); MessageConsumer consumer = consumerSession.createConsumer(destination); + MessageConsumer consumer2 = consumerSession.createConsumer(destination); + MessageConsumer consumer3 = consumerSession.createConsumer(destination); for (;;) { consumer.receive(); + consumer2.receive(); + consumer3.receive(); } } catch (Exception e) { } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index b39a9e75dc..eacd29e469 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStore; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -103,23 +103,45 @@ public class LinkedListTest extends ActiveMQTestBase { list.addSorted(1); list.addSorted(3); list.addSorted(4); - Assert.assertEquals(0, scans); // no scans made until now - list.addSorted(2); // this should need a scan - Assert.assertEquals(1, scans); + list.addSorted(2); list.addSorted(10); list.addSorted(20); list.addSorted(19); - list.addSorted(7); // this will need a scan as it's totally random - Assert.assertEquals(2, scans); + list.addSorted(7); list.addSorted(8); - Assert.assertEquals(2, scans); + Assert.assertEquals(0, scans); // no full scans should be done Assert.assertEquals(1, (int)list.poll()); list.addSorted(9); - Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache + Assert.assertEquals(1, scans); // remove (poll) should clear the last added cache, a scan will be needed + printDebug(); - validateOrder(null); + } + @Test + public void scanDirectionalTest() { + list.addSorted(9); + Assert.assertEquals(1, list.size()); + list.addSorted(5); + Assert.assertEquals(2, list.size()); + list.addSorted(6); + Assert.assertEquals(3, list.size()); + list.addSorted(2); + Assert.assertEquals(4, list.size()); + list.addSorted(7); + Assert.assertEquals(5, list.size()); + list.addSorted(4); + Assert.assertEquals(6, list.size()); + list.addSorted(8); + Assert.assertEquals(7, list.size()); + list.addSorted(1); + Assert.assertEquals(8, list.size()); + list.addSorted(10); + Assert.assertEquals(9, list.size()); + list.addSorted(3); + Assert.assertEquals(10, list.size()); + printDebug(); + validateOrder(null); } private void printDebug() {