ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted

This commit is contained in:
Clebert Suconic 2022-11-25 11:41:03 -05:00 committed by clebertsuconic
parent 7d537882ca
commit af2b8e4b07
3 changed files with 71 additions and 17 deletions

View File

@ -232,17 +232,20 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
addAfter(localLastAdd.prev, e);
if (logger.isDebugEnabled()) {
logger.debug("localLastAdd Value = {}, we are adding {}", localLastAdd.val(), e);
}
int compareLastAdd = comparator.compare(localLastAdd.val(), e);
if (compareLastAdd > 0) {
if (scanRight(localLastAdd, e)) {
return;
}
}
if (localLastAdd.next != null && localLastAdd.next.val() != null) {
if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
addAfter(localLastAdd, e);
if (compareLastAdd < 0) {
if (scanLeft(localLastAdd, e)) {
return;
}
}
@ -264,6 +267,31 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
protected boolean scanRight(Node<E> position, E e) {
Node<E> fetching = position.next;
while (fetching != null) {
if (comparator.compare(fetching.val(), e) < 0) {
addAfter(position, e);
return true;
}
position = fetching;
fetching = fetching.next;
}
return false; // unlikely to happen, using this just to be safe
}
protected boolean scanLeft(Node<E> position, E e) {
Node<E> fetching = position.prev;
while (fetching != null) {
if (comparator.compare(fetching.val(), e) > 0) {
addAfter(fetching, e);
return true;
}
fetching = fetching.prev;
}
return false; // unlikely to happen, using this just to be safe
}
protected boolean addSortedScan(E e) {
logger.trace("addSortedScan {}...", e);
Node<E> fetching = head.next;

View File

@ -77,8 +77,12 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(destination);
MessageConsumer consumer2 = consumerSession.createConsumer(destination);
MessageConsumer consumer3 = consumerSession.createConsumer(destination);
for (;;) {
consumer.receive();
consumer2.receive();
consumer3.receive();
}
} catch (Exception e) {
}

View File

@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -103,23 +103,45 @@ public class LinkedListTest extends ActiveMQTestBase {
list.addSorted(1);
list.addSorted(3);
list.addSorted(4);
Assert.assertEquals(0, scans); // no scans made until now
list.addSorted(2); // this should need a scan
Assert.assertEquals(1, scans);
list.addSorted(2);
list.addSorted(10);
list.addSorted(20);
list.addSorted(19);
list.addSorted(7); // this will need a scan as it's totally random
Assert.assertEquals(2, scans);
list.addSorted(7);
list.addSorted(8);
Assert.assertEquals(2, scans);
Assert.assertEquals(0, scans); // no full scans should be done
Assert.assertEquals(1, (int)list.poll());
list.addSorted(9);
Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache
Assert.assertEquals(1, scans); // remove (poll) should clear the last added cache, a scan will be needed
printDebug();
validateOrder(null);
}
@Test
public void scanDirectionalTest() {
list.addSorted(9);
Assert.assertEquals(1, list.size());
list.addSorted(5);
Assert.assertEquals(2, list.size());
list.addSorted(6);
Assert.assertEquals(3, list.size());
list.addSorted(2);
Assert.assertEquals(4, list.size());
list.addSorted(7);
Assert.assertEquals(5, list.size());
list.addSorted(4);
Assert.assertEquals(6, list.size());
list.addSorted(8);
Assert.assertEquals(7, list.size());
list.addSorted(1);
Assert.assertEquals(8, list.size());
list.addSorted(10);
Assert.assertEquals(9, list.size());
list.addSorted(3);
Assert.assertEquals(10, list.size());
printDebug();
validateOrder(null);
}
private void printDebug() {